/*
 * Decompiled with CFR 0.152.
 */
package org.xbib.helianthus.common.stream;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.xbib.helianthus.common.stream.AbortingSubscriber;
import org.xbib.helianthus.common.stream.CancelledSubscriptionException;
import org.xbib.helianthus.common.stream.ClosedPublisherException;
import org.xbib.helianthus.common.stream.StreamMessage;
import org.xbib.helianthus.common.stream.StreamWriter;
import org.xbib.helianthus.common.util.Exceptions;

/**
 * Queue-backed {@link StreamMessage} that is also its own {@link StreamWriter}.
 *
 * <p>Producers append elements with {@link #write(Object)}, park tasks that wait for demand with
 * {@link #onDemand(Runnable)} and terminate the stream with {@link #close()} or
 * {@link #close(Throwable)}. Elements, parked tasks and the terminal event all travel through one
 * queue and are handed to the single permitted subscriber in queue order, paced by the demand the
 * subscriber signals through {@link Subscription#request(long)}.
 *
 * <p>State transitions: {@code OPEN -> CLOSED} when a producer closes the stream,
 * {@code OPEN -> CLEANUP} when the subscriber cancels (or the stream is aborted), and
 * {@code CLOSED -> CLEANUP} once the terminal event has been handed to the subscriber. While in
 * {@code CLEANUP} everything still queued is discarded instead of delivered.
 *
 * @param <T> the type of the streamed elements
 */
public class DefaultStreamMessage<T> implements StreamMessage<T>, StreamWriter<T> {

    /** Terminal event queued by {@link #close()}; it carries no cause. */
    private static final CloseEvent SUCCESSFUL_CLOSE = new CloseEvent(null);

    /**
     * Shared terminal event queued on cancellation when {@code Exceptions.isVerbose()} is false.
     * NOTE(review): {@code Exceptions.clearTrace} presumably strips the stack trace so the shared
     * instance is cheap to reuse - confirm against {@code Exceptions}.
     */
    private static final CloseEvent CANCELLED_CLOSE =
            new CloseEvent(Exceptions.clearTrace(CancelledSubscriptionException.get()));

    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultStreamMessage, SubscriptionImpl> subscriptionUpdater =
            AtomicReferenceFieldUpdater.newUpdater(DefaultStreamMessage.class, SubscriptionImpl.class, "subscription");

    @SuppressWarnings("rawtypes")
    private static final AtomicLongFieldUpdater<DefaultStreamMessage> demandUpdater =
            AtomicLongFieldUpdater.newUpdater(DefaultStreamMessage.class, "demand");

    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultStreamMessage, State> stateUpdater =
            AtomicReferenceFieldUpdater.newUpdater(DefaultStreamMessage.class, State.class, "state");

    /** Holds stream elements, {@link AwaitDemandFuture} markers and at most one {@link CloseEvent}. */
    private final Queue<Object> queue;

    /** Completed when the terminal event has been processed; see {@link #closeFuture()}. */
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    /** The one and only subscription; {@code null} until somebody subscribes or aborts. */
    private volatile SubscriptionImpl subscription;

    /** Outstanding demand; {@code Long.MAX_VALUE} is treated as unbounded and never decremented. */
    private volatile long demand;

    private volatile State state = State.OPEN;

    /** Set by the first accepted {@link #write(Object)}; read by {@link #isEmpty()}. */
    private volatile boolean wroteAny;

    /**
     * Creates a stream backed by a new {@link ConcurrentLinkedQueue}.
     */
    public DefaultStreamMessage() {
        this(new ConcurrentLinkedQueue<Object>());
    }

    /**
     * Creates a stream backed by the given queue.
     *
     * @param queue the queue that buffers elements and control events
     * @throws NullPointerException if {@code queue} is {@code null}
     */
    public DefaultStreamMessage(Queue<Object> queue) {
        this.queue = Objects.requireNonNull(queue, "queue");
    }

    /**
     * @return {@code true} while the stream has been neither closed nor cancelled
     */
    @Override
    public boolean isOpen() {
        return state == State.OPEN;
    }

    /**
     * @return {@code true} if the stream is no longer open and no element was ever accepted by
     *         {@link #write(Object)}
     */
    @Override
    public boolean isEmpty() {
        return !isOpen() && !wroteAny;
    }

