/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron.testing.benchmark;

import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.opennms.nephron.Aggregate;
import org.opennms.nephron.CompoundKey;
import org.opennms.nephron.NephronOptions;
import org.opennms.nephron.Pipeline;
import org.opennms.nephron.testing.benchmark.BenchmarkOptions;
import org.opennms.nephron.testing.benchmark.BenchmarkResult;
import org.opennms.nephron.testing.benchmark.CmdLineArgsProcessor;
import org.opennms.nephron.testing.benchmark.InputSetup;
import org.opennms.nephron.testing.benchmark.TestingProbe;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command line driver that runs the flow aggregation pipeline once per expanded argument list
 * and reports input/output counts, rates and processing time for every run.
 *
 * <p>Each run builds a Beam pipeline that is instrumented with one {@link TestingProbe} in front
 * of the flow statistics calculation and one behind it. The running pipeline is polled
 * periodically and cancelled by the benchmark when the expected input was processed, when
 * nothing changes any more, or when the configured maximum run time is exceeded.
 */
public class Benchmark {
    private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);

    /**
     * Grace period used for two decisions: how long the pipeline keeps running after the expected
     * input count was reached, and how long the metrics may stay unchanged before the run is
     * considered starved.
     */
    private static final Duration DONE_DELAY = Duration.standardSeconds(15L);

    /** Pause between two consecutive polls of the pipeline state and metrics. */
    private static final Duration PERF_DELAY = Duration.standardSeconds(15L);

    private final BenchmarkOptions options;
    private final Consumer<String> resultConsumer;
    private final TestingProbe<FlowDocument> inTestingProbe = new TestingProbe<>("benchmark", "in");
    private final TestingProbe<KV<CompoundKey, Aggregate>> outTestingProbe = new TestingProbe<>("benchmark", "out");

    /** Creation time of this benchmark instance; the timeout is measured from here. */
    private final Instant start = Instant.now();

    private PipelineResult pipelineResult;

    /** Stays {@code null} until the run was cancelled by the benchmark or reached a terminal state. */
    private TerminationReason terminationReason;

    /**
     * Entry point: expands the command line into pipeline argument lists and benchmarks each one.
     *
     * <p>{@code --blockOnRun=false} and {@code --runner=FlinkRunner} are appended when the caller
     * did not specify these arguments. Result lines are printed to standard out and, if an output
     * file was given, are additionally written to that file.
     *
     * @param args the raw command line arguments
     * @throws Exception if argument processing, a before/after command or a benchmark run fails
     */
    public static void main(String[] args) throws Exception {
        args = ensureArg("--blockOnRun=false", args);
        args = ensureArg("--runner=FlinkRunner", args);
        CmdLineArgsProcessor.CmdLineArgs cmdLineArgs = CmdLineArgsProcessor.process(args);
        List<List<String>> paramLists = cmdLineArgs.args.eval().expand();
        // A null resource is legal in try-with-resources and is simply not closed. The stream
        // is closed (and thereby flushed) on every exit path when an output file was given.
        try (PrintStream fileWriter = cmdLineArgs.out == null
                ? null
                : new PrintStream(cmdLineArgs.out, StandardCharsets.UTF_8)) {
            runAll(cmdLineArgs, paramLists, createResultConsumer(fileWriter));
        }
    }

    /**
     * Creates the sink for result lines.
     *
     * @param fileWriter optional file stream; may be {@code null}
     * @return a consumer that prints to standard out and, if present, also to {@code fileWriter}
     */
    private static Consumer<String> createResultConsumer(PrintStream fileWriter) {
        if (fileWriter == null) {
            return msg -> System.out.println(msg);
        }
        return msg -> {
            System.out.println(msg);
            fileWriter.println(msg);
        };
    }

    /**
     * Executes one benchmark run per argument list, strictly one after the other.
     *
     * @param cmdLineArgs the processed command line; supplies the optional before/after commands
     * @param paramLists the expanded pipeline argument lists
     * @param resultConsumer receives all result lines
     * @throws Exception if a before/after command or a benchmark run fails
     */
    private static void runAll(
            CmdLineArgsProcessor.CmdLineArgs cmdLineArgs,
            List<List<String>> paramLists,
            Consumer<String> resultConsumer) throws Exception {
        Set<String> commonParameters = commonParameters(paramLists);
        resultConsumer.accept(String.format("Started benchmark run at          : %s", Instant.now()));
        resultConsumer.accept(String.format("Number of pipeline argument lists : %d", paramLists.size()));
        resultConsumer.accept(String.format("common pipeline arguments         : %s", commonParameters.stream().sorted().collect(Collectors.joining(" "))));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            int counter = 1;
            for (List<String> paramList : paramLists) {
                executeCommand(cmdLineArgs.before);
                String[] as = paramList.toArray(new String[0]);
                as = ensureArg("--blockOnRun=false", as);
                as = ensureArg("--runner=FlinkRunner", as);
                as = ensureArg("--elasticUrl=", as);
                BenchmarkOptions options = PipelineOptionsFactory.fromArgs(as).withValidation().as(BenchmarkOptions.class);
                // Only the arguments that differ between the runs are reported per run.
                List<String> varying = new ArrayList<>(paramList);
                varying.removeAll(commonParameters);
                resultConsumer.accept("=".repeat(30));
                resultConsumer.accept(String.format("start pipeline run [%d/%d] at : %s", counter, paramLists.size(), Instant.now()));
                resultConsumer.accept(String.format("varying pipeline arguments : %s", varying.stream().sorted().collect(Collectors.joining(" "))));
                Future<?> f = executor.submit(() -> {
                    try {
                        new Benchmark(options, resultConsumer).run();
                    } catch (Exception e) {
                        LOG.error("benchmark failed", e);
                        throw new RuntimeException(e);
                    }
                });
                // Block until the run finished; a failure surfaces here as an ExecutionException.
                f.get();
                // NOTE(review): the after-command is skipped when the run above fails - confirm
                // that this is intended.
                executeCommand(cmdLineArgs.after);
                if (counter < paramLists.size() && options.getSleepBetweenRunsMs() > 0L) {
                    Thread.sleep(options.getSleepBetweenRunsMs());
                }
                ++counter;
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Determines the parameters that occur in every one of the given parameter lists.
     *
     * @param paramLists the parameter lists to intersect
     * @return a new mutable set holding the intersection; empty if {@code paramLists} is empty
     */
    public static Set<String> commonParameters(List<List<String>> paramLists) {
        // An explicit HashSet is used because the set is modified below and
        // Collectors.toSet() gives no guarantee about the mutability of its result.
        Set<String> common = new HashSet<>();
        boolean first = true;
        for (List<String> paramList : paramLists) {
            if (first) {
                common.addAll(paramList);
                first = false;
            } else {
                common.retainAll(paramList);
            }
        }
        return common;
    }

    /**
     * Runs an external command and waits for it to finish. The output and error streams of the
     * command are inherited from this process.
     *
     * <p>The complete string is used as the program to start; it is not split into a program and
     * arguments.
     *
     * @param command the program to start; nothing happens if {@code null}
     * @throws Exception if the process can not be started or returns a non-zero exit code
     */
    private static void executeCommand(String command) throws Exception {
        if (command == null) {
            return;
        }
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        pb.redirectError(ProcessBuilder.Redirect.INHERIT);
        int exitCode = pb.start().waitFor();
        if (exitCode != 0) {
            throw new Exception("command returned non-zero exit code - exitCode: " + exitCode + "; command: " + command);
        }
    }

    /**
     * Appends an argument assignment unless the argument is already set.
     *
     * @param argAssignment an assignment of the form {@code --name=value}
     * @param args the existing arguments
     * @return {@code args} itself if it already contains an argument starting with
     *     {@code --name=}; otherwise a copy of {@code args} with {@code argAssignment} appended
     */
    private static String[] ensureArg(String argAssignment, String[] args) {
        if (argValue(argAssignment, args) != null) {
            return args;
        }
        String[] newArgs = new String[args.length + 1];
        System.arraycopy(args, 0, newArgs, 0, args.length);
        newArgs[args.length] = argAssignment;
        return newArgs;
    }

    /**
     * Looks up the value that is assigned to an argument.
     *
     * @param arg an assignment of the form {@code --name=value}; only the part up to and
     *     including the first {@code '='} is used for matching
     * @param args the arguments to search
     * @return the text following {@code --name=} in the first matching argument, or {@code null}
     *     if no argument matches
     */
    private static String argValue(String arg, String[] args) {
        String leftHandSide = arg.substring(0, arg.indexOf('=') + 1);
        for (String a : args) {
            if (a.startsWith(leftHandSide)) {
                return a.substring(leftHandSide.length());
            }
        }
        return null;
    }

    private Benchmark(BenchmarkOptions options, Consumer<String> resultConsumer) {
        this.options = options;
        this.resultConsumer = resultConsumer;
    }

    /**
     * Builds the instrumented pipeline, starts it, generates the input, waits until the run
     * terminates and reports the result to the result consumer.
     *
     * @throws Exception if the pipeline can not be set up, input generation fails on the calling
     *     thread, or waiting for termination fails
     */
    public void run() throws Exception {
        final NephronOptions nephronOptions = (NephronOptions) this.options;
        org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(this.options);
        Pipeline.registerCoders(pipeline);
        InputSetup inputSetup = this.options.getInput().createInputSetup(this.options);

        // source -> in-probe -> flow statistics -> (optional accumulation) -> out-probe
        PCollection input = (PCollection) pipeline.apply(inputSetup.source());
        PCollection probedInput = (PCollection) input.apply(this.inTestingProbe.getTransform());
        PCollection flowSummaries = (PCollection) probedInput.apply((PTransform) new Pipeline.CalculateFlowStatistics(nephronOptions));
        flowSummaries = Pipeline.accumulateSummariesIfNecessary(nephronOptions, (PCollection) flowSummaries);
        flowSummaries = (PCollection) flowSummaries.apply(this.outTestingProbe.getTransform());
        Pipeline.attachWriteToElastic(nephronOptions, (PCollection) flowSummaries);
        Pipeline.attachWriteToCortex(nephronOptions, (PCollection) flowSummaries, cw -> cw.withFixedLabel("bmr", Instant.now().toString()));
        flowSummaries.apply(Benchmark.devNull("summaries"));

        if (FlinkRunner.class.isAssignableFrom(this.options.getRunner()) && this.options.getInput() == InputSetup.Seletion.KAFKA) {
            // For Kafka input on Flink the input is generated on a separate thread after a
            // fixed delay; the thread is started before the pipeline is run.
            Thread thread = new Thread(() -> {
                try {
                    LOG.info("wait some time until pipeline is started...");
                    Thread.sleep(20000L);
                    LOG.info("generate input...");
                    inputSetup.generate();
                } catch (Exception e) {
                    LOG.error("input generation failed", e);
                }
            });
            thread.start();
            this.pipelineResult = pipeline.run();
        } else {
            LOG.info("run pipeline...");
            this.pipelineResult = pipeline.run();
            LOG.info("generate input...");
            inputSetup.generate();
        }

        BenchmarkResult benchmarkResult = this.waitUntilFinished(inputSetup.sourceConfig.maxIdx);
        LOG.info("benchmark finished");
        this.reportResult(benchmarkResult, inputSetup.sourceConfig.maxIdx);
    }

    /**
     * Writes the figures of a finished run to the result consumer.
     *
     * @param benchmarkResult the result of the run
     * @param expectedInputCount the number of input elements the run was expected to process
     */
    private void reportResult(BenchmarkResult benchmarkResult, long expectedInputCount) {
        String termination = this.terminationReason != null ? this.terminationReason.name() : "<unknown>";
        this.resultConsumer.accept("Benchmark result");
        this.resultConsumer.accept(String.format("termination          : %s", termination));
        this.resultConsumer.accept(String.format("input count          : %d", benchmarkResult.snapshot.in.count));
        this.resultConsumer.accept(String.format("expected input count : %d", expectedInputCount));
        this.resultConsumer.accept(String.format("output count         : %d", benchmarkResult.snapshot.out.count));
        this.resultConsumer.accept(String.format("cortex samples       : %d", benchmarkResult.snapshot.cortexSamples));
        this.resultConsumer.accept(String.format("input  rate     : %.2f (1/s)", benchmarkResult.inRate));
        this.resultConsumer.accept(String.format("output rate     : %.2f (1/s)", benchmarkResult.outRate));
        this.resultConsumer.accept(String.format("processing time (first input until last output) : %.2f s", (double) benchmarkResult.processingTime.getMillis() / 1000.0));
    }

    /**
     * Cancels the pipeline and records why. Only the first call has an effect; once a termination
     * reason is set, further calls do nothing.
     *
     * @param tr the reason for the cancellation
     * @throws Exception if cancelling the pipeline fails
     */
    private void cancel(TerminationReason tr) throws Exception {
        if (this.terminationReason == null) {
            this.terminationReason = tr;
            this.pipelineResult.cancel();
        }
    }

    /**
     * Polls the pipeline until it reaches a terminal state or was cancelled by the benchmark.
     *
     * <p>Every iteration takes a metrics snapshot and cancels the pipeline if the maximum run
     * time is reached, if the expected input count was reached more than {@link #DONE_DELAY}
     * ago, or if the snapshot did not change for more than {@link #DONE_DELAY}.
     *
     * @param maxIdx the number of input elements that are expected to be processed
     * @return a result built from the most recent snapshot that differed from its predecessor
     * @throws Exception if cancelling the pipeline fails or the thread is interrupted
     */
    private BenchmarkResult waitUntilFinished(long maxIdx) throws Exception {
        Instant lastActivityAt = Instant.now();
        Snapshot lastSnapshot = null;
        Instant maxIdxReachedAt = null;
        while (this.terminationReason == null) {
            Instant now = Instant.now();
            Duration elapsed = new Duration(this.start, now);
            Snapshot nextSnapshot = this.takeSnapshot();
            if (lastSnapshot == null || !lastSnapshot.equals(nextSnapshot)) {
                lastActivityAt = now;
                lastSnapshot = nextSnapshot;
            }
            if (elapsed.getStandardSeconds() >= (long) this.options.getMaxRunSecs().intValue()) {
                this.cancel(TerminationReason.TIMEOUT);
            }
            if (maxIdxReachedAt == null) {
                if (lastSnapshot.in.count >= maxIdx) {
                    maxIdxReachedAt = now;
                }
            } else if (new Duration(maxIdxReachedAt, now).isLongerThan(DONE_DELAY)) {
                // The grace period is only evaluated once the expected count was actually
                // reached, instead of relying on a null start instant being interpreted as "now".
                this.cancel(TerminationReason.EXPECTED_INPUT_PROCESSED);
            }
            Duration quietFor = new Duration(lastActivityAt, now);
            if (quietFor.isLongerThan(DONE_DELAY)) {
                this.cancel(TerminationReason.STARVED);
            }
            PipelineResult.State state = this.pipelineResult.getState();
            switch (state) {
                case UNKNOWN:
                case UNRECOGNIZED:
                case STOPPED:
                case RUNNING:
                    // not terminal - keep polling
                    break;
                case DONE:
                    this.terminationReason = TerminationReason.DONE;
                    return new BenchmarkResult(lastSnapshot);
                case CANCELLED:
                    // A cancellation that was not triggered by cancel(...) above is unexpected.
                    if (this.terminationReason == null) {
                        LOG.error("Job was unexpectedly cancelled");
                        this.terminationReason = TerminationReason.UNEXPECTED;
                    }
                    return new BenchmarkResult(lastSnapshot);
                case FAILED:
                    this.terminationReason = TerminationReason.FAILED;
                    return new BenchmarkResult(lastSnapshot);
                case UPDATED:
                    this.terminationReason = TerminationReason.UPDATED;
                    LOG.error("Job was unexpectedly updated");
                    return new BenchmarkResult(lastSnapshot);
            }
            LOG.debug("input count : {}", lastSnapshot.in.count);
            LOG.debug("output count: {}", lastSnapshot.out.count);
            Thread.sleep(PERF_DELAY.getMillis());
        }
        return new BenchmarkResult(lastSnapshot);
    }

    /** Reads the current values of both probes and the cortex sample counter from the pipeline result. */
    private Snapshot takeSnapshot() {
        return new Snapshot(
                this.inTestingProbe.takeSnapshot(this.pipelineResult),
                this.outTestingProbe.takeSnapshot(this.pipelineResult),
                TestingProbe.getElementCount("cortex", "sample", this.pipelineResult));
    }

    /**
     * Creates a transform that emits nothing and only counts the elements it receives.
     *
     * @param name the namespace of the {@code "discarded"} counter metric
     * @param <T> the element type of the input collection
     * @return a {@code ParDo} that increments the counter once per input element
     */
    public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
        return ParDo.of(new DoFn<T, Void>() {
            private final Counter discardedCounterMetric = Metrics.counter(name, "discarded");

            @ProcessElement
            public void processElement(ProcessContext c) {
                this.discardedCounterMetric.inc();
            }
        });
    }

    /** Why a benchmark run ended. */
    enum TerminationReason {
        /** The pipeline reported the state {@code DONE}. */
        DONE,
        /** The benchmark cancelled the pipeline because the maximum run time was reached. */
        TIMEOUT,
        /** The benchmark cancelled the pipeline because the metrics stopped changing. */
        STARVED,
        /** The benchmark cancelled the pipeline some time after the expected input count was reached. */
        EXPECTED_INPUT_PROCESSED,
        /** The pipeline reported the state {@code CANCELLED} without being cancelled by the benchmark. */
        UNEXPECTED,
        /** The pipeline reported the state {@code FAILED}. */
        FAILED,
        /** The pipeline reported the state {@code UPDATED}. */
        UPDATED
    }

    /** Immutable view of the probe snapshots and the cortex sample count at one point in time. */
    public static class Snapshot {
        public final TestingProbe.Snapshot in;
        public final TestingProbe.Snapshot out;
        public final long cortexSamples;

        public Snapshot(TestingProbe.Snapshot in, TestingProbe.Snapshot out, long cortexSamples) {
            this.in = in;
            this.out = out;
            this.cortexSamples = cortexSamples;
        }

        /** Two snapshots are equal if they have the same class and all three components are equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Snapshot that = (Snapshot) o;
            return this.cortexSamples == that.cortexSamples
                    && Objects.equals(this.in, that.in)
                    && Objects.equals(this.out, that.out);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.in, this.out, this.cortexSamples);
        }
    }
}

