/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.async.inter;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.exceptions.TimeoutPipeException;

public class AsyncTimeoutPipe<T>
extends AsyncPipe<T> {
    private final AsyncPipe<T> input;
    private final long timeoutMillis;
    private final ScheduledExecutorService schedExecutor;
    private final AtomicBoolean isDone = new AtomicBoolean();
    private volatile ScheduledFuture<?> future;

    public AsyncTimeoutPipe(AsyncPipe<T> input, Duration timeout, ScheduledExecutorService schedExector) {
        this.input = input;
        this.timeoutMillis = timeout.toMillis();
        this.schedExecutor = schedExector;
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.future = this.schedExecutor.schedule(() -> {
            if (this.isDone.compareAndSet(false, true)) {
                try {
                    this.notifyError(new TimeoutPipeException("Pipeline timeout - " + this.timeoutMillis + " millis ellapsed"));
                    try {
                        this.input.close();
                    }
                    catch (IOException iOException) {}
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
            }
        }, this.timeoutMillis, TimeUnit.MILLISECONDS);
        this.input.setListener(new AsyncPipeListener<T>(){

            @Override
            public void next(T item) throws PipeException, InterruptedException {
                AsyncTimeoutPipe.this.notifyNext(item);
            }

            @Override
            public void done() throws InterruptedException {
                if (AsyncTimeoutPipe.this.isDone.compareAndSet(false, true)) {
                    AsyncTimeoutPipe.this.notifyDone();
                }
            }

            @Override
            public void error(PipeException e) throws InterruptedException {
                if (AsyncTimeoutPipe.this.isDone.compareAndSet(false, true)) {
                    AsyncTimeoutPipe.this.notifyError(e);
                }
            }
        });
        this.input.start();
    }

    @Override
    public float getProgress() {
        return this.input.getProgress();
    }

    @Override
    public void close() throws IOException {
        super.close();
        this.future.cancel(true);
        this.input.close();
    }
}

