/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.ha.zookeeper;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.remote.JMXServiceURL;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.neo4j.backup.OnlineBackupSettings;
import org.neo4j.graphdb.config.Setting;
import org.neo4j.graphdb.factory.GraphDatabaseSetting;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Pair;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.InformativeStackTrace;
import org.neo4j.kernel.SlaveUpdateMode;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.ha.Broker;
import org.neo4j.kernel.ha.ClusterEventReceiver;
import org.neo4j.kernel.ha.ConnectionInformation;
import org.neo4j.kernel.ha.HaSettings;
import org.neo4j.kernel.ha.Master;
import org.neo4j.kernel.ha.MasterClientFactory;
import org.neo4j.kernel.ha.MasterImpl;
import org.neo4j.kernel.ha.MasterServer;
import org.neo4j.kernel.ha.Slave;
import org.neo4j.kernel.ha.SlaveClient;
import org.neo4j.kernel.ha.SlaveDatabaseOperations;
import org.neo4j.kernel.ha.SlaveImpl;
import org.neo4j.kernel.ha.SlaveServer;
import org.neo4j.kernel.ha.zookeeper.AbstractZooKeeperManager;
import org.neo4j.kernel.ha.zookeeper.BranchDetectingTxVerifier;
import org.neo4j.kernel.ha.zookeeper.BrokerShutDownException;
import org.neo4j.kernel.ha.zookeeper.Machine;
import org.neo4j.kernel.ha.zookeeper.NeoStoreUtil;
import org.neo4j.kernel.ha.zookeeper.TxIdUpdater;
import org.neo4j.kernel.ha.zookeeper.ZooKeeperException;
import org.neo4j.kernel.ha.zookeeper.ZooKeeperMachine;
import org.neo4j.kernel.ha.zookeeper.ZooKeeperTimedOutException;
import org.neo4j.kernel.impl.nioneo.store.StoreId;
import org.neo4j.kernel.impl.transaction.xaframework.LogExtractor;
import org.neo4j.kernel.impl.transaction.xaframework.NullLogBuffer;
import org.neo4j.kernel.impl.util.StringLogger;

