/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.registry.zookeeper;

import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.registry.zookeeper.configuration.ZookeeperRegistryProperties;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

public class CuratorFrameworkClient
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(CuratorFrameworkClient.class);
    private final Map<String, ChildChangedWatcher> childWatchers = new HashMap<String, ChildChangedWatcher>();
    private final CuratorFramework curatorFramework;
    private final ReconnectCallback reconnectCallback;

    public CuratorFrameworkClient(ZookeeperRegistryProperties config, ReconnectCallback reconnectCallback) throws Exception {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(config.getConnectString()).connectionTimeoutMs(config.getConnectionTimeoutMs()).sessionTimeoutMs(config.getSessionTimeoutMs()).retryPolicy(CuratorFrameworkClient.buildRetryPolicy(config));
        Optional.ofNullable(config.authorization()).ifPresent(s -> builder.authorization("digest", s.getBytes()));
        this.curatorFramework = builder.build();
        this.curatorFramework.getConnectionStateListenable().addListener((Object)new CuratorConnectionStateListener());
        this.curatorFramework.start();
        boolean isStarted = this.curatorFramework.getState() == CuratorFrameworkState.STARTED;
        Assert.state((boolean)isStarted, () -> "Curator framework not started: " + this.curatorFramework.getState());
        boolean isConnected = this.curatorFramework.blockUntilConnected(config.getMaxWaitTimeMs(), TimeUnit.MILLISECONDS);
        Assert.state((boolean)isConnected, () -> "Curator framework not connected: " + this.curatorFramework.getState());
        this.reconnectCallback = reconnectCallback;
    }

    public void createPersistent(String path) throws Exception {
        try {
            this.curatorFramework.create().creatingParentsIfNeeded().forPath(path);
        }
        catch (KeeperException.NodeExistsException e) {
            LOG.debug("Node path already exists: {}, {}", (Object)path, (Object)e.getMessage());
        }
    }

    public void createEphemeral(String path, int retries) throws Exception {
        block2: {
            try {
                ((ACLBackgroundPathAndBytesable)this.curatorFramework.create().withMode(CreateMode.EPHEMERAL)).forPath(path);
            }
            catch (KeeperException.NodeExistsException e) {
                LOG.debug("Node path already exists: {}, {}", (Object)path, (Object)e.getMessage());
                if (retries <= 0) break block2;
                this.deletePath(path);
                this.createEphemeral(path, --retries);
            }
        }
    }

    public void createPersistent(String path, byte[] data) throws Exception {
        try {
            this.curatorFramework.create().creatingParentsIfNeeded().forPath(path, data);
        }
        catch (KeeperException.NodeExistsException ignored) {
            this.curatorFramework.setData().forPath(path, data);
        }
    }

    public void createEphemeral(String path, byte[] data, int retries) throws Exception {
        block2: {
            try {
                ((ACLBackgroundPathAndBytesable)this.curatorFramework.create().withMode(CreateMode.EPHEMERAL)).forPath(path, data);
            }
            catch (KeeperException.NodeExistsException ignored) {
                if (retries <= 0) break block2;
                this.deletePath(path);
                this.createEphemeral(path, data, --retries);
            }
        }
    }

    public void deletePath(String path) throws Exception {
        try {
            this.curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
        }
        catch (KeeperException.NoNodeException e) {
            LOG.debug("Node path not exists: {}, {}", (Object)path, (Object)e.getMessage());
        }
    }

    public List<String> getChildren(String path) throws Exception {
        try {
            return (List)this.curatorFramework.getChildren().forPath(path);
        }
        catch (KeeperException.NoNodeException ignored) {
            return null;
        }
    }

    public byte[] getData(String path) throws Exception {
        try {
            return (byte[])this.curatorFramework.getData().forPath(path);
        }
        catch (KeeperException.NoNodeException ignored) {
            return null;
        }
    }

    public boolean checkExists(String path) {
        try {
            return this.curatorFramework.checkExists().forPath(path) != null;
        }
        catch (Exception ignored) {
            return false;
        }
    }

    public synchronized void watchChildChanged(String path, CountDownLatch latch, Consumer<List<String>> processor) throws Exception {
        if (this.childWatchers.containsKey(path)) {
            throw new IllegalStateException("Path already watched: " + path);
        }
        ChildChangedWatcher watcher = new ChildChangedWatcher(path, latch, processor);
        List servers = (List)((BackgroundPathable)this.curatorFramework.getChildren().usingWatcher((CuratorWatcher)watcher)).forPath(path);
        this.childWatchers.put(path, watcher);
        processor.accept(servers);
    }

    public synchronized boolean unwatchChildChanged(String path) {
        ChildChangedWatcher watcher = this.childWatchers.remove(path);
        if (watcher != null) {
            watcher.unwatch();
            return true;
        }
        return false;
    }

    public boolean isConnected() {
        return this.curatorFramework.getZookeeperClient().isConnected();
    }

    @Override
    public synchronized void close() {
        new ArrayList<String>(this.childWatchers.keySet()).forEach(this::unwatchChildChanged);
        this.curatorFramework.close();
    }

    private static RetryPolicy buildRetryPolicy(ZookeeperRegistryProperties config) {
        return new ExponentialBackoffRetry(config.getBaseSleepTimeMs(), config.getMaxRetries(), config.getMaxSleepMs());
    }

    private class CuratorConnectionStateListener
    implements ConnectionStateListener {
        private static final long UNKNOWN_SESSION_ID = -1L;
        private long lastSessionId;

        private CuratorConnectionStateListener() {
        }

        public void stateChanged(CuratorFramework client, ConnectionState state) {
            long sessionId;
            try {
                sessionId = client.getZookeeperClient().getZooKeeper().getSessionId();
            }
            catch (Throwable t) {
                sessionId = -1L;
                LOG.warn("Curator client state changed, get session instance error.", t);
            }
            if (state == ConnectionState.CONNECTED) {
                this.lastSessionId = sessionId;
                LOG.info("Curator first connected, session={}", (Object)sessionId);
            } else if (state == ConnectionState.LOST) {
                LOG.warn("Curator session expired, session={}", (Object)this.lastSessionId);
            } else if (state == ConnectionState.SUSPENDED) {
                LOG.warn("Curator connection lost, session={}", (Object)sessionId);
            } else if (state == ConnectionState.RECONNECTED) {
                if (this.lastSessionId == sessionId && sessionId != -1L) {
                    LOG.warn("Curator recover connected, reuse old-session={}", (Object)sessionId);
                } else {
                    LOG.warn("Curator recover connected, old-session={}, new-session={}", (Object)this.lastSessionId, (Object)sessionId);
                    this.lastSessionId = sessionId;
                }
                CuratorFrameworkClient.this.reconnectCallback.call(CuratorFrameworkClient.this);
            }
        }
    }

    private class ChildChangedWatcher
    implements CuratorWatcher {
        private final String path;
        private final CountDownLatch latch;
        private volatile Consumer<List<String>> processor;

        public ChildChangedWatcher(String path, CountDownLatch latch, Consumer<List<String>> processor) {
            this.path = path;
            this.latch = latch;
            this.processor = processor;
        }

        public void unwatch() {
            this.processor = null;
        }

        public void process(WatchedEvent event) throws Exception {
            Throwables.ThrowingRunnable.doCaught(this.latch::await);
            LOG.info("Watched event type: {}", (Object)event.getType());
            Consumer<List<String>> action = this.processor;
            if (action == null || event.getType() == Watcher.Event.EventType.None) {
                return;
            }
            List children = (List)((BackgroundPathable)CuratorFrameworkClient.this.curatorFramework.getChildren().usingWatcher((CuratorWatcher)this)).forPath(this.path);
            action.accept(children);
        }
    }

    public static interface ReconnectCallback {
        public void call(CuratorFrameworkClient var1);
    }
}

