package io.datakernel.stream;

import io.datakernel.async.Promise;
import io.datakernel.async.SettablePromise;
import io.datakernel.eventloop.Eventloop;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/datakernel/stream/StreamConsumerSwitcher.class */
/**
 * A stream consumer that forwards every received item to a "current" downstream
 * consumer, which can be replaced at any time with {@link #switchTo(StreamConsumer)}.
 *
 * <p>Each downstream consumer is fed through its own {@link InternalSupplier}. The
 * switcher counts the internal suppliers it has created in {@code pendingConsumers};
 * when a downstream consumer's acknowledgement completes successfully the counter is
 * decremented, and once it reaches zero the switcher calls {@code acknowledge()}.
 * A failed downstream acknowledgement closes the switcher with that error.
 *
 * @param <T> type of the streamed items
 */
public final class StreamConsumerSwitcher<T> extends AbstractStreamConsumer<T> implements StreamDataAcceptor<T> {
    /** Supplier feeding the currently selected downstream consumer; {@code null} until the first switch. */
    private InternalSupplier currentInternalSupplier;

    /** Number of internal suppliers created whose consumer has not yet acknowledged successfully. */
    private int pendingConsumers = 0;

    private StreamConsumerSwitcher() {
    }

    /**
     * Creates a switcher with no downstream consumer selected yet.
     *
     * @param <T> type of the streamed items
     * @return a new, unconnected switcher
     */
    public static <T> StreamConsumerSwitcher<T> create() {
        return new StreamConsumerSwitcher<>();
    }

    /**
     * Creates a switcher and immediately switches it to the given consumer.
     *
     * @param streamConsumer initial downstream consumer
     * @param <T>            type of the streamed items
     * @return a new switcher already routed to {@code streamConsumer}
     */
    public static <T> StreamConsumerSwitcher<T> create(StreamConsumer<T> streamConsumer) {
        StreamConsumerSwitcher<T> switcher = new StreamConsumerSwitcher<>();
        switcher.switchTo(streamConsumer);
        return switcher;
    }

    /**
     * Hands an incoming item to the current internal supplier.
     *
     * @param t the item received from upstream
     */
    @Override // io.datakernel.stream.StreamDataAcceptor
    public final void accept(T t) {
        this.currentInternalSupplier.onData(t);
    }

    /**
     * Propagates end of stream to the current internal supplier, if there is one.
     *
     * @return this consumer's acknowledgement promise
     */
    @Override // io.datakernel.stream.AbstractStreamConsumer
    protected Promise<Void> onEndOfStream() {
        if (this.currentInternalSupplier != null) {
            this.currentInternalSupplier.sendEndOfStream();
        }
        return getAcknowledgement();
    }

    /**
     * On error, re-routes to an idle consumer; {@link #switchTo(StreamConsumer)} decides
     * how the previous internal supplier is terminated.
     *
     * @param th the error that closed this consumer
     */
    @Override // io.datakernel.stream.AbstractStreamConsumer
    protected final void onError(Throwable th) {
        switchTo(StreamConsumer.idle());
    }

    /**
     * @return the capabilities of the current downstream consumer, or an empty set when
     *         no consumer has been selected yet
     */
    @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
    public Set<StreamCapability> getCapabilities() {
        if (this.currentInternalSupplier == null) {
            return Collections.emptySet();
        }
        return this.currentInternalSupplier.consumer.getCapabilities();
    }

