package io.simplesource.kafka.dsl;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandAPISet;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.client.KafkaCommandAPI;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import io.simplesource.kafka.spec.CommandSetSpec;
import io.simplesource.kafka.spec.CommandSpec;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.KeyValue;

/* loaded from: input_file:io/simplesource/kafka/dsl/EventSourcedClient.class */
/**
 * Fluent builder that assembles a {@link CommandAPISet} from a {@link KafkaConfig} and one
 * {@link CommandSpec} per aggregate.
 *
 * <p>Typical use: call {@link #withKafkaConfig(Function)}, register one or more aggregates with
 * {@code addCommands}, optionally replace the scheduler with
 * {@link #withScheduler(ScheduledExecutorService)}, then call {@link #build()}.
 *
 * <p>Instances hold plain mutable state with no synchronization.
 */
public final class EventSourcedClient {
    /** Registered command specs keyed by aggregate name; a later registration replaces an earlier one. */
    private final Map<String, CommandSpec<?, ?>> commandConfigMap = new HashMap<>();
    private KafkaConfig kafkaConfig;
    /** Defaults to a single-threaded scheduler; can be replaced through {@link #withScheduler}. */
    private ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("EventSourcedClient-scheduler"));

    /**
     * Sets the Kafka configuration used by every command API created by {@link #build()}.
     *
     * @param function receives a {@link KafkaConfig.Builder} and returns the finished config. The
     *                 builder handed over overrides the no-arg {@code build()} so that it delegates
     *                 to {@code build(true)}.
     * @return this client, for chaining
     */
    public EventSourcedClient withKafkaConfig(Function<? super KafkaConfig.Builder, KafkaConfig> function) {
        // NOTE(review): the 'true' flag presumably selects client-only settings - confirm against
        // KafkaConfig.Builder#build(boolean).
        final KafkaConfig.Builder clientBuilder = new KafkaConfig.Builder() {
            @Override
            public KafkaConfig build() {
                return super.build(true);
            }
        };
        this.kafkaConfig = function.apply(clientBuilder);
        return this;
    }

    /**
     * Registers an aggregate by configuring a fresh {@link CommandApiBuilder}.
     *
     * @param consumer callback that populates the builder before it is built
     * @param <K>      aggregate key type
     * @param <C>      command type
     * @return this client, for chaining
     */
    public <K, C> EventSourcedClient addCommands(Consumer<CommandApiBuilder<K, C>> consumer) {
        final CommandApiBuilder<K, C> newBuilder = CommandApiBuilder.newBuilder();
        consumer.accept(newBuilder);
        return addCommands(newBuilder.build());
    }

    /**
     * Registers an already-built command spec under its aggregate name, replacing any spec
     * previously registered under the same name.
     *
     * @param commandSpec the spec to register
     * @param <K>         aggregate key type
     * @param <C>         command type
     * @return this client, for chaining
     */
    public <K, C> EventSourcedClient addCommands(CommandSpec<K, C> commandSpec) {
        this.commandConfigMap.put(commandSpec.aggregateName(), commandSpec);
        return this;
    }

    /**
     * Replaces the default scheduler passed to every command API.
     *
     * @param scheduledExecutorService the scheduler to use; a {@code null} value is rejected later
     *                                 by {@link #build()}
     * @return this client, for chaining
     */
    public EventSourcedClient withScheduler(ScheduledExecutorService scheduledExecutorService) {
        this.scheduler = scheduledExecutorService;
        return this;
    }

    /**
     * Creates one command API per registered aggregate and returns them as a set.
     *
     * @return a {@link CommandAPISet} backed by the registered command specs
     * @throws NullPointerException if the Kafka config or the scheduler has not been defined
     */
    public CommandAPISet build() {
        // Fail fast with a descriptive message rather than passing a null config downstream.
        Objects.requireNonNull(this.kafkaConfig, "KafkaConfig has not been defined. Please define with 'withKafkaConfig' method.");
        Objects.requireNonNull(this.scheduler, "Scheduler has not been defined. Please define with 'withScheduler' method.");
        final CommandSetSpec commandSetSpec = new CommandSetSpec(this.kafkaConfig, this.commandConfigMap);
        return getCommandAPISet(
                commandSetSpec.commandConfigMap().values().stream(),
                commandSetSpec.kafkaConfig(),
                this.scheduler);
    }

    /**
     * Builds a {@link CommandAPISet} from a stream of command specs.
     *
     * @param stream                   the command specs to create APIs for
     * @param kafkaConfig              configuration passed to every created API
     * @param scheduledExecutorService scheduler passed to every created API
     * @return a set whose {@code getCommandAPI} looks APIs up by aggregate name and returns
     *         {@code null} for a name that was not in {@code stream}
     * @throws IllegalStateException if two specs in {@code stream} share an aggregate name
     *                               (raised by {@link Collectors#toMap})
     */
    public static CommandAPISet getCommandAPISet(Stream<CommandSpec<?, ?>> stream, KafkaConfig kafkaConfig, ScheduledExecutorService scheduledExecutorService) {
        final Map<String, CommandAPI<?, ?>> commandApis = stream
                .map(createCommandApi(kafkaConfig, scheduledExecutorService))
                .collect(Collectors.toMap(keyValue -> keyValue.key, keyValue -> keyValue.value));
        return new CommandAPISet() {
            @Override
            @SuppressWarnings("unchecked") // The map erases K and C; the caller chooses them per aggregate name.
            public <K, C> CommandAPI<K, C> getCommandAPI(String str) {
                return (CommandAPI<K, C>) commandApis.get(str);
            }
        };
    }

    /**
     * Returns a function that turns a command spec into an (aggregate name, command API) pair.
     *
     * @param kafkaConfig              configuration passed to each {@link KafkaCommandAPI}
     * @param scheduledExecutorService scheduler passed to each {@link KafkaCommandAPI}
     * @return the mapping function
     */
    static Function<CommandSpec<?, ?>, KeyValue<String, CommandAPI<?, ?>>> createCommandApi(KafkaConfig kafkaConfig, ScheduledExecutorService scheduledExecutorService) {
        return commandSpec -> {
            final CommandAPI<?, ?> commandApi = new KafkaCommandAPI<>(commandSpec, kafkaConfig, scheduledExecutorService);
            // Explicit type witness keeps the pair's value type at CommandAPI<?, ?> rather than a capture.
            return KeyValue.<String, CommandAPI<?, ?>>pair(commandSpec.aggregateName(), commandApi);
        };
    }
}