    /**
     * Subscribes the given subscriber; its callbacks run on whichever thread triggers them.
     *
     * @throws NullPointerException if {@code subscriber} is {@code null}
     * @throws IllegalStateException if the stream already has a subscriber
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber");
        subscribe0(new SubscriptionImpl(this, subscriber, null));
    }

    /**
     * Subscribes the given subscriber; {@code onSubscribe} and all queue draining are submitted to
     * {@code executor}.
     *
     * @throws NullPointerException if {@code subscriber} or {@code executor} is {@code null}
     * @throws IllegalStateException if the stream already has a subscriber
     */
    @Override
    public void subscribe(Subscriber<? super T> subscriber, Executor executor) {
        Objects.requireNonNull(subscriber, "subscriber");
        Objects.requireNonNull(executor, "executor");
        subscribe0(new SubscriptionImpl(this, subscriber, executor));
    }

    /**
     * Installs {@code subscription} if none is installed yet and signals {@code onSubscribe}.
     */
    private void subscribe0(SubscriptionImpl subscription) {
        if (!subscriptionUpdater.compareAndSet(this, null, subscription)) {
            throw new IllegalStateException(
                    "subscribed by other subscriber already: " + this.subscription.subscriber());
        }
        final Subscriber<Object> subscriber = subscription.subscriber();
        final Executor executor = subscription.executor();
        if (executor != null) {
            executor.execute(() -> subscriber.onSubscribe(subscription));
        } else {
            subscriber.onSubscribe(subscription);
        }
    }

    /**
     * Cancels the current subscription. If nobody has subscribed yet, {@code AbortingSubscriber.INSTANCE}
     * is installed as the subscriber instead.
     * NOTE(review): {@code AbortingSubscriber} is defined elsewhere; presumably it cancels as soon
     * as it is subscribed - confirm.
     */
    @Override
    public void abort() {
        final SubscriptionImpl current = subscription;
        if (current != null) {
            current.cancel();
            return;
        }
        final SubscriptionImpl aborting = new SubscriptionImpl(this, AbortingSubscriber.INSTANCE, null);
        if (subscriptionUpdater.compareAndSet(this, null, aborting)) {
            aborting.subscriber().onSubscribe(aborting);
        } else {
            // Lost the race against a real subscriber: cancel that subscription instead.
            subscription.cancel();
        }
    }

    /**
     * Queues an element for delivery.
     *
     * @param obj the element to publish
     * @return {@code false} if the stream is not open and the element was rejected
     * @throws NullPointerException if {@code obj} is {@code null}
     */
    @Override
    public boolean write(T obj) {
        Objects.requireNonNull(obj, "obj");
        if (!isOpen()) {
            return false;
        }
        wroteAny = true;
        pushObject(obj);
        return true;
    }

    /**
     * Obtains an element from {@code supplier} and queues it. The supplier is invoked even if the
     * stream turns out to be closed.
     *
     * @return {@code false} if the stream is not open and the element was rejected
     * @throws NullPointerException if {@code supplier} or the supplied element is {@code null}
     */
    @Override
    public boolean write(Supplier<? extends T> supplier) {
        Objects.requireNonNull(supplier, "supplier");
        return write(supplier.get());
    }

    /**
     * Queues a marker that completes once it reaches the head of the queue while demand is
     * non-zero, and chains {@code task} to run after it.
     *
     * @param task the action to run once the marker completes normally
     * @return the future for {@code task}; it completes exceptionally if the stream is not open
     *         or is cleaned up before the marker is reached
     * @throws NullPointerException if {@code task} is {@code null}
     */
    @Override
    public CompletableFuture<Void> onDemand(Runnable task) {
        Objects.requireNonNull(task, "task");
        final AwaitDemandFuture f = new AwaitDemandFuture();
        if (!isOpen()) {
            f.completeExceptionally(ClosedPublisherException.get());
        } else {
            pushObject(f);
        }
        return f.thenRun(task);
    }

    /** Appends {@code obj} to the queue and triggers a drain attempt. */
    private void pushObject(Object obj) {
        queue.add(obj);
        notifySubscriber();
    }

    /**
     * Attempts to drain the queue to the subscriber. Does nothing when nobody is subscribed or the
     * queue is empty; otherwise drains on the subscription's executor, or inline when it has none.
     */
    protected void notifySubscriber() {
        final SubscriptionImpl subscription = this.subscription;
        if (subscription == null) {
            return;
        }
        final Queue<Object> queue = this.queue;
        if (queue.isEmpty()) {
            return;
        }
        final Executor executor = subscription.executor();
        if (executor != null) {
            executor.execute(() -> notifySubscribers0(subscription, queue));
        } else {
            notifySubscribers0(subscription, queue);
        }
    }

