/*
 * Decompiled with CFR 0.152.
 */
package alluxio.shaded.client.org.apache.zookeeper.server.quorum;

import alluxio.shaded.client.org.apache.zookeeper.common.NetUtils;
import alluxio.shaded.client.org.apache.zookeeper.common.X509Exception;
import alluxio.shaded.client.org.apache.zookeeper.server.ExitCode;
import alluxio.shaded.client.org.apache.zookeeper.server.ZooKeeperThread;
import alluxio.shaded.client.org.apache.zookeeper.server.quorum.QuorumPeer;
import alluxio.shaded.client.org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import alluxio.shaded.client.org.apache.zookeeper.server.quorum.UnifiedServerSocket;
import alluxio.shaded.client.org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
import alluxio.shaded.client.org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import alluxio.shaded.client.org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import alluxio.shaded.client.org.apache.zookeeper.server.util.ConfigUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QuorumCnxManager {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumCnxManager.class);
    // Matches the capacity the receive queue is created with (100 messages).
    static final int RECV_CAPACITY = 100;
    // Matches the capacity of each per-server send queue (a single pending message).
    static final int SEND_CAPACITY = 1;
    // Matches the largest message length, in bytes, that RecvWorker accepts before failing the connection.
    static final int PACKETMAXSIZE = 524288;
    // Source of identifiers for peers that connect with sid Long.MAX_VALUE; starts at -1 and is decremented per use.
    private AtomicLong observerCounter = new AtomicLong(-1L);
    // Matches the first long written to a peer when opening a connection; a negative first long on the
    // receiving side is parsed as an InitialMessage instead of being taken as the sender's sid.
    public static final long PROTOCOL_VERSION = -65536L;
    // NOTE(review): not referenced in the visible part of this file; presumably bounds a buffer elsewhere - confirm.
    public static final int maxBuffer = 2048;
    // Timeout passed to Socket.connect(); overridden by the "zookeeper.cnxTimeout" system property in the constructor.
    private int cnxTO = 5000;
    // Owning peer; supplies ids, views, TLS settings and tick/sync limits.
    final QuorumPeer self;
    // Server id of this peer as passed to the constructor.
    final long mySid;
    // Stored from the constructor argument; not read in the visible part of this file.
    final int socketTimeout;
    // Stored from the constructor argument; not read in the visible part of this file.
    final Map<Long, QuorumPeer.QuorumServer> view;
    // Stored from the constructor argument; not read in the visible part of this file.
    final boolean listenOnAllIPs;
    // Runs the asynchronous connection request / connection receiver tasks.
    private ThreadPoolExecutor connectionExecutor;
    // Sids for which an asynchronous connection request has already been submitted.
    private final Set<Long> inprogressConnections = Collections.synchronizedSet(new HashSet());
    // Authenticates incoming connections (see handleConnection).
    private QuorumAuthServer authServer;
    // Authenticates outgoing connections (see startConnection).
    private QuorumAuthLearner authLearner;
    // When true the Listener hands accepted sockets to receiveConnectionAsync instead of receiveConnection.
    private boolean quorumSaslAuthEnabled;
    // Incremented each time a task is handed to connectionExecutor; reset to 0 by halt().
    private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
    // Active SendWorker per remote sid; a non-null entry is treated as "already connected".
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    // Pending outgoing messages per remote sid.
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    // Last message handed to send() per remote sid; re-sent when a new SendWorker starts with an empty queue.
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    // Messages received from peers (and messages this server addresses to itself).
    public final ArrayBlockingQueue<Message> recvQueue;
    // Guards the "evict oldest, then add" sequence in addToRecvQueue.
    private final Object recvQLock = new Object();
    // Set by halt(); polled by the worker and listener loops.
    volatile boolean shutdown = false;
    // Thread that accepts incoming election connections.
    public final Listener listener;
    // Incremented when a SendWorker/RecvWorker starts running and decremented when it is finished.
    private AtomicInteger threadCnt = new AtomicInteger(0);
    // Value applied via Socket.setKeepAlive(); read once from the "zookeeper.tcpKeepAlive" system property.
    private final boolean tcpKeepAlive = Boolean.getBoolean("zookeeper.tcpKeepAlive");
    // Default factory: a plain, unconnected java.net.Socket.
    static final Supplier<Socket> DEFAULT_SOCKET_FACTORY = () -> new Socket();
    // Factory used for non-SSL outgoing connections; replaceable through setSocketFactory.
    private static Supplier<Socket> SOCKET_FACTORY = DEFAULT_SOCKET_FACTORY;

    /**
     * Replaces the factory used to create plain (non-SSL) outgoing election sockets.
     *
     * @param factory supplier invoked once per outgoing connection attempt
     */
    static void setSocketFactory(Supplier<Socket> factory) {
        QuorumCnxManager.SOCKET_FACTORY = factory;
    }

    /**
     * Creates the connection manager for one quorum peer.
     *
     * <p>Initializes the receive queue and the per-server maps, applies the optional
     * {@code zookeeper.cnxTimeout} system property, builds the connection executor and
     * creates (but does not start) the listener thread.
     *
     * @param self                  the peer this manager belongs to
     * @param mySid                 server id of this peer
     * @param view                  server id to server mapping supplied by the caller
     * @param authServer            authenticator applied to incoming connections
     * @param authLearner           authenticator applied to outgoing connections
     * @param socketTimeout         socket timeout value supplied by the caller
     * @param listenOnAllIPs        listen-on-all-interfaces flag supplied by the caller
     * @param quorumCnxnThreadsSize maximum pool size of the connection executor
     * @param quorumSaslAuthEnabled whether accepted connections are handled asynchronously
     * @throws NumberFormatException if {@code zookeeper.cnxTimeout} is set but is not an integer
     */
    public QuorumCnxManager(QuorumPeer self, long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
        // Message plumbing.
        this.recvQueue = new ArrayBlockingQueue<>(RECV_CAPACITY);
        this.queueSendMap = new ConcurrentHashMap<>();
        this.senderWorkerMap = new ConcurrentHashMap<>();
        this.lastMessageSent = new ConcurrentHashMap<>();

        // Optional override of the connect timeout.
        final String configuredTimeout = System.getProperty("zookeeper.cnxTimeout");
        if (configuredTimeout != null) {
            this.cnxTO = Integer.parseInt(configuredTimeout);
        }

        // Plain state copied from the arguments.
        this.self = self;
        this.mySid = mySid;
        this.socketTimeout = socketTimeout;
        this.view = view;
        this.listenOnAllIPs = listenOnAllIPs;
        this.authServer = authServer;
        this.authLearner = authLearner;
        this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;

        this.initializeConnectionExecutor(mySid, quorumCnxnThreadsSize);

        this.listener = new Listener();
        this.listener.setName("QuorumPeerListener");
    }

    /**
     * Builds the thread pool that runs asynchronous connection tasks.
     *
     * <p>The pool has 3 core threads, grows up to {@code quorumCnxnThreadsSize}, hands tasks over
     * through a {@link SynchronousQueue} and lets idle threads (core threads included) expire
     * after 60 seconds.
     *
     * @param mySid                 server id embedded in the worker thread names
     * @param quorumCnxnThreadsSize maximum pool size
     */
    private void initializeConnectionExecutor(long mySid, int quorumCnxnThreadsSize) {
        final AtomicInteger nextThreadNumber = new AtomicInteger(1);
        final SecurityManager securityManager = System.getSecurityManager();
        final ThreadGroup threadGroup;
        if (securityManager == null) {
            threadGroup = Thread.currentThread().getThreadGroup();
        } else {
            threadGroup = securityManager.getThreadGroup();
        }
        final ThreadFactory threadFactory = task -> {
            String threadName = String.format("QuorumConnectionThread-[myid=%d]-%d", mySid, nextThreadNumber.getAndIncrement());
            return new Thread(threadGroup, task, threadName);
        };
        final ThreadPoolExecutor executor = new ThreadPoolExecutor(3, quorumCnxnThreadsSize, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);
        executor.allowCoreThreadTimeOut(true);
        this.connectionExecutor = executor;
    }

    /**
     * Synchronously opens a connection to the given server, using the election address
     * found in the peer's voting view.
     *
     * @param sid id of the server to connect to; must be present in the voting view
     */
    public void testInitiateConnection(long sid) {
        LOG.debug("Opening channel to server " + sid);
        QuorumPeer.QuorumServer target = this.self.getVotingView().get(sid);
        this.initiateConnection(target.electionAddr, sid);
    }

    /**
     * Opens a socket to {@code electionAddr} and runs the connection handshake on it.
     *
     * <p>When the peer is configured for an SSL quorum, an SSL socket is created and the TLS
     * handshake is completed before continuing; otherwise a socket from the configured socket
     * factory is used. Any failure while opening the socket or during the handshake is logged
     * and the socket is closed; no exception is propagated to the caller.
     *
     * @param electionAddr election address of the remote server
     * @param sid          id of the remote server
     */
    public void initiateConnection(InetSocketAddress electionAddr, Long sid) {
        Socket sock = null;
        try {
            LOG.debug("Opening channel to server " + sid);
            if (this.self.isSslQuorum()) {
                SSLSocket sslSock = this.self.getX509Util().createSSLSocket();
                this.setSockOpts(sslSock);
                sslSock.connect(electionAddr, this.cnxTO);
                sslSock.startHandshake();
                sock = sslSock;
                LOG.info("SSL handshake complete with {} - {} - {}", new Object[]{sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite()});
            } else {
                sock = SOCKET_FACTORY.get();
                this.setSockOpts(sock);
                sock.connect(electionAddr, this.cnxTO);
            }
            LOG.debug("Connected to server " + sid);
        }
        catch (X509Exception e) {
            LOG.warn("Cannot open secure channel to {} at election address {}", new Object[]{sid, electionAddr, e});
            this.closeSocket(sock);
            return;
        }
        catch (IOException | UnresolvedAddressException e) {
            LOG.warn("Cannot open channel to {} at election address {}", new Object[]{sid, electionAddr, e});
            this.closeSocket(sock);
            return;
        }
        try {
            this.startConnection(sock, sid);
        }
        catch (IOException e) {
            // Pass sid, address and the exception as one varargs array (throwable last) so that both
            // placeholders are filled and the stack trace is logged, matching the warnings above.
            LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection", new Object[]{sid, sock.getRemoteSocketAddress(), e});
            this.closeSocket(sock);
        }
    }

    /**
     * Submits a connection attempt towards {@code sid} to the connection executor.
     *
     * <p>At most one request per server id is kept in flight: if the id is already registered
     * as in progress the call is a no-op that reports success.
     *
     * @param electionAddr election address of the remote server
     * @param sid          id of the remote server
     * @return {@code true} if a request is already in progress or was submitted;
     *         {@code false} if submitting the task failed
     */
    public boolean initiateConnectionAsync(InetSocketAddress electionAddr, Long sid) {
        final boolean newlyRegistered = this.inprogressConnections.add(sid);
        if (!newlyRegistered) {
            LOG.debug("Connection request to server id: {} is already in progress, so skipping this request", sid);
            return true;
        }

        boolean submitted;
        try {
            this.connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
            this.connectionThreadCnt.incrementAndGet();
            submitted = true;
        } catch (Throwable e) {
            // Undo the registration so that a later call can retry.
            this.inprogressConnections.remove(sid);
            LOG.error("Exception while submitting quorum connection request", e);
            submitted = false;
        }
        return submitted;
    }

    /**
     * Runs the initiator side of the handshake on an already connected socket.
     *
     * <p>Sends the protocol version, this server's id and its formatted election address, then
     * authenticates against the remote server if it is part of the voting view. The connection
     * is kept only when the remote id is not larger than this server's id; otherwise the socket
     * is closed.
     *
     * @param sock connected socket to the remote server
     * @param sid  id of the remote server
     * @return {@code true} if send/receive workers were started on the socket, {@code false}
     *         if the connection was dropped
     * @throws IOException if authentication fails with an I/O error
     */
    private boolean startConnection(Socket sock, Long sid) throws IOException {
        LOG.debug("startConnection (myId:{} --> sid:{})", this.self.getId(), sid);

        DataInputStream din;
        try {
            DataOutputStream dout = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
            dout.writeLong(PROTOCOL_VERSION);
            dout.writeLong(this.self.getId());
            byte[] addressBytes = NetUtils.formatInetAddr(this.self.getElectionAddress()).getBytes();
            dout.writeInt(addressBytes.length);
            dout.write(addressBytes);
            dout.flush();
            din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        } catch (IOException e) {
            LOG.warn("Ignoring exception reading or writing challenge: ", e);
            this.closeSocket(sock);
            return false;
        }

        QuorumPeer.QuorumServer remote = this.self.getVotingView().get(sid);
        if (remote != null) {
            this.authLearner.authenticate(sock, remote.hostname);
        }

        if (sid > this.self.getId()) {
            // The remote id is larger than ours: drop this connection.
            LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", this.self.getId(), sid);
            this.closeSocket(sock);
            return false;
        }

        LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", this.self.getId(), sid);
        SendWorker sender = new SendWorker(sock, sid);
        RecvWorker receiver = new RecvWorker(sock, din, sid, sender);
        sender.setRecv(receiver);

        // Retire any worker previously registered for this server before installing the new one.
        SendWorker previous = this.senderWorkerMap.get(sid);
        if (previous != null) {
            previous.finish();
        }
        this.senderWorkerMap.put(sid, sender);
        this.queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

        sender.start();
        receiver.start();
        return true;
    }

    /**
     * Handles an accepted connection on the calling thread.
     *
     * <p>An {@link IOException} raised while handling the connection is logged and the socket
     * is closed.
     *
     * @param sock socket accepted by the listener
     */
    public void receiveConnection(Socket sock) {
        try {
            DataInputStream input = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
            LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
            this.handleConnection(sock, input);
        } catch (IOException e) {
            LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
            LOG.debug("Exception details: ", e);
            this.closeSocket(sock);
        }
    }

    /**
     * Hands an accepted connection to the connection executor for handling.
     *
     * <p>If the task cannot be submitted, the failure is logged and the socket is closed.
     *
     * @param sock socket accepted by the listener
     */
    public void receiveConnectionAsync(Socket sock) {
        try {
            LOG.debug("Async handling of connection request received from: {}", sock.getRemoteSocketAddress());
            QuorumConnectionReceiverThread receiverTask = new QuorumConnectionReceiverThread(sock);
            this.connectionExecutor.execute(receiverTask);
            this.connectionThreadCnt.incrementAndGet();
        } catch (Throwable e) {
            LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
            LOG.debug("Exception details: ", e);
            this.closeSocket(sock);
        }
    }

    /**
     * Runs the acceptor side of the handshake on an incoming connection.
     *
     * <p>Reads the first long from the stream: a non-negative value is taken as the sender's
     * server id, a negative value is treated as a protocol version and the rest of the initial
     * message (server id and election address) is parsed. A sender id of {@code Long.MAX_VALUE}
     * is replaced by a fresh negative identifier. After authentication:
     * <ul>
     *   <li>if the sender id is smaller than this server's id, the incoming socket is closed and
     *       this server connects to the sender instead;</li>
     *   <li>if the sender id equals this server's id, a warning is logged;</li>
     *   <li>otherwise send/receive workers are started on the incoming socket.</li>
     * </ul>
     *
     * @param sock accepted socket
     * @param din  input stream wrapping {@code sock}
     * @throws IOException if authentication fails with an I/O error
     */
    private void handleConnection(Socket sock, DataInputStream din) throws IOException {
        Long sid = null;
        Long protocolVersion = null;
        InetSocketAddress electionAddr = null;
        try {
            protocolVersion = din.readLong();
            if (protocolVersion >= 0L) {
                // A non-negative first long is the sender's server id itself.
                sid = protocolVersion;
            } else {
                try {
                    InitialMessage init = InitialMessage.parse(protocolVersion, din);
                    sid = init.sid;
                    electionAddr = init.electionAddr;
                }
                catch (InitialMessage.InitialMessageException ex) {
                    LOG.error("Initial message parsing error!", ex);
                    this.closeSocket(sock);
                    return;
                }
            }
            if (sid == Long.MAX_VALUE) {
                sid = this.observerCounter.getAndDecrement();
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        }
        catch (IOException e) {
            // warn(String, Throwable) performs no placeholder substitution, so the message carries none.
            LOG.warn("Exception reading or writing challenge", e);
            this.closeSocket(sock);
            return;
        }
        this.authServer.authenticate(sock, din);
        if (sid < this.self.getId()) {
            // The sender has the smaller id: drop its connection and connect to it from this side.
            SendWorker existing = this.senderWorkerMap.get(sid);
            if (existing != null) {
                existing.finish();
            }
            LOG.debug("Create new connection to server: {}", sid);
            this.closeSocket(sock);
            if (electionAddr != null) {
                this.connectOne(sid, electionAddr);
            } else {
                this.connectOne(sid);
            }
        } else if (sid.longValue() == this.self.getId()) {
            // NOTE(review): the socket is neither closed nor handed to a worker on this path - confirm
            // whether leaving it open is intended.
            LOG.warn("We got a connection request from a server with our own ID. This should be either a configuration error, or a bug.");
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);
            // Retire any worker previously registered for this server before installing the new one.
            SendWorker vsw = this.senderWorkerMap.get(sid);
            if (vsw != null) {
                vsw.finish();
            }
            this.senderWorkerMap.put(sid, sw);
            this.queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
            sw.start();
            rw.start();
        }
    }

    /**
     * Queues a message for delivery to the given server.
     *
     * <p>A message addressed to this server itself is placed directly on the receive queue.
     * Otherwise it is added to the destination's send queue (created on first use) and a
     * connection to the destination is requested.
     *
     * @param sid id of the destination server
     * @param b   message payload
     */
    public void toSend(Long sid, ByteBuffer b) {
        if (this.mySid != sid) {
            ArrayBlockingQueue<ByteBuffer> fresh = new ArrayBlockingQueue<>(SEND_CAPACITY);
            ArrayBlockingQueue<ByteBuffer> existing = this.queueSendMap.putIfAbsent(sid, fresh);
            ArrayBlockingQueue<ByteBuffer> target = existing == null ? fresh : existing;
            this.addToSendQueue(target, b);
            this.connectOne(sid);
            return;
        }
        // Loopback: deliver to ourselves without touching the network.
        b.position(0);
        this.addToRecvQueue(new Message(b.duplicate(), sid));
    }

    /**
     * Requests a connection to {@code sid} at the given address unless a send worker is
     * already registered for that server.
     *
     * @param sid          id of the remote server
     * @param electionAddr election address to connect to
     * @return {@code true} if a connection already exists or a request was accepted,
     *         {@code false} if the request could not be submitted
     */
    private synchronized boolean connectOne(long sid, InetSocketAddress electionAddr) {
        if (this.senderWorkerMap.get(sid) == null) {
            return this.initiateConnectionAsync(electionAddr, sid);
        }
        LOG.debug("There is a connection already for server " + sid);
        return true;
    }

    /**
     * Requests a connection to {@code sid}, looking its election address up in the peer's views.
     *
     * <p>Does nothing if a send worker is already registered for the server. Otherwise, while
     * holding the peer's {@code QV_LOCK}, the address from the last committed view is tried
     * first; the address from the last seen quorum verifier is tried when the server is unknown
     * to the committed view or when the two addresses differ. An id found in neither view is
     * reported as invalid.
     *
     * @param sid id of the remote server
     */
    synchronized void connectOne(long sid) {
        if (this.senderWorkerMap.get(sid) != null) {
            LOG.debug("There is a connection already for server " + sid);
            return;
        }
        synchronized (this.self.QV_LOCK) {
            boolean knownId = false;
            this.self.recreateSocketAddresses(sid);
            Map<Long, QuorumPeer.QuorumServer> lastCommittedView = this.self.getView();
            QuorumVerifier lastSeenQV = this.self.getLastSeenQuorumVerifier();
            // Only dereference the verifier after the null check, so a missing verifier cannot
            // raise a NullPointerException here.
            Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV == null
                    ? Collections.<Long, QuorumPeer.QuorumServer>emptyMap()
                    : lastSeenQV.getAllMembers();
            if (lastCommittedView.containsKey(sid)) {
                knownId = true;
                LOG.debug("Server {} knows {} already, it is in the lastCommittedView", this.self.getId(), sid);
                if (this.connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                    return;
                }
            }
            if (lastSeenQV != null && lastProposedView.containsKey(sid)) {
                InetSocketAddress proposedAddr = lastProposedView.get(sid).electionAddr;
                boolean addressDiffers = true;
                if (knownId) {
                    // Compare by value: two address objects describing the same endpoint are not
                    // a reason to try the same address a second time.
                    InetSocketAddress committedAddr = lastCommittedView.get(sid).electionAddr;
                    addressDiffers = proposedAddr == null ? committedAddr != null : !proposedAddr.equals(committedAddr);
                }
                if (addressDiffers) {
                    knownId = true;
                    LOG.debug("Server {} knows {} already, it is in the lastProposedView", this.self.getId(), sid);
                    if (this.connectOne(sid, proposedAddr)) {
                        return;
                    }
                }
            }
            if (!knownId) {
                LOG.warn("Invalid server id: " + sid);
            }
        }
    }

    /**
     * Requests a connection to every server that currently has a send queue.
     */
    public void connectAll() {
        for (long sid : this.queueSendMap.keySet()) {
            this.connectOne(sid);
        }
    }

    /**
     * Reports whether at least one send queue is currently empty.
     *
     * @return {@code true} as soon as an empty send queue is found, {@code false} if every
     *         queue still holds a message or there are no queues at all
     */
    boolean haveDelivered() {
        for (ArrayBlockingQueue<ByteBuffer> pending : this.queueSendMap.values()) {
            LOG.debug("Queue size: " + pending.size());
            if (pending.isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Shuts the connection manager down.
     *
     * <p>Sets the shutdown flag, stops the listener and waits for it to exit, finishes all
     * send workers, shuts the connection executor down and clears the in-progress bookkeeping.
     */
    public void halt() {
        this.shutdown = true;

        LOG.debug("Halting listener");
        this.listener.halt();
        try {
            this.listener.join();
        } catch (InterruptedException ex) {
            LOG.warn("Got interrupted before joining the listener", ex);
        }

        this.softHalt();

        final ThreadPoolExecutor executor = this.connectionExecutor;
        if (executor != null) {
            executor.shutdown();
        }

        this.inprogressConnections.clear();
        this.connectionThreadCnt.set(0);
    }

    /**
     * Finishes every registered send worker without touching the listener or the executor.
     */
    public void softHalt() {
        for (SendWorker worker : this.senderWorkerMap.values()) {
            LOG.debug("Halting sender: " + worker);
            LOG.debug("Server {} is soft-halting sender towards: {}", this.self.getId(), worker);
            worker.finish();
        }
    }

    /**
     * Applies the standard options to an election socket: Nagle's algorithm disabled,
     * keep-alive as configured, and a read timeout of {@code tickTime * syncLimit}.
     *
     * @param sock socket to configure
     * @throws SocketException if an option cannot be set
     */
    private void setSockOpts(Socket sock) throws SocketException {
        final int readTimeout = this.self.tickTime * this.self.syncLimit;
        sock.setTcpNoDelay(true);
        sock.setKeepAlive(this.tcpKeepAlive);
        sock.setSoTimeout(readTimeout);
    }

    /**
     * Closes a socket, logging (instead of propagating) any failure.
     *
     * @param sock socket to close; {@code null} is ignored
     */
    private void closeSocket(Socket sock) {
        if (sock != null) {
            try {
                sock.close();
            } catch (IOException ie) {
                LOG.error("Exception while closing", ie);
            }
        }
    }

    /**
     * Returns the current value of the send/receive worker thread counter.
     *
     * @return current worker thread count
     */
    public long getThreadCount() {
        final int workers = this.threadCnt.get();
        return workers;
    }

    /**
     * Returns the number of tasks handed to the connection executor since the counter was
     * last reset.
     *
     * @return current connection thread counter value
     */
    public long getConnectionThreadCount() {
        final int submitted = this.connectionThreadCnt.get();
        return submitted;
    }

    /**
     * Resets the connection thread counter to zero.
     */
    private void resetConnectionThreadCount() {
        final int none = 0;
        this.connectionThreadCnt.set(none);
    }

    /**
     * Adds a message to a send queue, evicting the oldest entry first when the queue is full.
     *
     * <p>Both the eviction and the insertion are best effort: a failure of either is logged
     * and otherwise ignored.
     *
     * @param queue  destination send queue
     * @param buffer message to enqueue
     */
    private void addToSendQueue(ArrayBlockingQueue<ByteBuffer> queue, ByteBuffer buffer) {
        final boolean full = queue.remainingCapacity() == 0;
        if (full) {
            try {
                queue.remove();
            } catch (NoSuchElementException ne) {
                // Another thread drained the queue in the meantime.
                LOG.debug("Trying to remove from an empty Queue. Ignoring exception " + ne);
            }
        }

        try {
            queue.add(buffer);
        } catch (IllegalStateException ie) {
            // Another thread refilled the queue in the meantime.
            LOG.error("Unable to insert an element in the queue " + ie);
        }
    }

    /**
     * Tells whether the given send queue holds no messages.
     *
     * @param queue send queue to inspect
     * @return {@code true} if the queue is empty
     */
    private boolean isSendQueueEmpty(ArrayBlockingQueue<ByteBuffer> queue) {
        return queue.size() == 0;
    }

    /**
     * Takes the next message from a send queue, waiting up to the given time for one to arrive.
     *
     * @param queue   send queue to poll
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the next message, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    private ByteBuffer pollSendQueue(ArrayBlockingQueue<ByteBuffer> queue, long timeout, TimeUnit unit) throws InterruptedException {
        final ByteBuffer next = queue.poll(timeout, unit);
        return next;
    }

    /**
     * Adds a message to the receive queue, evicting the oldest entry first when the queue is full.
     *
     * <p>The evict-then-add sequence runs under {@code recvQLock}. A failure of either step is
     * logged and otherwise ignored.
     *
     * @param msg message to enqueue
     */
    public void addToRecvQueue(Message msg) {
        synchronized (this.recvQLock) {
            final boolean full = this.recvQueue.remainingCapacity() == 0;
            if (full) {
                try {
                    this.recvQueue.remove();
                } catch (NoSuchElementException ne) {
                    LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception " + ne);
                }
            }

            try {
                this.recvQueue.add(msg);
            } catch (IllegalStateException ie) {
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }

    /**
     * Takes the next received message, waiting up to the given time for one to arrive.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the next message, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    public Message pollRecvQueue(long timeout, TimeUnit unit) throws InterruptedException {
        final Message next = this.recvQueue.poll(timeout, unit);
        return next;
    }

    /**
     * Tells whether a send worker is currently registered for the given peer.
     *
     * @param peerSid id of the peer
     * @return {@code true} if a send worker is registered for {@code peerSid}
     */
    public boolean connectedToPeer(long peerSid) {
        return this.senderWorkerMap.containsKey(peerSid);
    }

    class RecvWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        volatile boolean running;
        final DataInputStream din;
        final SendWorker sw;

        RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
            super("RecvWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.sw = sw;
            this.din = din;
            try {
                sock.setSoTimeout(0);
            }
            catch (IOException e) {
                LOG.error("Error while accessing socket for " + sid, (Throwable)e);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
        }

        synchronized boolean finish() {
            LOG.debug("RecvWorker.finish called. sid: {}. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            this.interrupt();
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                LOG.debug("RecvWorker thread towards {} started. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
                while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                    int length = this.din.readInt();
                    if (length <= 0 || length > 524288) {
                        throw new IOException("Received packet with invalid packet: " + length);
                    }
                    byte[] msgArray = new byte[length];
                    this.din.readFully(msgArray, 0, length);
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    QuorumCnxManager.this.addToRecvQueue(new Message(message.duplicate(), this.sid));
                }
            }
            catch (Exception e) {
                LOG.warn("Connection broken for id " + this.sid + ", my id = " + QuorumCnxManager.this.mySid + ", error = ", (Throwable)e);
            }
            finally {
                LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
                this.sw.finish();
                QuorumCnxManager.this.closeSocket(this.sock);
            }
        }
    }

    class SendWorker
    extends ZooKeeperThread {
        Long sid;
        Socket sock;
        RecvWorker recvWorker;
        volatile boolean running;
        DataOutputStream dout;

        SendWorker(Socket sock, Long sid) {
            super("SendWorker:" + sid);
            this.running = true;
            this.sid = sid;
            this.sock = sock;
            this.recvWorker = null;
            try {
                this.dout = new DataOutputStream(sock.getOutputStream());
            }
            catch (IOException e) {
                LOG.error("Unable to access socket output stream", (Throwable)e);
                QuorumCnxManager.this.closeSocket(sock);
                this.running = false;
            }
            LOG.debug("Address of remote peer: " + this.sid);
        }

        synchronized void setRecv(RecvWorker recvWorker) {
            this.recvWorker = recvWorker;
        }

        synchronized RecvWorker getRecvWorker() {
            return this.recvWorker;
        }

        synchronized boolean finish() {
            LOG.debug("Calling SendWorker.finish for {}", (Object)this.sid);
            if (!this.running) {
                return this.running;
            }
            this.running = false;
            QuorumCnxManager.this.closeSocket(this.sock);
            this.interrupt();
            if (this.recvWorker != null) {
                this.recvWorker.finish();
            }
            LOG.debug("Removing entry from senderWorkerMap sid=" + this.sid);
            QuorumCnxManager.this.senderWorkerMap.remove(this.sid, this);
            QuorumCnxManager.this.threadCnt.decrementAndGet();
            return this.running;
        }

        synchronized void send(ByteBuffer b) throws IOException {
            byte[] msgBytes = new byte[b.capacity()];
            try {
                b.position(0);
                b.get(msgBytes);
            }
            catch (BufferUnderflowException be) {
                LOG.error("BufferUnderflowException ", (Throwable)be);
                return;
            }
            this.dout.writeInt(b.capacity());
            this.dout.write(b.array());
            this.dout.flush();
        }

        @Override
        public void run() {
            QuorumCnxManager.this.threadCnt.incrementAndGet();
            try {
                ByteBuffer b;
                ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                if ((bq == null || QuorumCnxManager.this.isSendQueueEmpty(bq)) && (b = QuorumCnxManager.this.lastMessageSent.get(this.sid)) != null) {
                    LOG.debug("Attempting to send lastMessage to sid=" + this.sid);
                    this.send(b);
                }
            }
            catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", (Throwable)e);
                this.finish();
            }
            LOG.debug("SendWorker thread started towards {}. myId: {}", (Object)this.sid, (Object)QuorumCnxManager.this.mySid);
            block6: while (true) {
                try {
                    while (this.running && !QuorumCnxManager.this.shutdown && this.sock != null) {
                        ByteBuffer b = null;
                        try {
                            ArrayBlockingQueue<ByteBuffer> bq = QuorumCnxManager.this.queueSendMap.get(this.sid);
                            if (bq == null) {
                                LOG.error("No queue of incoming messages for server " + this.sid);
                                break block6;
                            }
                            b = QuorumCnxManager.this.pollSendQueue(bq, 1000L, TimeUnit.MILLISECONDS);
                            if (b == null) continue block6;
                            QuorumCnxManager.this.lastMessageSent.put(this.sid, b);
                            this.send(b);
                            continue block6;
                        }
                        catch (InterruptedException e) {
                            LOG.warn("Interrupted while waiting for message on queue", (Throwable)e);
                        }
                    }
                    break;
                }
                catch (Exception e) {
                    LOG.warn("Exception when using channel: for id " + this.sid + " my id = " + QuorumCnxManager.this.mySid + " error = " + e);
                    break;
                }
            }
            this.finish();
            LOG.warn("Send worker leaving thread  id " + this.sid + " my id = " + QuorumCnxManager.this.self.getId());
        }
    }

    public class Listener
    extends ZooKeeperThread {
        private static final String ELECTION_PORT_BIND_RETRY = "zookeeper.electionPortBindRetry";
        private static final int DEFAULT_PORT_BIND_MAX_RETRY = 3;
        private final int portBindMaxRetry;
        private Runnable socketBindErrorHandler;
        volatile ServerSocket ss;

        public Listener() {
            super("ListenerThread");
            this.socketBindErrorHandler = () -> System.exit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());
            this.ss = null;
            Integer maxRetry = Integer.getInteger(ELECTION_PORT_BIND_RETRY, 3);
            if (maxRetry >= 0) {
                LOG.info("Election port bind maximum retries is {}", maxRetry == 0 ? "infinite" : maxRetry);
                this.portBindMaxRetry = maxRetry;
            } else {
                LOG.info("'{}' contains invalid value: {}(must be >= 0). Use default value of {} instead.", new Object[]{ELECTION_PORT_BIND_RETRY, maxRetry, 3});
                this.portBindMaxRetry = 3;
            }
        }

        public void setSocketBindErrorHandler(Runnable errorHandler) {
            this.socketBindErrorHandler = errorHandler;
        }

        @Override
        public void run() {
            int numRetries = 0;
            Socket client = null;
            IOException exitException = null;
            while (!(QuorumCnxManager.this.shutdown || this.portBindMaxRetry != 0 && numRetries >= this.portBindMaxRetry)) {
                LOG.debug("Listener thread started, myId: {}", (Object)QuorumCnxManager.this.self.getId());
                try {
                    InetSocketAddress addr;
                    if (QuorumCnxManager.this.self.shouldUsePortUnification()) {
                        LOG.info("Creating TLS-enabled quorum server socket");
                        this.ss = new UnifiedServerSocket(QuorumCnxManager.this.self.getX509Util(), true);
                    } else if (QuorumCnxManager.this.self.isSslQuorum()) {
                        LOG.info("Creating TLS-only quorum server socket");
                        this.ss = new UnifiedServerSocket(QuorumCnxManager.this.self.getX509Util(), false);
                    } else {
                        this.ss = new ServerSocket();
                    }
                    this.ss.setReuseAddress(true);
                    if (QuorumCnxManager.this.self.getQuorumListenOnAllIPs()) {
                        int port = QuorumCnxManager.this.self.getElectionAddress().getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        QuorumCnxManager.this.self.recreateSocketAddresses(QuorumCnxManager.this.self.getId());
                        addr = QuorumCnxManager.this.self.getElectionAddress();
                    }
                    LOG.info("{} is accepting connections now, my election bind port: {}", (Object)QuorumCnxManager.this.mySid, (Object)addr.toString());
                    this.setName(addr.toString());
                    this.ss.bind(addr);
                    while (!QuorumCnxManager.this.shutdown) {
                        try {
                            client = this.ss.accept();
                            QuorumCnxManager.this.setSockOpts(client);
                            LOG.info("Received connection request from {}", (Object)client.getRemoteSocketAddress());
                            if (QuorumCnxManager.this.quorumSaslAuthEnabled) {
                                QuorumCnxManager.this.receiveConnectionAsync(client);
                            } else {
                                QuorumCnxManager.this.receiveConnection(client);
                            }
                            numRetries = 0;
                        }
                        catch (SocketTimeoutException e) {
                            LOG.warn("The socket is listening for the election accepted and it timed out unexpectedly, but will retry.see ZOOKEEPER-2836");
                        }
                    }
                }
                catch (IOException e) {
                    if (QuorumCnxManager.this.shutdown) break;
                    LOG.error("Exception while listening", (Throwable)e);
                    exitException = e;
                    ++numRetries;
                    try {
                        this.ss.close();
                        Thread.sleep(1000L);
                    }
                    catch (IOException ie) {
                        LOG.error("Error closing server socket", (Throwable)ie);
                    }
                    catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. Ignoring exception", (Throwable)ie);
                    }
                    QuorumCnxManager.this.closeSocket(client);
                }
            }
            LOG.info("Leaving listener");
            if (!QuorumCnxManager.this.shutdown) {
                LOG.error("As I'm leaving the listener thread after " + numRetries + " errors. I won't be able to participate in leader election any longer: " + NetUtils.formatInetAddr(QuorumCnxManager.this.self.getElectionAddress()) + ". Use " + ELECTION_PORT_BIND_RETRY + " property to increase retry count.");
                if (exitException instanceof SocketException) {
                    this.socketBindErrorHandler.run();
                }
            } else if (this.ss != null) {
                try {
                    this.ss.close();
                }
                catch (IOException ie) {
                    LOG.debug("Error closing server socket", (Throwable)ie);
                }
            }
        }

        /**
         * Closes the listening server socket, if one has been created.
         *
         * An {@link IOException} raised by the close is logged at WARN level
         * and not propagated to the caller.
         */
        void halt() {
            LOG.debug("Halt called: Trying to close listeners");
            if (this.ss == null) {
                // No server socket was ever assigned, so there is nothing to close.
                return;
            }
            try {
                LOG.debug("Closing listener: " + QuorumCnxManager.this.mySid);
                this.ss.close();
            } catch (IOException closeFailure) {
                LOG.warn("Exception when shutting down listener: " + closeFailure);
            }
        }
    }

    /**
     * Thread that hands a single accepted socket to
     * {@code QuorumCnxManager.receiveConnection(Socket)}.
     *
     * The thread is named after the remote address of the socket it wraps.
     */
    private class QuorumConnectionReceiverThread
    extends ZooKeeperThread {
        /** The socket whose incoming connection request this thread processes. */
        private final Socket sock;

        /**
         * @param acceptedSocket socket to process when the thread runs
         */
        QuorumConnectionReceiverThread(Socket acceptedSocket) {
            super("QuorumConnectionReceiverThread-" + acceptedSocket.getRemoteSocketAddress());
            this.sock = acceptedSocket;
        }

        /** Delegates the wrapped socket to the enclosing connection manager. */
        @Override
        public void run() {
            final Socket pending = this.sock;
            QuorumCnxManager.this.receiveConnection(pending);
        }
    }

    /**
     * Thread that initiates one outgoing election connection via
     * {@code QuorumCnxManager.initiateConnection(InetSocketAddress, Long)}.
     *
     * Whether the attempt completes normally or throws, the target server id
     * is removed from {@code inprogressConnections} afterwards.
     */
    private class QuorumConnectionReqThread
    extends ZooKeeperThread {
        /** Election address to connect to. */
        final InetSocketAddress electionAddr;
        /** Server id of the peer being connected to. */
        final Long sid;

        /**
         * @param electionAddr address passed to {@code initiateConnection}
         * @param sid          peer server id; also used in the thread name
         */
        QuorumConnectionReqThread(InetSocketAddress electionAddr, Long sid) {
            super("QuorumConnectionReqThread-" + sid);
            this.sid = sid;
            this.electionAddr = electionAddr;
        }

        /** Runs the connection attempt, then clears the in-progress marker. */
        @Override
        public void run() {
            final InetSocketAddress target = this.electionAddr;
            final Long peerId = this.sid;
            try {
                QuorumCnxManager.this.initiateConnection(target, peerId);
            } finally {
                // Always executed, so a failed attempt does not leave the sid marked in progress.
                QuorumCnxManager.this.inprogressConnections.remove(peerId);
            }
        }
    }

    /**
     * Parsed form of the first message read from a connection: the sender's
     * server id and the election address it advertises.
     */
    public static class InitialMessage {
        /** Server id read from the stream. */
        public Long sid;
        /**
         * Election address advertised by the sender, or {@code null} when the
         * advertised host is a wildcard (any-local) address.
         */
        public InetSocketAddress electionAddr;

        InitialMessage(Long sid, InetSocketAddress address) {
            this.sid = sid;
            this.electionAddr = address;
        }

        /**
         * Reads and validates an initial message from {@code din}.
         *
         * Layout consumed from the stream: an 8-byte server id, a 4-byte
         * length, then that many bytes holding a {@code host:port} string.
         *
         * @param protocolVersion version already read by the caller; only
         *                        {@code -65536L} is accepted
         * @param din             stream positioned just after the protocol version
         * @return the parsed message
         * @throws InitialMessageException if the version is unrecognized, the
         *         length is outside {@code 1..2048}, the stream ends before the
         *         full address was received, or the address is malformed
         * @throws IOException if reading from the stream fails
         */
        public static InitialMessage parse(Long protocolVersion, DataInputStream din) throws InitialMessageException, IOException {
            if (protocolVersion != -65536L) {
                throw new InitialMessageException("Got unrecognized protocol version %s", protocolVersion);
            }
            Long sid = din.readLong();
            int remaining = din.readInt();
            if (remaining <= 0 || remaining > 2048) {
                throw new InitialMessageException("Unreasonable buffer length: %s", remaining);
            }
            byte[] b = new byte[remaining];
            // A single read() may return fewer bytes than requested even though
            // more are still in flight, so keep reading until the buffer is full
            // or the stream ends.
            int num_read = InitialMessage.readUntilFullOrEof(din, b);
            if (num_read != remaining) {
                throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", num_read, remaining, sid);
            }
            // NOTE(review): decoded with the platform default charset; presumably
            // this mirrors how the sender encodes the address - confirm before changing.
            String addr = new String(b);
            String[] host_port;
            try {
                host_port = ConfigUtils.getHostAndPort(addr);
            }
            catch (QuorumPeerConfig.ConfigException e) {
                throw new InitialMessageException("Badly formed address: %s", addr);
            }
            if (host_port.length != 2) {
                throw new InitialMessageException("Badly formed address: %s", addr);
            }
            // host_port has exactly two elements from here on, so index 1 is always valid.
            int port;
            try {
                port = Integer.parseInt(host_port[1]);
            }
            catch (NumberFormatException e) {
                throw new InitialMessageException("Bad port number: %s", host_port[1]);
            }
            return new InitialMessage(sid, InitialMessage.isWildcardAddress(host_port[0]) ? null : new InetSocketAddress(host_port[0], port));
        }

        /**
         * Fills {@code buf} from {@code in}, looping over short reads.
         *
         * @return number of bytes actually stored; less than {@code buf.length}
         *         only if end of stream was reached first
         */
        private static int readUntilFullOrEof(DataInputStream in, byte[] buf) throws IOException {
            int total = 0;
            while (total < buf.length) {
                int n = in.read(buf, total, buf.length - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return total;
        }

        /**
         * Tells whether {@code hostname} resolves to a wildcard (any-local) address.
         *
         * @param hostname host name or literal IP address
         * @return {@code true} if the resolved address is an any-local address;
         *         {@code false} otherwise, including when resolution fails or
         *         is denied by a security manager
         */
        public static boolean isWildcardAddress(String hostname) {
            try {
                return InetAddress.getByName(hostname).isAnyLocalAddress();
            }
            catch (UnknownHostException e) {
                return false;
            }
            catch (SecurityException e) {
                LOG.warn("SecurityException in getByName() for " + hostname);
                return false;
            }
        }

        @Override
        public String toString() {
            return "InitialMessage{sid=" + this.sid + ", electionAddr=" + this.electionAddr + '}';
        }

        /**
         * Signals that an initial message could not be parsed. The message
         * text is built with {@link String#format(String, Object...)}.
         */
        public static class InitialMessageException
        extends Exception {
            InitialMessageException(String message, Object ... args) {
                super(String.format(message, args));
            }
        }
    }

    /**
     * Simple holder pairing a byte buffer with a server id.
     */
    public static class Message {
        /** Buffer carried by this message. */
        ByteBuffer buffer;
        /** Server id associated with the buffer. */
        long sid;

        /**
         * @param payload  buffer to store; kept by reference, not copied
         * @param serverId server id to store
         */
        Message(ByteBuffer payload, long serverId) {
            this.sid = serverId;
            this.buffer = payload;
        }
    }
}