public class ZooClient
extends AbstractZooKeeperManager {
    static final String MASTER_NOTIFY_CHILD = "master-notify";
    static final String MASTER_REBOUND_CHILD = "master-rebound";
    private final ZooKeeper zooKeeper;
    private final int machineId;
    private String sequenceNr;
    private long committedTx;
    private int masterForCommittedTx;
    private final Object keeperStateMonitor = new Object();
    private volatile Watcher.Event.KeeperState keeperState = Watcher.Event.KeeperState.Disconnected;
    private volatile boolean shutdown = false;
    private volatile boolean flushing = false;
    private String rootPath;
    private volatile StoreId storeId;
    private volatile TxIdUpdater updater = new NoUpdateTxIdUpdater();
    private final String haServer;
    private final String storeDir;
    private long sessionId = -1L;
    private Config conf;
    private final SlaveDatabaseOperations localDatabase;
    private final ClusterEventReceiver clusterReceiver;
    private final int backupPort;
    private final boolean writeLastCommittedTx;
    private final String clusterName;
    private final boolean allowCreateCluster;
    private final WatcherImpl watcher;
    private final Machine asMachine;
    private final Map<Integer, Pair<SlaveClient, Machine>> cachedSlaves = new HashMap<Integer, Pair<SlaveClient, Machine>>();
    private volatile boolean serversRefreshed = true;

    public ZooClient(String storeDir, StringLogger stringLogger, Config conf, SlaveDatabaseOperations localDatabase, ClusterEventReceiver clusterReceiver, MasterClientFactory clientFactory) {
        super((String)conf.get((GraphDatabaseSetting)HaSettings.coordinators), stringLogger, conf.getInteger(HaSettings.zk_session_timeout), clientFactory);
        this.storeDir = storeDir;
        this.conf = conf;
        this.localDatabase = localDatabase;
        this.clusterReceiver = clusterReceiver;
        this.machineId = conf.getInteger(HaSettings.server_id);
        this.backupPort = conf.getInteger((GraphDatabaseSetting.IntegerSetting)OnlineBackupSettings.online_backup_port);
        this.haServer = conf.isSet((Setting)HaSettings.server) ? (String)conf.get((GraphDatabaseSetting)HaSettings.server) : this.defaultServer();
        this.writeLastCommittedTx = ((SlaveUpdateMode)conf.getEnum(SlaveUpdateMode.class, (GraphDatabaseSetting.OptionsSetting)HaSettings.slave_coordinator_update_mode)).syncWithZooKeeper;
        this.clusterName = (String)conf.get((GraphDatabaseSetting)HaSettings.cluster_name);
        this.sequenceNr = "not initialized yet";
        this.allowCreateCluster = conf.getBoolean(HaSettings.allow_init_cluster);
        this.asMachine = new Machine(this.machineId, 0, 0L, 0, this.haServer, this.backupPort);
        try {
            this.watcher = new WatcherImpl();
            this.zooKeeper = new ZooKeeper(this.getServers(), this.getSessionTimeout(), (Watcher)this.watcher);
            this.watcher.flushUnprocessedEvents(this.zooKeeper);
        }
        catch (IOException e) {
            throw new ZooKeeperException("Unable to create zoo keeper client", e);
        }
    }

    private String defaultServer() {
        InetAddress host = null;
        try {
            host = InetAddress.getLocalHost();
        }
        catch (UnknownHostException unknownHostException) {
            // empty catch block
        }
        if (host == null) {
            throw new IllegalStateException("Could not auto configure host name, please supply " + HaSettings.server.name());
        }
        return host.getHostAddress() + ":" + 6361;
    }

    public Object instantiateMasterServer(GraphDatabaseAPI graphDb) {
        int timeOut = this.conf.isSet((Setting)HaSettings.lock_read_timeout) ? (Integer)this.conf.get((GraphDatabaseSetting)HaSettings.lock_read_timeout) : (Integer)this.conf.get((GraphDatabaseSetting)HaSettings.read_timeout);
        return new MasterServer(new MasterImpl(graphDb, timeOut), (Integer)Machine.splitIpAndPort(this.haServer).other(), graphDb.getMessageLog(), (Integer)this.conf.get((GraphDatabaseSetting)HaSettings.max_concurrent_channels_per_slave), timeOut, new BranchDetectingTxVerifier(graphDb), (Integer)this.conf.get(HaSettings.com_chunk_size));
    }

    @Override
    protected StoreId getStoreId() {
        return this.storeId;
    }

    public Object instantiateSlaveServer(GraphDatabaseAPI graphDb, Broker broker, SlaveDatabaseOperations ops) {
        return new SlaveServer(new SlaveImpl(graphDb, broker, ops), (Integer)Machine.splitIpAndPort(this.haServer).other(), graphDb.getMessageLog(), (Integer)this.conf.get(HaSettings.com_chunk_size));
    }

    @Override
    protected int getMyMachineId() {
        return this.machineId;
    }

    private int toInt(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    void waitForSyncConnected(AbstractZooKeeperManager.WaitMode mode) {
        long startTime;
        if (this.keeperState == Watcher.Event.KeeperState.SyncConnected) {
            return;
        }
        if (this.shutdown) {
            throw new ZooKeeperException("ZooKeeper client has been shutdwon");
        }
        AbstractZooKeeperManager.WaitStrategy strategy = mode.getStrategy(this);
        long currentTime = startTime = System.currentTimeMillis();
        Object object = this.keeperStateMonitor;
        synchronized (object) {
            do {
                try {
                    this.keeperStateMonitor.wait(250L);
                }
                catch (InterruptedException e) {
                    Thread.interrupted();
                }
                if (this.keeperState == Watcher.Event.KeeperState.SyncConnected) {
                    return;
                }
                if (!this.shutdown) continue;
                throw new ZooKeeperException("ZooKeeper client has been shutdwon");
            } while (strategy.waitMore((currentTime = System.currentTimeMillis()) - startTime));
            if (this.keeperState != Watcher.Event.KeeperState.SyncConnected) {
                throw new ZooKeeperTimedOutException("Connection to ZooKeeper server timed out, keeper state=" + this.keeperState);
            }
        }
    }

    protected void subscribeToDataChangeWatcher(String child) {
        block8: {
            String root = this.getRoot();
            String path = root + "/" + child;
            try {
                try {
                    this.zooKeeper.getData(path, true, null);
                }
                catch (KeeperException e) {
                    if (e.code() == KeeperException.Code.NONODE) {
                        byte[] data = new byte[4];
                        ByteBuffer.wrap(data).putInt(-1);
                        try {
                            this.zooKeeper.create(path, data, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                            break block8;
                        }
                        catch (KeeperException ce) {
                            if (e.code() != KeeperException.Code.NODEEXISTS) {
                                throw new ZooKeeperException("Creation error", ce);
                            }
                            break block8;
                        }
                    }
                    throw new ZooKeeperException("Couldn't get or create " + child, e);
                }
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                throw new ZooKeeperException("Interrupted", e);
            }
        }
    }

    protected void subscribeToChildrenChangeWatcher(String child) {
        String path = this.getRoot() + "/" + child;
        try {
            this.zooKeeper.getChildren(path, true);
        }
        catch (KeeperException e) {
            throw new ZooKeeperException("Couldn't subscribe getChildren at " + path, e);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new ZooKeeperException("Interrupted", e);
        }
    }

    protected void setDataChangeWatcher(String child, int currentMasterId) {
        this.setDataChangeWatcher(child, currentMasterId, true);
    }

    protected void setDataChangeWatcher(String child, int currentMasterId, boolean skipOnSame) {
        try {
            boolean exists;
            byte[] data;
            String path;
            block11: {
                String root = this.getRoot();
                path = root + "/" + child;
                data = null;
                exists = false;
                try {
                    data = this.zooKeeper.getData(path, true, null);
                    exists = true;
                    if (skipOnSame && ByteBuffer.wrap(data).getInt() == currentMasterId) {
                        this.msgLog.logMessage(child + " not set, is already " + currentMasterId);
                        return;
                    }
                }
                catch (KeeperException e) {
                    if (e.code() == KeeperException.Code.NONODE) break block11;
                    throw new ZooKeeperException("Couldn't get master notify node", e);
                }
            }
            try {
                data = new byte[4];
                ByteBuffer.wrap(data).putInt(currentMasterId);
                if (!exists) {
                    this.zooKeeper.create(path, data, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    this.msgLog.logMessage(child + " created with " + currentMasterId);
                } else if (currentMasterId != -1) {
                    this.zooKeeper.setData(path, data, -1);
                    this.msgLog.logMessage(child + " set to " + currentMasterId);
                }
                this.zooKeeper.getData(path, true, null);
            }
            catch (KeeperException e) {
                if (e.code() != KeeperException.Code.NODEEXISTS) {
                    throw new ZooKeeperException("Couldn't set master notify node", e);
                }
            }
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new ZooKeeperException("Interrupted", e);
        }
    }

    @Override
    public String getRoot() {
        this.makeSureRootPathIsFound();
        byte[] rootData = null;
        do {
            try {
                rootData = this.zooKeeper.getData(this.rootPath, false, null);
                return this.rootPath;
            }
            catch (KeeperException e) {
                if (e.code() != KeeperException.Code.NONODE) {
                    throw new ZooKeeperException("Unable to get root node", e);
                }
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                throw new ZooKeeperException("Got interrupted", e);
            }
            try {
                byte[] data = new byte[]{};
                this.zooKeeper.create(this.rootPath, data, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            catch (KeeperException e) {
                if (e.code() == KeeperException.Code.NODEEXISTS) continue;
                throw new ZooKeeperException("Unable to create root", e);
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                throw new ZooKeeperException("Got interrupted", e);
            }
        } while (rootData == null);
        throw new IllegalStateException("Root path couldn't be found");
    }

    private void makeSureRootPathIsFound() {
        if (this.rootPath == null) {
            this.storeId = this.getClusterStoreId(this.zooKeeper, this.clusterName);
            if (this.storeId != null) {
                this.rootPath = this.asRootPath(this.storeId);
                if (NeoStoreUtil.storeExists(this.storeDir)) {
                    NeoStoreUtil store = new NeoStoreUtil(this.storeDir);
                    this.committedTx = store.getLastCommittedTx();
                    if (!this.storeId.equals((Object)store.asStoreId())) {
                        throw new ZooKeeperException("StoreId in database doesn't match that of the ZK cluster");
                    }
                } else {
                    this.committedTx = 1L;
                }
            } else {
                if (!this.allowCreateCluster) {
                    throw new RuntimeException("Not allowed to create cluster");
                }
                StoreId storeIdSuggestion = NeoStoreUtil.storeExists(this.storeDir) ? new NeoStoreUtil(this.storeDir).asStoreId() : new StoreId();
                this.storeId = this.createCluster(storeIdSuggestion);
                this.makeSureRootPathIsFound();
            }
            this.masterForCommittedTx = this.getFirstMasterForTx(this.committedTx);
        }
    }

    private void cleanupChildren() {
        try {
            String root = this.getRoot();
            List children = this.zooKeeper.getChildren(root, false);
            for (String child : children) {
                Pair<Integer, Integer> parsedChild = this.parseChild(child);
                if (parsedChild == null || (Integer)parsedChild.first() != this.machineId) continue;
                this.zooKeeper.delete(root + "/" + child, -1);
            }
        }
        catch (KeeperException e) {
            throw new ZooKeeperException("Unable to clean up old child", e);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new ZooKeeperException("Interrupted.", e);
        }
    }

    private byte[] dataRepresentingMe(long txId, int master) {
        byte[] array = new byte[12];
        ByteBuffer buffer = ByteBuffer.wrap(array);
        buffer.putLong(txId);
        buffer.putInt(master);
        return array;
    }

    private String setup() {
        try {
            this.cleanupChildren();
            this.writeHaServerConfig();
            String root = this.getRoot();
            String path = root + "/" + this.machineId + "_";
            String created = this.zooKeeper.create(path, this.dataRepresentingMe(this.committedTx, this.masterForCommittedTx), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            this.subscribeToDataChangeWatcher(MASTER_NOTIFY_CHILD);
            this.subscribeToDataChangeWatcher(MASTER_REBOUND_CHILD);
            this.subscribeToChildrenChangeWatcher("ha-servers");
            return created.substring(created.lastIndexOf("_") + 1);
        }
        catch (KeeperException e) {
            throw new ZooKeeperException("Unable to setup", e);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new ZooKeeperException("Setup got interrupted", e);
        }
        catch (Throwable t) {
            throw new ZooKeeperException("Unknown setup error", t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writeHaServerConfig() throws InterruptedException, KeeperException {
        String compatibilityPath;
        String serverRootPath;
        block16: {
            block15: {
                serverRootPath = this.rootPath + "/" + "ha-servers";
                try {
                    this.zooKeeper.create(serverRootPath, new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
                catch (KeeperException e) {
                    if (e.code() == KeeperException.Code.NODEEXISTS) break block15;
                    throw e;
                }
            }
            compatibilityPath = this.rootPath + "/" + "compatibility-1.8";
            try {
                this.zooKeeper.create(compatibilityPath, new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            catch (KeeperException e) {
                if (e.code() == KeeperException.Code.NODEEXISTS) break block16;
                throw e;
            }
        }
        String machinePath = serverRootPath + "/" + this.machineId;
        String compatibilityMachinePath = compatibilityPath + "/" + this.machineId;
        byte[] data = this.haServerAsData();
        boolean compatCreated = false;
        boolean machineCreated = false;
        try {
            this.zooKeeper.create(compatibilityMachinePath, new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            compatCreated = true;
            this.zooKeeper.create(machinePath, data, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            machineCreated = true;
        }
        catch (KeeperException e) {
            if (e.code() != KeeperException.Code.NODEEXISTS) {
                throw e;
            }
            this.msgLog.logMessage("HA server info already present, trying again");
            try {
                if (compatCreated) {
                    this.zooKeeper.delete(compatibilityMachinePath, -1);
                }
                if (machineCreated) {
                    this.zooKeeper.delete(machinePath, -1);
                }
            }
            catch (KeeperException ee) {
                if (ee.code() != KeeperException.Code.NONODE) {
                    this.msgLog.logMessage("Unable to delete " + ee.getPath(), (Throwable)ee);
                }
            }
            finally {
                this.writeHaServerConfig();
            }
        }
        this.zooKeeper.setData(machinePath, data, -1);
        this.msgLog.logMessage("Wrote HA server " + this.haServer + " to zoo keeper");
    }

    private byte[] haServerAsData() {
        byte[] array = new byte[this.haServer.length() * 2 + 100];
        ByteBuffer buffer = ByteBuffer.wrap(array);
        buffer.putInt(this.backupPort);
        buffer.put((byte)this.haServer.length());
        buffer.asCharBuffer().put(this.haServer.toCharArray()).flip();
        byte[] actualArray = new byte[buffer.limit()];
        System.arraycopy(array, 0, actualArray, 0, actualArray.length);
        return actualArray;
    }

    public synchronized void setJmxConnectionData(JMXServiceURL jmxUrl, String instanceId) {
        block7: {
            String path = this.rootPath + "/" + "ha-servers" + "/" + this.machineId + "-jmx";
            String url = jmxUrl.toString();
            byte[] data = new byte[(url.length() + instanceId.length()) * 2 + 4];
            ByteBuffer buffer = ByteBuffer.wrap(data);
            buffer.putShort((short)url.length());
            buffer.asCharBuffer().put(url.toCharArray());
            buffer.position(buffer.position() + url.length() * 2);
            buffer.putShort((short)instanceId.length());
            buffer.asCharBuffer().put(instanceId.toCharArray());
            if (buffer.limit() != data.length) {
                byte[] array = new byte[buffer.limit()];
                System.arraycopy(data, 0, array, 0, array.length);
                data = array;
            }
            try {
                try {
                    this.zooKeeper.setData(path, data, -1);
                }
                catch (KeeperException e) {
                    if (e.code() == KeeperException.Code.NONODE) {
                        this.zooKeeper.create(path, data, (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                        break block7;
                    }
                    this.msgLog.logMessage("Unable to set jxm connection info", (Throwable)e);
                }
            }
            catch (KeeperException e) {
                this.msgLog.logMessage("Unable to set jxm connection info", (Throwable)e);
            }
            catch (InterruptedException e) {
                Thread.interrupted();
                this.msgLog.logMessage("Unable to set jxm connection info", (Throwable)e);
            }
        }
    }

    public int getCurrentMasterNotify() {
        String path = this.rootPath + "/" + MASTER_NOTIFY_CHILD;
        try {
            byte[] data = this.zooKeeper.getData(path, true, null);
            return ByteBuffer.wrap(data).getInt();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void getJmxConnectionData(ConnectionInformation connection) {
        char[] instanceId;
        char[] url;
        byte[] data;
        String path = this.rootPath + "/" + "ha-servers" + "/" + this.machineId + "-jmx";
        try {
            data = this.zooKeeper.getData(path, false, null);
        }
        catch (KeeperException e) {
            return;
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            return;
        }
        if (data == null || data.length == 0) {
            return;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        try {
            url = new char[buffer.getShort()];
            buffer.asCharBuffer().get(url);
            buffer.position(buffer.position() + url.length * 2);
            instanceId = new char[buffer.getShort()];
            buffer.asCharBuffer().get(instanceId);
        }
        catch (BufferUnderflowException e) {
            return;
        }
        connection.setJMXConnectionData(new String(url), new String(instanceId));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void startFlushing() {
        if (this.checkCompatibilityMode()) {
            this.msgLog.logMessage("Discovered compatibility node, will remain in compatibility mode until the node is removed");
            this.updater = new CompatibilitySlaveOnlyTxIdUpdater();
            this.updater.init();
        } else if (!this.flushing) {
            ZooClient zooClient = this;
            synchronized (zooClient) {
                if (!this.flushing) {
                    this.flushing = true;
                    this.updater = new SynchronousTxIdUpdater();
                    this.updater.init();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stopFlushing() {
        if (this.checkCompatibilityMode()) {
            this.msgLog.logMessage("Discovered compatibility node, will remain in compatibility mode until the node is removed");
            this.updater = new CompatibilitySlaveOnlyTxIdUpdater();
            this.updater.init();
        } else if (this.flushing) {
            ZooClient zooClient = this;
            synchronized (zooClient) {
                if (this.flushing) {
                    this.flushing = false;
                    this.updater = new NoUpdateTxIdUpdater();
                    this.updater.init();
                }
            }
        }
    }

    public synchronized void setCommittedTx(long tx) {
        this.committedTx = tx;
        this.updater.updatedTxId(tx);
    }

    private void writeData(long tx, int masterForThat) {
        this.waitForSyncConnected();
        String root = this.getRoot();
        String path = root + "/" + this.machineId + "_" + this.sequenceNr;
        byte[] data = this.dataRepresentingMe(tx, masterForThat);
        try {
            this.zooKeeper.setData(path, data, -1);
        }
        catch (KeeperException e) {
            throw new ZooKeeperException("Unable to set current tx", e);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
            throw new ZooKeeperException("Interrupted...", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int getFirstMasterForTx(long committedTx) {
        if (committedTx == 1L) {
            return -1;
        }
        LogExtractor extractor = null;
        try {
            extractor = LogExtractor.from((String)this.storeDir, (long)committedTx);
            long tx = extractor.extractNext(NullLogBuffer.INSTANCE);
            if (tx != committedTx) {
                this.msgLog.logMessage("Tried to extract master for tx " + committedTx + " at initialization, but got tx " + tx + " back. Will be using " + -1 + " temporarily");
                int n = -1;
                return n;
            }
            int n = extractor.getLastStartEntry().getMasterId();
            return n;
        }
        catch (IOException e) {
            this.msgLog.logMessage("Couldn't get master for " + committedTx + " using " + -1 + " temporarily", (Throwable)e);
            int n = -1;
            return n;
        }
        finally {
            if (extractor != null) {
                extractor.close();
            }
        }
    }

    @Override
    public void shutdown() {
        this.watcher.shutdown();
        this.shutdown = true;
        this.shutdownSlaves();
        super.shutdown();
    }

    @Override
    protected void masterElectionHappened(Pair<Master, Machine> previousMaster, Machine newMaster) {
        if (previousMaster == NO_MASTER_MACHINE_PAIR && newMaster.getMachineId() == this.getMyMachineId()) {
            this.setDataChangeWatcher(MASTER_REBOUND_CHILD, this.getMyMachineId(), false);
        }
    }

    private void shutdownSlaves() {
        for (Pair<SlaveClient, Machine> slave : this.cachedSlaves.values()) {
            ((SlaveClient)slave.first()).shutdown();
        }
        this.cachedSlaves.clear();
    }

    public boolean isShutdown() {
        return this.shutdown;
    }

    @Override
    public ZooKeeper getZooKeeper(boolean sync) {
        if (sync) {
            this.zooKeeper.sync(this.rootPath, null, null);
        }
        return this.zooKeeper;
    }

    @Override
    protected Machine getHaServer(int machineId, boolean wait) {
        return machineId == this.machineId ? this.asMachine : super.getHaServer(machineId, wait);
    }

    private boolean checkCompatibilityMode() {
        try {
            this.refreshHaServers();
            int totalCount = this.getNumberOfServers();
            int myVersionCount = this.zooKeeper.getChildren(this.getRoot() + "/" + "compatibility-1.8", false).size();
            boolean result = myVersionCount <= totalCount - 1;
            this.msgLog.logMessage("Checking compatibility mode, read " + totalCount + " as all machines, " + myVersionCount + " as myVersion machines. Based on that I return " + result);
            return result;
        }
        catch (Exception e) {
            this.msgLog.logMessage("Tried to discover if we are in compatibility mode, got this exception instead", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private synchronized StoreId createCluster(StoreId storeIdSuggestion) {
        String path = "/" + this.clusterName;
        try {
            try {
                this.zooKeeper.create(path, storeIdSuggestion.serialize(), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                return storeIdSuggestion;
            }
            catch (KeeperException e) {
                if (e.code() == KeeperException.Code.NODEEXISTS) {
                    try {
                        return StoreId.deserialize((byte[])this.zooKeeper.getData(path, false, null));
                    }
                    catch (KeeperException ex) {
                        throw new ZooKeeperException("Unable to read cluster store id", ex);
                    }
                }
                throw new ZooKeeperException("Unable to write cluster store id", e);
            }
        }
        catch (InterruptedException e) {
            throw new ZooKeeperException("createCluster interrupted", e);
        }
    }

    public StoreId getClusterStoreId(AbstractZooKeeperManager.WaitMode mode) {
        this.waitForSyncConnected(mode);
        this.makeSureRootPathIsFound();
        return this.storeId;
    }

    @Override
    protected String getSequenceNr() {
        return this.sequenceNr;
    }

    public String toString() {
        return this.getClass().getSimpleName() + "[serverId:" + this.machineId + ", seq:" + this.sequenceNr + ", lastCommittedTx:" + this.committedTx + " w/ master:" + this.masterForCommittedTx + ", session:" + this.sessionId + "]";
    }

    @Override
    protected void invalidateMaster() {
        super.invalidateMaster();
        this.serversRefreshed = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Slave[] getSlavesFromZooKeeper() {
        Map<Integer, Pair<SlaveClient, Machine>> map = this.cachedSlaves;
        synchronized (map) {
            if (this.serversRefreshed || this.cachedSlaves.isEmpty()) {
                Machine master = (Machine)this.cachedMaster.other();
                if (master.getMachineId() == this.machineId) {
                    Integer[] existingSlaves;
                    HashSet<Integer> visitedSlaves = new HashSet<Integer>();
                    for (Machine machine : this.getHaServers()) {
                        int id = machine.getMachineId();
                        visitedSlaves.add(id);
                        if (id == this.machineId) continue;
                        boolean instantiate = true;
                        Pair<SlaveClient, Machine> existingSlave = this.cachedSlaves.get(id);
                        if (existingSlave != null) {
                            Machine existingMachine = (Machine)existingSlave.other();
                            if (existingMachine.getServer().equals(machine.getServer())) {
                                instantiate = false;
                            } else {
                                ((SlaveClient)existingSlave.first()).shutdown();
                            }
                        }
                        if (!instantiate) continue;
                        this.cachedSlaves.put(id, (Pair<SlaveClient, Machine>)Pair.of((Object)new SlaveClient(machine.getMachineId(), (String)machine.getServer().first(), (Integer)machine.getServer().other(), this.msgLog, this.storeId, (Integer)this.conf.get((GraphDatabaseSetting)HaSettings.max_concurrent_channels_per_slave), (Integer)this.conf.get(HaSettings.com_chunk_size)), (Object)machine));
                    }
                    Integer[] integerArray = existingSlaves = this.cachedSlaves.keySet().toArray(new Integer[this.cachedSlaves.size()]);
                    int len$ = integerArray.length;
                    for (int i$ = 0; i$ < len$; ++i$) {
                        int id = integerArray[i$];
                        if (visitedSlaves.contains(id)) continue;
                        ((SlaveClient)this.cachedSlaves.remove(id).first()).shutdown();
                    }
                } else {
                    this.shutdownSlaves();
                }
                this.serversRefreshed = true;
            }
            Slave[] slaves = new Slave[this.cachedSlaves.size()];
            int i = 0;
            for (Pair pair : this.cachedSlaves.values()) {
                slaves[i++] = (Slave)pair.first();
            }
            return slaves;
        }
    }

    private class CompatibilitySlaveOnlyTxIdUpdater
    extends AbstractTxIdUpdater {
        private CompatibilitySlaveOnlyTxIdUpdater() {
        }

        @Override
        public void init() {
            ZooClient.this.writeData(0L, -1);
            ZooClient.this.msgLog.logMessage("Set to defaults (0 for txid, -1 for master) since we are running in compatilibility mode, while at txid " + ZooClient.this.committedTx);
        }
    }

    private class NoUpdateTxIdUpdater
    extends AbstractTxIdUpdater {
        private NoUpdateTxIdUpdater() {
        }

        @Override
        public void init() {
            ZooClient.this.writeData(-2L, -2);
            ZooClient.this.msgLog.logMessage("Stopping flushing of txids to zk, while at txid " + ZooClient.this.committedTx);
        }
    }

    private class SynchronousTxIdUpdater
    extends AbstractTxIdUpdater {
        private SynchronousTxIdUpdater() {
        }

        @Override
        public void init() {
            ZooClient.this.writeData(ZooClient.this.committedTx, ZooClient.this.getFirstMasterForTx(ZooClient.this.committedTx));
            ZooClient.this.msgLog.logMessage("Starting flushing of txids to zk, while at txid " + ZooClient.this.committedTx);
        }

        @Override
        public void updatedTxId(long txId) {
            ZooClient.this.masterForCommittedTx = ZooClient.this.localDatabase.getMasterForTx(txId);
            ZooClient.this.writeData(txId, ZooClient.this.masterForCommittedTx);
        }
    }

    private abstract class AbstractTxIdUpdater
    implements TxIdUpdater {
        private AbstractTxIdUpdater() {
        }

        @Override
        public void updatedTxId(long txId) {
        }
    }

    private class WatcherImpl
    implements Watcher {
        private final Queue<WatchedEvent> unprocessedEvents = new LinkedList<WatchedEvent>();
        private final ExecutorService threadPool = Executors.newCachedThreadPool();
        private final AtomicInteger count = new AtomicInteger(0);
        private volatile boolean electionHappening = false;

        private WatcherImpl() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void flushUnprocessedEvents(ZooKeeper zooKeeper) {
            Queue<WatchedEvent> queue = this.unprocessedEvents;
            synchronized (queue) {
                WatchedEvent e = null;
                while ((e = this.unprocessedEvents.poll()) != null) {
                    this.runEventInThread(e, zooKeeper);
                }
            }
        }

        public void shutdown() {
            this.threadPool.shutdown();
            try {
                this.threadPool.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                ZooClient.this.msgLog.logMessage(ZooClient.this + " couldn't flush pending events in time during shutdown", true);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void process(WatchedEvent event) {
            Queue<WatchedEvent> queue = this.unprocessedEvents;
            synchronized (queue) {
                if (ZooClient.this.zooKeeper == null || !this.unprocessedEvents.isEmpty()) {
                    this.unprocessedEvents.add(event);
                    return;
                }
            }
            this.runEventInThread(event, ZooClient.this.zooKeeper);
        }

        private synchronized void runEventInThread(final WatchedEvent event, final ZooKeeper zoo) {
            if (ZooClient.this.shutdown) {
                return;
            }
            if (this.count.get() > 10) {
                ZooClient.this.msgLog.logMessage("Thread count is already at " + this.count.get() + " and added another ZK event handler thread.");
            }
            this.threadPool.submit(new Runnable(){

                @Override
                public void run() {
                    WatcherImpl.this.processEvent(event, zoo);
                }
            });
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processEvent(WatchedEvent event, ZooKeeper zooKeeper) {
            block35: {
                try {
                    this.count.incrementAndGet();
                    String path = event.getPath();
                    ZooClient.this.msgLog.logMessage(this + ", " + new Date() + " Got event: " + event + " (path=" + path + ")", true);
                    if (path == null && event.getState() == Watcher.Event.KeeperState.Expired) {
                        ZooClient.this.keeperState = Watcher.Event.KeeperState.Expired;
                        ZooClient.this.clusterReceiver.reconnect(new InformativeStackTrace("Reconnect due to session expired"));
                        break block35;
                    }
                    if (path == null && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        long newSessionId = zooKeeper.getSessionId();
                        if (newSessionId != ZooClient.this.sessionId) {
                            if (ZooClient.this.writeLastCommittedTx) {
                                ZooClient.this.sequenceNr = ZooClient.this.setup();
                                ZooClient.this.msgLog.logMessage("Did setup, seq=" + ZooClient.this.sequenceNr + " new sessionId=" + newSessionId);
                                int previousMaster = ZooClient.this.getCurrentMasterNotify();
                                if (ZooClient.this.sessionId != -1L) {
                                    ZooClient.this.clusterReceiver.newMaster(new InformativeStackTrace("Got SyncConnected event from ZK"));
                                    if (ZooClient.this.getCurrentMasterNotify() == ZooClient.this.getMyMachineId() && previousMaster == ZooClient.this.getMyMachineId()) {
                                        ZooClient.this.setDataChangeWatcher(ZooClient.MASTER_REBOUND_CHILD, ZooClient.this.getMyMachineId(), false);
                                    }
                                }
                                ZooClient.this.sessionId = newSessionId;
                            } else {
                                ZooClient.this.msgLog.logMessage("Didn't do setup due to told not to write");
                                ZooClient.this.keeperState = Watcher.Event.KeeperState.SyncConnected;
                                ZooClient.this.subscribeToDataChangeWatcher(ZooClient.MASTER_REBOUND_CHILD);
                            }
                            ZooClient.this.keeperState = Watcher.Event.KeeperState.SyncConnected;
                            if (ZooClient.this.checkCompatibilityMode()) {
                                ZooClient.this.msgLog.logMessage("Discovered compatibility node, will remain in compatibility mode until the node is removed");
                                ZooClient.this.updater = new CompatibilitySlaveOnlyTxIdUpdater();
                            }
                        } else {
                            ZooClient.this.msgLog.logMessage("SyncConnected with same session id: " + ZooClient.this.sessionId);
                            ZooClient.this.keeperState = Watcher.Event.KeeperState.SyncConnected;
                        }
                        break block35;
                    }
                    if (path == null && event.getState() == Watcher.Event.KeeperState.Disconnected) {
                        ZooClient.this.keeperState = Watcher.Event.KeeperState.Disconnected;
                        break block35;
                    }
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        ZooClient.this.msgLog.logMessage("Got a NodeDeleted event for " + path);
                        ZooKeeperMachine currentMaster = (ZooKeeperMachine)ZooClient.this.getCachedMaster().other();
                        if (path.contains(currentMaster.getZooKeeperPath())) {
                            ZooClient.this.msgLog.logMessage("Acting on it, calling newMaster()");
                            ZooClient.this.clusterReceiver.newMaster(new InformativeStackTrace("NodeDeleted event received (a machine left the cluster)"));
                        }
                        break block35;
                    }
                    if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                        if (path.endsWith("ha-servers")) {
                            try {
                                ZooClient.this.refreshHaServers();
                                ZooClient.this.serversRefreshed = true;
                                ZooClient.this.subscribeToChildrenChangeWatcher("ha-servers");
                            }
                            catch (ZooKeeperException e) {}
                        }
                        break block35;
                    }
                    if (event.getType() != Watcher.Event.EventType.NodeDataChanged) break block35;
                    int updatedData = ZooClient.this.toInt(ZooClient.this.getZooKeeper(true).getData(path, true, null));
                    ZooClient.this.msgLog.logMessage("Got event data " + updatedData);
                    if (path.contains(ZooClient.MASTER_NOTIFY_CHILD)) {
                        if (updatedData != ZooClient.this.machineId || this.electionHappening) break block35;
                        try {
                            this.electionHappening = true;
                            ZooClient.this.clusterReceiver.newMaster(new InformativeStackTrace("NodeDataChanged event received (someone though I should be the master)"));
                            ZooClient.this.serversRefreshed = true;
                            break block35;
                        }
                        finally {
                            this.electionHappening = false;
                        }
                    }
                    if (path.contains(ZooClient.MASTER_REBOUND_CHILD)) {
                        if (updatedData == ZooClient.this.machineId || this.electionHappening) break block35;
                        try {
                            this.electionHappening = true;
                            ZooClient.this.clusterReceiver.newMaster(new InformativeStackTrace("NodeDataChanged event received (new master ensures I'm slave)"));
                            ZooClient.this.serversRefreshed = true;
                            break block35;
                        }
                        finally {
                            this.electionHappening = false;
                        }
                    }
                    if (path.contains("flush-requested")) {
                        if (updatedData == -6) {
                            ZooClient.this.stopFlushing();
                        } else {
                            ZooClient.this.startFlushing();
                        }
                    } else {
                        ZooClient.this.msgLog.logMessage("Unrecognized data change " + path);
                    }
                }
                catch (BrokerShutDownException e) {
                }
                catch (Throwable e) {
                    ZooClient.this.msgLog.logMessage("Error in ZooClient.process", e, true);
                    throw Exceptions.launderedException((Throwable)e);
                }
                finally {
                    ZooClient.this.msgLog.flush();
                    this.count.decrementAndGet();
                }
            }
        }
    }
}

