/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.ha.comm;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.queue.BlockingReadHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.neo4j.kernel.IdType;
import org.neo4j.kernel.ha.HaCommunicationException;
import org.neo4j.kernel.ha.IdAllocation;
import org.neo4j.kernel.ha.LockResult;
import org.neo4j.kernel.ha.Master;
import org.neo4j.kernel.ha.Response;
import org.neo4j.kernel.ha.SlaveContext;
import org.neo4j.kernel.ha.TransactionStream;
import org.neo4j.kernel.ha.comm.CommunicationUtils;
import org.neo4j.kernel.ha.comm.DataWriter;
import org.neo4j.kernel.ha.comm.RequestType;
import org.neo4j.kernel.ha.comm.ResponseDecoder;
import org.neo4j.kernel.ha.comm.TransactionApplier;
import org.neo4j.kernel.ha.zookeeper.Machine;
import org.neo4j.kernel.impl.util.StringLogger;

/**
 * Client-side {@link Master} implementation that serializes each call into a
 * {@link ChannelBuffer}, writes it to a Netty channel connected to
 * {@code hostNameOrIp:port}, and blocks until the response has been read.
 *
 * <p>Each calling thread is bound to one channel in {@link #channels}; the binding is
 * created lazily by {@link #getChannel()}. All access to {@link #channels} and
 * {@link #unusedChannels} happens while holding the monitor of {@link #channels}.
 *
 * <p>This class is also the {@link ChannelPipelineFactory} for the channels it opens.
 */
