package io.simplesource.kafka.internal.streams.topology;

import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.internal.util.Tuple;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.AggregateUpdateResult;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.CommandResponse;
import io.simplesource.kafka.model.ValueWithSequence;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that wire together the Kafka Streams stages used to process
 * commands for an event-sourced aggregate: splitting already-answered commands
 * from new ones, turning command requests into events, folding events into
 * aggregate updates, and deriving command responses.
 *
 * <p>All methods are stateless; any state lives in the Kafka Streams topology
 * they build.
 *
 * <p>NOTE(review): a decompiler note on the original listing indicates that the
 * class members were package-private in the compiled class; they are kept
 * {@code public} here so that existing callers of this listing keep compiling.
 */
public final class EventSourcedStreams {
    private static final Logger logger = LoggerFactory.getLogger(EventSourcedStreams.class);

    EventSourcedStreams() {
    }

    /**
     * Returns the numeric sequence associated with a command response.
     *
     * <p>Takes {@code sequenceResult()} and, via {@code getOrElse}, falls back to
     * the response's {@code readSequence()}.
     *
     * @param commandResponse the response to inspect
     * @return the sequence number as a {@code long}
     */
    private static long getResponseSequence(CommandResponse commandResponse) {
        return commandResponse.sequenceResult().getOrElse(commandResponse.readSequence()).getSeq();
    }

    /**
     * Builds the {@link AggregateUpdate} obtained by applying a single event to an aggregate.
     *
     * @param topologyContext supplies the aggregator used to apply the event
     * @param aggregate       the aggregate value the event is applied to
     * @param event           the event together with its sequence
     * @return the aggregate returned by the aggregator, tagged with the event's sequence
     */
    private static <E, A> AggregateUpdate<A> applyEvent(TopologyContext<?, ?, E, A> topologyContext, A aggregate, ValueWithSequence<E> event) {
        return new AggregateUpdate<>(topologyContext.aggregator().applyEvent(aggregate, event.value()), event.sequence());
    }

    /**
     * Splits incoming command requests into those that have no recorded response
     * yet and those that already have one.
     *
     * <p>Responses are re-keyed by command id and reduced to a table holding one
     * response per command id. Requests are re-keyed by command id, left-joined
     * against that table, re-keyed by their aggregate key, and then branched on
     * whether the joined response is {@code null}.
     *
     * @param topologyContext supplies the serdes used for grouping and joining
     * @param kStream         the command request stream
     * @param kStream2        the command response stream
     * @return a tuple whose first element is the stream of requests without a
     *         response and whose second element is the stream of responses found
     *         for requests that already had one (each such response is logged at INFO)
     */
    public static <K, C, E, A> Tuple<KStream<K, CommandRequest<K, C>>, KStream<K, CommandResponse>> getProcessedCommands(TopologyContext<K, C, E, A> topologyContext, KStream<K, CommandRequest<K, C>> kStream, KStream<K, CommandResponse> kStream2) {
        KStream<K, Tuple<CommandRequest<K, C>, CommandResponse>> requestsWithResponse = kStream
                .selectKey((key, request) -> request.commandId())
                .leftJoin(
                        kStream2
                                .selectKey((key, response) -> response.commandId())
                                .groupByKey(Serialized.with(topologyContext.serdes().commandResponseKey(), topologyContext.serdes().commandResponse()))
                                // Keep the response with the strictly greater sequence; on a tie the second argument wins.
                                .reduce((left, right) -> getResponseSequence(left) > getResponseSequence(right) ? left : right),
                        Tuple::new,
                        Joined.with(topologyContext.serdes().commandResponseKey(), topologyContext.serdes().commandRequest(), topologyContext.serdes().commandResponse()))
                .selectKey((commandId, pair) -> pair.v1().aggregateKey());

        // branch(...) is a generic varargs method, so the call site creates a generic array;
        // both predicates are typed for exactly this stream, which makes the warning benign.
        @SuppressWarnings("unchecked")
        KStream<K, Tuple<CommandRequest<K, C>, CommandResponse>>[] branches = requestsWithResponse.branch(
                (key, pair) -> pair.v2() == null,
                (key, pair) -> pair.v2() != null);

        KStream<K, CommandRequest<K, C>> unprocessed = branches[0].mapValues((key, pair) -> pair.v1());
        KStream<K, CommandResponse> processed = branches[1]
                .mapValues((key, pair) -> pair.v2())
                .peek((key, response) -> logger.info("Preprocessed: {}=CommandId:{}", key, response.commandId()));
        return new Tuple<>(unprocessed, processed);
    }

