/*
 * Decompiled with CFR 0.152.
 */
package org.ct42.fnflow.batchdlt;

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.ct42.fnflow.batchdlt.BatchElement;
import org.ct42.fnflow.batchdlt.HeaderAware;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class BatchFnWrapper
implements BiFunction<Flux<Message<JsonNode>>, Sinks.Many<Message<Throwable>>, Flux<Message<JsonNode>>> {
    public final int defaultBatchSize;
    public final Duration defaultBatchTimeout;
    private final Function<List<BatchElement>, List<BatchElement>> target;

    public BatchFnWrapper(Function<List<BatchElement>, List<BatchElement>> target, int defaultBatchSize, long defaultBatchTimeout) {
        this.defaultBatchSize = defaultBatchSize;
        this.defaultBatchTimeout = Duration.ofMillis(defaultBatchTimeout);
        this.target = target;
    }

    @Override
    public Flux<Message<JsonNode>> apply(Flux<Message<JsonNode>> messageFlux, Sinks.Many<Message<Throwable>> error) {
        return messageFlux.bufferTimeout(this.defaultBatchSize, this.defaultBatchTimeout).flatMapSequential(b -> {
            List<BatchElement> results = this.target.apply(b.stream().map(e -> new BatchElement((JsonNode)e.getPayload())).toList());
            ArrayList<Message> resultMsgs = new ArrayList<Message>();
            for (int i = 0; i < results.size(); ++i) {
                BatchElement result = results.get(i);
                if (result.getOutput() != null) {
                    MessageBuilder builder = MessageBuilder.withPayload((Object)result.getOutput()).copyHeaders((Map)((Message)b.get(i)).getHeaders());
                    Function<List<BatchElement>, List<BatchElement>> patt0$temp = this.target;
                    if (patt0$temp instanceof HeaderAware) {
                        HeaderAware headerAware = (HeaderAware)((Object)patt0$temp);
                        headerAware.headersToBeAdded(result.getInput()).forEach((arg_0, arg_1) -> ((MessageBuilder)builder).setHeader(arg_0, arg_1));
                    }
                    Message msg = builder.build();
                    resultMsgs.add(msg);
                    continue;
                }
                if (result.getError() == null) continue;
                error.tryEmitNext((Object)MessageBuilder.withPayload((Object)result.getError()).copyHeaders((Map)((Message)b.get(i)).getHeaders()).build());
            }
            return Flux.fromIterable(resultMsgs);
        });
    }
}

