/*
 * Decompiled with CFR 0.152.
 */
package ch.squaredesk.nova.comm.kafka;

import ch.squaredesk.nova.comm.CommAdapter;
import ch.squaredesk.nova.comm.CommAdapterBuilder;
import ch.squaredesk.nova.comm.DefaultMessageTranscriberForStringAsTransportType;
import ch.squaredesk.nova.comm.MessageTranscriber;
import ch.squaredesk.nova.comm.kafka.MessageReceiver;
import ch.squaredesk.nova.comm.kafka.MessageSender;
import ch.squaredesk.nova.comm.kafka.OutgoingMessageMetaData;
import ch.squaredesk.nova.comm.kafka.SendInfo;
import ch.squaredesk.nova.metrics.Metrics;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAdapter
extends CommAdapter<String> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAdapter.class);
    private final MessageSender messageSender;
    private final MessageReceiver messageReceiver;

    KafkaAdapter(MessageSender messageSender, MessageReceiver messageReceiver, MessageTranscriber<String> messageTranscriber, Metrics metrics) {
        super(messageTranscriber, metrics);
        this.messageReceiver = messageReceiver;
        this.messageSender = messageSender;
    }

    public Completable sendMessage(String destination, String message) {
        SendInfo sendInfo = new SendInfo();
        OutgoingMessageMetaData meta = new OutgoingMessageMetaData(destination, sendInfo);
        return this.messageSender.send(message, meta);
    }

    public <T> Completable sendMessage(String destination, T message) {
        Function transcriber = this.messageTranscriber.getOutgoingMessageTranscriber(message.getClass());
        return this.sendMessage(destination, message, transcriber);
    }

    public <T> Completable sendMessage(String destination, T message, Function<T, String> transcriber) {
        Objects.requireNonNull(message, "message must not be null");
        SendInfo sendInfo = new SendInfo();
        OutgoingMessageMetaData meta = new OutgoingMessageMetaData(destination, sendInfo);
        return this.messageSender.send(message, meta, transcriber);
    }

    public Flowable<String> messages(String destination) {
        return this.messageReceiver.messages(destination).map(incomingMessage -> (String)incomingMessage.message);
    }

    public <T> Flowable<T> messages(String destination, Class<T> messageType) {
        return this.messages(destination, this.messageTranscriber.getIncomingMessageTranscriber(messageType));
    }

    public <T> Flowable<T> messages(String destination, Function<String, T> messageTranscriber) {
        return this.messageReceiver.messages(destination, messageTranscriber).map(incomingMessage -> incomingMessage.message);
    }

    public void shutdown() {
        if (this.messageReceiver instanceof MessageReceiver) {
            this.messageReceiver.shutdown();
        }
        logger.info("KafkaAdapter shut down");
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder
    extends CommAdapterBuilder<String, KafkaAdapter> {
        private String serverAddress;
        private String identifier;
        private MessageSender messageSender;
        private MessageReceiver messageReceiver;
        private Scheduler subscriptionScheduler;
        private Properties consumerProperties = new Properties();
        private Properties producerProperties = new Properties();
        private long pollTimeout = 1L;
        private TimeUnit pollTimeUnit = TimeUnit.SECONDS;

        private Builder() {
        }

        public Builder setMessagePollingTimeout(long pollTimeout, TimeUnit pollTimeUnit) {
            Objects.requireNonNull(pollTimeUnit, "pollTimeUnit must not be null");
            this.pollTimeout = pollTimeout;
            this.pollTimeUnit = pollTimeUnit;
            return this;
        }

        public Builder setConsumerProperties(Properties consumerProperties) {
            if (consumerProperties != null) {
                this.consumerProperties.putAll((Map<?, ?>)consumerProperties);
            }
            return this;
        }

        private Builder addProperty(Properties target, String key, String value) {
            Objects.requireNonNull(key, "property key must not be null");
            Objects.requireNonNull(value, "value for property " + key + " must not be null");
            target.setProperty(key, value);
            return this;
        }

        public Builder addConsumerProperty(String key, String value) {
            return this.addProperty(this.consumerProperties, key, value);
        }

        public Builder addProducerProperty(String key, String value) {
            return this.addProperty(this.producerProperties, key, value);
        }

        public Builder setProducerProperties(Properties producerProperties) {
            if (producerProperties != null) {
                this.producerProperties.putAll((Map<?, ?>)producerProperties);
            }
            return this;
        }

        public Builder setServerAddress(String serverAddress) {
            this.serverAddress = serverAddress;
            return this;
        }

        public Builder setSubscriptionScheduler(Scheduler scheduler) {
            this.subscriptionScheduler = scheduler;
            return this;
        }

        public Builder setIdentifier(String identifier) {
            this.identifier = identifier;
            return this;
        }

        public Builder setMessageSender(MessageSender messageSender) {
            this.messageSender = messageSender;
            return this;
        }

        public Builder setMessageReceiver(MessageReceiver messageReceiver) {
            this.messageReceiver = messageReceiver;
            return this;
        }

        public void validate() {
            Objects.requireNonNull(this.serverAddress, "serverAddress must be provided");
            Objects.requireNonNull(this.metrics, "metrics must be provided");
            if (this.subscriptionScheduler == null) {
                this.subscriptionScheduler = Schedulers.from((Executor)Executors.newSingleThreadExecutor(r -> {
                    Thread t = new Thread(r, "KafkaSubscription");
                    t.setDaemon(true);
                    return t;
                }));
            }
            if (this.consumerProperties == null) {
                this.consumerProperties = new Properties();
            }
            if (this.producerProperties == null) {
                this.producerProperties = new Properties();
            }
        }

        public KafkaAdapter createInstance() {
            String clientId = this.identifier == null ? "KafkaAdapter-" + UUID.randomUUID() : this.identifier;
            String groupId = this.identifier == null ? "KafkaAdapter-ReadGroup" : this.identifier + "ReadGroup";
            Builder.setPropertyIfNotPresent(this.consumerProperties, "bootstrap.servers", this.serverAddress);
            Builder.setPropertyIfNotPresent(this.consumerProperties, "key.deserializer", StringDeserializer.class.getName());
            Builder.setPropertyIfNotPresent(this.consumerProperties, "value.deserializer", StringDeserializer.class.getName());
            Builder.setPropertyIfNotPresent(this.consumerProperties, "client.id", clientId);
            Builder.setPropertyIfNotPresent(this.consumerProperties, "group.id", groupId);
            Builder.setPropertyIfNotPresent(this.producerProperties, "bootstrap.servers", this.serverAddress);
            Builder.setPropertyIfNotPresent(this.producerProperties, "key.serializer", StringSerializer.class.getName());
            Builder.setPropertyIfNotPresent(this.producerProperties, "value.serializer", StringSerializer.class.getName());
            Builder.setPropertyIfNotPresent(this.producerProperties, "client.id", clientId);
            if (this.messageReceiver == null) {
                this.messageReceiver = new MessageReceiver(this.identifier, this.consumerProperties, this.pollTimeout, this.pollTimeUnit, this.metrics);
            }
            if (this.messageSender == null) {
                this.messageSender = new MessageSender(this.identifier, this.producerProperties, this.metrics);
            }
            if (this.messageTranscriber == null) {
                this.messageTranscriber = new DefaultMessageTranscriberForStringAsTransportType();
            }
            return new KafkaAdapter(this.messageSender, this.messageReceiver, (MessageTranscriber<String>)this.messageTranscriber, this.metrics);
        }

        private static void setPropertyIfNotPresent(Properties props, String key, String value) {
            if (!props.containsKey(key)) {
                props.setProperty(key, value);
            }
        }
    }
}

