/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.hotrod.event.impl;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.commons.configuration.ClassAllowList;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.TypedProperties;
import org.infinispan.commons.util.Util;
import org.infinispan.hotrod.configuration.HotRodConfiguration;
import org.infinispan.hotrod.event.impl.ClientEventDispatcher;
import org.infinispan.hotrod.event.impl.EventDispatcher;
import org.infinispan.hotrod.event.impl.ReconnectTask;
import org.infinispan.hotrod.impl.ConfigurationProperties;
import org.infinispan.hotrod.impl.DataFormat;
import org.infinispan.hotrod.impl.logging.Log;
import org.infinispan.hotrod.impl.logging.LogFactory;
import org.infinispan.hotrod.impl.protocol.Codec;
import org.infinispan.hotrod.impl.transport.netty.ChannelFactory;

/**
 * Registry of the {@link EventDispatcher}s created for client listeners, keyed by listener id.
 *
 * <p>Besides lookup and event delivery, this class drives listener failover: dispatchers bound to
 * failed servers are removed, asked to fail over, and, when that fails, handed to a
 * {@link ReconnectTask} that is retried periodically on a dedicated scheduled executor.
 *
 * <p>The dispatcher registry is a {@link ConcurrentHashMap}, so individual registry operations may
 * be invoked from several threads.
 */
public class ClientListenerNotifier {
    private static final Log log = LogFactory.getLog(ClientListenerNotifier.class, Log.class);

    /** Sequence used to give every reconnect thread a distinct name. */
    public static final AtomicInteger counter = new AtomicInteger(0);

    /** Initial delay and period, in milliseconds, between reconnect attempts. */
    public static final int RECONNECT_PERIOD = 5000;

    private final ConcurrentMap<WrappedByteArray, EventDispatcher<?>> dispatchers = new ConcurrentHashMap<>();
    private final ScheduledThreadPoolExecutor reconnectExecutor;
    private final Codec codec;
    private final Marshaller marshaller;
    private final ChannelFactory channelFactory;
    private final ClassAllowList allowList;

    /**
     * Creates the notifier and its single-threaded reconnect executor.
     *
     * <p>Reconnect threads are daemon threads named
     * {@code <prefix>-<n><suffix>}, where prefix and suffix are read from the async executor
     * factory properties of {@code configuration}. The executor's only core thread is allowed to
     * time out after 10 seconds of inactivity.
     *
     * @param codec          codec exposed through {@link #codec()}
     * @param marshaller     marshaller exposed through {@link #marshaller()}
     * @param channelFactory channel factory exposed through {@link #channelFactory()}
     * @param configuration  source of the class allow list and of the thread naming properties
     */
    public ClientListenerNotifier(Codec codec, Marshaller marshaller, ChannelFactory channelFactory, HotRodConfiguration configuration) {
        this.codec = codec;
        this.marshaller = marshaller;
        this.channelFactory = channelFactory;
        this.allowList = configuration.getClassAllowList();

        TypedProperties defaultAsyncExecutorProperties = configuration.asyncExecutorFactory().properties();
        // The explicit upcast keeps the choice of the Properties-based constructor unambiguous.
        ConfigurationProperties cp = new ConfigurationProperties((Properties) defaultAsyncExecutorProperties);
        String threadNamePrefix = cp.getDefaultExecutorFactoryThreadNamePrefix();
        String threadNameSuffix = cp.getDefaultExecutorFactoryThreadNameSuffix();

        this.reconnectExecutor = new ScheduledThreadPoolExecutor(1, r -> {
            Thread th = new Thread(r, threadNamePrefix + "-" + counter.getAndIncrement() + threadNameSuffix);
            th.setDaemon(true);
            return th;
        });
        // Let the idle core thread die so the executor holds no thread while nothing is reconnecting.
        this.reconnectExecutor.setKeepAliveTime(10000L, TimeUnit.MILLISECONDS);
        this.reconnectExecutor.allowCoreThreadTimeOut(true);
    }

    /**
     * @return the marshaller supplied at construction time
     */
    public Marshaller marshaller() {
        return this.marshaller;
    }

    /**
     * Registers a dispatcher under its listener id, replacing any dispatcher previously registered
     * under the same id.
     *
     * @param dispatcher the dispatcher to register
     */
    public void addDispatcher(EventDispatcher<?> dispatcher) {
        this.dispatchers.put(new WrappedByteArray(dispatcher.listenerId), dispatcher);
        if (log.isTraceEnabled()) {
            log.tracef("Add dispatcher %s for client listener with id %s, for listener %s", dispatcher, Util.printArray(dispatcher.listenerId), dispatcher.listener);
        }
    }

