/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.producer;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import kz.greetgo.kafka.consumer.Profile;
import kz.greetgo.kafka.core.ProducerSynchronizer;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.producer.KafkaFuture;
import kz.greetgo.kafka.producer.KafkaSending;
import kz.greetgo.kafka.producer.ProducerFacade;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.util.Listener;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;

public abstract class ProducerFacadeAbstract<MessageType>
implements ProducerFacade<MessageType> {
    protected final ProducerSource source;
    protected final String producerName;
    protected final ProducerSynchronizer producerSynchronizer;
    protected final Supplier<String> topicPrefix;
    protected final ConcurrentHashMap<Profile, Listener> changeListeners = new ConcurrentHashMap();
    protected final ConcurrentHashMap<Profile, Producer<byte[], MessageType>> producerByProfile = new ConcurrentHashMap();

    protected ProducerFacadeAbstract(ProducerSource source, String producerName, ProducerSynchronizer producerSynchronizer, Supplier<String> topicPrefix) {
        this.source = source;
        this.producerName = producerName;
        this.producerSynchronizer = producerSynchronizer;
        this.topicPrefix = topicPrefix;
    }

    @Override
    public String name() {
        return this.producerName;
    }

    @Override
    public void reset() {
        this.closeAllProducers();
    }

    private void closeAllProducers() {
        ArrayList<Map.Entry<Profile, Producer<byte[], MessageType>>> entries = new ArrayList<Map.Entry<Profile, Producer<byte[], MessageType>>>(this.producerByProfile.entrySet());
        for (Map.Entry<Profile, Producer<byte[], MessageType>> e : entries) {
            Profile profile = e.getKey();
            Producer<byte[], MessageType> producer = e.getValue();
            this.producerByProfile.remove(profile);
            producer.close();
            if (!this.source.logger().isShow(LoggerType.LOG_CLOSE_PRODUCER)) continue;
            this.source.logger().logProducerClosed(this.producerName, profile);
        }
    }

    @Override
    public void close() throws Exception {
        this.closeAllProducers();
        this.changeListeners.values().forEach(Listener::kill);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nonnull
    public Producer<byte[], MessageType> getNativeProducer(@Nonnull Profile profile) {
        Producer<byte[], MessageType> p = this.producerByProfile.get(profile);
        if (p != null) {
            return p;
        }
        ConcurrentHashMap<Profile, Producer<byte[], MessageType>> concurrentHashMap = this.producerByProfile;
        synchronized (concurrentHashMap) {
            Producer<byte[], MessageType> p2 = this.producerByProfile.get(profile);
            if (p2 != null) {
                return p2;
            }
            p2 = this.createProducer(profile);
            this.producerByProfile.put(profile, p2);
            return p2;
        }
    }

    @Override
    public Map<String, String> getConfigData(Profile profile) {
        return this.source.getConfigFor(this.producerName, profile);
    }

    protected abstract Serializer<MessageType> createValueSerializer();

    @Nonnull
    private Producer<byte[], MessageType> createProducer(@Nonnull Profile profile) {
        ByteArraySerializer keySerializer = new ByteArraySerializer();
        Serializer<MessageType> valueSerializer = this.createValueSerializer();
        Producer<byte[], MessageType> producer = this.source.createProducer(this.producerName, profile, keySerializer, valueSerializer);
        Listener listener = this.source.addConfigListener(this.producerName, profile, () -> this.closeProducer(profile));
        Listener oldListener = this.changeListeners.put(profile, listener);
        if (oldListener != null) {
            oldListener.kill();
        }
        if (this.source.logger().isShow(LoggerType.LOG_CREATE_PRODUCER)) {
            this.source.logger().logProducerCreated(this.producerName, profile);
        }
        return producer;
    }

    private void closeProducer(Profile profile) {
        Producer<byte[], MessageType> producer = this.producerByProfile.remove(profile);
        if (producer != null) {
            producer.close();
        }
    }

    protected String doPrefixTopic(String topic) {
        String prefix = this.topicPrefix.get();
        return prefix == null ? topic : prefix + topic;
    }

    protected abstract KafkaFuture doSend(@Nonnull MessageType var1, String var2, byte[] var3, Integer var4, Long var5, String var6, String var7, @Nonnull ArrayList<Header> var8, @Nonnull Profile var9);

    @Override
    public KafkaSending sending(final MessageType message) {
        return new KafkaSending(){
            String toTopic = null;
            Integer toPartition = null;
            Long timestamp = null;
            final ArrayList<Header> headers = new ArrayList();
            String author;
            byte[] withKey;
            String kafkaId;
            Profile profile;
            {
                this.author = ProducerFacadeAbstract.this.source.author();
                this.withKey = null;
                this.kafkaId = null;
                this.profile = new Profile(null);
            }

            @Override
            public KafkaSending toTopic(String topic) {
                this.toTopic = topic;
                return this;
            }

            @Override
            public KafkaSending toPartition(int partition) {
                this.toPartition = partition;
                return this;
            }

            @Override
            public KafkaSending setTimestamp(Long timestamp) {
                this.timestamp = timestamp;
                return this;
            }

            @Override
            public KafkaSending addHeader(final String key, final byte[] value) {
                this.headers.add(new Header(){

                    public String key() {
                        return key;
                    }

                    public byte[] value() {
                        return value;
                    }
                });
                return this;
            }

            @Override
            public KafkaSending addHeader(String key, String value) {
                return this.addHeader(key, value == null ? new byte[]{} : value.getBytes(StandardCharsets.UTF_8));
            }

            @Override
            public KafkaSending setAuthor(String author) {
                this.author = author;
                return this;
            }

            @Override
            public KafkaSending withKey(String keyAsString) {
                return this.withKey(keyAsString.getBytes(StandardCharsets.UTF_8));
            }

            @Override
            public KafkaSending withKey(byte[] keyAsBytes) {
                this.withKey = keyAsBytes;
                return this;
            }

            @Override
            public KafkaSending kafkaId(String kafkaId) {
                this.kafkaId = kafkaId;
                return this;
            }

            @Override
            public KafkaSending profile(String profile) {
                this.profile = new Profile(profile);
                return this;
            }

            @Override
            public KafkaFuture go() {
                return ProducerFacadeAbstract.this.doSend(message, this.toTopic, this.withKey, this.toPartition, this.timestamp, this.author, this.kafkaId, this.headers, this.profile);
            }

            @Override
            public void goWithPortion() {
                ProducerFacadeAbstract.this.producerSynchronizer.acceptKafkaFuture(this.go());
            }
        };
    }
}

