/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.kafka.producer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.producer.KafkaFuture;
import kz.greetgo.kafka.producer.KafkaSending;
import kz.greetgo.kafka.producer.ProducerFacade;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.serializer.BoxSerializer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/**
 * {@link ProducerFacade} implementation that lazily creates a native Kafka
 * {@link Producer} through a {@link ProducerSource} and caches it until
 * {@link #reset()} is called.
 *
 * <p>An instance created by {@link #createAutoResettableBridge(String, ProducerSource)}
 * additionally compares the timestamp recorded at producer creation with
 * {@link ProducerSource#getProducerConfigUpdateTimestamp(String)} on every
 * {@link #getNativeProducer()} call and resets the cached producer when the
 * recorded timestamp is older.
 */
public class ProducerFacadeBridge
implements ProducerFacade {
    private final ProducerSource source;
    private final String producerName;
    private final boolean autoResettable;

    /** Currently cached native producer, or {@code null} when none has been created yet (or after a reset). */
    private final AtomicReference<Producer<byte[], Box>> producer = new AtomicReference<>(null);

    /** Value of the source's config-update timestamp captured when the last producer was created. */
    private final AtomicLong creationTimestamp = new AtomicLong(0L);

    private ProducerFacadeBridge(String producerName, ProducerSource source, boolean autoResettable) {
        this.source = source;
        this.producerName = producerName;
        this.autoResettable = autoResettable;
    }

    /**
     * Creates a bridge that resets its cached producer whenever the source reports
     * a config-update timestamp newer than the one recorded at producer creation.
     *
     * @param producerName name used to look up configuration in {@code source}
     * @param source       provider of configuration, logger and native producers
     * @return new auto-resettable bridge
     */
    public static ProducerFacadeBridge createAutoResettableBridge(String producerName, ProducerSource source) {
        return new ProducerFacadeBridge(producerName, source, true);
    }

    /**
     * Creates a bridge that keeps its cached producer until {@link #reset()} is called explicitly.
     *
     * @param producerName name used to look up configuration in {@code source}
     * @param source       provider of configuration, logger and native producers
     * @return new permanent bridge
     */
    public static ProducerFacadeBridge createPermanentBridge(String producerName, ProducerSource source) {
        return new ProducerFacadeBridge(producerName, source, false);
    }

    /**
     * Detaches the cached producer (if any) and closes it. The next call to
     * {@link #getNativeProducer()} creates a fresh one.
     */
    @Override
    public void reset() {
        Producer<byte[], Box> detached = this.producer.getAndSet(null);
        if (detached != null) {
            this.closeAndLog(detached);
        }
    }

    /**
     * Returns the cached native producer, creating it on first use.
     *
     * <p>For auto-resettable bridges the cached producer is reset first when its
     * recorded creation timestamp is older than the source's config-update timestamp.
     *
     * @return the native producer currently held by this bridge
     */
    @Override
    public Producer<byte[], Box> getNativeProducer() {
        if (this.autoResettable
                && this.creationTimestamp.get() < this.source.getProducerConfigUpdateTimestamp(this.producerName)) {
            this.reset();
        }
        // Explicit compare-and-set loop instead of updateAndGet(): the update function of
        // updateAndGet() may be re-applied under contention, which would create extra
        // producers that nobody ever closes.
        while (true) {
            Producer<byte[], Box> existing = this.producer.get();
            if (existing != null) {
                return existing;
            }
            Producer<byte[], Box> created = this.createProducer();
            if (this.producer.compareAndSet(null, created)) {
                return created;
            }
            // Another thread published its producer first; release ours and re-read.
            this.closeAndLog(created);
        }
    }

    /**
     * @return the configuration the source holds for this bridge's producer name
     */
    @Override
    public Map<String, Object> getConfigData() {
        return this.source.getConfigFor(this.producerName);
    }

    /**
     * Asks the source for a new native producer, records the source's current
     * config-update timestamp and logs the creation when that log type is enabled.
     */
    private Producer<byte[], Box> createProducer() {
        ByteArraySerializer keySerializer = new ByteArraySerializer();
        BoxSerializer valueSerializer = new BoxSerializer(this.source.getStrConverter());
        Producer<byte[], Box> ret = this.source.createProducer(this.producerName, keySerializer, valueSerializer);
        this.creationTimestamp.set(this.source.getProducerConfigUpdateTimestamp(this.producerName));
        if (this.source.logger().isShow(LoggerType.LOG_CREATE_PRODUCER)) {
            this.source.logger().logProducerCreated(this.producerName);
        }
        return ret;
    }

    /** Closes the given producer and logs the closure when that log type is enabled. */
    private void closeAndLog(Producer<byte[], Box> toClose) {
        toClose.close();
        if (this.source.logger().isShow(LoggerType.LOG_CLOSE_PRODUCER)) {
            this.source.logger().logProducerClosed(this.producerName);
        }
    }

    /**
     * Starts a fluent description of one message to send.
     *
     * <p>The returned object collects topic, partition, timestamp, headers, author and
     * consumers to ignore; nothing is sent until {@link KafkaSending#go()} is called.
     * The author defaults to {@code source.author()}.
     *
     * @param body payload placed into the {@link Box} that is sent
     * @return builder for the send operation
     */
    @Override
    public KafkaSending sending(final Object body) {
        return new KafkaSending(){
            String topic = null;
            Integer partition = null;
            Long timestamp = null;
            final ArrayList<Header> headers = new ArrayList<>();
            final Set<String> ignorableConsumers = new HashSet<String>();
            String author = ProducerFacadeBridge.this.source.author();

            @Override
            public KafkaSending toTopic(String topic) {
                this.topic = topic;
                return this;
            }

            @Override
            public KafkaSending toPartition(int partition) {
                this.partition = partition;
                return this;
            }

            @Override
            public KafkaSending setTimestamp(Long timestamp) {
                this.timestamp = timestamp;
                return this;
            }

            @Override
            public KafkaSending addHeader(final String key, final byte[] value) {
                this.headers.add(new Header(){

                    @Override
                    public String key() {
                        return key;
                    }

                    @Override
                    public byte[] value() {
                        return value;
                    }
                });
                return this;
            }

            @Override
            public KafkaSending addConsumerToIgnore(String consumerName) {
                this.ignorableConsumers.add(consumerName);
                return this;
            }

            @Override
            public KafkaSending setAuthor(String author) {
                this.author = author;
                return this;
            }

            /**
             * Builds the {@link Box}, validates it and hands the record to the native producer.
             *
             * @return wrapper around the future returned by the native producer's {@code send}
             * @throws RuntimeException if no topic was set, or if box validation fails
             *                          (non-runtime failures are wrapped)
             */
            @Override
            public KafkaFuture go() {
                if (this.topic == null) {
                    throw new RuntimeException("topic == null");
                }
                Box box = new Box();
                box.body = body;
                box.author = this.author;
                // Sorted so the list content does not depend on HashSet iteration order.
                box.ignorableConsumers = this.ignorableConsumers.stream().sorted().collect(Collectors.toList());
                try {
                    box.validate();
                }
                catch (Throwable throwable) {
                    if (ProducerFacadeBridge.this.source.logger().isShow(LoggerType.LOG_PRODUCER_VALIDATION_ERROR)) {
                        ProducerFacadeBridge.this.source.logger().logProducerValidationError(throwable);
                    }
                    if (throwable instanceof RuntimeException) {
                        throw (RuntimeException)throwable;
                    }
                    throw new RuntimeException(throwable);
                }
                byte[] key = ProducerFacadeBridge.this.source.extractKey(body);
                ProducerRecord<byte[], Box> record =
                        new ProducerRecord<>(this.topic, this.partition, this.timestamp, key, box, this.headers);
                return new KafkaFuture(ProducerFacadeBridge.this.getNativeProducer().send(record));
            }
        };
    }
}

