/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron.testing.benchmark;

import java.io.Serializable;
import java.util.HashMap;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.opennms.nephron.Pipeline;
import org.opennms.nephron.testing.benchmark.BenchmarkOptions;
import org.opennms.nephron.testing.benchmark.KafkaFlowIngester;
import org.opennms.nephron.testing.flowgen.SourceConfig;
import org.opennms.nephron.testing.flowgen.SyntheticFlowSource;
import org.opennms.nephron.testing.flowgen.SyntheticFlowTimestampPolicyFactory;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;

public abstract class InputSetup {
    protected final BenchmarkOptions options;
    protected final SourceConfig sourceConfig;

    public InputSetup(BenchmarkOptions options) {
        this.options = options;
        this.sourceConfig = SourceConfig.of(options, SyntheticFlowTimestampPolicyFactory.withLimitedDelay(options, (SerializableFunction<FlowDocument, Instant>)((SerializableFunction & Serializable)Pipeline.ReadFromKafka::getTimestamp)));
    }

    abstract PTransform<PBegin, PCollection<FlowDocument>> source();

    abstract void generate() throws Exception;

    private static TimestampPolicyFactory<byte[], FlowDocument> createTimestampPolicyFactory(final long maxIdx, Duration maxInputDelay, final Duration maxInputIdleDuration, final Duration maxRunDuration) {
        return (TimestampPolicyFactory & Serializable)(tp, previousWatermark) -> new CustomTimestampPolicyWithLimitedDelay<byte[], FlowDocument>(Pipeline.ReadFromKafka::getTimestamp, maxInputDelay, previousWatermark){
            private long counter;
            private Instant start;
            private Instant idleSince;
            private boolean closed;
            {
                super(timestampFunction, maxDelay, previousWatermark);
                this.counter = 0L;
                this.start = Instant.now();
                this.idleSince = Instant.now();
                this.closed = false;
            }

            public Instant getTimestampForRecord(TimestampPolicy.PartitionContext ctx, KafkaRecord<byte[], FlowDocument> record) {
                ++this.counter;
                this.idleSince = Instant.now();
                return super.getTimestampForRecord(ctx, record);
            }

            public Instant getWatermark(TimestampPolicy.PartitionContext ctx) {
                if (this.closed || this.counter >= maxIdx || new Duration((ReadableInstant)this.idleSince, (ReadableInstant)Instant.now()).isLongerThan((ReadableDuration)maxInputIdleDuration) || new Duration((ReadableInstant)this.start, (ReadableInstant)Instant.now()).isLongerThan((ReadableDuration)maxRunDuration)) {
                    this.closed = true;
                    return BoundedWindow.TIMESTAMP_MAX_VALUE;
                }
                return super.getWatermark(ctx);
            }
        };
    }

    public static class MemoryInputSetup
    extends InputSetup {
        public MemoryInputSetup(BenchmarkOptions options) {
            super(options);
        }

        @Override
        public PTransform<PBegin, PCollection<FlowDocument>> source() {
            return SyntheticFlowSource.readFromSyntheticSource(this.sourceConfig);
        }

        @Override
        public void generate() throws Exception {
        }
    }

    public static class KafkaInputSetup
    extends InputSetup {
        public KafkaInputSetup(BenchmarkOptions options) {
            super(options);
        }

        @Override
        public PTransform<PBegin, PCollection<FlowDocument>> source() {
            HashMap<String, Object> kafkaConsumerConfig = new HashMap<String, Object>();
            kafkaConsumerConfig.put("group.id", this.options.getGroupId());
            kafkaConsumerConfig.put("enable.auto.commit", this.options.getAutoCommit());
            TimestampPolicyFactory<byte[], FlowDocument> tpf = InputSetup.createTimestampPolicyFactory(this.sourceConfig.maxIdx, Duration.millis((long)this.options.getDefaultMaxInputDelayMs()), Duration.standardSeconds((long)this.options.getMaxInputIdleSecs().intValue()), Duration.standardSeconds((long)this.options.getMaxRunSecs().intValue()));
            return new Pipeline.ReadFromKafka(this.options.getBootstrapServers(), this.options.getFlowSourceTopic(), kafkaConsumerConfig, tpf);
        }

        @Override
        public void generate() throws Exception {
            KafkaFlowIngester.sendRecordsToKafka(this.options);
        }
    }

    public static enum Seletion {
        MEMORY{

            @Override
            public InputSetup createInputSetup(BenchmarkOptions options) {
                return new MemoryInputSetup(options);
            }
        }
        ,
        KAFKA{

            @Override
            public InputSetup createInputSetup(BenchmarkOptions options) {
                return new KafkaInputSetup(options);
            }
        };


        public abstract InputSetup createInputSetup(BenchmarkOptions var1);
    }
}

