/*
 * Decompiled with CFR 0.152.
 */
package org.mydotey.kbear.client;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.mydotey.java.ObjectExtension;
import org.mydotey.java.StringExtension;
import org.mydotey.kbear.client.KafkaAppInfoExtension;
import org.mydotey.kbear.client.KafkaMetaHolder;
import org.mydotey.kbear.client.KafkaMetaManager;
import org.mydotey.kbear.client.KafkaProducerConfig;
import org.mydotey.kbear.client.ProducerHolder;
import org.mydotey.kbear.meta.Cluster;
import org.mydotey.kbear.meta.Topic;
import org.mydotey.kbear.route.Route;
import org.mydotey.scf.ConfigurationManager;
import org.mydotey.scf.Property;
import org.mydotey.scf.facade.StringProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerProxy<K, V>
implements Producer<K, V> {
    protected static final String JMX_PREFIX = "kafka.producer";
    private static Logger _logger = LoggerFactory.getLogger(ProducerProxy.class);
    private KafkaMetaManager _metaManager;
    private KafkaProducerConfig _kafkaProducerConfig;
    private String _clientId;
    private Object _addRemoveLock;
    private ConcurrentHashMap<String, ProducerHolder> _producerHolders;
    Property<String, Integer> _destroyDelay;
    private ScheduledExecutorService _executorService;

    public ProducerProxy(ConfigurationManager configurationManager, KafkaMetaManager metaManager, KafkaProducerConfig kafkaProducerConfig) {
        ObjectExtension.requireNonNull((Object)configurationManager, (String)"configurationManager");
        ObjectExtension.requireNonNull((Object)metaManager, (String)"metaManager");
        ObjectExtension.requireNonNull((Object)kafkaProducerConfig, (String)"kafkaProducerConfig");
        this._clientId = kafkaProducerConfig.getProperties().getProperty("client.id");
        if (StringExtension.isBlank((String)this._clientId)) {
            this._clientId = "";
        }
        this._metaManager = metaManager;
        this._kafkaProducerConfig = kafkaProducerConfig;
        this._addRemoveLock = new Object();
        this._producerHolders = new ConcurrentHashMap();
        StringProperties stringProperties = new StringProperties(configurationManager);
        this._destroyDelay = stringProperties.getIntProperty((Object)"kafka.producer-proxy.producer-destroy-delay", Integer.valueOf(60000), v -> v < 0 ? null : v);
        this._executorService = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "kafka.producer-proxy.scheduled-executor");
            thread.setDaemon(true);
            return thread;
        });
    }

    public void initTransactions() {
        throw new UnsupportedOperationException();
    }

    public void beginTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    public void flush() {
        throw new UnsupportedOperationException();
    }

    public void commitTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    public void abortTransaction() throws ProducerFencedException {
        throw new UnsupportedOperationException();
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        ObjectExtension.requireNonNull(record, (String)"record");
        return this.getOrAddProducer(record.topic()).getProducer().send(record);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ObjectExtension.requireNonNull(record, (String)"record");
        ObjectExtension.requireNonNull((Object)callback, (String)"callback");
        return this.getOrAddProducer(record.topic()).getProducer().send(record, callback);
    }

    public List<PartitionInfo> partitionsFor(String topic) {
        ObjectExtension.requireNonNull((Object)topic, (String)"topic");
        return this.getOrAddProducer(topic).getProducer().partitionsFor(topic);
    }

    public Map<MetricName, ? extends Metric> metrics() {
        HashMap result = new HashMap();
        this.forEach((t, p) -> result.putAll(p.getProducer().metrics()));
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        Object object = this._addRemoveLock;
        synchronized (object) {
            this.forEach((t, p) -> this.removeProducer((String)t));
            _logger.info("producer closed");
            this._executorService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(long timeout, TimeUnit unit) {
        Object object = this._addRemoveLock;
        synchronized (object) {
            this.forEach((t, p) -> this.removeProducer((String)t, timeout, unit));
            _logger.info("producer closed");
            this._executorService.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected ProducerHolder getOrAddProducer(String topicId) {
        ObjectExtension.requireNonNull((Object)topicId, (String)"topicId");
        ProducerHolder producerHolder = this._producerHolders.get(topicId);
        if (producerHolder != null) {
            return producerHolder;
        }
        Object object = this._addRemoveLock;
        synchronized (object) {
            producerHolder = this._producerHolders.get(topicId);
            if (producerHolder != null) {
                return producerHolder;
            }
            producerHolder = this.newProducerHolder(topicId);
            this._producerHolders.put(topicId, producerHolder);
            _logger.info("producer created: {}", (Object)topicId);
            return producerHolder;
        }
    }

    protected ProducerHolder newProducerHolder(String topicId) {
        this._metaManager.registerProducer(topicId, () -> this.restartProducer(topicId));
        KafkaMetaHolder metaHolder = this._metaManager.getMetaHolder();
        Route route = metaHolder.getTopicRoutes().get(topicId);
        if (route == null) {
            throw new IllegalArgumentException("Topic not found: " + topicId);
        }
        Cluster cluster = metaHolder.getClusters().get(route.getClusterId());
        Topic topic = metaHolder.getTopics().get(route.getTopicId());
        KafkaProducerConfig kafkaProducerConfig = this.constructProducerConfig(this._kafkaProducerConfig, topic, cluster);
        Producer producer = this.newProducer(topicId, kafkaProducerConfig);
        return new ProducerHolder(topicId, kafkaProducerConfig, route, producer);
    }

    protected KafkaProducerConfig constructProducerConfig(KafkaProducerConfig producerConfig, Topic topic, Cluster cluster) {
        Object kafkaProducerConfig = producerConfig.clone();
        ((KafkaProducerConfig)kafkaProducerConfig).getProperties().setProperty("bootstrap.servers", cluster.getMeta().get("bootstrap.servers"));
        ((KafkaProducerConfig)kafkaProducerConfig).getProperties().setProperty("client.id", this.constructClientId(topic.getId()));
        return kafkaProducerConfig;
    }

    protected String constructClientId(String topicId) {
        return this._clientId + "_" + topicId;
    }

    protected Producer newProducer(String topicId, KafkaProducerConfig kafkaProducerConfig) {
        _logger.info("Producer {} created with config: {}", (Object)topicId, (Object)kafkaProducerConfig);
        return new KafkaProducer(kafkaProducerConfig.getProperties(), kafkaProducerConfig.getKeySerializer(), kafkaProducerConfig.getValueSerializer());
    }

    protected void removeProducer(String topicId) {
        this.removeProducer(topicId, c -> c.close());
    }

    protected void removeProducer(String topicId, long timeout, TimeUnit unit) {
        this.removeProducer(topicId, c -> c.close(timeout, unit));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void removeProducer(String topicId, Consumer<Producer> closer) {
        ProducerHolder producerHolder = this._producerHolders.get(topicId);
        if (producerHolder == null) {
            return;
        }
        Object object = this._addRemoveLock;
        synchronized (object) {
            producerHolder = this._producerHolders.remove(topicId);
            if (producerHolder == null) {
                return;
            }
            this._metaManager.unregisterProducer(topicId);
            closer.accept(producerHolder.getProducer());
            _logger.info("Producer removed: {}, config: {}", (Object)topicId, producerHolder.getConfig());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void restartProducer(String topicId) {
        Object object = this._addRemoveLock;
        synchronized (object) {
            ProducerHolder oldProducerHolder = this._producerHolders.remove(topicId);
            this._metaManager.unregisterProducer(topicId);
            KafkaAppInfoExtension.unregister(JMX_PREFIX, this.constructClientId(topicId));
            ProducerHolder newProducerHolder = this.newProducerHolder(topicId);
            this._producerHolders.put(topicId, newProducerHolder);
            this._executorService.schedule(() -> {
                try {
                    oldProducerHolder.getProducer().close();
                }
                catch (Throwable e) {
                    _logger.error("close old producer failed for topic: " + topicId, e);
                }
            }, (long)((Integer)this._destroyDelay.getValue()).intValue(), TimeUnit.MILLISECONDS);
        }
    }

    protected void forEach(BiConsumer<String, ProducerHolder> biConsumer) {
        this.forEach(t -> true, biConsumer);
    }

    protected void forEach(Predicate<String> predicate, BiConsumer<String, ProducerHolder> biConsumer) {
        Collection topics = this._producerHolders.keySet().stream().filter(predicate).collect(Collectors.toSet());
        topics.forEach((? super T t) -> biConsumer.accept((String)t, this._producerHolders.get(t)));
    }
}

