package io.datakernel.stream;

import io.datakernel.async.Promise;
import io.datakernel.async.SettablePromise;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.exception.ExpectedException;
import io.datakernel.util.Preconditions;
import io.datakernel.util.Recyclable;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/AbstractStreamSupplier.class */
public abstract class AbstractStreamSupplier<T> implements StreamSupplier<T> {

    @Nullable
    private StreamConsumer<T> consumer;

    @Nullable
    private StreamDataAcceptor<T> currentDataAcceptor;

    @Nullable
    private ProduceStatus produceStatus;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();
    private final long createTick = this.eventloop.tick();
    private final SettablePromise<Void> endOfStream = new SettablePromise<>();
    private final SettablePromise<Void> acknowledgement = new SettablePromise<>();
    private StreamDataAcceptor<T> lastDataAcceptor = obj -> {
        throw new IllegalStateException("Uninitialized data receiver");
    };
    private final AbstractStreamSupplier<T>.AsyncProduceController controller = new AsyncProduceController();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/datakernel/stream/AbstractStreamSupplier$AsyncProduceController.class */
    public final class AsyncProduceController {
        private AsyncProduceController() {
        }

        public void begin() {
            AbstractStreamSupplier.this.produceStatus = ProduceStatus.STARTED_ASYNC;
        }

        public void end() {
            AbstractStreamSupplier.this.produceStatus = null;
        }

        public void resume() {
            if (AbstractStreamSupplier.this.isReceiverReady()) {
                AbstractStreamSupplier.this.produce(this);
            } else {
                end();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/datakernel/stream/AbstractStreamSupplier$ProduceStatus.class */
    public enum ProduceStatus {
        POSTED,
        STARTED,
        STARTED_ASYNC
    }

    @Override // io.datakernel.stream.StreamSupplier
    public final void setConsumer(StreamConsumer<T> streamConsumer) {
        Preconditions.checkNotNull(streamConsumer);
        Preconditions.checkState(this.consumer == null, "Consumer has already been set");
        Preconditions.checkState(getCapabilities().contains(StreamCapability.LATE_BINDING) || this.eventloop.tick() == this.createTick, StreamSupplier.LATE_BINDING_ERROR_MESSAGE, new Object[]{this});
        this.consumer = streamConsumer;
        onWired();
        streamConsumer.getAcknowledgement().whenException(this::close).whenComplete(this.acknowledgement);
    }

    protected void onWired() {
        this.eventloop.post(this::onStarted);
    }

    protected void onStarted() {
    }

    public boolean isWired() {
        return this.consumer != null;
    }

    @Nullable
    public final StreamConsumer<T> getConsumer() {
        return this.consumer;
    }

    public final boolean isReceiverReady() {
        return this.currentDataAcceptor != null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void send(T t) {
        this.lastDataAcceptor.accept(t);
    }

    @Nullable
    public final StreamDataAcceptor<T> getCurrentDataAcceptor() {
        return this.currentDataAcceptor;
    }

    public StreamDataAcceptor<T> getLastDataAcceptor() {
        return this.lastDataAcceptor;
    }

    protected void produce(AbstractStreamSupplier<T>.AsyncProduceController asyncProduceController) {
    }

    public final void tryProduce() {
        if (isReceiverReady() && this.produceStatus == null) {
            this.produceStatus = ProduceStatus.STARTED;
            produce(this.controller);
            if (this.produceStatus == ProduceStatus.STARTED) {
                this.produceStatus = null;
            }
        }
    }

    public final void postProduce() {
        if (this.produceStatus != null) {
            return;
        }
        this.produceStatus = ProduceStatus.POSTED;
        this.eventloop.post(() -> {
            this.produceStatus = null;
            tryProduce();
        });
    }

    protected void onProduce(StreamDataAcceptor<T> streamDataAcceptor) {
        postProduce();
    }

    @Override // io.datakernel.stream.StreamSupplier
    public final void resume(StreamDataAcceptor<T> streamDataAcceptor) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Start producing: {}", this);
        }
        if (!$assertionsDisabled && streamDataAcceptor == null) {
            throw new AssertionError();
        }
        if (this.currentDataAcceptor == streamDataAcceptor || this.endOfStream.isComplete()) {
            return;
        }
        this.currentDataAcceptor = streamDataAcceptor;
        this.lastDataAcceptor = streamDataAcceptor;
        onProduce(streamDataAcceptor);
    }

    protected boolean isClosed() {
        return this.endOfStream.isComplete();
    }

    protected void onSuspended() {
    }

    @Override // io.datakernel.stream.StreamSupplier
    public final void suspend() {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Suspend supplier: {}", this);
        }
        if (isReceiverReady()) {
            this.currentDataAcceptor = null;
            onSuspended();
        }
    }

    public Promise<Void> sendEndOfStream() {
        if (!$assertionsDisabled && this.consumer == null) {
            throw new AssertionError();
        }
        if (this.endOfStream.isComplete()) {
            return this.endOfStream;
        }
        this.currentDataAcceptor = null;
        this.lastDataAcceptor = Recyclable::tryRecycle;
        this.endOfStream.set((Object) null);
        this.eventloop.post(this::cleanup);
        return this.consumer.getAcknowledgement();
    }

    public final void close(@NotNull Throwable th) {
        if (this.endOfStream.isComplete()) {
            return;
        }
        if (!(th instanceof ExpectedException) && this.logger.isWarnEnabled()) {
            this.logger.warn("StreamSupplier {} closed with error {}", this, th.toString());
        }
        this.currentDataAcceptor = null;
        this.lastDataAcceptor = Recyclable::tryRecycle;
        this.endOfStream.setException(th);
        this.eventloop.post(this::cleanup);
        onError(th);
    }

    protected abstract void onError(Throwable th);

    protected void cleanup() {
    }

    @Override // io.datakernel.stream.StreamSupplier
    public final Promise<Void> getEndOfStream() {
        return this.endOfStream;
    }

    public Promise<Void> getAcknowledgement() {
        return this.acknowledgement;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Set<StreamCapability> addCapabilities(@Nullable StreamSupplier<?> streamSupplier, StreamCapability streamCapability, StreamCapability... streamCapabilityArr) {
        EnumSet of = EnumSet.of(streamCapability, streamCapabilityArr);
        if (streamSupplier != null) {
            of.addAll(streamSupplier.getCapabilities());
        }
        return of;
    }

    @Override // io.datakernel.stream.StreamSupplier
    public Set<StreamCapability> getCapabilities() {
        return Collections.emptySet();
    }

    static {
        $assertionsDisabled = !AbstractStreamSupplier.class.desiredAssertionStatus();
    }
}
