/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.registry.redis;

import cn.ponfee.disjob.common.base.RetryTemplate;
import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.EventType;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.redis.configuration.RedisRegistryProperties;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

public abstract class RedisServerRegistry<R extends Server, D extends Server>
extends ServerRegistry<R, D> {
    /**
     * Lua script used for registration: adds every member passed in {@code ARGV[3..n]} to the
     * sorted set {@code KEYS[1]} with score {@code ARGV[1]}, then sets the key's TTL to
     * {@code ARGV[2]} milliseconds.
     */
    private static final RedisScript<Void> REGISTRY_SCRIPT = RedisScript.of(
        "local score  = ARGV[1];                        \n"
            + "local expire = ARGV[2];                        \n"
            + "local length = #ARGV;                          \n"
            + "for i = 3,length do                            \n"
            + "  redis.call('zadd', KEYS[1], score, ARGV[i]); \n"
            + "end                                            \n"
            + "redis.call('pexpire', KEYS[1], expire);        \n",
        Void.class
    );

    /**
     * Lua script used for discovery: removes members of the sorted set {@code KEYS[1]} whose score
     * is at most {@code ARGV[1]}, returns the members scored from {@code ARGV[1]} upwards, and sets
     * the key's TTL to {@code ARGV[2]} milliseconds.
     */
    private static final RedisScript<List> DISCOVERY_SCRIPT = RedisScript.of(
        "redis.call('zremrangebyscore', KEYS[1], '-inf', ARGV[1]);          \n"
            + "local ret = redis.call('zrangebyscore', KEYS[1], ARGV[1], '+inf'); \n"
            + "redis.call('pexpire', KEYS[1], ARGV[2]);                           \n"
            + "return ret;                                                        \n",
        List.class
    );

    /** TTL handed to both scripts as the {@code pexpire} argument: 2,592,000,000 ms (30 days). */
    private static final String REDIS_KEY_TTL_MILLIS = String.valueOf(2_592_000_000L);

    /** Suffix appended (after the separator) to a root path to form a pub/sub channel name. */
    private static final String CHANNEL = "channel";

    // ---- redis access ----

    private final StringRedisTemplate stringRedisTemplate;

    /** Channel this registry publishes register/deregister events to. */
    private final String registryChannel;

    /** Single-element key list (the registry root path) passed to {@link #REGISTRY_SCRIPT}. */
    private final List<String> registryRedisKey;

    /** Single-element key list (the discovery root path) passed to {@link #DISCOVERY_SCRIPT}. */
    private final List<String> discoveryRedisKey;

    // ---- timing ----

    /** Added to the current time to build the score written for each registered member. */
    private final long sessionTimeoutMs;

    /** One third of the session timeout; used for both heartbeat loops. */
    private final long periodMs;

    /** Earliest wall-clock time at which the discover heartbeat refreshes again; starts at 0. */
    private volatile long nextDiscoverTimeMillis;

    // ---- background workers ----

    private final LoopThread registerHeartbeatThread;
    private final LoopThread discoverHeartbeatThread;
    private final ThreadPoolExecutor redisSubscribeExecutor;
    private final RedisMessageListenerContainer redisMessageListenerContainer;

    /** Lets only one discovery refresh run at a time; other callers skip instead of waiting. */
    private final Lock asyncRefreshLock = new ReentrantLock();

    /**
     * Creates the registry, starts its background workers and performs an initial discovery.
     *
     * <p>Start-up order: the register heartbeat, the discover heartbeat, the subscribe executor,
     * the Redis message listener container (subscribed to the discovery channel), and finally one
     * synchronous discovery. If that first discovery fails, the registry is closed and an
     * {@link Error} wrapping the cause is thrown.
     *
     * @param stringRedisTemplate template used for every Redis call; its connection factory must
     *                            be non-null
     * @param config              supplies the namespace and the session timeout in milliseconds
     */
    protected RedisServerRegistry(StringRedisTemplate stringRedisTemplate, RedisRegistryProperties config) {
        super(config.getNamespace(), ':');

        this.stringRedisTemplate = stringRedisTemplate;
        this.registryChannel = this.registryRootPath + this.separator + CHANNEL;
        this.registryRedisKey = Collections.singletonList(this.registryRootPath);
        this.discoveryRedisKey = Collections.singletonList(this.discoveryRootPath);
        this.sessionTimeoutMs = config.getSessionTimeoutMs();
        this.periodMs = config.getSessionTimeoutMs() / 3L;

        // Heartbeat #1: re-register everything in `registered`, retrying through RetryTemplate.
        Throwables.ThrowingRunnable heartbeatRegister =
            () -> RetryTemplate.execute(() -> this.doRegisterServers(this.registered), 3, 1000L);
        this.registerHeartbeatThread = LoopThread.createStarted(
            "redis_register_heartbeat", this.periodMs, this.periodMs, heartbeatRegister
        );

        // Heartbeat #2: refresh discovered servers, but only once the last refresh is stale.
        Throwables.ThrowingRunnable heartbeatDiscover = () -> {
            if (!this.requireDiscoverServers()) {
                return;
            }
            this.tryDiscoverServers();
        };
        this.discoverHeartbeatThread = LoopThread.createStarted(
            "redis_discover_heartbeat", this.periodMs, this.periodMs, heartbeatDiscover
        );

        // Single-threaded executor with a one-slot queue for the listener container.
        ThreadFactory subscribeThreadFactory = NamedThreadFactory.builder()
            .prefix("redis_async_subscribe")
            .priority(Integer.valueOf(10))
            .build();
        this.redisSubscribeExecutor = ThreadPoolExecutors.builder()
            .corePoolSize(1)
            .maximumPoolSize(1)
            .workQueue(new ArrayBlockingQueue<Runnable>(1))
            .keepAliveTimeSeconds(600L)
            .rejectedHandler(ThreadPoolExecutors.DISCARD)
            .threadFactory(subscribeThreadFactory)
            .build();

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()));
        Executor subscribeExecutor = this.redisSubscribeExecutor;
        listenerContainer.setTaskExecutor(subscribeExecutor);

        // Resolve the callback name reflectively so a signature change to handleMessage fails here.
        String listenerMethod = (String) Throwables.ThrowingSupplier.get(
            () -> RedisServerRegistry.class.getMethod("handleMessage", String.class, String.class).getName()
        );
        MessageListenerAdapter adapter = new MessageListenerAdapter(this, listenerMethod);
        adapter.afterPropertiesSet();

        MessageListener listener = adapter;
        Topic discoveryTopic = new ChannelTopic(this.discoveryRootPath + this.separator + CHANNEL);
        listenerContainer.addMessageListener(listener, discoveryTopic);
        listenerContainer.afterPropertiesSet();
        listenerContainer.start();
        this.redisMessageListenerContainer = listenerContainer;

        // Initial discovery; on failure release everything started above before propagating.
        try {
            this.doDiscoverServers();
        } catch (Throwable e) {
            Threads.interruptIfNecessary(e);
            this.close();
            throw new Error("Redis init discover error.", e);
        }
    }

    /**
     * Reports whether a Redis connection obtained through the template is open.
     *
     * @return {@code true} only if the callback ran and reported a non-closed connection;
     *         {@code false} if the connection is closed or the template returned {@code null}
     */
    public boolean isConnected() {
        // The explicit RedisCallback target type is required: an implicitly typed lambda is
        // ambiguous between execute(RedisCallback) and execute(SessionCallback).
        Boolean open = this.stringRedisTemplate.execute((RedisCallback<Boolean>) conn -> !conn.isClosed());
        return Boolean.TRUE.equals(open);
    }

    /**
     * Registers {@code server} in Redis, remembers it for the register heartbeat, and publishes a
     * {@link EventType#REGISTER} event on the registry channel.
     *
     * <p>Does nothing once the registry has been closed.
     *
     * @param server the server to register
     */
    public final void register(R server) {
        if (!this.closed.get()) {
            // Write to Redis first; only a successful write is tracked for heartbeat renewal.
            doRegisterServers(Collections.singleton(server));
            this.registered.add(server);

            Throwables.ThrowingRunnable.execute(() -> publish(server, EventType.REGISTER));
            this.log.info("Server registered: {} | {}", this.registryRole.name(), server);
        }
    }

    /**
     * Removes {@code server} from the locally tracked set and from the Redis sorted set, then
     * publishes a {@link EventType#DEREGISTER} event on the registry channel.
     *
     * <p>The Redis removal and the publish are each routed through the {@code Throwables} execute
     * helpers rather than being called directly.
     *
     * @param server the server to deregister
     */
    public final void deregister(R server) {
        // Drop it locally first so the register heartbeat stops renewing it.
        this.registered.remove(server);

        // The key must stay typed as String: ZSetOperations<String, String>.remove(K, Object...)
        // does not accept an Object key.
        Throwables.ThrowingSupplier.execute(
            () -> this.stringRedisTemplate.opsForZSet().remove(this.registryRootPath, server.serialize())
        );
        Throwables.ThrowingRunnable.execute(() -> this.publish(server, EventType.DEREGISTER));
        this.log.info("Server deregister: {} | {}", this.registryRole.name(), server);
    }

    /**
     * Shuts the registry down. Only the first call has any effect.
     *
     * <p>Order: stop the register heartbeat, deregister every tracked server, stop the message
     * listener container, stop the discover heartbeat, shut down the subscribe executor, clear the
     * tracked set, and finally delegate to {@code super.close()}.
     */
    @PreDestroy
    public void close() {
        boolean firstClose = this.closed.compareAndSet(false, true);
        if (!firstClose) {
            return;
        }

        // Stop renewing before removing, so nothing is re-registered behind our back.
        this.registerHeartbeatThread.terminate();
        // NOTE(review): deregister() removes from `registered` while it is being iterated here;
        // this relies on `registered` tolerating removal during forEach - confirm its type.
        this.registered.forEach(server -> this.deregister(server));

        Throwables.ThrowingRunnable.execute(() -> this.redisMessageListenerContainer.stop());
        this.discoverHeartbeatThread.terminate();

        ExecutorService subscribeExecutor = this.redisSubscribeExecutor;
        ThreadPoolExecutors.shutdown(subscribeExecutor, 2);

        this.registered.clear();
        super.close();
    }

    /**
     * Callback invoked by the message listener adapter for every message on the discovery channel.
     *
     * <p>The message is split at the first {@code ':'}: the part before it is an
     * {@link EventType} name, the part after it a serialized server. Any failure while parsing or
     * handling (including a missing separator) is logged and not rethrown.
     *
     * @param message the raw published text
     * @param pattern the second argument supplied by the listener adapter; only logged here
     */
    public void handleMessage(String message, String pattern) {
        try {
            int separatorIndex = message.indexOf(":", 0);
            // A missing separator yields -1 and makes substring throw; the catch below logs it.
            String eventName = message.substring(0, separatorIndex);
            String serializedServer = message.substring(separatorIndex + 1);

            this.log.info("Subscribed message: {} | {}", pattern, message);
            this.subscribe(EventType.valueOf(eventName), this.discoveryRole.deserialize(serializedServer));
        } catch (Throwable t) {
            this.log.error("Parse subscribed message error: " + message + ", " + pattern, t);
        }
    }

    /**
     * Publishes {@code "<EVENT_NAME>:<serialized server>"} to the registry channel.
     *
     * @param server    the server the event is about
     * @param eventType the event kind; its enum name becomes the message prefix
     */
    private void publish(R server, EventType eventType) {
        StringBuilder payload = new StringBuilder();
        payload.append(eventType.name()).append(":").append(server.serialize());
        this.stringRedisTemplate.convertAndSend(this.registryChannel, payload.toString());
    }

    /**
     * Reacts to a received registry event by triggering a full discovery refresh.
     *
     * <p>Both arguments are currently unused: every event leads to the same refresh.
     *
     * @param eventType the received event kind (ignored)
     * @param server    the server the event refers to (ignored)
     */
    private void subscribe(EventType eventType, D server) {
        tryDiscoverServers();
    }

    /**
     * Writes (or renews) the given servers in the registry sorted set through
     * {@link #REGISTRY_SCRIPT}.
     *
     * <p>Script arguments: {@code ARGV[1]} is the score ({@code now + sessionTimeoutMs}),
     * {@code ARGV[2]} the key TTL, and {@code ARGV[3..n]} the serialized servers.
     *
     * @param servers servers to register; {@code null} or empty is a no-op
     */
    private void doRegisterServers(Set<R> servers) {
        if (CollectionUtils.isEmpty(servers)) {
            return;
        }

        // Snapshot first: the heartbeat passes the live `registered` set, which register() and
        // deregister() mutate from other threads. Sizing the array from servers.size() and then
        // iterating could overflow it or leave a trailing null if the set changed in between.
        List<Object> members = servers.stream()
            .<Object>map(server -> server.serialize())
            .collect(Collectors.toList());
        if (members.isEmpty()) {
            return;
        }

        Object[] args = new Object[members.size() + 2];
        args[0] = Long.toString(System.currentTimeMillis() + this.sessionTimeoutMs);
        args[1] = REDIS_KEY_TTL_MILLIS;
        for (int i = 0; i < members.size(); i++) {
            args[i + 2] = members.get(i);
        }
        this.stringRedisTemplate.execute(REGISTRY_SCRIPT, this.registryRedisKey, args);
    }

    /**
     * Runs a discovery refresh unless one is already in progress.
     *
     * <p>If the refresh lock cannot be acquired immediately the call returns without doing
     * anything. Failures are logged rather than propagated.
     */
    private void tryDiscoverServers() {
        boolean acquired = this.asyncRefreshLock.tryLock();
        if (acquired) {
            try {
                doDiscoverServers();
            } catch (Throwable t) {
                Threads.interruptIfNecessary(t);
                this.log.error("Redis discover servers occur error.", t);
            } finally {
                this.asyncRefreshLock.unlock();
            }
        }
    }

    /**
     * Queries Redis for the currently alive servers of the discovery role and refreshes the local
     * view, retrying through {@code RetryTemplate}.
     *
     * <p>{@link #DISCOVERY_SCRIPT} is called with the current time as {@code ARGV[1]} and the key
     * TTL as {@code ARGV[2]}. An empty or {@code null} result is logged as a warning and treated
     * as an empty server list. After a successful refresh the next heartbeat discovery time is
     * pushed forward.
     *
     * @throws Throwable whatever {@code RetryTemplate.execute} propagates
     */
    private void doDiscoverServers() throws Throwable {
        RetryTemplate.execute(() -> {
            String now = Long.toString(System.currentTimeMillis());

            // The script is declared as RedisScript<List>, so the element type is unchecked here;
            // StringRedisTemplate is used, hence the members are treated as strings.
            @SuppressWarnings("unchecked")
            List<String> discovered = this.stringRedisTemplate.execute(
                DISCOVERY_SCRIPT, this.discoveryRedisKey, now, REDIS_KEY_TTL_MILLIS
            );
            if (CollectionUtils.isEmpty(discovered)) {
                this.log.warn("Not discovered available {} from redis.", this.discoveryRole.name());
                discovered = Collections.emptyList();
            }

            List<D> servers = discovered.stream()
                .<D>map(serialized -> this.discoveryRole.deserialize(serialized))
                .collect(Collectors.toList());
            this.refreshDiscoveredServers(servers);
            this.renewNextDiscoverTimeMillis();
            this.log.debug("Redis discovered {} servers.", this.discoveryRole.name());
        }, 3, 1000L);
    }

    /**
     * Tells the discover heartbeat whether a refresh is due.
     *
     * @return {@code true} if the registry is not closed and the scheduled next-discovery time
     *         has already passed
     */
    private boolean requireDiscoverServers() {
        if (this.closed.get()) {
            return false;
        }
        long dueAt = this.nextDiscoverTimeMillis;
        return dueAt < System.currentTimeMillis();
    }

    /**
     * Schedules the next heartbeat-driven discovery one period ({@code periodMs}) from now.
     */
    private void renewNextDiscoverTimeMillis() {
        long now = System.currentTimeMillis();
        this.nextDiscoverTimeMillis = now + this.periodMs;
    }
}

