/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.sync.inter.join;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import org.pipecraft.infra.concurrent.FailableFunction;
import org.pipecraft.pipes.exceptions.OutOfOrderPipeException;
import org.pipecraft.pipes.exceptions.PipeException;
import org.pipecraft.pipes.sync.Pipe;
import org.pipecraft.pipes.sync.inter.join.JoinMode;
import org.pipecraft.pipes.sync.inter.join.JoinRecord;
import org.pipecraft.pipes.sync.source.EmptyPipe;

public class SortedJoinPipe<K, L, R>
implements Pipe<JoinRecord<K, L, R>> {
    private final Pipe<L> leftPipe;
    private final FailableFunction<L, K, PipeException> leftKeyExtractor;
    private final List<? extends Pipe<R>> rightPipes;
    private final FailableFunction<R, K, PipeException> rightKeyExtractor;
    private PriorityQueue<LabeledPipe<K, ?>> queue;
    private final Comparator<? super K> keyComparator;
    private final JoinMode joinMode;
    private final BitSet activePipes;
    private JoinRecord<K, L, R> next;
    private boolean earlyExit;

    public SortedJoinPipe(Pipe<L> leftPipe, FailableFunction<L, K, PipeException> leftKeyExtractor, List<? extends Pipe<R>> rightPipes, FailableFunction<R, K, PipeException> rightKeyExtractor, Comparator<? super K> keyComparator, JoinMode joinMode) {
        this.leftPipe = leftPipe;
        this.leftKeyExtractor = leftKeyExtractor;
        this.rightPipes = rightPipes;
        this.rightKeyExtractor = rightKeyExtractor;
        this.keyComparator = keyComparator;
        this.joinMode = joinMode;
        this.activePipes = new BitSet(1 + rightPipes.size());
    }

    public SortedJoinPipe(Pipe<L> leftPipe, FailableFunction<L, K, PipeException> leftKeyExtractor, Pipe<R> rightPipe, FailableFunction<R, K, PipeException> rightKeyExtractor, Comparator<? super K> keyComparator, JoinMode joinMode) {
        this(leftPipe, leftKeyExtractor, Collections.singletonList(rightPipe), rightKeyExtractor, keyComparator, joinMode);
    }

    public SortedJoinPipe(List<? extends Pipe<R>> rightPipes, FailableFunction<R, K, PipeException> rightKeyExtractor, Comparator<? super K> keyComparator) {
        this((Pipe<Object>)EmptyPipe.instance(), (FailableFunction<Object, ? super K, PipeException>)v -> null, (List<Pipe<R>>)rightPipes, (FailableFunction<R, ? super K, PipeException>)rightKeyExtractor, keyComparator, JoinMode.OUTER);
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.queue = new PriorityQueue(1 + this.rightPipes.size(), (lp1, lp2) -> this.keyComparator.compare(lp1.peekNextKey(), lp2.peekNextKey()));
        this.leftPipe.start();
        L nextL = this.leftPipe.peek();
        if (nextL != null) {
            this.activePipes.set(0);
            LabeledPipe<K, L> p = new LabeledPipe<K, L>(this.leftPipe, 0);
            p.setNext(nextL, this.leftKeyExtractor.apply(nextL));
            this.queue.add(p);
        }
        for (int i = 0; i < this.rightPipes.size(); ++i) {
            Pipe<R> rPipe = this.rightPipes.get(i);
            rPipe.start();
            R next = rPipe.peek();
            if (next == null) continue;
            int rightPipeId = i + 1;
            this.activePipes.set(rightPipeId);
            LabeledPipe<K, R> p = new LabeledPipe<K, R>(rPipe, rightPipeId);
            p.setNext(next, this.rightKeyExtractor.apply(next));
            this.queue.add(p);
        }
        this.prepareNext();
    }

    @Override
    public void close() throws IOException {
        IOException lastExc = null;
        try {
            this.leftPipe.close();
        }
        catch (IOException e) {
            lastExc = e;
        }
        for (Pipe<R> p : this.rightPipes) {
            try {
                p.close();
            }
            catch (IOException e) {
                lastExc = e;
            }
        }
        if (lastExc != null) {
            throw lastExc;
        }
    }

    @Override
    public JoinRecord<K, L, R> next() throws PipeException, InterruptedException {
        JoinRecord<K, L, R> toReturn = this.next;
        this.prepareNext();
        return toReturn;
    }

    @Override
    public JoinRecord<K, L, R> peek() {
        return this.next;
    }

    private void prepareNext() throws PipeException, InterruptedException {
        LabeledPipe<K, ?> labeledPipe = this.queue.peek();
        if (labeledPipe == null) {
            this.next = null;
            return;
        }
        Object prevKey = this.next == null ? null : (Object)this.next.getKey();
        K key = labeledPipe.peekNextKey();
        JoinRecord candidate = new JoinRecord(key);
        while (true) {
            if (labeledPipe != null && labeledPipe.peekNextKey().equals(key)) {
                if (labeledPipe.id == 0) {
                    candidate.addLeft(labeledPipe.peekNext());
                } else {
                    candidate.addRight(labeledPipe.id - 1, labeledPipe.peekNext());
                }
                labeledPipe.pipe.next();
                this.queue.poll();
                Object peeked = labeledPipe.pipe.peek();
                if (peeked != null) {
                    labeledPipe.setNext(peeked, labeledPipe.id == 0 ? this.leftKeyExtractor.apply(peeked) : this.rightKeyExtractor.apply(peeked));
                    this.queue.add(labeledPipe);
                } else {
                    this.activePipes.clear(labeledPipe.id);
                    if (this.joinMode.canEarlyExit(this.activePipes, this.rightPipes.size())) {
                        this.earlyExit = true;
                    }
                }
                labeledPipe = this.queue.peek();
                continue;
            }
            if (prevKey != null && this.keyComparator.compare(key, prevKey) <= 0) {
                throw new OutOfOrderPipeException("One or more of the streams isn't sorted: " + prevKey + " vs " + key);
            }
            if (this.joinMode.shouldOutput(candidate, this.rightPipes.size())) {
                this.next = candidate;
                break;
            }
            if (this.earlyExit || labeledPipe == null) {
                this.queue.clear();
                this.next = null;
                break;
            }
            prevKey = key;
            key = labeledPipe.peekNextKey();
            candidate = new JoinRecord(key);
        }
    }

    @Override
    public float getProgress() {
        return this.joinMode.resolveProgress(this.leftPipe, this.rightPipes);
    }

    private static class LabeledPipe<K, C> {
        private final Pipe<C> pipe;
        private final int id;
        private C next;
        private K nextKey;

        public LabeledPipe(Pipe<C> pipe, int id) {
            this.pipe = pipe;
            this.id = id;
        }

        public void setNext(Object next, K nextKey) {
            this.next = next;
            this.nextKey = nextKey;
        }

        public K peekNextKey() {
            return this.nextKey;
        }

        public C peekNext() {
            return this.next;
        }
    }
}