public class MasterClient
implements Master,
ChannelPipelineFactory {
    /** Pipeline name under which the blocking response reader is registered and looked up. */
    private static final String BLOCKING_HANDLER = "blockingHandler";
    /** Timeout, in seconds, passed to the blocking read of a response. */
    private static final long READ_TIMEOUT_SECONDS = 20L;
    /** Number of connect attempts made before giving up on opening a new channel. */
    private static final int MAX_CONNECT_ATTEMPTS = 5;
    /** Pause, in milliseconds, after each failed connect attempt. */
    private static final long CONNECT_RETRY_DELAY_MILLIS = 500L;
    /** Upper bound on the number of released channels kept for reuse. */
    private static final int MAX_UNUSED_CHANNELS = 5;

    // Released channels available for reuse; guarded by the monitor of "channels".
    private final Deque<Channel> unusedChannels = new LinkedList<Channel>();
    // Channel currently bound to each calling thread; also serves as the lock object.
    private final Map<Thread, Channel> channels = new HashMap<Thread, Channel>();
    private final ClientBootstrap bootstrap;
    private final String hostNameOrIp;
    private final int port;
    // A single applier instance is shared by every pipeline this factory creates.
    private final TransactionApplier txApplier = new TransactionApplier();
    private final StringLogger msgLog;

    /**
     * Creates a client for the master at the given address. No connection is opened here;
     * channels are opened on demand by the first request of each thread.
     *
     * @param hostNameOrIp host name or IP address of the master
     * @param port         port of the master
     * @param storeDir     directory whose {@code messages.log} receives this client's log messages
     */
    public MasterClient(String hostNameOrIp, int port, String storeDir) {
        this.hostNameOrIp = hostNameOrIp;
        this.port = port;
        // The same cached thread pool is handed to Netty as both executors of the channel factory.
        ExecutorService executor = Executors.newCachedThreadPool();
        this.bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(executor, executor));
        this.bootstrap.setPipelineFactory(this);
        this.msgLog = StringLogger.getLogger(storeDir + "/messages.log");
        this.msgLog.logMessage("Client connected to " + hostNameOrIp + ":" + port, true);
    }

    /**
     * Creates a client for the given machine, using the first element of
     * {@code machine.getServer()} as host and the other element as port.
     *
     * @param machine  machine describing the master's server address
     * @param storeDir directory whose {@code messages.log} receives this client's log messages
     */
    public MasterClient(Machine machine, String storeDir) {
        this((String)machine.getServer().first(), (Integer)machine.getServer().other(), storeDir);
    }

    /**
     * Serializes a request, writes it to the calling thread's channel and blocks for the response.
     *
     * <p>The request consists of the request type's ordinal as one byte, followed by the slave
     * context (only if the type declares that it includes one), followed by whatever
     * {@code serializer} writes.
     *
     * @param type         request type; also used to decode the response
     * @param slaveContext context to send; only written when {@code type.includesSlaveContext}
     * @param serializer   writer of the request payload, or {@code null} if the request has no payload
     * @return the decoded response, wrapped in a {@link Response}
     * @throws HaCommunicationException if the read yields no message, or if any step fails
     *         (the failure is attached as the cause)
     */
    private <T> Response<T> sendRequest(RequestType type, SlaveContext slaveContext, DataWriter serializer) {
        try {
            ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
            buffer.writeByte(type.ordinal());
            if (type.includesSlaveContext) {
                CommunicationUtils.writeSlaveContext(slaveContext, buffer);
            }
            // Several callers in this class pass null for payload-less requests.
            if (serializer != null) {
                serializer.write(buffer);
            }
            Channel channel = getChannel();
            channel.write(buffer);
            @SuppressWarnings("unchecked")
            BlockingReadHandler<ChannelBuffer> reader =
                    (BlockingReadHandler<ChannelBuffer>) channel.getPipeline().get(BLOCKING_HANDLER);
            ChannelBuffer message = reader.read(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (message == null) {
                throw new HaCommunicationException("Channel has been closed");
            }
            Object response = type.readResponse(message);
            return Response.wrapResponseObjectOnly(response);
        } catch (HaCommunicationException e) {
            // Already the exception type callers expect; do not wrap it a second time.
            throw e;
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller after translating the exception.
            Thread.currentThread().interrupt();
            throw new HaCommunicationException(e);
        } catch (Exception e) {
            throw new HaCommunicationException(e);
        }
    }

    /**
     * Returns the channel bound to the calling thread, binding one first if necessary.
     *
     * <p>A thread without a channel gets a still-connected channel from the unused pool if
     * there is one, otherwise a newly opened channel. The whole lookup, including connect
     * attempts and the pauses between them, runs while holding the monitor of {@link #channels}.
     *
     * <p>NOTE(review): an existing binding is returned without checking that the channel is
     * still connected.
     *
     * @return the channel bound to the calling thread
     * @throws IOException if no pooled channel is usable and no new channel could be opened
     */
    private Channel getChannel() throws Exception {
        Thread thread = Thread.currentThread();
        synchronized (channels) {
            Channel channel = channels.get(thread);
            if (channel != null) {
                return channel;
            }
            channel = pollConnectedUnusedChannel();
            if (channel == null) {
                channel = connectToMaster();
            }
            if (channel == null) {
                throw new IOException("Not able to connect to master");
            }
            channels.put(thread, channel);
            return channel;
        }
    }

    /**
     * Removes channels from the unused pool until a connected one is found. Channels that are
     * no longer connected are dropped from the pool. Must be called with the monitor of
     * {@link #channels} held.
     *
     * @return a connected pooled channel, or {@code null} if the pool ran empty
     */
    private Channel pollConnectedUnusedChannel() {
        Channel candidate;
        while ((candidate = unusedChannels.poll()) != null) {
            if (candidate.isConnected()) {
                msgLog.logMessage("Found unused (and still connected) channel");
                return candidate;
            }
            msgLog.logMessage("Found unused stale channel, discarding it");
        }
        return null;
    }

    /**
     * Tries up to {@link #MAX_CONNECT_ATTEMPTS} times to open a channel to the master, pausing
     * {@link #CONNECT_RETRY_DELAY_MILLIS} after every failed attempt.
     *
     * @return the connected channel, or {@code null} if every attempt failed
     */
    private Channel connectToMaster() {
        for (int attempt = 0; attempt < MAX_CONNECT_ATTEMPTS; attempt++) {
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(hostNameOrIp, port));
            future.awaitUninterruptibly();
            if (future.isSuccess()) {
                Channel channel = future.getChannel();
                msgLog.logMessage("Opened a new channel to " + hostNameOrIp + ":" + port, true);
                return channel;
            }
            msgLog.logMessage("Retrying connect to " + hostNameOrIp + ":" + port, true);
            try {
                Thread.sleep(CONNECT_RETRY_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt is discarded here and the loop moves on to the
                // next attempt; confirm that connecting is meant to be uninterruptible.
                Thread.interrupted();
            }
        }
        return null;
    }

    /**
     * Unbinds the calling thread's channel, if any. The channel is kept for reuse while fewer
     * than {@link #MAX_UNUSED_CHANNELS} channels are pooled; otherwise it is closed.
     *
     * <p>NOTE(review): nothing in this class calls this method, so bindings created by
     * {@link #getChannel()} are never undone here.
     */
    private void releaseChannel() {
        synchronized (channels) {
            Channel channel = channels.remove(Thread.currentThread());
            if (channel == null) {
                return;
            }
            if (unusedChannels.size() < MAX_UNUSED_CHANNELS) {
                unusedChannels.push(channel);
            } else {
                channel.close();
            }
        }
    }

    /**
     * Sends the request built by {@code RequestType.allocateIds(idType)}, without slave context
     * or payload, and returns the decoded response.
     */
    @Override
    public IdAllocation allocateIds(IdType idType) {
        return (IdAllocation)sendRequest(RequestType.allocateIds(idType), null, null).response();
    }

    /** Sends a {@code CREATE_RELATIONSHIP_TYPE} request carrying {@code name}. */
    @Override
    public Response<Integer> createRelationshipType(SlaveContext context, String name) {
        return sendRequest(RequestType.CREATE_RELATIONSHIP_TYPE, context, new DataWriter.WriteString(name));
    }

    /** Sends an {@code ACQUIRE_NODE_WRITE_LOCK} request carrying the given node ids. */
    @Override
    public Response<LockResult> acquireNodeWriteLock(SlaveContext context, long ... nodes) {
        return sendRequest(RequestType.ACQUIRE_NODE_WRITE_LOCK, context, new DataWriter.WriteIdArray(nodes));
    }

    /** Sends an {@code ACQUIRE_NODE_READ_LOCK} request carrying the given node ids. */
    @Override
    public Response<LockResult> acquireNodeReadLock(SlaveContext context, long ... nodes) {
        return sendRequest(RequestType.ACQUIRE_NODE_READ_LOCK, context, new DataWriter.WriteIdArray(nodes));
    }

    /** Sends an {@code ACQUIRE_RELATIONSHIP_WRITE_LOCK} request carrying the given relationship ids. */
    @Override
    public Response<LockResult> acquireRelationshipWriteLock(SlaveContext context, long ... relationships) {
        return sendRequest(RequestType.ACQUIRE_RELATIONSHIP_WRITE_LOCK, context, new DataWriter.WriteIdArray(relationships));
    }

    /** Sends an {@code ACQUIRE_RELATIONSHIP_READ_LOCK} request carrying the given relationship ids. */
    @Override
    public Response<LockResult> acquireRelationshipReadLock(SlaveContext context, long ... relationships) {
        return sendRequest(RequestType.ACQUIRE_RELATIONSHIP_READ_LOCK, context, new DataWriter.WriteIdArray(relationships));
    }

    /**
     * Not implemented by this client.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Response<Long> commitSingleResourceTransaction(SlaveContext context, String resource, TransactionStream transactionStream) {
        throw new UnsupportedOperationException("Not implemented: commitSingleResourceTransaction()");
    }

    /** Sends a {@code FINISH} request with the given context and no payload. */
    @Override
    public Response<Void> finishTransaction(SlaveContext context) {
        return sendRequest(RequestType.FINISH, context, null);
    }

    /**
     * Not supported on the client side.
     *
     * @throws UnsupportedOperationException always
     */
    public void rollbackOngoingTransactions(SlaveContext context) {
        throw new UnsupportedOperationException("Should never be called from the client side");
    }

    /** Sends a {@code PULL_UPDATES} request with the given context and no payload. */
    @Override
    public Response<Void> pullUpdates(SlaveContext context) {
        return sendRequest(RequestType.PULL_UPDATES, context, null);
    }

    /** Sends a {@code GET_MASTER_ID_FOR_TX} request carrying {@code txId}, without slave context. */
    @Override
    public int getMasterIdForCommittedTx(long txId) {
        return (Integer)sendRequest(RequestType.GET_MASTER_ID_FOR_TX, null, new DataWriter.WriteLong(txId)).response();
    }

    /**
     * Builds the pipeline for a new channel: chunked writer, response decoder, the shared
     * transaction applier, and finally a fresh blocking reader that {@code sendRequest}
     * looks up by name to wait for the response.
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
        pipeline.addLast("responseDecoder", new ResponseDecoder());
        pipeline.addLast("applyTransactions", txApplier);
        pipeline.addLast(BLOCKING_HANDLER, new BlockingReadHandler<ChannelBuffer>());
        return pipeline;
    }

    /**
     * Logs the shutdown and closes every pooled and every thread-bound channel.
     *
     * <p>NOTE(review): the bootstrap and the executor created in the constructor are not
     * released here; confirm whether that is handled elsewhere.
     */
    public void shutdown() {
        msgLog.logMessage("MasterClient shutdown", true);
        synchronized (channels) {
            for (Channel pooled : unusedChannels) {
                pooled.close();
            }
            for (Channel bound : channels.values()) {
                bound.close();
            }
        }
    }
}

