/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.registry.redis;

import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.common.util.ObjectUtils;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.EventType;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.redis.configuration.RedisRegistryProperties;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

public abstract class RedisServerRegistry<R extends Server, D extends Server>
extends ServerRegistry<R, D> {
    private static final long REDIS_KEY_TTL_MILLIS = 2592000000L;
    private static final String CHANNEL = "channel";
    private final String registryChannel = this.registryRootPath + this.separator + "channel";
    private final StringRedisTemplate stringRedisTemplate;
    private final long sessionTimeoutMs;
    private final ScheduledThreadPoolExecutor registryScheduledExecutor;
    private volatile long nextRefreshTimeMillis = 0L;
    private final RedisMessageListenerContainer redisMessageListenerContainer;

    protected RedisServerRegistry(StringRedisTemplate stringRedisTemplate, RedisRegistryProperties config) {
        super(config.getNamespace(), ':');
        this.stringRedisTemplate = stringRedisTemplate;
        this.sessionTimeoutMs = config.getSessionTimeoutMs();
        this.registryScheduledExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory)NamedThreadFactory.builder().prefix("redis_server_registry").daemon(true).build());
        this.registryScheduledExecutor.scheduleWithFixedDelay(() -> {
            if (this.closed.get()) {
                return;
            }
            try {
                this.doRegister(this.registered);
            }
            catch (Throwable t) {
                this.log.error("Do scheduled register occur error: " + this.registered, t);
            }
        }, config.getRegistryPeriodMs(), config.getRegistryPeriodMs(), TimeUnit.MILLISECONDS);
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(stringRedisTemplate.getConnectionFactory());
        container.setTaskExecutor((Executor)this.registryScheduledExecutor);
        String listenerMethod = (String)Throwables.ThrowingSupplier.get(() -> RedisServerRegistry.class.getMethod("handleMessage", String.class, String.class).getName());
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter((Object)this, listenerMethod);
        listenerAdapter.afterPropertiesSet();
        container.addMessageListener((MessageListener)listenerAdapter, (Topic)new ChannelTopic(this.discoveryRootPath + this.separator + CHANNEL));
        container.afterPropertiesSet();
        container.start();
        this.redisMessageListenerContainer = container;
        this.doRefreshDiscoveryServers();
    }

    public boolean isConnected() {
        Boolean result = (Boolean)this.stringRedisTemplate.execute(conn -> !conn.isClosed());
        return result != null && result != false;
    }

    public final List<D> getDiscoveredServers(String group) {
        this.doRefreshDiscoveryServersIfNecessary();
        return super.getDiscoveredServers(group);
    }

    public final boolean hasDiscoveredServers() {
        this.doRefreshDiscoveryServersIfNecessary();
        return super.hasDiscoveredServers();
    }

    public final boolean isDiscoveredServer(D server) {
        this.doRefreshDiscoveryServersIfNecessary();
        return super.isDiscoveredServer(server);
    }

    public final void register(R server) {
        if (this.closed.get()) {
            return;
        }
        this.doRegister(Collections.singleton(server));
        this.registered.add(server);
        this.publish(server, EventType.REGISTER);
        this.log.info("Server registered: {} | {}", (Object)this.registryRole.name(), server);
    }

    public final void deregister(R server) {
        this.registered.remove(server);
        Throwables.ThrowingSupplier.caught(() -> this.stringRedisTemplate.opsForZSet().remove((Object)this.registryRootPath, new Object[]{server.serialize()}));
        Throwables.ThrowingRunnable.caught(() -> this.publish(server, EventType.DEREGISTER));
        this.log.info("Server deregister: {} | {}", (Object)this.registryRole.name(), server);
    }

    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            this.log.warn("Repeat call close method\n{}", (Object)ObjectUtils.getStackTrace());
            return;
        }
        Throwables.ThrowingRunnable.caught(() -> ((RedisMessageListenerContainer)this.redisMessageListenerContainer).stop());
        Throwables.ThrowingSupplier.caught(this.registryScheduledExecutor::shutdownNow);
        this.registered.forEach(this::deregister);
        this.registered.clear();
        super.close();
    }

    public void handleMessage(String message, String pattern) {
        try {
            int pos = -1;
            int n = ++pos;
            pos = message.indexOf(":", pos);
            String s0 = message.substring(n, pos);
            String s1 = message.substring(++pos);
            this.log.info("Subscribed message: {} | {}", (Object)pattern, (Object)message);
            this.subscribe(EventType.valueOf((String)s0), this.discoveryRole.deserialize(s1));
        }
        catch (Throwable t) {
            this.log.error("Parse subscribed message error: " + message + ", " + pattern, t);
        }
    }

    private void publish(R server, EventType eventType) {
        String publish = eventType.name() + ":" + server.serialize();
        this.stringRedisTemplate.convertAndSend(this.registryChannel, (Object)publish);
    }

    private void subscribe(EventType eventType, D server) {
        this.doRefreshDiscoveryServers();
    }

    private void doRegister(Set<R> servers) {
        if (CollectionUtils.isEmpty(servers)) {
            return;
        }
        Double score = System.currentTimeMillis() + this.sessionTimeoutMs;
        final Set tuples = servers.stream().map(e -> ZSetOperations.TypedTuple.of((Object)e.serialize(), (Double)score)).collect(Collectors.toSet());
        this.stringRedisTemplate.executePipelined((SessionCallback)new SessionCallback<Object>(){

            public Void execute(RedisOperations operations) {
                operations.opsForZSet().add((Object)RedisServerRegistry.this.registryRootPath, tuples);
                operations.expire((Object)RedisServerRegistry.this.registryRootPath, 2592000000L, TimeUnit.MILLISECONDS);
                return null;
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doRefreshDiscoveryServersIfNecessary() {
        if (!this.requireRefresh()) {
            return;
        }
        RedisServerRegistry redisServerRegistry = this;
        synchronized (redisServerRegistry) {
            if (this.requireRefresh()) {
                this.doRefreshDiscoveryServers();
            }
        }
    }

    private synchronized void doRefreshDiscoveryServers() {
        final long now = System.currentTimeMillis();
        List result = this.stringRedisTemplate.executePipelined((SessionCallback)new SessionCallback<Object>(){

            public Void execute(RedisOperations operations) {
                operations.opsForZSet().removeRangeByScore((Object)RedisServerRegistry.this.discoveryRootPath, 0.0, (double)now);
                operations.opsForZSet().rangeByScore((Object)RedisServerRegistry.this.discoveryRootPath, (double)now, 9.223372036854776E18);
                operations.expire((Object)RedisServerRegistry.this.discoveryRootPath, 2592000000L, TimeUnit.MILLISECONDS);
                return null;
            }
        });
        Set discovered = (Set)result.get(1);
        if (CollectionUtils.isEmpty((Collection)discovered)) {
            this.log.error("Not discovered available {} from redis.", (Object)this.discoveryRole.name());
            discovered = Collections.emptySet();
        }
        List servers = discovered.stream().map(arg_0 -> ((ServerRole)this.discoveryRole).deserialize(arg_0)).collect(Collectors.toList());
        this.refreshDiscoveredServers(servers);
        this.updateRefresh();
    }

    private boolean requireRefresh() {
        return this.nextRefreshTimeMillis < System.currentTimeMillis();
    }

    private void updateRefresh() {
        this.nextRefreshTimeMillis = System.currentTimeMillis() + this.sessionTimeoutMs;
    }
}

