/*
 * Decompiled with CFR 0.152.
 */
package org.wikidata.query.rdf.tool.change;

import java.util.Date;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.query.rdf.tool.change.RecentChangesPoller;
import org.wikidata.query.rdf.tool.exception.RetryableException;

/**
 * Thread that repeatedly pulls batches from a {@link RecentChangesPoller} and
 * pushes every non-empty batch onto a queue.
 *
 * <p>The loop throttles itself in two situations: when the batch it just
 * fetched ends later than the timestamp reported through
 * {@link #setPollerTs(long)}, and when that batch ends less than
 * {@code tailSeconds} before the current wall-clock time.
 */
public class TailingChangesPoller
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(TailingChangesPoller.class);
    /** Extra slack, in seconds, added when backing off from the current time. */
    private static final int CLOSE_TO_NOW_MARGIN_SECONDS = 2;
    /** Source of the batches. */
    private final RecentChangesPoller poller;
    /** Last batch fetched successfully; {@code null} until the first fetch succeeds. */
    private RecentChangesPoller.Batch lastBatch;
    /** How far behind the current time, in seconds, this poller stays. */
    private final int tailSeconds;
    /** Destination for every non-empty batch. */
    private final BlockingQueue<RecentChangesPoller.Batch> queue;
    /**
     * Timestamp (epoch milliseconds) supplied through {@link #setPollerTs(long)};
     * a value of 0 or less disables the "ahead of main poller" throttling.
     * Volatile so the polling loop observes updates made through the setter.
     */
    private volatile long mainPollerTs;

    /**
     * @param poller poller used to fetch the batches
     * @param queue queue that receives every non-empty batch
     * @param tailSeconds number of seconds to stay behind the current time
     */
    public TailingChangesPoller(RecentChangesPoller poller, BlockingQueue<RecentChangesPoller.Batch> queue, int tailSeconds) {
        this.poller = poller;
        this.tailSeconds = tailSeconds;
        this.queue = queue;
    }

    /**
     * Records the main poller's current position.
     *
     * @param ts timestamp in epoch milliseconds
     */
    public void setPollerTs(long ts) {
        this.mainPollerTs = ts;
    }

    /**
     * Tells whether a timestamp lies more than {@code tailSeconds} in the past.
     *
     * @param timestamp timestamp to check
     * @return {@code true} if {@code timestamp} is strictly before now minus {@code tailSeconds}
     */
    public boolean isOldEnough(Date timestamp) {
        return timestamp.before(DateUtils.addSeconds(new Date(), -this.tailSeconds));
    }

    /**
     * Polls until the thread is interrupted. On interruption the interrupt
     * flag is restored and the method returns.
     */
    @Override
    public void run() {
        this.setName("TailPoller");
        try {
            while (true) {
                RecentChangesPoller.Batch fetched;
                try {
                    fetched = this.lastBatch == null ? this.poller.firstBatch() : this.poller.nextBatch(this.lastBatch);
                }
                catch (RetryableException e) {
                    log.warn("Retryable error fetching first batch.  Retrying.", e);
                    // Retry the fetch: falling through would dereference a null
                    // lastBatch on the first iteration, or queue the previous
                    // batch a second time on later ones.
                    continue;
                }
                this.lastBatch = fetched;

                if (fetched.changes().size() > 0) {
                    log.info("Caught {} missing updates, adding to the queue", fetched.changes().size());
                    this.queue.put(fetched);
                }

                Date leftOff = fetched.leftOffDate();
                log.info("Tail poll up to {}", leftOff);

                // Read the volatile once so the comparison and the subtraction
                // below use the same value even if the setter runs concurrently.
                long mainTs = this.mainPollerTs;
                if (mainTs > 0L && mainTs < leftOff.getTime()) {
                    // 1000L keeps the multiplication in long arithmetic.
                    long sleepTime = Math.max(0L, leftOff.getTime() - mainTs + this.tailSeconds * 1000L);
                    log.info("Got ahead of main poller ({} > {}), sleeping for {}...", new Object[]{leftOff, new Date(mainTs), sleepTime});
                    Thread.sleep(sleepTime);
                }

                if (this.isOldEnough(leftOff)) {
                    continue;
                }
                // The clock is read again here, so clamp at zero: Thread.sleep
                // rejects negative durations with IllegalArgumentException.
                Date resumeThreshold = DateUtils.addSeconds(new Date(), -this.tailSeconds - CLOSE_TO_NOW_MARGIN_SECONDS);
                long sleepTime = Math.max(0L, leftOff.getTime() - resumeThreshold.getTime());
                log.info("Got too close to the current stream, sleeping for {}...", sleepTime);
                Thread.sleep(sleepTime);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}

