/*
 * Decompiled with CFR 0.152.
 */
package org.ct42.fnflow.batchdlt;

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.ct42.fnflow.batchdlt.BatchElement;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class BatchFnWrapper
implements BiFunction<Flux<Message<JsonNode>>, Sinks.Many<Message<JsonNode>>, Flux<Message<JsonNode>>> {
    public final int defaultBatchSize;
    public final Duration defaultBatchTimeout;
    private final Function<List<BatchElement>, List<BatchElement>> target;

    public BatchFnWrapper(Function<List<BatchElement>, List<BatchElement>> target, int defaultBatchSize, long defaultBatchTimeout) {
        this.defaultBatchSize = defaultBatchSize;
        this.defaultBatchTimeout = Duration.ofMillis(defaultBatchTimeout);
        this.target = target;
    }

    @Override
    public Flux<Message<JsonNode>> apply(Flux<Message<JsonNode>> messageFlux, Sinks.Many<Message<JsonNode>> error) {
        return messageFlux.bufferTimeout(this.defaultBatchSize, this.defaultBatchTimeout).flatMapSequential(b -> {
            List<BatchElement> results = this.target.apply(b.stream().map(e -> new BatchElement((JsonNode)e.getPayload())).toList());
            ArrayList<Message> resultMsgs = new ArrayList<Message>();
            for (int i = 0; i < results.size(); ++i) {
                BatchElement result = results.get(i);
                if (result.getOutput() != null) {
                    Message msg = MessageBuilder.withPayload((Object)result.getOutput()).copyHeaders((Map)((Message)b.get(i)).getHeaders()).build();
                    resultMsgs.add(msg);
                    continue;
                }
                if (result.getError() == null) continue;
                error.tryEmitNext((Object)MessageBuilder.withPayload((Object)result.getInput()).copyHeaders((Map)((Message)b.get(i)).getHeaders()).build());
            }
            return Flux.fromIterable(resultMsgs);
        });
    }
}

