package io.simplesource.kafka.dsl;

import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.streams.EventSourcedStreamsApp;
import io.simplesource.kafka.spec.AggregateSetSpec;
import io.simplesource.kafka.spec.AggregateSpec;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/simplesource/kafka/dsl/EventSourcedApp.class */
public final class EventSourcedApp {
    private KafkaConfig kafkaConfig;
    private Map<String, AggregateSpec<?, ?, ?, ?>> aggregateConfigMap = new HashMap();
    private boolean isStarted = false;

    public EventSourcedApp withKafkaConfig(Function<KafkaConfig.Builder, KafkaConfig> function) {
        requireNotStarted();
        this.kafkaConfig = function.apply(new KafkaConfig.Builder());
        return this;
    }

    public EventSourcedApp withKafkaConfig(KafkaConfig kafkaConfig) {
        requireNotStarted();
        this.kafkaConfig = kafkaConfig;
        return this;
    }

    public <K, C, E, A> EventSourcedApp withAggregate(Function<AggregateBuilder<K, C, E, A>, AggregateSpec<K, C, E, A>> function) {
        requireNotStarted();
        AggregateSpec<K, C, E, A> apply = function.apply(AggregateBuilder.newBuilder());
        this.aggregateConfigMap.put(apply.aggregateName(), apply);
        return this;
    }

    public <K, C, E, A> EventSourcedApp withAggregate(AggregateSpec<K, C, E, A> aggregateSpec) {
        requireNotStarted();
        this.aggregateConfigMap.put(aggregateSpec.aggregateName(), aggregateSpec);
        return this;
    }

    public <K, E> EventSourcedApp withEvent(Consumer<EventAggregateBuilder<K, E>> consumer) {
        requireNotStarted();
        EventAggregateBuilder<K, E> newBuilder = EventAggregateBuilder.newBuilder();
        consumer.accept(newBuilder);
        AggregateSpec<K, E, E, Boolean> build = newBuilder.build();
        this.aggregateConfigMap.put(build.aggregateName(), build);
        return this;
    }

    public <K, E> EventSourcedApp withEvent(AggregateSpec<K, E, E, ?> aggregateSpec) {
        requireNotStarted();
        this.aggregateConfigMap.put(aggregateSpec.aggregateName(), aggregateSpec);
        return this;
    }

    public void start() {
        if (this.isStarted) {
            return;
        }
        Objects.requireNonNull(this.kafkaConfig, "KafkaConfig has not been defined. Please define it with 'withKafkaConfig' method.");
        new EventSourcedStreamsApp(new AggregateSetSpec(this.kafkaConfig, this.aggregateConfigMap)).start();
        this.isStarted = true;
    }

    private void requireNotStarted() {
        if (this.isStarted) {
            throw new RuntimeException("App already started, and cannot be modified.");
        }
    }
}
