/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.queue.BlockingReadHandler;
import org.neo4j.helpers.Triplet;
import org.neo4j.kernel.IdType;
import org.neo4j.kernel.ha.CommunicationProtocol;
import org.neo4j.kernel.ha.HaCommunicationException;
import org.neo4j.kernel.ha.IdAllocation;
import org.neo4j.kernel.ha.LockResult;
import org.neo4j.kernel.ha.Master;
import org.neo4j.kernel.ha.ResourcePool;
import org.neo4j.kernel.ha.Response;
import org.neo4j.kernel.ha.SlaveContext;
import org.neo4j.kernel.ha.TransactionStream;
import org.neo4j.kernel.ha.TransactionStreams;
import org.neo4j.kernel.ha.zookeeper.Machine;
import org.neo4j.kernel.impl.util.StringLogger;

/**
 * Client-side {@link Master} implementation that serializes each call, sends it over a Netty
 * channel to the master at a fixed socket address, and blocks until the response arrives.
 *
 * <p>Wire format of a request, as written by {@link #sendRequest}: one byte holding the
 * {@code RequestType} ordinal, then the slave context (only for request types that include it),
 * then the request-specific payload. Framing is handled by the pipeline built in
 * {@link #getPipeline()}, which prefixes/strips a 4-byte length field.
 *
 * <p>Channels are obtained from an internal {@link ResourcePool}. A channel whose request failed
 * for any reason is closed, because an unread or late response could otherwise be mistaken for the
 * answer to the next request sent on the same channel.
 */
public class MasterClient extends CommunicationProtocol implements Master, ChannelPipelineFactory {
    public static final int MAX_NUMBER_OF_CONCURRENT_REQUESTS_PER_CLIENT = 20;
    /** How long {@link #sendRequest} waits for the master's response before failing. */
    public static final int READ_RESPONSE_TIMEOUT_SECONDS = 20;
    private static final int MAX_NUMBER_OF_UNUSED_CHANNELS = 5;
    /** How long the pool waits for a new connection to the master to be established. */
    private static final int CONNECT_TIMEOUT_SECONDS = 5;
    /** Size of the direct buffer handed to serializers as scratch space (1 MiB). */
    private static final int READ_BUFFER_SIZE = 1024 * 1024;
    /** Largest frame the decoder accepts (16 MiB). */
    private static final int MAX_FRAME_LENGTH = 16 * 1024 * 1024;
    /** Number of bytes used for the length prefix of every frame. */
    private static final int FRAME_LENGTH_FIELD_SIZE = 4;
    /** Pipeline name under which the blocking reader is registered and later looked up. */
    private static final String BLOCKING_HANDLER_NAME = "blockingHandler";

    private final ClientBootstrap bootstrap;
    private final SocketAddress address;
    private final StringLogger msgLog;
    private final ExecutorService executor;

    /**
     * Pool of (channel, outgoing buffer, scratch buffer) triplets.
     * The two constructor arguments are presumably the concurrency limit and the idle-channel
     * limit, in that order; the values are the same ones the class has always passed.
     */
    private final ResourcePool<Triplet<Channel, ChannelBuffer, ByteBuffer>> channelPool = new ResourcePool<Triplet<Channel, ChannelBuffer, ByteBuffer>>(MAX_NUMBER_OF_CONCURRENT_REQUESTS_PER_CLIENT, MAX_NUMBER_OF_UNUSED_CHANNELS) {

        /**
         * Opens a new connection to the master and pairs it with fresh buffers.
         *
         * @throws HaCommunicationException if the connection is not established within
         *         {@link #CONNECT_TIMEOUT_SECONDS} seconds
         */
        @Override
        protected Triplet<Channel, ChannelBuffer, ByteBuffer> create() {
            ChannelFuture channelFuture = MasterClient.this.bootstrap.connect(MasterClient.this.address);
            channelFuture.awaitUninterruptibly(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!channelFuture.isSuccess()) {
                // The attempt may still be in flight; close the channel so a connection that
                // completes after we gave up is not left open with nobody referencing it.
                channelFuture.getChannel().close();
                String msg = "MasterClient could not connect to " + MasterClient.this.address;
                MasterClient.this.msgLog.logMessage(msg, true);
                throw new HaCommunicationException(msg);
            }
            Triplet<Channel, ChannelBuffer, ByteBuffer> channel = Triplet.of(channelFuture.getChannel(), ChannelBuffers.dynamicBuffer(), ByteBuffer.allocateDirect(READ_BUFFER_SIZE));
            MasterClient.this.msgLog.logMessage("Opened a new channel to " + MasterClient.this.address, true);
            return channel;
        }

        /** A pooled resource is usable for as long as its channel is connected. */
        @Override
        protected boolean isAlive(Triplet<Channel, ChannelBuffer, ByteBuffer> resource) {
            return resource.first().isConnected();
        }

        /** Closes the underlying channel if it is still connected. */
        @Override
        protected void dispose(Triplet<Channel, ChannelBuffer, ByteBuffer> resource) {
            Channel channel = resource.first();
            if (channel.isConnected()) {
                channel.close();
            }
        }
    };

