/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron.testing.flowgen;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
import net.jqwik.api.RandomGenerator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.opennms.nephron.coders.FlowDocumentProtobufCoder;
import org.opennms.nephron.testing.flowgen.FlowDocuments;
import org.opennms.nephron.testing.flowgen.FlowReader;
import org.opennms.nephron.testing.flowgen.Limiter;
import org.opennms.nephron.testing.flowgen.SourceConfig;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SyntheticFlowSource
extends UnboundedSource<FlowDocument, FlowReader.CheckpointMark> {
    private static final Logger LOG = LoggerFactory.getLogger(SyntheticFlowSource.class);
    private final SourceConfig sourceConfig;

    public static PTransform<PBegin, PCollection<FlowDocument>> readFromSyntheticSource(final SourceConfig sourceConfig) {
        return new PTransform<PBegin, PCollection<FlowDocument>>(){
            private final Gauge inputDrift = Metrics.gauge((String)"flows", (String)"input_lag");

            public PCollection<FlowDocument> expand(PBegin input) {
                return (PCollection)((PCollection)input.apply((PTransform)Read.from((UnboundedSource)new SyntheticFlowSource(sourceConfig)))).apply((PTransform)ParDo.of((DoFn)new DoFn<FlowDocument, FlowDocument>(){

                    @DoFn.ProcessElement
                    public void processElement(DoFn.ProcessContext c) {
                        FlowDocument flow = (FlowDocument)c.element();
                        inputDrift.set(System.currentTimeMillis() - flow.getLastSwitched().getValue());
                        c.output((Object)flow);
                    }
                }));
            }
        };
    }

    public SyntheticFlowSource(SourceConfig sourceConfig) {
        this.sourceConfig = sourceConfig;
    }

    public List<? extends UnboundedSource<FlowDocument, FlowReader.CheckpointMark>> split(int desiredNumSplits, PipelineOptions options) throws Exception {
        LOG.debug("desired number of splits: " + desiredNumSplits);
        return this.sourceConfig.split(desiredNumSplits).stream().map(sc -> new SyntheticFlowSource((SourceConfig)sc)).collect(Collectors.toList());
    }

    public UnboundedSource.UnboundedReader<FlowDocument> createReader(PipelineOptions options, @Nullable FlowReader.CheckpointMark checkpointMark) throws IOException {
        Limiter limiter;
        Random random;
        long startIdx;
        Optional<Instant> previous;
        RandomGenerator flowData = FlowDocuments.getFlowDataArbitrary(this.sourceConfig.flowConfig).generator(1000);
        if (checkpointMark == null) {
            LOG.trace("creating initial unbounded reader for {}", (Object)options);
            previous = Optional.empty();
            startIdx = this.sourceConfig.idxOffset;
            random = new Random(this.sourceConfig.seed);
            limiter = Limiter.of(this.sourceConfig.flowsPerSecond);
        } else {
            LOG.trace("resuming unbounded reader from {}", (Object)checkpointMark);
            previous = Optional.of(checkpointMark.previous);
            startIdx = checkpointMark.index;
            random = checkpointMark.random;
            limiter = Limiter.restore(this.sourceConfig.flowsPerSecond, checkpointMark.limiterState);
        }
        return new FlowReader(this, (rnd, idx) -> FlowDocuments.getFlowDocument(this.sourceConfig.flowConfig, idx, (FlowDocuments.FlowData)flowData.next(rnd).value()), this.sourceConfig.timestampPolicyFactory.create(previous), this.sourceConfig.idxInc, this.sourceConfig.maxIdx, startIdx, random, limiter);
    }

    public Coder<FlowReader.CheckpointMark> getCheckpointMarkCoder() {
        return FlowReader.CheckpointMark.CHECKPOINT_CODER;
    }

    public Coder<FlowDocument> getOutputCoder() {
        return new FlowDocumentProtobufCoder();
    }
}

