package io.datakernel.stream;

import io.datakernel.async.Promise;
import io.datakernel.async.SettablePromise;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.exception.UncheckedException;
import java.util.ArrayDeque;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Consumer;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/datakernel/stream/StreamConsumers.class */
public final class StreamConsumers {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$ClosingWithErrorImpl.class */
    /**
     * Consumer that fails its acknowledgement with a fixed exception as soon as a
     * supplier is bound to it. It never asks the supplier for data.
     *
     * @param <T> type of the items the bound supplier would produce
     */
    public static final class ClosingWithErrorImpl<T> implements StreamConsumer<T> {
        private final SettablePromise<Void> acknowledgement = new SettablePromise<>();
        private final Throwable exception;

        /**
         * @param th the exception the acknowledgement is failed with once a supplier is set
         */
        public ClosingWithErrorImpl(Throwable th) {
            exception = th;
        }

        /**
         * Schedules the failure of the acknowledgement on the current eventloop
         * instead of failing it inline. The supplier itself is not touched.
         */
        @Override
        public void setSupplier(StreamSupplier<T> streamSupplier) {
            Eventloop.getCurrentEventloop().post(this::failWithStoredException);
        }

        /** Fails the acknowledgement with the stored exception, unless it is already completed. */
        private void failWithStoredException() {
            acknowledgement.trySetException(exception);
        }

        /** @return the promise that is completed exceptionally by this consumer */
        @Override
        public Promise<Void> getAcknowledgement() {
            return acknowledgement;
        }

        /** @return a fresh set containing only {@link StreamCapability#LATE_BINDING} */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }

        /**
         * Fails the acknowledgement with the given exception if it has not been completed yet.
         *
         * @param th the reason for closing
         */
        public void close(@NotNull Throwable th) {
            acknowledgement.trySetException(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$Idle.class */
    /**
     * Consumer that binds to a supplier without ever calling {@code resume} on it.
     * Its acknowledgement simply mirrors the outcome of the supplier's end-of-stream promise.
     *
     * @param <T> type of the items the bound supplier produces
     */
    public static final class Idle<T> implements StreamConsumer<T> {
        private final SettablePromise<Void> acknowledgement = new SettablePromise<>();

        /**
         * Forwards the completion (result or exception) of the supplier's end-of-stream
         * promise to this consumer's acknowledgement.
         *
         * @param streamSupplier the supplier being bound to this consumer
         */
        @Override // io.datakernel.stream.StreamConsumer
        public void setSupplier(StreamSupplier<T> streamSupplier) {
            // The decompiler had expanded this bound method reference into a lambda over an
            // undefined register; trySet is a no-op if close(...) already failed the promise.
            streamSupplier.getEndOfStream().whenComplete(acknowledgement::trySet);
        }

        /** @return the promise completed when the supplier ends or this consumer is closed */
        @Override // io.datakernel.stream.StreamConsumer
        public Promise<Void> getAcknowledgement() {
            return acknowledgement;
        }

        /** @return a fresh set containing only {@link StreamCapability#LATE_BINDING} */
        @Override // io.datakernel.stream.StreamConsumer
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }

        /**
         * Fails the acknowledgement with the given exception if it has not been completed yet.
         *
         * @param th the reason for closing
         */
        public void close(@NotNull Throwable th) {
            acknowledgement.trySetException(th);
        }
    }

    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$OfChannelConsumerImpl.class */
    /**
     * Adapts a {@link ChannelConsumer} to the stream API: items pushed by the supplier are
     * queued and handed to the channel consumer one at a time, and the supplier is suspended
     * while the queue is backed up.
     *
     * @param <T> type of the items being transferred
     */
    static final class OfChannelConsumerImpl<T> extends AbstractStreamConsumer<T> implements StreamDataAcceptor<T> {
        private final ChannelConsumer<T> consumer;
        /** Items received from the supplier that have not been handed to {@link #consumer} yet. */
        private final ArrayDeque<T> deque = new ArrayDeque<>();
        /** Returned from {@link #onEndOfStream()}; completed by the final write or by an error. */
        private final SettablePromise<Void> result = new SettablePromise<>();
        /** True while a write to {@link #consumer} is pending, so that {@link #produce()} is not re-entered. */
        private boolean writing;

        /**
         * @param channelConsumer the channel consumer that receives the streamed items
         */
        public OfChannelConsumerImpl(ChannelConsumer<T> channelConsumer) {
            this.consumer = channelConsumer;
        }

        /** Starts the transfer loop once the consumer has been started. */
        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onStarted() {
            produce();
        }

        /**
         * Queues an item coming from the supplier and tries to forward it.
         * If earlier items are still queued, the supplier is suspended first.
         *
         * @param t the item to forward; must not be {@code null}
         */
        @Override // io.datakernel.stream.StreamDataAcceptor
        public void accept(T t) {
            assert t != null;
            if (!deque.isEmpty()) {
                getSupplier().suspend();
            }
            deque.add(t);
            produce();
        }

        /**
         * Flushes what is left in the queue after the supplier has finished.
         *
         * @return the promise that completes with the outcome of the final write
         */
        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected Promise<Void> onEndOfStream() {
            produce();
            return result;
        }

        /**
         * Drains the queue into the channel consumer. Stops at the first write that has not
         * completed successfully yet and continues from its callback. Once the queue is empty,
         * it either resumes the supplier or, after end of stream, sends the final {@code null}.
         */
        private void produce() {
            if (writing) {
                return;
            }
            while (!deque.isEmpty()) {
                Promise<Void> written = consumer.accept(deque.poll());
                if (!written.isResult()) {
                    writing = true;
                    written.whenComplete((ignored, e) -> {
                        writing = false;
                        if (e == null) {
                            produce();
                        } else {
                            close(e);
                        }
                    });
                    return;
                }
            }
            if (!getEndOfStream().isResult()) {
                getSupplier().resume(this);
                return;
            }
            // NOTE(review): null is presumably the channel's end-of-stream marker - confirm against
            // ChannelConsumer. The cast to T keeps the single-item overload selected.
            Promise<Void> finished = consumer.accept((T) null);
            finished.whenComplete(result::trySet);
        }

        /**
         * Drops queued items, closes the channel consumer with the error and fails the result.
         *
         * @param th the error that terminated the stream
         */
        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            deque.clear();
            consumer.close(th);
            result.trySetException(th);
        }

        /** @return a fresh set containing only {@link StreamCapability#LATE_BINDING} */
        @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$OfConsumerImpl.class */
    /**
     * Stream consumer that passes every received item to a plain {@link Consumer}.
     *
     * @param <T> type of the items being consumed
     */
    static final class OfConsumerImpl<T> extends AbstractStreamConsumer<T> {
        private final Consumer<T> consumer;

        /**
         * @param consumer the callback invoked for each item received from the supplier
         */
        public OfConsumerImpl(Consumer<T> consumer) {
            this.consumer = consumer;
        }

        /** Asks the supplier to start sending items to the wrapped consumer. */
        @Override
        protected void onStarted() {
            assert getSupplier() != null;
            getSupplier().resume(this::forwardToConsumer);
        }

        /**
         * Hands one item to the wrapped consumer. An {@link UncheckedException} thrown by it
         * closes this stream consumer with the exception's cause.
         */
        private void forwardToConsumer(T item) {
            try {
                consumer.accept(item);
            } catch (UncheckedException unchecked) {
                close(unchecked.getCause());
            }
        }

        /**
         * @return a completed promise, or a failed one carrying the cause if an
         *         {@link UncheckedException} is raised while creating it
         */
        @Override
        protected Promise<Void> onEndOfStream() {
            Promise<Void> outcome;
            try {
                outcome = Promise.complete();
            } catch (UncheckedException unchecked) {
                // NOTE(review): this handler looks unreachable - confirm and drop if so.
                outcome = Promise.ofException(unchecked.getCause());
            }
            return outcome;
        }

        /** Nothing to release on error. */
        @Override
        protected void onError(Throwable th) {
        }

        /** @return a fresh set containing only {@link StreamCapability#LATE_BINDING} */
        @Override
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }
    }

