/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.cluster.messaging.impl;

import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.net.ConnectException;
import java.security.KeyStore;
import java.security.MessageDigest;
import java.security.PublicKey;
import java.security.cert.Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.SynchronizedDescriptiveStatistics;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.cluster.messaging.impl.InternalMessage;
import org.onosproject.store.cluster.messaging.impl.InternalReply;
import org.onosproject.store.cluster.messaging.impl.InternalRequest;
import org.onosproject.store.cluster.messaging.impl.MessageDecoder;
import org.onosproject.store.cluster.messaging.impl.MessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class NettyMessagingManager
implements MessagingService {
    // Exposed to nested classes through the synthetic accessor access$1200(); its consumer is not
    // visible in this part of the file.
    private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1L).toMillis();
    // Same value as the 50 ms initial delay/period used when scheduling timeoutAllCallbacks() in activate().
    private static final long TIMEOUT_INTERVAL = 50L;
    // The values below match the literals used by RequestMonitor (the decompiler inlined them there):
    // sample window capacity, replies required before a window update, minimum time between window
    // updates, minimum samples before phi is computed, floor for the standard deviation, and the
    // phi value at which a request is treated as timed out.
    private static final int WINDOW_SIZE = 60;
    private static final int WINDOW_UPDATE_SAMPLE_SIZE = 100;
    private static final long WINDOW_UPDATE_MILLIS = 10000L;
    private static final int MIN_SAMPLES = 25;
    private static final int MIN_STANDARD_DEVIATION = 100;
    private static final int PHI_FAILURE_THRESHOLD = 12;
    // NOTE(review): presumably lower/upper bounds for request timeouts; not referenced in the visible code.
    private static final long MIN_TIMEOUT_MILLIS = 100L;
    private static final long MAX_TIMEOUT_MILLIS = 5000L;
    // Number of channel slots kept per remote endpoint (see getChannelPool/getChannelOffset, which use 8).
    private static final int CHANNEL_POOL_SIZE = 8;
    // Payload substituted when a reply carries no payload.
    private static final byte[] EMPTY_PAYLOAD = new byte[0];
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    // Connection used when the destination is the local endpoint.
    private final LocalClientConnection localClientConnection = new LocalClientConnection();
    // Server-side counterpart for local fire-and-forget sends; built with a null future, so its reply() is a no-op.
    private final LocalServerConnection localServerConnection = new LocalServerConnection(null);
    // Default key store location and password used by loadKeyStores() when the javax.net.ssl.*
    // system properties are not set.
    private static final String CONFIG_DIR = "../config";
    private static final String KS_FILE_NAME = "onos.jks";
    private static final File DEFAULT_KS_FILE = new File("../config", "onos.jks");
    private static final String DEFAULT_KS_PASSWORD = "changeit";
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected HybridLogicalClockService clockService;
    // Endpoint of the local node; assigned in activate().
    private Endpoint localEndpoint;
    // Hash code of the cluster name; stamped on outgoing messages and checked on incoming ones.
    private int preamble;
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Message type -> handler invoked with the request and the connection to reply on.
    private final Map<String, BiConsumer<InternalRequest, ServerConnection>> handlers = new ConcurrentHashMap<String, BiConsumer<InternalRequest, ServerConnection>>();
    private final Map<Channel, RemoteClientConnection> clientConnections = Maps.newConcurrentMap();
    private final Map<Channel, RemoteServerConnection> serverConnections = Maps.newConcurrentMap();
    private final AtomicLong messageIdGenerator = new AtomicLong(0L);
    // Handle of the periodic timeout task scheduled in activate(); cancelled in deactivate().
    private ScheduledFuture<?> timeoutFuture;
    // Per-endpoint pool of (possibly still connecting) channels, indexed by getChannelOffset().
    private final Map<Endpoint, List<CompletableFuture<Channel>>> channels = Maps.newConcurrentMap();
    private EventLoopGroup serverGroup;
    private EventLoopGroup clientGroup;
    private Class<? extends ServerChannel> serverChannelClass;
    private Class<? extends Channel> clientChannelClass;
    private ScheduledExecutorService timeoutExecutor;
    protected static final boolean TLS_ENABLED = true;
    protected static final boolean TLS_DISABLED = false;
    // Effective TLS switch; set by getTlsParameters() and cleared when the key stores cannot be loaded.
    protected boolean enableNettyTls = true;
    protected TrustManagerFactory trustManager;
    protected KeyManagerFactory keyManager;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterMetadataService clusterMetadataService;

    /**
     * Starts the messaging service: resolves TLS settings, derives the cluster preamble and
     * local endpoint, creates the event loop groups, starts listening for incoming
     * connections and schedules the periodic callback timeout sweep.
     *
     * <p>If the service is already marked as started, only a warning is logged.
     *
     * @throws InterruptedException if interrupted while waiting for the server socket to bind
     */
    @Activate
    public void activate() throws InterruptedException {
        ControllerNode localNode = this.clusterMetadataService.getLocalNode();
        this.getTlsParameters();
        if (!this.started.get()) {
            this.preamble = this.clusterMetadataService.getClusterMetadata().getName().hashCode();
            this.localEndpoint = new Endpoint(localNode.ip(), localNode.tcpPort());
            this.initEventLoopGroup();
            this.startAcceptingConnections();
            // Single thread that periodically sweeps pending callbacks for timeouts.
            this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
                    Tools.groupedThreads((String)"NettyMessagingEvt", (String)"timeout", (Logger)this.log));
            this.timeoutFuture = this.timeoutExecutor.scheduleAtFixedRate(
                    this::timeoutAllCallbacks, 50L, 50L, TimeUnit.MILLISECONDS);
            this.started.set(true);
            this.log.info("Started");
            return;
        }
        this.log.warn("Already running at local endpoint: {}", (Object)this.localEndpoint);
    }

    /**
     * Stops the messaging service. When the service was started, both event loop groups are
     * shut down gracefully, the timeout sweep is cancelled and its executor is shut down.
     * "Stopped" is logged in every case.
     */
    @Deactivate
    public void deactivate() {
        boolean wasRunning = this.started.get();
        if (wasRunning) {
            this.serverGroup.shutdownGracefully();
            this.clientGroup.shutdownGracefully();
            // Do not interrupt a sweep that is currently running.
            this.timeoutFuture.cancel(false);
            this.timeoutExecutor.shutdown();
            this.started.set(false);
        }
        this.log.info("Stopped");
    }

    /**
     * Decides whether TLS is used for intra-cluster messaging. TLS is requested through the
     * {@code enableNettyTLS} system property (default {@code true}) and stays enabled only
     * if the key stores load successfully.
     */
    private void getTlsParameters() {
        String requested = System.getProperty("enableNettyTLS", Boolean.toString(true));
        this.enableNettyTls = Boolean.parseBoolean(requested);
        if (!this.enableNettyTls) {
            return;
        }
        this.enableNettyTls = this.loadKeyStores();
    }

    /**
     * Loads the trust store and key store used for TLS and initialises the trust and key
     * manager factories from them.
     *
     * <p>Locations and passwords are read from the standard {@code javax.net.ssl.*} system
     * properties, falling back to the default key store file and password. The store files
     * are opened with try-with-resources so that the file descriptors are always released,
     * including when loading fails.
     *
     * @return {@code true} if both stores were loaded and the factories were assigned;
     *         {@code false} if a store file is missing or any other error occurred (the
     *         caller then disables TLS)
     */
    private boolean loadKeyStores() {
        KeyManagerFactory kmf;
        TrustManagerFactory tmf;
        try {
            String ksLocation = System.getProperty("javax.net.ssl.keyStore", DEFAULT_KS_FILE.toString());
            String tsLocation = System.getProperty("javax.net.ssl.trustStore", DEFAULT_KS_FILE.toString());
            char[] ksPwd = System.getProperty("javax.net.ssl.keyStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
            char[] tsPwd = System.getProperty("javax.net.ssl.trustStorePassword", DEFAULT_KS_PASSWORD).toCharArray();
            tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore ts = KeyStore.getInstance("JKS");
            // Close the stream as soon as the store has been read.
            try (FileInputStream tsStream = new FileInputStream(tsLocation)) {
                ts.load(tsStream, tsPwd);
            }
            tmf.init(ts);
            kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            KeyStore ks = KeyStore.getInstance("JKS");
            try (FileInputStream ksStream = new FileInputStream(ksLocation)) {
                ks.load(ksStream, ksPwd);
            }
            kmf.init(ks, ksPwd);
            if (this.log.isInfoEnabled()) {
                this.logKeyStore(ks, ksLocation, ksPwd);
            }
        }
        catch (FileNotFoundException e) {
            // A missing store is an expected configuration: fall back to plain TCP.
            this.log.warn("Disabling TLS for intra-cluster messaging; Could not load cluster key store: {}", (Object)e.getMessage());
            return false;
        }
        catch (Exception e) {
            this.log.error("Error loading key store; disabling TLS for intra-cluster messaging", (Throwable)e);
            return false;
        }
        // Publish the factories only after both stores loaded successfully.
        this.trustManager = tmf;
        this.keyManager = kmf;
        return true;
    }

    /**
     * Logs, at info level, the key store location and a SHA1 fingerprint of the public key
     * of every certificate in the store. Does nothing when info logging is disabled.
     *
     * @param ks         loaded key store to describe
     * @param ksLocation location the key store was loaded from (used in log output)
     * @param ksPwd      key store password (not used by this method)
     */
    private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
        if (!this.log.isInfoEnabled()) {
            return;
        }
        this.log.info("Loaded cluster key store from: {}", (Object)ksLocation);
        try {
            for (Enumeration<String> aliases = ks.aliases(); aliases.hasMoreElements(); ) {
                String alias = aliases.nextElement();
                Certificate cert = ks.getCertificate(alias);
                if (cert == null) {
                    this.log.info("No certificate for alias {}", (Object)alias);
                    continue;
                }
                PublicKey key = cert.getPublicKey();
                MessageDigest digest = MessageDigest.getInstance("SHA1");
                digest.update(key.getEncoded());
                byte[] hash = digest.digest();
                // Render the digest as colon-separated upper-case hex pairs.
                StringJoiner fingerprint = new StringJoiner(":");
                for (int i = 0; i < hash.length; ++i) {
                    fingerprint.add(String.format("%02X", hash[i]));
                }
                this.log.info("{} -> {}", (Object)alias, (Object)fingerprint);
            }
        }
        catch (Exception e) {
            this.log.warn("Unable to print contents of key store: {}", (Object)ksLocation, (Object)e);
        }
    }

    /**
     * Creates the client and server event loop groups and selects the matching channel
     * classes. The native epoll transport is tried first; if anything goes wrong while
     * setting it up, the NIO transport is used instead.
     */
    private void initEventLoopGroup() {
        try {
            this.clientGroup = new EpollEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"epollC-%d", (Logger)this.log));
            this.serverGroup = new EpollEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"epollS-%d", (Logger)this.log));
            this.serverChannelClass = EpollServerSocketChannel.class;
            this.clientChannelClass = EpollSocketChannel.class;
        }
        catch (Throwable epollFailure) {
            this.log.debug("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", (Object)epollFailure.getMessage());
            // Fall back to the portable NIO transport.
            this.clientGroup = new NioEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"nioC-%d", (Logger)this.log));
            this.serverGroup = new NioEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"nioS-%d", (Logger)this.log));
            this.serverChannelClass = NioServerSocketChannel.class;
            this.clientChannelClass = NioSocketChannel.class;
        }
    }

    /**
     * Runs the timeout sweep on the local client connection and then on every remote
     * client connection currently registered.
     */
    private void timeoutAllCallbacks() {
        this.localClientConnection.timeoutCallbacks();
        this.clientConnections.values().forEach(remote -> remote.timeoutCallbacks());
    }

    /**
     * Sends a one-way message to the given endpoint.
     *
     * @param ep      destination endpoint
     * @param type    message type
     * @param payload message payload
     * @return future completed once the send has finished, or completed exceptionally on failure
     */
    public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        InternalRequest request = new InternalRequest(
                this.preamble,
                this.clockService.timeNow(),
                this.messageIdGenerator.incrementAndGet(),
                this.localEndpoint,
                type,
                payload);
        // The result is delivered on the completing thread (direct executor).
        Executor direct = MoreExecutors.directExecutor();
        return this.executeOnPooledConnection(ep, type, connection -> connection.sendAsync(request), direct);
    }

    /**
     * Sends a request and returns a future for the reply; the reply is delivered on the
     * completing thread (direct executor).
     *
     * @param ep      destination endpoint
     * @param type    message type
     * @param payload request payload
     * @return future completed with the reply payload, or completed exceptionally on failure
     */
    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        Executor direct = MoreExecutors.directExecutor();
        return this.sendAndReceive(ep, type, payload, direct);
    }

    /**
     * Sends a request and returns a future for the reply, completed on the given executor.
     *
     * @param ep       destination endpoint
     * @param type     message type
     * @param payload  request payload
     * @param executor executor on which the returned future is completed
     * @return future completed with the reply payload, or completed exceptionally on failure
     */
    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        long requestId = this.messageIdGenerator.incrementAndGet();
        InternalRequest request = new InternalRequest(
                this.preamble,
                this.clockService.timeNow(),
                requestId,
                this.localEndpoint,
                type,
                payload);
        return this.executeOnPooledConnection(ep, type, connection -> connection.sendAndReceive(request), executor);
    }

    /**
     * Returns the channel pool for an endpoint, creating it on first use as a
     * copy-on-write list of 8 empty ({@code null}) slots.
     *
     * @param endpoint remote endpoint
     * @return the pool of channel futures for that endpoint
     */
    private List<CompletableFuture<Channel>> getChannelPool(Endpoint endpoint) {
        return this.channels.computeIfAbsent(endpoint, key -> {
            ArrayList<CompletableFuture<Channel>> emptySlots = new ArrayList<CompletableFuture<Channel>>(8);
            int remaining = 8;
            while (remaining-- > 0) {
                emptySlots.add(null);
            }
            return Lists.newCopyOnWriteArrayList(emptySlots);
        });
    }

    /**
     * Maps a message type to a slot index in the range 0..7 of the channel pool.
     *
     * @param messageType message type
     * @return pool slot index derived from the type's hash code
     */
    private int getChannelOffset(String messageType) {
        // The remainder lies in -7..7, so negating a negative value cannot overflow.
        int remainder = messageType.hashCode() % 8;
        return remainder < 0 ? -remainder : remainder;
    }

    /**
     * Returns a future for an active channel to the endpoint, taken from the pool slot
     * selected by the message type.
     *
     * <p>A slot that is empty or whose future failed is (re)filled by opening a new channel.
     * If the slot's channel turns out to be inactive, the slot is cleared, the associated
     * client connection is closed, and the lookup is retried (or the result of the future
     * another caller already installed in the slot is used).
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param endpoint    remote endpoint
     * @param messageType message type used to select the pool slot
     * @return future completed with an active channel, or completed exceptionally if the
     *         connection attempt failed
     */
    private CompletableFuture<Channel> getChannel(Endpoint endpoint, String messageType) {
        int offset;
        List<CompletableFuture<Channel>> channelPool = this.getChannelPool(endpoint);
        CompletableFuture<Channel> channelFuture = channelPool.get(offset = this.getChannelOffset(messageType));
        // Unsynchronized first check; the slot is re-read under the pool lock before opening a channel.
        if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
            List<CompletableFuture<Channel>> list = channelPool;
            synchronized (list) {
                channelFuture = channelPool.get(offset);
                if (channelFuture == null || channelFuture.isCompletedExceptionally()) {
                    channelFuture = this.openChannel(endpoint);
                    channelPool.set(offset, channelFuture);
                }
            }
        }
        // 'future' is what the caller receives; it is completed from the callbacks below.
        CompletableFuture<Channel> future = new CompletableFuture<Channel>();
        CompletableFuture<Channel> finalFuture = channelFuture;
        finalFuture.whenComplete((channel, error) -> {
            if (error == null) {
                if (!channel.isActive()) {
                    CompletableFuture currentFuture;
                    List list = channelPool;
                    synchronized (list) {
                        // Clear the slot only if it still holds the future we observed.
                        currentFuture = (CompletableFuture)channelPool.get(offset);
                        if (currentFuture == finalFuture) {
                            channelPool.set(offset, null);
                        }
                    }
                    // Drop and close the connection state bound to the dead channel.
                    ClientConnection connection = this.clientConnections.remove(channel);
                    if (connection != null) {
                        connection.close();
                    }
                    if (currentFuture == finalFuture) {
                        // We cleared the slot ourselves: retry, which opens a fresh channel.
                        this.getChannel(endpoint, messageType).whenComplete((recursiveResult, recursiveError) -> {
                            if (recursiveError == null) {
                                future.complete((Channel)recursiveResult);
                            } else {
                                future.completeExceptionally((Throwable)recursiveError);
                            }
                        });
                    } else {
                        // The slot already holds a different future: use its outcome.
                        currentFuture.whenComplete((recursiveResult, recursiveError) -> {
                            if (recursiveError == null) {
                                future.complete((Channel)recursiveResult);
                            } else {
                                future.completeExceptionally((Throwable)recursiveError);
                            }
                        });
                    }
                } else {
                    future.complete((Channel)channel);
                }
            } else {
                future.completeExceptionally((Throwable)error);
            }
        });
        return future;
    }

    /**
     * Runs the callback on a connection to the endpoint and returns a new future that
     * receives the callback's outcome.
     *
     * @param endpoint destination endpoint
     * @param type     message type (selects the pooled channel)
     * @param callback operation to run on the connection
     * @param executor executor on which the returned future is completed
     * @param <T>      result type of the operation
     * @return future carrying the operation's result or failure
     */
    private <T> CompletableFuture<T> executeOnPooledConnection(Endpoint endpoint, String type, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<T>();
        this.executeOnPooledConnection(endpoint, type, callback, executor, result);
        return result;
    }

    /**
     * Runs the callback on a connection to the endpoint and completes the supplied future,
     * on the given executor, with the callback's outcome.
     *
     * <p>Requests to the local endpoint use the local client connection. Otherwise a pooled
     * channel is obtained and its client connection is created on demand. When the operation
     * fails with a root cause other than {@link TimeoutException} or
     * {@link MessagingException}, the channel is closed and its connection removed.
     *
     * @param endpoint destination endpoint
     * @param type     message type (selects the pooled channel)
     * @param callback operation to run on the connection
     * @param executor executor on which {@code future} is completed
     * @param future   future to complete with the operation's result or failure
     * @param <T>      result type of the operation
     */
    private <T> void executeOnPooledConnection(Endpoint endpoint, String type, Function<ClientConnection, CompletableFuture<T>> callback, Executor executor, CompletableFuture<T> future) {
        // Local destination: no channel is needed.
        if (endpoint.equals((Object)this.localEndpoint)) {
            callback.apply(this.localClientConnection).whenComplete((result, error) -> {
                if (error == null) {
                    executor.execute(() -> future.complete(result));
                } else {
                    executor.execute(() -> future.completeExceptionally((Throwable)error));
                }
            });
            return;
        }
        this.getChannel(endpoint, type).whenComplete((channel, channelError) -> {
            if (channelError == null) {
                // One RemoteClientConnection per channel, created on first use.
                ClientConnection connection = this.clientConnections.computeIfAbsent((Channel)channel, x$0 -> new RemoteClientConnection((Channel)x$0));
                ((CompletableFuture)callback.apply(connection)).whenComplete((result, sendError) -> {
                    if (sendError == null) {
                        executor.execute(() -> future.complete(result));
                    } else {
                        Throwable cause = Throwables.getRootCause((Throwable)sendError);
                        // Timeouts and messaging-level errors leave the channel open; anything else closes it.
                        if (!(cause instanceof TimeoutException) && !(cause instanceof MessagingException)) {
                            channel.close().addListener(f -> {
                                connection.close();
                                this.clientConnections.remove(channel);
                            });
                        }
                        executor.execute(() -> future.completeExceptionally((Throwable)sendError));
                    }
                });
            } else {
                executor.execute(() -> future.completeExceptionally((Throwable)channelError));
            }
        });
    }

    /**
     * Registers a one-way handler for a message type. The handler runs on the given
     * executor and no reply is sent.
     *
     * @param type     message type
     * @param handler  consumer of the sender endpoint and the message payload
     * @param executor executor on which the handler runs
     */
    public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        BiConsumer<InternalRequest, ServerConnection> dispatcher = (message, connection) -> {
            Runnable task = () -> handler.accept(message.sender(), message.payload());
            executor.execute(task);
        };
        this.handlers.put(type, dispatcher);
    }

    /**
     * Registers a request/reply handler for a message type. The handler runs on the given
     * executor; its return value is sent back as the reply payload. If the handler throws,
     * a reply with status {@code ERROR_HANDLER_EXCEPTION} is sent instead.
     *
     * @param type     message type
     * @param handler  function from sender endpoint and request payload to reply payload
     * @param executor executor on which the handler runs
     */
    public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        BiConsumer<InternalRequest, ServerConnection> dispatcher = (message, connection) -> {
            Runnable task = () -> {
                InternalReply.Status status = InternalReply.Status.OK;
                byte[] responsePayload = null;
                try {
                    responsePayload = (byte[])handler.apply(message.sender(), message.payload());
                }
                catch (Exception e) {
                    this.log.debug("An error occurred in a message handler: {}", (Throwable)e);
                    status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
                }
                // A reply is always sent, even when the handler failed.
                connection.reply((InternalRequest)message, status, Optional.ofNullable(responsePayload));
            };
            executor.execute(task);
        };
        this.handlers.put(type, dispatcher);
    }

    /**
     * Registers an asynchronous request/reply handler for a message type. The reply is sent
     * when the future returned by the handler completes; an exceptional completion yields a
     * reply with status {@code ERROR_HANDLER_EXCEPTION}.
     *
     * @param type    message type
     * @param handler function from sender endpoint and request payload to a future reply payload
     */
    public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        BiConsumer<InternalRequest, ServerConnection> dispatcher = (message, connection) -> {
            CompletableFuture<byte[]> pending = handler.apply(message.sender(), message.payload());
            pending.whenComplete((result, error) -> {
                InternalReply.Status status;
                if (error != null) {
                    this.log.debug("An error occurred in a message handler: {}", error);
                    status = InternalReply.Status.ERROR_HANDLER_EXCEPTION;
                } else {
                    status = InternalReply.Status.OK;
                }
                connection.reply((InternalRequest)message, status, Optional.ofNullable(result));
            });
        };
        this.handlers.put(type, dispatcher);
    }

    /**
     * Removes the handler registered for the given message type, if any.
     *
     * @param type message type whose handler is removed
     */
    public void unregisterHandler(String type) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        this.handlers.remove(type);
    }

    /**
     * Builds a client bootstrap for connecting to the given endpoint, using the TLS channel
     * initializer when TLS is enabled and the basic one otherwise.
     *
     * @param endpoint remote endpoint to connect to
     * @return configured, not yet connected bootstrap
     */
    private Bootstrap bootstrapClient(Endpoint endpoint) {
        Bootstrap client = new Bootstrap();
        client.option(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
        client.option(ChannelOption.WRITE_BUFFER_WATER_MARK, (Object)new WriteBufferWaterMark(327680, 655360));
        client.option(ChannelOption.SO_SNDBUF, (Object)0x100000);
        client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)1000);
        client.group(this.clientGroup);
        client.channel(this.clientChannelClass);
        client.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        client.remoteAddress(endpoint.host().toInetAddress(), endpoint.port());
        ChannelHandler initializer;
        if (!this.enableNettyTls) {
            initializer = (ChannelHandler)new BasicChannelInitializer();
        } else {
            initializer = (ChannelHandler)new SslClientCommunicationChannelInitializer();
        }
        client.handler(initializer);
        return client;
    }

    /**
     * Configures the server bootstrap, binds it to the local endpoint's port, waits for the
     * bind to finish and logs the outcome.
     *
     * @throws InterruptedException if interrupted while waiting for the bind to complete
     */
    private void startAcceptingConnections() throws InterruptedException {
        ServerBootstrap server = new ServerBootstrap();
        server.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, (Object)new WriteBufferWaterMark(8192, 32768));
        server.option(ChannelOption.SO_RCVBUF, (Object)0x100000);
        server.childOption(ChannelOption.SO_KEEPALIVE, (Object)true);
        server.childOption(ChannelOption.TCP_NODELAY, (Object)true);
        server.childOption(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
        server.group(this.serverGroup, this.clientGroup);
        server.channel(this.serverChannelClass);
        ChannelHandler childInitializer;
        if (!this.enableNettyTls) {
            childInitializer = (ChannelHandler)new BasicChannelInitializer();
        } else {
            childInitializer = (ChannelHandler)new SslServerCommunicationChannelInitializer();
        }
        server.childHandler(childInitializer);
        server.option(ChannelOption.SO_BACKLOG, (Object)128);
        server.childOption(ChannelOption.SO_KEEPALIVE, (Object)true);
        server.bind(this.localEndpoint.port()).sync().addListener(bindResult -> {
            if (!bindResult.isSuccess()) {
                this.log.warn("{} failed to bind to port {} due to {}", new Object[]{this.localEndpoint.host(), this.localEndpoint.port(), bindResult.cause()});
            } else {
                this.log.info("{} accepting incoming connections on port {}", (Object)this.localEndpoint.host(), (Object)this.localEndpoint.port());
            }
        });
    }

    /**
     * Starts an asynchronous connection attempt to the given endpoint.
     *
     * <p>The outcome is logged when the attempt actually finishes: the "established" message
     * is written only after the connect succeeded, and a failed attempt is logged with its
     * cause. (Previously "established" was logged as soon as the attempt was started,
     * regardless of the result.)
     *
     * @param ep remote endpoint to connect to
     * @return future completed with the connected channel, or completed exceptionally with
     *         the connect failure cause
     */
    private CompletableFuture<Channel> openChannel(Endpoint ep) {
        Bootstrap bootstrap = this.bootstrapClient(ep);
        CompletableFuture<Channel> retFuture = new CompletableFuture<Channel>();
        ChannelFuture f = bootstrap.connect();
        f.addListener(future -> {
            if (future.isSuccess()) {
                this.log.debug("Established a new connection to {}", (Object)ep);
                retFuture.complete(f.channel());
            } else {
                // Trailing throwable argument is logged as the exception by SLF4J.
                this.log.debug("Failed to establish a connection to {}", (Object)ep, (Object)future.cause());
                retFuture.completeExceptionally(future.cause());
            }
        });
        return retFuture;
    }

    // Compiler-generated (synthetic) accessor that exposes the private HISTORY_EXPIRE_MILLIS
    // constant; it appears here only because the file was decompiled.
    static /* synthetic */ long access$1200() {
        return HISTORY_EXPIRE_MILLIS;
    }

    /**
     * Stores the given clock service in the {@code clockService} reference field.
     *
     * @param hybridLogicalClockService clock service to use
     */
    protected void bindClockService(HybridLogicalClockService hybridLogicalClockService) {
        this.clockService = hybridLogicalClockService;
    }

    /**
     * Clears the clock service reference, but only if it is the instance currently bound.
     *
     * @param hybridLogicalClockService clock service being unbound
     */
    protected void unbindClockService(HybridLogicalClockService hybridLogicalClockService) {
        if (this.clockService != hybridLogicalClockService) {
            return;
        }
        this.clockService = null;
    }

    /**
     * Stores the given cluster metadata service in the {@code clusterMetadataService}
     * reference field.
     *
     * @param clusterMetadataService cluster metadata service to use
     */
    protected void bindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        this.clusterMetadataService = clusterMetadataService;
    }

    /**
     * Clears the cluster metadata service reference, but only if it is the instance
     * currently bound.
     *
     * @param clusterMetadataService cluster metadata service being unbound
     */
    protected void unbindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        if (this.clusterMetadataService != clusterMetadataService) {
            return;
        }
        this.clusterMetadataService = null;
    }

    /**
     * Tracks reply times and decides whether a pending request should be considered timed
     * out, using a phi value computed from a sliding window of per-interval maximum reply
     * times.
     */
    private static final class RequestMonitor {
        /** Sliding window (capacity 60) of recorded per-interval maximum reply times. */
        private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(60);
        /** Largest reply time seen since the last window update. */
        private final AtomicLong max = new AtomicLong();
        /** Replies counted since the last window update. */
        private volatile int replyCount;
        /** Time (ms) of the last window update. */
        private volatile long lastUpdate = System.currentTimeMillis();

        private RequestMonitor() {
        }

        /**
         * Records a reply time. Once at least 100 replies were counted and more than
         * 10 seconds passed since the last update, the maximum observed reply time is added
         * to the sample window and the counters are reset.
         *
         * @param replyTime reply time to record
         */
        void addReplyTime(long replyTime) {
            this.max.accumulateAndGet(replyTime, Math::max);
            int counted = ++this.replyCount;
            if (counted < 100 || System.currentTimeMillis() - this.lastUpdate <= 10000L) {
                return;
            }
            synchronized (this) {
                // Re-check under the lock: another thread may have just updated the window.
                if (System.currentTimeMillis() - this.lastUpdate <= 10000L) {
                    return;
                }
                long windowMax = this.max.get();
                if (windowMax <= 0L) {
                    return;
                }
                this.samples.addValue((double)windowMax);
                this.lastUpdate = System.currentTimeMillis();
                this.replyCount = 0;
                this.max.set(0L);
            }
        }

        /**
         * Tells whether a request pending for the given time should be timed out. This is
         * only ever true once the sample window is full (60 samples).
         *
         * @param elapsedTime time the request has been pending
         * @return {@code true} if the window is full and phi is at least 12
         */
        boolean isTimedOut(long elapsedTime) {
            if (this.samples.getN() != 60L) {
                return false;
            }
            return this.phi(elapsedTime) >= 12.0;
        }

        /** Returns phi for the elapsed time, or 0 while fewer than 25 samples exist. */
        private double phi(long elapsedTime) {
            return this.samples.getN() < 25L ? 0.0 : this.computePhi(this.samples, elapsedTime);
        }

        /**
         * Computes phi from the mean and standard deviation of the samples; the standard
         * deviation is floored at 100.
         */
        private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
            double mean = samples.getMean();
            double deviation = Math.max(samples.getStandardDeviation(), 100.0);
            double y = ((double)elapsedTime - mean) / deviation;
            double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
            double tail = (double)elapsedTime > mean
                    ? e / (1.0 + e)
                    : 1.0 - 1.0 / (1.0 + e);
            return -Math.log10(tail);
        }
    }

    /**
     * Server side of a remote connection: dispatches incoming requests to the registered
     * handlers and writes replies back on the channel.
     */
    private final class RemoteServerConnection
    implements ServerConnection {
        private final Channel channel;

        RemoteServerConnection(Channel channel) {
            this.channel = channel;
        }

        /**
         * Validates the preamble of an incoming request, records its timestamp and hands it
         * to the handler registered for its subject. A preamble mismatch or a missing
         * handler is answered with an error reply.
         */
        private void dispatch(InternalRequest message) {
            NettyMessagingManager manager = NettyMessagingManager.this;
            if (message.preamble() != manager.preamble) {
                manager.log.debug("Received {} with invalid preamble from {}", (Object)message.type(), (Object)message.sender());
                this.reply(message, InternalReply.Status.PROTOCOL_EXCEPTION, Optional.empty());
                return;
            }
            manager.clockService.recordEventTime(message.time());
            BiConsumer<InternalRequest, ServerConnection> handler = manager.handlers.get(message.subject());
            if (handler == null) {
                manager.log.debug("No handler for message type {} from {}", (Object)message.type(), (Object)message.sender());
                this.reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
                return;
            }
            handler.accept(message, this);
        }

        /**
         * Writes a reply for the given request to the channel; an absent payload is sent as
         * an empty byte array.
         */
        @Override
        public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
            byte[] body = payload.orElse(EMPTY_PAYLOAD);
            InternalReply response = new InternalReply(NettyMessagingManager.this.preamble, NettyMessagingManager.this.clockService.timeNow(), message.id(), body, status);
            this.channel.writeAndFlush((Object)response);
        }
    }

    /**
     * Client side of a remote connection: writes requests to the channel and completes the
     * registered callbacks when replies arrive.
     */
    private final class RemoteClientConnection
    extends AbstractClientConnection {
        private final Channel channel;

        RemoteClientConnection(Channel channel) {
            this.channel = channel;
        }

        /**
         * Writes a one-way message; the returned future reflects the outcome of the write.
         */
        @Override
        public CompletableFuture<Void> sendAsync(InternalRequest message) {
            CompletableFuture<Void> written = new CompletableFuture<Void>();
            this.channel.writeAndFlush((Object)message).addListener(writeResult -> {
                if (writeResult.isSuccess()) {
                    written.complete(null);
                } else {
                    written.completeExceptionally(writeResult.cause());
                }
            });
            return written;
        }

        /**
         * Registers a callback for the reply and writes the request. If the write fails,
         * the callback is failed with the write error.
         */
        @Override
        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
            CompletableFuture<byte[]> reply = new CompletableFuture<byte[]>();
            // Register before writing so that the reply cannot arrive unmatched.
            this.registerCallback(message.id(), message.subject(), reply);
            this.channel.writeAndFlush((Object)message).addListener(writeResult -> {
                if (writeResult.isSuccess()) {
                    return;
                }
                Callback callback = this.failCallback(message.id());
                if (callback != null) {
                    callback.completeExceptionally(writeResult.cause());
                }
            });
            return reply;
        }

        /**
         * Handles an incoming reply: drops it on preamble mismatch, otherwise records its
         * timestamp and completes the matching callback according to the reply status.
         */
        private void dispatch(InternalReply message) {
            NettyMessagingManager manager = NettyMessagingManager.this;
            if (message.preamble() != manager.preamble) {
                manager.log.debug("Received {} with invalid preamble", (Object)message.type());
                return;
            }
            manager.clockService.recordEventTime(message.time());
            Callback callback = this.completeCallback(message.id());
            if (callback == null) {
                manager.log.debug("Received a reply for message id:[{}] but was unable to locate the request handle", (Object)message.id());
                return;
            }
            if (message.status() == InternalReply.Status.OK) {
                callback.complete(message.payload());
            } else if (message.status() == InternalReply.Status.ERROR_NO_HANDLER) {
                callback.completeExceptionally((Throwable)new MessagingException.NoRemoteHandler());
            } else if (message.status() == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
                callback.completeExceptionally((Throwable)new MessagingException.RemoteHandlerFailure());
            } else if (message.status() == InternalReply.Status.PROTOCOL_EXCEPTION) {
                callback.completeExceptionally((Throwable)new MessagingException.ProtocolException());
            }
        }
    }

    /**
     * Server-side connection for requests handled in the local process: a reply
     * is delivered by completing a {@link CompletableFuture} directly.
     */
    private final class LocalServerConnection implements ServerConnection {
        /** Future completed by {@link #reply}; replies are ignored when it is {@code null}. */
        private final CompletableFuture<byte[]> future;

        LocalServerConnection(CompletableFuture<byte[]> future) {
            this.future = future;
        }

        /**
         * Completes the future according to {@code status}.
         *
         * @param message the request being answered (not used by this implementation)
         * @param status  outcome of handling the request
         * @param payload reply bytes; {@code EMPTY_PAYLOAD} is used when absent and the status is OK
         */
        @Override
        public void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload) {
            if (this.future == null) {
                return;
            }
            if (status == InternalReply.Status.OK) {
                this.future.complete(payload.orElse(EMPTY_PAYLOAD));
                return;
            }
            Throwable failure = this.failureFor(status);
            if (failure != null) {
                this.future.completeExceptionally(failure);
            }
        }

        /**
         * Maps an error status to the exception reported to the caller.
         *
         * @return the matching exception, or {@code null} when the status is not one of the error statuses
         */
        private Throwable failureFor(InternalReply.Status status) {
            if (status == InternalReply.Status.ERROR_NO_HANDLER) {
                return new MessagingException.NoRemoteHandler();
            }
            if (status == InternalReply.Status.ERROR_HANDLER_EXCEPTION) {
                return new MessagingException.RemoteHandlerFailure();
            }
            if (status == InternalReply.Status.PROTOCOL_EXCEPTION) {
                return new MessagingException.ProtocolException();
            }
            return null;
        }
    }

    /**
     * Client connection for messages addressed to the local node: the request is
     * handed straight to the handler registered for its subject.
     */
    private final class LocalClientConnection extends AbstractClientConnection {
        private LocalClientConnection() {
        }

        /**
         * Passes the message to the local handler for its subject, if one is
         * registered, and returns an already-completed future.
         *
         * @param message the request to deliver
         * @return a completed future carrying {@code null}
         */
        @Override
        public CompletableFuture<Void> sendAsync(InternalRequest message) {
            BiConsumer subscriber = (BiConsumer)NettyMessagingManager.this.handlers.get(message.subject());
            if (subscriber == null) {
                NettyMessagingManager.this.log.debug("No handler for message type {} from {}", (Object)message.type(), (Object)message.sender());
            } else {
                subscriber.accept(message, NettyMessagingManager.this.localServerConnection);
            }
            return CompletableFuture.completedFuture(null);
        }

        /**
         * Passes the message to the local handler for its subject and returns the
         * future through which the handler replies. When no handler is registered
         * the future is failed with the "no handler" status.
         *
         * @param message the request to deliver
         * @return the future completed by the handler's reply
         */
        @Override
        public CompletableFuture<byte[]> sendAndReceive(InternalRequest message) {
            CompletableFuture<byte[]> replyFuture = new CompletableFuture<>();
            // Whatever way the future completes, drop its registration for this message id.
            replyFuture.whenComplete((result, error) -> this.completeCallback(message.id()));
            this.registerCallback(message.id(), message.subject(), replyFuture);

            BiConsumer subscriber = (BiConsumer)NettyMessagingManager.this.handlers.get(message.subject());
            LocalServerConnection replyChannel = new LocalServerConnection(replyFuture);
            if (subscriber == null) {
                NettyMessagingManager.this.log.debug("No handler for message type {} from {}", (Object)message.type(), (Object)message.sender());
                replyChannel.reply(message, InternalReply.Status.ERROR_NO_HANDLER, Optional.empty());
            } else {
                subscriber.accept(message, replyChannel);
            }
            return replyFuture;
        }
    }

    /**
     * Base class for client connections. It keeps the callbacks of in-flight
     * requests keyed by message id, records reply times per request type, and
     * fails the tracked callbacks on timeout or when the connection is closed.
     */
    private abstract class AbstractClientConnection
    implements ClientConnection {
        /** Callbacks of in-flight requests, keyed by message id (concurrent map). */
        private final Map<Long, Callback> futures = Maps.newConcurrentMap();
        /** Guards {@link #close()} so its body runs at most once. */
        private final AtomicBoolean closed = new AtomicBoolean(false);
        /**
         * Reply-time monitors keyed by callback type; entries expire after a period without access.
         * NOTE(review): access$1200() is a compiler-generated accessor left by the decompiler;
         * presumably it reads an expiry constant (in milliseconds) of the outer class - confirm.
         */
        private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder().expireAfterAccess(NettyMessagingManager.access$1200(), TimeUnit.MILLISECONDS).build();

        private AbstractClientConnection() {
        }

        /**
         * Fails every tracked callback that has been waiting too long.
         *
         * <p>A callback is timed out when it is older than 5000 ms, or when it is older
         * than 100 ms and the monitor for its type reports the elapsed time as a timeout.
         * Timed-out callbacks are removed, their elapsed time is recorded with the
         * monitor, and they are completed with a {@link TimeoutException}.
         */
        void timeoutCallbacks() {
            long currentTime = System.currentTimeMillis();
            Iterator<Map.Entry<Long, Callback>> iterator = this.futures.entrySet().iterator();
            while (iterator.hasNext()) {
                Callback callback = iterator.next().getValue();
                // Looked up for every callback, timed out or not, exactly as before.
                RequestMonitor requestMonitor = this.monitorFor(callback.type);
                long elapsedTime = currentTime - callback.time;
                boolean timedOut = elapsedTime > 5000L
                        || (elapsedTime > 100L && requestMonitor.isTimedOut(elapsedTime));
                if (!timedOut) {
                    continue;
                }
                iterator.remove();
                requestMonitor.addReplyTime(elapsedTime);
                callback.completeExceptionally(new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
            }
        }

        /**
         * Starts tracking a request.
         *
         * @param id      message id of the request
         * @param subject request subject, used as the callback type
         * @param future  future to complete when the request finishes
         */
        protected void registerCallback(long id, String subject, CompletableFuture<byte[]> future) {
            this.futures.put(id, new Callback(subject, future));
        }

        /**
         * Stops tracking a request and records its reply time with the monitor for its type.
         *
         * @param id message id of the request
         * @return the removed callback, or {@code null} if none was tracked for {@code id}
         */
        protected Callback completeCallback(long id) {
            Callback callback = this.futures.remove(id);
            if (callback != null) {
                this.monitorFor(callback.type).addReplyTime(System.currentTimeMillis() - callback.time);
            }
            return callback;
        }

        /**
         * Stops tracking a request without recording a reply time.
         *
         * @param id message id of the request
         * @return the removed callback, or {@code null} if none was tracked for {@code id}
         */
        protected Callback failCallback(long id) {
            return this.futures.remove(id);
        }

        /**
         * On the first call, completes every tracked callback exceptionally with a
         * {@link ConnectException}; later calls do nothing.
         */
        @Override
        public void close() {
            if (this.closed.compareAndSet(false, true)) {
                for (Callback callback : this.futures.values()) {
                    callback.completeExceptionally(new ConnectException());
                }
            }
        }

        /**
         * Returns the reply-time monitor for a callback type, creating it on first use.
         *
         * @param type callback type (the request subject)
         * @return the monitor cached for {@code type}
         * @throws AssertionError if the cache loader fails; the failure is kept as the cause
         */
        private RequestMonitor monitorFor(String type) {
            try {
                return this.requestMonitors.get(type, () -> new RequestMonitor());
            }
            catch (ExecutionException e) {
                // The loader only constructs a RequestMonitor, so this is not expected;
                // keep the cause so it can be diagnosed if it ever happens.
                throw new AssertionError(e);
            }
        }
    }

    private static interface ServerConnection {
        public void reply(InternalRequest var1, InternalReply.Status var2, Optional<byte[]> var3);

        default public void close() {
        }
    }

    private static interface ClientConnection {
        public CompletableFuture<Void> sendAsync(InternalRequest var1);

        public CompletableFuture<byte[]> sendAndReceive(InternalRequest var1);

        default public void close() {
        }
    }

    /**
     * Pairs the future of an in-flight request with its type and the wall-clock
     * time at which the callback was created.
     */
    private final class Callback {
        /** Request type this callback belongs to. */
        private final String type;
        /** Future completed when the request finishes. */
        private final CompletableFuture<byte[]> future;
        /** Creation time, from {@link System#currentTimeMillis()}. */
        private final long time;

        Callback(String type, CompletableFuture<byte[]> future) {
            this.time = System.currentTimeMillis();
            this.type = type;
            this.future = future;
        }

        /**
         * Completes the underlying future normally.
         *
         * @param value the reply payload
         */
        public void complete(byte[] value) {
            this.future.complete(value);
        }

        /**
         * Completes the underlying future exceptionally.
         *
         * @param error the failure to report
         */
        public void completeExceptionally(Throwable error) {
            this.future.completeExceptionally(error);
        }
    }

    @ChannelHandler.Sharable
    private class InboundMessageDispatcher
    extends SimpleChannelInboundHandler<Object> {
        private InboundMessageDispatcher() {
        }

        protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
            InternalMessage message = (InternalMessage)rawMessage;
            try {
                if (message.isRequest()) {
                    RemoteServerConnection connection = NettyMessagingManager.this.serverConnections.computeIfAbsent(ctx.channel(), x$0 -> new RemoteServerConnection((Channel)x$0));
                    connection.dispatch((InternalRequest)message);
                } else {
                    RemoteClientConnection connection = NettyMessagingManager.this.clientConnections.computeIfAbsent(ctx.channel(), x$0 -> new RemoteClientConnection((Channel)x$0));
                    connection.dispatch((InternalReply)message);
                }
            }
            catch (RejectedExecutionException e) {
                NettyMessagingManager.this.log.warn("Unable to dispatch message due to {}", (Object)e.getMessage());
            }
        }

        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
            RemoteServerConnection serverConnection;
            NettyMessagingManager.this.log.error("Exception inside channel handling pipeline.", cause);
            RemoteClientConnection clientConnection = (RemoteClientConnection)NettyMessagingManager.this.clientConnections.remove(context.channel());
            if (clientConnection != null) {
                clientConnection.close();
            }
            if ((serverConnection = (RemoteServerConnection)NettyMessagingManager.this.serverConnections.remove(context.channel())) != null) {
                serverConnection.close();
            }
            context.close();
        }

        public void channelInactive(ChannelHandlerContext context) throws Exception {
            RemoteServerConnection serverConnection;
            RemoteClientConnection clientConnection = (RemoteClientConnection)NettyMessagingManager.this.clientConnections.remove(context.channel());
            if (clientConnection != null) {
                clientConnection.close();
            }
            if ((serverConnection = (RemoteServerConnection)NettyMessagingManager.this.serverConnections.remove(context.channel())) != null) {
                serverConnection.close();
            }
            context.close();
        }

        public final boolean acceptInboundMessage(Object msg) {
            return msg instanceof InternalMessage;
        }
    }

    private class BasicChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;

        private BasicChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast("encoder", (ChannelHandler)new MessageEncoder(NettyMessagingManager.this.localEndpoint, NettyMessagingManager.this.preamble)).addLast("decoder", (ChannelHandler)new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    private class SslClientCommunicationChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;

        private SslClientCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            SSLContext clientContext = SSLContext.getInstance("TLS");
            clientContext.init(NettyMessagingManager.this.keyManager.getKeyManagers(), NettyMessagingManager.this.trustManager.getTrustManagers(), null);
            SSLEngine clientSslEngine = clientContext.createSSLEngine();
            clientSslEngine.setUseClientMode(true);
            clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
            clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
            clientSslEngine.setEnableSessionCreation(true);
            channel.pipeline().addLast("ssl", (ChannelHandler)new SslHandler(clientSslEngine)).addLast("encoder", (ChannelHandler)new MessageEncoder(NettyMessagingManager.this.localEndpoint, NettyMessagingManager.this.preamble)).addLast("decoder", (ChannelHandler)new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    private class SslServerCommunicationChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;

        private SslServerCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            SSLContext serverContext = SSLContext.getInstance("TLS");
            serverContext.init(NettyMessagingManager.this.keyManager.getKeyManagers(), NettyMessagingManager.this.trustManager.getTrustManagers(), null);
            SSLEngine serverSslEngine = serverContext.createSSLEngine();
            serverSslEngine.setNeedClientAuth(true);
            serverSslEngine.setUseClientMode(false);
            serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
            serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
            serverSslEngine.setEnableSessionCreation(true);
            channel.pipeline().addLast("ssl", (ChannelHandler)new SslHandler(serverSslEngine)).addLast("encoder", (ChannelHandler)new MessageEncoder(NettyMessagingManager.this.localEndpoint, NettyMessagingManager.this.preamble)).addLast("decoder", (ChannelHandler)new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }
}

