package io.simplesource.kafka.dsl;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandAPISet;
import io.simplesource.kafka.api.RemoteCommandResponseStore;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.KafkaCommandAPI;
import io.simplesource.kafka.internal.cluster.ClusterSubsystem;
import io.simplesource.kafka.internal.streams.EventSourcedStreamsApp;
import io.simplesource.kafka.internal.streams.statestore.KafkaStreamAggregateStoreBridge;
import io.simplesource.kafka.internal.streams.statestore.KafkaStreamCommandResponseStoreBridge;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import io.simplesource.kafka.spec.AggregateSetSpec;
import io.simplesource.kafka.spec.AggregateSpec;
import io.simplesource.kafka.spec.KafkaExecutionSpec;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;

/* loaded from: input_file:io/simplesource/kafka/dsl/AggregateSetBuilder.class */
/**
 * Fluent builder that collects a {@link KafkaConfig}, a scheduler and a set of
 * {@link AggregateSpec}s, and turns them into a {@link CommandAPISet}.
 *
 * <p>Typical use: call {@code withKafkaConfig(...)}, one or more
 * {@code addAggregate(...)} calls, then {@link #build()}. Note that
 * {@code build()} starts the streams application and the cluster subsystem
 * as part of construction.
 */
public final class AggregateSetBuilder {
    /** Kafka configuration; must be supplied through one of the {@code withKafkaConfig} overloads. */
    private KafkaConfig kafkaConfig;

