/*
 * Decompiled with CFR 0.152.
 */
package org.ujorm.tools.thread;

import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.ujorm.tools.Assert;
import org.ujorm.tools.thread.JobException;

public class AsyncStreamBuilder<T> {
    private static final Object UNDEFINED = new Object();
    private final AtomicLong countDown;
    private final Duration timeout;
    private final LinkedBlockingQueue<T> queue;
    private final Stream<T> stream;
    private final Clock clock;
    private long startMilis = Long.MIN_VALUE;
    private volatile boolean closed;
    private volatile Throwable interrupt;

    public AsyncStreamBuilder(long count) {
        this(count, Duration.ofMinutes(1L));
    }

    public AsyncStreamBuilder(long count, @Nonnull Duration timeout) {
        Assert.isTrue(count >= 0L, "count");
        Assert.notNull(timeout, "timeout");
        this.countDown = new AtomicLong(count);
        this.timeout = timeout;
        this.queue = new LinkedBlockingQueue();
        this.clock = Clock.systemUTC();
        this.stream = Stream.generate(() -> this.get()).limit(count).filter(v -> v != UNDEFINED);
    }

    @Nonnull
    protected T get() throws JobException {
        if (this.interrupt == null) {
            try {
                T result;
                long restMillis = this.isOpen() ? this.timeout.toMillis() - this.clock.millis() + this.startMilis : 0L;
                T t = result = restMillis > 0L ? this.queue.poll(restMillis, TimeUnit.MILLISECONDS) : this.queue.poll();
                if (result == null) {
                    this.close();
                    throw JobException.of("Time is over: " + this.timeout);
                }
                return result;
            }
            catch (InterruptedException e) {
                if (this.interrupt == null) {
                    this.interrupt = e;
                }
                this.close();
                Thread.currentThread().interrupt();
            }
        }
        throw JobException.of(this.interrupt);
    }

    @Nonnull
    public Stream<T> stream() {
        if (this.startMilis == Long.MIN_VALUE) {
            this.startMilis = this.clock.millis();
        }
        return this.stream;
    }

    public void addAll(T ... items) {
        for (T item : items) {
            this.add(item);
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void add(@Nullable T item) {
        if (this.countDown.decrementAndGet() < 0L) throw JobException.of("The parameter is over limit: " + item);
        if (this.interrupt != null) return;
        if (!this.isOpen()) throw JobException.of("The builder is closed");
        this.queue.add(item != null ? item : UNDEFINED);
    }

    protected final void close() {
        this.closed = true;
    }

    protected final boolean isOpen() {
        return !this.closed;
    }

    public void interrupt(@Nonnull Throwable causedBy) {
        if (this.interrupt == null) {
            this.interrupt = Assert.notNull(causedBy, "causedBy");
            this.close();
            Thread.currentThread().interrupt();
        }
    }
}

