/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.neo4j.helpers.Pair;
import org.neo4j.kernel.ha.ChunkingChannelBuffer;
import org.neo4j.kernel.ha.CommunicationProtocol;
import org.neo4j.kernel.ha.Master;
import org.neo4j.kernel.ha.MasterImpl;
import org.neo4j.kernel.ha.Response;
import org.neo4j.kernel.ha.SlaveContext;
import org.neo4j.kernel.impl.util.StringLogger;

/**
 * Netty-based server that receives requests from slaves and dispatches them to a {@link Master}.
 *
 * <p>Each incoming frame starts with a continuation byte. Frames flagged as continuations are
 * accumulated per channel in a {@link PartialRequest}; the first frame without the flag completes
 * the request, which is then executed against the master and answered asynchronously on
 * {@link #executor}.
 *
 * <p>Per-channel state ({@link #connectedSlaveChannels} and {@link #channelBuffers}) is guarded by
 * the monitor of {@link #connectedSlaveChannels}. {@link #partialRequests} is a synchronized map.
 * A background task periodically releases the state of channels that are no longer open.
 */
public class MasterServer
extends CommunicationProtocol
implements ChannelPipelineFactory {
    /** Delay, in seconds, between two sweeps for dead channels. */
    private static final int DEAD_CONNECTIONS_CHECK_INTERVAL = 3;
    /** Passed as the third argument (worker count) of the {@link NioServerSocketChannelFactory}. */
    private static final int MAX_NUMBER_OF_CONCURRENT_TRANSACTIONS = 200;
    private final ChannelFactory channelFactory;
    private final ServerBootstrap bootstrap;
    private final Master realMaster;
    private final ChannelGroup channelGroup;
    private final ScheduledExecutorService deadConnectionsPoller;
    /** Channels with the slave context last seen on them; also the lock for {@link #channelBuffers}. */
    private final Map<Channel, SlaveContext> connectedSlaveChannels = new HashMap<Channel, SlaveContext>();
    /** Reusable per-channel buffers; guarded by the monitor of {@link #connectedSlaveChannels}. */
    private final Map<Channel, Pair<ChannelBuffer, ByteBuffer>> channelBuffers = new HashMap<Channel, Pair<ChannelBuffer, ByteBuffer>>();
    private final ExecutorService executor;
    private final StringLogger msgLog;
    /** Requests for which only continuation frames have been received so far. */
    private final Map<Channel, PartialRequest> partialRequests = Collections.synchronizedMap(new HashMap<Channel, PartialRequest>());

    /**
     * Binds the server to {@code port} and starts the periodic dead-channel sweep.
     *
     * @param realMaster the master that requests are dispatched to
     * @param port the TCP port to listen on
     * @param storeDir directory in which {@code messages.log} is written
     * @throws ChannelException if the port cannot be bound; the executor is shut down first
     */
    public MasterServer(Master realMaster, int port, String storeDir) {
        this.realMaster = realMaster;
        this.msgLog = StringLogger.getLogger(storeDir + "/messages.log");
        this.executor = Executors.newCachedThreadPool();
        this.channelFactory = new NioServerSocketChannelFactory(this.executor, this.executor, MAX_NUMBER_OF_CONCURRENT_TRANSACTIONS);
        this.bootstrap = new ServerBootstrap(this.channelFactory);
        this.bootstrap.setPipelineFactory(this);
        final Channel serverChannel;
        try {
            serverChannel = this.bootstrap.bind(new InetSocketAddress(port));
        }
        catch (ChannelException e) {
            this.msgLog.logMessage("Failed to bind master server to port " + port, e);
            this.executor.shutdown();
            throw e;
        }
        this.channelGroup = new DefaultChannelGroup();
        this.channelGroup.add(serverChannel);
        this.msgLog.logMessage("Master server bound to " + port, true);
        this.deadConnectionsPoller = new ScheduledThreadPoolExecutor(1);
        this.deadConnectionsPoller.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                // An exception escaping a periodic task suppresses all of its future runs,
                // so failures are logged here instead of being allowed to propagate.
                try {
                    MasterServer.this.checkForDeadChannels();
                }
                catch (RuntimeException e) {
                    MasterServer.this.msgLog.logMessage("Dead connection check failed", e);
                }
            }
        }, DEAD_CONNECTIONS_CHECK_INTERVAL, DEAD_CONNECTIONS_CHECK_INTERVAL, TimeUnit.SECONDS);
    }

    /**
     * Builds the pipeline for a newly accepted channel: the length-field handlers installed by
     * {@code addLengthFieldPipes} followed by a fresh {@link ServerHandler}.
     *
     * @return the configured pipeline
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        MasterServer.addLengthFieldPipes(pipeline);
        pipeline.addLast("serverHandler", new ServerHandler());
        return pipeline;
    }

    /**
     * Processes one frame received on {@code channel}.
     *
     * <p>A frame whose first byte is {@code 1} is a continuation: its payload is appended to the
     * channel's {@link PartialRequest} (created on the first such frame) and no response is sent.
     * Any other frame completes the request: the master is called on the current thread and the
     * response is serialized on {@link #executor}.
     *
     * @param realMaster the master to call
     * @param buffer the received frame, positioned at the continuation byte
     * @param channel the channel the frame arrived on
     * @throws IOException if the frame carries an unknown request type
     */
    private void handleRequest(Master realMaster, ChannelBuffer buffer, final Channel channel) throws IOException {
        byte continuation = buffer.readByte();
        if (continuation == 1) {
            PartialRequest partialRequest = this.partialRequests.get(channel);
            if (partialRequest == null) {
                // First frame of a multi-frame request: it carries the type and optional context.
                CommunicationProtocol.RequestType type = MasterServer.readRequestType(buffer);
                SlaveContext context = null;
                if (type.includesSlaveContext()) {
                    context = MasterServer.readSlaveContext(buffer);
                }
                partialRequest = new PartialRequest(type, context, this.mapSlave(channel, context));
                this.partialRequests.put(channel, partialRequest);
            }
            partialRequest.add(buffer);
            return;
        }

        PartialRequest partialRequest = this.partialRequests.remove(channel);
        final CommunicationProtocol.RequestType type;
        final Pair<ChannelBuffer, ByteBuffer> targetBuffers;
        final ChannelBuffer bufferToReadFrom;
        final ChannelBuffer bufferToWriteTo;
        SlaveContext context = null;
        if (partialRequest == null) {
            // Single-frame request: read straight from the frame, respond via the channel's buffer.
            type = MasterServer.readRequestType(buffer);
            if (type.includesSlaveContext()) {
                context = MasterServer.readSlaveContext(buffer);
            }
            targetBuffers = this.mapSlave(channel, context);
            bufferToReadFrom = buffer;
            bufferToWriteTo = targetBuffers.first();
        } else {
            // Final frame of a multi-frame request: the channel's buffer holds the accumulated
            // payload, so the response needs a buffer of its own.
            type = partialRequest.type;
            context = partialRequest.slaveContext;
            targetBuffers = partialRequest.buffers;
            partialRequest.add(buffer);
            bufferToReadFrom = targetBuffers.first();
            bufferToWriteTo = ChannelBuffers.dynamicBuffer();
        }
        bufferToWriteTo.clear();
        // 0x1000000 = 16 MiB; NOTE(review): presumably the maximum chunk/frame size - confirm
        // against ChunkingChannelBuffer.
        ChunkingChannelBuffer chunkingBuffer = new ChunkingChannelBuffer(bufferToWriteTo, channel, 0x1000000);
        Response response = type.caller.callMaster(realMaster, context, bufferToReadFrom, chunkingBuffer);
        this.writeResponseAsync(type, context, response, chunkingBuffer, targetBuffers.other(), channel);
    }

    /**
     * Reads the one-byte request type from {@code buffer} and maps it to a
     * {@link CommunicationProtocol.RequestType} by position in {@code values()}.
     *
     * @throws IOException if the byte does not index an existing request type
     */
    private static CommunicationProtocol.RequestType readRequestType(ChannelBuffer buffer) throws IOException {
        CommunicationProtocol.RequestType[] types = CommunicationProtocol.RequestType.values();
        byte index = buffer.readByte();
        if (index < 0 || index >= types.length) {
            throw new IOException("Unknown request type " + index);
        }
        return types[index];
    }

    /**
     * Serializes {@code response} into {@code chunkingBuffer} on {@link #executor}, then unmaps the
     * slave for FINISH and PULL_UPDATES requests. Failures are written to the message log because
     * the future returned by the executor is discarded and would otherwise hide them.
     */
    private void writeResponseAsync(final CommunicationProtocol.RequestType type, final SlaveContext context,
            final Response response, final ChunkingChannelBuffer chunkingBuffer,
            final ByteBuffer transferBuffer, final Channel channel) {
        this.executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    type.serializer.write(response.response(), chunkingBuffer);
                    if (type.includesSlaveContext()) {
                        CommunicationProtocol.writeTransactionStreams(response.transactions(), chunkingBuffer, transferBuffer);
                    }
                    chunkingBuffer.done();
                    if (type == CommunicationProtocol.RequestType.FINISH || type == CommunicationProtocol.RequestType.PULL_UPDATES) {
                        MasterServer.this.unmapSlave(channel, context);
                    }
                }
                catch (IOException e) {
                    MasterServer.this.msgLog.logMessage("Failed to write " + type + " response to " + channel, e);
                    throw new RuntimeException(e);
                }
                catch (RuntimeException e) {
                    MasterServer.this.msgLog.logMessage("Failed to write " + type + " response to " + channel, e);
                    throw e;
                }
            }
        });
    }

    /**
     * Registers {@code channel} in the channel group, records {@code slave} for it when non-null,
     * and returns the channel's buffer pair, creating it on first use. The first buffer of the
     * pair is cleared before it is returned.
     *
     * @param channel the channel a request arrived on
     * @param slave the slave context read from the request, or {@code null} if it had none
     * @return the (dynamic channel buffer, 1 MiB direct byte buffer) pair owned by the channel
     */
    protected Pair<ChannelBuffer, ByteBuffer> mapSlave(Channel channel, SlaveContext slave) {
        this.channelGroup.add(channel);
        synchronized (this.connectedSlaveChannels) {
            if (slave != null) {
                this.connectedSlaveChannels.put(channel, slave);
            }
            Pair<ChannelBuffer, ByteBuffer> buffers = this.channelBuffers.get(channel);
            if (buffers == null) {
                buffers = Pair.of(ChannelBuffers.dynamicBuffer(), ByteBuffer.allocateDirect(0x100000));
                this.channelBuffers.put(channel, buffers);
            }
            buffers.first().clear();
            return buffers;
        }
    }

    /**
     * Forgets the slave context recorded for {@code channel}. The channel's buffers are kept so a
     * later request on the same channel can reuse them.
     *
     * @param channel the channel to unmap
     * @param slave unused; the mapping is removed by channel
     */
    protected void unmapSlave(Channel channel, SlaveContext slave) {
        synchronized (this.connectedSlaveChannels) {
            this.connectedSlaveChannels.remove(channel);
        }
    }

    /**
     * Stops the dead-channel sweep, closes every channel in the channel group (waiting for the
     * close to complete) and shuts down the executor.
     */
    public void shutdown() {
        this.deadConnectionsPoller.shutdown();
        this.msgLog.logMessage("Master server shutdown, closing all channels", true);
        this.channelGroup.close().awaitUninterruptibly();
        this.executor.shutdown();
    }

    /**
     * Releases the state held for channels that are no longer open.
     *
     * <p>For every dead channel with a recorded slave context the master is asked to finish that
     * slave's transaction. If that call fails, the failure is logged and the context stays mapped
     * so the next sweep retries it. Buffers and partial requests are released for every dead
     * channel, including channels that have no slave context mapped.
     */
    private void checkForDeadChannels() {
        synchronized (this.connectedSlaveChannels) {
            Collection<Channel> finishedChannels = new ArrayList<Channel>();
            for (Map.Entry<Channel, SlaveContext> entry : this.connectedSlaveChannels.entrySet()) {
                Channel channel = entry.getKey();
                SlaveContext context = entry.getValue();
                if (this.channelIsOpen(channel)) {
                    continue;
                }
                this.msgLog.logMessage("Found dead channel " + channel + ", " + context, true);
                try {
                    this.realMaster.finishTransaction(context);
                }
                catch (RuntimeException e) {
                    this.msgLog.logMessage("Failed to finish transaction for dead channel " + channel + ", " + context, e);
                    continue;
                }
                this.msgLog.logMessage("Removed " + channel + ", " + context, true);
                finishedChannels.add(channel);
            }
            for (Channel channel : finishedChannels) {
                this.connectedSlaveChannels.remove(channel);
            }

            // Buffers outlive the slave mapping (see unmapSlave) and also exist for channels that
            // never sent a slave context, so they are swept independently of the loop above.
            Collection<Channel> deadBufferOwners = new ArrayList<Channel>();
            for (Channel channel : this.channelBuffers.keySet()) {
                if (!this.channelIsOpen(channel)) {
                    deadBufferOwners.add(channel);
                }
            }
            for (Channel channel : deadBufferOwners) {
                this.channelBuffers.remove(channel);
                this.partialRequests.remove(channel);
            }
        }
    }

    /** Returns {@code true} if {@code channel} is both connected and open. */
    private boolean channelIsOpen(Channel channel) {
        return channel.isConnected() && channel.isOpen();
    }

    /**
     * Returns the ongoing transactions per machine id, sorted by machine id. Machines that have a
     * mapped channel but no ongoing transaction are included with an empty collection.
     *
     * <p>Ongoing transactions are only available when the master is a {@link MasterImpl}; for any
     * other implementation every connected machine is reported with an empty collection.
     *
     * @return a new map owned by the caller
     */
    public Map<Integer, Collection<SlaveContext>> getSlaveInformation() {
        HashSet<Integer> machineIds = new HashSet<Integer>();
        synchronized (this.connectedSlaveChannels) {
            for (SlaveContext context : this.connectedSlaveChannels.values()) {
                machineIds.add(context.machineId());
            }
        }
        // Copy first so the map handed out by the master is never modified.
        TreeMap<Integer, Collection<SlaveContext>> result = new TreeMap<Integer, Collection<SlaveContext>>();
        if (this.realMaster instanceof MasterImpl) {
            result.putAll(((MasterImpl)this.realMaster).getOngoingTransactions());
        }
        for (Integer machineId : machineIds) {
            if (!result.containsKey(machineId)) {
                result.put(machineId, Collections.<SlaveContext>emptyList());
            }
        }
        return result;
    }

    /**
     * A request that spans several frames: the type and slave context read from the first frame,
     * plus the buffers into which the payload of every frame is accumulated.
     */
    static class PartialRequest {
        final SlaveContext slaveContext;
        final Pair<ChannelBuffer, ByteBuffer> buffers;
        final CommunicationProtocol.RequestType type;

        public PartialRequest(CommunicationProtocol.RequestType type, SlaveContext slaveContext, Pair<ChannelBuffer, ByteBuffer> buffers) {
            this.type = type;
            this.slaveContext = slaveContext;
            this.buffers = buffers;
        }

        /** Appends the readable bytes of {@code buffer} to the accumulated payload. */
        public void add(ChannelBuffer buffer) {
            this.buffers.first().writeBytes(buffer);
        }
    }

    /** Channel handler that feeds received frames into {@link MasterServer#handleRequest}. */
    private class ServerHandler
    extends SimpleChannelHandler {
        private ServerHandler() {
        }

        /**
         * Hands the received frame to the server. Exceptions are not caught here: they propagate
         * to the pipeline, which reports them through {@link #exceptionCaught}.
         */
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            ChannelBuffer message = (ChannelBuffer)event.getMessage();
            MasterServer.this.handleRequest(MasterServer.this.realMaster, message, event.getChannel());
        }

        /** Writes the cause of a channel exception to the message log. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            MasterServer.this.msgLog.logMessage("Exception on channel " + e.getChannel(), e.getCause());
        }
    }
}

