/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.wrapper.esper;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Worker thread that drains a bounded {@link BlockingQueue} and passes every
 * item to {@link #doNext(Object)}.
 *
 * <p>Producers enqueue items with {@link #add(Object)}; the thread's
 * {@link #run()} loop consumes them one at a time. The loop ends when
 * {@link #interrupt()} is called or, when auto-close is enabled, when no item
 * has been received for {@code closeAfter} milliseconds.
 *
 * @param <T> type of the queued items
 */
public abstract class AbstractQueueRunnable<T> extends Thread {
    private static final long MILLIS_PER_SECOND = 1000L;

    /** Bounded buffer between producers calling {@link #add(Object)} and the worker loop. */
    protected BlockingQueue<T> queue;
    /** Idle timeout in milliseconds; only consulted when {@link #autoClose} is {@code true}. */
    protected long closeAfter = 0L;
    /** Wall-clock time (ms) of construction or of the most recently dequeued item. */
    protected long currentTimestamp;
    /** Whether the worker loop stops by itself after {@link #closeAfter} ms without items. */
    protected boolean autoClose;
    // Written by interrupt() from arbitrary threads and read by the worker loop,
    // hence volatile so the stop request becomes visible to the worker.
    private volatile boolean running;

    /**
     * Creates a worker that closes itself after a period without incoming items.
     *
     * @param maxQueueSize capacity of the backing queue
     * @param closeAfter idle timeout in seconds
     * @throws IllegalArgumentException if {@code maxQueueSize} is less than 1
     *         (raised by {@link ArrayBlockingQueue})
     */
    public AbstractQueueRunnable(int maxQueueSize, int closeAfter) {
        this.queue = new ArrayBlockingQueue<T>(maxQueueSize);
        this.autoClose = true;
        // Multiply as long: an int product overflows for timeouts above ~2.1 million seconds.
        this.closeAfter = closeAfter * MILLIS_PER_SECOND;
        this.currentTimestamp = System.currentTimeMillis();
    }

    /**
     * Creates a worker without idle timeout; it runs until {@link #interrupt()} is called.
     *
     * @param maxQueueSize capacity of the backing queue
     * @throws IllegalArgumentException if {@code maxQueueSize} is less than 1
     *         (raised by {@link ArrayBlockingQueue})
     */
    public AbstractQueueRunnable(int maxQueueSize) {
        this.queue = new ArrayBlockingQueue<T>(maxQueueSize);
        this.autoClose = false;
        this.currentTimestamp = System.currentTimeMillis();
    }

    /**
     * Consumes queued items until the worker is interrupted or the idle timeout
     * expires. Exceptions thrown by {@link #doNext(Object)} are printed and the
     * loop continues with the next item; an interruption ends the loop.
     */
    @Override
    public void run() {
        this.running = true;
        while (this.running && !this.idleTimeoutExpired()) {
            try {
                T data = this.nextItem();
                if (data == null) {
                    // Only the timed poll can return null: the idle window elapsed
                    // without any item arriving, so close the worker.
                    break;
                }
                this.currentTimestamp = System.currentTimeMillis();
                this.doNext(data);
            } catch (InterruptedException e) {
                // Stop explicitly (do not rely on which thread executes run())
                // and restore the interrupt status for code further up the stack.
                this.running = false;
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Interrupted");
    }

    /**
     * Requests the worker loop to stop and interrupts the thread so that a
     * blocking wait on the queue is abandoned immediately.
     */
    @Override
    public void interrupt() {
        this.running = false;
        // Without the real interrupt a worker blocked on an empty queue would
        // never re-check the running flag.
        super.interrupt();
    }

    /**
     * Enqueues an item for processing, blocking while the queue is full.
     * {@code null} is silently ignored.
     *
     * @param data item to enqueue; may be {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void add(T data) throws InterruptedException {
        if (data != null) {
            this.queue.put(data);
        }
    }

    /**
     * Processes a single dequeued item. Called on the worker thread.
     *
     * @param data the dequeued item, never {@code null}
     * @throws Exception on processing failure; the worker prints it and keeps running
     */
    protected abstract void doNext(T data) throws Exception;

    /** Returns whether auto-close is enabled and more than {@code closeAfter} ms passed since the last item. */
    private boolean idleTimeoutExpired() {
        return this.autoClose
                && System.currentTimeMillis() - this.currentTimestamp > this.closeAfter;
    }

    /**
     * Fetches the next item: waits indefinitely without auto-close, otherwise at
     * most for the remainder of the idle window and returns {@code null} on timeout.
     */
    private T nextItem() throws InterruptedException {
        if (!this.autoClose) {
            return this.queue.take();
        }
        long elapsed = System.currentTimeMillis() - this.currentTimestamp;
        long remaining = Math.max(this.closeAfter - elapsed, 0L);
        return this.queue.poll(remaining, TimeUnit.MILLISECONDS);
    }
}

