/*
 * Decompiled with CFR 0.152.
 */
package org.lable.oss.dynamicconfig.zookeeper;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.lable.oss.dynamicconfig.zookeeper.ZooKeeperConnectionObserver;
import org.lable.oss.dynamicconfig.zookeeper.ZooKeeperLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitoringZookeeperConnection
implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(MonitoringZookeeperConnection.class);

    /** Longest pause between two connection attempts, in minutes (the back-off is capped at 300 seconds). */
    static final int MAX_RETRY_WAIT_MINUTES = 5;
    /** Path prefix that {@link #prepareLock(String)} puts in front of the requested znode. */
    static final String LOCKING_NODES = "/locks";
    /**
     * Minimum time between two maintenance runs, in milliseconds. {@link JobRunner} adds this value to a
     * {@link System#currentTimeMillis()} timestamp, so it has to be expressed in milliseconds; converting
     * with {@code toSeconds} made the maintenance tasks due every 10 ms instead of every 10 s.
     */
    static final long MAINTENANCE_TIMER_INTERVAL = TimeUnit.SECONDS.toMillis(10L);

    /** Connect string handed to the ZooKeeper client: comma-separated hosts plus the optional chroot. */
    final String connectString;
    /** Bookkeeping for every znode that was ever passed to {@code listen}, keyed by znode path. */
    final Map<String, NodeState> monitoredFiles;
    /** Callback that receives the content of a watched znode whenever it is (re)loaded. */
    final NodeChangeListener changeListener;
    /** Prefix used in log messages to tell connections apart. */
    final String identityString;
    /** Default watcher passed to every ZooKeeper client created by {@link #connect()}. */
    final Watcher watcher;
    /** Write jobs queued by {@link #set(String, String, boolean)}, executed by the {@link JobRunner}. */
    final Queue<Task> jobQueue;
    /** Single-threaded executor running the initial connect and the periodic {@link JobRunner}. */
    final ScheduledExecutorService executor;
    final Queue<ZooKeeperConnectionObserver> observers = new ConcurrentLinkedQueue<ZooKeeperConnectionObserver>();
    CompletableFuture<Void> connectionTask;
    Future<?> jobRunner;
    // The three fields below are written by the ZooKeeper event thread or the executor thread and polled
    // by caller threads without holding a lock, hence volatile.
    volatile ZooKeeper zooKeeper;
    volatile State state;
    volatile boolean runMaintenanceTasksNow;
    /** Number of failed connection attempts since the last successful connect. */
    int retryCounter;
    /** Current back-off between connection attempts, in seconds. */
    int retryWait;

    /**
     * Creates a connection without chroot and without change listener.
     *
     * @param quorum Comma-separated list of ZooKeeper hosts; it is split on {@code ","} and passed on to the
     *               three-argument constructor.
     */
    public MonitoringZookeeperConnection(String quorum) {
        this(quorum.split(","), null, null);
    }

    /**
     * Creates a connection without chroot and without change listener.
     *
     * @param quorum ZooKeeper hosts, one entry per host.
     */
    public MonitoringZookeeperConnection(String[] quorum) {
        this(quorum, null, null);
    }

    public MonitoringZookeeperConnection(String[] quorum, String chroot, NodeChangeListener changeListener) {
        if (chroot == null) {
            this.connectString = String.join((CharSequence)",", quorum);
        } else {
            if (!chroot.startsWith("/")) {
                chroot = "/" + chroot;
            }
            this.connectString = String.join((CharSequence)",", quorum) + chroot;
        }
        logger.info("Monitoring: {}", (Object)this.connectString);
        this.state = State.CONNECTING;
        this.changeListener = changeListener == null ? (name, inputStream) -> {} : changeListener;
        this.jobQueue = new ConcurrentLinkedQueue<Task>();
        this.monitoredFiles = new ConcurrentHashMap<String, NodeState>();
        this.identityString = "MonitoringZKConn " + chroot;
        this.watcher = new MZKWatcher();
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.resetRetryCounters();
        this.connectionTask = CompletableFuture.runAsync(this::connect, this.executor).thenRun(() -> {
            this.jobRunner = this.executor.scheduleWithFixedDelay(new JobRunner(this), 50L, 1000L, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Reads the current content of a znode, without setting a watcher.
     * <p>
     * While the connection is still being established this method blocks, polling the state every
     * 300 ms. The wait is a loop rather than a recursive call, so a long outage cannot exhaust the stack.
     *
     * @param node Path of the znode to read.
     * @return The znode's data, or an empty {@link Optional} when the name is not legal, the connection is
     *         closed, the znode holds no data, the read failed, or the calling thread was interrupted.
     */
    public Optional<InputStream> load(String node) {
        if (!MonitoringZookeeperConnection.isLegalName(node)) {
            logger.error(this.identityString + ": ZooKeeper node name is not valid ({}).", (Object)node);
            return Optional.empty();
        }

        // Read the state once per iteration so the decision below is based on the value that ended the loop.
        State current;
        while ((current = this.state) == State.CONNECTING) {
            logger.info(this.identityString + ": Connection to ZooKeeper not established yet; waiting\u2026");
            try {
                TimeUnit.MILLISECONDS.sleep(300L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
        if (current != State.LIVE) {
            return Optional.empty();
        }

        try {
            byte[] data = this.zooKeeper.getData(node, false, null);
            return data == null || data.length == 0 ? Optional.empty() : Optional.of(new ByteArrayInputStream(data));
        }
        catch (KeeperException e) {
            logger.error(this.identityString + ": Failure during getData on " + node, (Throwable)e);
            return Optional.empty();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    /**
     * Queues a write of {@code value} (encoded as UTF-8) to a znode. The write itself is performed later by
     * the {@link JobRunner}, so this method returns before the data is stored.
     *
     * @param node            Path of the znode to write to. Illegal names are logged and ignored.
     * @param value           New content of the znode.
     * @param createIfMissing When {@code true}, a missing znode is created as a persistent node with an open
     *                        ACL; when {@code false}, a missing znode is logged as an error.
     * @throws NullPointerException If {@code value} is {@code null}.
     */
    public void set(String node, String value, boolean createIfMissing) {
        if (!MonitoringZookeeperConnection.isLegalName(node)) {
            logger.error(this.identityString + ": ZooKeeper node name is not valid ({}).", (Object)node);
            return;
        }
        byte[] data = value.getBytes(Charset.forName("UTF-8"));
        Task task = () -> {
            try {
                this.zooKeeper.setData(node, data, -1);
            }
            catch (KeeperException.NoNodeException e) {
                if (createIfMissing) {
                    this.zooKeeper.create(node, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } else {
                    // Only an error when the caller did not ask for the node to be created.
                    logger.error("Failed to set node " + node + ", it is missing.", (Throwable)e);
                }
            }
        };
        this.jobQueue.add(task);
    }

    /**
     * Adds an observer that is told when the connection is established or lost.
     *
     * @param observer Observer to add.
     */
    public void registerObserver(ZooKeeperConnectionObserver observer) {
        observers.add(observer);
    }

    /**
     * Removes a previously registered observer; unknown observers are ignored.
     *
     * @param observer Observer to remove.
     */
    public void deregisterObserver(ZooKeeperConnectionObserver observer) {
        observers.remove(observer);
    }

    /**
     * @return The current connection state.
     */
    public State getState() {
        final State current = this.state;
        return current;
    }

    /**
     * Returns the underlying ZooKeeper client, blocking until the connection is live.
     * <p>
     * The state is polled every 100 ms in a loop. The previous recursive wait overflowed the stack after a
     * long outage, and spun without sleeping once the calling thread was interrupted.
     *
     * @return The live ZooKeeper client.
     * @throws IllegalStateException If this connection was closed (it can never become live again), or if
     *                               the calling thread was interrupted while waiting; in the latter case the
     *                               interrupt flag is restored.
     */
    public ZooKeeper getActiveConnection() {
        State current;
        while ((current = this.state) != State.LIVE) {
            if (current == State.CLOSED) {
                throw new IllegalStateException(this.identityString + ": Connection is closed.");
            }
            try {
                logger.info("Not connected.");
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                        this.identityString + ": Interrupted while waiting for a live connection.", e);
            }
        }
        return this.zooKeeper;
    }

    /**
     * Creates a new ZooKeeper client, retrying with back-off until the client could be created.
     * <p>
     * Does nothing unless the state is {@link State#CONNECTING}. A client left over from a previous session
     * is closed first. Retrying is a loop (not recursion) that re-checks the state on every round, so a
     * concurrent {@link #close()} ends it, and it stops when the thread is interrupted instead of spinning
     * through back-off sleeps that fail immediately.
     */
    synchronized void connect() {
        while (this.state == State.CONNECTING) {
            if (this.zooKeeper != null) {
                try {
                    this.zooKeeper.close();
                }
                catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (this.retryCounter > 0) {
                logger.warn(this.identityString + ": Failed to connect to Zookeeper quorum, retrying (" + this.retryCounter + ").");
            }
            try {
                this.zooKeeper = new ZooKeeper(this.connectString, 3000, this.watcher);
                return;
            }
            catch (IOException e) {
                // Keep the cause visible; it used to be dropped silently.
                logger.warn(this.identityString + ": Could not create ZooKeeper client.", (Throwable)e);
                this.waitBeforeRetrying();
                if (Thread.currentThread().isInterrupted()) {
                    // The back-off sleep was cut short; further attempts would not wait at all.
                    return;
                }
            }
        }
    }

    /**
     * Starts watching a znode without loading its current content first.
     *
     * @param node Path of the znode to watch.
     */
    public void listen(String node) {
        final boolean loadInitially = false;
        listen(node, loadInitially);
    }

    /**
     * Starts watching a znode, blocking until the connection is live.
     * <p>
     * The state is polled every 100 ms in a loop. The previous recursive wait registered the node once per
     * recursion level, because every level fell through to {@code exclusiveListen} after its nested call
     * returned.
     *
     * @param node          Path of the znode to watch. Illegal names are logged and ignored.
     * @param loadInitially When {@code true}, the znode's content is passed to the change listener right
     *                      away instead of only after the first change.
     */
    public void listen(String node, boolean loadInitially) {
        if (!MonitoringZookeeperConnection.isLegalName(node)) {
            logger.error("Configuration source name is not valid ({}).", (Object)node);
            return;
        }
        State current;
        while ((current = this.state) != State.LIVE) {
            if (current == State.CLOSED) {
                // A closed connection never becomes live again; waiting would block forever.
                logger.warn(this.identityString + ": Connection is closed; not listening to {}.", (Object)node);
                return;
            }
            try {
                logger.info("Not connected.");
                TimeUnit.MILLISECONDS.sleep(100L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        this.exclusiveListen(node, loadInitially);
    }

    /**
     * Registers a znode as monitored and tries to set a watcher on it.
     *
     * @param node          Path of the znode.
     * @param loadInitially Whether the znode should be flagged for (re)loading before the watcher is set.
     */
    synchronized void exclusiveListen(String node, boolean loadInitially) {
        NodeState tracked = this.monitoredFiles.get(node);
        if (tracked == null) {
            // First time this znode is seen; new entries start out monitored.
            tracked = new NodeState(node);
            this.monitoredFiles.put(node, tracked);
        } else {
            // Known znode, possibly unmonitored by an earlier stopListening call.
            tracked.markAsMonitored();
        }

        if (loadInitially) {
            tracked.markForReloading();
        }
        this.attemptToSetWatcher(tracked);
    }

    /**
     * Handles an event delivered to a watcher that {@link #attemptToSetWatcher(NodeState)} set on a znode.
     * <p>
     * Events without a path, for unknown znodes, or for znodes that are no longer monitored are dropped.
     * For created or changed znodes the new content is read and handed to the change listener, unless the
     * znode's modification time is not newer than the last one processed. Afterwards the watcher is set
     * again, except for such duplicate events.
     *
     * @param event The event received from ZooKeeper.
     */
    synchronized void processTriggeredWatcher(WatchedEvent event) {
        if (this.state == State.CLOSED) {
            return;
        }
        Watcher.Event.EventType eventType = event.getType();
        String znode = event.getPath();
        if (znode == null) {
            return;
        }
        NodeState nodeState = this.monitoredFiles.get(znode);
        if (nodeState == null) {
            logger.warn("Watcher triggered ({}) for unknown node {}.", (Object)eventType, (Object)znode);
            return;
        }
        if (!nodeState.isMonitored()) {
            // Nobody listens any more; the watcher that just fired is simply not renewed.
            nodeState.setWatcherState(NodeState.WatcherState.NO_WATCHER);
            return;
        }
        boolean needsWatcher = true;
        switch (eventType) {
            case None:
            case NodeChildrenChanged: {
                break;
            }
            case NodeDeleted: {
                logger.error("Watched configuration part deleted! Keeping the last-known version in memory until this part is restored, or if all references to this part are removed from the config.");
                nodeState.markForReloading();
                break;
            }
            case NodeCreated:
            case NodeDataChanged: {
                try {
                    Stat stat = new Stat();
                    byte[] data = this.zooKeeper.getData(znode, false, stat);
                    Instant mTime = Instant.ofEpochMilli(stat.getMtime());
                    Optional<Instant> lastUpdated = nodeState.getLastUpdated();
                    if (lastUpdated.isPresent() && !mTime.isAfter(lastUpdated.get())) {
                        logger.warn("Received duplicate watch trigger event for {}. Ignoring it.", (Object)znode);
                        needsWatcher = false;
                        break;
                    }
                    // A znode without data yields null; pass it on as empty content instead of letting
                    // the ByteArrayInputStream constructor throw a NullPointerException.
                    byte[] content = data == null ? new byte[0] : data;
                    this.changeListener.changed(znode, new ByteArrayInputStream(content));
                    nodeState.markAsReloaded(mTime);
                    break;
                }
                catch (KeeperException e) {
                    logger.error("Failed to read data from znode " + znode + "! Will attempt to reload later.", (Throwable)e);
                    nodeState.markForReloading();
                    break;
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                break;
            }
            default: {
                break;
            }
        }
        if (needsWatcher) {
            nodeState.setWatcherState(NodeState.WatcherState.NEEDS_WATCHER);
            this.attemptToSetWatcher(nodeState);
        }
    }

    /**
     * Sets a watcher on a znode unless it already has one.
     * <p>
     * When the znode is flagged for reloading, its content is read (which sets the watcher at the same
     * time) and handed to the change listener; otherwise only an {@code exists} watcher is set. The watcher
     * state is switched to {@code HAS_WATCHER} up front and reverted to {@code NEEDS_WATCHER} when ZooKeeper
     * reports an error, so that the maintenance task tries again later.
     *
     * @param nodeState State of the znode to watch; {@code null} is ignored.
     */
    synchronized void attemptToSetWatcher(NodeState nodeState) {
        if (nodeState == null || nodeState.getWatcherState() == NodeState.WatcherState.HAS_WATCHER) {
            return;
        }
        try {
            nodeState.setWatcherState(NodeState.WatcherState.HAS_WATCHER);
            if (nodeState.needsReloading()) {
                logger.info("Reloading znode {}", (Object)nodeState.znode);
                Stat stat = new Stat();
                byte[] data = this.zooKeeper.getData(nodeState.znode, this::processTriggeredWatcher, stat);
                Instant mTime = Instant.ofEpochMilli(stat.getMtime());
                // A znode without data yields null; pass it on as empty content. The NullPointerException
                // thrown otherwise would escape into the caller (the scheduled job runner).
                byte[] content = data == null ? new byte[0] : data;
                this.changeListener.changed(nodeState.znode, new ByteArrayInputStream(content));
                nodeState.markAsReloaded(mTime);
            } else {
                logger.info("Setting watcher on znode {}", (Object)nodeState.znode);
                this.zooKeeper.exists(nodeState.znode, this::processTriggeredWatcher);
            }
        }
        catch (KeeperException e) {
            logger.error("Failed to set watcher for node " + nodeState.znode + "! Will attempt to re-set later.", (Throwable)e);
            nodeState.setWatcherState(NodeState.WatcherState.NEEDS_WATCHER);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops reacting to changes of a znode. The entry is kept and only flagged as unmonitored; unknown
     * znodes are ignored.
     *
     * @param part Path of the znode.
     */
    public synchronized void stopListening(String part) {
        NodeState tracked = this.monitoredFiles.get(part);
        if (tracked == null) {
            return;
        }
        tracked.markAsUnmonitored();
    }

    /**
     * Creates a lock object for a znode. The lock path is {@code LOCKING_NODES} followed by {@code znode},
     * and the lock looks up this connection's current ZooKeeper client each time it asks for one.
     *
     * @param znode Path appended to the locking prefix.
     * @return A new {@link ZooKeeperLock}.
     */
    public ZooKeeperLock prepareLock(String znode) {
        final String lockPath = LOCKING_NODES + znode;
        return new ZooKeeperLock(() -> this.zooKeeper, lockPath);
    }

    /**
     * Closes this connection: marks it {@link State#CLOSED}, cancels the background tasks, shuts down the
     * executor and closes the ZooKeeper client. A closed connection cannot be reopened.
     *
     * @throws IOException Declared by {@link Closeable}; not thrown by this implementation.
     */
    @Override
    public void close() throws IOException {
        // Set the state first so that background work still running sees CLOSED and backs off.
        this.state = State.CLOSED;
        if (this.connectionTask != null && !this.connectionTask.isDone()) {
            this.connectionTask.cancel(true);
        }
        if (this.jobRunner != null) {
            this.jobRunner.cancel(false);
        }
        // Cancelling the tasks does not stop the executor itself; without a shutdown its worker thread
        // stays alive after close(). shutdownNow() also wakes a connect attempt sleeping in its back-off.
        this.executor.shutdownNow();
        if (this.zooKeeper != null) {
            try {
                this.zooKeeper.close();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Tries to set a watcher on every monitored znode whose watcher state is {@code NEEDS_WATCHER}.
     */
    synchronized void performMaintenanceTasks() {
        for (NodeState candidate : this.monitoredFiles.values()) {
            if (!candidate.isMonitored()) {
                continue;
            }
            if (candidate.getWatcherState() != NodeState.WatcherState.NEEDS_WATCHER) {
                continue;
            }
            logger.warn("Reset watcher on " + candidate.znode);
            this.attemptToSetWatcher(candidate);
        }
    }

    /**
     * Doubles the retry wait (capped at 300 seconds), counts the retry, and then sleeps for the resulting
     * number of seconds. An interrupt ends the sleep early and is re-flagged on the thread.
     */
    void waitBeforeRetrying() {
        final int maxWaitSeconds = 300;
        if (this.retryWait < maxWaitSeconds) {
            this.retryWait = Math.min(this.retryWait * 2, maxWaitSeconds);
        }
        this.retryCounter += 1;

        logger.info("Failed to connect to ZooKeeper quorum, waiting " + this.retryWait + "s before retrying.");
        try {
            TimeUnit.SECONDS.sleep(this.retryWait);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Restores the initial back-off: a retry wait of 10 seconds and a retry count of zero.
     */
    void resetRetryCounters() {
        this.retryWait = 10;
        this.retryCounter = 0;
    }

    /**
     * Checks whether a znode name is usable at all.
     *
     * @param name Name to check.
     * @return {@code true} when the name is neither {@code null} nor empty.
     */
    static boolean isLegalName(String name) {
        if (name == null) {
            return false;
        }
        return name.length() > 0;
    }

    /**
     * Callback invoked with the content of a watched znode whenever that content is (re)loaded.
     */
    @FunctionalInterface
    public interface NodeChangeListener {
        /**
         * @param znode Path of the znode that was loaded.
         * @param data  Stream over the znode's content.
         */
        void changed(String znode, InputStream data);
    }

    static interface Task {
        public void execute() throws KeeperException, InterruptedException;
    }

    /**
     * Mutable bookkeeping for a single znode: whether it is monitored, whether its content has to be
     * (re)loaded, when it was last loaded, and whether a watcher is set on it.
     */
    static class NodeState {
        /** Watcher status of a znode. */
        enum WatcherState {
            NEEDS_WATCHER,
            HAS_WATCHER,
            NO_WATCHER
        }

        private final String znode;
        private boolean needsReloading = false;
        private boolean monitored = true;
        private Instant lastUpdate = null;
        private WatcherState watcherState = WatcherState.NO_WATCHER;

        /**
         * Creates the state for a znode: monitored, not flagged for reloading, without watcher, never loaded.
         *
         * @param znode Path of the znode.
         */
        public NodeState(String znode) {
            this.znode = znode;
        }

        /** @return Whether changes to this znode should still be acted upon. */
        boolean isMonitored() {
            return monitored;
        }

        void markAsMonitored() {
            monitored = true;
        }

        void markAsUnmonitored() {
            monitored = false;
        }

        /** @return Whether the content has to be loaded the next time a watcher is set. */
        boolean needsReloading() {
            return needsReloading;
        }

        void markForReloading() {
            needsReloading = true;
        }

        /**
         * Clears the reload flag and records the modification time of the content that was loaded.
         *
         * @param mTime Modification time of the loaded content.
         */
        void markAsReloaded(Instant mTime) {
            lastUpdate = mTime;
            needsReloading = false;
        }

        /** @return The modification time recorded by the last reload, or empty when never loaded. */
        Optional<Instant> getLastUpdated() {
            return Optional.ofNullable(lastUpdate);
        }

        WatcherState getWatcherState() {
            return watcherState;
        }

        void setWatcherState(WatcherState watcherState) {
            this.watcherState = watcherState;
        }
    }

    /**
     * Periodic task that executes the queued {@link Task}s and, when due, the maintenance tasks of a
     * {@link MonitoringZookeeperConnection}. It only does work while the connection is live.
     */
    static class JobRunner
    implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(JobRunner.class);
        MonitoringZookeeperConnection monitoringZookeeperConnection;
        /** Timestamp ({@link System#currentTimeMillis()}) of the last maintenance run. */
        long maintenanceTaskLastRan;

        public JobRunner(MonitoringZookeeperConnection monitoringZookeeperConnection) {
            this.monitoringZookeeperConnection = monitoringZookeeperConnection;
            this.maintenanceTaskLastRan = System.currentTimeMillis();
        }

        @Override
        public void run() {
            if (this.monitoringZookeeperConnection.getState() != State.LIVE) {
                return;
            }
            try {
                this.drainJobQueue();
                this.runMaintenanceIfDue();
            }
            catch (RuntimeException e) {
                // An exception escaping a periodically scheduled task cancels all its future runs, so a
                // single misbehaving job or listener must not get past this point.
                logger.error("Unexpected error in job runner.", (Throwable)e);
            }
        }

        /**
         * Executes the queued jobs in order. A job that fails because the connection is gone stays at the
         * head of the queue and processing stops until the next run; retrying it right away would spin
         * until the connection is back and would move the job behind the ones queued after it.
         */
        private void drainJobQueue() {
            Queue<Task> queue = this.monitoringZookeeperConnection.jobQueue;
            Task job;
            // This runner is the only consumer of the queue, so the element seen by peek() is still there
            // when it is removed after execution.
            while ((job = queue.peek()) != null) {
                try {
                    job.execute();
                }
                catch (KeeperException.ConnectionLossException | KeeperException.OperationTimeoutException | KeeperException.SessionExpiredException e) {
                    return;
                }
                catch (KeeperException e) {
                    logger.error("Unexpected ZooKeeper error.", (Throwable)e);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    queue.remove(job);
                    // Remaining jobs would be interrupted as well; leave them for a later run.
                    return;
                }
                queue.remove(job);
            }
        }

        /** Runs the maintenance tasks when requested explicitly or when the interval has elapsed. */
        private void runMaintenanceIfDue() {
            long now = System.currentTimeMillis();
            boolean due = this.monitoringZookeeperConnection.runMaintenanceTasksNow
                    || this.maintenanceTaskLastRan + MAINTENANCE_TIMER_INTERVAL < now;
            if (!due) {
                return;
            }
            this.monitoringZookeeperConnection.runMaintenanceTasksNow = false;
            this.monitoringZookeeperConnection.performMaintenanceTasks();
            this.maintenanceTaskLastRan = now;
        }
    }

    /**
     * Connection-level watcher. An instance is handed to the ZooKeeper constructor in {@code connect()} and
     * is registered again through {@code getChildren("/")} at the end of {@link #process(WatchedEvent)}.
     * It translates ZooKeeper's connection states into this class's {@link State} and notifies the
     * registered observers.
     */
    private class MZKWatcher
    implements Watcher {
        private MZKWatcher() {
        }

        /**
         * Reacts to an event delivered by the ZooKeeper client. Events are ignored unless the connection
         * is LIVE or CONNECTING.
         *
         * @param event The event received from ZooKeeper.
         */
        public void process(WatchedEvent event) {
            if (MonitoringZookeeperConnection.this.state != State.LIVE && MonitoringZookeeperConnection.this.state != State.CONNECTING) {
                return;
            }
            Watcher.Event.KeeperState state = event.getState();
            Watcher.Event.EventType type = event.getType();
            block1 : switch (state) {
                case SyncConnected: {
                    logger.info("Connection (re-)established.");
                    MonitoringZookeeperConnection.this.resetRetryCounters();
                    // Every node that had a watcher is flagged so that the maintenance task sets the
                    // watcher again and reloads the content.
                    MonitoringZookeeperConnection.this.monitoredFiles.values().forEach(nodeState -> {
                        if (nodeState.getWatcherState() == NodeState.WatcherState.HAS_WATCHER) {
                            nodeState.setWatcherState(NodeState.WatcherState.NEEDS_WATCHER);
                            nodeState.markForReloading();
                        }
                    });
                    // Ask the job runner to run maintenance on its next tick instead of waiting for the timer.
                    MonitoringZookeeperConnection.this.runMaintenanceTasksNow = true;
                    MonitoringZookeeperConnection.this.state = State.LIVE;
                    MonitoringZookeeperConnection.this.observers.forEach(ZooKeeperConnectionObserver::connected);
                    // Both cases below only log; the labeled break leaves the outer switch.
                    switch (type) {
                        case NodeCreated: {
                            logger.info("Our configuration parent znode was created (why was it gone?).");
                            break block1;
                        }
                        case NodeDeleted: {
                            logger.error("Our configuration parent znode was deleted! Waiting for it to be recreated\u2026");
                            break block1;
                        }
                    }
                    break;
                }
                case Disconnected: {
                    logger.warn("Disconnected from Zookeeper quorum, reconnecting\u2026");
                    MonitoringZookeeperConnection.this.observers.forEach(ZooKeeperConnectionObserver::disconnected);
                    MonitoringZookeeperConnection.this.state = State.CONNECTING;
                    // Sleeps on the thread that delivered this event.
                    MonitoringZookeeperConnection.this.waitBeforeRetrying();
                    break;
                }
                case Expired: {
                    logger.warn("Connection to Zookeeper quorum expired. Attempting to reconnect\u2026");
                    MonitoringZookeeperConnection.this.observers.forEach(ZooKeeperConnectionObserver::disconnected);
                    MonitoringZookeeperConnection.this.state = State.CONNECTING;
                    // connect() creates a new client; the watcher is not re-registered on the old one.
                    MonitoringZookeeperConnection.this.connect();
                    return;
                }
            }
            // Register this watcher again, on the children of the root node.
            try {
                MonitoringZookeeperConnection.this.zooKeeper.getChildren("/", (Watcher)this);
            }
            catch (KeeperException.SessionExpiredException e) {
                MonitoringZookeeperConnection.this.connect();
            }
            catch (KeeperException e) {
                logger.error("KeeperException caught, retrying\u2026", (Throwable)e);
                MonitoringZookeeperConnection.this.waitBeforeRetrying();
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Life cycle of a {@link MonitoringZookeeperConnection}.
     */
    public enum State {
        /** No usable session yet (initial state, and the state after a disconnect or session expiry). */
        CONNECTING,
        /** Connected to the quorum. */
        LIVE,
        /** {@code close()} was called. */
        CLOSED
    }
}

