/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.kafka09.org.apache.kafka.clients;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.ClientRequest;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.ClientResponse;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.ClusterConnectionStates;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.ConnectionState;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.InFlightRequests;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.KafkaClient;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.Metadata;
import org.graylog.shaded.kafka09.org.apache.kafka.clients.MetadataUpdater;
import org.graylog.shaded.kafka09.org.apache.kafka.common.Cluster;
import org.graylog.shaded.kafka09.org.apache.kafka.common.Node;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.NetworkReceive;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.Selectable;
import org.graylog.shaded.kafka09.org.apache.kafka.common.network.Send;
import org.graylog.shaded.kafka09.org.apache.kafka.common.protocol.ApiKeys;
import org.graylog.shaded.kafka09.org.apache.kafka.common.protocol.ProtoUtils;
import org.graylog.shaded.kafka09.org.apache.kafka.common.protocol.types.Struct;
import org.graylog.shaded.kafka09.org.apache.kafka.common.requests.MetadataRequest;
import org.graylog.shaded.kafka09.org.apache.kafka.common.requests.MetadataResponse;
import org.graylog.shaded.kafka09.org.apache.kafka.common.requests.RequestHeader;
import org.graylog.shaded.kafka09.org.apache.kafka.common.requests.RequestSend;
import org.graylog.shaded.kafka09.org.apache.kafka.common.requests.ResponseHeader;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.Time;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.Utils;
import org.graylog.shaded.kafka09.org.slf4j.Logger;
import org.graylog.shaded.kafka09.org.slf4j.LoggerFactory;

/**
 * {@link KafkaClient} implementation that sends requests and receives responses through a
 * {@link Selectable}, while tracking in-flight requests and per-node connection state.
 *
 * NOTE(review): no synchronization is visible in this class (for example {@code correlation}
 * is a plain {@code int} incremented with {@code ++}); presumably instances are meant to be
 * driven from a single thread -- confirm against callers.
 */
