/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.reactivestreams.impl;

import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.impl.BackOffStrategy;
import org.nustaq.kontraktor.reactivestreams.CancelException;
import org.nustaq.kontraktor.reactivestreams.KxReactiveStreams;
import org.nustaq.serialization.util.FSTUtil;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class KxSubscriber<T>
implements Subscriber<T>,
Serializable,
Iterator<T> {
    public static final String COMPLETE = "COMPLETE";
    public static BackOffStrategy strat = new BackOffStrategy(100, 2, 5);
    protected long batchSize;
    protected Callback<T> cb;
    protected long credits;
    protected Subscription subs;
    protected boolean autoRequestOnSubs;
    protected ConcurrentLinkedQueue buffer;
    Object next;
    public static ThreadLocal<Subscription> subsToCancel = new ThreadLocal();

    public KxSubscriber(long batchSize) {
        this.batchSize = batchSize;
        this.autoRequestOnSubs = true;
        this.credits = 0L;
        this.cb = (Callback & Serializable)(res, err) -> {
            if (this.buffer == null) {
                this.buffer = new ConcurrentLinkedQueue();
            }
            if (Actors.isResult((Object)err)) {
                this.buffer.add(res);
            } else if (Actors.isError((Object)err)) {
                this.buffer.add(err);
            } else if (Actors.isComplete((Object)err)) {
                this.buffer.add(COMPLETE);
            }
        };
    }

    public KxSubscriber(long batchSize, Callback<T> cb) {
        this(batchSize, cb, true);
    }

    public KxSubscriber(long batchSize, Callback<T> cb, boolean autoRequestOnSubs) {
        this.batchSize = batchSize;
        this.cb = cb;
        this.autoRequestOnSubs = autoRequestOnSubs;
        this.credits = 0L;
    }

    public void onSubscribe(Subscription s) {
        if (this.subs != null) {
            s.cancel();
            return;
        }
        this.subs = s;
        if (this.autoRequestOnSubs) {
            s.request(this.batchSize);
        }
        this.credits += this.batchSize;
    }

    public void onNext(T t) {
        if (t == null) {
            throw null;
        }
        --this.credits;
        if (this.credits < this.batchSize / (long)KxReactiveStreams.REQU_NEXT_DIVISOR) {
            this.subs.request(this.batchSize);
            this.credits += this.batchSize;
        }
        this.nextAction(t);
    }

    protected void nextAction(T t) {
        try {
            this.cb.pipe(t);
        }
        catch (CancelException c) {
            this.subs.cancel();
        }
    }

    public void onError(Throwable t) {
        if (t == null) {
            throw null;
        }
        this.cb.reject((Object)t);
    }

    public void onComplete() {
        this.cb.finish();
    }

    @Override
    public boolean hasNext() {
        subsToCancel.set(this.subs);
        int count = 0;
        while (this.buffer == null || this.buffer.peek() == null) {
            if (Actor.inside()) {
                if (++count < 1) {
                    Actor.yield();
                    continue;
                }
                if (count < 5) {
                    Actor.yield((long)1L);
                    continue;
                }
                Actor.yield((long)5L);
                continue;
            }
            strat.yield(count++);
        }
        Object poll = this.buffer.poll();
        this.next = poll;
        return this.next != COMPLETE && !(this.next instanceof Throwable);
    }

    @Override
    public T next() {
        if (this.next == COMPLETE) {
            throw new RuntimeException("no further elements in iterator");
        }
        if (this.next instanceof Throwable) {
            this.subs.cancel();
            FSTUtil.rethrow((Throwable)((Throwable)this.next));
        }
        return (T)this.next;
    }
}

