package io.simplesource.kafka.internal.streams.topology;

import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.internal.util.Tuple2;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.CommandResponse;
import java.util.UUID;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

/* loaded from: input_file:io/simplesource/kafka/internal/streams/topology/EventSourcedTopology.class */
public final class EventSourcedTopology {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/simplesource/kafka/internal/streams/topology/EventSourcedTopology$InputStreams.class */
    /**
     * Immutable pair of the two input streams of the topology: the command
     * request stream and the command response stream.
     *
     * <p>Equality, hashing and the string form are all derived from the two
     * stream references; either reference may be {@code null}.
     *
     * @param <K> key type of both streams
     * @param <C> command type carried by the requests
     */
    public static final class InputStreams<K, C> {
        /** Hash contribution used for a {@code null} component. */
        private static final int NULL_HASH = 43;
        /** Multiplier applied between components when hashing. */
        private static final int HASH_PRIME = 59;

        public final KStream<K, CommandRequest<K, C>> commandRequest;
        public final KStream<K, CommandResponse<K>> commandResponse;

        /**
         * @param commandRequest  stream of command requests
         * @param commandResponse stream of command responses
         */
        public InputStreams(KStream<K, CommandRequest<K, C>> commandRequest, KStream<K, CommandResponse<K>> commandResponse) {
            this.commandRequest = commandRequest;
            this.commandResponse = commandResponse;
        }

        /** @return the command request stream given at construction */
        public KStream<K, CommandRequest<K, C>> commandRequest() {
            return commandRequest;
        }

        /** @return the command response stream given at construction */
        public KStream<K, CommandResponse<K>> commandResponse() {
            return commandResponse;
        }

        /**
         * Two instances are equal when both of their streams are equal
         * (or both {@code null}) component by component.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof InputStreams)) {
                return false;
            }
            InputStreams<?, ?> that = (InputStreams<?, ?>) obj;
            // Short-circuits exactly like a field-by-field comparison: the
            // response streams are only compared when the request streams match.
            return nullSafeEquals(commandRequest, that.commandRequest)
                    && nullSafeEquals(commandResponse, that.commandResponse);
        }

        /** Combines the hashes of both streams, request first. */
        @Override
        public int hashCode() {
            int result = HASH_PRIME + nullSafeHash(commandRequest);
            return result * HASH_PRIME + nullSafeHash(commandResponse);
        }

        @Override
        public String toString() {
            return new StringBuilder("EventSourcedTopology.InputStreams(commandRequest=")
                    .append(commandRequest)
                    .append(", commandResponse=")
                    .append(commandResponse)
                    .append(")")
                    .toString();
        }

        /** {@code left.equals(right)}, treating two {@code null}s as equal. */
        private static boolean nullSafeEquals(Object left, Object right) {
            if (left == null) {
                return right == null;
            }
            return left.equals(right);
        }

        /** {@code value.hashCode()}, or {@link #NULL_HASH} for {@code null}. */
        private static int nullSafeHash(Object value) {
            return value == null ? NULL_HASH : value.hashCode();
        }
    }

    /**
     * Adds the event-sourcing topology for one aggregate to {@code streamsBuilder}
     * and returns the two input streams it created.
     *
     * <p>The method consumes the command request, command response and aggregate
     * inputs, derives events, aggregate updates and command responses from them,
     * publishes each derived stream, and finally hands the command response
     * stream to the result distributor.
     *
     * <p>NOTE(review): every call below registers nodes on the shared builder, so
     * the statement order is kept exactly as it was; presumably reordering would
     * change the generated topology - confirm before rearranging. Raw stream
     * types are retained because the helper signatures are not visible here.
     *
     * @param topologyContext context describing the aggregate being wired
     * @param streamsBuilder  builder the topology is added to
     * @param <K> aggregate key type
     * @param <C> command type
     * @param <E> event type
     * @param <A> aggregate type
     * @return the command request and command response streams that were created
     */
    public static <K, C, E, A> InputStreams<K, C> addTopology(TopologyContext<K, C, E, A> topologyContext, StreamsBuilder streamsBuilder) {
        // --- inputs ---------------------------------------------------------
        KStream requestStream = EventSourcedConsumer.commandRequestStream(topologyContext, streamsBuilder);
        KStream responseStream = EventSourcedConsumer.commandResponseStream(topologyContext, streamsBuilder);
        KTable aggregates = EventSourcedConsumer.aggregateTable(topologyContext, streamsBuilder);
        DistributorContext distributor = getDistributorContext(topologyContext);
        KStream<UUID, String> topicMapStream = ResultDistributor.resultTopicMapStream(distributor, streamsBuilder);

        // --- split requests -------------------------------------------------
        // v1 is fed into command handling, v2 is published as command responses.
        // NOTE(review): presumably v2 holds responses for commands that were
        // already processed - confirm against EventSourcedStreams.
        Tuple2 split = EventSourcedStreams.getProcessedCommands(topologyContext, requestStream, responseStream);
        KStream requestsToHandle = (KStream) split.v1();
        KStream responsesFromSplit = (KStream) split.v2();

        // --- derive ---------------------------------------------------------
        KStream events = EventSourcedStreams.getCommandEvents(topologyContext, requestsToHandle, aggregates);
        KStream sequencedEvents = EventSourcedStreams.getEventsWithSequence(events);
        KStream updateResults = EventSourcedStreams.getAggregateUpdateResults(topologyContext, events);
        KStream updates = EventSourcedStreams.getAggregateUpdates(updateResults);
        KStream newResponses = EventSourcedStreams.getCommandResponses(updateResults);

        // --- publish --------------------------------------------------------
        EventSourcedPublisher.publishEvents(topologyContext, sequencedEvents);
        EventSourcedPublisher.publishAggregateUpdates(topologyContext, updates);
        EventSourcedPublisher.publishCommandResponses(topologyContext, responsesFromSplit);
        EventSourcedPublisher.publishCommandResponses(topologyContext, newResponses);

        // --- distribute -----------------------------------------------------
        ResultDistributor.distribute(distributor, responseStream, topicMapStream);

        return new InputStreams<>(requestStream, responseStream);
    }

    /**
     * Builds the distributor context used for routing command responses.
     *
     * <p>The context is assembled from: the topic name that the aggregate's
     * resource naming strategy yields for the
     * {@code command_response_topic_map} entity, the command-response key and
     * value serdes, the aggregate's state store spec, and a function that
     * extracts the command id from a response.
     *
     * @param topologyContext context supplying the aggregate spec and serdes
     * @param <K> aggregate key type
     * @return a distributor context for {@code CommandResponse<K>} values
     */
    private static <K> DistributorContext<CommandResponse<K>> getDistributorContext(TopologyContext<K, ?, ?, ?> topologyContext) {
        String topicMapTopicName = topologyContext.aggregateSpec()
                .serialization()
                .resourceNamingStrategy()
                .topicName(
                        topologyContext.aggregateSpec().aggregateName(),
                        AggregateResources.TopicEntity.command_response_topic_map.toString());

        DistributorSerdes responseSerdes = new DistributorSerdes(
                topologyContext.serdes().commandResponseKey(),
                topologyContext.serdes().commandResponse());

        return new DistributorContext<>(
                topicMapTopicName,
                responseSerdes,
                topologyContext.aggregateSpec().generation().stateStoreSpec(),
                response -> response.commandId());
    }
}