    /**
     * Makes {@code streamConsumer} the new downstream consumer.
     *
     * <ul>
     *   <li>If the upstream supplier's end-of-stream promise has failed, the previous internal
     *       supplier (if any) is sent this consumer's acknowledgement exception and
     *       {@code streamConsumer} is fed by a supplier that closes with that same exception.</li>
     *   <li>If there is no upstream supplier yet, or its end-of-stream promise is not complete,
     *       the previous internal supplier (if any) is sent end of stream and a fresh internal
     *       supplier is streamed to {@code streamConsumer}.</li>
     *   <li>Otherwise (upstream already ended) the previous internal supplier (if any) is sent
     *       end of stream and {@code streamConsumer} is fed by an empty supplier.</li>
     * </ul>
     *
     * @param streamConsumer the consumer that should receive the stream from now on
     */
    public void switchTo(StreamConsumer<T> streamConsumer) {
        StreamSupplier<T> upstream = getSupplier();
        InternalSupplier previous = this.currentInternalSupplier;

        if (upstream != null && upstream.getEndOfStream().isException()) {
            if (previous != null) {
                previous.sendError(getAcknowledgement().getException());
            }
            // NOTE(review): this placeholder increments pendingConsumers in its constructor but is
            // never streamed to a consumer here, so the matching decrement in setConsumer
            // presumably never runs - confirm this is intended.
            this.currentInternalSupplier = new InternalSupplier(this.eventloop, StreamConsumer.idle());
            StreamSupplier.<T>closingWithError(getAcknowledgement().getException()).streamTo(streamConsumer);
            return;
        }

        if (previous != null) {
            previous.sendEndOfStream();
        }

        boolean upstreamStillLive = upstream == null || !upstream.getEndOfStream().isComplete();
        if (upstreamStillLive) {
            this.currentInternalSupplier = new InternalSupplier(this.eventloop, streamConsumer);
            this.currentInternalSupplier.streamTo(streamConsumer);
            return;
        }

        // NOTE(review): same as above - the placeholder bumps pendingConsumers without being
        // streamed; confirm this cannot keep acknowledge() from ever being called.
        this.currentInternalSupplier = new InternalSupplier(this.eventloop, StreamConsumer.idle());
        StreamSupplier.<T>of().streamTo(streamConsumer);
    }

    /**
     * Supplier side of the link between the switcher and one downstream consumer.
     *
     * <p>Items that arrive before the downstream consumer has provided a data acceptor are
     * buffered in {@code pendingItems} and flushed from an event-loop task once
     * {@link #resume(StreamDataAcceptor)} is called.
     *
     * <p>The decompiler output this was recovered from notes that the class was originally
     * private; it is kept public here so the visible interface does not change.
     */
    public class InternalSupplier implements StreamSupplier<T> {
        private final Eventloop eventloop;
        private final StreamConsumer<T> consumer;
        private final SettablePromise<Void> endOfStream = new SettablePromise<>();

        /** Acceptor handed over by the last {@code resume}; {@code null} until the first resume. */
        private StreamDataAcceptor<T> lastDataAcceptor;
        private boolean suspended;

        /** Items buffered before the first resume; created in {@code onData}, cleared after flushing. */
        @Nullable
        private ArrayList<T> pendingItems;

        /** Set when end of stream arrived while items were still buffered. */
        private boolean pendingEndOfStream;

        /**
         * Registers a new downstream link and counts it as pending on the enclosing switcher.
         *
         * @param eventloop      event loop used to post the deferred flush of buffered items
         * @param streamConsumer the consumer this supplier is expected to be bound to
         */
        public InternalSupplier(Eventloop eventloop, StreamConsumer<T> streamConsumer) {
            this.eventloop = eventloop;
            this.consumer = streamConsumer;
            StreamConsumerSwitcher.this.pendingConsumers++;
        }

        /**
         * Wires the consumer's acknowledgement: a failure closes the switcher, a success
         * decrements the pending counter and acknowledges the switcher when it hits zero.
         *
         * @param streamConsumer must be the consumer passed to the constructor
         */
        @Override // io.datakernel.stream.StreamSupplier
        public void setConsumer(StreamConsumer<T> streamConsumer) {
            assert streamConsumer == this.consumer;
            streamConsumer.getAcknowledgement()
                    .whenException(this::close)
                    .post()
                    .whenResult(ignored -> {
                        if (--StreamConsumerSwitcher.this.pendingConsumers == 0) {
                            StreamConsumerSwitcher.this.acknowledge();
                        }
                    });
        }

