/*
 * Decompiled with CFR 0.152.
 */
package org.onosproject.store.cluster.messaging.impl;

import com.google.common.base.Strings;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandler;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.security.AppGuard;
import org.onosproject.security.AppPermission;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingException;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.cluster.messaging.impl.InternalMessage;
import org.onosproject.store.cluster.messaging.impl.MessageDecoder;
import org.onosproject.store.cluster.messaging.impl.MessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(immediate=true)
@Service
public class NettyMessagingManager
implements MessagingService {
    private static final int REPLY_TIME_OUT_MILLIS = 500;
    private static final short MIN_KS_LENGTH = 6;
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected HybridLogicalClockService clockService;
    private Endpoint localEp;
    private int preamble;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<String, Consumer<InternalMessage>>();
    private final AtomicLong messageIdGenerator = new AtomicLong(0L);
    private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder().expireAfterWrite(500L, TimeUnit.MILLISECONDS).removalListener((RemovalListener)new RemovalListener<Long, Callback>(){

        public void onRemoval(RemovalNotification<Long, Callback> entry) {
            if (entry.wasEvicted()) {
                ((Callback)entry.getValue()).completeExceptionally(new TimeoutException("Timedout waiting for reply"));
            }
        }
    }).build();
    private final GenericKeyedObjectPool<Endpoint, Connection> channels = new GenericKeyedObjectPool((KeyedPoolableObjectFactory)new OnosCommunicationChannelFactory());
    private EventLoopGroup serverGroup;
    private EventLoopGroup clientGroup;
    private Class<? extends ServerChannel> serverChannelClass;
    private Class<? extends Channel> clientChannelClass;
    protected static final boolean TLS_DISABLED = false;
    protected boolean enableNettyTls = false;
    protected String ksLocation;
    protected String tsLocation;
    protected char[] ksPwd;
    protected char[] tsPwd;
    @Reference(cardinality=ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterMetadataService clusterMetadataService;

    @Activate
    public void activate() throws Exception {
        ControllerNode localNode = this.clusterMetadataService.getLocalNode();
        this.getTlsParameters();
        if (this.started.get()) {
            this.log.warn("Already running at local endpoint: {}", (Object)this.localEp);
            return;
        }
        this.preamble = this.clusterMetadataService.getClusterMetadata().getName().hashCode();
        this.localEp = new Endpoint(localNode.ip(), localNode.tcpPort());
        this.channels.setLifo(true);
        this.channels.setTestOnBorrow(true);
        this.channels.setTestOnReturn(true);
        this.channels.setMinEvictableIdleTimeMillis(60000L);
        this.channels.setTimeBetweenEvictionRunsMillis(30000L);
        this.initEventLoopGroup();
        this.startAcceptingConnections();
        this.started.set(true);
        this.serverGroup.scheduleWithFixedDelay(() -> this.callbacks.cleanUp(), 0L, 500L, TimeUnit.MILLISECONDS);
        this.log.info("Started");
    }

    @Deactivate
    public void deactivate() throws Exception {
        if (this.started.get()) {
            this.channels.close();
            this.serverGroup.shutdownGracefully();
            this.clientGroup.shutdownGracefully();
            this.started.set(false);
        }
        this.log.info("Stopped");
    }

    private void getTlsParameters() {
        String tempString = System.getProperty("enableNettyTLS");
        this.enableNettyTls = Strings.isNullOrEmpty((String)tempString) ? false : Boolean.parseBoolean(tempString);
        this.log.info("enableNettyTLS = {}", (Object)this.enableNettyTls);
        if (this.enableNettyTls) {
            this.ksLocation = System.getProperty("javax.net.ssl.keyStore");
            if (Strings.isNullOrEmpty((String)this.ksLocation)) {
                this.enableNettyTls = false;
                return;
            }
            this.tsLocation = System.getProperty("javax.net.ssl.trustStore");
            if (Strings.isNullOrEmpty((String)this.tsLocation)) {
                this.enableNettyTls = false;
                return;
            }
            this.ksPwd = System.getProperty("javax.net.ssl.keyStorePassword").toCharArray();
            if (6 > this.ksPwd.length) {
                this.enableNettyTls = false;
                return;
            }
            this.tsPwd = System.getProperty("javax.net.ssl.trustStorePassword").toCharArray();
            if (6 > this.tsPwd.length) {
                this.enableNettyTls = false;
                return;
            }
        }
    }

    private void initEventLoopGroup() {
        try {
            this.clientGroup = new EpollEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"epollC-%d", (Logger)this.log));
            this.serverGroup = new EpollEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"epollS-%d", (Logger)this.log));
            this.serverChannelClass = EpollServerSocketChannel.class;
            this.clientChannelClass = EpollSocketChannel.class;
            return;
        }
        catch (Throwable e) {
            this.log.debug("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", (Object)e.getMessage());
            this.clientGroup = new NioEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"nioC-%d", (Logger)this.log));
            this.serverGroup = new NioEventLoopGroup(0, Tools.groupedThreads((String)"NettyMessagingEvt", (String)"nioS-%d", (Logger)this.log));
            this.serverChannelClass = NioServerSocketChannel.class;
            this.clientChannelClass = NioSocketChannel.class;
            return;
        }
    }

    public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        InternalMessage message = new InternalMessage(this.preamble, this.clockService.timeNow(), this.messageIdGenerator.incrementAndGet(), this.localEp, type, payload);
        return this.sendAsync(ep, message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        if (ep.equals((Object)this.localEp)) {
            try {
                this.dispatchLocally(message);
            }
            catch (IOException e) {
                return Tools.exceptionalFuture((Throwable)e);
            }
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        try {
            Connection connection = null;
            try {
                connection = (Connection)this.channels.borrowObject((Object)ep);
                connection.send(message, future);
            }
            finally {
                if (connection != null) {
                    this.channels.returnObject((Object)ep, (Object)connection);
                }
            }
        }
        catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        return this.sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
    }

    public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        CompletableFuture<byte[]> future = new CompletableFuture<byte[]>();
        Callback callback = new Callback(future, executor);
        Long messageId = this.messageIdGenerator.incrementAndGet();
        this.callbacks.put((Object)messageId, (Object)callback);
        InternalMessage message = new InternalMessage(this.preamble, this.clockService.timeNow(), messageId, this.localEp, type, payload);
        this.sendAsync(ep, message).whenComplete((response, error) -> {
            if (error != null) {
                this.callbacks.invalidate((Object)messageId);
                callback.completeExceptionally((Throwable)error);
            }
        });
        return future;
    }

    public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        this.handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
    }

    public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        this.handlers.put(type, message -> executor.execute(() -> {
            byte[] responsePayload = null;
            InternalMessage.Status status = InternalMessage.Status.OK;
            try {
                responsePayload = (byte[])handler.apply(message.sender(), message.payload());
            }
            catch (Exception e) {
                this.log.debug("An error occurred in a message handler: {}", (Throwable)e);
                status = InternalMessage.Status.ERROR_HANDLER_EXCEPTION;
            }
            this.sendReply((InternalMessage)message, status, Optional.ofNullable(responsePayload));
        }));
    }

    public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        this.handlers.put(type, message -> ((CompletableFuture)handler.apply(message.sender(), message.payload())).whenComplete((result, error) -> {
            InternalMessage.Status status;
            if (error == null) {
                status = InternalMessage.Status.OK;
            } else {
                this.log.debug("An error occurred in a message handler: {}", error);
                status = InternalMessage.Status.ERROR_HANDLER_EXCEPTION;
            }
            this.sendReply((InternalMessage)message, status, Optional.ofNullable(result));
        }));
    }

    public void unregisterHandler(String type) {
        AppGuard.checkPermission((AppPermission.Type)AppPermission.Type.CLUSTER_WRITE);
        this.handlers.remove(type);
    }

    private void startAcceptingConnections() throws InterruptedException {
        ServerBootstrap b = new ServerBootstrap();
        b.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, (Object)32768);
        b.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, (Object)8192);
        b.option(ChannelOption.SO_RCVBUF, (Object)0x100000);
        b.option(ChannelOption.TCP_NODELAY, (Object)true);
        b.childOption(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
        b.group(this.serverGroup, this.clientGroup);
        b.channel(this.serverChannelClass);
        if (this.enableNettyTls) {
            b.childHandler((ChannelHandler)new SslServerCommunicationChannelInitializer());
        } else {
            b.childHandler((ChannelHandler)new OnosCommunicationChannelInitializer());
        }
        b.option(ChannelOption.SO_BACKLOG, (Object)128);
        b.childOption(ChannelOption.SO_KEEPALIVE, (Object)true);
        b.bind(this.localEp.port()).sync().addListener(future -> {
            if (future.isSuccess()) {
                this.log.info("{} accepting incoming connections on port {}", (Object)this.localEp.host(), (Object)this.localEp.port());
            } else {
                this.log.warn("{} failed to bind to port {}", new Object[]{this.localEp.host(), this.localEp.port(), future.cause()});
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void dispatchLocally(InternalMessage message) throws IOException {
        if (message.preamble() != this.preamble) {
            this.log.debug("Received {} with invalid preamble from {}", (Object)message.type(), (Object)message.sender());
            this.sendReply(message, InternalMessage.Status.PROTOCOL_EXCEPTION, Optional.empty());
        }
        this.clockService.recordEventTime(message.time());
        String type = message.type();
        if (REPLY_MESSAGE_TYPE.equals(type)) {
            try {
                Callback callback = (Callback)this.callbacks.getIfPresent((Object)message.id());
                if (callback != null) {
                    if (message.status() == InternalMessage.Status.OK) {
                        callback.complete(message.payload());
                    } else if (message.status() == InternalMessage.Status.ERROR_NO_HANDLER) {
                        callback.completeExceptionally((Throwable)new MessagingException.NoRemoteHandler());
                    } else if (message.status() == InternalMessage.Status.ERROR_HANDLER_EXCEPTION) {
                        callback.completeExceptionally((Throwable)new MessagingException.RemoteHandlerFailure());
                    } else if (message.status() == InternalMessage.Status.PROTOCOL_EXCEPTION) {
                        callback.completeExceptionally((Throwable)new MessagingException.ProtocolException());
                    }
                } else {
                    this.log.debug("Received a reply for message id:[{}].  from {}. But was unable to locate the request handle", (Object)message.id(), (Object)message.sender());
                }
            }
            finally {
                this.callbacks.invalidate((Object)message.id());
            }
            return;
        }
        Consumer<InternalMessage> handler = this.handlers.get(type);
        if (handler != null) {
            handler.accept(message);
        } else {
            this.log.debug("No handler for message type {}", (Object)message.type(), (Object)message.sender());
            this.sendReply(message, InternalMessage.Status.ERROR_NO_HANDLER, Optional.empty());
        }
    }

    private void sendReply(InternalMessage message, InternalMessage.Status status, Optional<byte[]> responsePayload) {
        InternalMessage response = new InternalMessage(this.preamble, this.clockService.timeNow(), message.id(), this.localEp, REPLY_MESSAGE_TYPE, responsePayload.orElse(new byte[0]), status);
        this.sendAsync(message.sender(), response).whenComplete((result, error) -> {
            if (error != null) {
                this.log.debug("Failed to respond", error);
            }
        });
    }

    protected void bindClockService(HybridLogicalClockService hybridLogicalClockService) {
        this.clockService = hybridLogicalClockService;
    }

    protected void unbindClockService(HybridLogicalClockService hybridLogicalClockService) {
        if (this.clockService == hybridLogicalClockService) {
            this.clockService = null;
        }
    }

    protected void bindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        this.clusterMetadataService = clusterMetadataService;
    }

    protected void unbindClusterMetadataService(ClusterMetadataService clusterMetadataService) {
        if (this.clusterMetadataService == clusterMetadataService) {
            this.clusterMetadataService = null;
        }
    }

    private final class Connection {
        private final CompletableFuture<Channel> internalFuture;

        public Connection(CompletableFuture<Channel> internalFuture) {
            this.internalFuture = internalFuture;
        }

        public void send(Object message, CompletableFuture<Void> future) {
            this.internalFuture.whenComplete((channel, throwable) -> {
                if (throwable == null) {
                    channel.writeAndFlush(message).addListener(channelFuture -> {
                        if (!channelFuture.isSuccess()) {
                            future.completeExceptionally(channelFuture.cause());
                        } else {
                            future.complete(null);
                        }
                    });
                } else {
                    future.completeExceptionally((Throwable)throwable);
                }
            });
        }

        public void destroy() {
            Channel channel = this.internalFuture.getNow(null);
            if (channel != null) {
                channel.close();
            }
            this.internalFuture.cancel(false);
        }

        public boolean validate() {
            if (this.internalFuture.isCompletedExceptionally()) {
                return false;
            }
            Channel channel = this.internalFuture.getNow(null);
            return channel == null || channel.isActive();
        }
    }

    private final class Callback {
        private final CompletableFuture<byte[]> future;
        private final Executor executor;

        public Callback(CompletableFuture<byte[]> future, Executor executor) {
            this.future = future;
            this.executor = executor;
        }

        public void complete(byte[] value) {
            this.executor.execute(() -> this.future.complete(value));
        }

        public void completeExceptionally(Throwable error) {
            this.executor.execute(() -> this.future.completeExceptionally(error));
        }
    }

    @ChannelHandler.Sharable
    private class InboundMessageDispatcher
    extends SimpleChannelInboundHandler<Object> {
        private InboundMessageDispatcher() {
        }

        protected void channelRead0(ChannelHandlerContext ctx, Object rawMessage) throws Exception {
            InternalMessage message = (InternalMessage)rawMessage;
            try {
                NettyMessagingManager.this.dispatchLocally(message);
            }
            catch (RejectedExecutionException e) {
                NettyMessagingManager.this.log.warn("Unable to dispatch message due to {}", (Object)e.getMessage());
            }
        }

        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
            NettyMessagingManager.this.log.error("Exception inside channel handling pipeline.", cause);
            context.close();
        }

        public final boolean acceptInboundMessage(Object msg) {
            return msg instanceof InternalMessage;
        }
    }

    private class OnosCommunicationChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;
        private final ChannelHandler encoder;

        private OnosCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
            this.encoder = new MessageEncoder(NettyMessagingManager.this.preamble);
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast("encoder", this.encoder).addLast("decoder", (ChannelHandler)new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    private class SslClientCommunicationChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;
        private final ChannelHandler encoder;

        private SslClientCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
            this.encoder = new MessageEncoder(NettyMessagingManager.this.preamble);
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore ts = KeyStore.getInstance("JKS");
            ts.load(new FileInputStream(NettyMessagingManager.this.tsLocation), NettyMessagingManager.this.tsPwd);
            tmFactory.init(ts);
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(new FileInputStream(NettyMessagingManager.this.ksLocation), NettyMessagingManager.this.ksPwd);
            kmf.init(ks, NettyMessagingManager.this.ksPwd);
            SSLContext clientContext = SSLContext.getInstance("TLS");
            clientContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
            SSLEngine clientSslEngine = clientContext.createSSLEngine();
            clientSslEngine.setUseClientMode(true);
            clientSslEngine.setEnabledProtocols(clientSslEngine.getSupportedProtocols());
            clientSslEngine.setEnabledCipherSuites(clientSslEngine.getSupportedCipherSuites());
            clientSslEngine.setEnableSessionCreation(true);
            channel.pipeline().addLast("ssl", (ChannelHandler)new SslHandler(clientSslEngine)).addLast("encoder", this.encoder).addLast("decoder", (ChannelHandler)new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    private class SslServerCommunicationChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final ChannelHandler dispatcher;
        private final ChannelHandler encoder;

        private SslServerCommunicationChannelInitializer() {
            this.dispatcher = new InboundMessageDispatcher();
            this.encoder = new MessageEncoder(NettyMessagingManager.this.preamble);
        }

        protected void initChannel(SocketChannel channel) throws Exception {
            TrustManagerFactory tmFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            KeyStore ts = KeyStore.getInstance("JKS");
            ts.load(new FileInputStream(NettyMessagingManager.this.tsLocation), NettyMessagingManager.this.tsPwd);
            tmFactory.init(ts);
            KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            KeyStore ks = KeyStore.getInstance("JKS");
            ks.load(new FileInputStream(NettyMessagingManager.this.ksLocation), NettyMessagingManager.this.ksPwd);
            kmf.init(ks, NettyMessagingManager.this.ksPwd);
            SSLContext serverContext = SSLContext.getInstance("TLS");
            serverContext.init(kmf.getKeyManagers(), tmFactory.getTrustManagers(), null);
            SSLEngine serverSslEngine = serverContext.createSSLEngine();
            serverSslEngine.setNeedClientAuth(true);
            serverSslEngine.setUseClientMode(false);
            serverSslEngine.setEnabledProtocols(serverSslEngine.getSupportedProtocols());
            serverSslEngine.setEnabledCipherSuites(serverSslEngine.getSupportedCipherSuites());
            serverSslEngine.setEnableSessionCreation(true);
            channel.pipeline().addLast("ssl", (ChannelHandler)new SslHandler(serverSslEngine)).addLast("encoder", this.encoder).addLast("decoder", (ChannelHandler)new MessageDecoder()).addLast("handler", this.dispatcher);
        }
    }

    private class OnosCommunicationChannelFactory
    implements KeyedPoolableObjectFactory<Endpoint, Connection> {
        private OnosCommunicationChannelFactory() {
        }

        public void activateObject(Endpoint endpoint, Connection connection) throws Exception {
        }

        public void destroyObject(Endpoint ep, Connection connection) throws Exception {
            NettyMessagingManager.this.log.debug("Closing connection to {}", (Object)ep);
            connection.destroy();
        }

        public Connection makeObject(Endpoint ep) throws Exception {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.option(ChannelOption.ALLOCATOR, (Object)PooledByteBufAllocator.DEFAULT);
            bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, (Object)655360);
            bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, (Object)327680);
            bootstrap.option(ChannelOption.SO_SNDBUF, (Object)0x100000);
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)1000);
            bootstrap.group(NettyMessagingManager.this.clientGroup);
            bootstrap.channel(NettyMessagingManager.this.clientChannelClass);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)true);
            if (NettyMessagingManager.this.enableNettyTls) {
                bootstrap.handler((ChannelHandler)new SslClientCommunicationChannelInitializer());
            } else {
                bootstrap.handler((ChannelHandler)new OnosCommunicationChannelInitializer());
            }
            CompletableFuture<Channel> retFuture = new CompletableFuture<Channel>();
            ChannelFuture f = bootstrap.connect(ep.host().toInetAddress(), ep.port());
            f.addListener(future -> {
                if (future.isSuccess()) {
                    retFuture.complete(f.channel());
                } else {
                    retFuture.completeExceptionally(future.cause());
                }
            });
            NettyMessagingManager.this.log.debug("Established a new connection to {}", (Object)ep);
            return new Connection(retFuture);
        }

        public void passivateObject(Endpoint ep, Connection connection) throws Exception {
        }

        public boolean validateObject(Endpoint ep, Connection connection) {
            return connection.validate();
        }
    }
}