    /**
     * Creates a client for the master at the given address. No connection is opened here;
     * channels are created by the pool when a request first needs one.
     *
     * @param hostNameOrIp host name or IP address of the master
     * @param port port the master listens on
     * @param storeDir store directory; messages are logged to {@code storeDir + "/messages.log"}
     */
    public MasterClient(String hostNameOrIp, int port, String storeDir) {
        this.address = new InetSocketAddress(hostNameOrIp, port);
        this.executor = Executors.newCachedThreadPool();
        // The same executor serves as both the boss and the worker executor of the channel factory.
        this.bootstrap = new ClientBootstrap((ChannelFactory)new NioClientSocketChannelFactory((Executor)this.executor, (Executor)this.executor));
        this.bootstrap.setPipelineFactory((ChannelPipelineFactory)this);
        this.msgLog = StringLogger.getLogger((String)(storeDir + "/messages.log"));
        this.msgLog.logMessage("Client connected to " + hostNameOrIp + ":" + port, true);
    }

    /**
     * Creates a client for the master described by {@code machine}.
     *
     * @param machine machine whose server host/port pair identifies the master
     * @param storeDir store directory used for the message log
     */
    public MasterClient(Machine machine, String storeDir) {
        this((String)machine.getServer().first(), (Integer)machine.getServer().other(), storeDir);
    }