    /**
     * Drain loop: processes queue heads until the queue is empty, demand runs out or the terminal
     * event is reached. In {@code CLEANUP} state the queue is discarded instead.
     */
    private void notifySubscribers0(SubscriptionImpl subscription, Queue<Object> queue) {
        if (state == State.CLEANUP) {
            cleanup();
            return;
        }
        final Subscriber<Object> subscriber = subscription.subscriber();
        for (;;) {
            final Object head = queue.peek();
            if (head == null) {
                return;
            }
            if (head instanceof CloseEvent) {
                notifySubscriberWithCloseEvent(subscriber, (CloseEvent) head);
                return;
            }
            if (head instanceof AwaitDemandFuture) {
                if (!notifyCompletableFuture(queue)) {
                    return;
                }
                continue;
            }
            if (!notifySubscriber(subscriber, queue)) {
                return;
            }
        }
    }

    /**
     * Handles the terminal event: discards whatever is still queued, signals {@code onComplete}
     * or {@code onError} (neither for a cancellation) and completes {@link #closeFuture()}.
     */
    private void notifySubscriberWithCloseEvent(Subscriber<Object> subscriber, CloseEvent o) {
        // setState() only moves away from OPEN and the state is already CLOSED (or CLEANUP) once
        // a CloseEvent is in the queue, so the transition must be written directly. Without it a
        // write that raced with close() could still be delivered after the terminal signal.
        state = State.CLEANUP;
        cleanup();

        final Throwable cause = o.cause();
        if (cause == null) {
            try {
                subscriber.onComplete();
            } finally {
                closeFuture.complete(null);
            }
            // Must not fall through: the error path below would call onError(null) and
            // completeExceptionally(null), the latter throwing NullPointerException.
            return;
        }

        try {
            if (!o.isCancelled()) {
                subscriber.onError(cause);
            }
        } finally {
            closeFuture.completeExceptionally(cause);
        }
    }

    /**
     * Completes the demand-awaiting marker at the head of the queue if there is demand.
     *
     * @return {@code false} if demand is zero and the marker was left in place
     */
    private boolean notifyCompletableFuture(Queue<Object> queue) {
        if (demand == 0L) {
            return false;
        }
        ((CompletableFuture<?>) queue.remove()).complete(null);
        return true;
    }

    /**
     * Consumes one unit of demand and delivers the element at the head of the queue.
     *
     * @return {@code false} if demand is zero and nothing was delivered
     */
    private boolean notifySubscriber(Subscriber<Object> subscriber, Queue<Object> queue) {
        for (;;) {
            final long currentDemand = demand;
            if (currentDemand == 0L) {
                return false;
            }
            // Unbounded demand is never decremented; otherwise retry until the CAS wins.
            if (currentDemand != Long.MAX_VALUE
                    && !demandUpdater.compareAndSet(this, currentDemand, currentDemand - 1L)) {
                continue;
            }
            // The caller has established that the head is neither a CloseEvent nor an
            // AwaitDemandFuture, so it was queued by write(T).
            @SuppressWarnings("unchecked")
            final T element = (T) queue.remove();
            onRemoval(element);
            subscriber.onNext(element);
            return true;
        }
    }

    /**
     * Hook invoked with every element taken off the queue, both right before it is delivered to
     * the subscriber and when it is discarded during cleanup. The default implementation does
     * nothing.
     *
     * @param obj the element that was removed from the queue
     */
    protected void onRemoval(T obj) {
    }

    /**
     * @return the future that completes normally after {@code onComplete} was signalled, or
     *         exceptionally with the close cause or the cancellation exception
     */
    @Override
    public CompletableFuture<Void> closeFuture() {
        return closeFuture;
    }

    /**
     * Closes the stream successfully. Has no effect if the stream is not open any more.
     */
    @Override
    public void close() {
        if (setState(State.CLOSED)) {
            pushObject(SUCCESSFUL_CLOSE);
        }
    }

