/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.kafka;

import ch.squaredesk.nova.comm.CommAdapterBuilder;
import ch.squaredesk.nova.comm.kafka.KafkaMessageReceiver;
import ch.squaredesk.nova.comm.kafka.KafkaMessageSender;
import ch.squaredesk.nova.comm.kafka.OutgoingMessageMetaData;
import ch.squaredesk.nova.comm.kafka.SendInfo;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAdapter<InternalMessageType> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAdapter.class);
    private final KafkaMessageSender<InternalMessageType> messageSender;
    private final KafkaMessageReceiver<InternalMessageType> messageReceiver;

    KafkaAdapter(KafkaMessageSender<InternalMessageType> messageSender, KafkaMessageReceiver<InternalMessageType> messageReceiver) {
        this.messageReceiver = messageReceiver;
        this.messageSender = messageSender;
    }

    public <ConcreteMessageType extends InternalMessageType> Completable sendMessage(String destination, ConcreteMessageType message) {
        Objects.requireNonNull(message, "message must not be null");
        SendInfo sendInfo = new SendInfo();
        OutgoingMessageMetaData meta = new OutgoingMessageMetaData(destination, sendInfo);
        return this.messageSender.doSend(message, meta);
    }

    public Flowable<InternalMessageType> messages(String destination) {
        return this.messageReceiver.messages(destination).map(incomingMessage -> incomingMessage.message);
    }

    public void shutdown() {
        this.messageReceiver.shutdown();
        logger.info("KafkaAdapter shut down");
    }

    public static <InternalMessageType> Builder<InternalMessageType> builder(Class<InternalMessageType> messageTypeClass) {
        return new Builder(messageTypeClass);
    }

    public static class Builder<InternalMessageType>
    extends CommAdapterBuilder<InternalMessageType, KafkaAdapter<InternalMessageType>> {
        private String serverAddress;
        private String identifier;
        private KafkaMessageSender<InternalMessageType> messageSender;
        private KafkaMessageReceiver<InternalMessageType> messageReceiver;
        private Scheduler subscriptionScheduler;
        private Properties consumerProperties = new Properties();
        private Properties producerProperties = new Properties();
        private long pollTimeout = 1L;
        private TimeUnit pollTimeUnit = TimeUnit.SECONDS;

        private Builder(Class<InternalMessageType> messageTypeClass) {
            super(messageTypeClass);
        }

        public Builder<InternalMessageType> setMessagePollingTimeout(long pollTimeout, TimeUnit pollTimeUnit) {
            Objects.requireNonNull(pollTimeUnit, "pollTimeUnit must not be null");
            this.pollTimeout = pollTimeout;
            this.pollTimeUnit = pollTimeUnit;
            return this;
        }

        public Builder<InternalMessageType> setConsumerProperties(Properties consumerProperties) {
            if (consumerProperties != null) {
                this.consumerProperties.putAll((Map<?, ?>)consumerProperties);
            }
            return this;
        }

        private Builder<InternalMessageType> addProperty(Properties target, String key, String value) {
            Objects.requireNonNull(key, "property key must not be null");
            Objects.requireNonNull(value, "value for property " + key + " must not be null");
            target.setProperty(key, value);
            return this;
        }

        public Builder<InternalMessageType> addConsumerProperty(String key, String value) {
            return this.addProperty(this.consumerProperties, key, value);
        }

        public Builder<InternalMessageType> addProducerProperty(String key, String value) {
            return this.addProperty(this.producerProperties, key, value);
        }

        public Builder<InternalMessageType> setProducerProperties(Properties producerProperties) {
            if (producerProperties != null) {
                this.producerProperties.putAll((Map<?, ?>)producerProperties);
            }
            return this;
        }

        public Builder<InternalMessageType> setServerAddress(String serverAddress) {
            this.serverAddress = serverAddress;
            return this;
        }

        public Builder<InternalMessageType> setSubscriptionScheduler(Scheduler scheduler) {
            this.subscriptionScheduler = scheduler;
            return this;
        }

        public Builder<InternalMessageType> setIdentifier(String identifier) {
            this.identifier = identifier;
            return this;
        }

        public Builder<InternalMessageType> setMessageSender(KafkaMessageSender<InternalMessageType> messageSender) {
            this.messageSender = messageSender;
            return this;
        }

        public Builder<InternalMessageType> setMessageReceiver(KafkaMessageReceiver<InternalMessageType> messageReceiver) {
            this.messageReceiver = messageReceiver;
            return this;
        }

        public void validate() {
            Objects.requireNonNull(this.serverAddress, "serverAddress must be provided");
            Objects.requireNonNull(this.messageUnmarshaller, "messageUnmarshaller must be provided");
            Objects.requireNonNull(this.messageMarshaller, "messageMarshaller must be provided");
            Objects.requireNonNull(this.metrics, "metrics must be provided");
            if (this.subscriptionScheduler == null) {
                this.subscriptionScheduler = Schedulers.from((Executor)Executors.newSingleThreadExecutor(r -> {
                    Thread t = new Thread(r, "KafkaSubscription");
                    t.setDaemon(true);
                    return t;
                }));
            }
            if (this.consumerProperties == null) {
                this.consumerProperties = new Properties();
            }
            if (this.producerProperties == null) {
                this.producerProperties = new Properties();
            }
        }

        public KafkaAdapter<InternalMessageType> createInstance() {
            String clientId = this.identifier == null ? "KafkaAdapter-" + UUID.randomUUID() : this.identifier;
            String groupId = this.identifier == null ? "KafkaAdapter-ReadGroup" : this.identifier + "ReadGroup";
            Builder.setPropertyIfNotPresent(this.consumerProperties, "bootstrap.servers", this.serverAddress);
            Builder.setPropertyIfNotPresent(this.consumerProperties, "key.deserializer", StringDeserializer.class.getName());
            Builder.setPropertyIfNotPresent(this.consumerProperties, "value.deserializer", StringDeserializer.class.getName());
            Builder.setPropertyIfNotPresent(this.consumerProperties, "client.id", clientId);
            Builder.setPropertyIfNotPresent(this.consumerProperties, "group.id", groupId);
            Builder.setPropertyIfNotPresent(this.producerProperties, "bootstrap.servers", this.serverAddress);
            Builder.setPropertyIfNotPresent(this.producerProperties, "key.serializer", StringSerializer.class.getName());
            Builder.setPropertyIfNotPresent(this.producerProperties, "value.serializer", StringSerializer.class.getName());
            Builder.setPropertyIfNotPresent(this.producerProperties, "client.id", clientId);
            if (this.messageReceiver == null) {
                this.messageReceiver = new KafkaMessageReceiver(this.identifier, this.consumerProperties, this.messageUnmarshaller, this.pollTimeout, this.pollTimeUnit, this.metrics);
            }
            if (this.messageSender == null) {
                this.messageSender = new KafkaMessageSender(this.identifier, this.producerProperties, this.messageMarshaller, this.metrics);
            }
            return new KafkaAdapter<InternalMessageType>(this.messageSender, this.messageReceiver);
        }

        private static void setPropertyIfNotPresent(Properties props, String key, String value) {
            if (!props.containsKey(key)) {
                props.setProperty(key, value);
            }
        }
    }
}

