/*
 * Decompiled with CFR 0.152.
 */
package org.pipecraft.pipes.async.inter.distributed;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.pipecraft.infra.net.HostPort;
import org.pipecraft.pipes.async.AsyncPipe;
import org.pipecraft.pipes.async.AsyncPipeListener;
import org.pipecraft.pipes.async.inter.distributed.DistributedShufflerConfig;
import org.pipecraft.pipes.async.inter.distributed.NettyUtils;
import org.pipecraft.pipes.async.inter.distributed.ShuffleClient;
import org.pipecraft.pipes.async.inter.distributed.ShuffleServer;
import org.pipecraft.pipes.exceptions.IOPipeException;
import org.pipecraft.pipes.exceptions.PipeException;

public class DistributedShufflerPipe<T>
extends AsyncPipe<T> {
    private final AsyncPipe<T> input;
    private final DistributedShufflerConfig<T> config;
    private final Thread watcher;
    private final CountDownLatch doneLatch;
    private EventLoopGroup group;
    private ShuffleServer shuffleServer;

    public DistributedShufflerPipe(AsyncPipe<T> input, DistributedShufflerConfig<T> config) {
        this.input = input;
        this.config = config;
        this.doneLatch = new CountDownLatch(config.getWorkers().size() + 1);
        this.watcher = new Thread(() -> {
            try {
                this.doneLatch.await();
                this.notifyDone();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Override
    public void start() throws PipeException, InterruptedException {
        this.group = NettyUtils.newEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);
        this.shuffleServer = new ShuffleServer(this.config.getPort(), this.doneLatch, new ServerHandler(), this.group, bytes -> this.notifyNext(this.config.getCodec().decode((byte[])bytes)));
        this.shuffleServer.start();
        this.watcher.start();
        final ArrayList<ShuffleClient<T>> clients = new ArrayList<ShuffleClient<T>>(this.config.getWorkers().size());
        for (HostPort host : this.config.getWorkers()) {
            ShuffleClient<T> client = new ShuffleClient<T>(host.getHost(), host.getPort(), this.config.getCodec(), new ClientHandler(), this.group);
            clients.add(client);
        }
        this.input.setListener(new AsyncPipeListener<T>(){
            final Function<T, Integer> shardFunction;
            {
                this.shardFunction = DistributedShufflerPipe.this.config.getShardFunc();
            }

            @Override
            public void next(T item) throws InterruptedException, IOPipeException {
                try {
                    int shardId = this.shardFunction.apply(item);
                    ((ShuffleClient)clients.get(shardId)).send(item);
                }
                catch (IOException e) {
                    throw new IOPipeException(e);
                }
            }

            @Override
            public void done() throws InterruptedException {
                try {
                    for (ShuffleClient client : clients) {
                        client.done();
                    }
                }
                catch (IOException e) {
                    DistributedShufflerPipe.this.notifyError(new IOPipeException(e));
                    return;
                }
                DistributedShufflerPipe.this.doneLatch.countDown();
            }

            @Override
            public void error(PipeException e) throws InterruptedException {
                DistributedShufflerPipe.this.shuffleServer.close();
                DistributedShufflerPipe.this.notifyError(e);
            }
        });
        this.input.start();
    }

    @Override
    public void close() throws IOException {
        super.close();
        this.input.close();
        if (!this.group.isShuttingDown()) {
            this.group.shutdownGracefully();
        }
        this.shuffleServer.close();
        try {
            this.watcher.interrupt();
            this.watcher.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public float getProgress() {
        return this.input.getProgress();
    }

    public static int getWorkerShardId(List<HostPort> workers, int workerIndex) {
        HostPort worker = workers.get(workerIndex);
        ArrayList<HostPort> workersCopy = new ArrayList<HostPort>(workers);
        DistributedShufflerPipe.canonicalWorkerSort(workersCopy);
        return workersCopy.indexOf(worker);
    }

    private static void canonicalWorkerSort(List<HostPort> workers) {
        Collections.sort(workers);
    }

    @ChannelHandler.Sharable
    private class ClientHandler
    extends ChannelInboundHandlerAdapter {
        private ClientHandler() {
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            DistributedShufflerPipe.this.notifyError(new IOPipeException(cause));
            ctx.close();
        }
    }

    @ChannelHandler.Sharable
    private class ServerHandler
    extends ChannelInboundHandlerAdapter {
        private ServerHandler() {
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            DistributedShufflerPipe.this.notifyError(new IOPipeException(cause));
            ctx.close();
        }
    }
}