    /**
     * Closes the stream with a failure. Has no effect if the stream is not open any more.
     *
     * @param cause the failure to signal through {@code onError}
     * @throws NullPointerException if {@code cause} is {@code null}
     * @throws IllegalArgumentException if {@code cause} is a {@link CancelledSubscriptionException}
     */
    @Override
    public void close(Throwable cause) {
        Objects.requireNonNull(cause, "cause");
        if (cause instanceof CancelledSubscriptionException) {
            throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())");
        }
        if (setState(State.CLOSED)) {
            pushObject(new CloseEvent(cause));
        }
    }

    /**
     * Atomically leaves the {@code OPEN} state.
     *
     * @return {@code true} if this call performed the transition, {@code false} if the stream had
     *         already left {@code OPEN}
     */
    private boolean setState(State state) {
        assert state != State.OPEN : "state: " + state;
        return stateUpdater.compareAndSet(this, State.OPEN, state);
    }

    /**
     * Empties the queue without delivering anything: futures are failed with
     * {@link ClosedPublisherException}, elements are reported to {@link #onRemoval(Object)}, and a
     * cancellation event completes {@link #closeFuture()} exceptionally.
     */
    private void cleanup() {
        final ClosedPublisherException cause = ClosedPublisherException.get();
        Object e;
        while ((e = queue.poll()) != null) {
            if (e instanceof CloseEvent) {
                final CloseEvent event = (CloseEvent) e;
                if (event.isCancelled()) {
                    // After cancel() the drain loop short-circuits into cleanup(), so this is the
                    // only place the cancellation event is seen; without completing the future
                    // here it would stay pending forever. Non-cancel events are completed by
                    // notifySubscriberWithCloseEvent() after the subscriber has been signalled.
                    closeFuture.completeExceptionally(event.cause());
                }
                continue;
            }
            if (e instanceof CompletableFuture) {
                ((CompletableFuture<?>) e).completeExceptionally(cause);
            }
            if (e instanceof AwaitDemandFuture) {
                // Internal marker, never a stream element: it must not reach onRemoval(T), where
                // a type-specific override would fail with ClassCastException.
                continue;
            }
            @SuppressWarnings("unchecked")
            final T element = (T) e;
            onRemoval(element);
        }
    }

    /** Terminal queue entry; a {@code null} cause denotes successful completion. */
    private static final class CloseEvent {
        private final Throwable cause;

        CloseEvent(Throwable cause) {
            this.cause = cause;
        }

        /** @return {@code true} if this event stems from a cancelled subscription */
        boolean isCancelled() {
            return cause instanceof CancelledSubscriptionException;
        }

        Throwable cause() {
            return cause;
        }

        @Override
        public String toString() {
            return cause == null ? "CloseEvent" : "CloseEvent(" + cause + ')';
        }
    }

    /** Queue marker created by {@code onDemand}; distinguishes internal futures from elements. */
    private static final class AwaitDemandFuture extends CompletableFuture<Void> {
        private AwaitDemandFuture() {
        }
    }

    /** The {@link Subscription} handed to the subscriber; it updates the publisher's demand and state. */
    private static final class SubscriptionImpl implements Subscription {
        private final DefaultStreamMessage<?> publisher;
        private final Subscriber<Object> subscriber;
        /** Executor used for all signals to the subscriber, or {@code null} to signal inline. */
        private final Executor executor;

        // The publisher only ever hands its own element type to the subscriber, so widening the
        // subscriber's type argument to Object is safe here.
        @SuppressWarnings("unchecked")
        SubscriptionImpl(DefaultStreamMessage<?> publisher, Subscriber<?> subscriber, Executor executor) {
            this.publisher = publisher;
            this.subscriber = (Subscriber<Object>) subscriber;
            this.executor = executor;
        }

        Subscriber<Object> subscriber() {
            return subscriber;
        }

        Executor executor() {
            return executor;
        }

        /**
         * Adds {@code n} to the outstanding demand, saturating at {@code Long.MAX_VALUE}, and
         * triggers a drain when demand rises from zero.
         *
         * @throws IllegalArgumentException if {@code n} is not positive
         */
        @Override
        public void request(long n) {
            if (n <= 0L) {
                throw new IllegalArgumentException("n: " + n + " (expected: > 0)");
            }
            long oldDemand;
            long newDemand;
            do {
                oldDemand = publisher.demand;
                // Comparing against MAX_VALUE - n avoids overflowing oldDemand + n.
                newDemand = oldDemand >= Long.MAX_VALUE - n ? Long.MAX_VALUE : oldDemand + n;
            } while (!demandUpdater.compareAndSet(publisher, oldDemand, newDemand));

            // With non-zero prior demand the drain loop stopped for some other reason, so there is
            // nothing to wake up.
            if (oldDemand == 0L) {
                publisher.notifySubscriber();
            }
        }

        /**
         * Moves an open stream to {@code CLEANUP} and queues a cancellation event. Has no effect
         * if the stream has already been closed or cancelled.
         */
        @Override
        public void cancel() {
            if (publisher.setState(State.CLEANUP)) {
                final CloseEvent closeEvent = Exceptions.isVerbose()
                        ? new CloseEvent(CancelledSubscriptionException.get())
                        : CANCELLED_CLOSE;
                publisher.pushObject(closeEvent);
            }
        }

        @Override
        public String toString() {
            return "Subscription:publisher=" + publisher
                    + ",demand=" + publisher.demand
                    + ",executor=" + executor;
        }
    }

    /** Life-cycle states; see the class documentation for the transitions. */
    private enum State {
        OPEN,
        CLOSED,
        CLEANUP
    }
}

