/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.registry.consul;

import cn.ponfee.disjob.common.base.LoggedUncaughtExceptionHandler;
import cn.ponfee.disjob.common.base.LoopThread;
import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.util.ObjectUtils;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.consul.configuration.ConsulRegistryProperties;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

public abstract class ConsulServerRegistry<R extends Server, D extends Server>
extends ServerRegistry<R, D> {
    private static final int WAIT_TIME_SECONDS = 60;
    private static final int CHECK_PASS_INTERVAL_SECONDS = 2;
    private static final String CHECK_TTL_SECONDS = "16s";
    private static final String DEREGISTER_TIME_SECONDS = "20s";
    private final ConsulClient client;
    private final String token;
    private final LoopThread consulTtlCheckThread;
    private final ConsulSubscriberThread consulSubscriberThread;

    protected ConsulServerRegistry(ConsulRegistryProperties config) {
        super(config.getNamespace(), ':');
        this.client = new ConsulClient(config.getHost(), config.getPort());
        this.token = StringUtils.isBlank((CharSequence)config.getToken()) ? null : config.getToken().trim();
        int periodMs = Math.max(2, 1) * 1000;
        this.consulTtlCheckThread = LoopThread.createStarted((String)"consul_ttl_check", (long)periodMs, (long)periodMs, this::checkPass);
        this.consulSubscriberThread = new ConsulSubscriberThread(-1L);
        this.consulSubscriberThread.start();
    }

    public final boolean isConnected() {
        return this.client.getAgentSelf() != null;
    }

    public final void register(R server) {
        if (this.closed.get()) {
            return;
        }
        NewService newService = this.createService(server);
        if (this.token == null) {
            this.client.agentServiceRegister(newService);
        } else {
            this.client.agentServiceRegister(newService, this.token);
        }
        this.registered.add(server);
        this.log.info("Consul server registered: {} | {}", (Object)this.registryRole.name(), server);
    }

    public final void deregister(R server) {
        try {
            this.registered.remove(server);
            String serverId = this.buildServiceId(server);
            if (this.token == null) {
                this.client.agentServiceDeregister(serverId);
            } else {
                this.client.agentServiceDeregister(serverId, this.token);
            }
            this.log.info("Consul Server deregister: {} | {}", (Object)this.registryRole.name(), server);
        }
        catch (Throwable t) {
            this.log.error("Consul server deregister error.", t);
        }
    }

    @PreDestroy
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            this.log.warn("Repeat call close method\n{}", (Object)ObjectUtils.getStackTrace());
            return;
        }
        this.consulTtlCheckThread.terminate();
        this.registered.forEach(this::deregister);
        this.registered.clear();
        Throwables.ThrowingRunnable.execute(() -> Threads.stopThread((Thread)this.consulSubscriberThread, (int)0, (long)0L, (long)100L));
        super.close();
    }

    private String buildServiceId(R server) {
        return this.registryRootPath + this.separator + server.serialize();
    }

    private NewService createService(R server) {
        NewService service = new NewService();
        service.setName(this.registryRootPath);
        service.setId(this.buildServiceId(server));
        service.setAddress(server.getHost());
        service.setPort(Integer.valueOf(server.getPort()));
        service.setCheck(ConsulServerRegistry.createCheck());
        service.setTags(null);
        service.setMeta(null);
        return service;
    }

    private static NewService.Check createCheck() {
        NewService.Check check = new NewService.Check();
        check.setTtl(CHECK_TTL_SECONDS);
        check.setDeregisterCriticalServiceAfter(DEREGISTER_TIME_SECONDS);
        return check;
    }

    private void checkPass() {
        if (this.closed.get()) {
            return;
        }
        for (Server server : this.registered) {
            String checkId = this.buildServiceId(server);
            try {
                if (this.token == null) {
                    this.client.agentCheckPass("service:" + checkId);
                } else {
                    this.client.agentCheckPass("service:" + checkId, null, this.token);
                }
                this.log.debug("check pass for server: {} with check id {}", (Object)server, (Object)checkId);
            }
            catch (Throwable t) {
                this.log.warn("fail to check pass for server: " + server + ", check id is: " + checkId, t);
            }
        }
    }

    private synchronized void doRefreshDiscoveryServers(List<HealthService> healthServices) {
        List servers;
        if (CollectionUtils.isEmpty(healthServices)) {
            this.log.warn("Not discovered available {} from consul.", (Object)this.discoveryRole.name());
            servers = Collections.emptyList();
        } else {
            servers = healthServices.stream().map(HealthService::getService).filter(Objects::nonNull).map(s -> s.getId().substring(this.discoveryRootPath.length() + 1)).map(arg_0 -> ((ServerRole)this.discoveryRole).deserialize(arg_0)).collect(Collectors.toList());
        }
        this.refreshDiscoveredServers(servers);
    }

    private class ConsulSubscriberThread
    extends Thread {
        private long lastConsulIndex;

        private ConsulSubscriberThread(long initConsulIndex) {
            this.lastConsulIndex = initConsulIndex;
            super.setDaemon(true);
            super.setPriority(10);
            super.setName("consul_subscriber_thread");
            super.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler)LoggedUncaughtExceptionHandler.INSTANCE);
        }

        @Override
        public void run() {
            while (!ConsulServerRegistry.this.closed.get()) {
                try {
                    Response<List<HealthService>> response = this.getDiscoveryServers(this.lastConsulIndex, 60L);
                    Long currentIndex = response.getConsulIndex();
                    if (currentIndex == null || currentIndex <= this.lastConsulIndex) continue;
                    this.lastConsulIndex = currentIndex;
                    ConsulServerRegistry.this.doRefreshDiscoveryServers((List)response.getValue());
                }
                catch (Throwable t) {
                    ConsulServerRegistry.this.log.error("Get consul health services occur error.", t);
                    Threads.interruptIfNecessary((Throwable)t);
                }
            }
        }

        private Response<List<HealthService>> getDiscoveryServers(long index, long waitTime) {
            HealthServicesRequest request = HealthServicesRequest.newBuilder().setQueryParams(new QueryParams(waitTime, index)).setPassing(true).setToken(ConsulServerRegistry.this.token).build();
            return ConsulServerRegistry.this.client.getHealthServices(ConsulServerRegistry.this.discoveryRootPath, request);
        }
    }
}