    /* loaded from: input_file:io/datakernel/stream/StreamConsumers$Skip.class */
    /**
     * Consumer that requests data from its supplier and discards every item.
     * Its acknowledgement mirrors the outcome of the supplier's end-of-stream promise.
     *
     * @param <T> type of the items being discarded
     */
    static final class Skip<T> implements StreamConsumer<T> {
        private final SettablePromise<Void> acknowledgement = new SettablePromise<>();

        /**
         * Hooks the acknowledgement to the supplier's end-of-stream promise, then resumes the
         * supplier with an acceptor that ignores its argument.
         *
         * @param streamSupplier the supplier being bound to this consumer
         */
        @Override // io.datakernel.stream.StreamConsumer
        public void setSupplier(StreamSupplier<T> streamSupplier) {
            // The decompiler had expanded this bound method reference into a lambda over an
            // undefined register; trySet is a no-op if close(...) already failed the promise.
            streamSupplier.getEndOfStream().whenComplete(acknowledgement::trySet);
            streamSupplier.resume(ignored -> {
            });
        }

        /** @return the promise completed when the supplier ends or this consumer is closed */
        @Override // io.datakernel.stream.StreamConsumer
        public Promise<Void> getAcknowledgement() {
            return acknowledgement;
        }

        /** @return a fresh set containing only {@link StreamCapability#LATE_BINDING} */
        @Override // io.datakernel.stream.StreamConsumer
        public Set<StreamCapability> getCapabilities() {
            return EnumSet.of(StreamCapability.LATE_BINDING);
        }

        /**
         * Fails the acknowledgement with the given exception if it has not been completed yet.
         *
         * @param th the reason for closing
         */
        public void close(@NotNull Throwable th) {
            acknowledgement.trySetException(th);
        }
    }

    /** Private so that this holder of nested consumer implementations cannot be instantiated from outside. */
    private StreamConsumers() {
    }
}
