package io.simplesource.kafka.internal.streams.topology;

import io.simplesource.kafka.internal.util.Tuple;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:io/simplesource/kafka/internal/streams/topology/EventSourcedTopology.class */
/**
 * Static helper that assembles the event-sourced processing topology on a
 * {@link StreamsBuilder} by delegating to {@code EventSourcedStores},
 * {@code EventSourcedConsumer}, {@code EventSourcedStreams} and
 * {@code EventSourcedPublisher}.
 */
public final class EventSourcedTopology {
    /**
     * Wires the full topology for one aggregate onto {@code streamsBuilder}.
     *
     * <p>The steps run in a fixed sequence: state stores are added, the command
     * request and command response streams are obtained, the derived streams are
     * built from them, the results are published, and finally the state stores
     * are updated from the aggregate update results. The call order is kept
     * exactly as-is because the builder is mutated by these calls.
     *
     * @param topologyContext context object handed to every delegate call
     * @param streamsBuilder  builder that receives the state stores and source streams
     * @param <K> first type parameter of the topology context (presumably the aggregate key - confirm against {@code TopologyContext})
     * @param <C> second type parameter of the topology context (presumably the command type - confirm)
     * @param <E> third type parameter of the topology context (presumably the event type - confirm)
     * @param <A> fourth type parameter of the topology context (presumably the aggregate type - confirm)
     */
    public static <K, C, E, A> void addTopology(TopologyContext<K, C, E, A> topologyContext, StreamsBuilder streamsBuilder) {
        // State stores are registered before any stream is created.
        EventSourcedStores.addStateStores(topologyContext, streamsBuilder);

        // Source streams: requests first, then responses (same evaluation order as a nested call).
        KStream requestStream = EventSourcedConsumer.commandRequestStream(topologyContext, streamsBuilder);
        KStream responseStream = EventSourcedConsumer.commandResponseStream(topologyContext, streamsBuilder);

        // The processed-commands tuple carries two streams: v1 feeds event
        // generation below, v2 is published directly as command responses.
        Tuple processed = EventSourcedStreams.getProcessedCommands(topologyContext, requestStream, responseStream);
        KStream commandsForEvents = (KStream) processed.v1();
        KStream responsesFromProcessed = (KStream) processed.v2();

        // Derived streams.
        KStream eventResults = EventSourcedStreams.eventResultStream(topologyContext, commandsForEvents);
        KStream sequencedEvents = EventSourcedStreams.getEventsWithSequence(eventResults);
        KStream updateResults = EventSourcedStreams.getAggregateUpdateResults(topologyContext, eventResults);
        KStream updates = EventSourcedStreams.getAggregateUpdates(updateResults);
        KStream responsesFromUpdates = EventSourcedStreams.getCommandResponses(updateResults);

        // Publishing: command responses are published from both response streams.
        EventSourcedPublisher.publishEvents(topologyContext, sequencedEvents);
        EventSourcedPublisher.publishAggregateUpdates(topologyContext, updates);
        EventSourcedPublisher.publishCommandResponses(topologyContext, responsesFromProcessed);
        EventSourcedPublisher.publishCommandResponses(topologyContext, responsesFromUpdates);

        // Both store updates are driven by the aggregate update results.
        EventSourcedStores.updateAggregateStateStore(topologyContext, updateResults);
        EventSourcedStores.updateCommandResultStore(topologyContext, updateResults);
    }
}
