/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.kafka09.org.apache.kafka.common.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.graylog.shaded.kafka09.org.apache.kafka.common.KafkaException;
import org.graylog.shaded.kafka09.org.apache.kafka.common.MetricName;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.Measurable;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.MetricConfig;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.Metrics;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.Sensor;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.stats.Avg;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.stats.Count;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.stats.Max;
import org.graylog.shaded.kafka09.org.apache.kafka.common.metrics.stats.Rate;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.ChannelBuilder;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.KafkaChannel;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.NetworkReceive;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.Selectable;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.Send;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.SystemTime;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.Time;
import org.graylog.shaded.kafka09.org.slf4j.Logger;
import org.graylog.shaded.kafka09.org.slf4j.LoggerFactory;

public class Selector
implements Selectable {
    private static final Logger log = LoggerFactory.getLogger(Selector.class);
    private final java.nio.channels.Selector nioSelector;
    private final Map<String, KafkaChannel> channels;
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    private final List<String> disconnected;
    private final List<String> connected;
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    private final String metricGrpPrefix;
    private final Map<String, String> metricTags;
    private final ChannelBuilder channelBuilder;
    private final Map<String, Long> lruConnections;
    private final long connectionsMaxIdleNanos;
    private final int maxReceiveSize;
    private final boolean metricsPerConnection;
    private long currentTimeNanos;
    private long nextIdleCloseCheckTime;

    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection, ChannelBuilder channelBuilder) {
        try {
            this.nioSelector = java.nio.channels.Selector.open();
        }
        catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = maxReceiveSize;
        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000L * 1000L;
        this.time = time;
        this.metricGrpPrefix = metricGrpPrefix;
        this.metricTags = metricTags;
        this.channels = new HashMap<String, KafkaChannel>();
        this.completedSends = new ArrayList<Send>();
        this.completedReceives = new ArrayList<NetworkReceive>();
        this.stagedReceives = new HashMap<KafkaChannel, Deque<NetworkReceive>>();
        this.connected = new ArrayList<String>();
        this.disconnected = new ArrayList<String>();
        this.failedSends = new ArrayList<String>();
        this.sensors = new SelectorMetrics(metrics);
        this.channelBuilder = channelBuilder;
        this.lruConnections = new LinkedHashMap<String, Long>(16, 0.75f, true);
        this.currentTimeNanos = new SystemTime().nanoseconds();
        this.nextIdleCloseCheckTime = this.currentTimeNanos + this.connectionsMaxIdleNanos;
        this.metricsPerConnection = metricsPerConnection;
    }

    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, ChannelBuilder channelBuilder) {
        this(-1, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true, channelBuilder);
    }

    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.channels.containsKey(id)) {
            throw new IllegalStateException("There is already a connection for id " + id);
        }
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        if (sendBufferSize != -1) {
            socket.setSendBufferSize(sendBufferSize);
        }
        if (receiveBufferSize != -1) {
            socket.setReceiveBufferSize(receiveBufferSize);
        }
        socket.setTcpNoDelay(true);
        try {
            socketChannel.connect(address);
        }
        catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        }
        catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        SelectionKey key = socketChannel.register(this.nioSelector, 8);
        KafkaChannel channel = this.channelBuilder.buildChannel(id, key, this.maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);
    }

    public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
        SelectionKey key = socketChannel.register(this.nioSelector, 1);
        KafkaChannel channel = this.channelBuilder.buildChannel(id, key, this.maxReceiveSize);
        key.attach(channel);
        this.channels.put(id, channel);
    }

    @Override
    public void wakeup() {
        this.nioSelector.wakeup();
    }

    @Override
    public void close() {
        ArrayList<String> connections = new ArrayList<String>(this.channels.keySet());
        for (String id : connections) {
            this.close(id);
        }
        try {
            this.nioSelector.close();
        }
        catch (IOException e) {
            log.error("Exception closing nioSelector:", e);
        }
        catch (SecurityException se) {
            log.error("Exception closing nioSelector:", se);
        }
        this.sensors.close();
        this.channelBuilder.close();
    }

    @Override
    public void send(Send send2) {
        KafkaChannel channel = this.channelOrFail(send2.destination());
        try {
            channel.setSend(send2);
        }
        catch (CancelledKeyException e) {
            this.failedSends.add(send2.destination());
            this.close(channel);
        }
    }

    @Override
    public void poll(long timeout) throws IOException {
        long endSelect;
        if (timeout < 0L) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        this.clear();
        if (this.hasStagedReceives()) {
            timeout = 0L;
        }
        long startSelect = this.time.nanoseconds();
        int readyKeys = this.select(timeout);
        this.currentTimeNanos = endSelect = this.time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, this.time.milliseconds());
        if (readyKeys > 0) {
            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
            Iterator<SelectionKey> iter2 = keys.iterator();
            while (iter2.hasNext()) {
                SelectionKey key = iter2.next();
                iter2.remove();
                KafkaChannel channel = this.channel(key);
                this.sensors.maybeRegisterConnectionMetrics(channel.id());
                this.lruConnections.put(channel.id(), this.currentTimeNanos);
                try {
                    Send send2;
                    if (key.isConnectable()) {
                        channel.finishConnect();
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    }
                    if (channel.isConnected() && !channel.ready()) {
                        channel.prepare();
                    }
                    if (channel.ready() && key.isReadable() && !this.hasStagedReceive(channel)) {
                        NetworkReceive networkReceive;
                        while ((networkReceive = channel.read()) != null) {
                            this.addToStagedReceives(channel, networkReceive);
                        }
                    }
                    if (channel.ready() && key.isWritable() && (send2 = channel.write()) != null) {
                        this.completedSends.add(send2);
                        this.sensors.recordBytesSent(channel.id(), send2.size());
                    }
                    if (key.isValid()) continue;
                    this.close(channel);
                    this.disconnected.add(channel.id());
                }
                catch (Exception e) {
                    String desc = channel.socketDescription();
                    if (e instanceof IOException) {
                        log.debug("Connection with {} disconnected", (Object)desc, (Object)e);
                    } else {
                        log.warn("Unexpected error from {}; closing connection", (Object)desc, (Object)e);
                    }
                    this.close(channel);
                    this.disconnected.add(channel.id());
                }
            }
        }
        this.addToCompletedReceives();
        long endIo = this.time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, this.time.milliseconds());
        this.maybeCloseOldestConnection();
    }

    @Override
    public List<Send> completedSends() {
        return this.completedSends;
    }

    @Override
    public List<NetworkReceive> completedReceives() {
        return this.completedReceives;
    }

    @Override
    public List<String> disconnected() {
        return this.disconnected;
    }

    @Override
    public List<String> connected() {
        return this.connected;
    }

    @Override
    public void mute(String id) {
        KafkaChannel channel = this.channelOrFail(id);
        this.mute(channel);
    }

    private void mute(KafkaChannel channel) {
        channel.mute();
    }

    @Override
    public void unmute(String id) {
        KafkaChannel channel = this.channelOrFail(id);
        this.unmute(channel);
    }

    private void unmute(KafkaChannel channel) {
        channel.unmute();
    }

    @Override
    public void muteAll() {
        for (KafkaChannel channel : this.channels.values()) {
            this.mute(channel);
        }
    }

    @Override
    public void unmuteAll() {
        for (KafkaChannel channel : this.channels.values()) {
            this.unmute(channel);
        }
    }

    private void maybeCloseOldestConnection() {
        if (this.currentTimeNanos > this.nextIdleCloseCheckTime) {
            if (this.lruConnections.isEmpty()) {
                this.nextIdleCloseCheckTime = this.currentTimeNanos + this.connectionsMaxIdleNanos;
            } else {
                Map.Entry<String, Long> oldestConnectionEntry = this.lruConnections.entrySet().iterator().next();
                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                this.nextIdleCloseCheckTime = connectionLastActiveTime + this.connectionsMaxIdleNanos;
                if (this.currentTimeNanos > this.nextIdleCloseCheckTime) {
                    String connectionId = oldestConnectionEntry.getKey();
                    if (log.isTraceEnabled()) {
                        log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (this.currentTimeNanos - connectionLastActiveTime) / 1000L / 1000L + " millis");
                    }
                    this.disconnected.add(connectionId);
                    this.close(connectionId);
                }
            }
        }
    }

    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
        this.disconnected.addAll(this.failedSends);
        this.failedSends.clear();
    }

    private int select(long ms) throws IOException {
        if (ms < 0L) {
            throw new IllegalArgumentException("timeout should be >= 0");
        }
        if (ms == 0L) {
            return this.nioSelector.selectNow();
        }
        return this.nioSelector.select(ms);
    }

    @Override
    public void close(String id) {
        KafkaChannel channel = this.channels.get(id);
        if (channel != null) {
            this.close(channel);
        }
    }

    private void close(KafkaChannel channel) {
        try {
            channel.close();
        }
        catch (IOException e) {
            log.error("Exception closing connection to node {}:", (Object)channel.id(), (Object)e);
        }
        this.stagedReceives.remove(channel);
        this.channels.remove(channel.id());
        this.lruConnections.remove(channel.id());
        this.sensors.connectionClosed.record();
    }

    @Override
    public boolean isChannelReady(String id) {
        KafkaChannel channel = this.channels.get(id);
        if (channel == null) {
            return false;
        }
        return channel.ready();
    }

    private KafkaChannel channelOrFail(String id) {
        KafkaChannel channel = this.channels.get(id);
        if (channel == null) {
            throw new IllegalStateException("Attempt to retrieve channel for which there is no open connection. Connection id " + id + " existing connections " + this.channels.keySet().toString());
        }
        return channel;
    }

    public List<KafkaChannel> channels() {
        return new ArrayList<KafkaChannel>(this.channels.values());
    }

    public KafkaChannel channel(String id) {
        return this.channels.get(id);
    }

    private KafkaChannel channel(SelectionKey key) {
        return (KafkaChannel)key.attachment();
    }

    private boolean hasStagedReceive(KafkaChannel channel) {
        return this.stagedReceives.containsKey(channel);
    }

    private boolean hasStagedReceives() {
        for (KafkaChannel channel : this.stagedReceives.keySet()) {
            if (channel.isMute()) continue;
            return true;
        }
        return false;
    }

    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        if (!this.stagedReceives.containsKey(channel)) {
            this.stagedReceives.put(channel, new ArrayDeque());
        }
        Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
        deque.add(receive);
    }

    private void addToCompletedReceives() {
        if (this.stagedReceives.size() > 0) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter2 = this.stagedReceives.entrySet().iterator();
            while (iter2.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry2 = iter2.next();
                KafkaChannel channel = entry2.getKey();
                if (channel.isMute()) continue;
                Deque<NetworkReceive> deque = entry2.getValue();
                NetworkReceive networkReceive = deque.poll();
                this.completedReceives.add(networkReceive);
                this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
                if (deque.size() != 0) continue;
                iter2.remove();
            }
        }
    }

    private class SelectorMetrics {
        private final Metrics metrics;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor bytesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;
        private final List<MetricName> topLevelMetricNames = new ArrayList<MetricName>();
        private final List<Sensor> sensors = new ArrayList<Sensor>();

        public SelectorMetrics(Metrics metrics) {
            this.metrics = metrics;
            String metricGrpName = Selector.this.metricGrpPrefix + "-metrics";
            StringBuilder tagsSuffix = new StringBuilder();
            for (Map.Entry tag : Selector.this.metricTags.entrySet()) {
                tagsSuffix.append((String)tag.getKey());
                tagsSuffix.append("-");
                tagsSuffix.append((String)tag.getValue());
            }
            this.connectionClosed = this.sensor("connections-closed:" + tagsSuffix.toString(), new Sensor[0]);
            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", Selector.this.metricTags);
            this.connectionClosed.add(metricName, new Rate());
            this.connectionCreated = this.sensor("connections-created:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", Selector.this.metricTags);
            this.connectionCreated.add(metricName, new Rate());
            this.bytesTransferred = this.sensor("bytes-sent-received:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", Selector.this.metricTags);
            this.bytesTransferred.add(metricName, new Rate(new Count()));
            this.bytesSent = this.sensor("bytes-sent:" + tagsSuffix.toString(), this.bytesTransferred);
            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Rate());
            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Rate(new Count()));
            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Avg());
            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Max());
            this.bytesReceived = this.sensor("bytes-received:" + tagsSuffix.toString(), this.bytesTransferred);
            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", Selector.this.metricTags);
            this.bytesReceived.add(metricName, new Rate());
            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", Selector.this.metricTags);
            this.bytesReceived.add(metricName, new Rate(new Count()));
            this.selectTime = this.sensor("select-time:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", Selector.this.metricTags);
            this.selectTime.add(metricName, new Rate(new Count()));
            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", Selector.this.metricTags);
            this.selectTime.add(metricName, new Avg());
            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", Selector.this.metricTags);
            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
            this.ioTime = this.sensor("io-time:" + tagsSuffix.toString(), new Sensor[0]);
            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", Selector.this.metricTags);
            this.ioTime.add(metricName, new Avg());
            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", Selector.this.metricTags);
            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", Selector.this.metricTags);
            this.topLevelMetricNames.add(metricName);
            this.metrics.addMetric(metricName, new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return Selector.this.channels.size();
                }
            });
        }

        private Sensor sensor(String name, Sensor ... parents) {
            Sensor sensor = this.metrics.sensor(name, parents);
            this.sensors.add(sensor);
            return sensor;
        }

        public void maybeRegisterConnectionMetrics(String connectionId) {
            String nodeRequestName;
            Sensor nodeRequest;
            if (!connectionId.isEmpty() && Selector.this.metricsPerConnection && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) == null) {
                String metricGrpName = Selector.this.metricGrpPrefix + "-node-metrics";
                LinkedHashMap<String, String> tags = new LinkedHashMap<String, String>(Selector.this.metricTags);
                tags.put("node-id", "node-" + connectionId);
                nodeRequest = this.sensor(nodeRequestName, new Sensor[0]);
                MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
                nodeRequest.add(metricName, new Rate());
                metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
                nodeRequest.add(metricName, new Rate(new Count()));
                metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
                nodeRequest.add(metricName, new Avg());
                metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
                nodeRequest.add(metricName, new Max());
                String nodeResponseName = "node-" + connectionId + ".bytes-received";
                Sensor nodeResponse = this.sensor(nodeResponseName, new Sensor[0]);
                metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
                nodeResponse.add(metricName, new Rate());
                metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
                nodeResponse.add(metricName, new Rate(new Count()));
                String nodeTimeName = "node-" + connectionId + ".latency";
                Sensor nodeRequestTime = this.sensor(nodeTimeName, new Sensor[0]);
                metricName = new MetricName("request-latency-avg", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Avg());
                metricName = new MetricName("request-latency-max", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Max());
            }
        }

        public void recordBytesSent(String connectionId, long bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesSent.record(bytes, now);
            if (!connectionId.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) != null) {
                nodeRequest.record(bytes, now);
            }
        }

        public void recordBytesReceived(String connection, int bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesReceived.record(bytes, now);
            if (!connection.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connection + ".bytes-received")) != null) {
                nodeRequest.record(bytes, now);
            }
        }

        public void close() {
            for (MetricName metricName : this.topLevelMetricNames) {
                this.metrics.removeMetric(metricName);
            }
            for (Sensor sensor : this.sensors) {
                this.metrics.removeSensor(sensor.name());
            }
        }
    }
}

