package io.simplesource.kafka.internal.streams;

import io.simplesource.api.CommandAPI;
import io.simplesource.data.FutureResult;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.api.RemoteCommandResponseStore;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.KafkaCommandAPI;
import io.simplesource.kafka.internal.streams.statestore.AggregateStoreBridge;
import io.simplesource.kafka.internal.streams.statestore.CommandResponseStoreBridge;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import io.simplesource.kafka.internal.util.RetryDelay;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.AggregateUpdateResult;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.ValueWithSequence;
import io.simplesource.kafka.spec.AggregateSpec;
import java.time.Duration;
import java.util.Comparator;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.StreamSupport;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

/* loaded from: input_file:io/simplesource/kafka/internal/streams/AggregateTestDriver.class */
public final class AggregateTestDriver<K, C, E, A> implements CommandAPI<K, C> {
    private final TopologyTestDriver driver;
    private final AggregateSpec<K, C, E, A> aggregateSpec;
    private final KafkaConfig kafkaConfig;
    private final AggregateSerdes<K, C, E, A> aggregateSerdes;
    private final AggregateTestDriver<K, C, E, A>.TestDriverPublisher publisher;
    private final CommandAPI<K, C> commandAPI;

    /* loaded from: input_file:io/simplesource/kafka/internal/streams/AggregateTestDriver$TestDriverPublisher.class */
    private class TestDriverPublisher {
        private final ConsumerRecordFactory<K, CommandRequest<C>> factory;

        TestDriverPublisher(AggregateSerdes<K, C, E, A> aggregateSerdes) {
            this.factory = new ConsumerRecordFactory<>(aggregateSerdes.aggregateKey().serializer(), aggregateSerdes.commandRequest().serializer());
        }

        private ConsumerRecordFactory<K, CommandRequest<C>> recordFactory() {
            return this.factory;
        }

        void publish(String str, K k, CommandRequest<C> commandRequest) {
            AggregateTestDriver.this.driver.pipeInput(recordFactory().create(str, k, commandRequest));
        }
    }

    /* loaded from: input_file:io/simplesource/kafka/internal/streams/AggregateTestDriver$TestDriverStoreBridge.class */
    private class TestDriverStoreBridge implements AggregateStoreBridge<K, A>, CommandResponseStoreBridge<A> {
        private TestDriverStoreBridge() {
        }

        public ReadOnlyKeyValueStore<K, AggregateUpdate<A>> getAggregateStateStore() {
            return AggregateTestDriver.this.driver.getKeyValueStore(AggregateTestDriver.this.storeName(AggregateResources.StateStoreEntity.aggregate_update));
        }

        public ReadOnlyWindowStore<UUID, AggregateUpdateResult<A>> getCommandResponseStore() {
            return AggregateTestDriver.this.driver.getWindowStore(AggregateTestDriver.this.storeName(AggregateResources.StateStoreEntity.command_response));
        }

        public Optional<HostInfo> hostInfoForAggregateStoreKey(K k) {
            return Optional.of(AggregateTestDriver.this.kafkaConfig.currentHostInfo());
        }

        public Optional<HostInfo> hostInfoForCommandResponseStoreKey(UUID uuid) {
            return Optional.of(AggregateTestDriver.this.kafkaConfig.currentHostInfo());
        }
    }

    public AggregateTestDriver(AggregateSpec<K, C, E, A> aggregateSpec, KafkaConfig kafkaConfig) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        new EventSourcedTopology(aggregateSpec).addTopology(streamsBuilder);
        TestDriverStoreBridge testDriverStoreBridge = new TestDriverStoreBridge();
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("QueryAPI-scheduler"));
        RetryDelay retryDelay = (j, j2, i) -> {
            return 15L;
        };
        this.aggregateSpec = aggregateSpec;
        this.kafkaConfig = kafkaConfig;
        this.aggregateSerdes = aggregateSpec.serialization().serdes();
        Properties properties = new Properties();
        properties.putAll(kafkaConfig.streamsConfig());
        this.driver = new TopologyTestDriver(streamsBuilder.build(), properties, 0L);
        this.publisher = new TestDriverPublisher(this.aggregateSerdes);
        this.commandAPI = new KafkaCommandAPI(aggregateSpec, kafkaConfig, testDriverStoreBridge, (RemoteCommandResponseStore) null, newSingleThreadScheduledExecutor, retryDelay);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public FutureResult<CommandAPI.CommandError, UUID> publishCommand(CommandAPI.Request<K, C> request) {
        this.publisher.publish(topicName(AggregateResources.TopicEntity.command_request), request.key(), new CommandRequest(request.command(), request.readSequence(), request.commandId()));
        return FutureResult.of(request.commandId());
    }

    public FutureResult<CommandAPI.CommandError, NonEmptyList<Sequence>> queryCommandResult(UUID uuid, Duration duration) {
        return this.commandAPI.queryCommandResult(uuid, duration);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<KeyValue<K, AggregateUpdate<A>>> readAggregateTopic() {
        return Optional.ofNullable(this.driver.readOutput(topicName(AggregateResources.TopicEntity.aggregate), this.aggregateSerdes.aggregateKey().deserializer(), this.aggregateSerdes.aggregateUpdate().deserializer())).map(producerRecord -> {
            return KeyValue.pair(producerRecord.key(), producerRecord.value());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<KeyValue<K, ValueWithSequence<E>>> readEventTopic() {
        return Optional.ofNullable(this.driver.readOutput(topicName(AggregateResources.TopicEntity.event), this.aggregateSerdes.aggregateKey().deserializer(), this.aggregateSerdes.valueWithSequence().deserializer())).map(producerRecord -> {
            return KeyValue.pair(producerRecord.key(), producerRecord.value());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<AggregateUpdateResult<A>> fetchAggregateUpdateResult(UUID uuid) {
        WindowStoreIterator fetch = this.driver.getWindowStore(storeName(AggregateResources.StateStoreEntity.command_response)).fetch(uuid, 0L, System.currentTimeMillis());
        Iterable iterable = () -> {
            return fetch;
        };
        return StreamSupport.stream(iterable.spliterator(), false).max(Comparator.comparingLong(keyValue -> {
            return ((Long) keyValue.key).longValue();
        })).map(keyValue2 -> {
            return (AggregateUpdateResult) keyValue2.value;
        });
    }

    public void close() {
        if (this.driver != null) {
            this.driver.close();
        }
    }

    private String topicName(AggregateResources.TopicEntity topicEntity) {
        return this.aggregateSpec.serialization().resourceNamingStrategy().topicName(this.aggregateSpec.aggregateName(), topicEntity.name());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String storeName(AggregateResources.StateStoreEntity stateStoreEntity) {
        return this.aggregateSpec.serialization().resourceNamingStrategy().storeName(this.aggregateSpec.aggregateName(), stateStoreEntity.name());
    }
}
