/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron.cortex;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.opennms.nephron.cortex.EventTimestampIndexer;
import org.opennms.nephron.cortex.Heap;
import org.opennms.nephron.cortex.TimeSeriesBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
import prometheus.PrometheusRemote;
import prometheus.PrometheusTypes;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class CortexIo {
    /** Label name under which the metric name is stored in a time series. */
    private static final String METRIC_NAME_LABEL = "__name__";
    private static final Logger LOG = LoggerFactory.getLogger(CortexIo.class);
    /** Logger limited to 5 messages per 10 seconds; used for messages that may repeat for every write. */
    public static final RateLimitedLog RATE_LIMITED_LOG = RateLimitedLog.withRateLimit((Logger)LOG).maxRate(5).every(java.time.Duration.ofSeconds(10L)).build();
    /** Separate logger (category suffix ".write") that receives the write request payload at trace level. */
    private static final Logger LOG_WRITE = LoggerFactory.getLogger((String)(CortexIo.class.getName() + ".write"));
    private static final MediaType PROTOBUF_MEDIA_TYPE = MediaType.parse((String)"application/x-protobuf");
    /** HTTP header that carries the organization id when one is configured. */
    private static final String X_SCOPE_ORG_ID_HEADER = "X-Scope-OrgID";
    public static final String CORTEX_METRIC_NAMESPACE = "cortex";
    public static final String CORTEX_WRITE_METRIC_NAME = "write";
    public static final String CORTEX_SAMPLE_METRIC_NAME = "sample";
    public static final String CORTEX_WRITE_FAILURE_METRIC_NAME = "write_failure";
    public static final String CORTEX_RESPONSE_FAILURE_METRIC_NAME = "response_failure";
    // The counters are never reassigned, so they are declared final; their names are taken
    // from the public constants above instead of repeating the string literals.
    private static final Counter WRITE_FAILURE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_WRITE_FAILURE_METRIC_NAME);
    private static final Counter RESPONSE_FAILURE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_RESPONSE_FAILURE_METRIC_NAME);
    private static final Counter WRITE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_WRITE_METRIC_NAME);
    private static final Counter SAMPLE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_SAMPLE_METRIC_NAME);
    /** Substrings searched for in error response bodies to classify a failed write. */
    private static final String[] KNOWN_CORTEX_ERRORS = new String[]{"out of order sample", "duplicate sample for timestamp", "per-metric series limit", "per-metric metadata limit", "per-user series limit", "per-user metric metadata limit"};
    private static final JsonFormat.Printer PROTOBUF_JSON_PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();

    /**
     * Creates a write transform that does not accumulate values per key before writing.
     *
     * @param writeUrl URL that write requests are posted to
     * @param build callback that turns a key/value pair into time series
     * @return the configurable write transform
     */
    public static <K, V> Write<K, V> of(String writeUrl, BuildTimeSeries<K, V> build) {
        final Write<K, V> write = new WriteWithoutAccumulation<>(writeUrl, build);
        return write;
    }

    /**
     * Creates a write transform that accumulates values per key for some time before writing them.
     *
     * @param writeUrl URL that write requests are posted to
     * @param build callback that turns a key/value pair into time series
     * @param keyCoder coder for keys
     * @param valueCoder coder for values; used to persist the accumulated values in state
     * @param combiner function handed to the heap together with each added value
     * @param accumulationDelay processing-time delay after which accumulated values are flushed
     * @return the configurable write transform
     */
    public static <K, V> Write<K, V> of(String writeUrl, BuildTimeSeries<K, V> build, Coder<K> keyCoder, Coder<V> valueCoder, SerializableBiFunction<V, V, V> combiner, Duration accumulationDelay) {
        final Write<K, V> write = new WriteWithAccumulation<>(writeUrl, build, keyCoder, valueCoder, combiner, accumulationDelay);
        return write;
    }

    /**
     * Classifies an error response body by looking for one of the known error messages.
     *
     * @param body the response body text
     * @return the first known error message contained in {@code body}, or {@code "body unknown"}
     *         (after a rate limited warning) when none matches
     */
    private static String determineErrorKind(String body) {
        for (final String knownError : KNOWN_CORTEX_ERRORS) {
            if (body.contains(knownError)) {
                return knownError;
            }
        }
        RATE_LIMITED_LOG.warn("could not extract error kind - body: {}", (Object)body);
        return "body unknown";
    }

    /**
     * Sanitizes a label and hands the result to {@code consumer}.
     * <p>
     * For the metric name label the name is kept and the value is sanitized as a metric name;
     * for every other label the name is sanitized as a label name and the value is kept.
     *
     * @param name label name
     * @param value label value
     * @param consumer receives the sanitized name and value
     */
    public static void sanitize(String name, String value, BiConsumer<String, String> consumer) {
        if (!METRIC_NAME_LABEL.equals(name)) {
            consumer.accept(CortexIo.sanitizeLabelName(name), value);
            return;
        }
        consumer.accept(name, CortexIo.sanitizeMetricName(value));
    }

    /**
     * Replaces every character that is not allowed in a metric name by an underscore.
     * <p>
     * Allowed are ASCII letters, {@code '_'}, {@code ':'} and, except at the first position,
     * ASCII digits.
     *
     * @param metricName the raw metric name
     * @return the sanitized name; it has the same length as the input
     */
    public static String sanitizeMetricName(String metricName) {
        final int length = metricName.length();
        final StringBuilder sanitized = new StringBuilder();
        for (int pos = 0; pos < length; pos++) {
            final char c = metricName.charAt(pos);
            final boolean letter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            final boolean separator = c == '_' || c == ':';
            final boolean nonLeadingDigit = pos > 0 && c >= '0' && c <= '9';
            sanitized.append(letter || separator || nonLeadingDigit ? c : '_');
        }
        return sanitized.toString();
    }

    /**
     * Replaces every character that is not allowed in a label name by an underscore.
     * <p>
     * Allowed are ASCII letters, {@code '_'} and, except at the first position, ASCII digits.
     *
     * @param labelName the raw label name
     * @return the sanitized name; it has the same length as the input
     */
    public static String sanitizeLabelName(String labelName) {
        final int length = labelName.length();
        final StringBuilder sanitized = new StringBuilder();
        for (int pos = 0; pos < length; pos++) {
            final char c = labelName.charAt(pos);
            final boolean letter = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
            final boolean nonLeadingDigit = pos > 0 && c >= '0' && c <= '9';
            sanitized.append(letter || c == '_' || nonLeadingDigit ? c : '_');
        }
        return sanitized.toString();
    }

    /**
     * Write function that collects the values of a key in a heap (kept in state) and writes
     * them to Cortex after the configured accumulation delay.
     * <p>
     * Two timers are used: a processing time timer ("output") that flushes values that have
     * been in the heap for at least the accumulation delay, and an event time timer ("gc") that
     * flushes whatever is left and clears the state.
     */
    public static class WriteFnWithAccumulation<K, V>
    extends WriteFn<K, V, WriteWithAccumulation<K, V>> {
        private static final String OUTPUT_TIMER_NAME = "output";
        private static final String GC_TIMER_NAME = "gc";
        private static final String HEAP_STATE_NAME = "heap";
        @DoFn.StateId(value=HEAP_STATE_NAME)
        private final StateSpec<ValueState<Heap.HeapImpl<V>>> heapStateSpec;
        @DoFn.TimerId(value=OUTPUT_TIMER_NAME)
        private final TimerSpec outputTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
        @DoFn.TimerId(value=GC_TIMER_NAME)
        private final TimerSpec gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        /**
         * @param spec the write configuration; supplies the value coder for the heap state
         * @param allowedLateness lateness added to the newest event timestamp when scheduling garbage collection
         */
        public WriteFnWithAccumulation(WriteWithAccumulation<K, V> spec, Duration allowedLateness) {
            super(spec, allowedLateness);
            this.heapStateSpec = StateSpecs.value(new Heap.HeapImpl.HeapImplCoder(spec.valueCoder));
        }

        /**
         * Adds the element's value to the heap of its key.
         * <p>
         * The output timer is armed when the heap was empty before the value was added and
         * the gc timer is moved according to the newest event timestamp in the heap.
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext ctx, @DoFn.AlwaysFetched @DoFn.StateId(value=HEAP_STATE_NAME) ValueState<Heap.HeapImpl<V>> heapState, @DoFn.TimerId(value=OUTPUT_TIMER_NAME) Timer outputTimer, @DoFn.TimerId(value=GC_TIMER_NAME) Timer gcTimer) throws Exception {
            final KV<K, V> element = ctx.element();
            Heap.HeapImpl<V> heap = heapState.read();
            if (heap == null) {
                LOG.debug("create heap - key: {}", element.getKey());
                heap = new Heap.HeapImpl<>(new EventTimestampIndexer(), new HashMap<>());
            }
            if (heap.isEmpty()) {
                // first value after a (possibly complete) flush -> a new accumulation period starts
                outputTimer.offset(spec.accumulationDelay).setRelative();
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("add value to heap - ctx.timestamp: {}; key: {}; value: {}", new Object[]{ctx.timestamp(), element.getKey(), element.getValue()});
            }
            heap.add(element.getValue(), ctx.timestamp(), Instant.now(), spec.combiner);
            heapState.write(heap);
            // the heap contains at least the value that was just added
            final Instant newestEventTimestamp = heap.newestEventTimestamp().get();
            setGcTimer(gcTimer, newestEventTimestamp);
        }

        /**
         * Flushes the values that are due and re-arms the output timer if values remain.
         */
        @DoFn.OnTimer(value=OUTPUT_TIMER_NAME)
        public void onOutput(OnTimerContext ctx, @DoFn.Key K key, @DoFn.AlwaysFetched @DoFn.StateId(value=HEAP_STATE_NAME) ValueState<Heap.HeapImpl<V>> heapState, @DoFn.TimerId(value=OUTPUT_TIMER_NAME) Timer timer) throws IOException {
            final Heap.HeapImpl<V> heap = heapState.read();
            if (heap == null) {
                return;
            }
            final Instant now = Instant.now();
            if (flushHeap(heap, key, now, spec.accumulationDelay, "after accumulation")) {
                heapState.write(heap);
            }
            final Optional<Instant> oldestTimestamp = heap.oldestProcessingTimestamp();
            if (oldestTimestamp.isPresent()) {
                // The oldest remaining value becomes due at (its processing timestamp + accumulation delay).
                // The timer offset is the time from now until that instant, i.e. Duration(start = now, end = due).
                // (The arguments were swapped before, which yields a non-positive offset for a value
                // that is not yet due - assuming Heap.flush removed everything older than the delay.)
                final Instant dueAt = oldestTimestamp.get().plus(spec.accumulationDelay);
                final Duration d = new Duration(now, dueAt);
                LOG.trace("schedule next heap check - duration: {}", d);
                timer.offset(d).setRelative();
            }
        }

        /**
         * Flushes all remaining values (no output delay) and clears the heap state.
         */
        @DoFn.OnTimer(value=GC_TIMER_NAME)
        public void onGc(@DoFn.Key K key, @DoFn.AlwaysFetched @DoFn.StateId(value=HEAP_STATE_NAME) ValueState<Heap.HeapImpl<V>> heapState) throws IOException {
            final Heap.HeapImpl<V> heap = heapState.read();
            if (heap != null) {
                flushHeap(heap, key, Instant.now(), Duration.ZERO, "on garbage collection");
                heapState.clear();
            }
        }

        /**
         * Flushes the heap and outputs a time series for every flushed value.
         *
         * @param heap the heap to flush
         * @param key the key the heap belongs to
         * @param now the current processing time
         * @param outputDelay delay passed on to {@code Heap.flush}
         * @param when text included in the trace message
         * @return {@code true} if at least one value was flushed
         */
        private boolean flushHeap(Heap<V> heap, K key, Instant now, Duration outputDelay, String when) throws IOException {
            final List<Heap.Flushed<V>> fs = heap.flush(now, outputDelay);
            if (LOG.isTraceEnabled()) {
                LOG.trace("flush values from heap {} - key: {}; size: {}", new Object[]{when, key, fs.size()});
            }
            for (Heap.Flushed<V> f : fs) {
                outputTimeSeries(builder -> spec.build.accept(key, f.value, f.eventTimestamp, f.index, builder));
            }
            return !fs.isEmpty();
        }
    }

    /**
     * Write function that outputs the time series of every element while the element is
     * processed. Per key an {@link EventTimestampIndexer} is kept in state; it determines the
     * index passed to the time series builder callback. The state is cleared by an event time
     * timer ("gc").
     */
    public static class WriteFnWithoutAccumulation<K, V>
    extends WriteFn<K, V, WriteWithoutAccumulation<K, V>> {
        private static final String GC_TIMER_NAME = "gc";
        private static final String FLUSHED_STATE_NAME = "flushed";
        @DoFn.StateId(value=FLUSHED_STATE_NAME)
        private final StateSpec<ValueState<EventTimestampIndexer>> flushedStateSpec;
        @DoFn.TimerId(value=GC_TIMER_NAME)
        private final TimerSpec gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        /**
         * @param spec the write configuration
         * @param allowedLateness lateness added to the newest event timestamp when scheduling garbage collection
         */
        public WriteFnWithoutAccumulation(WriteWithoutAccumulation<K, V> spec, Duration allowedLateness) {
            super(spec, allowedLateness);
            this.flushedStateSpec = StateSpecs.value((Coder)EventTimestampIndexer.EventTimestampIndexerCoder.of());
        }

        /**
         * Looks up the index for the element's timestamp, persists the indexer, outputs the
         * element's time series and moves the gc timer.
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext ctx, @DoFn.AlwaysFetched @DoFn.StateId(value=FLUSHED_STATE_NAME) ValueState<EventTimestampIndexer> flushedState, @DoFn.TimerId(value=GC_TIMER_NAME) Timer gcTimer) throws Exception {
            final EventTimestampIndexer stored = flushedState.read();
            final EventTimestampIndexer indexer = stored != null ? stored : new EventTimestampIndexer();
            final int index = indexer.findIndex(ctx.timestamp());
            flushedState.write(indexer);
            final KV<K, V> element = ctx.element();
            if (LOG.isTraceEnabled()) {
                LOG.trace("add value to heap - ctx.timestamp: {}; key: {}; value: {}", new Object[]{ctx.timestamp(), element.getKey(), element.getValue()});
            }
            outputTimeSeries(builder -> spec.build.accept(element.getKey(), element.getValue(), ctx.timestamp(), index, builder));
            setGcTimer(gcTimer, indexer.newestEventTimestamp());
        }

        /**
         * Clears the indexer state of the key.
         */
        @DoFn.OnTimer(value=GC_TIMER_NAME)
        public void onGc(@DoFn.StateId(value=FLUSHED_STATE_NAME) ValueState<EventTimestampIndexer> flushedState) throws IOException {
            flushedState.clear();
        }
    }

    private static class TimeSeriesBuilderImpl
    implements TimeSeriesBuilder {
        private final ArrayList<PrometheusTypes.TimeSeries.Builder> builders = new ArrayList();
        private Set<Map.Entry<String, String>> fixedLabels;
        private PrometheusTypes.TimeSeries.Builder builder;

        private TimeSeriesBuilderImpl(Set<Map.Entry<String, String>> fixedLabels) {
            this.fixedLabels = fixedLabels;
        }

        private PrometheusTypes.TimeSeries.Builder builder() {
            if (this.builder == null) {
                this.builder = PrometheusTypes.TimeSeries.newBuilder();
                for (Map.Entry<String, String> entry : this.fixedLabels) {
                    this.builder.addLabels(PrometheusTypes.Label.newBuilder().setName(entry.getKey()).setValue(entry.getValue()));
                }
                this.builders.add(this.builder);
            }
            return this.builder;
        }

        @Override
        public TimeSeriesBuilder setMetricName(String name) {
            return this.addLabel(CortexIo.METRIC_NAME_LABEL, name);
        }

        @Override
        public TimeSeriesBuilder addLabel(String name, String value) {
            CortexIo.sanitize(name, value, (n, v) -> this.builder().addLabels(PrometheusTypes.Label.newBuilder().setName((String)n).setValue((String)v)));
            return this;
        }

        @Override
        public TimeSeriesBuilder addSample(long epocheMillis, double value) {
            SAMPLE.inc();
            this.builder().addSamples(PrometheusTypes.Sample.newBuilder().setTimestamp(epocheMillis).setValue(value));
            return this;
        }

        @Override
        public TimeSeriesBuilder nextSeries() {
            this.builder = null;
            return this;
        }
    }

    public static abstract class WriteFn<K, V, W extends Write<K, V>>
    extends DoFn<KV<K, V>, Void> {
        /** The write configuration (URL, limits, timeouts, fixed labels, handlers). */
        protected final W spec;
        /** Added to the newest event timestamp when the gc timer is set (see {@code setGcTimer}). */
        protected final Duration allowedLateness;
        // The following fields are transient; all but the batch state are created in setup().
        private transient OkHttpClient okHttpClient;
        /** Collects the time series of the current batch; replaced in startBatch(). */
        private transient PrometheusRemote.WriteRequest.Builder writeRequestBuilder;
        /** Number of time series in the current batch. */
        private transient long batchSize;
        /** Sum of the serialized sizes of the time series in the current batch. */
        private transient long batchBytes;
        // These accumulators are incremented while requests are issued or completed (partly from
        // HTTP callbacks) and are transferred into the Beam counters in finishBundle().
        private transient AtomicLong writes;
        private transient AtomicLong writeFailures;
        private transient AtomicLong responseFailures;
        /** Per "response code + error kind" accumulators; accessed inside synchronized blocks on the map. */
        private transient Map<String, Pair<AtomicLong, Counter>> detailedResponseFailures;
        /** Tracks in-flight requests; finishBundle() waits on it. */
        private transient Phaser phaser;

        /**
         * @param spec the write configuration
         * @param allowedLateness lateness used when scheduling garbage collection timers
         */
        public WriteFn(W spec, Duration allowedLateness) {
            this.spec = spec;
            this.allowedLateness = allowedLateness;
            LOG.debug("allowed lateness: {}", (Object)allowedLateness);
        }

        /**
         * Sets the gc timer to one millisecond after the newest event timestamp plus the
         * allowed lateness.
         *
         * @param gcTimer the timer to set
         * @param newestEventTimestamp the newest event timestamp seen for the key
         */
        protected void setGcTimer(Timer gcTimer, Instant newestEventTimestamp) {
            final Instant latenessHorizon = newestEventTimestamp.plus(allowedLateness);
            gcTimer.set(latenessHorizon.plus(1L));
        }

        /**
         * Creates the HTTP client and the accumulators used for bookkeeping.
         * <p>
         * The client is limited to a single concurrent request and a connection pool of size one.
         */
        @DoFn.Setup
        public void setup() {
            LOG.debug("setup - instance: {}", (Object)this);

            final Dispatcher singleRequestDispatcher = new Dispatcher();
            singleRequestDispatcher.setMaxRequests(1);
            singleRequestDispatcher.setMaxRequestsPerHost(1);

            final OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
            clientBuilder.readTimeout(spec.readTimeoutMs, TimeUnit.MILLISECONDS);
            clientBuilder.writeTimeout(spec.writeTimeoutMs, TimeUnit.MILLISECONDS);
            clientBuilder.dispatcher(singleRequestDispatcher);
            clientBuilder.connectionPool(new ConnectionPool(1, 5L, TimeUnit.MINUTES));
            okHttpClient = clientBuilder.build();

            // one party for the thread that runs the bundle; every request registers another one
            phaser = new Phaser(1);
            writes = new AtomicLong();
            writeFailures = new AtomicLong();
            responseFailures = new AtomicLong();
            detailedResponseFailures = new HashMap<>();
        }

        /**
         * Moves the value accumulated in {@code al} into {@code counter} and resets {@code al} to zero.
         *
         * @param al the accumulator to drain
         * @param counter the counter that is incremented by the drained value (if it is not zero)
         */
        private static void incrementCounter(AtomicLong al, Counter counter) {
            final long pending = al.getAndSet(0L);
            if (pending == 0L) {
                return;
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("increment counter - name: {};  count: {}", (Object)counter.getName(), (Object)pending);
            }
            counter.inc(pending);
        }

        /**
         * Begins a fresh batch for the bundle.
         */
        @DoFn.StartBundle
        public void startBundle() {
            final Object instance = this;
            LOG.debug("startBundle - instance: {}", instance);
            startBatch();
        }

        /**
         * Sends the remaining batch, waits until all registered requests have arrived at the
         * phaser and then transfers the accumulated counts into the Beam counters.
         */
        @DoFn.FinishBundle
        public void finishBundle() throws IOException, InterruptedException {
            LOG.debug("finishBundle - instance: {}", (Object)this);
            flushBatch();
            // every request registered itself in flushBatch() and deregisters in its callback
            phaser.arriveAndAwaitAdvance();
            incrementCounter(writes, WRITE);
            incrementCounter(writeFailures, WRITE_FAILURE);
            incrementCounter(responseFailures, RESPONSE_FAILURE);
            synchronized (detailedResponseFailures) {
                detailedResponseFailures.values().forEach(pair -> incrementCounter(pair.getLeft(), pair.getRight()));
            }
        }

        /**
         * Shuts down the executor of the HTTP client and evicts its pooled connections.
         * <p>
         * Does nothing if no client exists (e.g. because setup did not run or did not
         * complete), so that teardown itself cannot fail with a {@code NullPointerException}.
         */
        @DoFn.Teardown
        public void closeClient() throws IOException {
            LOG.debug("teardown - instance: {}", (Object)this);
            final OkHttpClient client = okHttpClient;
            if (client == null) {
                return;
            }
            client.dispatcher().executorService().shutdown();
            client.connectionPool().evictAll();
        }

        /**
         * Lets {@code consumer} build time series and appends them to the current batch.
         * <p>
         * Before a series is appended the batch is flushed and restarted if it already holds
         * the maximum number of series or if the series would push it over the byte limit.
         *
         * @param consumer fills the time series builder that is passed to it
         */
        protected void outputTimeSeries(Consumer<TimeSeriesBuilder> consumer) throws IOException {
            final TimeSeriesBuilderImpl collector = new TimeSeriesBuilderImpl(spec.fixedLabels.entrySet());
            consumer.accept(collector);
            for (PrometheusTypes.TimeSeries.Builder pending : collector.builders) {
                final PrometheusTypes.TimeSeries series = pending.build();
                final long seriesBytes = series.getSerializedSize();
                final boolean countLimitReached = batchSize >= spec.maxBatchSize;
                final boolean byteLimitExceeded = batchBytes + seriesBytes > spec.maxBatchBytes;
                if (countLimitReached || byteLimitExceeded) {
                    flushBatch();
                    startBatch();
                }
                batchSize++;
                batchBytes += seriesBytes;
                writeRequestBuilder.addTimeseries(series);
            }
        }

        /**
         * Resets the batch bookkeeping and installs an empty write request builder.
         */
        public void startBatch() {
            batchBytes = 0L;
            batchSize = 0L;
            writeRequestBuilder = PrometheusRemote.WriteRequest.newBuilder();
        }

        /**
         * Sends the current batch asynchronously as a snappy compressed protobuf write request.
         * <p>
         * Nothing is sent for an empty batch. Every request registers a party at the phaser and
         * deregisters it in its callback, which allows {@code finishBundle} to wait for
         * outstanding requests. Failures are counted in the accumulators of this instance.
         *
         * @throws IOException if compressing the request or rendering it for trace logging fails
         */
        private void flushBatch() throws IOException {
            LOG.trace("flushBatch - instance: {}; batchSize: {}", (Object)this, (Object)this.batchSize);
            if (this.batchSize == 0L) {
                return;
            }
            final PrometheusRemote.WriteRequest writeRequest = this.writeRequestBuilder.build();
            byte[] writeRequestCompressed = Snappy.compress(writeRequest.toByteArray());
            RequestBody body = RequestBody.create(PROTOBUF_MEDIA_TYPE, writeRequestCompressed);
            Request.Builder builder = new Request.Builder()
                    .url(spec.writeUrl)
                    .addHeader("X-Prometheus-Remote-Write-Version", "0.1.0")
                    .addHeader("Content-Encoding", "snappy")
                    .addHeader("User-Agent", CortexIo.class.getCanonicalName())
                    .post(body);
            if (spec.orgId != null) {
                builder.addHeader(CortexIo.X_SCOPE_ORG_ID_HEADER, spec.orgId);
            }
            Request request = builder.build();
            if (LOG_WRITE.isTraceEnabled()) {
                // Render the time series of the request as lines that alternate between an
                // index action and a single time series.
                String str = PROTOBUF_JSON_PRINTER.print((MessageOrBuilder)writeRequest);
                String indexAction = "{\"index\":{}}";
                Pattern pattern = Pattern.compile(",\\{\"labels\"");
                String bulkBody = pattern.splitAsStream(str.substring(str.indexOf('[') + 1, str.length() - 2))
                        .filter(s -> StringUtils.isNoneBlank(s))
                        .collect(Collectors.joining("\n" + indexAction + "\n{\"labels\"", indexAction + "\n", ""));
                LOG_WRITE.trace(bulkBody);
            }
            this.phaser.register();
            this.writes.incrementAndGet();
            this.okHttpClient.newCall(request).enqueue(new Callback(){

                @Override
                public void onFailure(Call call, IOException e) {
                    try {
                        RATE_LIMITED_LOG.error("Write to Cortex failed", (Throwable)e);
                        writeFailures.incrementAndGet();
                    }
                    finally {
                        phaser.arriveAndDeregister();
                    }
                    spec.failureHandlers.forEach(handler -> handler.accept(call, e));
                }

                @Override
                public void onResponse(Call call, Response response) {
                    try {
                        try {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("got response - code: {}, writeRequest: {}", (Object)response.code(), (Object)writeRequest);
                            }
                            if (!response.isSuccessful()) {
                                responseFailures.incrementAndGet();
                                this.handleUnsuccessfulResponse(response);
                            }
                        }
                        finally {
                            phaser.arriveAndDeregister();
                        }
                        spec.responseHandlers.forEach(handler -> handler.accept(call, response));
                    }
                    finally {
                        // The body of every response must be closed, otherwise its connection leaks.
                        // Previously only the bodies of unsuccessful responses were closed. Closing
                        // happens after the handlers ran so that they can still read the body of a
                        // successful response; closing an already closed body has no effect.
                        ResponseBody responseBody = response.body();
                        if (responseBody != null) {
                            responseBody.close();
                        }
                    }
                }

                /** Counts the failure by response code and error kind and logs it (rate limited). */
                private void handleUnsuccessfulResponse(Response response) {
                    try (ResponseBody body = response.body();){
                        String bodyAsString;
                        if (body != null) {
                            try {
                                bodyAsString = body.string();
                                this.incrementDetailedResponseFailure(response.code(), CortexIo.determineErrorKind(bodyAsString));
                            }
                            catch (IOException e) {
                                bodyAsString = "(error reading body)";
                                this.incrementDetailedResponseFailure(response.code(), "body read error");
                            }
                        } else {
                            bodyAsString = "(null)";
                            this.incrementDetailedResponseFailure(response.code(), "body empty");
                        }
                        RATE_LIMITED_LOG.error("Write to Cortex failed - code: " + response.code() + "; message: " + response.message() + "; body: " + bodyAsString.trim() + "; request: " + writeRequest);
                    }
                }

                /**
                 * Increments the accumulator for the given response code and error kind; the
                 * accumulator and its counter are created on first use.
                 */
                private void incrementDetailedResponseFailure(int responseCode, String errorKind) {
                    String suffix = String.valueOf(responseCode) + " " + errorKind;
                    Pair<AtomicLong, Counter> entry;
                    // finishBundle() iterates the map under the same lock
                    synchronized (detailedResponseFailures) {
                        entry = detailedResponseFailures.get(suffix);
                        if (entry == null) {
                            entry = Pair.of(new AtomicLong(), Metrics.counter(CortexIo.CORTEX_METRIC_NAMESPACE, "response_failure_" + suffix));
                            detailedResponseFailures.put(suffix, entry);
                        }
                    }
                    entry.getLeft().incrementAndGet();
                }
            });
        }
    }

    /**
     * Write transform whose write function does not accumulate values.
     */
    private static class WriteWithoutAccumulation<K, V>
    extends Write<K, V> {
        public WriteWithoutAccumulation(String writeUrl, BuildTimeSeries<K, V> build) {
            super(writeUrl, build);
        }

        /** Creates a {@link WriteFnWithoutAccumulation} for this transform. */
        @Override
        WriteFn<K, V, ?> createWriteFn(Duration allowedLateness) {
            final WriteFnWithoutAccumulation<K, V> writeFn = new WriteFnWithoutAccumulation<>(this, allowedLateness);
            return writeFn;
        }
    }

    /**
     * Write transform whose write function accumulates the values of a key before writing them.
     */
    private static class WriteWithAccumulation<K, V>
    extends Write<K, V> {
        private final Coder<K> keyCoder;
        /** Coder used for the values stored in the heap state. */
        private final Coder<V> valueCoder;
        /** Function passed to the heap together with every added value. */
        private final SerializableBiFunction<V, V, V> combiner;
        /** Processing-time delay after which accumulated values are flushed. */
        private final Duration accumulationDelay;

        public WriteWithAccumulation(String writeUrl, BuildTimeSeries<K, V> build, Coder<K> keyCoder, Coder<V> valueCoder, SerializableBiFunction<V, V, V> combiner, Duration accumulationDelay) {
            super(writeUrl, build);
            this.accumulationDelay = accumulationDelay;
            this.combiner = combiner;
            this.valueCoder = valueCoder;
            this.keyCoder = keyCoder;
        }

        /** Creates a {@link WriteFnWithAccumulation} for this transform. */
        @Override
        WriteFn<K, V, ?> createWriteFn(Duration allowedLateness) {
            final WriteFnWithAccumulation<K, V> writeFn = new WriteFnWithAccumulation<>(this, allowedLateness);
            return writeFn;
        }
    }

    public static abstract class Write<K, V>
    extends PTransform<PCollection<KV<K, V>>, PDone> {
        protected final String writeUrl;
        protected final BuildTimeSeries<K, V> build;
        protected String orgId;
        protected long maxBatchSize = 10000L;
        protected long maxBatchBytes = 524288L;
        protected long readTimeoutMs = 10000L;
        protected long writeTimeoutMs = 10000L;
        protected final Map<String, String> fixedLabels = new HashMap<String, String>();
        protected final List<BiConsumer<Call, Response>> responseHandlers = new ArrayList<BiConsumer<Call, Response>>();
        protected final List<BiConsumer<Call, Exception>> failureHandlers = new ArrayList<BiConsumer<Call, Exception>>();

        public Write(String writeUrl, BuildTimeSeries<K, V> build) {
            super("CortexWrite");
            this.writeUrl = writeUrl;
            this.build = build;
        }

        public Write<K, V> withOrgId(String value) {
            this.orgId = value;
            return this;
        }

        public Write<K, V> withMaxBatchSize(long value) {
            this.maxBatchSize = value;
            return this;
        }

        public Write<K, V> withMaxBatchBytes(long value) {
            this.maxBatchBytes = value;
            return this;
        }

        public Write<K, V> withReadTimeoutMs(long value) {
            this.readTimeoutMs = value;
            return this;
        }

        public Write<K, V> withWriteTimeoutMs(long value) {
            this.writeTimeoutMs = value;
            return this;
        }

        public Write<K, V> withFixedLabel(String name, String value) {
            CortexIo.sanitize(name, value, this.fixedLabels::put);
            return this;
        }

        public Write<K, V> withMetricName(String value) {
            return this.withFixedLabel(CortexIo.METRIC_NAME_LABEL, value);
        }

        public Write<K, V> withResponseHandler(BiConsumer<Call, Response> handler) {
            this.responseHandlers.add(handler);
            return this;
        }

        public Write<K, V> withFailureHandler(BiConsumer<Call, Exception> handler) {
            this.failureHandlers.add(handler);
            return this;
        }

        @VisibleForTesting
        abstract WriteFn<K, V, ?> createWriteFn(Duration var1);

        public PDone expand(PCollection<KV<K, V>> input) {
            ((PCollection)input.apply((PTransform)Window.into((WindowFn)new GlobalWindows()))).apply((PTransform)ParDo.of(this.createWriteFn(input.getWindowingStrategy().getAllowedLateness())));
            return PDone.in((Pipeline)input.getPipeline());
        }
    }

    /**
     * Serializable callback that fills a {@link TimeSeriesBuilder} for a key/value pair.
     * <p>
     * At the call sites in this file the arguments are, in order: the key, the value, the
     * event timestamp of the value, an index determined for that event timestamp, and the
     * builder that receives the labels and samples.
     */
    @FunctionalInterface
    public static interface BuildTimeSeries<K, V>
    extends Serializable {
        /**
         * @param var1 the key
         * @param var2 the value
         * @param var3 the event timestamp of the value
         * @param var4 the index determined for the event timestamp
         * @param var5 the builder to fill
         */
        public void accept(K var1, V var2, Instant var3, int var4, TimeSeriesBuilder var5);
    }
}

