/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.registry.zookeeper;

import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.AbstractRegistryProperties;
import cn.ponfee.disjob.registry.RegistryException;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.zookeeper.CuratorFrameworkClient;
import cn.ponfee.disjob.registry.zookeeper.configuration.ZookeeperRegistryProperties;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;

public abstract class ZookeeperServerRegistry<R extends Server, D extends Server>
extends ServerRegistry<R, D> {
    private static final int CREATE_EPHEMERAL_FAIL_RETRIES = 3;
    private final CuratorFrameworkClient client;
    private final String zkRegistryRootPath;

    protected ZookeeperServerRegistry(ZookeeperRegistryProperties config) {
        super((AbstractRegistryProperties)config, '/');
        this.zkRegistryRootPath = this.separator + this.registryRootPath;
        String zkDiscoveryRootPath = this.separator + this.discoveryRootPath;
        CuratorFrameworkClient client0 = null;
        try {
            this.client = client0 = new CuratorFrameworkClient(config, c -> {
                if (this.state.isStopped()) {
                    return;
                }
                for (Server server : this.registered) {
                    try {
                        c.createEphemeral(this.buildRegistryPath(server), 3);
                    }
                    catch (Throwable t) {
                        this.log.error("Re-registry server to zookeeper occur error: " + server, t);
                    }
                }
            });
            this.client.createPersistent(this.zkRegistryRootPath);
            this.client.createPersistent(zkDiscoveryRootPath);
            this.client.watch(zkDiscoveryRootPath, this::doRefreshDiscoveryServers);
        }
        catch (Throwable t) {
            if (client0 != null) {
                client0.close();
            }
            throw new RegistryException("Zookeeper registry init error: " + (Object)((Object)config), t);
        }
    }

    public final boolean isConnected() {
        return this.client.isConnected();
    }

    public final void register(R server) {
        if (this.state.isStopped()) {
            return;
        }
        try {
            this.client.createEphemeral(this.buildRegistryPath(server), 3);
            this.registered.add(server);
            this.log.info("Server registered: {}, {}", (Object)this.registryRole, server);
        }
        catch (Throwable e) {
            throw new RegistryException("Zookeeper server register failed: " + server, e);
        }
    }

    public final void deregister(R server) {
        String registryPath = this.buildRegistryPath(server);
        try {
            this.registered.remove(server);
            this.client.deletePath(registryPath);
            this.log.info("Server deregister: {}, {}", (Object)this.registryRole, server);
        }
        catch (Throwable e) {
            this.log.error("Deregister to zookeeper failed: " + registryPath, e);
        }
    }

    public List<R> getRegisteredServers() throws Exception {
        return this.deserializeRegistryServers(this.client.getChildren(this.zkRegistryRootPath));
    }

    @PreDestroy
    public void close() {
        if (!this.state.stop()) {
            return;
        }
        this.registered.forEach(this::deregister);
        Throwables.ThrowingRunnable.doCaught(this.client::close);
        super.close();
    }

    private String buildRegistryPath(R server) {
        return this.zkRegistryRootPath + this.separator + server.serialize();
    }

    private synchronized void doRefreshDiscoveryServers(List<String> list) {
        List servers;
        this.log.info("Watched servers {}", list);
        if (CollectionUtils.isEmpty(list)) {
            this.log.warn("Not discovered available {} from zookeeper.", (Object)this.discoveryRole);
            servers = Collections.emptyList();
        } else {
            servers = list.stream().filter(Objects::nonNull).map(arg_0 -> ((ServerRole)this.discoveryRole).deserialize(arg_0)).collect(Collectors.toList());
        }
        this.refreshDiscoveredServers(servers);
    }
}

