/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron.testing.flowgen;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.function.BiFunction;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.opennms.nephron.testing.flowgen.Limiter;
import org.opennms.nephron.testing.flowgen.SyntheticFlowTimestampPolicy;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlowReader
extends UnboundedSource.UnboundedReader<FlowDocument> {
    private static Logger LOG = LoggerFactory.getLogger(FlowReader.class);
    private final UnboundedSource<FlowDocument, CheckpointMark> source;
    private final BiFunction<Random, Long, FlowDocument> nextFlowDocument;
    private final SyntheticFlowTimestampPolicy timestampPolicy;
    private final int idxIncr;
    private final long maxIdx;
    protected long index;
    private Random random;
    private FlowDocument current = null;
    private final Limiter limiter;

    public FlowReader(UnboundedSource<FlowDocument, CheckpointMark> source, BiFunction<Random, Long, FlowDocument> nextFlowDocument, SyntheticFlowTimestampPolicy timestampPolicy, int idxIncr, long maxIdx, long index, Random random, Limiter limiter) {
        this.source = source;
        this.nextFlowDocument = nextFlowDocument;
        this.timestampPolicy = timestampPolicy;
        this.idxIncr = idxIncr;
        this.maxIdx = maxIdx;
        this.index = index;
        this.random = random;
        this.limiter = limiter;
    }

    public boolean start() throws IOException {
        return this.advance();
    }

    public boolean advance() throws IOException {
        this.getWatermark();
        if (this.index < this.maxIdx) {
            if (this.limiter.check(this.idxIncr)) {
                this.current = this.nextFlowDocument.apply(this.random, this.index);
                this.index += (long)this.idxIncr;
                this.timestampPolicy.getTimestampForFlow(this.current);
                return true;
            }
            return false;
        }
        this.current = null;
        return false;
    }

    public Instant getWatermark() {
        Instant wm = this.index < this.maxIdx ? this.timestampPolicy.getWatermark() : BoundedWindow.TIMESTAMP_MAX_VALUE;
        return wm;
    }

    public UnboundedSource.CheckpointMark getCheckpointMark() {
        return new CheckpointMark(this.timestampPolicy.getCheckpointInstant(), this.index, this.random, this.limiter.state());
    }

    public UnboundedSource<FlowDocument, ?> getCurrentSource() {
        return this.source;
    }

    public FlowDocument getCurrent() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.current;
    }

    public Instant getCurrentTimestamp() throws NoSuchElementException {
        if (this.current == null) {
            throw new NoSuchElementException();
        }
        return this.timestampPolicy.getTimestampForFlow(this.current);
    }

    public void close() throws IOException {
    }

    public static class CheckpointMark
    implements UnboundedSource.CheckpointMark {
        private static Coder<Instant> INSTANT_CODER = InstantCoder.of();
        private static Coder<Long> LONG_CODER = VarLongCoder.of();
        private static Coder<Random> RANDOM_CODER = SerializableCoder.of(Random.class);
        public static Coder<CheckpointMark> CHECKPOINT_CODER = new CustomCoder<CheckpointMark>(){

            public void encode(CheckpointMark value, OutputStream outStream) throws CoderException, IOException {
                INSTANT_CODER.encode((Object)value.previous, outStream);
                LONG_CODER.encode((Object)value.index, outStream);
                RANDOM_CODER.encode((Object)value.random, outStream);
                LONG_CODER.encode((Object)value.limiterState, outStream);
            }

            public CheckpointMark decode(InputStream inStream) throws CoderException, IOException {
                return new CheckpointMark((Instant)INSTANT_CODER.decode(inStream), (Long)LONG_CODER.decode(inStream), (Random)RANDOM_CODER.decode(inStream), (Long)LONG_CODER.decode(inStream));
            }
        };
        public final Instant previous;
        public final long index;
        public final Random random;
        public final long limiterState;

        public CheckpointMark(Instant previous, long index, Random random, long limiterState) {
            this.previous = previous;
            this.index = index;
            this.random = random;
            this.limiterState = limiterState;
        }

        public void finalizeCheckpoint() throws IOException {
        }
    }
}

