package io.simplesource.kafka.internal;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandError;
import io.simplesource.data.FutureResult;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Result;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.api.RemoteCommandResponseStore;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.streams.statestore.CommandResponseStoreBridge;
import io.simplesource.kafka.internal.streams.statestore.StateStoreUtils;
import io.simplesource.kafka.internal.util.RetryDelay;
import io.simplesource.kafka.model.AggregateUpdateResult;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.spec.AggregateSpec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.WindowStoreIterator;

/* loaded from: input_file:io/simplesource/kafka/internal/KafkaCommandAPI.class */
public final class KafkaCommandAPI<K, C, A> implements CommandAPI<K, C> {
    private final String aggregateName;
    private final String commandRequestTopic;
    private final Producer<K, CommandRequest<K, C>> commandProducer;
    private final AggregateSerdes<K, C, ?, A> aggregateSerdes;
    private final HostInfo currentHost;
    private final CommandResponseStoreBridge<A> storeBridge;
    private final RemoteCommandResponseStore remoteStore;
    private final ScheduledExecutorService scheduledExecutor;
    private final RetryDelay retryDelay;

    public KafkaCommandAPI(AggregateSpec<K, C, ?, A> aggregateSpec, KafkaConfig kafkaConfig, CommandResponseStoreBridge<A> commandResponseStoreBridge, RemoteCommandResponseStore remoteCommandResponseStore, ScheduledExecutorService scheduledExecutorService, RetryDelay retryDelay) {
        this.commandRequestTopic = aggregateSpec.serialization().resourceNamingStrategy().topicName(aggregateSpec.aggregateName(), AggregateResources.TopicEntity.command_request.name());
        this.aggregateSerdes = aggregateSpec.serialization().serdes();
        this.commandProducer = new KafkaProducer(kafkaConfig.producerConfig(), this.aggregateSerdes.aggregateKey().serializer(), this.aggregateSerdes.commandRequest().serializer());
        this.currentHost = kafkaConfig.currentHostInfo();
        this.aggregateName = aggregateSpec.aggregateName();
        this.storeBridge = commandResponseStoreBridge;
        this.remoteStore = remoteCommandResponseStore;
        this.scheduledExecutor = scheduledExecutorService;
        this.retryDelay = retryDelay;
    }

    public FutureResult<CommandError, UUID> publishCommand(CommandAPI.Request<K, C> request) {
        return FutureResult.ofFuture(this.commandProducer.send(new ProducerRecord(this.commandRequestTopic, request.key(), new CommandRequest(request.key(), request.command(), request.readSequence(), request.commandId()))), exc -> {
            return CommandError.of(CommandError.Reason.CommandPublishError, exc);
        }).map(recordMetadata -> {
            return request.commandId();
        });
    }

    public FutureResult<CommandError, NonEmptyList<Sequence>> queryCommandResult(UUID uuid, Duration duration) {
        return StateStoreUtils.get(hostInfoForCommandResponseKey(uuid), this.currentHost, () -> {
            return getLocalAggregate(uuid);
        }, (hostInfo, duration2) -> {
            return this.remoteStore.get(hostInfo, this.aggregateName, uuid, duration2);
        }, () -> {
            return CommandError.of(CommandError.Reason.Timeout, "Request timed out");
        }, exc -> {
            return CommandError.of(CommandError.Reason.RemoteLookupFailed, exc);
        }, this.scheduledExecutor, this.retryDelay, duration);
    }

    private Optional<Result<CommandError, NonEmptyList<Sequence>>> getLocalAggregate(UUID uuid) {
        WindowStoreIterator fetch = this.storeBridge.getCommandResponseStore().fetch(uuid, 0L, System.currentTimeMillis());
        Iterable iterable = () -> {
            return fetch;
        };
        return StreamSupport.stream(iterable.spliterator(), false).max(Comparator.comparingLong(keyValue -> {
            return ((Long) keyValue.key).longValue();
        })).map(keyValue2 -> {
            return (AggregateUpdateResult) keyValue2.value;
        }).map(aggregateUpdateResult -> {
            return aggregateUpdateResult.updatedAggregateResult().map(aggregateUpdate -> {
                Sequence next = aggregateUpdateResult.readSequence().next();
                ArrayList arrayList = new ArrayList();
                Sequence next2 = next.next();
                while (true) {
                    Sequence sequence = next2;
                    if (!sequence.isLessThanOrEqual(aggregateUpdate.sequence())) {
                        return new NonEmptyList(next, arrayList);
                    }
                    arrayList.add(sequence);
                    next2 = sequence.next();
                }
            });
        });
    }

    private Supplier<Result<CommandError, HostInfo>> hostInfoForCommandResponseKey(UUID uuid) {
        return () -> {
            return (Result) this.storeBridge.hostInfoForCommandResponseStoreKey(uuid).map((v0) -> {
                return Result.success(v0);
            }).orElse(Result.failure(CommandError.of(CommandError.Reason.AggregateNotFound, "No metadata found for command response key " + uuid), new CommandError[0]));
        };
    }
}
