/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaErrorCode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer;
import org.apache.flink.streaming.connectors.kafka.internal.TransactionalIdsGenerator;
import org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class FlinkKafkaProducer<IN>
extends TwoPhaseCommitSinkFunction<IN, KafkaTransactionState, KafkaTransactionContext> {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducer.class);
    private static final long serialVersionUID = 1L;
    // NOTE(review): initializeState passes a literal 5 to TransactionalIdsGenerator; presumably
    // that literal is this constant inlined by the compiler -- confirm.
    public static final int SAFE_SCALE_DOWN_FACTOR = 5;
    // NOTE(review): several constructors pass a literal 5 as the pool size; presumably this constant.
    public static final int DEFAULT_KAFKA_PRODUCERS_POOL_SIZE = 5;
    // Value the constructor writes into "transaction.timeout.ms" when the supplied config lacks it.
    public static final Time DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Time.hours((long)1L);
    // Producer-config key; when it parses to true, initProducer skips metric registration.
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    // Descriptor of the older hint state; only read when migrating to the V2 descriptor below.
    @Deprecated
    private static final ListStateDescriptor<NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = new ListStateDescriptor("next-transactional-id-hint", TypeInformation.of(NextTransactionalIdHint.class));
    // Descriptor of the current hint state, backed by an explicit serializer.
    private static final ListStateDescriptor<NextTransactionalIdHint> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2 = new ListStateDescriptor("next-transactional-id-hint-v2", (TypeSerializer)new NextTransactionalIdHintSerializer());
    // Union list state holding the hint; obtained in initializeState, rewritten in snapshotState.
    private transient ListState<NextTransactionalIdHint> nextTransactionalIdHintState;
    // Created in initializeState; produces the transactional ids to use and to abort.
    private transient TransactionalIdsGenerator transactionalIdsGenerator;
    // Set in initializeState; null unless the semantic is EXACTLY_ONCE.
    private transient NextTransactionalIdHint nextTransactionalIdHint;
    // Kafka producer configuration; the constructor and init*Producer methods mutate it in place.
    protected final Properties producerConfig;
    // Topic used when the schema does not return a target topic for a record.
    private final String defaultTopicId;
    // The constructor sets exactly one of keyedSchema / kafkaSchema to a non-null value.
    @Nullable
    private final KeyedSerializationSchema<IN> keyedSchema;
    @Nullable
    private final KafkaSerializationSchema<IN> kafkaSchema;
    // Optional custom partitioner; always null when kafkaSchema is used.
    @Nullable
    private final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    // Cache of sorted partition ids per topic, filled lazily in invoke.
    private final Map<String, int[]> topicPartitionsMap;
    private final int kafkaProducersPoolSize;
    // Pool of transactional ids: polled in createTransactionalProducer, returned in recycleTransactionalProducer.
    private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<String>();
    // When true, the keyed-schema path of invoke attaches context.timestamp() to the record.
    private boolean writeTimestampToKafka = false;
    // Chooses in open() between a callback that only logs send errors and one that stores them.
    private boolean logFailuresOnly;
    // Delivery semantic; initializeState downgrades it to NONE when checkpointing is disabled.
    protected Semantic semantic;
    // Send-completion callback, created in open().
    @Nullable
    private transient Callback callback;
    // First asynchronous send error stored by the callback; rethrown and reset by checkErroneous.
    @Nullable
    private volatile transient Exception asyncException;
    // Incremented per record in invoke, decremented by the callback; flush expects it to be zero.
    private final AtomicLong pendingRecords = new AtomicLong();
    // Metric wrappers keyed by metric name, so each gauge is registered once and later re-pointed.
    private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<String, KafkaMetricMutableWrapper>();

    /**
     * Creates a producer from a comma-separated broker list, wrapping the value-only schema in a
     * {@link KeyedSerializationSchemaWrapper} and using a {@link FlinkFixedPartitioner}.
     *
     * <p>Delegates to the (topic, keyed schema, properties, partitioner) constructor.
     *
     * @param brokerList comma-separated broker addresses, stored as {@code bootstrap.servers}
     * @param topicId default target topic
     * @param serializationSchema schema used to serialize record values
     */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
        // Parameterize the partitioner explicitly instead of relying on a raw type.
        this(topicId, new KeyedSerializationSchemaWrapper<IN>(serializationSchema), FlinkKafkaProducer.getPropertiesFromBrokerList(brokerList), Optional.of(new FlinkFixedPartitioner<IN>()));
    }

    /**
     * Creates a producer from explicit producer properties, wrapping the value-only schema in a
     * {@link KeyedSerializationSchemaWrapper} and using a {@link FlinkFixedPartitioner}.
     *
     * @param topicId default target topic
     * @param serializationSchema schema used to serialize record values
     * @param producerConfig Kafka producer properties
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
        // Parameterize the partitioner explicitly instead of relying on a raw type.
        this(topicId, new KeyedSerializationSchemaWrapper<IN>(serializationSchema), producerConfig, Optional.of(new FlinkFixedPartitioner<IN>()));
    }

    /**
     * Creates a producer from a value-only schema and an optional custom partitioner.
     *
     * <p>The schema is wrapped in a {@link KeyedSerializationSchemaWrapper} and the call is
     * forwarded to the keyed-schema constructor with the same partitioner.
     *
     * @param topicId default target topic
     * @param serializationSchema schema used to serialize record values
     * @param producerConfig Kafka producer properties
     * @param customPartitioner partitioner to use, or an empty Optional for none
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
        this(topicId, new KeyedSerializationSchemaWrapper<IN>(serializationSchema), producerConfig, customPartitioner);
    }

    /**
     * Creates a producer from a comma-separated broker list and a keyed schema, using a
     * {@link FlinkFixedPartitioner}.
     *
     * @param brokerList comma-separated broker addresses, stored as {@code bootstrap.servers}
     * @param topicId default target topic
     * @param serializationSchema schema producing key, value and target topic
     */
    @Deprecated
    public FlinkKafkaProducer(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
        // Parameterize the partitioner explicitly instead of relying on a raw type.
        this(topicId, serializationSchema, FlinkKafkaProducer.getPropertiesFromBrokerList(brokerList), Optional.of(new FlinkFixedPartitioner<IN>()));
    }

    /**
     * Creates a producer from explicit producer properties and a keyed schema, using a
     * {@link FlinkFixedPartitioner}.
     *
     * @param topicId default target topic
     * @param serializationSchema schema producing key, value and target topic
     * @param producerConfig Kafka producer properties
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
        // Parameterize the partitioner explicitly instead of relying on a raw type.
        this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<IN>()));
    }

    /**
     * Creates a producer with a keyed schema and an explicit delivery semantic, using a
     * {@link FlinkFixedPartitioner} and the default producer pool size.
     *
     * @param topicId default target topic
     * @param serializationSchema schema producing key, value and target topic
     * @param producerConfig Kafka producer properties
     * @param semantic delivery semantic to apply
     */
    @Deprecated
    public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic) {
        // Named constant (value 5) instead of a magic number; typed partitioner instead of a raw type.
        this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<IN>()), semantic, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }

    /**
     * Creates a producer with a keyed schema and an optional custom partitioner, using
     * {@code AT_LEAST_ONCE} delivery and the default producer pool size.
     *
     * @param defaultTopicId default target topic
     * @param serializationSchema schema producing key, value and target topic
     * @param producerConfig Kafka producer properties
     * @param customPartitioner partitioner to use, or an empty Optional for none
     */
    @Deprecated
    public FlinkKafkaProducer(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
        this(defaultTopicId, serializationSchema, producerConfig, customPartitioner, Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }

    /**
     * Creates a producer with a keyed schema, optional custom partitioner, explicit semantic and
     * explicit producer pool size.
     *
     * <p>Unwraps the Optional (an empty Optional becomes a null partitioner) and forwards to the
     * private constructor with no {@code KafkaSerializationSchema}.
     *
     * @param defaultTopicId default target topic
     * @param serializationSchema schema producing key, value and target topic
     * @param producerConfig Kafka producer properties
     * @param customPartitioner partitioner to use, or an empty Optional for none
     * @param semantic delivery semantic to apply
     * @param kafkaProducersPoolSize size of the transactional producer pool; must be positive
     */
    @Deprecated
    public FlinkKafkaProducer(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, Semantic semantic, int kafkaProducersPoolSize) {
        this(defaultTopicId, serializationSchema, customPartitioner.orElse(null), null, producerConfig, semantic, kafkaProducersPoolSize);
    }

    /**
     * Creates a producer that serializes records with a {@link KafkaSerializationSchema}, using
     * the default producer pool size.
     *
     * @param defaultTopic default target topic
     * @param serializationSchema schema that builds the complete Kafka record
     * @param producerConfig Kafka producer properties
     * @param semantic delivery semantic to apply
     */
    public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic) {
        this(defaultTopic, serializationSchema, producerConfig, semantic, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
    }

    /**
     * Creates a producer that serializes records with a {@link KafkaSerializationSchema} and an
     * explicit producer pool size.
     *
     * <p>Forwards to the private constructor with no keyed schema and no custom partitioner.
     *
     * @param defaultTopic default target topic
     * @param serializationSchema schema that builds the complete Kafka record
     * @param producerConfig Kafka producer properties
     * @param semantic delivery semantic to apply
     * @param kafkaProducersPoolSize size of the transactional producer pool; must be positive
     */
    public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, Semantic semantic, int kafkaProducersPoolSize) {
        this(defaultTopic, null, null, serializationSchema, producerConfig, semantic, kafkaProducersPoolSize);
    }

    /**
     * Common constructor all public constructors delegate to.
     *
     * <p>Exactly one of {@code kafkaSchema} and {@code keyedSchema} must be non-null; a custom
     * partitioner is only accepted together with a keyed schema. The supplied
     * {@code producerConfig} is stored by reference and completed in place with default
     * serializers and a default {@code transaction.timeout.ms} when those keys are missing.
     *
     * @param defaultTopic topic used when the schema yields no target topic; must not be null
     * @param keyedSchema keyed serialization schema, or null when {@code kafkaSchema} is given
     * @param customPartitioner optional partitioner, only valid with {@code keyedSchema}
     * @param kafkaSchema Kafka serialization schema, or null when {@code keyedSchema} is given
     * @param producerConfig Kafka producer properties; must contain {@code bootstrap.servers}
     * @param semantic delivery semantic; must not be null
     * @param kafkaProducersPoolSize size of the transactional producer pool; must be positive
     * @throws IllegalArgumentException if no schema is given, a partitioner is combined with a
     *     Kafka schema, {@code bootstrap.servers} is missing, or the transaction timeout is not
     *     numeric under {@code EXACTLY_ONCE}
     * @throws IllegalStateException if {@code kafkaProducersPoolSize} is not positive
     */
    private FlinkKafkaProducer(String defaultTopic, KeyedSerializationSchema<IN> keyedSchema, FlinkKafkaPartitioner<IN> customPartitioner, KafkaSerializationSchema<IN> kafkaSchema, Properties producerConfig, Semantic semantic, int kafkaProducersPoolSize) {
        super(new TransactionStateSerializer(), new ContextStateSerializer());
        this.defaultTopicId = Preconditions.checkNotNull(defaultTopic, "defaultTopic is null");
        if (kafkaSchema != null) {
            this.keyedSchema = null;
            this.kafkaSchema = kafkaSchema;
            this.flinkKafkaPartitioner = null;
            ClosureCleaner.clean(this.kafkaSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            if (customPartitioner != null) {
                throw new IllegalArgumentException("Custom partitioner can only be used when using a KeyedSerializationSchema or SerializationSchema.");
            }
        } else if (keyedSchema != null) {
            this.kafkaSchema = null;
            this.keyedSchema = keyedSchema;
            this.flinkKafkaPartitioner = customPartitioner;
            ClosureCleaner.clean(this.flinkKafkaPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
            ClosureCleaner.clean(this.keyedSchema, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        } else {
            throw new IllegalArgumentException("You must provide either a KafkaSerializationSchema or a KeyedSerializationSchema.");
        }
        this.producerConfig = Preconditions.checkNotNull(producerConfig, "producerConfig is null");
        this.semantic = Preconditions.checkNotNull(semantic, "semantic is null");
        this.kafkaProducersPoolSize = kafkaProducersPoolSize;
        Preconditions.checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");

        // Default both serializers to byte arrays; a user-supplied value is kept but flagged.
        if (!this.producerConfig.containsKey("key.serializer")) {
            this.producerConfig.put("key.serializer", ByteArraySerializer.class.getName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "key.serializer");
        }
        if (!this.producerConfig.containsKey("value.serializer")) {
            this.producerConfig.put("value.serializer", ByteArraySerializer.class.getName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "value.serializer");
        }
        if (!this.producerConfig.containsKey("bootstrap.servers")) {
            throw new IllegalArgumentException("bootstrap.servers must be supplied in the producer config properties.");
        }

        if (!this.producerConfig.containsKey("transaction.timeout.ms")) {
            long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
            // The value is stored as an int, so it has to fit into one.
            Preconditions.checkState(timeout < Integer.MAX_VALUE && timeout > 0L, "timeout does not fit into 32 bit integer");
            this.producerConfig.put("transaction.timeout.ms", (int) timeout);
            LOG.warn("Property [{}] not specified. Setting it to {}", "transaction.timeout.ms", DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
        }

        if (semantic == Semantic.EXACTLY_ONCE) {
            // The timeout may have been supplied either as a numeric string or as a Number.
            Object configured = this.producerConfig.get("transaction.timeout.ms");
            long transactionTimeout;
            if (configured instanceof String && StringUtils.isNumeric((String) configured)) {
                transactionTimeout = Long.parseLong((String) configured);
            } else if (configured instanceof Number) {
                transactionTimeout = ((Number) configured).longValue();
            } else {
                throw new IllegalArgumentException("transaction.timeout.ms must be numeric, was " + configured);
            }
            super.setTransactionTimeout(transactionTimeout);
            super.enableTransactionTimeoutWarnings(0.8);
        }
        this.topicPartitionsMap = new HashMap<String, int[]>();
    }

    /**
     * Sets whether the keyed-schema path of {@code invoke} attaches the record timestamp
     * ({@code context.timestamp()}) to the Kafka record.
     *
     * @param writeTimestampToKafka true to write timestamps, false to leave them unset
     */
    public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
        this.writeTimestampToKafka = writeTimestampToKafka;
    }

    /**
     * Sets whether send failures are only logged instead of being stored for rethrow.
     * The flag is read in {@code open}, where the send callback is created.
     *
     * @param logFailuresOnly true to log failures only, false to propagate them
     */
    public void setLogFailuresOnly(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    /**
     * Forwards to the superclass method of the same name and returns this instance, narrowing
     * the return type so calls can be chained on a {@code FlinkKafkaProducer}.
     *
     * @return this producer
     */
    public FlinkKafkaProducer<IN> ignoreFailuresAfterTransactionTimeout() {
        super.ignoreFailuresAfterTransactionTimeout();
        return this;
    }

    /**
     * Creates the send-completion callback and then opens the superclass.
     *
     * <p>With {@code logFailuresOnly} set, a failed send is only logged; otherwise the first
     * failure is stored in {@code asyncException}. Both callbacks acknowledge the record
     * afterwards.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws Exception if the superclass fails to open
     */
    public void open(Configuration configuration) throws Exception {
        if (this.logFailuresOnly) {
            this.callback = (metadata, e) -> {
                if (e != null) {
                    LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                }
                this.acknowledgeMessage();
            };
        } else {
            this.callback = (metadata, exception) -> {
                // Keep only the first failure; later ones are dropped.
                if (exception != null && this.asyncException == null) {
                    this.asyncException = exception;
                }
                this.acknowledgeMessage();
            };
        }
        super.open(configuration);
    }

    /**
     * Serializes one element and sends it through the producer of the given transaction.
     *
     * <p>A stored asynchronous send error is rethrown first. With a keyed schema the record is
     * built from the serialized key and value, the schema's target topic (or the default topic)
     * and, if configured, a custom partitioner and the record timestamp. With a Kafka schema the
     * schema builds the record itself; a {@link KafkaContextAware} schema is first given the
     * partitions of its target topic. Partition lists are cached per topic.
     *
     * @param transaction transaction whose producer sends the record
     * @param next element to send
     * @param context sink context supplying the record timestamp
     * @throws FlinkKafkaException if an earlier asynchronous send failed
     */
    public void invoke(KafkaTransactionState transaction, IN next, SinkFunction.Context context) throws FlinkKafkaException {
        this.checkErroneous();
        ProducerRecord<byte[], byte[]> record;
        if (this.keyedSchema != null) {
            byte[] serializedKey = this.keyedSchema.serializeKey(next);
            byte[] serializedValue = this.keyedSchema.serializeValue(next);
            String targetTopic = this.keyedSchema.getTargetTopic(next);
            if (targetTopic == null) {
                targetTopic = this.defaultTopicId;
            }
            Long timestamp = null;
            if (this.writeTimestampToKafka) {
                timestamp = context.timestamp();
            }
            int[] partitions = this.topicPartitionsMap.get(targetTopic);
            if (partitions == null) {
                partitions = FlinkKafkaProducer.getPartitionsByTopic(targetTopic, transaction.producer);
                this.topicPartitionsMap.put(targetTopic, partitions);
            }
            if (this.flinkKafkaPartitioner != null) {
                Integer partition = this.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions);
                record = new ProducerRecord<byte[], byte[]>(targetTopic, partition, timestamp, serializedKey, serializedValue);
            } else {
                // No partitioner: leave the partition unset.
                record = new ProducerRecord<byte[], byte[]>(targetTopic, null, timestamp, serializedKey, serializedValue);
            }
        } else if (this.kafkaSchema != null) {
            if (this.kafkaSchema instanceof KafkaContextAware) {
                @SuppressWarnings("unchecked")
                KafkaContextAware<IN> contextAwareSchema = (KafkaContextAware<IN>) this.kafkaSchema;
                String targetTopic = contextAwareSchema.getTargetTopic(next);
                if (targetTopic == null) {
                    targetTopic = this.defaultTopicId;
                }
                int[] partitions = this.topicPartitionsMap.get(targetTopic);
                if (partitions == null) {
                    partitions = FlinkKafkaProducer.getPartitionsByTopic(targetTopic, transaction.producer);
                    this.topicPartitionsMap.put(targetTopic, partitions);
                }
                contextAwareSchema.setPartitions(partitions);
            }
            record = this.kafkaSchema.serialize(next, context.timestamp());
        } else {
            throw new RuntimeException("We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this is a bug.");
        }
        // Count the record before sending; the callback decrements the counter.
        this.pendingRecords.incrementAndGet();
        transaction.producer.send(record, this.callback);
    }

    /**
     * Flushes and closes the producers held by this sink.
     *
     * <p>The current transaction is flushed; for {@code AT_LEAST_ONCE} and {@code NONE} its
     * producer is closed directly. Any exception is merged into {@code asyncException}. In all
     * cases the producers of the current and of all pending transactions are closed quietly, and
     * a stored error is finally rethrown.
     *
     * @throws FlinkKafkaException if an error was stored while sending or closing
     */
    public void close() throws FlinkKafkaException {
        try {
            KafkaTransactionState active = (KafkaTransactionState) this.currentTransaction();
            if (active != null) {
                this.flush(active);
                switch (this.semantic) {
                    case AT_LEAST_ONCE:
                    case NONE: {
                        active.producer.close();
                        break;
                    }
                    case EXACTLY_ONCE: {
                        break;
                    }
                }
            }
            super.close();
        } catch (Exception e) {
            this.asyncException = ExceptionUtils.firstOrSuppressed(e, this.asyncException);
        } finally {
            // Close the current producer even if the regular path above failed part-way.
            KafkaTransactionState leftover = (KafkaTransactionState) this.currentTransaction();
            if (leftover != null) {
                IOUtils.closeQuietly((AutoCloseable) leftover.producer);
            }
            this.pendingTransactions().forEach(pending -> IOUtils.closeQuietly((AutoCloseable) ((KafkaTransactionState) pending.getValue()).producer));
            this.checkErroneous();
        }
    }

    /**
     * Starts a new transaction state for the configured semantic.
     *
     * <p>{@code EXACTLY_ONCE} creates a transactional producer and begins a Kafka transaction.
     * {@code AT_LEAST_ONCE} and {@code NONE} reuse the producer of the current transaction when
     * there is one, otherwise a new non-transactional producer is created.
     *
     * @return the new transaction state
     * @throws FlinkKafkaException if no transactional producer can be created
     * @throws UnsupportedOperationException for any other semantic
     */
    protected KafkaTransactionState beginTransaction() throws FlinkKafkaException {
        switch (this.semantic) {
            case AT_LEAST_ONCE:
            case NONE: {
                KafkaTransactionState previous = (KafkaTransactionState) this.currentTransaction();
                if (previous == null || previous.producer == null) {
                    return new KafkaTransactionState(this.initNonTransactionalProducer(true));
                }
                return new KafkaTransactionState(previous.producer);
            }
            case EXACTLY_ONCE: {
                FlinkKafkaInternalProducer<byte[], byte[]> transactional = this.createTransactionalProducer();
                transactional.beginTransaction();
                return new KafkaTransactionState(transactional.getTransactionalId(), transactional);
            }
            default: {
                throw new UnsupportedOperationException("Not implemented semantic");
            }
        }
    }

    /**
     * Prepares a transaction for commit: flushes it unless the semantic is {@code NONE}, then
     * rethrows any stored asynchronous send error.
     *
     * @param transaction transaction about to be committed
     * @throws FlinkKafkaException if a send error was stored
     * @throws UnsupportedOperationException for an unknown semantic
     */
    protected void preCommit(KafkaTransactionState transaction) throws FlinkKafkaException {
        switch (this.semantic) {
            case NONE: {
                // Nothing to flush for NONE.
                break;
            }
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE: {
                this.flush(transaction);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Not implemented semantic");
            }
        }
        this.checkErroneous();
    }

    /**
     * Commits a transactional transaction and recycles its producer, whether or not the commit
     * succeeds. Non-transactional states are left untouched.
     *
     * @param transaction transaction to commit
     */
    protected void commit(KafkaTransactionState transaction) {
        if (!transaction.isTransactional()) {
            return;
        }
        try {
            transaction.producer.commitTransaction();
        } finally {
            this.recycleTransactionalProducer(transaction.producer);
        }
    }

    /**
     * Commits a transaction restored from state, using a temporary producer that resumes it by
     * producer id and epoch. {@link InvalidTxnStateException} and
     * {@link ProducerFencedException} are logged and not rethrown.
     *
     * @param transaction restored transaction to commit
     */
    protected void recoverAndCommit(KafkaTransactionState transaction) {
        if (!transaction.isTransactional()) {
            return;
        }
        try (FlinkKafkaInternalProducer<byte[], byte[]> resumed = this.initTransactionalProducer(transaction.transactionalId, false)) {
            resumed.resumeTransaction(transaction.producerId, transaction.epoch);
            resumed.commitTransaction();
        } catch (InvalidTxnStateException | ProducerFencedException ex) {
            LOG.warn("Encountered error {} while recovering transaction {}. Presumably this transaction has been already committed before", ex, transaction);
        }
    }

    /**
     * Aborts a transactional transaction and then recycles its producer. Non-transactional
     * states are left untouched.
     *
     * @param transaction transaction to abort
     */
    protected void abort(KafkaTransactionState transaction) {
        if (!transaction.isTransactional()) {
            return;
        }
        FlinkKafkaInternalProducer<byte[], byte[]> aborting = transaction.producer;
        aborting.abortTransaction();
        this.recycleTransactionalProducer(aborting);
    }

    /**
     * Handles a restored transaction that must be aborted: opens a temporary producer for its
     * transactional id, calls {@code initTransactions()} on it and closes it again.
     *
     * @param transaction restored transaction to abort
     */
    protected void recoverAndAbort(KafkaTransactionState transaction) {
        if (!transaction.isTransactional()) {
            return;
        }
        try (FlinkKafkaInternalProducer<byte[], byte[]> temporary = this.initTransactionalProducer(transaction.transactionalId, false)) {
            temporary.initTransactions();
        }
    }

    /**
     * Marks one sent record as completed by decrementing the pending-record counter.
     * Called from the send callbacks created in {@code open}.
     */
    private void acknowledgeMessage() {
        this.pendingRecords.decrementAndGet();
    }

    /**
     * Flushes the transaction's producer (if it has one), verifies that no records are still
     * pending and rethrows any stored asynchronous send error.
     *
     * @param transaction transaction whose producer is flushed
     * @throws IllegalStateException if records are still pending after the flush
     * @throws FlinkKafkaException if a send error was stored
     */
    private void flush(KafkaTransactionState transaction) throws FlinkKafkaException {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = transaction.producer;
        if (producer != null) {
            producer.flush();
        }
        long stillPending = this.pendingRecords.get();
        if (stillPending != 0L) {
            throw new IllegalStateException("Pending record count must be zero at this point: " + stillPending);
        }
        this.checkErroneous();
    }

    /**
     * Snapshots the superclass state and rewrites the next-transactional-id hint.
     *
     * <p>The hint state is cleared on every subtask. Only subtask 0 under {@code EXACTLY_ONCE}
     * writes a new hint; when the current parallelism exceeds the one recorded in the hint, the
     * next free id is advanced by {@code parallelism * kafkaProducersPoolSize}.
     *
     * @param context snapshot context forwarded to the superclass
     * @throws Exception if the superclass snapshot or the state access fails
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        super.snapshotState(context);
        this.nextTransactionalIdHintState.clear();
        if (this.getRuntimeContext().getIndexOfThisSubtask() == 0 && this.semantic == Semantic.EXACTLY_ONCE) {
            Preconditions.checkState(this.nextTransactionalIdHint != null, "nextTransactionalIdHint must be set for EXACTLY_ONCE");
            int parallelism = this.getRuntimeContext().getNumberOfParallelSubtasks();
            long nextFreeTransactionalId = this.nextTransactionalIdHint.nextFreeTransactionalId;
            if (parallelism > this.nextTransactionalIdHint.lastParallelism) {
                // Widen before multiplying so the product cannot overflow int.
                nextFreeTransactionalId += (long) parallelism * this.kafkaProducersPoolSize;
            }
            this.nextTransactionalIdHintState.add(new NextTransactionalIdHint(parallelism, nextFreeTransactionalId));
        }
    }

    /**
     * Restores the transactional-id bookkeeping and then initializes the superclass state.
     *
     * <p>Steps: downgrade the semantic to {@code NONE} when checkpointing is disabled; obtain
     * the V2 hint state; migrate the hint from the deprecated state when a state with that name
     * is registered; create the transactional id generator; and, under {@code EXACTLY_ONCE},
     * load the single stored hint or start from a fresh one and abort the ids the generator
     * reports.
     *
     * @param context initialization context giving access to the operator state store
     * @throws IllegalStateException if more than one hint is found
     * @throws Exception if state access or the superclass initialization fails
     */
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (this.semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", this.semantic, Semantic.NONE);
            this.semantic = Semantic.NONE;
        }
        this.nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
        // Registered state names are strings, so compare against the descriptor's name; testing
        // the descriptor object itself can never match and would skip the migration.
        if (context.getOperatorStateStore().getRegisteredStateNames().contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName())) {
            this.migrateNextTransactionalIdHindState(context);
        }
        this.transactionalIdsGenerator = new TransactionalIdsGenerator(this.getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) this.getRuntimeContext()).getOperatorUniqueID(), this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getNumberOfParallelSubtasks(), this.kafkaProducersPoolSize, SAFE_SCALE_DOWN_FACTOR);
        if (this.semantic != Semantic.EXACTLY_ONCE) {
            this.nextTransactionalIdHint = null;
        } else {
            List<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(this.nextTransactionalIdHintState.get());
            if (transactionalIdHints.size() > 1) {
                throw new IllegalStateException("There should be at most one next transactional id hint written by the first subtask");
            }
            if (transactionalIdHints.isEmpty()) {
                this.nextTransactionalIdHint = new NextTransactionalIdHint(0, 0L);
                // No stored hint: abort whatever the generator lists before starting fresh.
                this.abortTransactions(this.transactionalIdsGenerator.generateIdsToAbort());
            } else {
                this.nextTransactionalIdHint = transactionalIdHints.get(0);
            }
        }
        super.initializeState(context);
    }

    /**
     * Creates the user context for a fresh start.
     *
     * <p>Under {@code EXACTLY_ONCE} a new set of transactional ids is generated, loaded into the
     * available-id pool and wrapped in a context; otherwise no context is produced.
     *
     * @return the new context, or an empty Optional for other semantics
     */
    protected Optional<KafkaTransactionContext> initializeUserContext() {
        if (this.semantic == Semantic.EXACTLY_ONCE) {
            Set<String> freshIds = this.generateNewTransactionalIds();
            this.resetAvailableTransactionalIdsPool(freshIds);
            return Optional.of(new KafkaTransactionContext(freshIds));
        }
        return Optional.empty();
    }

    /**
     * Asks the id generator for the transactional ids to use, starting at the next free id
     * recorded in the hint, and logs them.
     *
     * @return the generated transactional ids
     * @throws IllegalStateException if no hint is present
     */
    private Set<String> generateNewTransactionalIds() {
        Preconditions.checkState(this.nextTransactionalIdHint != null, "nextTransactionalIdHint must be present for EXACTLY_ONCE");
        long firstFreeId = this.nextTransactionalIdHint.nextFreeTransactionalId;
        Set<String> generated = this.transactionalIdsGenerator.generateIdsToUse(firstFreeId);
        LOG.info("Generated new transactionalIds {}", generated);
        return generated;
    }

    /**
     * Completes recovery of the user context: aborts the transactions of the recovered ids and
     * reloads the available-id pool with them.
     *
     * <p>Calls {@code get()} on the user context without a presence check, so it expects the
     * context to be present.
     */
    protected void finishRecoveringContext() {
        this.cleanUpUserContext();
        KafkaTransactionContext recovered = (KafkaTransactionContext) this.getUserContext().get();
        this.resetAvailableTransactionalIdsPool(recovered.transactionalIds);
        LOG.info("Recovered transactionalIds {}", (Object) ((KafkaTransactionContext) this.getUserContext().get()).transactionalIds);
    }

    /**
     * Instantiates a Kafka producer from the current producer configuration.
     *
     * @return a new internal producer
     */
    protected FlinkKafkaInternalProducer<byte[], byte[]> createProducer() {
        return new FlinkKafkaInternalProducer<>(this.producerConfig);
    }

    /**
     * Aborts the transactions of all transactional ids stored in the user context.
     * Does nothing when no user context is present.
     */
    private void cleanUpUserContext() {
        if (this.getUserContext().isPresent()) {
            KafkaTransactionContext userContext = (KafkaTransactionContext) this.getUserContext().get();
            this.abortTransactions(userContext.transactionalIds);
        }
    }

    /**
     * Replaces the contents of the available-id pool with the given transactional ids.
     *
     * @param transactionalIds ids that become available for new transactional producers
     */
    private void resetAvailableTransactionalIdsPool(Collection<String> transactionalIds) {
        this.availableTransactionalIds.clear();
        this.availableTransactionalIds.addAll(transactionalIds);
    }

    /**
     * For each transactional id, opens a temporary producer (without metric registration),
     * calls {@code initTransactions()} on it and closes it again.
     *
     * <p>The producer is closed whether or not {@code initTransactions()} succeeds; a failure
     * while closing after an earlier failure is attached as a suppressed exception.
     *
     * @param transactionalIds ids whose transactions should be aborted
     */
    private void abortTransactions(Set<String> transactionalIds) {
        for (String transactionalId : transactionalIds) {
            try (FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = this.initTransactionalProducer(transactionalId, false)) {
                kafkaProducer.initTransactions();
            }
        }
    }

    /**
     * Returns the transaction coordinator id reported by the current transaction's producer.
     *
     * @return the coordinator id
     * @throws IllegalArgumentException if there is no current transaction or it has no producer
     */
    int getTransactionCoordinatorId() {
        KafkaTransactionState active = (KafkaTransactionState) this.currentTransaction();
        if (active != null && active.producer != null) {
            return active.producer.getTransactionCoordinatorId();
        }
        throw new IllegalArgumentException();
    }

    /**
     * Takes a transactional id from the pool and returns an initialized transactional producer
     * for it, with metrics registered.
     *
     * @return the initialized producer
     * @throws FlinkKafkaException with {@code PRODUCERS_POOL_EMPTY} if no id is available
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() throws FlinkKafkaException {
        String pooledId = this.availableTransactionalIds.poll();
        if (pooledId != null) {
            FlinkKafkaInternalProducer<byte[], byte[]> transactional = this.initTransactionalProducer(pooledId, true);
            transactional.initTransactions();
            return transactional;
        }
        throw new FlinkKafkaException(FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY, "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
    }

    /**
     * Returns the producer's transactional id to the available-id pool and then closes the
     * producer.
     *
     * @param producer transactional producer that is no longer needed
     */
    private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
        String releasedId = producer.getTransactionalId();
        this.availableTransactionalIds.add(releasedId);
        producer.close();
    }

    /**
     * Creates a producer bound to the given transactional id by writing the id into the shared
     * producer configuration before building the producer.
     *
     * @param transactionalId value stored under {@code transactional.id}
     * @param registerMetrics whether the producer's metrics should be registered
     * @return the new producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> initTransactionalProducer(String transactionalId, boolean registerMetrics) {
        this.producerConfig.put("transactional.id", transactionalId);
        FlinkKafkaInternalProducer<byte[], byte[]> transactional = this.initProducer(registerMetrics);
        return transactional;
    }

    /**
     * Creates a producer without a transactional id by removing {@code transactional.id} from
     * the shared producer configuration before building the producer.
     *
     * @param registerMetrics whether the producer's metrics should be registered
     * @return the new producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> initNonTransactionalProducer(boolean registerMetrics) {
        this.producerConfig.remove("transactional.id");
        FlinkKafkaInternalProducer<byte[], byte[]> plain = this.initProducer(registerMetrics);
        return plain;
    }

    /**
     * Builds a producer and wires it to this subtask.
     *
     * <p>Opens the custom partitioner (giving a delegate partitioner the default topic's
     * partitions first), passes subtask index and parallelism to a context-aware schema, and
     * optionally registers the producer's metrics as gauges in the {@code KafkaProducer} group.
     * Gauges are registered once per metric name; later producers only replace the wrapped
     * metric.
     *
     * @param registerMetrics whether metrics should be registered (also requires that
     *     {@code flink.disable-metrics} is not set to true)
     * @return the new producer
     */
    private FlinkKafkaInternalProducer<byte[], byte[]> initProducer(boolean registerMetrics) {
        FlinkKafkaInternalProducer<byte[], byte[]> producer = this.createProducer();
        RuntimeContext ctx = this.getRuntimeContext();
        if (this.flinkKafkaPartitioner != null) {
            if (this.flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
                int[] defaultTopicPartitions = FlinkKafkaProducer.getPartitionsByTopic(this.defaultTopicId, producer);
                ((FlinkKafkaDelegatePartitioner) this.flinkKafkaPartitioner).setPartitions(defaultTopicPartitions);
            }
            this.flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }
        if (this.kafkaSchema instanceof KafkaContextAware) {
            KafkaContextAware contextAwareSchema = (KafkaContextAware) ((Object) this.kafkaSchema);
            contextAwareSchema.setParallelInstanceId(ctx.getIndexOfThisSubtask());
            contextAwareSchema.setNumParallelInstances(ctx.getNumberOfParallelSubtasks());
        }
        LOG.info("Starting FlinkKafkaInternalProducer ({}/{}) to produce into default topic {}", ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), this.defaultTopicId);

        boolean metricsDisabled = Boolean.parseBoolean(this.producerConfig.getProperty(KEY_DISABLE_METRICS, "false"));
        if (!registerMetrics || metricsDisabled) {
            return producer;
        }
        Map<MetricName, Metric> metrics = producer.metrics();
        if (metrics == null) {
            LOG.info("Producer implementation does not support metrics");
            return producer;
        }
        MetricGroup kafkaMetricGroup = this.getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
        for (Map.Entry<MetricName, Metric> entry : metrics.entrySet()) {
            String name = entry.getKey().name();
            Metric metric = entry.getValue();
            KafkaMetricMutableWrapper known = this.previouslyCreatedMetrics.get(name);
            if (known == null) {
                // First producer exposing this metric: register a gauge backed by a mutable wrapper.
                KafkaMetricMutableWrapper created = new KafkaMetricMutableWrapper(metric);
                this.previouslyCreatedMetrics.put(name, created);
                kafkaMetricGroup.gauge(name, (Gauge) created);
            } else {
                // Gauge already exists: point it at the new producer's metric.
                known.setKafkaMetric(metric);
            }
        }
        return producer;
    }

    /**
     * Rethrows a stored asynchronous send error, if any, wrapped in a
     * {@link FlinkKafkaException}. The stored error is cleared before it is thrown.
     *
     * @throws FlinkKafkaException with {@code EXTERNAL_ERROR} if an error was stored
     */
    private void checkErroneous() throws FlinkKafkaException {
        Exception stored = this.asyncException;
        if (stored == null) {
            return;
        }
        this.asyncException = null;
        throw new FlinkKafkaException(FlinkKafkaErrorCode.EXTERNAL_ERROR, "Failed to send data to Kafka: " + stored.getMessage(), stored);
    }

    /**
     * Java serialization hook; it only restores the non-transient fields through the
     * default mechanism.
     *
     * @param stream the stream the instance is being read from
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException if the class of a serialized object cannot be found
     */
    private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
        stream.defaultReadObject();
    }

    /**
     * Copies transactional-id hints from the union list state registered under
     * {@code NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR} into the one registered under
     * {@code NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2}, and stores the latter in
     * {@code nextTransactionalIdHintState}.
     *
     * <p>When the old state holds no entries nothing is copied and the old state is left
     * untouched; otherwise every old entry is appended to the new state and the old state
     * is cleared afterwards.
     *
     * @param context initialization context that provides the operator state store
     * @throws Exception if accessing or updating either state fails
     */
    private void migrateNextTransactionalIdHindState(FunctionInitializationContext context) throws Exception {
        ListState<NextTransactionalIdHint> legacyHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
        this.nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
        // Materialize the old entries before clearing the state they were read from.
        List<NextTransactionalIdHint> legacyHints = Lists.newArrayList(legacyHintState.get());
        if (legacyHints.isEmpty()) {
            return;
        }
        this.nextTransactionalIdHintState.addAll(legacyHints);
        legacyHintState.clear();
    }

    /**
     * Builds producer properties whose {@code bootstrap.servers} entry is the given broker list.
     *
     * <p>Every comma-separated element is first handed to
     * {@code NetUtils.getCorrectHostnamePort}; the result is discarded, so the call
     * presumably acts as a validity check that fails for malformed entries (confirm against
     * {@code NetUtils}).
     *
     * @param brokerList comma-separated broker addresses, stored unchanged in the result
     * @return a new {@link Properties} instance holding only the {@code bootstrap.servers} key
     */
    private static Properties getPropertiesFromBrokerList(String brokerList) {
        for (String hostAndPort : brokerList.split(",")) {
            NetUtils.getCorrectHostnamePort((String)hostAndPort);
        }
        Properties bootstrapConfig = new Properties();
        bootstrapConfig.setProperty("bootstrap.servers", brokerList);
        return bootstrapConfig;
    }

    /**
     * Returns the partition numbers the producer reports for a topic, in ascending order.
     *
     * @param topic the topic to look up
     * @param producer the producer queried through {@code partitionsFor}
     * @return the partition ids of {@code topic}, sorted ascending
     */
    private static int[] getPartitionsByTopic(String topic, Producer<byte[], byte[]> producer) {
        // Sorting the extracted ids gives the same array as sorting the infos by id first.
        return producer.partitionsFor(topic)
                .stream()
                .mapToInt(PartitionInfo::partition)
                .sorted()
                .toArray();
    }

    /**
     * Serializer for {@link NextTransactionalIdHint}.
     *
     * <p>Binary layout: {@code nextFreeTransactionalId} as a long, followed by
     * {@code lastParallelism} as an int.
     */
    @VisibleForTesting
    @Internal
    public static class NextTransactionalIdHintSerializer
    extends TypeSerializerSingleton<NextTransactionalIdHint> {
        private static final long serialVersionUID = 1L;

        /** Reports the handled type as immutable. */
        public boolean isImmutableType() {
            return true;
        }

        /** Creates a hint through its no-argument constructor. */
        public NextTransactionalIdHint createInstance() {
            return new NextTransactionalIdHint();
        }

        /** Returns the given instance itself; no new object is created. */
        public NextTransactionalIdHint copy(NextTransactionalIdHint from2) {
            return from2;
        }

        /** Returns {@code from2} itself; {@code reuse} is ignored. */
        public NextTransactionalIdHint copy(NextTransactionalIdHint from2, NextTransactionalIdHint reuse) {
            return from2;
        }

        /** Fixed serialized size: one long plus one int, i.e. 12 bytes. */
        public int getLength() {
            return Long.BYTES + Integer.BYTES;
        }

        /** Writes the next free transactional id (long), then the last parallelism (int). */
        public void serialize(NextTransactionalIdHint record2, DataOutputView target) throws IOException {
            target.writeLong(record2.nextFreeTransactionalId);
            target.writeInt(record2.lastParallelism);
        }

        /** Reads a hint in the order written by {@code serialize}: long first, then int. */
        public NextTransactionalIdHint deserialize(DataInputView source2) throws IOException {
            long freeId = source2.readLong();
            int parallelism = source2.readInt();
            return new NextTransactionalIdHint(parallelism, freeId);
        }

        /** Ignores {@code reuse} and always builds a new instance. */
        public NextTransactionalIdHint deserialize(NextTransactionalIdHint reuse, DataInputView source2) throws IOException {
            return this.deserialize(source2);
        }

        /** Transfers one serialized hint from {@code source2} to {@code target} field by field. */
        public void copy(DataInputView source2, DataOutputView target) throws IOException {
            long freeId = source2.readLong();
            target.writeLong(freeId);
            int parallelism = source2.readInt();
            target.writeInt(parallelism);
        }

        /** Returns a new {@link NextTransactionalIdHintSerializerSnapshot}. */
        public TypeSerializerSnapshot<NextTransactionalIdHint> snapshotConfiguration() {
            return new NextTransactionalIdHintSerializerSnapshot();
        }

        /** Snapshot that supplies the serializer through its no-argument constructor. */
        public static final class NextTransactionalIdHintSerializerSnapshot
        extends SimpleTypeSerializerSnapshot<NextTransactionalIdHint> {
            public NextTransactionalIdHintSerializerSnapshot() {
                super(NextTransactionalIdHintSerializer::new);
            }
        }
    }

    /**
     * Mutable value holder for a parallelism and the next free transactional id.
     *
     * <p>Equality and hash code are based on both fields.
     */
    public static class NextTransactionalIdHint {
        public int lastParallelism;
        public long nextFreeTransactionalId;

        /** Creates a hint with both fields set to zero. */
        public NextTransactionalIdHint() {
            this(0, 0L);
        }

        /**
         * @param parallelism value stored in {@code lastParallelism}
         * @param nextFreeTransactionalId value stored in {@code nextFreeTransactionalId}
         */
        public NextTransactionalIdHint(int parallelism, long nextFreeTransactionalId) {
            this.lastParallelism = parallelism;
            this.nextFreeTransactionalId = nextFreeTransactionalId;
        }

        public String toString() {
            return "NextTransactionalIdHint[lastParallelism=" + this.lastParallelism
                    + ", nextFreeTransactionalId=" + this.nextFreeTransactionalId
                    + ']';
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            // Exact class match is required; subclasses never compare equal.
            if (o == null || o.getClass() != this.getClass()) {
                return false;
            }
            NextTransactionalIdHint other = (NextTransactionalIdHint)o;
            return this.lastParallelism == other.lastParallelism
                    && this.nextFreeTransactionalId == other.nextFreeTransactionalId;
        }

        public int hashCode() {
            return 31 * this.lastParallelism + Long.hashCode(this.nextFreeTransactionalId);
        }
    }

    /**
     * Serializer for {@link KafkaTransactionContext}.
     *
     * <p>Binary layout: the number of transactional ids as an int, followed by each id
     * written with {@code writeUTF}.
     */
    @VisibleForTesting
    @Internal
    public static class ContextStateSerializer
    extends TypeSerializerSingleton<KafkaTransactionContext> {
        private static final long serialVersionUID = 1L;

        /** Reports the handled type as immutable. */
        public boolean isImmutableType() {
            return true;
        }

        /** Always returns {@code null}; no default instance is provided. */
        public KafkaTransactionContext createInstance() {
            return null;
        }

        /** Returns the given instance itself; no new object is created. */
        public KafkaTransactionContext copy(KafkaTransactionContext from2) {
            return from2;
        }

        /** Returns {@code from2} itself; {@code reuse} is ignored. */
        public KafkaTransactionContext copy(KafkaTransactionContext from2, KafkaTransactionContext reuse) {
            return from2;
        }

        /** Returns -1 because the serialized size depends on the ids. */
        public int getLength() {
            return -1;
        }

        /** Writes the id count, then every transactional id in the set's iteration order. */
        public void serialize(KafkaTransactionContext record2, DataOutputView target) throws IOException {
            target.writeInt(record2.transactionalIds.size());
            for (String transactionalId : record2.transactionalIds) {
                target.writeUTF(transactionalId);
            }
        }

        /** Reads the id count and that many ids into a new {@link HashSet}-backed context. */
        public KafkaTransactionContext deserialize(DataInputView source2) throws IOException {
            int idCount = source2.readInt();
            HashSet<String> transactionalIds = new HashSet<String>(idCount);
            for (int remaining = idCount; remaining > 0; --remaining) {
                transactionalIds.add(source2.readUTF());
            }
            return new KafkaTransactionContext(transactionalIds);
        }

        /** Ignores {@code reuse} and always builds a new instance. */
        public KafkaTransactionContext deserialize(KafkaTransactionContext reuse, DataInputView source2) throws IOException {
            return this.deserialize(source2);
        }

        /** Transfers one serialized context from {@code source2} to {@code target}. */
        public void copy(DataInputView source2, DataOutputView target) throws IOException {
            int idCount = source2.readInt();
            target.writeInt(idCount);
            for (int remaining = idCount; remaining > 0; --remaining) {
                String transactionalId = source2.readUTF();
                target.writeUTF(transactionalId);
            }
        }

        /** Returns a new {@link ContextStateSerializerSnapshot}. */
        public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
            return new ContextStateSerializerSnapshot();
        }

        /** Snapshot that supplies the serializer through its no-argument constructor. */
        public static final class ContextStateSerializerSnapshot
        extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
            public ContextStateSerializerSnapshot() {
                super(ContextStateSerializer::new);
            }
        }
    }

    /**
     * Serializer for {@link KafkaTransactionState}.
     *
     * <p>Binary layout: a boolean telling whether a transactional id follows, the id
     * written with {@code writeUTF} when present, the producer id as a long and the epoch
     * as a short. The producer reference is not written; deserialized states carry a
     * {@code null} producer.
     */
    @VisibleForTesting
    @Internal
    public static class TransactionStateSerializer
    extends TypeSerializerSingleton<KafkaTransactionState> {
        private static final long serialVersionUID = 1L;

        /** Reports the handled type as immutable. */
        public boolean isImmutableType() {
            return true;
        }

        /** Always returns {@code null}; no default instance is provided. */
        public KafkaTransactionState createInstance() {
            return null;
        }

        /** Returns the given instance itself; no new object is created. */
        public KafkaTransactionState copy(KafkaTransactionState from2) {
            return from2;
        }

        /** Returns {@code from2} itself; {@code reuse} is ignored. */
        public KafkaTransactionState copy(KafkaTransactionState from2, KafkaTransactionState reuse) {
            return from2;
        }

        /** Returns -1 because the serialized size depends on the optional id. */
        public int getLength() {
            return -1;
        }

        /** Writes the presence flag, the optional transactional id, the producer id and the epoch. */
        public void serialize(KafkaTransactionState record2, DataOutputView target) throws IOException {
            boolean hasTransactionalId = record2.transactionalId != null;
            target.writeBoolean(hasTransactionalId);
            if (hasTransactionalId) {
                target.writeUTF(record2.transactionalId);
            }
            target.writeLong(record2.producerId);
            target.writeShort(record2.epoch);
        }

        /** Reads a state in the order written by {@code serialize}; the producer is left {@code null}. */
        public KafkaTransactionState deserialize(DataInputView source2) throws IOException {
            // The id is only on the wire when the leading flag is true.
            String transactionalId = source2.readBoolean() ? source2.readUTF() : null;
            long producerId = source2.readLong();
            short epoch = source2.readShort();
            return new KafkaTransactionState(transactionalId, producerId, epoch, null);
        }

        /** Ignores {@code reuse} and always builds a new instance. */
        public KafkaTransactionState deserialize(KafkaTransactionState reuse, DataInputView source2) throws IOException {
            return this.deserialize(source2);
        }

        /** Transfers one serialized state from {@code source2} to {@code target}. */
        public void copy(DataInputView source2, DataOutputView target) throws IOException {
            boolean hasTransactionalId = source2.readBoolean();
            target.writeBoolean(hasTransactionalId);
            if (hasTransactionalId) {
                String transactionalId = source2.readUTF();
                target.writeUTF(transactionalId);
            }
            long producerId = source2.readLong();
            target.writeLong(producerId);
            short epoch = source2.readShort();
            target.writeShort(epoch);
        }

        /** Returns a new {@link TransactionStateSerializerSnapshot}. */
        public TypeSerializerSnapshot<KafkaTransactionState> snapshotConfiguration() {
            return new TransactionStateSerializerSnapshot();
        }

        /** Snapshot that supplies the serializer through its no-argument constructor. */
        public static final class TransactionStateSerializerSnapshot
        extends SimpleTypeSerializerSnapshot<KafkaTransactionState> {
            public TransactionStateSerializerSnapshot() {
                super(TransactionStateSerializer::new);
            }
        }
    }

    /**
     * Holder for a set of transactional ids.
     *
     * <p>Equality and hash code are delegated to the id set.
     */
    @VisibleForTesting
    @Internal
    public static class KafkaTransactionContext {
        final Set<String> transactionalIds;

        /**
         * @param transactionalIds the ids to hold; checked with
         *     {@code Preconditions.checkNotNull} and stored without copying
         */
        KafkaTransactionContext(Set<String> transactionalIds) {
            Preconditions.checkNotNull(transactionalIds);
            this.transactionalIds = transactionalIds;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            // Exact class match is required; subclasses never compare equal.
            boolean sameClass = o != null && o.getClass() == this.getClass();
            return sameClass && this.transactionalIds.equals(((KafkaTransactionContext)o).transactionalIds);
        }

        public int hashCode() {
            return this.transactionalIds.hashCode();
        }
    }

    /**
     * State of one transaction: an optional transactional id, a producer id, an epoch and
     * the producer instance the state belongs to.
     *
     * <p>The producer reference takes no part in {@code equals}, {@code hashCode} or
     * {@code toString}; only the transactional id, producer id and epoch do.
     */
    @VisibleForTesting
    @Internal
    static class KafkaTransactionState {
        private final transient FlinkKafkaInternalProducer<byte[], byte[]> producer;
        @Nullable
        final String transactionalId;
        final long producerId;
        final short epoch;

        /**
         * Creates a state whose producer id and epoch are read from the given producer.
         *
         * @param transactionalId the transactional id to record
         * @param producer the producer to query and to keep a reference to
         */
        KafkaTransactionState(String transactionalId, FlinkKafkaInternalProducer<byte[], byte[]> producer) {
            this(transactionalId, producer.getProducerId(), producer.getEpoch(), producer);
        }

        /**
         * Creates a state without a transactional id, using -1 for both producer id and epoch.
         *
         * @param producer the producer to keep a reference to
         */
        KafkaTransactionState(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
            // The explicit short cast is required: an int constant is not implicitly
            // narrowed when passed as a constructor argument.
            this(null, -1L, (short)-1, producer);
        }

        /**
         * @param transactionalId the transactional id, or {@code null} if there is none
         * @param producerId the producer id to record
         * @param epoch the epoch to record
         * @param producer the producer to keep a reference to; callers in this file also pass {@code null}
         */
        KafkaTransactionState(@Nullable String transactionalId, long producerId, short epoch, FlinkKafkaInternalProducer<byte[], byte[]> producer) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.epoch = epoch;
            this.producer = producer;
        }

        /** Returns {@code true} when a transactional id is set. */
        boolean isTransactional() {
            return this.transactionalId != null;
        }

        public String toString() {
            return String.format("%s [transactionalId=%s, producerId=%s, epoch=%s]", this.getClass().getSimpleName(), this.transactionalId, this.producerId, this.epoch);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            // Exact class match is required; subclasses never compare equal.
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            KafkaTransactionState that = (KafkaTransactionState)o;
            if (this.producerId != that.producerId || this.epoch != that.epoch) {
                return false;
            }
            // Null-safe comparison of the optional transactional id.
            return this.transactionalId != null ? this.transactionalId.equals(that.transactionalId) : that.transactionalId == null;
        }

        public int hashCode() {
            int result = this.transactionalId != null ? this.transactionalId.hashCode() : 0;
            result = 31 * result + Long.hashCode(this.producerId);
            result = 31 * result + this.epoch;
            return result;
        }
    }

    /**
     * Delivery semantics selectable for the producer. How each constant is handled is
     * implemented outside this declaration.
     */
    public enum Semantic {
        EXACTLY_ONCE,
        AT_LEAST_ONCE,
        NONE
    }
}

