package io.datakernel.stream;

import io.datakernel.annotation.Nullable;
import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.exception.ExpectedException;
import io.datakernel.util.Preconditions;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/AbstractStreamConsumer.class */
public abstract class AbstractStreamConsumer<T> implements StreamConsumer<T> {
    private StreamProducer<T> producer;
    private Throwable exception;
    private Object tag;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    protected final Eventloop eventloop = Eventloop.getCurrentEventloop();
    private final long createTick = this.eventloop.getTick();
    private StreamStatus status = StreamStatus.OPEN;
    private final SettableStage<Void> endOfStream = SettableStage.create();

    @Override // io.datakernel.stream.StreamConsumer
    public final void setProducer(StreamProducer<T> streamProducer) {
        Preconditions.checkNotNull(streamProducer);
        Preconditions.checkState(this.producer == null);
        Preconditions.checkState(getCapabilities().contains(StreamCapability.LATE_BINDING) || this.eventloop.getTick() == this.createTick, StreamConsumer.LATE_BINDING_ERROR_MESSAGE, new Object[]{this});
        this.producer = streamProducer;
        onWired();
        streamProducer.getEndOfStream().thenRun(this::endOfStream).whenException(this::closeWithError);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onWired() {
        this.eventloop.post(this::onStarted);
    }

    protected void onStarted() {
    }

    public boolean isWired() {
        return this.producer != null;
    }

    @Nullable
    public final StreamProducer<T> getProducer() {
        return this.producer;
    }

    protected final void endOfStream() {
        if (this.status.isClosed()) {
            return;
        }
        this.status = StreamStatus.END_OF_STREAM;
        onEndOfStream();
        this.eventloop.post(this::cleanup);
        this.endOfStream.set((Object) null);
    }

    protected abstract void onEndOfStream();

    public final void closeWithError(Throwable th) {
        if (this.status.isClosed()) {
            return;
        }
        this.status = StreamStatus.CLOSED_WITH_ERROR;
        this.exception = th;
        if (!(th instanceof ExpectedException) && this.logger.isWarnEnabled()) {
            this.logger.warn("StreamConsumer {} closed with error {}", this, th.toString());
        }
        onError(th);
        this.eventloop.post(this::cleanup);
        this.endOfStream.setException(th);
    }

    protected abstract void onError(Throwable th);

    protected void cleanup() {
    }

    public final StreamStatus getStatus() {
        return this.status;
    }

    public final Throwable getException() {
        return this.exception;
    }

    @Override // io.datakernel.stream.StreamConsumer
    public final Stage<Void> getEndOfStream() {
        return this.endOfStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static Set<StreamCapability> addCapabilities(@Nullable StreamConsumer<?> streamConsumer, StreamCapability streamCapability, StreamCapability... streamCapabilityArr) {
        EnumSet of = EnumSet.of(streamCapability, streamCapabilityArr);
        if (streamConsumer != null) {
            of.addAll(streamConsumer.getCapabilities());
        }
        return of;
    }

    @Override // io.datakernel.stream.StreamConsumer
    public Set<StreamCapability> getCapabilities() {
        return Collections.emptySet();
    }

    public final Object getTag() {
        return this.tag;
    }

    public final void setTag(Object obj) {
        this.tag = obj;
    }

    public String toString() {
        return this.tag != null ? this.tag.toString() : super.toString();
    }
}