    /** Scheduler handed to the execution spec, the cluster subsystem and each command API. */
    private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("SimpleSourcingKafkaBuilder-scheduler"));

    /** Registered aggregate specs keyed by aggregate name; a later registration with the same name replaces the earlier one. */
    private final Map<String, AggregateSpec<?, ?, ?, ?>> aggregateConfigMap = new HashMap<>();

    /**
     * Creates the mapping function that turns an aggregate spec into a
     * (aggregate name, command API) pair.
     *
     * @param kafkaConfig                Kafka configuration passed to each {@link KafkaCommandAPI}
     * @param kafkaStreams               streams instance used to build the per-aggregate store bridges
     * @param remoteCommandResponseStore store passed to each {@link KafkaCommandAPI}
     * @param scheduledExecutorService   scheduler passed to each {@link KafkaCommandAPI}
     * @return a function producing one name/API pair per aggregate spec
     */
    private static Function<AggregateSpec<?, ?, ?, ?>, KeyValue<String, CommandAPI<?, ?>>> createAggregate(KafkaConfig kafkaConfig, KafkaStreams kafkaStreams, RemoteCommandResponseStore remoteCommandResponseStore, ScheduledExecutorService scheduledExecutorService) {
        return aggregateSpec -> {
            KafkaCommandAPI kafkaCommandAPI = new KafkaCommandAPI(aggregateSpec, kafkaConfig, new KafkaStreamCommandResponseStoreBridge(aggregateSpec, kafkaStreams), remoteCommandResponseStore, scheduledExecutorService, aggregateSpec.generation().retryDelay());
            // NOTE(review): the bridge instance is discarded. It is kept here in case the
            // constructor has side effects — confirm whether this call is still needed.
            new KafkaStreamAggregateStoreBridge(aggregateSpec, kafkaStreams);
            return KeyValue.pair(aggregateSpec.aggregateName(), kafkaCommandAPI);
        };
    }

    /**
     * Replaces the default scheduler with the supplied one.
     *
     * @param scheduledExecutorService scheduler to use from now on
     * @return this builder
     */
    public AggregateSetBuilder withScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.scheduledExecutor = scheduledExecutorService;
        return this;
    }

    /**
     * Sets the Kafka configuration by applying {@code function} to a fresh
     * {@link KafkaConfig.Builder}.
     *
     * @param function turns a new config builder into the final configuration
     * @return this builder
     */
    public AggregateSetBuilder withKafkaConfig(Function<KafkaConfig.Builder, KafkaConfig> function) {
        this.kafkaConfig = function.apply(new KafkaConfig.Builder());
        return this;
    }

    /**
     * Sets the Kafka configuration directly.
     *
     * @param kafkaConfig configuration to use
     * @return this builder
     */
    public AggregateSetBuilder withKafkaConfig(KafkaConfig kafkaConfig) {
        this.kafkaConfig = kafkaConfig;
        return this;
    }

    /**
     * Registers an aggregate produced by applying {@code function} to a new
     * {@link AggregateBuilder}. The spec is stored under its aggregate name.
     *
     * @param function turns a new aggregate builder into an aggregate spec
     * @return this builder
     */
    public <K, C, E, A> AggregateSetBuilder addAggregate(Function<AggregateBuilder<K, C, E, A>, AggregateSpec<K, C, E, A>> function) {
        AggregateSpec<K, C, E, A> aggregateSpec = function.apply(AggregateBuilder.newBuilder());
        this.aggregateConfigMap.put(aggregateSpec.aggregateName(), aggregateSpec);
        return this;
    }

    /**
     * Registers an already-built aggregate spec under its aggregate name.
     *
     * @param aggregateSpec spec to register
     * @return this builder
     */
    public <K, C, E, A> AggregateSetBuilder addAggregate(AggregateSpec<K, C, E, A> aggregateSpec) {
        this.aggregateConfigMap.put(aggregateSpec.aggregateName(), aggregateSpec);
        return this;
    }

    /**
     * Builds the {@link CommandAPISet} for all registered aggregates.
     *
     * <p>Starts the {@link EventSourcedStreamsApp}, creates one command API per
     * registered aggregate, and starts the {@link ClusterSubsystem} before returning.
     *
     * @return a command API set that looks up command APIs by aggregate name;
     *         the lookup yields {@code null} for a name that was never registered
     * @throws NullPointerException if no Kafka configuration has been set
     */
    public CommandAPISet build() {
        // Fail before anything is started; otherwise the streams app would already be
        // running when the missing configuration is finally dereferenced.
        java.util.Objects.requireNonNull(this.kafkaConfig, "kafkaConfig must be set via withKafkaConfig(...) before build()");

        KafkaExecutionSpec executionSpec = new KafkaExecutionSpec(this.scheduledExecutor, this.kafkaConfig);
        AggregateSetSpec aggregateSetSpec = new AggregateSetSpec(executionSpec, this.aggregateConfigMap);

        EventSourcedStreamsApp streamsApp = new EventSourcedStreamsApp(aggregateSetSpec);
        streamsApp.start();
        KafkaStreams kafkaStreams = streamsApp.getStreams();

        // The cluster subsystem needs a way to look up command APIs, but the command APIs
        // need the cluster subsystem as their remote response store. The one-element holder
        // breaks that cycle: it is filled in below, before the subsystem is started.
        final CommandAPISet[] commandApiSetRef = new CommandAPISet[1];
        ClusterSubsystem clusterSubsystem = new ClusterSubsystem(
                aggregateName -> commandApiSetRef[0].getCommandAPI(aggregateName),
                this.kafkaConfig.clusterConfig(),
                this.scheduledExecutor);

        final Map<String, CommandAPI<?, ?>> commandApis = aggregateSetSpec.aggregateConfigMap().values().stream()
                .map(createAggregate(
                        aggregateSetSpec.executionSpec().kafkaConfig(),
                        kafkaStreams,
                        clusterSubsystem,
                        aggregateSetSpec.executionSpec().scheduledExecutor()))
                .collect(Collectors.toMap(pair -> pair.key, pair -> pair.value));

        CommandAPISet commandApiSet = new CommandAPISet() {
            @Override
            @SuppressWarnings("unchecked") // the caller chooses K and C; the map only knows CommandAPI<?, ?>
            public <K, C> CommandAPI<K, C> getCommandAPI(String aggregateName) {
                return (CommandAPI<K, C>) commandApis.get(aggregateName);
            }
        };

        // Publish the set to the lookup lambda before the subsystem can start using it.
        commandApiSetRef[0] = commandApiSet;
        clusterSubsystem.start();
        return commandApiSet;
    }
}
