/*
 * Decompiled with CFR 0.152.
 */
package org.ct42.fnflow.batchdlt;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import org.ct42.fnflow.batchdlt.BatchElement;
import org.ct42.fnflow.batchdlt.BatchFnWrapper;
import org.ct42.fnflow.batchdlt.ErrorConvert2ByteArray;
import org.ct42.fnflow.batchdlt.FunctionWrapper;
import org.ct42.fnflow.batchdlt.InMsg2Header;
import org.ct42.fnflow.batchdlt.OutConvert2ByteArray;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.ApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class ComposedFunction
implements Function<Flux<Message<byte[]>>, Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>>> {
    private final ApplicationContext ctx;

    public ComposedFunction(ApplicationContext ctx) {
        this.ctx = ctx;
    }

    @Override
    public Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>> apply(Flux<Message<byte[]>> messageFlux) {
        String definition = (String)Binder.get((Environment)this.ctx.getEnvironment()).bind("org.ct42.fnflow.function.definition", Bindable.of(String.class)).orElse(null);
        int batchSize = (Integer)Binder.get((Environment)this.ctx.getEnvironment()).bind("org.ct42.fnflow.default.batch.size", Bindable.of(Integer.class)).orElse((Object)500);
        long batchTimeout = (Long)Binder.get((Environment)this.ctx.getEnvironment()).bind("org.ct42.fnflow.default.batch.timeoutms", Bindable.of(Long.class)).orElse((Object)500L);
        if (definition == null || definition.isEmpty()) {
            throw new IllegalStateException("org.ct42.fnflow.function.definition not configured");
        }
        String[] fns = definition.split("\\|");
        Sinks.Many errorSink = Sinks.many().unicast().onBackpressureBuffer();
        ResolvableType imperativeType = ResolvableType.forClassWithGenerics(Function.class, (Class[])new Class[]{JsonNode.class, JsonNode.class});
        Set<String> imperativeBeans = Set.of(this.ctx.getBeanNamesForType(imperativeType));
        ResolvableType batchListType = ResolvableType.forClassWithGenerics(List.class, (Class[])new Class[]{BatchElement.class});
        ResolvableType batchType = ResolvableType.forClassWithGenerics(Function.class, (ResolvableType[])new ResolvableType[]{batchListType, batchListType});
        Set<String> batchBeans = Set.of(this.ctx.getBeanNamesForType(batchType));
        Flux<Message<JsonNode>> intermediate = new InMsg2Header().apply((Flux<Message<byte[]>>)messageFlux.doOnComplete(() -> ((Sinks.Many)errorSink).tryEmitComplete()));
        for (String fn : fns) {
            if (imperativeBeans.contains(fn)) {
                Function fnBean = (Function)this.ctx.getBean(fn, Function.class);
                FunctionWrapper wrappedFn = new FunctionWrapper(fnBean);
                intermediate = wrappedFn.apply(intermediate, (Sinks.Many<Message<Throwable>>)errorSink);
                continue;
            }
            if (batchBeans.contains(fn)) {
                Function batchFnBean = (Function)this.ctx.getBean(fn, Function.class);
                BatchFnWrapper wrappedBatchFn = new BatchFnWrapper(batchFnBean, batchSize, batchTimeout);
                intermediate = wrappedBatchFn.apply(intermediate, (Sinks.Many<Message<Throwable>>)errorSink);
                continue;
            }
            throw new IllegalStateException("No matching bean found for name " + fn);
        }
        return Tuples.of(new OutConvert2ByteArray().apply(intermediate), new ErrorConvert2ByteArray().apply((Flux<Message<Throwable>>)errorSink.asFlux()));
    }
}

