package io.simplesource.kafka.internal.streams;

import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.ResourceNamingStrategy;
import io.simplesource.kafka.internal.streams.topology.EventSourcedTopology;
import io.simplesource.kafka.internal.streams.topology.TopologyContext;
import io.simplesource.kafka.spec.AggregateSetSpec;
import io.simplesource.kafka.spec.TopicSpec;
import io.simplesource.kafka.util.KafkaStreamsUtils;
import java.io.File;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/kafka/internal/streams/EventSourcedStreamsApp.class */
/**
 * Bootstraps the Kafka Streams application that serves every aggregate in an
 * {@link AggregateSetSpec}.
 *
 * <p>{@link #start()} creates any topics that do not yet exist on the broker, builds a single
 * topology containing all configured aggregates, starts the {@link KafkaStreams} instance and
 * then hands it to {@code KafkaStreamsUtils.waitUntilStable}.
 *
 * <p>{@link #start()} is synchronized and may only succeed once per instance;
 * {@link #getStreams()} is a plain, unsynchronized read of the field.
 */
public final class EventSourcedStreamsApp {
    private static final Logger logger = LoggerFactory.getLogger(EventSourcedStreamsApp.class);

    /** Timeout, in milliseconds, applied to the topic-creation request. */
    private static final int CREATE_TOPICS_TIMEOUT_MS = 15000;

    /** Message of the exception thrown when listing or creating topics fails. */
    private static final String CREATE_TOPICS_FAILURE = "Failed to create required topics";

    private final AggregateSetSpec aggregateSetSpec;
    private final AdminClient adminClient;

    /** The running streams instance; {@code null} until {@link #start()} has assigned it. */
    private KafkaStreams streams = null;

    /**
     * Creates the application and an {@link AdminClient} configured from the spec's Kafka config.
     *
     * @param aggregateSetSpec the aggregates to serve together with their Kafka configuration
     */
    public EventSourcedStreamsApp(AggregateSetSpec aggregateSetSpec) {
        this.aggregateSetSpec = aggregateSetSpec;
        this.adminClient = AdminClient.create(aggregateSetSpec.kafkaConfig().adminClientConfig());
    }

    /**
     * Creates missing topics, builds the topology and starts the streams application.
     *
     * @throws IllegalStateException if a streams instance has already been assigned by a
     *     previous call
     * @throws RuntimeException if the required topics could not be listed or created
     */
    public synchronized void start() {
        if (this.streams != null) {
            throw new IllegalStateException("Application already started");
        }
        createTopics();
        this.streams = startApp(buildTopology());
        KafkaStreamsUtils.waitUntilStable(logger, this.streams);
    }

    /**
     * Returns the underlying streams instance.
     *
     * @return the {@link KafkaStreams} instance, or {@code null} if {@link #start()} has not yet
     *     assigned one
     */
    public KafkaStreams getStreams() {
        return this.streams;
    }

    /**
     * Creates every topic required by the configured aggregates that the broker does not already
     * list. One topic is derived per aggregate and per {@link AggregateResources.TopicEntity},
     * named by the aggregate's {@link ResourceNamingStrategy}.
     *
     * @throws RuntimeException wrapping any failure while listing or creating topics; if the
     *     thread was interrupted, its interrupt status is restored before throwing
     */
    private void createTopics() {
        try {
            Set<String> existingTopics = this.adminClient.listTopics().listings().get().stream()
                    .map(listing -> listing.name())
                    .collect(Collectors.toSet());
            EnumSet<AggregateResources.TopicEntity> topicEntities =
                    EnumSet.allOf(AggregateResources.TopicEntity.class);

            List<NewTopic> missingTopics = this.aggregateSetSpec.aggregateConfigMap().values().stream()
                    .flatMap(aggregateSpec -> {
                        ResourceNamingStrategy namingStrategy =
                                aggregateSpec.serialization().resourceNamingStrategy();
                        Map<AggregateResources.TopicEntity, TopicSpec> topicConfig =
                                aggregateSpec.generation().topicConfig();
                        // Pair each topic name with the spec configured for that entity.
                        return topicEntities.stream().map(topicEntity -> KeyValue.pair(
                                namingStrategy.topicName(aggregateSpec.aggregateName(), topicEntity.name()),
                                topicConfig.get(topicEntity)));
                    })
                    // Only topics the broker does not already know about need creating.
                    .filter(topic -> !existingTopics.contains(topic.key))
                    .map(topic -> {
                        TopicSpec spec = topic.value;
                        NewTopic newTopic =
                                new NewTopic(topic.key, spec.partitionCount(), spec.replicaCount());
                        newTopic.configs(spec.config());
                        return newTopic;
                    })
                    .collect(Collectors.toList());

            CreateTopicsOptions options = new CreateTopicsOptions();
            options.timeoutMs(CREATE_TOPICS_TIMEOUT_MS);
            this.adminClient.createTopics(missingTopics, options).all().get();
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack before wrapping.
            Thread.currentThread().interrupt();
            throw new RuntimeException(CREATE_TOPICS_FAILURE, e);
        } catch (Exception e) {
            throw new RuntimeException(CREATE_TOPICS_FAILURE, e);
        }
    }

    /**
     * Builds one topology to which every configured aggregate has been added.
     *
     * @return the combined topology
     */
    private Topology buildTopology() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        this.aggregateSetSpec.aggregateConfigMap().values().forEach(aggregateSpec ->
                EventSourcedTopology.addTopology(new TopologyContext(aggregateSpec), streamsBuilder));
        return streamsBuilder.build();
    }

    /**
     * Logs the topology description, creates the configured state directory, then constructs
     * and starts a {@link KafkaStreams} instance with the exception handler and shutdown hook
     * registered through {@link KafkaStreamsUtils}.
     *
     * @param topology the topology to run
     * @return the started streams instance
     */
    private KafkaStreams startApp(Topology topology) {
        logger.info("Topology description {}", topology.describe());
        // The result of mkdirs() is ignored: it also returns false when the directory exists.
        new File(this.aggregateSetSpec.kafkaConfig().stateDir()).mkdirs();
        KafkaStreams kafkaStreams = new KafkaStreams(
                topology, new StreamsConfig(this.aggregateSetSpec.kafkaConfig().streamsConfig()));
        KafkaStreamsUtils.registerExceptionHandler(logger, kafkaStreams);
        KafkaStreamsUtils.addShutdownHook(logger, kafkaStreams);
        kafkaStreams.start();
        return kafkaStreams;
    }
}
