package io.simplesource.kafka.internal.client;

import io.simplesource.data.FutureResult;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.client.RequestPublisher;
import io.simplesource.kafka.spec.TopicSpec;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaRequestAPI.class */
public final class KafkaRequestAPI<K, I, RK, R> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaRequestAPI.class);
    private final RequestAPIContext<K, I, RK, R> ctx;
    private final ResponseSubscription responseSubscription;
    private final ExpiringMap<RK, ResponseHandler<I, R>> responseHandlers;
    private final RequestPublisher<K, I> requestSender;
    private final RequestPublisher<RK, String> responseTopicMapSender;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaRequestAPI$ResponseHandler.class */
    public static final class ResponseHandler<I, R> {
        private final I input;
        private final List<CompletableFuture<R>> responseFutures;
        private final Optional<R> response;

        /* JADX INFO: Access modifiers changed from: package-private */
        public static <I, R> ResponseHandler<I, R> initialise(I i, Optional<R> optional) {
            return new ResponseHandler<>(i, new ArrayList(), optional);
        }

        void forEachFuture(Consumer<CompletableFuture<R>> consumer) {
            List<CompletableFuture<R>> list = this.responseFutures;
            Objects.requireNonNull(consumer);
            list.forEach((v1) -> {
                r1.accept(v1);
            });
        }

        public ResponseHandler(I i, List<CompletableFuture<R>> list, Optional<R> optional) {
            this.input = i;
            this.responseFutures = list;
            this.response = optional;
        }

        public I input() {
            return this.input;
        }

        public List<CompletableFuture<R>> responseFutures() {
            return this.responseFutures;
        }

        public Optional<R> response() {
            return this.response;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ResponseHandler)) {
                return false;
            }
            ResponseHandler responseHandler = (ResponseHandler) obj;
            I input = input();
            Object input2 = responseHandler.input();
            if (input == null) {
                if (input2 != null) {
                    return false;
                }
            } else if (!input.equals(input2)) {
                return false;
            }
            List<CompletableFuture<R>> responseFutures = responseFutures();
            List<CompletableFuture<R>> responseFutures2 = responseHandler.responseFutures();
            if (responseFutures == null) {
                if (responseFutures2 != null) {
                    return false;
                }
            } else if (!responseFutures.equals(responseFutures2)) {
                return false;
            }
            Optional<R> response = response();
            Optional<R> response2 = responseHandler.response();
            return response == null ? response2 == null : response.equals(response2);
        }

        public int hashCode() {
            I input = input();
            int hashCode = (1 * 59) + (input == null ? 43 : input.hashCode());
            List<CompletableFuture<R>> responseFutures = responseFutures();
            int hashCode2 = (hashCode * 59) + (responseFutures == null ? 43 : responseFutures.hashCode());
            Optional<R> response = response();
            return (hashCode2 * 59) + (response == null ? 43 : response.hashCode());
        }

        public String toString() {
            return "KafkaRequestAPI.ResponseHandler(input=" + input() + ", responseFutures=" + responseFutures() + ", response=" + response() + ")";
        }
    }

    /* loaded from: input_file:io/simplesource/kafka/internal/client/KafkaRequestAPI$ResponseReceiver.class */
    /**
     * Routes an incoming value to the entry stored under its key in an
     * {@code ExpiringMap}, replacing that entry with the result of a modifier function.
     *
     * @param <K> map key type
     * @param <M> map entry type
     * @param <V> incoming value type
     */
    static final class ResponseReceiver<K, M, V> {
        private final ExpiringMap<K, M> expiringMap;
        private final BiFunction<M, V, M> mapModifier;

        /**
         * @param expiringMap map whose entries are updated by {@link #receive}
         * @param biFunction  computes the replacement entry from the current entry and the incoming value
         */
        public ResponseReceiver(ExpiringMap<K, M> expiringMap, BiFunction<M, V, M> biFunction) {
            this.expiringMap = expiringMap;
            this.mapModifier = biFunction;
        }

        /**
         * Hands {@code v} to the modifier for the entry under {@code k}, via the map's
         * {@code computeIfPresent}.
         */
        void receive(K k, V v) {
            this.expiringMap.computeIfPresent(k, current -> this.mapModifier.apply(current, v));
        }

        /** @return the map this receiver updates */
        public ExpiringMap<K, M> expiringMap() {
            return this.expiringMap;
        }

        /** @return the function used to compute replacement entries */
        public BiFunction<M, V, M> mapModifier() {
            return this.mapModifier;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ResponseReceiver)) {
                return false;
            }
            ResponseReceiver<?, ?, ?> that = (ResponseReceiver<?, ?, ?>) obj;
            boolean sameMap = this.expiringMap == null
                    ? that.expiringMap == null
                    : this.expiringMap.equals(that.expiringMap);
            if (!sameMap) {
                return false;
            }
            return this.mapModifier == null
                    ? that.mapModifier == null
                    : this.mapModifier.equals(that.mapModifier);
        }

        @Override
        public int hashCode() {
            // Seed 1, multiplier 59, 43 standing in for null.
            int mapHash = this.expiringMap == null ? 43 : this.expiringMap.hashCode();
            int modifierHash = this.mapModifier == null ? 43 : this.mapModifier.hashCode();
            return ((59 + mapHash) * 59) + modifierHash;
        }

        @Override
        public String toString() {
            return "KafkaRequestAPI.ResponseReceiver(expiringMap=" + expiringMap() + ", mapModifier=" + mapModifier() + ")";
        }
    }

    /**
     * Builds a {@link RequestPublisher} that sends each key/value pair to a fixed
     * topic through a newly created {@link KafkaProducer}.
     *
     * <p>Send failures are logged and passed through as the error side of the
     * returned {@link FutureResult}; on success the record's timestamp is wrapped
     * in a {@code PublishResult}.
     *
     * @param kafkaConfig supplies the producer configuration
     * @param str         name of the topic every record is sent to
     * @param serde       serde whose serializer is used for keys
     * @param serde2      serde whose serializer is used for values
     * @return a publisher bound to {@code str}
     */
    private static <K, V> RequestPublisher<K, V> kakfaProducerSender(KafkaConfig kafkaConfig, String str, Serde<K> serde, Serde<V> serde2) {
        // NOTE(review): this producer is never closed anywhere in this file; RequestPublisher
        // exposes no close hook here, so its lifetime appears to be that of the JVM — confirm.
        KafkaProducer<K, V> kafkaProducer = new KafkaProducer<>(kafkaConfig.producerConfig(), serde.serializer(), serde2.serializer());
        return (key, value) -> {
            ProducerRecord<K, V> producerRecord = new ProducerRecord<>(str, key, value);
            return FutureResult.ofFuture(kafkaProducer.send(producerRecord), exc -> {
                logger.error("Error returned from future", exc);
                return exc;
            }).map(recordMetadata -> new RequestPublisher.PublishResult(recordMetadata.timestamp()));
        };
    }

    /**
     * Creates a request API backed by real Kafka clients derived from the context:
     * one producer-based publisher for the request topic, one for the
     * response-topic-map topic (String values), and a consumer started through
     * {@code KafkaConsumerRunner.run} on the private response topic.
     *
     * <p>Delegates to the five-argument constructor with its final flag set to
     * {@code true}.
     *
     * @param requestAPIContext topics, serdes and Kafka configuration for this API
     */
    public KafkaRequestAPI(RequestAPIContext<K, I, RK, R> requestAPIContext) {
        this(
                requestAPIContext,
                kakfaProducerSender(
                        requestAPIContext.kafkaConfig(),
                        requestAPIContext.requestTopic(),
                        requestAPIContext.requestKeySerde(),
                        requestAPIContext.requestValueSerde()),
                kakfaProducerSender(
                        requestAPIContext.kafkaConfig(),
                        requestAPIContext.responseTopicMapTopic(),
                        requestAPIContext.responseKeySerde(),
                        Serdes.String()),
                receiver -> KafkaConsumerRunner.run(
                        requestAPIContext.kafkaConfig().consumerConfig(),
                        requestAPIContext.privateResponseTopic(),
                        requestAPIContext.responseValueSerde(),
                        receiver,
                        requestAPIContext.uuidToResponseId()),
                true);
    }

    /**
     * Creates a request API from explicitly supplied collaborators.
     *
     * <p>When {@code z} is {@code true}, the private response topic is looked up
     * through an {@link AdminClient} and created from the context's output topic
     * configuration if it is missing. A response receiver is then attached to the
     * subscription produced by {@code function}, and a JVM shutdown hook that calls
     * {@link #close()} is registered.
     *
     * @param requestAPIContext topics, serdes, retention and Kafka configuration
     * @param requestPublisher  publisher used for the requests themselves
     * @param requestPublisher2 publisher used to record which response topic a request id maps to
     * @param function          given the callback that delivers (response key, response) pairs to this
     *                          API, starts the subscription and returns its handle
     * @param z                 whether to check for, and if necessary create, the private response topic
     * @throws RuntimeException if the topic check or creation fails or is interrupted
     */
    public KafkaRequestAPI(RequestAPIContext<K, I, RK, R> requestAPIContext, RequestPublisher<K, I> requestPublisher, RequestPublisher<RK, String> requestPublisher2, Function<BiConsumer<RK, R>, ResponseSubscription> function, boolean z) {
        KafkaConfig kafkaConfig = requestAPIContext.kafkaConfig();
        this.ctx = requestAPIContext;
        long retentionInSeconds = requestAPIContext.responseWindowSpec().retentionInSeconds();
        this.requestSender = requestPublisher;
        this.responseTopicMapSender = requestPublisher2;
        if (z) {
            // Created outside the try so that a failure to build the client propagates unwrapped, as before.
            AdminClient adminClient = AdminClient.create(kafkaConfig.adminClientConfig());
            // try-with-resources: the admin client was previously never closed.
            try (AdminClient admin = adminClient) {
                Set<String> existingTopics = admin.listTopics().names().get();
                String privateResponseTopic = requestAPIContext.privateResponseTopic();
                if (!existingTopics.contains(privateResponseTopic)) {
                    TopicSpec outputTopicConfig = requestAPIContext.outputTopicConfig();
                    NewTopic newTopic = new NewTopic(privateResponseTopic, outputTopicConfig.partitionCount(), outputTopicConfig.replicaCount());
                    admin.createTopics(Collections.singletonList(newTopic)).all().get();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Unable to create required topics.", e);
            } catch (Exception e) {
                throw new RuntimeException("Unable to create required topics.", e);
            }
        }
        this.responseHandlers = new ExpiringMap<>(retentionInSeconds, Clock.systemUTC());
        // On a response: complete every waiting future, then replace the handler with one
        // that remembers the response and has an empty list of waiters.
        BiFunction<ResponseHandler<I, R>, R, ResponseHandler<I, R>> completeAndRecord = (handler, response) -> {
            handler.forEachFuture(future -> future.complete(response));
            return ResponseHandler.initialise(handler.input(), Optional.of(response));
        };
        ResponseReceiver<RK, ResponseHandler<I, R>, R> responseReceiver = new ResponseReceiver<>(this.responseHandlers, completeAndRecord);
        this.responseSubscription = function.apply(responseReceiver::receive);
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    /**
     * Publishes a request and registers a handler for its response.
     *
     * <p>First publishes the mapping from {@code rk} to this instance's private
     * response topic, then the request itself; once the request publish succeeds a
     * handler for {@code rk} is inserted if none exists. Independently of that
     * chain, stale handlers are evicted asynchronously and their waiting futures
     * are completed with the context's error value.
     *
     * @param k  key the request is published under
     * @param rk id under which the response will be looked up
     * @param i  the request payload
     * @return the result of publishing the request
     */
    public FutureResult<Exception, RequestPublisher.PublishResult> publishRequest(K k, RK rk, I i) {
        FutureResult<Exception, RequestPublisher.PublishResult> published = this.responseTopicMapSender
                .publish(rk, this.ctx.privateResponseTopic())
                .flatMap(mappingResult -> this.requestSender.publish(k, i))
                .map(requestResult -> {
                    this.responseHandlers.insertIfAbsent(rk, () -> ResponseHandler.initialise(i, Optional.empty()));
                    return requestResult;
                });
        // Piggy-back eviction of expired handlers on every publish.
        this.responseHandlers.removeStaleAsync(stale ->
                stale.forEachFuture(pending ->
                        pending.complete(this.ctx.errorValue().apply(stale.input, new Exception("Request not processed.")))));
        return published;
    }

    /**
     * Returns a future for the response to a previously published request.
     *
     * <ul>
     *   <li>If a response has already been recorded for {@code rk}, the future is
     *       completed with it immediately.</li>
     *   <li>If the request is known but unanswered, the future is added to the
     *       handler's waiting list and a task is scheduled that completes it with
     *       the context's error value (wrapping a {@link TimeoutException}) after
     *       {@code duration}.</li>
     *   <li>If no handler exists for {@code rk}, the future is completed
     *       exceptionally.</li>
     * </ul>
     *
     * @param rk       id of the request whose response is wanted
     * @param duration how long to wait before completing with the timeout error value
     * @return a future carrying the response, the timeout error value, or an exception for an unknown id
     */
    public CompletableFuture<R> queryResponse(RK rk, Duration duration) {
        CompletableFuture<R> result = new CompletableFuture<>();
        boolean known = this.responseHandlers.computeIfPresent(rk, handler -> {
            Optional<R> received = handler.response();
            if (received.isPresent()) {
                result.complete(received.get());
            } else {
                // Block lambda keeps this a Runnable; complete() is a no-op if the response won the race.
                this.ctx.scheduler().schedule(() -> {
                    result.complete(this.ctx.errorValue().apply(handler.input(), new TimeoutException("Timeout after " + duration)));
                }, duration.toMillis(), TimeUnit.MILLISECONDS);
                handler.responseFutures().add(result);
            }
            // The handler itself is left in place unchanged.
            return handler;
        }) != null;
        if (!known) {
            result.completeExceptionally(new Exception("Invalid commandId."));
        }
        return result;
    }

    /**
     * Shuts the request API down: removes every response handler, completing each
     * of its waiting futures with the context's error value, and then closes the
     * response subscription.
     */
    public void close() {
        logger.info("Request API shutting down");
        this.responseHandlers.removeAll(handler ->
                handler.forEachFuture(pending ->
                        pending.complete(this.ctx.errorValue().apply(handler.input, new Exception("Consumer closed before future.")))));
        this.responseSubscription.close();
    }
}
