/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.registry.consul;

import cn.ponfee.disjob.common.concurrent.LoggedUncaughtExceptionHandler;
import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.AbstractRegistryProperties;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.consul.configuration.ConsulRegistryProperties;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.OperationException;
import com.ecwid.consul.v1.QueryParams;
import com.ecwid.consul.v1.Response;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.client.RestTemplate;

public abstract class ConsulServerRegistry<R extends Server, D extends Server>
extends ServerRegistry<R, D> {
    private final ConsulClient client;
    private final String token;
    private final String checkTtl;
    private final String checkDeregisterCriticalTimeout;
    private final LoopThread registryTtlScheduler;
    private final DiscoveryWatcher discoveryWatcher;

    protected ConsulServerRegistry(ConsulRegistryProperties config, RestTemplate restTemplate) {
        super((AbstractRegistryProperties)config, restTemplate, ':');
        this.client = new ConsulClient(config.getHost(), config.getPort());
        this.token = StringUtils.isBlank((CharSequence)config.getToken()) ? null : config.getToken().trim();
        this.checkTtl = config.getCheckTtl();
        this.checkDeregisterCriticalTimeout = config.getCheckDeregisterCriticalTimeout();
        int periodMs = Math.max(config.getCheckPassPeriodSeconds(), 1) * 1000;
        this.registryTtlScheduler = LoopThread.createStarted((String)"consul_registry_ttl_scheduler", (long)periodMs, (long)periodMs, this::checkPass);
        this.discoveryWatcher = new DiscoveryWatcher();
        this.log.info("Consul server registry initialized: {}", this.client.getAgentSelf().getValue());
    }

    public final boolean isConnected() {
        return this.client.getAgentSelf() != null;
    }

    public final void register(R server) {
        if (this.state.isStopped()) {
            return;
        }
        NewService newService = this.createService(server);
        if (this.token == null) {
            this.client.agentServiceRegister(newService);
        } else {
            this.client.agentServiceRegister(newService, this.token);
        }
        this.registered.add(server);
        this.log.info("Consul server registered: {}, {}", (Object)this.registryRole, server);
    }

    public final void deregister(R server) {
        try {
            this.registered.remove(server);
            String serverId = this.buildServiceId(server);
            if (this.token == null) {
                this.client.agentServiceDeregister(serverId);
            } else {
                this.client.agentServiceDeregister(serverId, this.token);
            }
            this.log.info("Consul server deregister success: {}", server);
        }
        catch (Throwable t) {
            this.log.error("Consul server deregister error: " + server, t);
        }
    }

    public List<R> getRegisteredServers() {
        Response<List<HealthService>> response = this.getServers(this.registryRootPath, null);
        return this.deserializeServers(ConsulServerRegistry.extract(response, this.registryRootPath), this.registryRole);
    }

    @PreDestroy
    public void close() {
        if (!this.state.stop()) {
            return;
        }
        this.registryTtlScheduler.terminate();
        this.registered.forEach(this::deregister);
        Throwables.ThrowingRunnable.doCaught(() -> Threads.stopThread((Thread)this.discoveryWatcher, (long)1000L));
        super.close();
    }

    private String buildServiceId(R server) {
        return this.registryRootPath + this.separator + server.serialize();
    }

    private NewService createService(R server) {
        NewService service = new NewService();
        service.setName(this.registryRootPath);
        service.setId(this.buildServiceId(server));
        service.setAddress(server.getHost());
        service.setPort(Integer.valueOf(server.getPort()));
        service.setCheck(this.createCheck());
        service.setTags(null);
        service.setMeta(null);
        return service;
    }

    private NewService.Check createCheck() {
        NewService.Check check = new NewService.Check();
        check.setTtl(this.checkTtl);
        check.setDeregisterCriticalServiceAfter(this.checkDeregisterCriticalTimeout);
        return check;
    }

    private void checkPass() {
        if (this.state.isStopped()) {
            return;
        }
        for (Server server : this.registered) {
            String checkId = this.buildServiceId(server);
            try {
                if (this.token == null) {
                    this.client.agentCheckPass("service:" + checkId);
                } else {
                    this.client.agentCheckPass("service:" + checkId, null, this.token);
                }
                this.log.debug("check pass for server: {} with check id {}", (Object)server, (Object)checkId);
            }
            catch (OperationException e) {
                if (e.getStatusCode() == 404) {
                    Throwables.ThrowingRunnable.doCaught(() -> this.register(server), () -> "Not found server register failed: " + server);
                }
                this.log.warn("Check pass server operation exception: " + server + ", check id: " + checkId, (Throwable)e);
            }
            catch (Throwable t) {
                this.log.error("Check pass server error: " + server + ", check id: " + checkId, t);
            }
        }
    }

    private Response<List<HealthService>> getServers(String rootPath, QueryParams queryParams) {
        HealthServicesRequest request = HealthServicesRequest.newBuilder().setQueryParams(queryParams).setPassing(true).setToken(this.token).build();
        return this.client.getHealthServices(rootPath, request);
    }

    private static List<String> extract(Response<List<HealthService>> response, String rootPath) {
        List healthServices = (List)response.getValue();
        if (healthServices == null) {
            return null;
        }
        int beginIndex = rootPath.length() + 1;
        return healthServices.stream().filter(Objects::nonNull).map(HealthService::getService).filter(Objects::nonNull).map(HealthService.Service::getId).filter(e -> e != null && e.length() > beginIndex).map(e -> e.substring(beginIndex)).collect(Collectors.toList());
    }

    private class DiscoveryWatcher
    extends Thread {
        private long lastConsulIndex = -1L;

        private DiscoveryWatcher() {
            super.setDaemon(true);
            super.setPriority(10);
            super.setName("consul_discovery_watcher_thread");
            super.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler)new LoggedUncaughtExceptionHandler(ConsulServerRegistry.this.log));
            super.start();
        }

        @Override
        public void run() {
            while (ConsulServerRegistry.this.state.isRunning()) {
                try {
                    long waitTimeSeconds = this.lastConsulIndex == -1L ? 60L : Integer.MAX_VALUE;
                    QueryParams queryParams = new QueryParams(waitTimeSeconds, this.lastConsulIndex);
                    Response response = ConsulServerRegistry.this.getServers(ConsulServerRegistry.this.discoveryRootPath, queryParams);
                    Long currentIndex = response.getConsulIndex();
                    if (currentIndex == null || currentIndex <= this.lastConsulIndex) continue;
                    this.lastConsulIndex = currentIndex;
                    ConsulServerRegistry.this.refreshDiscoveryServers(ConsulServerRegistry.extract((Response<List<HealthService>>)response, ConsulServerRegistry.this.discoveryRootPath));
                }
                catch (Throwable t) {
                    ConsulServerRegistry.this.log.error("Get consul health services occur error.", t);
                    Threads.interruptIfNecessary((Throwable)t);
                }
            }
        }
    }
}

