/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.commons.collect;

import cn.ponfee.commons.exception.ServerException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamForker<T> {
    private final Stream<T> stream;
    private final Map<Object, Function<Stream<T>, ?>> forks = new HashMap();

    public StreamForker(Stream<T> stream) {
        this.stream = stream;
    }

    public StreamForker<T> fork(Object key, Function<Stream<T>, ?> f) {
        this.forks.put(key, f);
        return this;
    }

    public Results getResults() {
        ForkingStreamConsumer<T> consumer = this.build();
        try {
            ((Stream)this.stream.sequential()).forEach(consumer);
        }
        finally {
            consumer.finish();
        }
        return consumer;
    }

    private ForkingStreamConsumer<T> build() {
        ArrayList queues = new ArrayList();
        Map actions = this.forks.entrySet().stream().reduce(new HashMap(), (map, e) -> {
            map.put(e.getKey(), this.getOperationResult(queues, (Function)e.getValue()));
            return map;
        }, (m1, m2) -> {
            m1.putAll(m2);
            return m1;
        });
        return new ForkingStreamConsumer(queues, actions);
    }

    private Future<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f) {
        LinkedBlockingQueue queue = new LinkedBlockingQueue();
        queues.add(queue);
        Stream source = StreamSupport.stream(new BlockingQueueSpliterator(queue), false);
        return CompletableFuture.supplyAsync(() -> f.apply(source));
    }

    private static class BlockingQueueSpliterator<T>
    implements Spliterator<T> {
        private final BlockingQueue<T> q;

        BlockingQueueSpliterator(BlockingQueue<T> q) {
            this.q = q;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T t;
            try {
                t = this.q.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ServerException(e);
            }
            if (t != ForkingStreamConsumer.END_OF_STREAM) {
                action.accept(t);
                return true;
            }
            return false;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return 0L;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }

    private static class ForkingStreamConsumer<T>
    implements Consumer<T>,
    Results {
        private static final Object END_OF_STREAM = new Object();
        private final List<BlockingQueue<T>> queues;
        private final Map<Object, Future<?>> actions;

        ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions) {
            this.queues = queues;
            this.actions = actions;
        }

        @Override
        public void accept(T t) {
            this.queues.forEach(q -> q.add(t));
        }

        @Override
        public <R> R get(Object key) {
            try {
                return (R)this.actions.get(key).get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        void finish() {
            this.accept(END_OF_STREAM);
        }
    }

    public static interface Results {
        public <R> R get(Object var1);
    }
}