    /**
     * Fails over every registered listener whose dispatcher is bound to one of the given servers.
     *
     * @param failedServers addresses of the servers considered failed
     */
    public void failoverListeners(Set<SocketAddress> failedServers) {
        // Collect the ids first; the failover itself removes entries from the registry.
        ArrayList<WrappedByteArray> failoverListenerIds = new ArrayList<>();
        for (Map.Entry<WrappedByteArray, EventDispatcher<?>> entry : this.dispatchers.entrySet()) {
            if (failedServers.contains(entry.getValue().address())) {
                failoverListenerIds.add(entry.getKey());
            }
        }
        if (log.isTraceEnabled() && failoverListenerIds.isEmpty()) {
            log.tracef("No event listeners registered in failed servers: %s", failedServers);
        }
        for (WrappedByteArray wrapped : failoverListenerIds) {
            this.failoverClientListener(wrapped.getBytes());
        }
    }

    /**
     * Removes the listener with the given id from the registry and fails it over.
     *
     * <p>Does nothing when no dispatcher is registered for the id. Otherwise the dispatcher is
     * stopped, its failover event is invoked and its failover is executed. If the failover
     * completes exceptionally with anything other than a {@link RejectedExecutionException}, a
     * {@link ReconnectTask} is scheduled every {@link #RECONNECT_PERIOD} milliseconds.
     *
     * @param listenerId id of the listener to fail over
     */
    public void failoverClientListener(byte[] listenerId) {
        EventDispatcher<?> dispatcher = this.removeClientListener(listenerId);
        if (dispatcher == null) {
            return;
        }
        dispatcher.invokeFailoverEvent();
        dispatcher.executeFailover().whenComplete((ignore, throwable) -> {
            if (throwable == null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Fallback listener id %s from a failed server %s", Util.printArray(listenerId), dispatcher.address());
                }
                return;
            }
            if (throwable instanceof RejectedExecutionException) {
                log.debug("Client listener failover rejected, not retrying", throwable);
                return;
            }
            log.debug("Unable to failover client listener, so ignore connection reset", throwable);
            ReconnectTask reconnectTask = new ReconnectTask(dispatcher);
            ScheduledFuture<?> scheduledFuture = this.reconnectExecutor.scheduleAtFixedRate(
                    reconnectTask, RECONNECT_PERIOD, RECONNECT_PERIOD, TimeUnit.MILLISECONDS);
            // The task receives its own future so that it is able to cancel the periodic schedule.
            reconnectTask.setCancellationFuture(scheduledFuture);
        });
    }

    /**
     * Starts the dispatcher registered for the given listener id.
     *
     * @param listenerId id of the listener to start
     * @throws RuntimeException the exception produced by {@code Log.HOTROD.unexpectedListenerId}
     *                          when no dispatcher is registered for the id
     */
    public void startClientListener(byte[] listenerId) {
        EventDispatcher<?> eventDispatcher = this.dispatchers.get(new WrappedByteArray(listenerId));
        if (eventDispatcher == null) {
            // Report an unknown id the same way invokeEvent and getCacheDataFormat do,
            // instead of failing with a bare NullPointerException.
            throw Log.HOTROD.unexpectedListenerId(Util.printArray(listenerId));
        }
        eventDispatcher.start();
    }

    /**
     * Removes and stops the dispatcher registered for the given listener id.
     *
     * @param listenerId id of the listener to remove
     * @return the removed dispatcher, or {@code null} when none was registered
     */
    public EventDispatcher<?> removeClientListener(byte[] listenerId) {
        return this.removeClientListener(new WrappedByteArray(listenerId));
    }

    /**
     * Removes and stops the dispatcher registered under the given wrapped listener id.
     *
     * @param listenerId wrapped id of the listener to remove
     * @return the removed dispatcher, or {@code null} when none was registered
     */
    private EventDispatcher<?> removeClientListener(WrappedByteArray listenerId) {
        EventDispatcher<?> dispatcher = this.dispatchers.remove(listenerId);
        if (dispatcher == null) {
            if (log.isTraceEnabled()) {
                log.tracef("Client listener %s not present (removed concurrently?)", Util.printArray(listenerId.getBytes()));
            }
            return null;
        }
        dispatcher.stop();
        // Only report a removal when a dispatcher was actually removed.
        if (log.isTraceEnabled()) {
            log.tracef("Remove client listener with id %s", Util.printArray(listenerId.getBytes()));
        }
        return dispatcher;
    }

    /**
     * Looks up the id under which a listener object is registered.
     *
     * @param listener the listener object to search for
     * @return the listener id of the first dispatcher whose listener equals {@code listener},
     *         or {@code null} when there is none
     */
    public byte[] findListenerId(Object listener) {
        for (EventDispatcher<?> dispatcher : this.dispatchers.values()) {
            if (dispatcher.listener.equals(listener)) {
                return dispatcher.listenerId;
            }
        }
        return null;
    }

    /**
     * @param listenerId id of the listener to check
     * @return {@code true} when a dispatcher is registered for the id and reports itself running
     */
    public boolean isListenerConnected(byte[] listenerId) {
        EventDispatcher<?> dispatcher = this.dispatchers.get(new WrappedByteArray(listenerId));
        return dispatcher != null && dispatcher.isRunning();
    }

    /**
     * @param listenerId id of the listener to look up
     * @return the address reported by the listener's dispatcher, or {@code null} when no
     *         dispatcher is registered for the id
     */
    public SocketAddress findAddress(byte[] listenerId) {
        EventDispatcher<?> dispatcher = this.dispatchers.get(new WrappedByteArray(listenerId));
        return dispatcher == null ? null : dispatcher.address();
    }

    /**
     * Collects the listener objects registered for a cache.
     *
     * @param cacheName name of the cache
     * @return a new mutable set with the listeners of every dispatcher whose cache name equals
     *         {@code cacheName}; empty when there are none
     */
    public Set<Object> getListeners(String cacheName) {
        HashSet<Object> ret = new HashSet<>(this.dispatchers.size());
        for (EventDispatcher<?> dispatcher : this.dispatchers.values()) {
            if (dispatcher.cacheName.equals(cacheName)) {
                ret.add(dispatcher.listener);
            }
        }
        return ret;
    }

    /**
     * Removes and stops every registered dispatcher, then shuts the reconnect executor down
     * immediately.
     */
    public void stop() {
        // Removing while iterating is fine here: ConcurrentHashMap views are weakly consistent.
        for (WrappedByteArray listenerId : this.dispatchers.keySet()) {
            if (log.isTraceEnabled()) {
                log.tracef("Remote cache manager stopping, remove client listener id %s", Util.printArray(listenerId.getBytes()));
            }
            this.removeClientListener(listenerId);
        }
        this.reconnectExecutor.shutdownNow();
    }

    /**
     * Delivers an event to the dispatcher registered for the given listener id.
     *
     * @param listenerId id of the target listener
     * @param event      the event to deliver
     * @param <T>        event type; the caller is responsible for it matching the dispatcher
     * @throws RuntimeException the exception produced by {@code Log.HOTROD.unexpectedListenerId}
     *                          when no dispatcher is registered for the id
     */
    public <T> void invokeEvent(byte[] listenerId, T event) {
        // The registry stores dispatchers of mixed event types, so the type cannot be checked here.
        @SuppressWarnings("unchecked")
        EventDispatcher<T> eventDispatcher = (EventDispatcher<T>) this.dispatchers.get(new WrappedByteArray(listenerId));
        if (eventDispatcher == null) {
            throw Log.HOTROD.unexpectedListenerId(Util.printArray(listenerId));
        }
        eventDispatcher.invokeEvent(event);
    }

    /**
     * Returns the data format of the cache listener registered for the given id.
     *
     * <p>The registered dispatcher is cast to {@link ClientEventDispatcher}; a dispatcher of any
     * other kind makes the cast fail with a {@link ClassCastException}.
     *
     * @param listenerId id of the listener
     * @return the data format reported by the dispatcher
     * @throws RuntimeException the exception produced by {@code Log.HOTROD.unexpectedListenerId}
     *                          when no dispatcher is registered for the id
     */
    public DataFormat getCacheDataFormat(byte[] listenerId) {
        ClientEventDispatcher clientEventDispatcher = (ClientEventDispatcher) this.dispatchers.get(new WrappedByteArray(listenerId));
        if (clientEventDispatcher == null) {
            throw Log.HOTROD.unexpectedListenerId(Util.printArray(listenerId));
        }
        return clientEventDispatcher.getDataFormat();
    }

    /**
     * @return the codec supplied at construction time
     */
    public Codec codec() {
        return this.codec;
    }

    /**
     * @return the class allow list taken from the configuration at construction time
     */
    public ClassAllowList allowList() {
        return this.allowList;
    }

    /**
     * @return the channel factory supplied at construction time
     */
    public ChannelFactory channelFactory() {
        return this.channelFactory;
    }
}