    /**
     * Transforms each command request into a {@link CommandEvents} value using a
     * {@link CommandRequestTransformer} connected to the aggregate-update state store.
     *
     * @param topologyContext passed to every transformer instance and used to resolve the state store name
     * @param kStream         the command request stream
     * @return the stream of transformer outputs, keyed as the input stream
     */
    public static <K, C, E, A> KStream<K, CommandEvents<E, A>> eventResultStream(TopologyContext<K, C, E, A> topologyContext, KStream<K, CommandRequest<K, C>> kStream) {
        return kStream.transformValues(
                () -> new CommandRequestTransformer<>(topologyContext),
                topologyContext.stateStoreName(AggregateResources.StateStoreEntity.aggregate_update));
    }

    /**
     * Flattens each {@link CommandEvents} value into its individual events.
     *
     * <p>The event value is folded: the first branch (which receives a non-empty
     * list, presumably the failure reasons) contributes no records; the second
     * branch copies the events into a new {@link ArrayList}, producing one output
     * record per event.
     *
     * @param kStream the stream of command processing results
     * @return a stream with one record per event
     */
    public static <K, E, A> KStream<K, ValueWithSequence<E>> getEventsWithSequence(KStream<K, CommandEvents<E, A>> kStream) {
        return kStream.flatMapValues(commandEvents -> commandEvents.eventValue().fold(
                failureReasons -> Collections.emptyList(),
                ArrayList::new));
    }

    /**
     * Folds the events of each {@link CommandEvents} value into a single aggregate update.
     *
     * <p>When the event value carries events, the first event is applied to
     * {@code commandEvents.aggregate()} and every following event is applied to the
     * aggregate produced by the previous step; the resulting update carries the
     * sequence of the last event applied. The command id and read sequence are
     * copied across unchanged.
     *
     * @param topologyContext supplies the aggregator used to apply events
     * @param kStream         the stream of command processing results
     * @return a stream of aggregate update results, keyed as the input stream
     */
    public static <K, E, A> KStream<K, AggregateUpdateResult<A>> getAggregateUpdateResults(TopologyContext<K, ?, E, A> topologyContext, KStream<K, CommandEvents<E, A>> kStream) {
        return kStream.mapValues((key, commandEvents) -> new AggregateUpdateResult<>(
                commandEvents.commandId(),
                commandEvents.readSequence(),
                commandEvents.eventValue().map(events -> events.fold(
                        firstEvent -> applyEvent(topologyContext, commandEvents.aggregate(), firstEvent),
                        (update, nextEvent) -> applyEvent(topologyContext, update.aggregate(), nextEvent)))));
    }

    /**
     * Extracts the aggregate update from each result, dropping results without one.
     *
     * <p>The updated-aggregate result is folded: the first branch (which receives a
     * non-empty list, presumably the failure reasons) contributes no records; the
     * second branch emits the update as a single record.
     *
     * @param kStream the stream of aggregate update results
     * @return a stream containing at most one aggregate update per input record
     */
    public static <K, A> KStream<K, AggregateUpdate<A>> getAggregateUpdates(KStream<K, AggregateUpdateResult<A>> kStream) {
        return kStream.flatMapValues(aggregateUpdateResult -> aggregateUpdateResult.updatedAggregateResult().fold(
                failureReasons -> Collections.emptyList(),
                Collections::singletonList));
    }

    /**
     * Converts each aggregate update result into a {@link CommandResponse}.
     *
     * <p>The response keeps the command id and read sequence of the result and
     * maps the updated aggregate, when present, to its sequence.
     *
     * @param kStream the stream of aggregate update results
     * @return a stream of command responses, keyed as the input stream
     */
    public static <K, A> KStream<K, CommandResponse> getCommandResponses(KStream<K, AggregateUpdateResult<A>> kStream) {
        return kStream.mapValues((key, aggregateUpdateResult) -> new CommandResponse(
                aggregateUpdateResult.commandId(),
                aggregateUpdateResult.readSequence(),
                aggregateUpdateResult.updatedAggregateResult().map(AggregateUpdate::sequence)));
    }
}
