/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.net.InetAddresses;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.opennms.nephron.Aggregate;
import org.opennms.nephron.CompoundKey;
import org.opennms.nephron.CompoundKeyData;
import org.opennms.nephron.CompoundKeyType;
import org.opennms.nephron.MissingFieldsException;
import org.opennms.nephron.NephronOptions;
import org.opennms.nephron.UnalignedFixedWindows;
import org.opennms.nephron.coders.FlowDocumentProtobufCoder;
import org.opennms.nephron.coders.KafkaInputFlowDeserializer;
import org.opennms.nephron.cortex.CortexIo;
import org.opennms.nephron.cortex.TimeSeriesBuilder;
import org.opennms.nephron.elastic.AggregationType;
import org.opennms.nephron.elastic.FlowSummary;
import org.opennms.nephron.elastic.IndexStrategy;
import org.opennms.nephron.network.IPAddress;
import org.opennms.nephron.network.IpValue;
import org.opennms.nephron.network.StringValue;
import org.opennms.nephron.util.PaneAccumulator;
import org.opennms.netmgt.flows.persistence.model.Direction;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Pipeline {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);
    /** Shared Jackson mapper; in this file it serializes {@link FlowSummary} instances to JSON strings. */
    private static final ObjectMapper MAPPER = new ObjectMapper();
    /** Rate-limited view on {@link #LOG}, built with {@code maxRate(5)} per 10 seconds. */
    public static final RateLimitedLog RATE_LIMITED_LOG = RateLimitedLog.withRateLimit((Logger)LOG).maxRate(5).every(java.time.Duration.ofSeconds(10L)).build();
    /** Output tag under which {@link ProjConvWithTos} emits host-keyed aggregates. */
    private static TupleTag<KV<CompoundKey, Aggregate>> BY_HOST = new TupleTag<KV<CompoundKey, Aggregate>>(){};
    /** Output tag under which {@link ProjConvWithTos} emits application-keyed aggregates. */
    private static TupleTag<KV<CompoundKey, Aggregate>> BY_APP = new TupleTag<KV<CompoundKey, Aggregate>>(){};
    /**
     * Transform that turns every keyed aggregate into the JSON representation of its {@link FlowSummary}.
     * The summary is built by {@link #toFlowSummary(KV, IntervalWindow)} from the element and the window it
     * belongs to, then serialized with {@link #MAPPER}.
     */
    private static ParDo.SingleOutput<KV<CompoundKey, Aggregate>, String> FLOW_SUMMARY_DATA_TO_JSON = ParDo.of((DoFn)new DoFn<KV<CompoundKey, Aggregate>, String>(){

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, IntervalWindow window) throws JsonProcessingException {
            // Convert the element to a summary document for its window, then emit it as a JSON string.
            FlowSummary flowSummary = Pipeline.toFlowSummary((KV<CompoundKey, Aggregate>)((KV)c.element()), window);
            c.output((Object)MAPPER.writeValueAsString((Object)flowSummary));
        }
    });

    /**
     * Creates the pipeline using the default Kafka timestamp policy, whose maximum input delay is taken from
     * {@code options.getDefaultMaxInputDelayMs()}.
     *
     * @param options pipeline configuration; must not be {@code null}
     * @return the assembled Beam pipeline
     * @throws NullPointerException if {@code options} is {@code null}
     */
    public static org.apache.beam.sdk.Pipeline create(NephronOptions options) {
        Objects.requireNonNull(options);
        final Duration maxInputDelay = Duration.millis((long)options.getDefaultMaxInputDelayMs());
        return Pipeline.create(options, Pipeline.getKafkaInputTimestampPolicyFactory(maxInputDelay));
    }

    /**
     * Creates the pipeline: registers the coders, reads flows from Kafka, calculates the flow statistics,
     * optionally accumulates the resulting summaries and attaches the Elasticsearch, Kafka and Cortex sinks.
     *
     * @param options                pipeline configuration; must not be {@code null}
     * @param timestampPolicyFactory timestamp policy factory handed to the Kafka source
     * @return the assembled Beam pipeline
     * @throws NullPointerException if {@code options} is {@code null}
     */
    public static org.apache.beam.sdk.Pipeline create(NephronOptions options, TimestampPolicyFactory<byte[], FlowDocument> timestampPolicyFactory) {
        Objects.requireNonNull(options);
        final org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create((PipelineOptions)options);
        Pipeline.registerCoders(pipeline);

        // Consumer settings: properties file content (if any) plus group id and auto-commit flag from the options.
        final Map<String, Object> consumerConfig = Pipeline.loadKafkaClientProperties(options);
        consumerConfig.put("group.id", options.getGroupId());
        consumerConfig.put("enable.auto.commit", options.getAutoCommit());

        // Source -> statistics -> optional accumulation.
        final ReadFromKafka source = new ReadFromKafka(options.getBootstrapServers(), options.getFlowSourceTopic(), consumerConfig, timestampPolicyFactory);
        final PCollection flows = (PCollection)pipeline.apply((PTransform)source);
        final PCollection<KV<CompoundKey, Aggregate>> calculated = (PCollection<KV<CompoundKey, Aggregate>>)flows.apply((PTransform)new CalculateFlowStatistics(options));
        final PCollection<KV<CompoundKey, Aggregate>> summaries = Pipeline.accumulateSummariesIfNecessary(options, calculated);

        // Sinks; each attach method decides by itself whether it is enabled.
        Pipeline.attachWriteToElastic(options, summaries);
        Pipeline.attachWriteToKafka(options, summaries);
        Pipeline.attachWriteToCortex(options, summaries);
        return pipeline;
    }

    /**
     * Wraps the summaries in a pane accumulation step when a summary accumulation delay is configured.
     *
     * @param options       pipeline configuration providing {@code getSummaryAccumulationDelayMs()}
     * @param flowSummaries the summaries to (possibly) accumulate
     * @return {@code flowSummaries} unchanged when the delay is zero, the accumulated collection otherwise
     */
    public static PCollection<KV<CompoundKey, Aggregate>> accumulateSummariesIfNecessary(NephronOptions options, PCollection<KV<CompoundKey, Aggregate>> flowSummaries) {
        final long delayMs = options.getSummaryAccumulationDelayMs();
        if (delayMs == 0L) {
            return flowSummaries;
        }
        return Pipeline.accumulateFlowSummaries(flowSummaries, Duration.millis(delayMs));
    }

    /**
     * Applies a {@link PaneAccumulator} that combines values with {@code Aggregate::merge}.
     *
     * @param input             keyed aggregates to accumulate
     * @param accumulationDelay delay passed to the accumulator
     * @return the output of the accumulator transform
     */
    public static PCollection<KV<CompoundKey, Aggregate>> accumulateFlowSummaries(PCollection<KV<CompoundKey, Aggregate>> input, Duration accumulationDelay) {
        final CompoundKey.CompoundKeyCoder keyCoder = new CompoundKey.CompoundKeyCoder();
        final Aggregate.AggregateCoder valueCoder = new Aggregate.AggregateCoder();
        final PaneAccumulator accumulator = new PaneAccumulator(Aggregate::merge, accumulationDelay, keyCoder, valueCoder);
        return (PCollection)input.apply(accumulator);
    }

    /**
     * Loads the Kafka client properties from the file named by {@code options.getKafkaClientProperties()}.
     *
     * @param options pipeline configuration
     * @return a new mutable map with the loaded properties; empty when no properties file is configured
     * @throws RuntimeException if the configured file cannot be read (the {@link IOException} is the cause)
     */
    private static Map<String, Object> loadKafkaClientProperties(NephronOptions options) {
        HashMap<String, Object> kafkaClientProperties = new HashMap<String, Object>();
        String propertiesFile = options.getKafkaClientProperties();
        if (Strings.isNullOrEmpty(propertiesFile)) {
            return kafkaClientProperties;
        }
        Properties properties = new Properties();
        // try-with-resources: the reader is closed even when loading fails (it used to leak).
        try (FileReader reader = new FileReader(propertiesFile)) {
            properties.load(reader);
        }
        catch (IOException e) {
            LOG.error("Error loading properties file", (Throwable)e);
            throw new RuntimeException("Error reading properties file", e);
        }
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            kafkaClientProperties.put(entry.getKey().toString(), entry.getValue());
        }
        return kafkaClientProperties;
    }

    /**
     * Attaches the Elasticsearch sink to the summaries, but only when an Elasticsearch URL is configured.
     *
     * @param options       pipeline configuration
     * @param flowSummaries summaries to persist
     */
    public static void attachWriteToElastic(NephronOptions options, PCollection<KV<CompoundKey, Aggregate>> flowSummaries) {
        if (Strings.isNullOrEmpty((String)options.getElasticUrl())) {
            // No URL configured: the sink is disabled.
            return;
        }
        flowSummaries.apply((PTransform)new WriteToElasticsearch(options));
    }

    /**
     * Attaches the Kafka sink to the summaries, but only when a destination topic is configured.
     *
     * @param options       pipeline configuration
     * @param flowSummaries summaries to publish
     */
    public static void attachWriteToKafka(NephronOptions options, PCollection<KV<CompoundKey, Aggregate>> flowSummaries) {
        final String destinationTopic = options.getFlowDestTopic();
        if (Strings.isNullOrEmpty(destinationTopic)) {
            // No topic configured: the sink is disabled.
            return;
        }
        final Map<String, Object> producerConfig = Pipeline.loadKafkaClientProperties(options);
        final WriteToKafka sink = new WriteToKafka(options.getBootstrapServers(), destinationTopic, producerConfig);
        flowSummaries.apply((PTransform)sink);
    }

    /**
     * Attaches the Cortex sink without any additional configuration of the write transform.
     *
     * @param options       pipeline configuration
     * @param flowSummaries summaries to write
     */
    public static void attachWriteToCortex(NephronOptions options, PCollection<KV<CompoundKey, Aggregate>> flowSummaries) {
        final Consumer<CortexIo.Write<CompoundKey, Aggregate>> noAdditionalConfig = write -> {};
        Pipeline.attachWriteToCortex(options, flowSummaries, noAdditionalConfig);
    }

    /**
     * Attaches the Cortex sink when a Cortex write URL is configured.
     * <p>
     * The write transform is created with accumulation when {@code getCortexAccumulationDelayMs()} is non-zero,
     * then receives the batch limits, the caller's additional configuration and (if set) the org id. Only the
     * summaries accepted by {@link #includeInCortexOutput(NephronOptions)} are written.
     *
     * @param options          pipeline configuration
     * @param flowSummaries    summaries to write
     * @param additionalConfig hook invoked with the write transform after the batch limits were set
     */
    public static void attachWriteToCortex(NephronOptions options, PCollection<KV<CompoundKey, Aggregate>> flowSummaries, Consumer<CortexIo.Write<CompoundKey, Aggregate>> additionalConfig) {
        if (!Pipeline.cortexOutputEnabled(options)) {
            return;
        }
        final CortexIo.Write cortexWrite;
        if (options.getCortexAccumulationDelayMs() != 0L) {
            cortexWrite = CortexIo.of((String)options.getCortexWriteUrl(), Pipeline::cortexOutput, (Coder)new CompoundKey.CompoundKeyCoder(), (Coder)new Aggregate.AggregateCoder(), Aggregate::merge, (Duration)Duration.millis((long)options.getCortexAccumulationDelayMs()));
        } else {
            cortexWrite = CortexIo.of((String)options.getCortexWriteUrl(), Pipeline::cortexOutput);
        }
        cortexWrite.withMaxBatchSize((long)options.getCortexMaxBatchSize()).withMaxBatchBytes((long)options.getCortexMaxBatchBytes());
        additionalConfig.accept((CortexIo.Write<CompoundKey, Aggregate>)cortexWrite);
        if (!Strings.isNullOrEmpty((String)options.getCortexOrgId())) {
            cortexWrite.withOrgId(options.getCortexOrgId());
        }
        final PCollection cortexInput = (PCollection)flowSummaries.apply((PTransform)Filter.by(Pipeline.includeInCortexOutput(options)));
        cortexInput.apply((PTransform)cortexWrite);
    }

    /**
     * Validates a comma separated list of host specifications and converts it into an {@link IpValue}.
     * <p>
     * Each entry must be one of:
     * <ul>
     *   <li>a range {@code begin-end} of two plain IP addresses where begin is not after end,</li>
     *   <li>a value containing {@code /} that {@link IpValue#parseCIDR} accepts,</li>
     *   <li>a plain IP address.</li>
     * </ul>
     *
     * @param ipAddressValue the raw {@code cortexConsideredHosts} argument
     * @return the parsed value, created by {@code IpValue.of}
     * @throws IllegalArgumentException if any entry is invalid; for CIDR failures the parser's exception is
     *                                  attached as the cause
     */
    private static IpValue validateIpAddress(String ipAddressValue) throws IllegalArgumentException {
        StringValue inputValue = new StringValue(ipAddressValue);
        String errorPrefix = "invalid cortexConsideredHosts argument - value: " + ipAddressValue;
        for (StringValue eachValue : inputValue.splitBy(",")) {
            if (eachValue.isRanged()) {
                List<StringValue> rangedValues = eachValue.splitBy("-");
                if (rangedValues.size() != 2) {
                    throw new IllegalArgumentException(errorPrefix + "; at range: " + eachValue.getValue());
                }
                // Both range bounds must be plain addresses.
                for (StringValue rangedValue : rangedValues) {
                    if (rangedValue.contains("/")) {
                        throw new IllegalArgumentException(errorPrefix + "; CIDR notation not supported in address ranges: " + rangedValue.getValue());
                    }
                    if (!InetAddresses.isInetAddress((String)rangedValue.getValue())) {
                        throw new IllegalArgumentException(errorPrefix + "; not an ip address: " + rangedValue.getValue());
                    }
                }
                IPAddress begin = new IPAddress(rangedValues.get(0).getValue());
                IPAddress end = new IPAddress(rangedValues.get(1).getValue());
                if (begin.isGreaterThan(end)) {
                    throw new IllegalArgumentException(errorPrefix + "; invalid address range: begin must not be after end - begin: " + begin + "; end: " + end);
                }
            } else if (eachValue.contains("/")) {
                try {
                    IpValue.parseCIDR(eachValue.getValue());
                }
                catch (Exception e) {
                    // Keep the parser's exception as cause so the reason for the rejection is not lost.
                    throw new IllegalArgumentException(errorPrefix + "; not a valid CIDR value: " + eachValue.getValue(), e);
                }
            } else if (!InetAddresses.isInetAddress((String)eachValue.getValue())) {
                throw new IllegalArgumentException(errorPrefix + "; not an ip address: " + eachValue.getValue());
            }
        }
        return IpValue.of(inputValue);
    }

    /**
     * Builds the predicate that selects which aggregates are written to Cortex.
     * <p>
     * Conversation aggregates are always excluded. Host aggregates are excluded unless
     * {@code cortexConsideredHosts} is set, in which case only hosts whose address is in the configured
     * range are included. Aggregates of every other key type are included.
     *
     * @param options pipeline configuration
     * @return the serializable filter predicate
     * @throws IllegalArgumentException if {@code cortexConsideredHosts} is set but invalid
     */
    private static SerializableFunction<KV<CompoundKey, Aggregate>, Boolean> includeInCortexOutput(NephronOptions options) {
        final String consideredHosts = options.getCortexConsideredHosts();
        if (!StringUtils.isNoneBlank(consideredHosts)) {
            // No host restriction configured: drop all host and conversation aggregates.
            return fsd -> {
                switch (fsd.getKey().type) {
                    case EXPORTER_INTERFACE_HOST:
                    case EXPORTER_INTERFACE_TOS_HOST:
                    case EXPORTER_INTERFACE_CONVERSATION:
                    case EXPORTER_INTERFACE_TOS_CONVERSATION:
                        return false;
                    default:
                        return true;
                }
            };
        }
        final IpValue ipValue = Pipeline.validateIpAddress(consideredHosts);
        return fsd -> {
            final CompoundKey key = fsd.getKey();
            switch (key.type) {
                case EXPORTER_INTERFACE_HOST:
                case EXPORTER_INTERFACE_TOS_HOST:
                    return ipValue.isInRange(key.data.address);
                case EXPORTER_INTERFACE_CONVERSATION:
                case EXPORTER_INTERFACE_TOS_CONVERSATION:
                    return false;
                default:
                    return true;
            }
        };
    }

    /**
     * Tells whether the Cortex sink is enabled.
     *
     * @param options pipeline configuration
     * @return {@code true} if a non-empty Cortex write URL is configured
     */
    private static boolean cortexOutputEnabled(NephronOptions options) {
        final String writeUrl = options.getCortexWriteUrl();
        return writeUrl != null && !writeUrl.isEmpty();
    }

    /**
     * Writes two series for one aggregate into the builder: first the "in" series with the ingress bytes,
     * then (after {@code builder.nextSeries()}) the "out" series with the egress bytes.
     *
     * @param key            key of the aggregate
     * @param agg            the aggregate providing the byte counts
     * @param eventTimestamp timestamp used for the samples
     * @param index          value of the "pane" label
     * @param builder        receiver of labels and samples
     */
    private static void cortexOutput(CompoundKey key, Aggregate agg, org.joda.time.Instant eventTimestamp, int index, TimeSeriesBuilder builder) {
        final long bytesIn = agg.getBytesIn();
        final long bytesOut = agg.getBytesOut();
        if (LOG.isTraceEnabled()) {
            LOG.trace("cortex output - eventTimestamp: {}; keyType: {}; key: {}; index: {}; in: {}; out: {}; total: {}", eventTimestamp, key.type, key, index, bytesIn, bytesOut, bytesIn + bytesOut);
        }
        Pipeline.doCortexOutput(key, eventTimestamp, index, "in", bytesIn, builder);
        builder.nextSeries();
        Pipeline.doCortexOutput(key, eventTimestamp, index, "out", bytesOut, builder);
    }

    /**
     * Fills the builder's current series for one direction of an aggregate.
     *
     * @param key            key that contributes its own data via {@code key.populate(builder)}
     * @param eventTimestamp timestamp of the sample (its epoch millis are used)
     * @param paneId         value of the "pane" label
     * @param direction      value of the "direction" label
     * @param bytes          sample value
     * @param builder        receiver of labels and sample
     */
    private static void doCortexOutput(CompoundKey key, org.joda.time.Instant eventTimestamp, int paneId, String direction, long bytes, TimeSeriesBuilder builder) {
        builder.addLabel("pane", paneId);
        builder.addLabel("direction", direction);
        builder.addSample(eventTimestamp.getMillis(), (double)bytes);
        // The key adds whatever it contributes to the series last.
        key.populate(builder);
    }

    /**
     * Registers the coders for {@link FlowDocument}, {@link CompoundKey} and {@link Aggregate} with the
     * pipeline's coder registry.
     *
     * @param p the pipeline whose registry is updated
     */
    public static void registerCoders(org.apache.beam.sdk.Pipeline p) {
        final CoderRegistry registry = p.getCoderRegistry();
        registry.registerCoderForClass(FlowDocument.class, new FlowDocumentProtobufCoder());
        registry.registerCoderForClass(CompoundKey.class, new CompoundKey.CompoundKeyCoder());
        registry.registerCoderForClass(Aggregate.class, new Aggregate.AggregateCoder());
    }

    /**
     * Creates the factory for the timestamp policy of the Kafka source. Every policy it creates is a
     * {@link CustomTimestampPolicyWithLimitedDelay} that takes record timestamps from
     * {@code ReadFromKafka::getTimestamp}.
     *
     * @param maxDelay maximum delay handed to the created policies
     * @return the serializable policy factory
     */
    public static TimestampPolicyFactory<byte[], FlowDocument> getKafkaInputTimestampPolicyFactory(Duration maxDelay) {
        return (tp, previousWatermark) -> {
            return new CustomTimestampPolicyWithLimitedDelay<>(ReadFromKafka::getTimestamp, maxDelay, previousWatermark);
        };
    }

    /**
     * Creates a transform that re-emits every flow once for each window it spans, each copy carrying a
     * timestamp that lies in the respective window.
     * <p>
     * Flows whose {@code deltaSwitched} is smaller than the per-node window shift are skipped entirely.
     * Individual outputs whose timestamp lies {@code maxFlowDuration} or more before the current input
     * timestamp are skipped as well. Both cases are reported through {@link #RATE_LIMITED_LOG}.
     *
     * @param fixedWindowSize size of the (per-node shifted) fixed windows
     * @param maxFlowDuration how far an output timestamp may reach back behind the input timestamp; also
     *                        returned as the allowed timestamp skew
     * @return the transform
     */
    public static ParDo.SingleOutput<FlowDocument, FlowDocument> attachTimestamps(final Duration fixedWindowSize, final Duration maxFlowDuration) {
        return ParDo.of((DoFn)new DoFn<FlowDocument, FlowDocument>(){
            // Durations captured as plain milliseconds.
            final long windowSizeMs;
            final long maxFlowDurationMs;
            {
                this.windowSizeMs = fixedWindowSize.getMillis();
                this.maxFlowDurationMs = maxFlowDuration.getMillis();
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                FlowDocument flow = (FlowDocument)c.element();
                long deltaSwitched = flow.getDeltaSwitched().getValue();
                long lastSwitched = flow.getLastSwitched().getValue();
                int nodeId = flow.getExporterNode().getNodeId();
                long shift = UnalignedFixedWindows.perNodeShift(nodeId, this.windowSizeMs);
                // A flow that starts before the node's window shift is dropped.
                if (deltaSwitched < shift) {
                    RATE_LIMITED_LOG.warn("Skipping output for flow whose start is too small w/ start: {}, end: {}, target timestamp: {}, current input timestamp: {}. Full flow: {}", new Object[]{org.joda.time.Instant.ofEpochMilli((long)deltaSwitched), org.joda.time.Instant.ofEpochMilli((long)lastSwitched), org.joda.time.Instant.ofEpochMilli((long)deltaSwitched), c.timestamp(), flow});
                    return;
                }
                // Number of windows between the flow's start and its end, both inclusive.
                long firstWindow = UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, deltaSwitched);
                long lastWindow = UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, lastSwitched);
                long nbWindows = lastWindow - firstWindow + 1L;
                // The first output is stamped with the flow start; later ones advance by one window size
                // and the stamp is capped at lastSwitched.
                long timestamp = deltaSwitched;
                for (long i = 0L; i < nbWindows; ++i) {
                    if (timestamp <= c.timestamp().getMillis() - this.maxFlowDurationMs) {
                        RATE_LIMITED_LOG.warn("Skipping output for flow that reaches back too far w/ start: {}, end: {}, target timestamp: {}, current input timestamp: {}. Full flow: {}", new Object[]{org.joda.time.Instant.ofEpochMilli((long)deltaSwitched), org.joda.time.Instant.ofEpochMilli((long)lastSwitched), org.joda.time.Instant.ofEpochMilli((long)timestamp), c.timestamp(), flow});
                    } else {
                        c.outputWithTimestamp((Object)flow, org.joda.time.Instant.ofEpochMilli((long)timestamp));
                    }
                    if (timestamp + this.windowSizeMs < lastSwitched) {
                        timestamp += this.windowSizeMs;
                        continue;
                    }
                    timestamp = lastSwitched;
                }
            }

            // NOTE(review): presumably overrides DoFn#getAllowedTimestampSkew so that outputs may be stamped
            // up to maxFlowDuration before the input timestamp - confirm against the Beam version in use.
            public Duration getAllowedTimestampSkew() {
                return maxFlowDuration;
            }
        });
    }

    /**
     * Converts a keyed aggregate and its window into a {@link FlowSummary}.
     * <p>
     * The key populates its own fields; the window provides the range and the timestamp (the range end);
     * the aggregate provides the byte counts and ECN flags. The host name is only set for host keys and an
     * empty host name is stored as {@code null}.
     *
     * @param fsd    key and aggregate
     * @param window window the aggregate belongs to
     * @return the populated summary
     */
    public static FlowSummary toFlowSummary(KV<CompoundKey, Aggregate> fsd, IntervalWindow window) {
        final FlowSummary summary = new FlowSummary();
        final CompoundKey key = fsd.getKey();
        key.populate(summary);
        if (key.type.isTotalNotTopK()) {
            summary.setAggregationType(AggregationType.TOTAL);
        } else {
            summary.setAggregationType(AggregationType.TOPK);
        }

        // Time range covered by the summary.
        summary.setRangeStartMs(window.start().getMillis());
        summary.setRangeEndMs(window.end().getMillis());
        summary.setTimestamp(summary.getRangeEndMs());

        // Traffic volume and ECN information.
        final Aggregate aggregate = fsd.getValue();
        summary.setBytesEgress(aggregate.getBytesOut());
        summary.setBytesIngress(aggregate.getBytesIn());
        summary.setBytesTotal(summary.getBytesIngress() + summary.getBytesEgress());
        summary.setCongestionEncountered(aggregate.isCongestionEncountered());
        summary.setNonEcnCapableTransport(aggregate.isNonEcnCapableTransport());

        final CompoundKeyType keyType = key.getType();
        final boolean isHostKey = keyType == CompoundKeyType.EXPORTER_INTERFACE_HOST || keyType == CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST;
        if (isHostKey) {
            summary.setHostName(Strings.emptyToNull((String)aggregate.getHostname()));
        }
        return summary;
    }

    /**
     * Creates the windowing transform for flows: unaligned fixed windows with END_OF_WINDOW timestamps,
     * discarding fired panes, firing at the watermark, with late firings after {@code lateProcessingDelay}
     * and - if {@code earlyProcessingDelay} is neither {@code null} nor zero - early firings after that delay.
     *
     * @param fixedWindowSize      size of the windows
     * @param earlyProcessingDelay processing time delay of early firings; {@code null} or zero disables them
     * @param lateProcessingDelay  processing time delay of late firings
     * @param allowedLateness      allowed lateness of the windows
     * @return the windowing transform
     */
    public static Window<FlowDocument> toWindow(Duration fixedWindowSize, Duration earlyProcessingDelay, Duration lateProcessingDelay, Duration allowedLateness) {
        AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
                .withLateFirings((Trigger.OnceTrigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateProcessingDelay));
        final boolean earlyFiringsRequested = earlyProcessingDelay != null && !earlyProcessingDelay.isEqual((ReadableDuration)Duration.ZERO);
        if (earlyFiringsRequested) {
            trigger = trigger.withEarlyFirings((Trigger.OnceTrigger)AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyProcessingDelay));
        }
        return Window.into((WindowFn)UnalignedFixedWindows.of(fixedWindowSize))
                .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
                .triggering((Trigger)trigger)
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                .withAllowedLateness(allowedLateness)
                .discardingFiredPanes();
    }

    /**
     * Calculates the share of a flow's bytes that falls into a window, assuming the bytes are spread evenly
     * over the flow's duration.
     * <p>
     * The result is the difference of two truncated cumulative byte counts (bytes sent up to the end of the
     * overlap minus bytes sent before the overlap), so the shares of adjacent windows add up to the total.
     *
     * @param deltaSwitched         first millisecond of the flow
     * @param lastSwitchedInclusive last millisecond of the flow (inclusive)
     * @param multipliedNumBytes    total number of bytes of the flow (already multiplied by the sampling interval)
     * @param windowStart           first millisecond of the window
     * @param windowEndInclusive    last millisecond of the window (inclusive)
     * @return the number of bytes attributed to the window
     */
    public static long bytesInWindow(long deltaSwitched, long lastSwitchedInclusive, double multipliedNumBytes, long windowStart, long windowEndInclusive) {
        final long durationMs = lastSwitchedInclusive - deltaSwitched + 1L;

        // Milliseconds of the flow that elapsed before the overlap starts ...
        final long firstOverlapMs = Math.max(deltaSwitched, windowStart);
        final long elapsedBeforeOverlapMs = firstOverlapMs - deltaSwitched;
        // ... and up to (and including) the last millisecond of the overlap.
        final long lastOverlapMs = Math.min(lastSwitchedInclusive, windowEndInclusive);
        final long elapsedThroughOverlapMs = lastOverlapMs - deltaSwitched + 1L;

        final long cumulativeBefore = (long)((double)elapsedBeforeOverlapMs * multipliedNumBytes / (double)durationMs);
        final long cumulativeThrough = (long)((double)elapsedThroughOverlapMs * multipliedNumBytes / (double)durationMs);
        return cumulativeThrough - cumulativeBefore;
    }

    /**
     * Creates the aggregate of a flow for a window.
     * <p>
     * The flow's byte count is multiplied by its sampling interval (if present and positive) and the share
     * that falls into the window is determined by {@link #bytesInWindow}. That share is counted as ingress
     * bytes for flows with direction INGRESS and as egress bytes otherwise.
     *
     * @param window    window the aggregate is calculated for
     * @param flow      the flow
     * @param hostname  first host name stored in the aggregate
     * @param hostname2 second host name stored in the aggregate
     * @return the aggregate; its ECN value is {@code null} when the flow has none
     */
    public static Aggregate aggregatize(IntervalWindow window, FlowDocument flow, String hostname, String hostname2) {
        double multiplier = 1.0;
        if (flow.hasSamplingInterval()) {
            final double samplingInterval = flow.getSamplingInterval().getValue();
            if (samplingInterval > 0.0) {
                multiplier = samplingInterval;
            }
        }
        final double multipliedNumBytes = (double)flow.getNumBytes().getValue() * multiplier;
        final long bytes = Pipeline.bytesInWindow(flow.getDeltaSwitched().getValue(), flow.getLastSwitched().getValue(), multipliedNumBytes, window.start().getMillis(), window.maxTimestamp().getMillis());
        final Integer ecn = flow.hasEcn() ? Integer.valueOf(flow.getEcn().getValue()) : null;
        if (Direction.INGRESS.equals((Object)flow.getDirection())) {
            return new Aggregate(bytes, 0L, hostname, hostname2, ecn);
        }
        return new Aggregate(0L, bytes, hostname, hostname2, ecn);
    }

    /**
     * Aggregates child aggregates into totals of their parents: every element is re-keyed by the outer key
     * of its key and the aggregates are then combined per key with {@link SumBytes}.
     *
     * @param transformPrefix prefix for the names of the applied transforms
     * @param child           aggregates keyed by the child key
     * @return a holder whose {@code total} and {@code summary} both reference the parent totals
     */
    public static TotalAndSummary aggregateParentTotal(String transformPrefix, PCollection<KV<CompoundKey, Aggregate>> child) {
        final PCollection keyedByParent = (PCollection)child.apply(transformPrefix + "group_by_outer_key", (PTransform)ParDo.of((DoFn)new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                final KV element = (KV)c.element();
                final CompoundKey parentKey = ((CompoundKey)element.getKey()).getOuterKey();
                final Aggregate aggregate = (Aggregate)element.getValue();
                c.output((Object)KV.of((Object)parentKey, (Object)aggregate));
            }
        }));
        final PCollection parentTotal = (PCollection)keyedByParent.apply(transformPrefix + "sum_bytes_by_key", (PTransform)Combine.perKey((CombineFnBase.GlobalCombineFn)new SumBytes()));
        return new TotalAndSummary((PCollection<KV<CompoundKey, Aggregate>>)parentTotal, (PCollection<KV<CompoundKey, Aggregate>>)parentTotal);
    }

    /**
     * Calculates sums and top-K aggregates twice: once for the given keys (which include TOS) and once after
     * the per-key sums were re-keyed by casting their keys to {@code typeWithoutTos}.
     *
     * @param transformPrefix     prefix for the names of the applied transforms
     * @param groupedByKeyWithTos aggregates keyed by keys that include TOS
     * @param typeWithoutTos      key type the keys are cast to for the second aggregation
     * @param k                   number of top entries per outer key
     * @param includeKeyInTopK    predicate selecting the keys that take part in the top-K calculation
     * @return both results
     */
    public static SumsAndTopKs aggregateSumsAndTopKs(String transformPrefix, PCollection<KV<CompoundKey, Aggregate>> groupedByKeyWithTos, final CompoundKeyType typeWithoutTos, int k, SerializableFunction<CompoundKey, Boolean> includeKeyInTopK) {
        final SumAndTopK tosAware = Pipeline.aggregateSumAndTopK(transformPrefix + "with_tos_", groupedByKeyWithTos, k, includeKeyInTopK);

        // Re-key the TOS aware sums by the key type that ignores TOS.
        final DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>> castKey = new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                final KV element = (KV)c.element();
                final CompoundKey keyWithoutTos = ((CompoundKey)element.getKey()).cast(typeWithoutTos);
                final Aggregate aggregate = (Aggregate)element.getValue();
                c.output((Object)KV.of((Object)keyWithoutTos, (Object)aggregate));
            }
        };
        final PCollection regrouped = (PCollection)tosAware.sum.apply(transformPrefix + "group_without_tos_", (PTransform)ParDo.of((DoFn)castKey));

        final SumAndTopK tosAgnostic = Pipeline.aggregateSumAndTopK(transformPrefix + "without_tos_", (PCollection<KV<CompoundKey, Aggregate>>)regrouped, k, includeKeyInTopK);
        return new SumsAndTopKs(tosAware, tosAgnostic);
    }

    /**
     * Sums the aggregates per key and determines the top-K entries per outer key.
     * <p>
     * The top-K branch works on the per-key sums: entries accepted by {@code includeKeyInTopK} are keyed by
     * the outer key of their key, the {@code k} largest entries per outer key are selected using
     * {@link FlowBytesValueComparator}, and the resulting lists are flattened into single elements again.
     *
     * @param transformPrefix  prefix for the names of the applied transforms
     * @param groupedByKey     aggregates to sum per key
     * @param k                number of top entries per outer key
     * @param includeKeyInTopK predicate selecting the keys that take part in the top-K calculation
     * @return the per-key sums and the top-K entries
     */
    public static SumAndTopK aggregateSumAndTopK(String transformPrefix, PCollection<KV<CompoundKey, Aggregate>> groupedByKey, int k, final SerializableFunction<CompoundKey, Boolean> includeKeyInTopK) {
        // Per-key sums; they are returned as-is and are the input of the top-K branch.
        PCollection sum = (PCollection)groupedByKey.apply(transformPrefix + "sum_bytes_by_key", (PTransform)Combine.perKey((CombineFnBase.GlobalCombineFn)new SumBytes()));
        PCollection topK = (PCollection)((PCollection)((PCollection)((PCollection)sum.apply(transformPrefix + "group_by_outer_key", (PTransform)ParDo.of((DoFn)new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, KV<CompoundKey, Aggregate>>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                KV el = (KV)c.element();
                // Only accepted keys take part; the complete element becomes the value under its outer key.
                if (((Boolean)includeKeyInTopK.apply((Object)((CompoundKey)el.getKey()))).booleanValue()) {
                    c.output((Object)KV.of((Object)((CompoundKey)el.getKey()).getOuterKey(), (Object)el));
                }
            }
        }))).apply(transformPrefix + "top_k_per_key", Top.perKey((int)k, (Comparator)new FlowBytesValueComparator()))).apply(transformPrefix + "flatten", (PTransform)Values.create())).apply(transformPrefix + "top_k_summary", (PTransform)ParDo.of((DoFn)new DoFn<List<KV<CompoundKey, Aggregate>>, KV<CompoundKey, Aggregate>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                // Unpack the list of top entries into individual elements.
                for (KV el : (List)c.element()) {
                    c.output((Object)el);
                }
            }
        }));
        return new SumAndTopK((PCollection<KV<CompoundKey, Aggregate>>)sum, (PCollection<KV<CompoundKey, Aggregate>>)topK);
    }

    /**
     * Pair of {@link SumAndTopK} results: one calculated for keys that include TOS and one for keys without TOS.
     */
    public static class SumsAndTopKs {
        /** Result for the keys that include TOS. */
        public final SumAndTopK withTos;
        /** Result for the keys without TOS. */
        public final SumAndTopK withoutTos;

        /**
         * @param withTos    result for the keys that include TOS
         * @param withoutTos result for the keys without TOS
         */
        public SumsAndTopKs(SumAndTopK withTos, SumAndTopK withoutTos) {
            this.withTos = withTos;
            this.withoutTos = withoutTos;
        }
    }

    /**
     * Holder for the two outputs of a sum / top-K aggregation.
     */
    public static class SumAndTopK {
        /** Aggregates summed per key. */
        public final PCollection<KV<CompoundKey, Aggregate>> sum;
        /** Top-K entries selected from the sums. */
        public final PCollection<KV<CompoundKey, Aggregate>> topK;

        /**
         * @param sum  aggregates summed per key
         * @param topK top-K entries selected from the sums
         */
        public SumAndTopK(PCollection<KV<CompoundKey, Aggregate>> sum, PCollection<KV<CompoundKey, Aggregate>> topK) {
            this.sum = sum;
            this.topK = topK;
        }
    }

    /**
     * Holder for a total collection and a summary collection of keyed aggregates.
     */
    public static class TotalAndSummary {
        /** The totals. */
        public final PCollection<KV<CompoundKey, Aggregate>> total;
        /** The summaries. */
        public final PCollection<KV<CompoundKey, Aggregate>> summary;

        /**
         * @param total   the totals
         * @param summary the summaries
         */
        public TotalAndSummary(PCollection<KV<CompoundKey, Aggregate>> total, PCollection<KV<CompoundKey, Aggregate>> summary) {
            this.total = total;
            this.summary = summary;
        }
    }

    /**
     * Projects an aggregate keyed by conversation (with TOS) onto its application and onto its two hosts.
     * <p>
     * For each input one element is emitted under {@code BY_APP} (without host name) and two elements are
     * emitted under {@code BY_HOST}: one for the key's address carrying the aggregate's first host name and
     * one for the key's larger address carrying the aggregate's second host name.
     */
    public static class ProjConvWithTos
    extends DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>> {
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<CompoundKey, Aggregate> kv, DoFn.MultiOutputReceiver out) {
            final CompoundKey conversationKey = kv.getKey();
            final Aggregate aggregate = kv.getValue();

            // Application projection: the host name is of no use there.
            final CompoundKey applicationKey = conversationKey.cast(CompoundKeyType.EXPORTER_INTERFACE_TOS_APPLICATION);
            out.get(BY_APP).output(KV.of(applicationKey, aggregate.withHostname(null)));

            // Host projection for the address kept by the cast.
            final CompoundKey firstHostKey = conversationKey.cast(CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST);
            out.get(BY_HOST).output(KV.of(firstHostKey, aggregate.withHostname(aggregate.getHostname())));

            // Host projection for the larger address of the conversation.
            final CompoundKeyData largerAddressData = new CompoundKeyData.Builder(conversationKey.data).withAddress(conversationKey.data.largerAddress).build();
            final CompoundKey secondHostKey = new CompoundKey(CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST, largerAddressData);
            out.get(BY_HOST).output(KV.of(secondHostKey, aggregate.withHostname(aggregate.getHostname2())));
        }
    }

    /**
     * Keys flows by conversation (with TOS) and converts them into aggregates for the current window.
     * <p>
     * The host name of the lexicographically smaller address (source versus destination) becomes the
     * aggregate's first host name, the other one its second. Flows that raise a
     * {@link MissingFieldsException} are not emitted; they are only counted.
     */
    public static class KeyByConvWithTos
    extends DoFn<FlowDocument, KV<CompoundKey, Aggregate>> {
        private final Counter flowsWithMissingFields = Metrics.counter(Pipeline.class, (String)"flowsWithMissingFields");
        private final Counter flowsInWindow = Metrics.counter((String)"flows", (String)"in_window");

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, IntervalWindow window) {
            final FlowDocument flow = (FlowDocument)c.element();
            try {
                final CompoundKey key = CompoundKeyType.EXPORTER_INTERFACE_TOS_CONVERSATION.create(flow);
                final String srcAddress = Strings.nullToEmpty((String)flow.getSrcAddress());
                final String dstAddress = Strings.nullToEmpty((String)flow.getDstAddress());
                final boolean srcIsSmaller = srcAddress.compareTo(dstAddress) < 0;
                final String hostname = srcIsSmaller ? flow.getSrcHostname() : flow.getDstHostname();
                final String hostname2 = srcIsSmaller ? flow.getDstHostname() : flow.getSrcHostname();
                final Aggregate aggregate = Pipeline.aggregatize(window, flow, hostname, hostname2);
                this.flowsInWindow.inc();
                c.output((Object)KV.of((Object)key, (Object)aggregate));
            }
            catch (MissingFieldsException mfe) {
                // Incomplete flows are dropped on purpose; the counter makes them visible.
                this.flowsWithMissingFields.inc();
            }
        }
    }

    /**
     * Orders keyed aggregates by their byte count. Entries with equal byte counts are ordered by comparing
     * the {@code groupedByKey()} values of their keys in reverse (b against a).
     */
    static class FlowBytesValueComparator
    implements Comparator<KV<CompoundKey, Aggregate>>,
    Serializable {
        FlowBytesValueComparator() {
        }

        @Override
        public int compare(KV<CompoundKey, Aggregate> a, KV<CompoundKey, Aggregate> b) {
            final int byBytes = Long.compare(a.getValue().getBytes(), b.getValue().getBytes());
            return byBytes != 0
                    ? byBytes
                    : b.getKey().groupedByKey().compareTo(a.getKey().groupedByKey());
        }
    }

    static class SumBytes
    extends Combine.BinaryCombineFn<Aggregate> {
        SumBytes() {
        }

        public Aggregate apply(Aggregate left, Aggregate right) {
            return Aggregate.merge(left, right);
        }
    }

    /**
     * Sink that converts keyed aggregates to JSON flow summaries and publishes them as values to a Kafka topic.
     */
    public static class WriteToKafka
    extends PTransform<PCollection<KV<CompoundKey, Aggregate>>, PDone> {
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaProducerConfig;

        /**
         * @param bootstrapServers    Kafka bootstrap servers; must not be {@code null}
         * @param topic               destination topic; must not be {@code null}
         * @param kafkaProducerConfig producer configuration updates (not null-checked here)
         * @throws NullPointerException if {@code bootstrapServers} or {@code topic} is {@code null}
         */
        public WriteToKafka(String bootstrapServers, String topic, Map<String, Object> kafkaProducerConfig) {
            this.bootstrapServers = Objects.requireNonNull(bootstrapServers);
            this.topic = Objects.requireNonNull(topic);
            this.kafkaProducerConfig = kafkaProducerConfig;
        }

        // Serializes the summaries with FLOW_SUMMARY_DATA_TO_JSON, then writes the strings as record values.
        public PDone expand(PCollection<KV<CompoundKey, Aggregate>> input) {
            return (PDone)((PCollection)input.apply(FLOW_SUMMARY_DATA_TO_JSON)).apply(KafkaIO.write().withProducerConfigUpdates(this.kafkaProducerConfig).withBootstrapServers(this.bootstrapServers).withTopic(this.topic).withValueSerializer(StringSerializer.class).values());
        }
    }

    public static class ReadFromKafka
    extends PTransform<PBegin, PCollection<FlowDocument>> {
        // Connection settings of the Kafka source.
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaConsumerConfig;
        // Metrics in the "flows" namespace: number of flows read and a drift value set per flow
        // (current time minus the flow's lastSwitched, see expand).
        private final Counter flowsFromKafka = Metrics.counter((String)"flows", (String)"from_kafka");
        private final Gauge flowsFromKafkaDrift = Metrics.gauge((String)"flows", (String)"from_kafka_drift");
        // Timestamp policy factory handed to KafkaIO.
        private final TimestampPolicyFactory<byte[], FlowDocument> timestampPolicyFactory;

        public ReadFromKafka(String bootstrapServers, String topic, Map<String, Object> kafkaConsumerConfig, TimestampPolicyFactory<byte[], FlowDocument> timestampPolicyFactory) {
            this.bootstrapServers = Objects.requireNonNull(bootstrapServers);
            this.topic = Objects.requireNonNull(topic);
            this.kafkaConsumerConfig = Objects.requireNonNull(kafkaConsumerConfig);
            this.timestampPolicyFactory = timestampPolicyFactory;
        }

        public PCollection<FlowDocument> expand(PBegin input) {
            return (PCollection)((PCollection)input.apply(KafkaIO.read().withTopic(this.topic).withKeyDeserializer(ByteArrayDeserializer.class).withValueDeserializer(KafkaInputFlowDeserializer.class).withConsumerConfigUpdates(this.kafkaConsumerConfig).withBootstrapServers(this.bootstrapServers).withTimestampPolicyFactory(this.timestampPolicyFactory).withoutMetadata())).apply("init", (PTransform)ParDo.of((DoFn)new DoFn<KV<byte[], FlowDocument>, FlowDocument>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    FlowDocument flow = (FlowDocument)((KV)c.element()).getValue();
                    if (!flow.hasDeltaSwitched()) {
                        flow = FlowDocument.newBuilder((FlowDocument)flow).setDeltaSwitched(flow.getFirstSwitched()).build();
                    }
                    c.output((Object)flow);
                    flowsFromKafka.inc();
                    flowsFromKafkaDrift.set(System.currentTimeMillis() - flow.getLastSwitched().getValue());
                }
            }));
        }

        public static long getTimestampMs(FlowDocument doc) {
            return doc.getLastSwitched().getValue();
        }

        public static org.joda.time.Instant getTimestamp(KafkaRecord<byte[], FlowDocument> record) {
            return ReadFromKafka.getTimestamp((FlowDocument)record.getKV().getValue());
        }

        public static org.joda.time.Instant getTimestamp(FlowDocument doc) {
            return org.joda.time.Instant.ofEpochMilli((long)ReadFromKafka.getTimestampMs(doc));
        }
    }

    public static class WriteToElasticsearch
    extends PTransform<PCollection<KV<CompoundKey, Aggregate>>, PDone> {
        private final String elasticIndex;
        private final IndexStrategy indexStrategy;
        private final ElasticsearchIO.ConnectionConfiguration esConfig;
        private final Counter flowsToEs = Metrics.counter((String)"flows", (String)"to_es");
        private final Gauge flowsToEsDrift = Metrics.gauge((String)"flows", (String)"to_es_drift");
        private int elasticRetryCount;
        private long elasticRetryDuration;

        public WriteToElasticsearch(String elasticUrl, String elasticUser, String elasticPassword, String elasticIndex, IndexStrategy indexStrategy, int elasticConnectTimeout, int elasticSocketTimeout, int elasticRetryCount, long elasticRetryDuration) {
            Objects.requireNonNull(elasticUrl);
            this.elasticIndex = Objects.requireNonNull(elasticIndex);
            this.indexStrategy = Objects.requireNonNull(indexStrategy);
            ElasticsearchIO.ConnectionConfiguration thisEsConfig = ElasticsearchIO.ConnectionConfiguration.create((String[])new String[]{elasticUrl}, (String)elasticIndex, (String)"_doc");
            if (!Strings.isNullOrEmpty((String)elasticUser) && !Strings.isNullOrEmpty((String)elasticPassword)) {
                thisEsConfig = thisEsConfig.withUsername(elasticUser).withPassword(elasticPassword);
            }
            this.esConfig = thisEsConfig = thisEsConfig.withConnectTimeout(Integer.valueOf(elasticConnectTimeout)).withSocketTimeout(Integer.valueOf(elasticSocketTimeout));
            this.elasticRetryCount = elasticRetryCount;
            this.elasticRetryDuration = elasticRetryDuration;
        }

        public WriteToElasticsearch(NephronOptions options) {
            this(options.getElasticUrl(), options.getElasticUser(), options.getElasticPassword(), options.getElasticFlowIndex(), options.getElasticIndexStrategy(), options.getElasticConnectTimeout(), options.getElasticSocketTimeout(), options.getElasticRetryCount(), options.getElasticRetryDuration());
        }

        public PDone expand(PCollection<KV<CompoundKey, Aggregate>> input) {
            return (PDone)((PCollection)input.apply("SerializeToJson", FLOW_SUMMARY_DATA_TO_JSON)).apply("WriteToElasticsearch", (PTransform)ElasticsearchIO.write().withConnectionConfiguration(this.esConfig).withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create((int)this.elasticRetryCount, (Duration)Duration.millis((long)this.elasticRetryDuration))).withIndexFn(new ElasticsearchIO.Write.FieldValueExtractFn(){

                public String apply(JsonNode input) {
                    Instant flowTimestamp = Instant.ofEpochMilli(input.get("@timestamp").asLong());
                    String indexName = indexStrategy.getIndex(elasticIndex, flowTimestamp);
                    flowsToEs.inc();
                    flowsToEsDrift.set(System.currentTimeMillis() - flowTimestamp.toEpochMilli());
                    return indexName;
                }
            }));
        }
    }

    public static class WindowedFlows
    extends PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> {
        private final Duration fixedWindowSize;
        private final Duration maxFlowDuration;
        private final Duration earlyProcessingDelay;
        private final Duration lateProcessingDelay;
        private final Duration allowedLateness;

        public WindowedFlows(Duration fixedWindowSize, Duration maxFlowDuration, Duration earlyProcessingDelay, Duration lateProcessingDelay, Duration allowedLateness) {
            this.fixedWindowSize = Objects.requireNonNull(fixedWindowSize);
            this.maxFlowDuration = Objects.requireNonNull(maxFlowDuration);
            this.earlyProcessingDelay = Objects.requireNonNull(earlyProcessingDelay);
            this.lateProcessingDelay = Objects.requireNonNull(lateProcessingDelay);
            this.allowedLateness = Objects.requireNonNull(allowedLateness);
        }

        public WindowedFlows(NephronOptions options) {
            this(Duration.millis((long)options.getFixedWindowSizeMs()), Duration.millis((long)options.getMaxFlowDurationMs()), Duration.millis((long)options.getEarlyProcessingDelayMs()), Duration.millis((long)options.getLateProcessingDelayMs()), Duration.millis((long)options.getAllowedLatenessMs()));
        }

        public PCollection<FlowDocument> expand(PCollection<FlowDocument> input) {
            return (PCollection)((PCollection)input.apply("attach_timestamp", Pipeline.attachTimestamps(this.fixedWindowSize, this.maxFlowDuration))).apply("to_windows", Pipeline.toWindow(this.fixedWindowSize, this.earlyProcessingDelay, this.lateProcessingDelay, this.allowedLateness));
        }
    }

    public static class CalculateFlowStatistics
    extends PTransform<PCollection<FlowDocument>, PCollection<KV<CompoundKey, Aggregate>>> {
        private final int topK;
        private final PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> windowing;

        public CalculateFlowStatistics(int topK, PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> windowing) {
            this.topK = topK;
            this.windowing = windowing;
        }

        public CalculateFlowStatistics(NephronOptions options) {
            this(options.getTopK(), new WindowedFlows(options));
        }

        public PCollection<KV<CompoundKey, Aggregate>> expand(PCollection<FlowDocument> input) {
            PCollection windowedStreamOfFlows = (PCollection)input.apply("WindowedFlows", this.windowing);
            PCollection keyedByConvWithTos = (PCollection)windowedStreamOfFlows.apply("key_by_conv", (PTransform)ParDo.of((DoFn)new KeyByConvWithTos()));
            SumsAndTopKs conv = Pipeline.aggregateSumsAndTopKs("conv_", (PCollection<KV<CompoundKey, Aggregate>>)keyedByConvWithTos, CompoundKeyType.EXPORTER_INTERFACE_CONVERSATION, this.topK, (SerializableFunction<CompoundKey, Boolean>)(SerializableFunction & Serializable)k -> k.isCompleteConversationKey());
            PCollectionTuple projected = (PCollectionTuple)conv.withTos.sum.apply("proj_conv", (PTransform)ParDo.of((DoFn)new ProjConvWithTos()).withOutputTags(BY_APP, TupleTagList.of(BY_HOST)));
            PCollection keyedByAppWithTos = projected.get(BY_APP);
            PCollection keyedByHostWithTos = projected.get(BY_HOST);
            SumsAndTopKs app = Pipeline.aggregateSumsAndTopKs("app_", (PCollection<KV<CompoundKey, Aggregate>>)keyedByAppWithTos, CompoundKeyType.EXPORTER_INTERFACE_APPLICATION, this.topK, (SerializableFunction<CompoundKey, Boolean>)(SerializableFunction & Serializable)k -> true);
            SumsAndTopKs host = Pipeline.aggregateSumsAndTopKs("host_", (PCollection<KV<CompoundKey, Aggregate>>)keyedByHostWithTos, CompoundKeyType.EXPORTER_INTERFACE_HOST, this.topK, (SerializableFunction<CompoundKey, Boolean>)(SerializableFunction & Serializable)k -> true);
            TotalAndSummary tos = Pipeline.aggregateParentTotal("tos_", app.withTos.sum);
            TotalAndSummary itf = Pipeline.aggregateParentTotal("itf_", tos.total);
            PCollectionList flowSummaries = PCollectionList.of(itf.summary).and(tos.summary).and(app.withTos.topK).and(app.withoutTos.topK).and(host.withTos.topK).and(host.withoutTos.topK).and(conv.withTos.topK).and(conv.withoutTos.topK);
            return (PCollection)flowSummaries.apply((PTransform)Flatten.pCollections());
        }
    }
}

