package io.simplesource.kafka.dsl;

import io.simplesource.api.CommandAPISet;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.streams.EventSourcedStreamsApp;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import io.simplesource.kafka.spec.AggregateSetSpec;
import io.simplesource.kafka.spec.AggregateSpec;
import io.simplesource.kafka.util.SpecUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/simplesource/kafka/dsl/EventSourcedApp.class */
public final class EventSourcedApp {
    private KafkaConfig kafkaConfig;
    private AggregateSetSpec aggregateSetSpec;
    private Map<String, AggregateSpec<?, ?, ?, ?>> aggregateConfigMap = new HashMap();
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("EventSourcedApp-scheduler"));

    public EventSourcedApp withKafkaConfig(Function<KafkaConfig.Builder, KafkaConfig> function) {
        this.kafkaConfig = function.apply(new KafkaConfig.Builder());
        return this;
    }

    public EventSourcedApp withKafkaConfig(KafkaConfig kafkaConfig) {
        this.kafkaConfig = kafkaConfig;
        return this;
    }

    public EventSourcedApp withScheduler(ScheduledExecutorService scheduledExecutorService) {
        this.scheduler = scheduledExecutorService;
        return this;
    }

    public <K, C, E, A> EventSourcedApp addAggregate(Consumer<AggregateBuilder<K, C, E, A>> consumer) {
        AggregateBuilder<K, C, E, A> newBuilder = AggregateBuilder.newBuilder();
        consumer.accept(newBuilder);
        AggregateSpec<K, C, E, A> build = newBuilder.build();
        this.aggregateConfigMap.put(build.aggregateName(), build);
        return this;
    }

    public <K, C, E, A> EventSourcedApp addAggregate(AggregateSpec<K, C, E, A> aggregateSpec) {
        this.aggregateConfigMap.put(aggregateSpec.aggregateName(), aggregateSpec);
        return this;
    }

    public EventSourcedApp start() {
        AggregateSetSpec aggregateSetSpec = new AggregateSetSpec(this.kafkaConfig, this.aggregateConfigMap);
        new EventSourcedStreamsApp(aggregateSetSpec).start();
        this.aggregateSetSpec = aggregateSetSpec;
        return this;
    }

    public CommandAPISet getCommandAPISet(String str) {
        Objects.requireNonNull(this.aggregateSetSpec, "App has not been started. start() must be called before getCommandAPISet");
        Objects.requireNonNull(this.scheduler, "Scheduler has not been defined. Please define with with 'withScheduler' method.");
        return EventSourcedClient.getCommandAPISet(this.aggregateSetSpec.aggregateConfigMap().values().stream().map(aggregateSpec -> {
            return SpecUtils.getCommandSpec(aggregateSpec, str);
        }), this.aggregateSetSpec.kafkaConfig(), this.scheduler);
    }
}
