package io.simplesource.kafka.internal.streams.topology;

import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.model.AggregateUpdateResult;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;

/* loaded from: input_file:io/simplesource/kafka/internal/streams/topology/EventSourcedStores.class */
final class EventSourcedStores {
    EventSourcedStores() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, C, E, A> void addStateStores(TopologyContext<K, C, E, A> topologyContext, StreamsBuilder streamsBuilder) {
        streamsBuilder.addStateStore(new KeyValueStoreBuilder(Stores.persistentKeyValueStore(topologyContext.stateStoreName(AggregateResources.StateStoreEntity.aggregate_update)), topologyContext.serdes().aggregateKey(), topologyContext.serdes().aggregateUpdate(), Time.SYSTEM));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, A> void updateAggregateStateStore(TopologyContext<K, ?, ?, A> topologyContext, KStream<K, AggregateUpdateResult<A>> kStream) {
        kStream.process(() -> {
            return new Processor<K, AggregateUpdateResult<A>>() { // from class: io.simplesource.kafka.internal.streams.topology.EventSourcedStores.1
                private KeyValueStore stateStore;

                public void init(ProcessorContext processorContext) {
                    this.stateStore = processorContext.getStateStore(TopologyContext.this.stateStoreName(AggregateResources.StateStoreEntity.aggregate_update));
                }

                public void process(K k, AggregateUpdateResult<A> aggregateUpdateResult) {
                    aggregateUpdateResult.updatedAggregateResult().ifSuccessful(aggregateUpdate -> {
                        this.stateStore.put(k, aggregateUpdate);
                    });
                }

                public void close() {
                }

                /* JADX WARN: Multi-variable type inference failed */
                public /* bridge */ /* synthetic */ void process(Object obj, Object obj2) {
                    process((AnonymousClass1<A, K>) obj, (AggregateUpdateResult) obj2);
                }
            };
        }, new String[]{topologyContext.stateStoreName(AggregateResources.StateStoreEntity.aggregate_update)});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, A> void updateCommandResultStore(TopologyContext<K, ?, ?, A> topologyContext, KStream<K, AggregateUpdateResult<A>> kStream) {
        long millis = TimeUnit.SECONDS.toMillis(topologyContext.commandResponseRetentionInSeconds());
        kStream.map((obj, aggregateUpdateResult) -> {
            return KeyValue.pair(aggregateUpdateResult.commandId(), aggregateUpdateResult);
        }).groupByKey(topologyContext.serializedAggregateUpdate()).windowedBy(TimeWindows.of(millis).advanceBy(millis / 3)).reduce((aggregateUpdateResult2, aggregateUpdateResult3) -> {
            return aggregateUpdateResult3;
        }, materializedWindow(topologyContext, topologyContext.stateStoreName(AggregateResources.StateStoreEntity.command_response)));
    }

    private static <A> Materialized<UUID, AggregateUpdateResult<A>, WindowStore<Bytes, byte[]>> materializedWindow(TopologyContext<?, ?, ?, A> topologyContext, String str) {
        return Materialized.as(str).withKeySerde(topologyContext.serdes().commandResponseKey()).withValueSerde(topologyContext.serdes().updateResult());
    }
}
