/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron.util;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulates the values that arrive for a key into a single value and emits that value after a
 * processing-time delay.
 *
 * <p>The first value seen for a key is stored in state and arms a processing-time timer that fires
 * {@code accumulationDelay} later. Every further value that arrives before the timer fires is
 * folded into the stored value with {@code combiner}. When the timer fires the accumulated value
 * is emitted with the window's maximum timestamp and the state is cleared, so a later value
 * starts a new accumulation. Any value still held in state when the window expires is emitted as
 * well.
 *
 * <p>The output coder is set explicitly to a {@link KvCoder} built from the given key and value
 * coders.
 *
 * @param <K> the key type
 * @param <V> the value type
 */
public class PaneAccumulator<K, V>
        extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
    private static final Logger LOG = LoggerFactory.getLogger(PaneAccumulator.class);

    private final SerializableBiFunction<V, V, V> combiner;
    private final Duration accumulationDelay;
    private final Coder<K> keyCoder;
    private final Coder<V> valueCoder;

    /**
     * Creates the transform.
     *
     * @param combiner          folds the value held in state (first argument) and a newly arrived
     *                          value (second argument) into the new state value
     * @param accumulationDelay processing-time offset between the creation of the state for a key
     *                          and the emission of the accumulated value
     * @param keyCoder          coder for the keys; used for the output {@link KvCoder}
     * @param valueCoder        coder for the values; used for the value state and for the output
     *                          {@link KvCoder}
     */
    public PaneAccumulator(SerializableBiFunction<V, V, V> combiner, Duration accumulationDelay, Coder<K> keyCoder, Coder<V> valueCoder) {
        super("paneAcc");
        this.combiner = combiner;
        this.accumulationDelay = accumulationDelay;
        this.keyCoder = keyCoder;
        this.valueCoder = valueCoder;
    }

    /**
     * Applies the stateful accumulation {@link DoFn} to {@code input} and sets the coder of the
     * resulting collection.
     *
     * @param input the keyed values to accumulate
     * @return the collection of accumulated values
     */
    @Override
    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
        return input
                .apply(ParDo.of(new PaneCombinerFn()))
                .setCoder(KvCoder.of(this.keyCoder, this.valueCoder));
    }

    /**
     * Stateful {@link DoFn} that keeps one accumulated value in state and flushes it either when
     * the output timer fires or when the window expires.
     *
     * <p>This is a non-static inner class because it reads the combiner, the delay and the value
     * coder from the enclosing {@link PaneAccumulator} instance.
     */
    private class PaneCombinerFn
            extends DoFn<KV<K, V>, KV<K, V>> {
        private static final String VALUE_STATE_NAME = "value";
        private static final String OUTPUT_TIMER_NAME = "output";

        /** Holds the value accumulated so far; {@code null} when nothing is pending. */
        @DoFn.StateId(VALUE_STATE_NAME)
        private final StateSpec<ValueState<V>> valueStateSpec;

        /** Processing-time timer that triggers the emission of the accumulated value. */
        @DoFn.TimerId(OUTPUT_TIMER_NAME)
        private final TimerSpec outputTimerSpec;

        private PaneCombinerFn() {
            this.valueStateSpec = StateSpecs.value(PaneAccumulator.this.valueCoder);
            this.outputTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
        }

        /**
         * Stores the first value for a key and arms the output timer, or combines a further value
         * with the value already held in state.
         */
        @DoFn.ProcessElement
        public void process(ProcessContext ctx, BoundedWindow window, @DoFn.AlwaysFetched @DoFn.StateId(VALUE_STATE_NAME) ValueState<V> valueState, @DoFn.TimerId(OUTPUT_TIMER_NAME) Timer timer) {
            final KV<K, V> element = ctx.element();
            final V oldValue = valueState.read();
            final V newValue;
            if (oldValue == null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("create state - key: {}; ctx.timestamp: {}; window.maxTimestamp: {}", element.getKey(), ctx.timestamp(), window.maxTimestamp());
                }
                newValue = element.getValue();
                // The timer is armed only when the state is created; later elements do not
                // postpone it. Its output timestamp matches the timestamp used when emitting.
                timer.withOutputTimestamp(window.maxTimestamp()).offset(PaneAccumulator.this.accumulationDelay).setRelative();
            } else {
                newValue = PaneAccumulator.this.combiner.apply(oldValue, element.getValue());
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("write state - key: {}; ctx.timestamp: {}; window.maxTimestamp: {}; newValue: {}", element.getKey(), ctx.timestamp(), window.maxTimestamp(), newValue);
            }
            valueState.write(newValue);
        }

        /**
         * Emits the accumulated value with the window's maximum timestamp and clears the state.
         * Nothing is emitted if the state is empty.
         */
        @DoFn.OnTimer(OUTPUT_TIMER_NAME)
        public void onOutput(OnTimerContext ctx, BoundedWindow window, @DoFn.Key K key, @DoFn.AlwaysFetched @DoFn.StateId(VALUE_STATE_NAME) ValueState<V> valueState) {
            final V value = valueState.read();
            if (LOG.isTraceEnabled()) {
                LOG.trace("output state - key: {}; ctx.timestamp: {}, window.maxTimestamp: {}", key, ctx.timestamp(), window.maxTimestamp());
            }
            // Same guard as in onExpiration: never emit a KV that carries a null value.
            if (value != null) {
                ctx.outputWithTimestamp(KV.of(key, value), window.maxTimestamp());
            }
            valueState.clear();
        }

        /**
         * Emits a value that is still held in state when the window expires, using the window's
         * maximum timestamp.
         */
        @DoFn.OnWindowExpiration
        public void onExpiration(DoFn.OutputReceiver<KV<K, V>> ctx, BoundedWindow window, @DoFn.Key K key, @DoFn.AlwaysFetched @DoFn.StateId(VALUE_STATE_NAME) ValueState<V> valueState) {
            final V value = valueState.read();
            if (value != null) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("output expired state - key: {}; window.maxTimestamp: {}", key, window.maxTimestamp());
                }
                ctx.outputWithTimestamp(KV.of(key, value), window.maxTimestamp());
            }
        }
    }
}

