/*
 * Decompiled with CFR 0.152.
 */
package cz.encircled.jput.reactive;

import cz.encircled.jput.JPut;
import cz.encircled.jput.model.ExecutionRun;
import cz.encircled.jput.model.PerfTestExecution;
import cz.encircled.jput.model.RunResult;
import cz.encircled.jput.runner.ThreadBasedTestExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.TypeCastException;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.ranges.RangesKt;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxExtensionsKt;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoExtensionsKt;
import reactor.core.scheduler.Schedulers;

/**
 * Test executor with support for reactive performance tests.
 *
 * <p>When the execution configuration is flagged as reactive, the test statement must return a
 * {@link Mono}; that Mono is subscribed once per warm-up iteration and once per measured repeat.
 * Non-reactive executions are delegated unchanged to {@link ThreadBasedTestExecutor}.
 */
@Metadata(mv={1, 1, 15}, bv={1, 0, 3}, k=1, d1={"\u0000.\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u0000\n\u0000\u0018\u00002\u00020\u0001B\u0005\u00a2\u0006\u0002\u0010\u0002J(\u0010\u0006\u001a\u00020\u00072\u0006\u0010\b\u001a\u00020\t2\u0016\u0010\n\u001a\u0012\u0012\u0006\u0012\u0004\u0018\u00010\f\u0012\u0006\u0012\u0004\u0018\u00010\r0\u000bH\u0016R\u0016\u0010\u0003\u001a\n \u0005*\u0004\u0018\u00010\u00040\u0004X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006\u000e"}, d2={"Lcz/encircled/jput/reactive/ReactiveTestExecutor;", "Lcz/encircled/jput/runner/ThreadBasedTestExecutor;", "()V", "log", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "performExecution", "", "execution", "Lcz/encircled/jput/model/PerfTestExecution;", "statement", "Lkotlin/Function1;", "Lcz/encircled/jput/JPut;", "", "jput-reactive"})
public final class ReactiveTestExecutor
extends ThreadBasedTestExecutor {
    private final Logger log = LoggerFactory.getLogger(ReactiveTestExecutor.class);

    /**
     * Runs the given performance test execution.
     *
     * <p>For reactive configurations the statement is invoked exactly once (with a {@code null}
     * {@code JPut} argument) and the returned {@link Mono} is re-subscribed for every warm-up
     * iteration and every measured repeat. The call blocks until the warm-up sequence has
     * terminated and then until every parallel rail of the measured sequence has terminated.
     *
     * @param execution the execution whose configuration drives warm-up, repeats and parallelism,
     *                  and whose result map receives one {@link ExecutionRun} per repeat
     * @param statement the test body; in reactive mode it must return a {@link Mono}
     * @throws IllegalStateException if the configuration is reactive but the statement does not
     *                               return a {@link Mono}
     */
    public void performExecution(@NotNull PerfTestExecution execution, @NotNull Function1<? super JPut, ? extends Object> statement) {
        Intrinsics.checkParameterIsNotNull((Object)execution, (String)"execution");
        Intrinsics.checkParameterIsNotNull(statement, (String)"statement");
        if (!execution.getConf().isReactive()) {
            super.performExecution(execution, statement);
            return;
        }

        Object result = statement.invoke(null);
        if (!(result instanceof Mono)) {
            throw new IllegalStateException("Reactive test must return Mono<*> object, without subscribing/blocking.");
        }
        Mono<?> body = (Mono<?>)result;

        // --- Warm-up ---------------------------------------------------------------------------
        // Wait for the warm-up sequence to TERMINATE rather than for N onNext signals: a failed
        // iteration (skipped by onErrorContinue) or an empty Mono never emits onNext, which would
        // otherwise leave the latch above zero and block this thread forever.
        // NOTE: CountDownLatch.await() may throw InterruptedException; this method does not declare
        // it, so it reaches the caller as-is.
        CountDownLatch warmUpDone = new CountDownLatch(1);
        FluxExtensionsKt.toFlux((Iterable<Integer>)RangesKt.until(0, execution.getConf().getWarmUp()))
                .flatMap(it -> body)
                .onErrorContinue((t, ignored) -> log.warn("Error during warm up", t))
                .doOnTerminate(warmUpDone::countDown)
                .subscribe();
        warmUpDone.await();

        // The configured ramp-up is not applied in reactive mode. The previous code computed
        // rampUp / (parallelCount - 1) without using the result, which only served to throw an
        // ArithmeticException when parallelCount == 1; that dead computation has been removed.

        // --- Measured run ----------------------------------------------------------------------
        // doOnTerminate fires once per parallel rail, hence a latch sized to the rail count.
        CountDownLatch countDown = new CountDownLatch(execution.getConf().getParallelCount());
        FluxExtensionsKt.toFlux((Iterable<Integer>)RangesKt.until(0, execution.getConf().getRepeats()))
                .parallel(execution.getConf().getParallelCount())
                .runOn(Schedulers.parallel())
                .flatMap(index -> {
                    // Register the run before subscribing so that it is recorded even if the body fails.
                    ExecutionRun repeat = new ExecutionRun(execution, 0L, 0L, null, 14, null);
                    execution.getExecutionResult().put(Long.valueOf(index.intValue()), repeat);
                    return body.flatMap(value -> {
                        repeat.measureElapsed();
                        return MonoExtensionsKt.toMono(new Pair<Object, ExecutionRun>(value, repeat));
                    }).onErrorContinue((t, ignored) -> {
                        // A failed repeat is still timed; the error is stored on the run.
                        repeat.measureElapsed();
                        repeat.setResultDetails(new RunResult(null, t, null, 5, null));
                    });
                })
                .doOnTerminate(countDown::countDown)
                .subscribe();
        countDown.await();
    }

    /**
     * Synthetic accessor for the private logger; kept so that the class's public surface is
     * unchanged.
     *
     * @param $this the executor whose logger is returned
     * @return the logger of {@code $this}
     */
    public static final /* synthetic */ Logger access$getLog$p(ReactiveTestExecutor $this) {
        return $this.log;
    }
}

