/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PDone;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.opennms.nephron.Aggregate;
import org.opennms.nephron.CompoundKey;
import org.opennms.nephron.CompoundKeyType;
import org.opennms.nephron.FlowSummaryData;
import org.opennms.nephron.FlowTimestampPolicy;
import org.opennms.nephron.MissingFieldsException;
import org.opennms.nephron.NephronOptions;
import org.opennms.nephron.UnalignedFixedWindows;
import org.opennms.nephron.WithHostname;
import org.opennms.nephron.coders.FlowDocumentProtobufCoder;
import org.opennms.nephron.coders.KafkaInputFlowDeserializer;
import org.opennms.nephron.elastic.AggregationType;
import org.opennms.nephron.elastic.FlowSummary;
import org.opennms.nephron.elastic.IndexStrategy;
import org.opennms.netmgt.flows.persistence.model.Direction;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Pipeline {
    private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
    private static final ObjectMapper MAPPER = new ObjectMapper();
    public static final RateLimitedLog RATE_LIMITED_LOG = RateLimitedLog.withRateLimit((Logger)LOG).maxRate(5).every(java.time.Duration.ofSeconds(10L)).build();

    public static org.apache.beam.sdk.Pipeline create(NephronOptions options) {
        Objects.requireNonNull(options);
        org.apache.beam.sdk.Pipeline p = org.apache.beam.sdk.Pipeline.create((PipelineOptions)options);
        Pipeline.registerCoders(p);
        HashMap<String, Object> kafkaConsumerConfig = new HashMap<String, Object>();
        HashMap<String, Object> kafkaProducerConfig = new HashMap<String, Object>();
        if (!Strings.isNullOrEmpty((String)options.getKafkaClientProperties())) {
            Properties properties = new Properties();
            try {
                properties.load(new FileReader(options.getKafkaClientProperties()));
            }
            catch (IOException e) {
                LOG.error("Error loading properties file", (Throwable)e);
                throw new RuntimeException("Error reading properties file", e);
            }
            for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                kafkaConsumerConfig.put(entry.getKey().toString(), entry.getValue());
                kafkaProducerConfig.put(entry.getKey().toString(), entry.getValue());
            }
        }
        kafkaConsumerConfig.put("group.id", options.getGroupId());
        kafkaConsumerConfig.put("enable.auto.commit", options.getAutoCommit());
        PCollection streamOfFlows = (PCollection)p.apply((PTransform)new ReadFromKafka(options.getBootstrapServers(), options.getFlowSourceTopic(), kafkaConsumerConfig));
        PCollection flowSummaries = (PCollection)streamOfFlows.apply((PTransform)new CalculateFlowStatistics(options));
        flowSummaries.apply((PTransform)new WriteToElasticsearch(options));
        if (options.getFlowDestTopic() != null) {
            flowSummaries.apply((PTransform)new WriteToKafka(options.getBootstrapServers(), options.getFlowDestTopic(), kafkaProducerConfig));
        }
        return p;
    }

    public static void registerCoders(org.apache.beam.sdk.Pipeline p) {
        CoderRegistry coderRegistry = p.getCoderRegistry();
        coderRegistry.registerCoderForClass(FlowDocument.class, (Coder)new FlowDocumentProtobufCoder());
        coderRegistry.registerCoderForClass(FlowSummaryData.class, (Coder)new FlowSummaryData.FlowSummaryDataCoder());
        coderRegistry.registerCoderForClass(CompoundKey.class, (Coder)new CompoundKey.CompoundKeyCoder());
        coderRegistry.registerCoderForClass(Aggregate.class, (Coder)new Aggregate.AggregateCoder());
    }

    public static TimestampPolicyFactory<String, FlowDocument> getKafkaInputTimestampPolicyFactory(Duration maxDelay) {
        return (TimestampPolicyFactory & Serializable)(tp, previousWatermark) -> new FlowTimestampPolicy(maxDelay, previousWatermark);
    }

    private static ParDo.SingleOutput<FlowSummaryData, String> toJson() {
        return ParDo.of((DoFn)new DoFn<FlowSummaryData, String>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) throws JsonProcessingException {
                FlowSummary flowSummary = Pipeline.toFlowSummary((FlowSummaryData)c.element());
                c.output((Object)MAPPER.writeValueAsString((Object)flowSummary));
            }
        });
    }

    public static ParDo.SingleOutput<FlowDocument, FlowDocument> attachTimestamps(final Duration fixedWindowSize, final Duration maxFlowDuration) {
        return ParDo.of((DoFn)new DoFn<FlowDocument, FlowDocument>(){
            final long windowSizeMs;
            final long maxFlowDurationMs;
            {
                this.windowSizeMs = fixedWindowSize.getMillis();
                this.maxFlowDurationMs = maxFlowDuration.getMillis();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                FlowDocument flow = (FlowDocument)c.element();
                long deltaSwitched = flow.getDeltaSwitched().getValue();
                long lastSwitched = flow.getLastSwitched().getValue();
                int nodeId = flow.getExporterNode().getNodeId();
                long firstWindow = UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, deltaSwitched);
                long lastWindow = UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, lastSwitched);
                long nbWindows = lastWindow - firstWindow + 1L;
                long timestamp = deltaSwitched;
                for (long i = 0L; i < nbWindows; ++i) {
                    if (timestamp <= c.timestamp().getMillis() - this.maxFlowDurationMs) {
                        RATE_LIMITED_LOG.warn("Skipping output for flow w/ start: {}, end: {}, target timestamp: {}, current input timestamp: {}. Full flow: {}", new Object[]{org.joda.time.Instant.ofEpochMilli((long)deltaSwitched), org.joda.time.Instant.ofEpochMilli((long)lastSwitched), org.joda.time.Instant.ofEpochMilli((long)timestamp), c.timestamp(), flow});
                    } else {
                        c.outputWithTimestamp((Object)flow, org.joda.time.Instant.ofEpochMilli((long)timestamp));
                    }
                    if (timestamp + this.windowSizeMs < lastSwitched) {
                        timestamp += this.windowSizeMs;
                        continue;
                    }
                    timestamp = lastSwitched;
                }
            }

            public Duration getAllowedTimestampSkew() {
                return maxFlowDuration;
            }
        });
    }

    public static FlowSummaryData toFlowSummaryData(AggregationType aggregationType, IntervalWindow window, KV<CompoundKey, Aggregate> el, int ranking) {
        return new FlowSummaryData(aggregationType, (CompoundKey)el.getKey(), (Aggregate)el.getValue(), window.start().getMillis(), window.end().getMillis(), ranking);
    }

    public static FlowSummary toFlowSummary(FlowSummaryData fsd) {
        FlowSummary flowSummary = new FlowSummary();
        fsd.key.populate(flowSummary);
        flowSummary.setAggregationType(fsd.aggregationType);
        flowSummary.setRangeStartMs(fsd.windowStart);
        flowSummary.setRangeEndMs(fsd.windowEnd);
        flowSummary.setTimestamp(flowSummary.getRangeEndMs());
        flowSummary.setBytesEgress(fsd.aggregate.getBytesOut());
        flowSummary.setBytesIngress(fsd.aggregate.getBytesIn());
        flowSummary.setBytesTotal(flowSummary.getBytesIngress() + flowSummary.getBytesEgress());
        flowSummary.setCongestionEncountered(fsd.aggregate.isCongestionEncountered());
        flowSummary.setNonEcnCapableTransport(fsd.aggregate.isNonEcnCapableTransport());
        flowSummary.setHostName(fsd.aggregate.getHostname());
        flowSummary.setRanking(fsd.ranking);
        return flowSummary;
    }

    public static Window<FlowDocument> toWindow(Duration fixedWindowSize, Duration earlyProcessingDelay, Duration lateProcessingDelay, Duration allowedLateness) {
        AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow().withLateFirings((Trigger.OnceTrigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateProcessingDelay));
        if (earlyProcessingDelay != null && !earlyProcessingDelay.isEqual((ReadableDuration)Duration.ZERO)) {
            trigger = trigger.withEarlyFirings((Trigger.OnceTrigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyProcessingDelay));
        }
        return Window.into((WindowFn)UnalignedFixedWindows.of(fixedWindowSize)).withTimestampCombiner(TimestampCombiner.END_OF_WINDOW).triggering((Trigger)trigger).withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY).withAllowedLateness(allowedLateness).discardingFiredPanes();
    }

    public static TotalAndSummary aggregateParentTotal(String transformPrefix, PCollection<KV<CompoundKey, Aggregate>> child) {
        PCollection parentTotal = (PCollection)((PCollection)child.apply(transformPrefix + "group_by_outer_key", (PTransform)ParDo.of((DoFn)new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                KV el = (KV)c.element();
                c.output((Object)KV.of((Object)((CompoundKey)el.getKey()).getOuterKey(), (Object)el.getValue()));
            }
        }))).apply(transformPrefix + "sum_bytes_by_key", (PTransform)Combine.perKey((CombineFnBase.GlobalCombineFn)new SumBytes()));
        PCollection summary = (PCollection)parentTotal.apply(transformPrefix + "total_summary", (PTransform)ParDo.of((DoFn)new DoFn<KV<CompoundKey, Aggregate>, FlowSummaryData>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c, IntervalWindow window) {
                c.output((Object)Pipeline.toFlowSummaryData(AggregationType.TOTAL, window, (KV<CompoundKey, Aggregate>)((KV)c.element()), 0));
            }
        }));
        return new TotalAndSummary((PCollection<KV<CompoundKey, Aggregate>>)parentTotal, (PCollection<FlowSummaryData>)summary);
    }

    public static SumsAndTopKs aggregateSumsAndTopKs(String transformPrefix, PCollection<FlowDocument> input, CompoundKeyType typeWithTos, final CompoundKeyType typeWithoutTos, int k) {
        PCollection groupedByKeyWithTos = (PCollection)input.apply(transformPrefix + "group_with_tos", (PTransform)ParDo.of((DoFn)new KeyFlowBy(typeWithTos)));
        SumAndTopK withTos = Pipeline.aggregateSumAndTopK(transformPrefix + "with_tos_", (PCollection<KV<CompoundKey, Aggregate>>)groupedByKeyWithTos, k);
        PCollection groupedByKeyWithoutTos = (PCollection)withTos.sum.apply(transformPrefix + "group_without_tos_", (PTransform)ParDo.of((DoFn)new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                KV el = (KV)c.element();
                c.output((Object)KV.of((Object)((CompoundKey)el.getKey()).project(typeWithoutTos), (Object)el.getValue()));
            }
        }));
        SumAndTopK withoutTos = Pipeline.aggregateSumAndTopK(transformPrefix + "without_tos_", (PCollection<KV<CompoundKey, Aggregate>>)groupedByKeyWithoutTos, k);
        return new SumsAndTopKs(withTos, withoutTos);
    }

    public static SumAndTopK aggregateSumAndTopK(String transformPrefix, PCollection<KV<CompoundKey, Aggregate>> groupedByKey, int k) {
        PCollection sum = (PCollection)groupedByKey.apply(transformPrefix + "sum_bytes_by_key", (PTransform)Combine.perKey((CombineFnBase.GlobalCombineFn)new SumBytes()));
        PCollection topK = (PCollection)((PCollection)((PCollection)((PCollection)sum.apply(transformPrefix + "group_by_outer_key", (PTransform)ParDo.of((DoFn)new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, KV<CompoundKey, Aggregate>>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                KV el = (KV)c.element();
                c.output((Object)KV.of((Object)((CompoundKey)el.getKey()).getOuterKey(), (Object)el));
            }
        }))).apply(transformPrefix + "top_k_per_key", Top.perKey((int)k, (Comparator)new FlowBytesValueComparator()))).apply(transformPrefix + "flatten", (PTransform)Values.create())).apply(transformPrefix + "top_k_summary", (PTransform)ParDo.of((DoFn)new DoFn<List<KV<CompoundKey, Aggregate>>, FlowSummaryData>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c, IntervalWindow window) {
                int ranking = 1;
                for (KV el : (List)c.element()) {
                    FlowSummaryData flowSummary = Pipeline.toFlowSummaryData(AggregationType.TOPK, window, (KV<CompoundKey, Aggregate>)el, ranking++);
                    c.output((Object)flowSummary);
                }
            }
        }));
        return new SumAndTopK((PCollection<KV<CompoundKey, Aggregate>>)sum, (PCollection<FlowSummaryData>)topK);
    }

    public static class SumsAndTopKs {
        public final SumAndTopK withTos;
        public final SumAndTopK withoutTos;

        public SumsAndTopKs(SumAndTopK withTos, SumAndTopK withoutTos) {
            this.withTos = withTos;
            this.withoutTos = withoutTos;
        }
    }

    public static class SumAndTopK {
        public final PCollection<KV<CompoundKey, Aggregate>> sum;
        public final PCollection<FlowSummaryData> topK;

        public SumAndTopK(PCollection<KV<CompoundKey, Aggregate>> sum, PCollection<FlowSummaryData> topK) {
            this.sum = sum;
            this.topK = topK;
        }
    }

    public static class TotalAndSummary {
        public final PCollection<KV<CompoundKey, Aggregate>> total;
        public final PCollection<FlowSummaryData> summary;

        public TotalAndSummary(PCollection<KV<CompoundKey, Aggregate>> total, PCollection<FlowSummaryData> summary) {
            this.total = total;
            this.summary = summary;
        }
    }

    public static class KeyFlowBy
    extends DoFn<FlowDocument, KV<CompoundKey, Aggregate>> {
        private final CompoundKeyType type;
        private final Counter flowsWithMissingFields = Metrics.counter(Pipeline.class, (String)"flowsWithMissingFields");
        private final Counter flowsInWindow = Metrics.counter((String)"flows", (String)"in_window");

        public KeyFlowBy(CompoundKeyType type) {
            this.type = type;
        }

        public static long bytesInWindow(long deltaSwitched, long lastSwitchedInclusive, double multipliedNumBytes, long windowStart, long windowEndInclusive) {
            long flowDurationMs = lastSwitchedInclusive - deltaSwitched + 1L;
            long overlapStart = Math.max(deltaSwitched, windowStart);
            long overlapEnd = Math.min(lastSwitchedInclusive, windowEndInclusive);
            long previousEnd = overlapStart - 1L;
            long bytesAtPreviousEnd = (long)((double)(previousEnd - deltaSwitched + 1L) * multipliedNumBytes / (double)flowDurationMs);
            long bytesAtEnd = (long)((double)(overlapEnd - deltaSwitched + 1L) * multipliedNumBytes / (double)flowDurationMs);
            return bytesAtEnd - bytesAtPreviousEnd;
        }

        private Aggregate aggregatize(IntervalWindow window, FlowDocument flow, String hostname) {
            double samplingInterval;
            double multiplier = 1.0;
            if (flow.hasSamplingInterval() && (samplingInterval = flow.getSamplingInterval().getValue()) > 0.0) {
                multiplier = samplingInterval;
            }
            long bytes = KeyFlowBy.bytesInWindow(flow.getDeltaSwitched().getValue(), flow.getLastSwitched().getValue(), (double)flow.getNumBytes().getValue() * multiplier, window.start().getMillis(), window.maxTimestamp().getMillis());
            this.flowsInWindow.inc();
            return Direction.INGRESS.equals((Object)flow.getDirection()) ? new Aggregate(bytes, 0L, hostname, flow.hasEcn() ? Integer.valueOf(flow.getEcn().getValue()) : null) : new Aggregate(0L, bytes, hostname, flow.hasEcn() ? Integer.valueOf(flow.getEcn().getValue()) : null);
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, IntervalWindow window) {
            FlowDocument flow = (FlowDocument)c.element();
            try {
                for (WithHostname<CompoundKey> key : this.key(flow)) {
                    Aggregate aggregate = this.aggregatize(window, flow, key.hostname);
                    c.output((Object)KV.of(key.value, (Object)aggregate));
                }
            }
            catch (MissingFieldsException mfe) {
                this.flowsWithMissingFields.inc();
            }
        }

        public Collection<WithHostname<CompoundKey>> key(FlowDocument flow) throws MissingFieldsException {
            return this.type.create(flow);
        }
    }

    static class FlowBytesValueComparator
    implements Comparator<KV<CompoundKey, Aggregate>>,
    Serializable {
        FlowBytesValueComparator() {
        }

        @Override
        public int compare(KV<CompoundKey, Aggregate> a, KV<CompoundKey, Aggregate> b) {
            int res = Long.compare(((Aggregate)a.getValue()).getBytes(), ((Aggregate)b.getValue()).getBytes());
            if (res != 0) {
                return res;
            }
            return ((CompoundKey)b.getKey()).groupedByKey().compareTo(((CompoundKey)a.getKey()).groupedByKey());
        }
    }

    static class SumBytes
    extends Combine.BinaryCombineFn<Aggregate> {
        SumBytes() {
        }

        public Aggregate apply(Aggregate left, Aggregate right) {
            return Aggregate.merge(left, right);
        }
    }

    public static class WriteToKafka
    extends PTransform<PCollection<FlowSummaryData>, PDone> {
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaProducerConfig;

        public WriteToKafka(String bootstrapServers, String topic, Map<String, Object> kafkaProducerConfig) {
            this.bootstrapServers = Objects.requireNonNull(bootstrapServers);
            this.topic = Objects.requireNonNull(topic);
            this.kafkaProducerConfig = kafkaProducerConfig;
        }

        public PDone expand(PCollection<FlowSummaryData> input) {
            return (PDone)((PCollection)input.apply((PTransform)Pipeline.toJson())).apply(KafkaIO.write().withProducerConfigUpdates(this.kafkaProducerConfig).withBootstrapServers(this.bootstrapServers).withTopic(this.topic).withValueSerializer(StringSerializer.class).values());
        }
    }

    public static class ReadFromKafka
    extends PTransform<PBegin, PCollection<FlowDocument>> {
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaConsumerConfig;
        private final Counter flowsFromKafka = Metrics.counter((String)"flows", (String)"from_kafka");
        private final Distribution flowsFromKafkaDrift = Metrics.distribution((String)"flows", (String)"from_kafka_drift");

        public ReadFromKafka(String bootstrapServers, String topic, Map<String, Object> kafkaConsumerConfig) {
            this.bootstrapServers = Objects.requireNonNull(bootstrapServers);
            this.topic = Objects.requireNonNull(topic);
            this.kafkaConsumerConfig = Objects.requireNonNull(kafkaConsumerConfig);
        }

        public PCollection<FlowDocument> expand(PBegin input) {
            NephronOptions options = (NephronOptions)input.getPipeline().getOptions().as(NephronOptions.class);
            return (PCollection)((PCollection)((PCollection)input.apply(KafkaIO.read().withTopic(this.topic).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(KafkaInputFlowDeserializer.class).withConsumerConfigUpdates(this.kafkaConsumerConfig).withBootstrapServers(this.bootstrapServers).withTimestampPolicyFactory(Pipeline.getKafkaInputTimestampPolicyFactory(Duration.millis((long)options.getDefaultMaxInputDelayMs()))).withoutMetadata())).apply((PTransform)Values.create())).apply("init", (PTransform)ParDo.of((DoFn)new DoFn<FlowDocument, FlowDocument>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    FlowDocument flow = (FlowDocument)c.element();
                    if (!flow.hasDeltaSwitched()) {
                        flow = FlowDocument.newBuilder((FlowDocument)((FlowDocument)c.element())).setDeltaSwitched(flow.getFirstSwitched()).build();
                    }
                    c.output((Object)flow);
                    flowsFromKafka.inc();
                    flowsFromKafkaDrift.update(System.currentTimeMillis() - flow.getTimestamp());
                }
            }));
        }

        public static long getTimestampMs(FlowDocument doc) {
            return doc.getLastSwitched().getValue();
        }

        public static org.joda.time.Instant getTimestamp(FlowDocument doc) {
            return org.joda.time.Instant.ofEpochMilli((long)ReadFromKafka.getTimestampMs(doc));
        }
    }

    public static class WriteToElasticsearch
    extends PTransform<PCollection<FlowSummaryData>, PDone> {
        private final String elasticIndex;
        private final IndexStrategy indexStrategy;
        private final ElasticsearchIO.ConnectionConfiguration esConfig;
        private final Counter flowsToEs = Metrics.counter((String)"flows", (String)"to_es");
        private final Distribution flowsToEsDrift = Metrics.distribution((String)"flows", (String)"to_es_drift");
        private int elasticRetryCount;
        private long elasticRetryDuration;

        public WriteToElasticsearch(String elasticUrl, String elasticUser, String elasticPassword, String elasticIndex, IndexStrategy indexStrategy, int elasticConnectTimeout, int elasticSocketTimeout, int elasticRetryCount, long elasticRetryDuration) {
            Objects.requireNonNull(elasticUrl);
            this.elasticIndex = Objects.requireNonNull(elasticIndex);
            this.indexStrategy = Objects.requireNonNull(indexStrategy);
            ElasticsearchIO.ConnectionConfiguration thisEsConfig = ElasticsearchIO.ConnectionConfiguration.create((String[])new String[]{elasticUrl}, (String)elasticIndex, (String)"_doc");
            if (!Strings.isNullOrEmpty((String)elasticUser) && !Strings.isNullOrEmpty((String)elasticPassword)) {
                thisEsConfig = thisEsConfig.withUsername(elasticUser).withPassword(elasticPassword);
            }
            this.esConfig = thisEsConfig = thisEsConfig.withConnectTimeout(Integer.valueOf(elasticConnectTimeout)).withSocketTimeout(Integer.valueOf(elasticSocketTimeout));
            this.elasticRetryCount = elasticRetryCount;
            this.elasticRetryDuration = elasticRetryDuration;
        }

        public WriteToElasticsearch(NephronOptions options) {
            this(options.getElasticUrl(), options.getElasticUser(), options.getElasticPassword(), options.getElasticFlowIndex(), options.getElasticIndexStrategy(), options.getElasticConnectTimeout(), options.getElasticSocketTimeout(), options.getElasticRetryCount(), options.getElasticRetryDuration());
        }

        public PDone expand(PCollection<FlowSummaryData> input) {
            return (PDone)((PCollection)input.apply("SerializeToJson", (PTransform)Pipeline.toJson())).apply("WriteToElasticsearch", (PTransform)ElasticsearchIO.write().withConnectionConfiguration(this.esConfig).withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create((int)this.elasticRetryCount, (Duration)Duration.millis((long)this.elasticRetryDuration))).withIndexFn(new ElasticsearchIO.Write.FieldValueExtractFn(){

                public String apply(JsonNode input) {
                    Instant flowTimestamp = Instant.ofEpochMilli(input.get("@timestamp").asLong());
                    String indexName = indexStrategy.getIndex(elasticIndex, flowTimestamp);
                    flowsToEs.inc();
                    flowsToEsDrift.update(System.currentTimeMillis() - flowTimestamp.toEpochMilli());
                    return indexName;
                }
            }));
        }
    }

    public static class WindowedFlows
    extends PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> {
        private final Duration fixedWindowSize;
        private final Duration maxFlowDuration;
        private final Duration earlyProcessingDelay;
        private final Duration lateProcessingDelay;
        private final Duration allowedLateness;

        public WindowedFlows(Duration fixedWindowSize, Duration maxFlowDuration, Duration earlyProcessingDelay, Duration lateProcessingDelay, Duration allowedLateness) {
            this.fixedWindowSize = Objects.requireNonNull(fixedWindowSize);
            this.maxFlowDuration = Objects.requireNonNull(maxFlowDuration);
            this.earlyProcessingDelay = Objects.requireNonNull(earlyProcessingDelay);
            this.lateProcessingDelay = Objects.requireNonNull(lateProcessingDelay);
            this.allowedLateness = Objects.requireNonNull(allowedLateness);
        }

        public PCollection<FlowDocument> expand(PCollection<FlowDocument> input) {
            return (PCollection)((PCollection)input.apply("attach_timestamp", Pipeline.attachTimestamps(this.fixedWindowSize, this.maxFlowDuration))).apply("to_windows", Pipeline.toWindow(this.fixedWindowSize, this.earlyProcessingDelay, this.lateProcessingDelay, this.allowedLateness));
        }
    }

    public static class CalculateFlowStatistics
    extends PTransform<PCollection<FlowDocument>, PCollection<FlowSummaryData>> {
        private final int topK;
        private final Duration fixedWindowSize;
        private final Duration maxFlowDuration;
        private final Duration earlyProcessingDelay;
        private final Duration lateProcessingDelay;
        private final Duration allowedLateness;

        public CalculateFlowStatistics(int topK, Duration fixedWindowSize, Duration maxFlowDuration, Duration earlyProcessingDelay, Duration lateProcessingDelay, Duration allowedLateness) {
            this.topK = topK;
            this.fixedWindowSize = Objects.requireNonNull(fixedWindowSize);
            this.maxFlowDuration = Objects.requireNonNull(maxFlowDuration);
            this.earlyProcessingDelay = Objects.requireNonNull(earlyProcessingDelay);
            this.lateProcessingDelay = Objects.requireNonNull(lateProcessingDelay);
            this.allowedLateness = Objects.requireNonNull(allowedLateness);
        }

        public CalculateFlowStatistics(NephronOptions options) {
            this(options.getTopK(), Duration.millis((long)options.getFixedWindowSizeMs()), Duration.millis((long)options.getMaxFlowDurationMs()), Duration.millis((long)options.getEarlyProcessingDelayMs()), Duration.millis((long)options.getLateProcessingDelayMs()), Duration.millis((long)options.getAllowedLatenessMs()));
        }

        public PCollection<FlowSummaryData> expand(PCollection<FlowDocument> input) {
            PCollection windowedStreamOfFlows = (PCollection)input.apply("WindowedFlows", (PTransform)new WindowedFlows(this.fixedWindowSize, this.maxFlowDuration, this.earlyProcessingDelay, this.lateProcessingDelay, this.allowedLateness));
            SumsAndTopKs app = Pipeline.aggregateSumsAndTopKs("app_", (PCollection<FlowDocument>)windowedStreamOfFlows, CompoundKeyType.EXPORTER_INTERFACE_TOS_APPLICATION, CompoundKeyType.EXPORTER_INTERFACE_APPLICATION, this.topK);
            SumsAndTopKs host = Pipeline.aggregateSumsAndTopKs("host_", (PCollection<FlowDocument>)windowedStreamOfFlows, CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST, CompoundKeyType.EXPORTER_INTERFACE_HOST, this.topK);
            SumsAndTopKs conv = Pipeline.aggregateSumsAndTopKs("conv_", (PCollection<FlowDocument>)windowedStreamOfFlows, CompoundKeyType.EXPORTER_INTERFACE_TOS_CONVERSATION, CompoundKeyType.EXPORTER_INTERFACE_CONVERSATION, this.topK);
            TotalAndSummary tos = Pipeline.aggregateParentTotal("tos_", app.withTos.sum);
            TotalAndSummary itf = Pipeline.aggregateParentTotal("itf_", tos.total);
            PCollectionList flowSummaries = PCollectionList.of(itf.summary).and(tos.summary).and(app.withTos.topK).and(app.withoutTos.topK).and(host.withTos.topK).and(host.withoutTos.topK).and(conv.withTos.topK).and(conv.withoutTos.topK);
            return (PCollection)flowSummaries.apply((PTransform)Flatten.pCollections());
        }
    }
}

