/*
 * Decompiled with CFR 0.152.
 */
package me.ahoo.cosky.discovery.redis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import me.ahoo.cosky.core.listener.MessageListenable;
import me.ahoo.cosky.core.listener.MessageListener;
import me.ahoo.cosky.discovery.DiscoveryKeyGenerator;
import me.ahoo.cosky.discovery.Instance;
import me.ahoo.cosky.discovery.InstanceIdGenerator;
import me.ahoo.cosky.discovery.NamespacedServiceId;
import me.ahoo.cosky.discovery.ServiceChangedEvent;
import me.ahoo.cosky.discovery.ServiceChangedListener;
import me.ahoo.cosky.discovery.ServiceDiscovery;
import me.ahoo.cosky.discovery.ServiceInstance;
import me.ahoo.cosky.discovery.ServiceInstanceContext;
import me.ahoo.cosky.discovery.ServiceListenable;
import me.ahoo.cosky.discovery.ServiceTopology;
import me.ahoo.cosky.discovery.redis.DiscoveryRedisScripts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class ConsistencyRedisServiceDiscovery
implements ServiceDiscovery,
ServiceListenable,
ServiceTopology {
    private static final Logger log = LoggerFactory.getLogger(ConsistencyRedisServiceDiscovery.class);
    private final ServiceDiscovery delegate;
    private final MessageListenable messageListenable;
    private final RedisClusterReactiveCommands<String, String> redisCommands;
    private final ServiceIdxListener serviceIdxListener;
    private final InstanceListener instanceListener;
    private final ConcurrentHashMap<NamespacedServiceId, Mono<CopyOnWriteArrayList<ServiceInstance>>> serviceMapInstances;
    private final ConcurrentHashMap<NamespacedServiceId, CopyOnWriteArraySet<ServiceChangedListener>> serviceMapListener;
    private final ConcurrentHashMap<String, Mono<List<String>>> namespaceMapServices;

    public ConsistencyRedisServiceDiscovery(ServiceDiscovery delegate, MessageListenable messageListenable, RedisClusterReactiveCommands<String, String> redisCommands) {
        this.redisCommands = redisCommands;
        this.serviceMapInstances = new ConcurrentHashMap();
        this.namespaceMapServices = new ConcurrentHashMap();
        this.serviceMapListener = new ConcurrentHashMap();
        this.delegate = delegate;
        this.messageListenable = messageListenable;
        this.serviceIdxListener = new ServiceIdxListener();
        this.instanceListener = new InstanceListener();
    }

    @Override
    public Mono<List<String>> getServices(String namespace) {
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)namespace) ? 1 : 0) != 0, (Object)"namespace can not be empty!");
        return this.namespaceMapServices.computeIfAbsent(namespace, _namespace -> {
            String serviceIdxKey = DiscoveryKeyGenerator.getServiceIdxKey(namespace);
            this.messageListenable.addChannelListener(serviceIdxKey, (MessageListener)this.serviceIdxListener);
            return this.delegate.getServices(namespace).cache();
        });
    }

    @Override
    public Mono<List<ServiceInstance>> getInstances(String namespace, String serviceId) {
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)namespace) ? 1 : 0) != 0, (Object)"namespace can not be empty!");
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)serviceId) ? 1 : 0) != 0, (Object)"serviceId can not be empty!");
        return this.serviceMapInstances.computeIfAbsent(NamespacedServiceId.of(namespace, serviceId), _serviceId -> this.addListener(namespace, serviceId).then(this.delegate.getInstances(namespace, serviceId)).map(CopyOnWriteArrayList::new).cache()).map(serviceInstances -> serviceInstances.stream().filter(instance -> !instance.isExpired()).collect(Collectors.toList()));
    }

    public Mono<ServiceInstance> getInstance0(String namespace, String serviceId, String instanceId) {
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)namespace) ? 1 : 0) != 0, (Object)"namespace can not be empty!");
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)serviceId) ? 1 : 0) != 0, (Object)"serviceId can not be empty!");
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)instanceId) ? 1 : 0) != 0, (Object)"instanceId can not be empty!");
        NamespacedServiceId namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
        Mono<CopyOnWriteArrayList<ServiceInstance>> instancesMono = this.serviceMapInstances.get(namespacedServiceId);
        if (Objects.isNull(instancesMono)) {
            return this.delegate.getInstance(namespace, serviceId, instanceId);
        }
        return instancesMono.mapNotNull(serviceInstances -> serviceInstances.stream().filter(itc -> itc.getInstanceId().equals(instanceId)).findFirst().orElse(null));
    }

    @Override
    public Mono<ServiceInstance> getInstance(String namespace, String serviceId, String instanceId) {
        return this.getInstance0(namespace, serviceId, instanceId);
    }

    @Override
    public Mono<Long> getInstanceTtl(String namespace, String serviceId, String instanceId) {
        return this.getInstance0(namespace, serviceId, instanceId).map(ServiceInstance::getTtlAt);
    }

    @VisibleForTesting
    public Mono<Void> addListener(String namespace, String serviceId) {
        String instancePattern = DiscoveryKeyGenerator.getInstanceKeyPatternOfService(namespace, serviceId);
        this.messageListenable.addPatternListener(instancePattern, (MessageListener)this.instanceListener);
        return this.addTopology(namespace, serviceId);
    }

    @Override
    public Mono<Void> addTopology(String producerNamespace, String producerServiceId) {
        String producerName;
        String consumerNamespace = ServiceInstanceContext.CURRENT.getNamespace();
        String consumerName = ServiceTopology.getConsumerName();
        if (Objects.equals(consumerName, producerName = ServiceTopology.getProducerName(producerNamespace, producerServiceId))) {
            return Mono.empty();
        }
        return DiscoveryRedisScripts.doServiceTopologyAdd(this.redisCommands, sha -> {
            Object[] keys = new String[]{consumerNamespace};
            Object[] values = new String[]{consumerName, producerName};
            return this.redisCommands.evalsha(sha, ScriptOutputType.STATUS, keys, values).then();
        });
    }

    @Override
    public void addListener(NamespacedServiceId namespacedServiceId, ServiceChangedListener serviceChangedListener) {
        this.serviceMapListener.compute(namespacedServiceId, (key, val) -> {
            CopyOnWriteArraySet<ServiceChangedListener> listeners = val;
            if (Objects.isNull(val)) {
                this.addListener(namespacedServiceId.getNamespace(), namespacedServiceId.getServiceId()).subscribe();
                listeners = new CopyOnWriteArraySet<ServiceChangedListener>();
            }
            listeners.add(serviceChangedListener);
            return listeners;
        });
    }

    @Override
    public void removeListener(NamespacedServiceId namespacedServiceId, ServiceChangedListener serviceChangedListener) {
        this.serviceMapListener.compute(namespacedServiceId, (key, val) -> {
            if (Objects.isNull(val)) {
                return null;
            }
            CopyOnWriteArraySet listeners = val;
            listeners.remove(serviceChangedListener);
            return listeners;
        });
    }

    @VisibleForTesting
    public void removeListener(String namespace, String serviceId) {
        String instancePattern = DiscoveryKeyGenerator.getInstanceKeyPatternOfService(namespace, serviceId);
        this.messageListenable.removePatternListener(instancePattern, (MessageListener)this.instanceListener);
    }

    private class InstanceListener
    implements MessageListener {
        private InstanceListener() {
        }

        public void onMessage(@Nullable String pattern, String channel, String message) {
            if (log.isInfoEnabled()) {
                log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}]", new Object[]{pattern, channel, message});
            }
            String instanceKey = channel;
            String namespace = DiscoveryKeyGenerator.getNamespaceOfKey(instanceKey);
            String instanceId = DiscoveryKeyGenerator.getInstanceIdOfKey(namespace, instanceKey);
            Instance instance = InstanceIdGenerator.DEFAULT.of(instanceId);
            String serviceId = instance.getServiceId();
            NamespacedServiceId namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
            AtomicReference<ServiceChangedEvent> serviceChangedEvent = new AtomicReference<ServiceChangedEvent>(ServiceChangedEvent.of(namespacedServiceId, message, instance));
            CopyOnWriteArraySet serviceChangedListeners = (CopyOnWriteArraySet)ConsistencyRedisServiceDiscovery.this.serviceMapListener.get(namespacedServiceId);
            Mono instancesMono = (Mono)ConsistencyRedisServiceDiscovery.this.serviceMapInstances.get(namespacedServiceId);
            if (Objects.isNull(instancesMono)) {
                if (log.isInfoEnabled()) {
                    log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] instancesMono is null.", new Object[]{pattern, channel, message});
                }
                this.invokeChanged(serviceChangedEvent.get(), serviceChangedListeners);
                return;
            }
            instancesMono.flatMap(cachedInstances -> {
                ServiceInstance cachedInstance = cachedInstances.stream().filter(itc -> itc.getInstanceId().equals(instanceId)).findFirst().orElse(ServiceInstance.NOT_FOUND);
                if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) {
                    if (!"register".equals(message) && !"renew".equals(message)) {
                        if (log.isWarnEnabled()) {
                            log.warn("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] not found cached Instance.", new Object[]{pattern, channel, message});
                        }
                        return Mono.empty();
                    }
                    return ConsistencyRedisServiceDiscovery.this.delegate.getInstance(namespace, serviceId, instanceId).doOnNext(serviceInstance -> {
                        if (log.isInfoEnabled()) {
                            log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] add registered Instance.", new Object[]{pattern, channel, message});
                        }
                        cachedInstances.add(serviceInstance);
                    });
                }
                switch (message) {
                    case "register": {
                        return ConsistencyRedisServiceDiscovery.this.delegate.getInstance(namespace, serviceId, instanceId).doOnNext(registeredInstance -> {
                            cachedInstance.setSchema(registeredInstance.getSchema());
                            cachedInstance.setHost(registeredInstance.getHost());
                            cachedInstance.setPort(registeredInstance.getPort());
                            cachedInstance.setEphemeral(registeredInstance.isEphemeral());
                            cachedInstance.setTtlAt(registeredInstance.getTtlAt());
                            cachedInstance.setWeight(registeredInstance.getWeight());
                            cachedInstance.setMetadata(registeredInstance.getMetadata());
                        });
                    }
                    case "renew": {
                        if (log.isInfoEnabled()) {
                            log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] setTtlAt.", new Object[]{pattern, channel, message});
                        }
                        return ConsistencyRedisServiceDiscovery.this.delegate.getInstanceTtl(namespace, serviceId, instanceId).doOnNext(cachedInstance::setTtlAt);
                    }
                    case "set_metadata": {
                        if (log.isInfoEnabled()) {
                            log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] setMetadata.", new Object[]{pattern, channel, message});
                        }
                        return ConsistencyRedisServiceDiscovery.this.delegate.getInstance(namespace, serviceId, instanceId).doOnNext(nextInstance -> cachedInstance.setMetadata(nextInstance.getMetadata()));
                    }
                    case "deregister": 
                    case "expired": {
                        if (log.isInfoEnabled()) {
                            log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] remove instance.", new Object[]{pattern, channel, message});
                        }
                        serviceChangedEvent.set(ServiceChangedEvent.of(namespacedServiceId, message, cachedInstance));
                        cachedInstances.remove(cachedInstance);
                        return Mono.empty();
                    }
                }
                return Mono.error((Throwable)new IllegalStateException("Unexpected value: " + message));
            }).doOnSuccess(nil -> this.invokeChanged((ServiceChangedEvent)serviceChangedEvent.get(), serviceChangedListeners)).subscribe();
        }

        private void invokeChanged(ServiceChangedEvent serviceChangedEvent, CopyOnWriteArraySet<ServiceChangedListener> serviceChangedListeners) {
            if (Objects.nonNull(serviceChangedListeners) && !serviceChangedListeners.isEmpty()) {
                serviceChangedListeners.forEach(serviceChangedListener -> serviceChangedListener.onChange(serviceChangedEvent));
            }
        }
    }

    private class ServiceIdxListener
    implements MessageListener {
        private ServiceIdxListener() {
        }

        public void onMessage(@Nullable String pattern, String channel, String message) {
            if (log.isInfoEnabled()) {
                log.info("onMessage@ServiceIdxListener - pattern:[{}] - channel:[{}] - message:[{}]", new Object[]{pattern, channel, message});
            }
            String serviceIdxKey = channel;
            String namespace = DiscoveryKeyGenerator.getNamespaceOfKey(serviceIdxKey);
            ConsistencyRedisServiceDiscovery.this.namespaceMapServices.put(namespace, ConsistencyRedisServiceDiscovery.this.delegate.getServices(namespace).cache());
        }
    }
}

