/*
 * Decompiled with CFR 0.152.
 */
package com.groupon.mesos.zookeeper;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.InvalidProtocolBufferException;
import com.groupon.mesos.util.Log;
import com.groupon.mesos.util.ManagedEventBus;
import com.groupon.mesos.util.UPID;
import com.groupon.mesos.zookeeper.DetectMessage;
import com.groupon.mesos.zookeeper.MasterUpdateMessage;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.mesos.Protos;

public class ZookeeperMasterDetector
implements Closeable {
    private static final Log LOG = Log.getLog(ZookeeperMasterDetector.class);
    private final String zookeeperPath;
    private final String user;
    private final String password;
    private final ZkClient client;
    private final ManagedEventBus eventBus;
    private final SortedMap<String, Protos.MasterInfo> nodeCache = new TreeMap<String, Protos.MasterInfo>();
    private final BlockingQueue<DetectMessage> futures = new LinkedBlockingQueue<DetectMessage>();
    private final AtomicBoolean running = new AtomicBoolean(false);

    public ZookeeperMasterDetector(String master, ManagedEventBus eventBus) throws IOException {
        Preconditions.checkNotNull((Object)master, (Object)"master is null");
        this.eventBus = (ManagedEventBus)Preconditions.checkNotNull((Object)eventBus, (Object)"eventBus is null");
        URI zookeeperUri = URI.create(master);
        Preconditions.checkState((boolean)zookeeperUri.getScheme().equals("zk"), (String)"Only zk:// URIs are supported (%s)", (Object[])new Object[]{master});
        String authority = zookeeperUri.getAuthority();
        int atIndex = authority.indexOf(64);
        if (atIndex != -1) {
            ImmutableList userPass = ImmutableList.copyOf((Iterable)Splitter.on((char)':').trimResults().split((CharSequence)authority.substring(0, atIndex)));
            Preconditions.checkState((userPass.size() == 2 ? 1 : 0) != 0, (String)"found %s for user name and password", (Object[])new Object[]{userPass});
            this.user = (String)userPass.get(0);
            this.password = (String)userPass.get(1);
            authority = authority.substring(atIndex + 1);
        } else {
            this.user = null;
            this.password = null;
        }
        String zookeeperPath = zookeeperUri.getPath();
        while (zookeeperPath.endsWith("/")) {
            zookeeperPath = zookeeperPath.substring(0, zookeeperPath.length() - 1);
        }
        this.zookeeperPath = zookeeperPath;
        Preconditions.checkState((!zookeeperPath.equals("") ? 1 : 0) != 0, (String)"A zookeeper path must be given! (%s)", (Object[])new Object[]{zookeeperPath});
        Preconditions.checkState((this.user == null && this.password == null ? 1 : 0) != 0, (Object)"Current version of Zkclient does not support authentication!");
        this.client = new ZkClient(authority);
        this.client.setZkSerializer((ZkSerializer)new MasterInfoZkSerializer());
    }

    @Override
    public void close() throws IOException {
        if (this.running.getAndSet(false)) {
            this.client.close();
            ArrayList detectMessages = new ArrayList(this.futures.size());
            this.futures.drainTo(detectMessages);
            for (DetectMessage detectMessage : detectMessages) {
                detectMessage.getFuture().cancel(false);
            }
        }
    }

    public void start() {
        if (!this.running.getAndSet(true)) {
            this.processChildList(this.client.getChildren(this.zookeeperPath));
            this.client.subscribeChildChanges(this.zookeeperPath, new IZkChildListener(){

                public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
                    Preconditions.checkState((boolean)ZookeeperMasterDetector.this.zookeeperPath.equals(parentPath), (String)"Received Event for %s (expected %s)", (Object[])new Object[]{parentPath, ZookeeperMasterDetector.this.zookeeperPath});
                    ZookeeperMasterDetector.this.processChildList(currentChildren);
                }
            });
        }
    }

    public ListenableFuture<Protos.MasterInfo> detect(Protos.MasterInfo previous) {
        Preconditions.checkState((boolean)this.running.get(), (Object)"not running");
        SettableFuture result = SettableFuture.create();
        this.eventBus.post(new DetectMessage((SettableFuture<Protos.MasterInfo>)result, previous));
        return result;
    }

    @Subscribe
    public void processMasterUpdate(MasterUpdateMessage message) {
        Set<String> currentNodes = message.getNodes();
        ImmutableSet nodesToRemove = ImmutableSet.copyOf((Collection)Sets.difference(this.nodeCache.keySet(), currentNodes));
        ImmutableSet nodesToAdd = ImmutableSet.copyOf((Collection)Sets.difference(currentNodes, this.nodeCache.keySet()));
        for (String node : nodesToAdd) {
            String path = this.zookeeperPath + "/" + node;
            Protos.MasterInfo masterInfo = (Protos.MasterInfo)this.client.readData(path);
            this.nodeCache.put(node, masterInfo);
        }
        for (String node : nodesToRemove) {
            this.nodeCache.remove(node);
        }
        LOG.debug("Processed event, active nodes are %s", this.nodeCache.entrySet());
        Protos.MasterInfo masterInfo = this.getMaster();
        if (masterInfo == null) {
            LOG.debug("No current master exists!", new Object[0]);
        } else {
            LOG.debug("Current master is %s", UPID.create(masterInfo.getPid()).asString());
        }
        ArrayList detectMessages = new ArrayList(this.futures.size());
        this.futures.drainTo(detectMessages);
        for (DetectMessage detectMessage : detectMessages) {
            this.processDetect(detectMessage);
        }
    }

    @Subscribe
    public void processDetect(DetectMessage message) {
        SettableFuture<Protos.MasterInfo> future = message.getFuture();
        Protos.MasterInfo previous = message.getPrevious();
        Protos.MasterInfo currentLeader = this.getMaster();
        if (!Objects.equal((Object)currentLeader, (Object)previous)) {
            LOG.debug("Master has changed: %s -> %s", previous, currentLeader);
            future.set((Object)currentLeader);
        } else {
            LOG.debug("Master unchanged, queueing", new Object[0]);
            this.futures.add(message);
        }
    }

    private void processChildList(List<String> children) {
        ImmutableSet masterNodes = ImmutableSet.copyOf((Iterable)Iterables.filter(children, (Predicate)Predicates.containsPattern((String)"^info_")));
        this.eventBus.post(new MasterUpdateMessage((Set<String>)masterNodes));
    }

    private Protos.MasterInfo getMaster() {
        if (this.nodeCache.isEmpty()) {
            return null;
        }
        String key = this.nodeCache.firstKey();
        return (Protos.MasterInfo)this.nodeCache.get(key);
    }

    private static class MasterInfoZkSerializer
    implements ZkSerializer {
        private MasterInfoZkSerializer() {
        }

        public byte[] serialize(Object data) throws ZkMarshallingError {
            Preconditions.checkState((boolean)(data instanceof Protos.MasterInfo), (String)"%s is not a MasterInfo!", (Object[])new Object[]{data.getClass().getSimpleName()});
            return ((Protos.MasterInfo)data).toByteArray();
        }

        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            Preconditions.checkNotNull((Object)bytes, (Object)"bytes is null");
            try {
                return Protos.MasterInfo.parseFrom((byte[])bytes);
            }
            catch (InvalidProtocolBufferException e) {
                return new ZkMarshallingError((Throwable)e);
            }
        }
    }
}