public class NetworkClient
implements KafkaClient {
    private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
    // Performs the actual network operations: connect, send, poll, wakeup and close.
    private final Selectable selector;
    // Decides when metadata must be refreshed and gets first look at responses and disconnections.
    private final MetadataUpdater metadataUpdater;
    // Supplies the random starting index used by leastLoadedNode.
    private final Random randOffset;
    // Connection state of each node, keyed by the node id string.
    private final ClusterConnectionStates connectionStates;
    // Requests that have been handed to the selector and are not yet completed.
    private final InFlightRequests inFlightRequests;
    // Socket send buffer size passed to selector.connect.
    private final int socketSendBuffer;
    // Socket receive buffer size passed to selector.connect.
    private final int socketReceiveBuffer;
    // Client id placed in every request header created by nextRequestHeader.
    private final String clientId;
    // Correlation id for the next request header; incremented each time a header is created.
    private int correlation;
    // Timeout used to detect timed-out in-flight requests; also one of the bounds on the poll timeout.
    private final int requestTimeoutMs;
    // Clock used to re-read the current time after the selector poll returns.
    private final Time time;

    /**
     * Creates a client whose metadata is maintained by a {@link DefaultMetadataUpdater}
     * wrapping the supplied {@code metadata}.
     *
     * @throws IllegalArgumentException if {@code metadata} is null
     */
    public NetworkClient(
            Selectable selector,
            Metadata metadata,
            String clientId,
            int maxInFlightRequestsPerConnection,
            long reconnectBackoffMs,
            int socketSendBuffer,
            int socketReceiveBuffer,
            int requestTimeoutMs,
            Time time) {
        // No custom updater: the shared constructor builds the default one from `metadata`.
        this(
                (MetadataUpdater) null,
                metadata,
                selector,
                clientId,
                maxInFlightRequestsPerConnection,
                reconnectBackoffMs,
                socketSendBuffer,
                socketReceiveBuffer,
                requestTimeoutMs,
                time);
    }

    /**
     * Creates a client that delegates all metadata handling to the supplied
     * {@code metadataUpdater}.
     *
     * @throws IllegalArgumentException if {@code metadataUpdater} is null (the shared
     *         constructor then finds neither an updater nor a metadata instance)
     */
    public NetworkClient(
            Selectable selector,
            MetadataUpdater metadataUpdater,
            String clientId,
            int maxInFlightRequestsPerConnection,
            long reconnectBackoffMs,
            int socketSendBuffer,
            int socketReceiveBuffer,
            int requestTimeoutMs,
            Time time) {
        // Custom updater supplied: no Metadata instance is passed along.
        this(
                metadataUpdater,
                (Metadata) null,
                selector,
                clientId,
                maxInFlightRequestsPerConnection,
                reconnectBackoffMs,
                socketSendBuffer,
                socketReceiveBuffer,
                requestTimeoutMs,
                time);
    }

    /**
     * Shared constructor behind both public constructors.
     *
     * An explicit {@code metadataUpdater} wins; otherwise a {@link DefaultMetadataUpdater}
     * is created around {@code metadata}.
     *
     * @throws IllegalArgumentException if both {@code metadataUpdater} and {@code metadata} are null
     */
    private NetworkClient(
            MetadataUpdater metadataUpdater,
            Metadata metadata,
            Selectable selector,
            String clientId,
            int maxInFlightRequestsPerConnection,
            long reconnectBackoffMs,
            int socketSendBuffer,
            int socketReceiveBuffer,
            int requestTimeoutMs,
            Time time) {
        // Settle the metadata strategy first so invalid arguments fail before anything else is built.
        if (metadataUpdater != null) {
            this.metadataUpdater = metadataUpdater;
        } else if (metadata != null) {
            this.metadataUpdater = new DefaultMetadataUpdater(metadata);
        } else {
            throw new IllegalArgumentException("`metadata` must not be null");
        }

        // Plain configuration values.
        this.selector = selector;
        this.clientId = clientId;
        this.socketSendBuffer = socketSendBuffer;
        this.socketReceiveBuffer = socketReceiveBuffer;
        this.requestTimeoutMs = requestTimeoutMs;
        this.time = time;
        this.correlation = 0;

        // Per-client bookkeeping.
        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
        this.randOffset = new Random();
    }

    /**
     * Reports whether a request can be sent to {@code node} right now and, if it cannot,
     * starts a connection attempt when the connection state allows one.
     *
     * @param node the node to check
     * @param now  the current time in milliseconds
     * @return {@code true} if the node is ready; {@code false} otherwise
     */
    @Override
    public boolean ready(Node node, long now) {
        boolean readyNow = isReady(node, now);
        // Only a node that is not ready is considered for a new connection attempt.
        if (!readyNow && connectionStates.canConnect(node.idString(), now)) {
            initiateConnect(node, now);
        }
        return readyNow;
    }

    /**
     * Closes the connection to one node and forgets everything tracked for it.
     *
     * @param nodeId id of the node whose connection is closed
     */
    @Override
    public void close(String nodeId) {
        selector.close(nodeId);
        // Every request still in flight to this node is dropped; the metadata updater is told
        // about each one, but no responses are produced here.
        for (ClientRequest dropped : inFlightRequests.clearAll(nodeId)) {
            metadataUpdater.maybeHandleDisconnection(dropped);
        }
        connectionStates.remove(nodeId);
    }

    /**
     * Returns the connection delay reported by the connection-state tracker for {@code node}.
     *
     * @param node the node to look up
     * @param now  the current time in milliseconds
     */
    @Override
    public long connectionDelay(Node node, long now) {
        String nodeId = node.idString();
        return connectionStates.connectionDelay(nodeId, now);
    }

    /**
     * Tells whether the tracked connection state of {@code node} is
     * {@link ConnectionState#DISCONNECTED}.
     *
     * @param node the node to look up
     */
    @Override
    public boolean connectionFailed(Node node) {
        String nodeId = node.idString();
        return connectionStates.connectionState(nodeId).equals(ConnectionState.DISCONNECTED);
    }

    /**
     * Tells whether a request could be sent to {@code node} right now.
     *
     * A node is never reported ready while a metadata update is due.
     *
     * @param node the node to check
     * @param now  the current time in milliseconds
     */
    @Override
    public boolean isReady(Node node, long now) {
        if (metadataUpdater.isUpdateDue(now)) {
            return false;
        }
        return canSendRequest(node.idString());
    }

    /**
     * Checks the three preconditions for sending to a node: the connection is established,
     * the selector reports the channel ready, and the in-flight tracker accepts another request.
     *
     * @param node id of the node to check
     */
    private boolean canSendRequest(String node) {
        if (!connectionStates.isConnected(node)) {
            return false;
        }
        if (!selector.isChannelReady(node)) {
            return false;
        }
        return inFlightRequests.canSendMore(node);
    }

    /**
     * Queues {@code request} for sending to its destination node.
     *
     * @param request the request to send
     * @param now     the current time in milliseconds
     * @throws IllegalStateException if the destination node cannot currently accept a request
     */
    @Override
    public void send(ClientRequest request, long now) {
        String destination = request.request().destination();
        if (canSendRequest(destination)) {
            doSend(request, now);
            return;
        }
        throw new IllegalStateException("Attempt to send a request to node " + destination + " which is not ready.");
    }

    /**
     * Stamps the request with its send time, registers it as in flight and hands it to the
     * selector. Performs no readiness check; callers are responsible for that.
     *
     * @param request the request to send
     * @param now     the current time in milliseconds, recorded as the send time
     */
    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        // Register before handing off to the selector, so the request is already tracked
        // when its send or response completes.
        inFlightRequests.add(request);
        selector.send(request.request());
    }

    /**
     * Runs one round of network I/O and returns the responses that became available.
     *
     * The selector is polled for at most the smallest of {@code timeout}, the metadata
     * updater's timeout and the request timeout. Afterwards completed sends, completed
     * receives, disconnections, new connections and timed-out requests are processed, and
     * the callback of every resulting response is invoked.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @param now     the current time in milliseconds
     * @return the responses collected in this round
     */
    @Override
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException ioError) {
            // An I/O failure is logged; the results the selector did gather are still processed below.
            log.error("Unexpected error during I/O", ioError);
        }

        // The poll may have blocked, so take a fresh timestamp for everything that follows.
        long afterPoll = time.milliseconds();
        List<ClientResponse> completed = new ArrayList<ClientResponse>();
        handleCompletedSends(completed, afterPoll);
        handleCompletedReceives(completed, afterPoll);
        handleDisconnections(completed, afterPoll);
        handleConnections();
        handleTimedOutRequests(completed, afterPoll);

        for (ClientResponse response : completed) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception callbackError) {
                    // A failing callback must not stop the remaining callbacks from running.
                    log.error("Uncaught error in request completion:", callbackError);
                }
            }
        }
        return completed;
    }

    /**
     * Returns the total in-flight request count as reported by the in-flight tracker.
     */
    @Override
    public int inFlightRequestCount() {
        int total = inFlightRequests.inFlightRequestCount();
        return total;
    }

    /**
     * Returns the in-flight request count reported by the in-flight tracker for one node.
     *
     * @param node id of the node to look up
     */
    @Override
    public int inFlightRequestCount(String node) {
        int forNode = inFlightRequests.inFlightRequestCount(node);
        return forNode;
    }

    /**
     * Builds a request header for the given API carrying this client's id and the next
     * correlation id. The correlation counter is advanced by one.
     *
     * @param key the API the request is for
     */
    @Override
    public RequestHeader nextRequestHeader(ApiKeys key) {
        // Post-increment: the header gets the current id, the counter moves on.
        RequestHeader header = new RequestHeader(key.id, clientId, correlation++);
        return header;
    }

    /**
     * Builds a request header for the given API and version carrying this client's id and
     * the next correlation id. The correlation counter is advanced by one.
     *
     * @param key     the API the request is for
     * @param version the version value passed to the header
     */
    @Override
    public RequestHeader nextRequestHeader(ApiKeys key, short version) {
        // Post-increment: the header gets the current id, the counter moves on.
        RequestHeader header = new RequestHeader(key.id, version, clientId, correlation++);
        return header;
    }

    /**
     * Forwards a wakeup to the underlying selector.
     */
    @Override
    public void wakeup() {
        selector.wakeup();
    }

    /**
     * Closes the underlying selector. In-flight requests and connection states are left
     * untouched by this method.
     */
    @Override
    public void close() {
        selector.close();
    }

    /**
     * Picks the node with the fewest in-flight requests, scanning the known nodes from a
     * random starting position.
     *
     * A connected node with no in-flight requests is returned immediately. Otherwise the
     * node with the lowest in-flight count among those that are not blacked out wins.
     *
     * @param now the current time in milliseconds
     * @return the chosen node, or {@code null} if there are no nodes at all or every node
     *         is blacked out
     */
    @Override
    public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadataUpdater.fetchNodes();
        // Random.nextInt(0) throws IllegalArgumentException; an empty node list is reported
        // like any other "no candidate" situation instead.
        if (nodes.isEmpty()) {
            return null;
        }
        int nodeCount = nodes.size();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        // Start at a random position so the same node is not always preferred.
        int offset = this.randOffset.nextInt(nodeCount);
        for (int i = 0; i < nodeCount; ++i) {
            Node node = nodes.get((offset + i) % nodeCount);
            String nodeId = node.idString();
            int currInflight = this.inFlightRequests.inFlightRequestCount(nodeId);
            if (currInflight == 0 && this.connectionStates.isConnected(nodeId)) {
                // Connected and idle: nothing can beat this candidate.
                return node;
            }
            if (!this.connectionStates.isBlackedOut(nodeId, now) && currInflight < inflight) {
                inflight = currInflight;
                found = node;
            }
        }
        return found;
    }

    /**
     * Marks a node as disconnected and cancels all of its in-flight requests.
     *
     * Each cancelled request is first offered to the metadata updater; if the updater does
     * not claim it, a body-less response for that request is appended to {@code responses}.
     *
     * @param responses list that receives the generated responses
     * @param nodeId    id of the disconnected node
     * @param now       the current time in milliseconds
     */
    private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) {
        connectionStates.disconnected(nodeId, now);
        for (ClientRequest cancelled : inFlightRequests.clearAll(nodeId)) {
            log.trace("Cancelled request {} due to node {} being disconnected", cancelled, nodeId);
            boolean claimedByUpdater = metadataUpdater.maybeHandleDisconnection(cancelled);
            if (!claimedByUpdater) {
                // NOTE(review): the `true` argument presumably flags the response as a disconnect -- confirm in ClientResponse.
                responses.add(new ClientResponse(cancelled, now, true, null));
            }
        }
    }

    /**
     * Disconnects every node that has a timed-out in-flight request and, if there was at
     * least one such node, asks for a metadata update.
     *
     * @param responses list that receives responses for the cancelled requests
     * @param now       the current time in milliseconds
     */
    private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        List<String> timedOutNodes = inFlightRequests.getNodesWithTimedOutRequests(now, requestTimeoutMs);
        for (String timedOutNode : timedOutNodes) {
            // Close the connection first, then run the regular disconnection handling.
            selector.close(timedOutNode);
            log.debug("Disconnecting from node {} due to request timeout.", timedOutNode);
            processDisconnection(responses, timedOutNode, now);
        }
        if (!timedOutNodes.isEmpty()) {
            metadataUpdater.requestUpdate();
        }
    }

    /**
     * Completes requests that expect no response as soon as their send has finished.
     *
     * Requests that do expect a response stay in flight and are left untouched here.
     *
     * @param responses list that receives the generated body-less responses
     * @param now       the current time in milliseconds
     */
    private void handleCompletedSends(List<ClientResponse> responses, long now) {
        for (Send finished : selector.completedSends()) {
            ClientRequest lastSent = inFlightRequests.lastSent(finished.destination());
            if (!lastSent.expectResponse()) {
                inFlightRequests.completeLastSent(finished.destination());
                responses.add(new ClientResponse(lastSent, now, false, null));
            }
        }
    }

    /**
     * Matches each completed receive with the next in-flight request of its source node,
     * parses the response and checks that the correlation ids agree.
     *
     * The parsed response is first offered to the metadata updater; if the updater does not
     * claim it, a response carrying the parsed body is appended to {@code responses}.
     *
     * @param responses list that receives the generated responses
     * @param now       the current time in milliseconds
     * @throws IllegalStateException if a response's correlation id does not match its request
     */
    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : selector.completedReceives()) {
            String sourceNode = receive.source();
            ClientRequest matched = inFlightRequests.completeNext(sourceNode);
            // The response header is read from the payload first, then the body.
            ResponseHeader responseHeader = ResponseHeader.parse(receive.payload());
            RequestHeader requestHeader = matched.request().header();
            // The body is read with the schema for the request's own API key and version.
            short apiKey = requestHeader.apiKey();
            short apiVersion = requestHeader.apiVersion();
            Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVersion).read(receive.payload());
            correlate(requestHeader, responseHeader);
            if (!metadataUpdater.maybeHandleCompletedReceive(matched, now, body)) {
                responses.add(new ClientResponse(matched, now, false, body));
            }
        }
    }

    /**
     * Runs disconnection handling for every node the selector reports as disconnected and,
     * if there was at least one, asks for a metadata update.
     *
     * @param responses list that receives responses for the cancelled requests
     * @param now       the current time in milliseconds
     */
    private void handleDisconnections(List<ClientResponse> responses, long now) {
        for (String lostNode : selector.disconnected()) {
            log.debug("Node {} disconnected.", lostNode);
            processDisconnection(responses, lostNode, now);
        }
        boolean anyDisconnected = selector.disconnected().size() > 0;
        if (anyDisconnected) {
            metadataUpdater.requestUpdate();
        }
    }

    /**
     * Records every node the selector reports as newly connected in the connection-state tracker.
     */
    private void handleConnections() {
        for (String connectedNode : selector.connected()) {
            log.debug("Completed connection to node {}", connectedNode);
            connectionStates.connected(connectedNode);
        }
    }

    /**
     * Verifies that a response belongs to the given request by comparing correlation ids.
     *
     * @param requestHeader  header of the request that was sent
     * @param responseHeader header of the response that was received
     * @throws IllegalStateException if the two correlation ids differ
     */
    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
        if (requestHeader.correlationId() == responseHeader.correlationId()) {
            return;
        }
        throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId() + ") does not match request (" + requestHeader.correlationId() + ")");
    }

    /**
     * Starts a connection attempt to {@code node}.
     *
     * The node is marked as connecting before the selector is asked to connect. If the
     * selector throws an {@link IOException}, the node is marked disconnected and a
     * metadata update is requested.
     *
     * @param node the node to connect to
     * @param now  the current time in milliseconds
     */
    private void initiateConnect(Node node, long now) {
        String connectionId = node.idString();
        try {
            Object[] target = {node.id(), node.host(), node.port()};
            log.debug("Initiating connection to node {} at {}:{}.", target);
            connectionStates.connecting(connectionId, now);
            InetSocketAddress address = new InetSocketAddress(node.host(), node.port());
            selector.connect(connectionId, address, socketSendBuffer, socketReceiveBuffer);
        } catch (IOException connectError) {
            connectionStates.disconnected(connectionId, now);
            metadataUpdater.requestUpdate();
            // The exception is the trailing array element, beyond the three placeholders.
            Object[] failure = {node.id(), node.host(), node.port(), connectError};
            log.debug("Error connecting to node {} at {}:{}:", failure);
        }
    }

    /**
     * Default {@link MetadataUpdater}, backed by a {@link Metadata} instance. It sends
     * metadata requests through the enclosing {@link NetworkClient} and applies the
     * responses to the metadata.
     */
    class DefaultMetadataUpdater
    implements MetadataUpdater {
        private final Metadata metadata;
        // Set when this updater sends a metadata request; cleared when its response arrives
        // or the request is cancelled by a disconnection.
        private boolean metadataFetchInProgress;
        // Last time no node could be used for a metadata request; start of the retry backoff.
        private long lastNoNodeAvailableMs;

        DefaultMetadataUpdater(Metadata metadata) {
            this.metadata = metadata;
            this.metadataFetchInProgress = false;
            this.lastNoNodeAvailableMs = 0L;
        }

        /**
         * Returns the nodes of the cluster currently held by the metadata.
         */
        @Override
        public List<Node> fetchNodes() {
            return this.metadata.fetch().nodes();
        }

        /**
         * Tells whether an update should be sent now: no fetch is in progress and the
         * metadata reports zero time until its next update.
         */
        @Override
        public boolean isUpdateDue(long now) {
            return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0L;
        }

        /**
         * Sends a metadata request if one is due and returns how long the caller may wait
         * before calling again.
         *
         * The returned timeout is the largest of: the time until the next metadata update,
         * the remaining backoff since no node was last available, and
         * {@code Integer.MAX_VALUE} while a fetch is in progress. A request is attempted
         * only when that value is zero.
         *
         * @param now the current time in milliseconds
         * @return the computed timeout in milliseconds
         */
        @Override
        public long maybeUpdate(long now) {
            long timeToNextMetadataUpdate = this.metadata.timeToNextUpdate(now);
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + this.metadata.refreshBackoff() - now, 0L);
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0L;
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch);
            if (metadataTimeout == 0L) {
                Node node = NetworkClient.this.leastLoadedNode(now);
                this.maybeUpdate(now, node);
            }
            return metadataTimeout;
        }

        /**
         * Consumes the disconnection of a metadata request that this client initiated.
         *
         * Only client-initiated metadata requests are claimed, mirroring
         * {@link #maybeHandleCompletedReceive}. A metadata request sent by a caller must
         * still produce a response for that caller, and must not clear the in-progress
         * flag of a fetch this updater did not start.
         *
         * @param request the request that was cancelled by a disconnection
         * @return {@code true} if the request was handled here and needs no response
         */
        @Override
        public boolean maybeHandleDisconnection(ClientRequest request) {
            ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
            if (requestKey == ApiKeys.METADATA && request.isInitiatedByNetworkClient()) {
                this.metadataFetchInProgress = false;
                return true;
            }
            return false;
        }

        /**
         * Consumes the response to a metadata request that this client initiated.
         *
         * @param req  the request the response belongs to
         * @param now  the current time in milliseconds
         * @param body the parsed response body
         * @return {@code true} if the response was handled here and must not be returned to callers
         */
        @Override
        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
            short apiKey = req.request().header().apiKey();
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                this.handleResponse(req.request().header(), body, now);
                return true;
            }
            return false;
        }

        /**
         * Forwards the update request to the underlying metadata.
         */
        @Override
        public void requestUpdate() {
            this.metadata.requestUpdate();
        }

        /**
         * Applies a metadata response: a cluster with at least one node replaces the current
         * metadata, an empty one is recorded as a failed update. Response errors are logged.
         */
        private void handleResponse(RequestHeader header, Struct body, long now) {
            this.metadataFetchInProgress = false;
            MetadataResponse response = new MetadataResponse(body);
            Cluster cluster = response.cluster();
            if (response.errors().size() > 0) {
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
            }
            if (cluster.nodes().size() > 0) {
                this.metadata.update(cluster, now);
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                this.metadata.failedUpdate(now);
            }
        }

        /**
         * Builds a metadata request for the given topics addressed to {@code node}.
         */
        private ClientRequest request(long now, String node, Set<String> topics) {
            MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
            RequestSend send2 = new RequestSend(node, NetworkClient.this.nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
            // NOTE(review): the trailing `true` is presumably the initiated-by-NetworkClient flag
            // that isInitiatedByNetworkClient() reads -- confirm in ClientRequest.
            return new ClientRequest(now, true, send2, null, true);
        }

        /**
         * Sends a metadata request to {@code node} if possible; otherwise starts connecting
         * to it, or records that no node was available so the next attempt backs off.
         *
         * @param now  the current time in milliseconds
         * @param node the candidate node, or {@code null} if none was found
         */
        private void maybeUpdate(long now, Node node) {
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                this.lastNoNodeAvailableMs = now;
                return;
            }
            String nodeConnectionId = node.idString();
            if (NetworkClient.this.canSendRequest(nodeConnectionId)) {
                // An empty topic set is sent when metadata is needed for all topics.
                Set<String> topics = this.metadata.needMetadataForAllTopics() ? new HashSet<String>() : this.metadata.topics();
                this.metadataFetchInProgress = true;
                ClientRequest metadataRequest = this.request(now, nodeConnectionId, topics);
                log.debug("Sending metadata request {} to node {}", metadataRequest, (Object)node.id());
                NetworkClient.this.doSend(metadataRequest, now);
            } else if (NetworkClient.this.connectionStates.canConnect(nodeConnectionId, now)) {
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                NetworkClient.this.initiateConnect(node, now);
            } else {
                // Connected but unable to send, or a connection attempt is not allowed yet.
                this.lastNoNodeAvailableMs = now;
            }
        }
    }
}