    /**
     * Sends one request to the master and blocks for its response.
     *
     * @param type request type; its ordinal is the first byte on the wire
     * @param slaveContext context written after the type byte when the type includes it
     * @param serializer writes the request-specific payload
     * @param deserializer reads the request-specific result from the response
     * @return the deserialized result together with any transaction streams in the response
     * @throws HaCommunicationException if no channel could be obtained, the channel was closed,
     *         the response did not arrive in time, the thread was interrupted, or (de)serialization
     *         failed; in every case where a channel had been obtained it is closed
     */
    private <T> Response<T> sendRequest(CommunicationProtocol.RequestType type, SlaveContext slaveContext, CommunicationProtocol.Serializer serializer, CommunicationProtocol.Deserializer<T> deserializer) {
        Triplet<Channel, ChannelBuffer, ByteBuffer> channelContext = null;
        try {
            channelContext = getChannel();
            Channel channel = channelContext.first();

            // Build the request in the channel's reusable outgoing buffer.
            ChannelBuffer buffer = channelContext.other();
            buffer.clear();
            buffer.writeByte(type.ordinal());
            if (type.includesSlaveContext()) {
                writeSlaveContext(buffer, slaveContext);
            }
            serializer.write(buffer, channelContext.third());
            channel.write(buffer);

            @SuppressWarnings("unchecked") // the handler is registered with this element type in getPipeline()
            BlockingReadHandler<ChannelBuffer> reader = (BlockingReadHandler<ChannelBuffer>)channel.getPipeline().get(BLOCKING_HANDLER_NAME);
            ChannelBuffer message = reader.read(READ_RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (message == null) {
                throw new HaCommunicationException("Channel has been closed");
            }

            T response = deserializer.read(message);
            TransactionStreams txStreams = type.includesSlaveContext() ? readTransactionStreams(message) : TransactionStreams.EMPTY;
            return new Response<T>(response, txStreams);
        }
        catch (HaCommunicationException e) {
            // Already the exception type callers expect; rethrow as-is instead of double-wrapping.
            disposeChannel(channelContext);
            throw e;
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            disposeChannel(channelContext);
            throw new HaCommunicationException(e);
        }
        catch (Exception e) {
            // Covers closed channels, read timeouts, other I/O errors and (de)serializer failures.
            // The channel may still deliver the response to this failed request, so it must not
            // be reused.
            disposeChannel(channelContext);
            throw new HaCommunicationException(e);
        }
    }

    /**
     * Closes the channel of a failed request. Does nothing when no channel had been obtained yet.
     */
    private void disposeChannel(Triplet<Channel, ChannelBuffer, ByteBuffer> channelContext) {
        if (channelContext != null) {
            this.channelPool.dispose(channelContext);
        }
    }

    /** Obtains a channel triplet from the pool. */
    private Triplet<Channel, ChannelBuffer, ByteBuffer> getChannel() throws Exception {
        return this.channelPool.acquire();
    }

    /**
     * Hands the current channel back to the pool.
     * NOTE(review): only {@link #finishTransaction} calls this; confirm that the other requests
     * are meant to keep their channel checked out.
     */
    private void releaseChannel() {
        this.channelPool.release();
    }

    /**
     * Asks the master for a batch of ids of the given type. No slave context is sent.
     *
     * @param idType kind of id to allocate; its ordinal is the single payload byte
     * @return the allocation returned by the master
     */
    @Override
    public IdAllocation allocateIds(final IdType idType) {
        return this.sendRequest(CommunicationProtocol.RequestType.ALLOCATE_IDS, null, new CommunicationProtocol.Serializer(){

            @Override
            public void write(ChannelBuffer buffer, ByteBuffer readBuffer) throws IOException {
                buffer.writeByte(idType.ordinal());
            }
        }, new CommunicationProtocol.Deserializer<IdAllocation>(){

            @Override
            public IdAllocation read(ChannelBuffer buffer) throws IOException {
                return CommunicationProtocol.readIdAllocation(buffer);
            }
        }).response();
    }

    /**
     * Asks the master to create a relationship type.
     *
     * @param context slave context sent with the request
     * @param name name of the relationship type, sent as the payload
     * @return response whose value is the int read from the master's reply
     */
    @Override
    public Response<Integer> createRelationshipType(SlaveContext context, final String name) {
        return this.sendRequest(CommunicationProtocol.RequestType.CREATE_RELATIONSHIP_TYPE, context, new CommunicationProtocol.Serializer(){

            @Override
            public void write(ChannelBuffer buffer, ByteBuffer readBuffer) throws IOException {
                CommunicationProtocol.writeString(buffer, name);
            }
        }, new CommunicationProtocol.Deserializer<Integer>(){

            @Override
            public Integer read(ChannelBuffer buffer) throws IOException {
                return buffer.readInt();
            }
        });
    }

    /** Requests write locks on the given nodes from the master. */
    @Override
    public Response<LockResult> acquireNodeWriteLock(SlaveContext context, long ... nodes) {
        return this.acquireLock(CommunicationProtocol.RequestType.ACQUIRE_NODE_WRITE_LOCK, context, nodes);
    }

    /** Requests read locks on the given nodes from the master. */
    @Override
    public Response<LockResult> acquireNodeReadLock(SlaveContext context, long ... nodes) {
        return this.acquireLock(CommunicationProtocol.RequestType.ACQUIRE_NODE_READ_LOCK, context, nodes);
    }

    /** Requests write locks on the given relationships from the master. */
    @Override
    public Response<LockResult> acquireRelationshipWriteLock(SlaveContext context, long ... relationships) {
        return this.acquireLock(CommunicationProtocol.RequestType.ACQUIRE_RELATIONSHIP_WRITE_LOCK, context, relationships);
    }

    /** Requests read locks on the given relationships from the master. */
    @Override
    public Response<LockResult> acquireRelationshipReadLock(SlaveContext context, long ... relationships) {
        return this.acquireLock(CommunicationProtocol.RequestType.ACQUIRE_RELATIONSHIP_READ_LOCK, context, relationships);
    }

    /** Shared implementation of the four lock requests, which differ only in request type. */
    private Response<LockResult> acquireLock(CommunicationProtocol.RequestType type, SlaveContext context, long[] entityIds) {
        return this.sendRequest(type, context, new CommunicationProtocol.AcquireLockSerializer(entityIds), LOCK_RESULT_DESERIALIZER);
    }

    /**
     * Sends a transaction for one resource to the master for commit.
     *
     * @param context slave context sent with the request
     * @param resource resource name, written first in the payload
     * @param transactionStream transaction data, written after the resource name
     * @return response whose value is the long read from the master's reply
     */
    @Override
    public Response<Long> commitSingleResourceTransaction(SlaveContext context, final String resource, final TransactionStream transactionStream) {
        return this.sendRequest(CommunicationProtocol.RequestType.COMMIT, context, new CommunicationProtocol.Serializer(){

            @Override
            public void write(ChannelBuffer buffer, ByteBuffer readBuffer) throws IOException {
                CommunicationProtocol.writeString(buffer, resource);
                CommunicationProtocol.writeTransactionStream(buffer, readBuffer, transactionStream);
            }
        }, new CommunicationProtocol.Deserializer<Long>(){

            @Override
            public Long read(ChannelBuffer buffer) throws IOException {
                return buffer.readLong();
            }
        });
    }

    /**
     * Tells the master that the transaction is finished. The request carries no payload, and the
     * channel is released back to the pool whether or not the request succeeds.
     */
    @Override
    public Response<Void> finishTransaction(SlaveContext context) {
        try {
            return this.sendRequest(CommunicationProtocol.RequestType.FINISH, context, new CommunicationProtocol.Serializer(){

                @Override
                public void write(ChannelBuffer buffer, ByteBuffer readBuffer) throws IOException {
                    // FINISH has no payload beyond the type byte and slave context.
                }
            }, VOID_DESERIALIZER);
        }
        finally {
            this.releaseChannel();
        }
    }

    /**
     * Not supported on the client.
     *
     * @throws UnsupportedOperationException always
     */
    public void rollbackOngoingTransactions(SlaveContext context) {
        throw new UnsupportedOperationException("Should never be called from the client side");
    }

    /** Sends a payload-less PULL_UPDATES request; any updates arrive as the response's transaction streams. */
    @Override
    public Response<Void> pullUpdates(SlaveContext context) {
        return this.sendRequest(CommunicationProtocol.RequestType.PULL_UPDATES, context, EMPTY_SERIALIZER, VOID_DESERIALIZER);
    }

    /**
     * Asks the master which master id is recorded for a committed transaction. No slave context is sent.
     *
     * @param txId transaction id, sent as a single long
     * @return the integer read from the master's reply
     */
    @Override
    public int getMasterIdForCommittedTx(final long txId) {
        return (Integer)this.sendRequest(CommunicationProtocol.RequestType.GET_MASTER_ID_FOR_TX, null, new CommunicationProtocol.Serializer(){

            @Override
            public void write(ChannelBuffer buffer, ByteBuffer readBuffer) throws IOException {
                buffer.writeLong(txId);
            }
        }, INTEGER_DESERIALIZER).response();
    }

    /**
     * Builds the pipeline for each new channel: a length-prefixed frame decoder and encoder,
     * followed by the blocking reader that {@link #sendRequest} pulls responses from.
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        // Length field at offset 0, no length adjustment, and the length prefix is stripped
        // from the decoded frame.
        pipeline.addLast("frameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, FRAME_LENGTH_FIELD_SIZE, 0, FRAME_LENGTH_FIELD_SIZE));
        pipeline.addLast("frameEncoder", (ChannelHandler)new LengthFieldPrepender(FRAME_LENGTH_FIELD_SIZE));
        pipeline.addLast(BLOCKING_HANDLER_NAME, (ChannelHandler)new BlockingReadHandler<ChannelBuffer>());
        return pipeline;
    }

    /** Logs the shutdown, closes the channel pool and stops the executor backing the Netty channels. */
    public void shutdown() {
        this.msgLog.logMessage("MasterClient shutdown", true);
        this.channelPool.close(true);
        this.executor.shutdownNow();
    }
}

