/*
 * Decompiled with CFR 0.152.
 */
package org.opennms.nephron;

import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.opennms.nephron.Pipeline;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;

/**
 * Kafka timestamp policy for flow records.
 *
 * <p>Each record is stamped with the time that
 * {@code Pipeline.ReadFromKafka.getTimestamp} extracts from its {@link FlowDocument}.
 * The reported watermark trails the largest event timestamp accepted so far by
 * {@code maxDelay}.
 */
public class FlowTimestampPolicy extends TimestampPolicy<String, FlowDocument> {
    /** Distance kept between the largest accepted event timestamp and the watermark. */
    private final Duration maxDelay;

    /** Largest event timestamp accepted so far; it is only ever replaced by a later one. */
    private Instant maxEventTimestamp;

    /**
     * Creates a policy that starts from a previous watermark, if there is one.
     *
     * @param maxDelay          amount subtracted from the largest accepted event timestamp
     *                          to obtain the watermark
     * @param previousWatermark watermark to start from; when empty,
     *                          {@link BoundedWindow#TIMESTAMP_MIN_VALUE} is used
     */
    public FlowTimestampPolicy(Duration maxDelay, Optional<Instant> previousWatermark) {
        final Instant startingWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        this.maxDelay = maxDelay;
        // Stored shifted forward by maxDelay because getWatermark() subtracts maxDelay again.
        this.maxEventTimestamp = startingWatermark.plus(maxDelay);
    }

    /**
     * Returns the event timestamp of the record, using the current wall-clock time
     * as the upper bound for advancing the watermark.
     *
     * @param ctx    partition context, passed through unchanged
     * @param record Kafka record carrying the flow document
     * @return the timestamp extracted from the flow document
     */
    public Instant getTimestampForRecord(TimestampPolicy.PartitionContext ctx, KafkaRecord<String, FlowDocument> record) {
        return getTimestampForRecord(ctx, record, Instant.now());
    }

    /**
     * Returns the event timestamp of the record and, when appropriate, remembers it
     * as the new largest event timestamp.
     *
     * @param ctx    partition context (not used by this method)
     * @param record Kafka record carrying the flow document
     * @param now    instant treated as the current time; timestamps that are not
     *               strictly before it never advance the watermark
     * @return the timestamp extracted from the flow document
     */
    @VisibleForTesting
    public Instant getTimestampForRecord(TimestampPolicy.PartitionContext ctx, KafkaRecord<String, FlowDocument> record, Instant now) {
        final FlowDocument flow = record.getKV().getValue();
        final Instant eventTime = Pipeline.ReadFromKafka.getTimestamp(flow);

        // Leave the tracked maximum untouched unless the event time is strictly newer
        // than it and strictly older than "now".
        if (!eventTime.isAfter(maxEventTimestamp) || !eventTime.isBefore(now)) {
            return eventTime;
        }

        maxEventTimestamp = eventTime;
        return eventTime;
    }

    /**
     * Returns the current watermark: the largest accepted event timestamp minus
     * {@code maxDelay}.
     *
     * @param ctx partition context (not used by this method)
     * @return the watermark for the partition
     */
    public Instant getWatermark(TimestampPolicy.PartitionContext ctx) {
        return maxEventTimestamp.minus(maxDelay);
    }
}

