package io.datakernel.stream.processor;

import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.HasInputs;
import io.datakernel.stream.HasOutput;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamStatus;
import io.datakernel.util.Preconditions;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/* loaded from: input_file:io/datakernel/stream/processor/StreamJoin.class */
public final class StreamJoin<K, L, R, V> implements HasOutput<V>, HasInputs {
    /** Orders join keys; the output compares the head keys of both buffers with it. */
    private final Comparator<K> keyComparator;
    /** Derives the join key from an item of the left input. */
    private final Function<L, K> leftKeyFunction;
    /** Derives the join key from an item of the right input. */
    private final Function<R, K> rightKeyFunction;
    /** Callback that turns matched / unmatched items into output values. */
    private final Joiner<K, L, R, V> joiner;

    /** Left items that were received but not yet consumed by the join. */
    private final ArrayDeque<L> leftDeque = new ArrayDeque<>();
    /** Right items that were received but not yet consumed by the join. */
    private final ArrayDeque<R> rightDeque = new ArrayDeque<>();

    // Keep these below the deques: each Input receives its deque from a field initializer,
    // and field initializers run in declaration order.
    private final Input<L> left = new Input<>(leftDeque);
    private final Input<R> right = new Input<>(rightDeque);
    private final Output output = new Output();

    /* loaded from: input_file:io/datakernel/stream/processor/StreamJoin$InnerJoiner.class */
    /**
     * Base class for joiners that only care about matched pairs.
     *
     * <p>{@link #onLeftJoin} is implemented as a no-op, so a left item reported without a
     * matching right item produces no output. Subclasses still have to implement
     * {@code onInnerJoin}.
     */
    public abstract static class InnerJoiner<K, L, R, V> implements Joiner<K, L, R, V> {
        /**
         * Does nothing.
         *
         * @param key      key of the unmatched left item (unused)
         * @param leftItem the unmatched left item (unused)
         * @param receiver receiver for output values (unused)
         */
        @Override
        public void onLeftJoin(K key, L leftItem, StreamDataReceiver<V> receiver) {
            // Intentionally empty: unmatched left items are not emitted.
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/datakernel/stream/processor/StreamJoin$Input.class */
    public final class Input<I> extends AbstractStreamConsumer<I> implements StreamDataReceiver<I> {
        private final ArrayDeque<I> deque;

        public Input(ArrayDeque<I> arrayDeque) {
            this.deque = arrayDeque;
        }

        @Override // io.datakernel.stream.StreamDataReceiver
        public void onData(I i) {
            this.deque.add(i);
            StreamJoin.this.output.produce();
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onStarted() {
            StreamJoin.this.output.produce();
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onEndOfStream() {
            StreamJoin.this.output.produce();
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer
        protected void onError(Throwable th) {
            StreamJoin.this.output.closeWithError(th);
        }
    }

    /* loaded from: input_file:io/datakernel/stream/processor/StreamJoin$Joiner.class */
    /**
     * Callback used by the join to turn input items into output values.
     *
     * @param <K> key type
     * @param <L> left item type
     * @param <R> right item type
     * @param <V> output value type
     */
    public interface Joiner<K, L, R, V> {
        /**
         * Invoked by the join's output when the keys of the current left and right items
         * compare as equal.
         *
         * @param key       key of the left item
         * @param leftItem  current left item
         * @param rightItem current right item
         * @param receiver  the output's current data receiver; results are sent to it
         */
        void onInnerJoin(K key, L leftItem, R rightItem, StreamDataReceiver<V> receiver);

        /**
         * Invoked by the join's output when the key of the current left item compares as
         * less than the key of the current right item.
         *
         * @param key      key of the left item
         * @param leftItem current left item
         * @param receiver the output's current data receiver; results are sent to it
         */
        void onLeftJoin(K key, L leftItem, StreamDataReceiver<V> receiver);
    }

    /* loaded from: input_file:io/datakernel/stream/processor/StreamJoin$Output.class */
    /**
     * Producer side of the join.
     *
     * <p>{@link #produce()} merges the two input buffers by key and hands matched / unmatched
     * items to the joiner. Suspension and errors coming from downstream are passed on to both
     * inputs.
     */
    protected final class Output extends AbstractStreamProducer<V> {
        protected Output() {
        }

        /** Suspends the producers that feed the left and the right input. */
        @Override
        protected void onSuspended() {
            StreamJoin.this.left.getProducer().suspend();
            StreamJoin.this.right.getProducer().suspend();
        }

        /** Closes both inputs with the given error. */
        @Override
        protected void onError(Throwable th) {
            StreamJoin.this.left.closeWithError(th);
            StreamJoin.this.right.closeWithError(th);
        }

        /**
         * Runs one join step.
         *
         * <p>While the receiver is ready and both buffers hold items, the head keys are compared:
         * <ul>
         *   <li>left key smaller: the left item is reported through {@code onLeftJoin} and
         *       removed;</li>
         *   <li>left key greater: the right item is removed without being reported;</li>
         *   <li>keys equal: the pair is reported through {@code onInnerJoin} and only the left
         *       item is removed.</li>
         * </ul>
         * Afterwards, if the receiver is still ready, end of stream is sent when both inputs have
         * reached {@code END_OF_STREAM}; otherwise both upstream producers are asked to produce.
         *
         * <p>NOTE(review): end of stream is sent without checking the buffers. Because the merge
         * loop needs both buffers to be non-empty, left items still buffered once the right
         * buffer is drained are never passed to {@code onLeftJoin} - confirm this is intended.
         */
        @Override
        protected void produce() {
            final ArrayDeque<L> lefts = StreamJoin.this.leftDeque;
            final ArrayDeque<R> rights = StreamJoin.this.rightDeque;

            if (isReceiverReady() && !lefts.isEmpty() && !rights.isEmpty()) {
                L leftItem = lefts.peek();
                K leftKey = StreamJoin.this.leftKeyFunction.apply(leftItem);
                R rightItem = rights.peek();
                K rightKey = StreamJoin.this.rightKeyFunction.apply(rightItem);

                while (true) {
                    int order = StreamJoin.this.keyComparator.compare(leftKey, rightKey);
                    if (order < 0) {
                        // Left item has no partner on the right side.
                        StreamJoin.this.joiner.onLeftJoin(leftKey, leftItem, getCurrentDataReceiver());
                        lefts.poll();
                        if (lefts.isEmpty()) {
                            break;
                        }
                        leftItem = lefts.peek();
                        leftKey = StreamJoin.this.leftKeyFunction.apply(leftItem);
                    } else if (order > 0) {
                        // Right item is behind the left side: drop it silently.
                        rights.poll();
                        if (rights.isEmpty()) {
                            break;
                        }
                        rightItem = rights.peek();
                        rightKey = StreamJoin.this.rightKeyFunction.apply(rightItem);
                    } else {
                        // Match. Only the left item is consumed, so the same right item can be
                        // paired with following left items that carry an equal key.
                        StreamJoin.this.joiner.onInnerJoin(leftKey, leftItem, rightItem, getCurrentDataReceiver());
                        lefts.poll();
                        // This is the only branch that re-checks receiver readiness.
                        if (lefts.isEmpty() || !isReceiverReady()) {
                            break;
                        }
                        leftItem = lefts.peek();
                        leftKey = StreamJoin.this.leftKeyFunction.apply(leftItem);
                    }
                }
            }

            if (isReceiverReady()) {
                boolean bothInputsEnded = StreamJoin.this.left.getStatus() == StreamStatus.END_OF_STREAM
                        && StreamJoin.this.right.getStatus() == StreamStatus.END_OF_STREAM;
                if (bothInputsEnded) {
                    sendEndOfStream();
                } else {
                    StreamJoin.this.left.getProducer().produce(StreamJoin.this.left);
                    StreamJoin.this.right.getProducer().produce(StreamJoin.this.right);
                }
            }
        }
    }

    /* loaded from: input_file:io/datakernel/stream/processor/StreamJoin$ValueJoiner.class */
    /**
     * Joiner that produces at most one output value per callback.
     *
     * <p>Subclasses compute the value in {@link #doInnerJoin} / {@link #doLeftJoin}; a
     * {@code null} result means "emit nothing". {@code doLeftJoin} returns {@code null} unless
     * overridden.
     */
    public abstract static class ValueJoiner<K, L, R, V> implements Joiner<K, L, R, V> {
        /**
         * Computes the output value for a matched pair.
         *
         * @return the value to emit, or {@code null} to emit nothing
         */
        public abstract V doInnerJoin(K key, L leftItem, R rightItem);

        /**
         * Computes the output value for a left item without a match.
         *
         * @return the value to emit, or {@code null} to emit nothing; this implementation
         *         always returns {@code null}
         */
        public V doLeftJoin(K key, L leftItem) {
            return null;
        }

        @Override
        public final void onInnerJoin(K key, L leftItem, R rightItem, StreamDataReceiver<V> receiver) {
            emitUnlessNull(doInnerJoin(key, leftItem, rightItem), receiver);
        }

        @Override
        public final void onLeftJoin(K key, L leftItem, StreamDataReceiver<V> receiver) {
            emitUnlessNull(doLeftJoin(key, leftItem), receiver);
        }

        /** Sends {@code value} to {@code receiver} unless it is {@code null}. */
        private static <T> void emitUnlessNull(T value, StreamDataReceiver<T> receiver) {
            if (value == null) {
                return;
            }
            receiver.onData(value);
        }
    }

    /**
     * Stores the join configuration after passing each argument through
     * {@code Preconditions.checkNotNull} (presumably rejecting {@code null} - confirm against
     * {@code Preconditions}).
     *
     * <p>The results are assigned without raw-type casts, which relies on
     * {@code checkNotNull} returning its argument's static type.
     *
     * @param keyComparator    orders join keys
     * @param leftKeyFunction  derives the key of a left item
     * @param rightKeyFunction derives the key of a right item
     * @param joiner           callback that produces output values
     */
    private StreamJoin(Comparator<K> keyComparator, Function<L, K> leftKeyFunction, Function<R, K> rightKeyFunction, Joiner<K, L, R, V> joiner) {
        // Check order is unchanged: comparator, joiner, left key function, right key function.
        this.keyComparator = Preconditions.checkNotNull(keyComparator);
        this.joiner = Preconditions.checkNotNull(joiner);
        this.leftKeyFunction = Preconditions.checkNotNull(leftKeyFunction);
        this.rightKeyFunction = Preconditions.checkNotNull(rightKeyFunction);
    }

    /**
     * Creates a join; the arguments are passed unchanged to the private constructor.
     *
     * @param keyComparator    orders join keys
     * @param leftKeyFunction  derives the key of a left item
     * @param rightKeyFunction derives the key of a right item
     * @param joiner           callback that produces output values
     * @return a new {@code StreamJoin}
     */
    public static <K, L, R, V> StreamJoin<K, L, R, V> create(Comparator<K> keyComparator, Function<L, K> leftKeyFunction, Function<R, K> rightKeyFunction, Joiner<K, L, R, V> joiner) {
        StreamJoin<K, L, R, V> join = new StreamJoin<K, L, R, V>(keyComparator, leftKeyFunction, rightKeyFunction, joiner);
        return join;
    }

    /**
     * @return the consumer that accepts the left-side items of the join
     */
    public StreamConsumer<L> getLeft() {
        return left;
    }

    /**
     * @return the consumer that accepts the right-side items of the join
     */
    public StreamConsumer<R> getRight() {
        return right;
    }

    /**
     * @return a fixed-size list holding the left input followed by the right input
     */
    @Override
    public List<? extends StreamConsumer<?>> getInputs() {
        return Arrays.asList(left, right);
    }

    /**
     * @return the producer that emits the values created by the joiner
     */
    @Override
    public StreamProducer<V> getOutput() {
        return output;
    }
}