        /**
         * Records the acceptor and either schedules a flush of buffered items or, when nothing
         * is buffered and this is still the current supplier, resumes the upstream supplier.
         *
         * @param streamDataAcceptor acceptor that should receive items from now on
         */
        @Override // io.datakernel.stream.StreamSupplier
        public void resume(StreamDataAcceptor<T> streamDataAcceptor) {
            this.lastDataAcceptor = streamDataAcceptor;
            this.suspended = false;

            if (this.pendingItems == null) {
                if (StreamConsumerSwitcher.this.currentInternalSupplier == this) {
                    StreamSupplier<T> upstream = StreamConsumerSwitcher.this.getSupplier();
                    if (upstream != null) {
                        upstream.resume(StreamConsumerSwitcher.this);
                    }
                }
                return;
            }

            this.eventloop.post(() -> {
                ArrayList<T> buffered = this.pendingItems;
                // resume() may be called again before this task runs, which posts a second
                // task; the first one clears the buffer, so later ones have nothing to do.
                // (The buffer is only ever created together with its first element, so a
                // non-null buffer is never empty.)
                if (buffered == null) {
                    return;
                }
                for (T item : buffered) {
                    this.lastDataAcceptor.accept(item);
                }
                this.pendingItems = null;

                if (this.pendingEndOfStream) {
                    this.endOfStream.trySet(null);
                }

                if (StreamConsumerSwitcher.this.currentInternalSupplier == this) {
                    // The acceptor may have suspended us while the buffer was being flushed.
                    if (this.suspended) {
                        StreamConsumerSwitcher.this.getSupplier().suspend();
                    } else {
                        StreamConsumerSwitcher.this.getSupplier().resume(StreamConsumerSwitcher.this);
                    }
                }
            });
        }

        /** Marks this supplier suspended and, if it is the current one, suspends the upstream supplier. */
        @Override // io.datakernel.stream.StreamSupplier
        public void suspend() {
            this.suspended = true;
            if (StreamConsumerSwitcher.this.currentInternalSupplier == this) {
                StreamConsumerSwitcher.this.getSupplier().suspend();
            }
        }

        /**
         * Closes the enclosing switcher with the given error.
         *
         * @param th the error to close with
         */
        public void close(@NotNull Throwable th) {
            StreamConsumerSwitcher.this.close(th);
        }

        /** @return the promise completed when this supplier's stream ends or fails */
        @Override // io.datakernel.stream.StreamSupplier
        public Promise<Void> getEndOfStream() {
            return this.endOfStream;
        }

        /** @return the capabilities reported by the switcher's upstream supplier */
        @Override // io.datakernel.stream.StreamSupplier
        public Set<StreamCapability> getCapabilities() {
            return StreamConsumerSwitcher.this.getSupplier().getCapabilities();
        }

        /**
         * Delivers an item to the downstream acceptor, or buffers it (suspending the upstream
         * supplier on the first buffered item) when no acceptor has been provided yet.
         *
         * @param t the item to deliver
         */
        public void onData(T t) {
            if (this.lastDataAcceptor != null) {
                this.lastDataAcceptor.accept(t);
                return;
            }
            if (this.pendingItems == null) {
                this.pendingItems = new ArrayList<>();
                StreamConsumerSwitcher.this.getSupplier().suspend();
            }
            this.pendingItems.add(t);
        }

        /**
         * Fails this supplier's end-of-stream promise and replaces the acceptor with a no-op
         * so that any later items are dropped.
         *
         * @param th the error to propagate downstream
         */
        public void sendError(Throwable th) {
            this.lastDataAcceptor = ignored -> {
            };
            this.endOfStream.trySetException(th);
        }

        /**
         * Completes end of stream immediately when nothing is buffered; otherwise defers it
         * until the buffered items have been flushed.
         */
        public void sendEndOfStream() {
            if (this.pendingItems == null) {
                this.endOfStream.trySet(null);
            } else {
                this.pendingEndOfStream = true;
            }
        }
    }
}
