/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.registry.redis;

import cn.ponfee.disjob.common.base.RetryTemplate;
import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.spring.RedisTemplateUtils;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.core.enums.RegistryEventType;
import cn.ponfee.disjob.registry.AbstractRegistryProperties;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.redis.configuration.RedisRegistryProperties;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.client.RestTemplate;

public abstract class RedisServerRegistry<R extends Server, D extends Server>
extends ServerRegistry<R, D> {
    private static final String CHANNEL = "channel";
    private static final RedisScript<Void> REGISTRY_SCRIPT = RedisScript.of((String)"local score  = ARGV[1];                        \nlocal expire = ARGV[2];                        \nlocal length = #ARGV;                          \nfor i = 3,length do                            \n  redis.call('zadd', KEYS[1], score, ARGV[i]); \nend                                            \nredis.call('pexpire', KEYS[1], expire);        \n", Void.class);
    private static final RedisScript<List> QUERY_SCRIPT = RedisScript.of((String)"redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1]);          \nlocal ret = redis.call('zrangebyscore', KEYS[1], ARGV[1], '+inf'); \nredis.call('pexpire', KEYS[1], ARGV[2]);                           \nreturn ret;                                                        \n", List.class);
    private static final String REDIS_KEY_TTL_MILLIS = Long.toString(TimeUnit.DAYS.toMillis(30L));
    private final String registryChannel;
    private final StringRedisTemplate stringRedisTemplate;
    private final long sessionTimeoutMs;
    private final LoopThread registerHeartbeatThread;
    private final List<String> registryRedisKey;
    private final LoopThread discoverHeartbeatThread;
    private final List<String> discoveryRedisKey;
    private final RedisMessageListenerContainer redisMessageListenerContainer;

    protected RedisServerRegistry(RedisRegistryProperties config, RestTemplate restTemplate, StringRedisTemplate stringRedisTemplate) {
        super((AbstractRegistryProperties)config, restTemplate, ':');
        this.registryChannel = this.registryRootPath + this.separator + CHANNEL;
        this.stringRedisTemplate = stringRedisTemplate;
        this.sessionTimeoutMs = config.getSessionTimeoutMs();
        this.registryRedisKey = Collections.singletonList(this.registryRootPath);
        this.discoveryRedisKey = Collections.singletonList(this.discoveryRootPath);
        long periodMs = this.sessionTimeoutMs / 3L;
        Throwables.ThrowingRunnable registerAction = () -> RetryTemplate.execute(() -> this.registerServers(this.registered), (int)3, (long)1000L);
        this.registerHeartbeatThread = LoopThread.createStarted((String)"redis_register_heartbeat", (long)periodMs, (long)periodMs, (Throwables.ThrowingRunnable)registerAction);
        this.discoverHeartbeatThread = LoopThread.createStarted((String)"redis_discover_heartbeat", (long)periodMs, (long)periodMs, this::discoverServers);
        this.redisMessageListenerContainer = RedisTemplateUtils.createRedisMessageListenerContainer((StringRedisTemplate)stringRedisTemplate, (String)(this.discoveryRootPath + this.separator + CHANNEL), (Executor)new SyncTaskExecutor(), (Object)((Object)this), (String)"handleMessage");
        this.log.info("Redis server registry initialized: {}", (Object)RedisTemplateUtils.getServerInfo((RedisTemplate)stringRedisTemplate));
    }

    public boolean isConnected() {
        Boolean result = (Boolean)this.stringRedisTemplate.execute(conn -> !conn.isClosed());
        return Boolean.TRUE.equals(result);
    }

    public final void register(R server) {
        if (this.state.isStopped()) {
            return;
        }
        this.registerServers(Collections.singleton(server));
        this.registered.add(server);
        this.publishServerEvent(RegistryEventType.REGISTER, server);
        this.log.info("Server registered: {}, {}", (Object)this.registryRole, server);
    }

    public final void deregister(R server) {
        try {
            this.registered.remove(server);
            Throwables.ThrowingSupplier.doCaught(() -> this.stringRedisTemplate.opsForZSet().remove((Object)this.registryRootPath, new Object[]{server.serialize()}));
            this.publishServerEvent(RegistryEventType.DEREGISTER, server);
            this.log.info("Redis server deregister success: {}", server);
        }
        catch (Throwable t) {
            this.log.error("Redis server deregister error: " + server, t);
        }
    }

    public List<R> getRegisteredServers() {
        return this.deserializeServers(this.getServers(this.registryRedisKey), this.registryRole);
    }

    public void discoverServers() throws Throwable {
        RetryTemplate.execute(() -> this.refreshDiscoveredServers(this.getServers(this.discoveryRedisKey)), (int)3, (long)1000L);
    }

    @PreDestroy
    public void close() {
        if (!this.state.stop()) {
            return;
        }
        this.registerHeartbeatThread.terminate();
        this.registered.forEach(this::deregister);
        Throwables.ThrowingRunnable.doCaught(() -> ((RedisMessageListenerContainer)this.redisMessageListenerContainer).stop());
        this.discoverHeartbeatThread.terminate();
        super.close();
    }

    public void handleMessage(String message, String channel) {
        try {
            this.log.info("Handle message param: {}, {}", (Object)message, (Object)channel);
            String[] array = message.split(":", 2);
            RegistryEventType eventType = RegistryEventType.valueOf((String)array[0]);
            Server server = this.deserializeServer(array[1], this.discoveryRole);
            this.subscribeServerEvent(eventType, server);
        }
        catch (Throwable t) {
            this.log.error("Handle message error: " + message + ", " + channel, t);
        }
    }

    protected void publishServerEvent(RegistryEventType eventType, R server) {
        this.log.info("Publish server event: {}, {}", (Object)eventType, server);
        String message = eventType.name() + ":" + server.serialize();
        Throwables.ThrowingRunnable.doCaught(() -> this.stringRedisTemplate.convertAndSend(this.registryChannel, (Object)message));
    }

    private void registerServers(Set<R> servers) {
        if (CollectionUtils.isEmpty(servers)) {
            return;
        }
        int i = 0;
        Object[] args = new Object[servers.size() + 2];
        args[i++] = Long.toString(System.currentTimeMillis() + this.sessionTimeoutMs);
        args[i++] = REDIS_KEY_TTL_MILLIS;
        for (Server server : servers) {
            args[i++] = server.serialize();
        }
        this.stringRedisTemplate.execute(REGISTRY_SCRIPT, this.registryRedisKey, args);
    }

    private List<String> getServers(List<String> serverRoleKey) {
        String baseScore = Long.toString(System.currentTimeMillis());
        return (List)this.stringRedisTemplate.execute(QUERY_SCRIPT, serverRoleKey, new Object[]{baseScore, REDIS_KEY_TTL_MILLIS});
    }
}

