/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.spi.communication.tcp;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.processors.metric.impl.MetricUtils;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.Metric;
import org.apache.ignite.spi.metric.ReadOnlyMetricRegistry;

class TcpCommunicationMetricsListener {
    private final GridMetricManager mmgr;
    private final MetricRegistry mreg;
    private final Ignite ignite;
    private final Set<ThreadMetrics> allMetrics = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ThreadLocal<ThreadMetrics> threadMetrics = ThreadLocal.withInitial(() -> {
        ThreadMetrics metrics = new ThreadMetrics();
        this.allMetrics.add(metrics);
        return metrics;
    });
    private final Function<Short, LongAdderMetric> sentMsgsCntByTypeMetricFactory;
    private final Function<Short, LongAdderMetric> rcvdMsgsCntByTypeMetricFactory;
    private final Function<Object, LongAdderMetric> sentMsgsCntByConsistentIdMetricFactory;
    private final Function<Object, LongAdderMetric> rcvdMsgsCntByConsistentIdMetricFactory;
    private final LongAdderMetric sentBytesMetric;
    private final LongAdderMetric rcvdBytesMetric;
    private final LongAdderMetric sentMsgsMetric;
    private final LongAdderMetric rcvdMsgsMetric;
    private final Object msgTypMapMux = new Object();
    private volatile Map<Short, String> msgTypMap;

    public TcpCommunicationMetricsListener(GridMetricManager mmgr, Ignite ignite) {
        this.mmgr = mmgr;
        this.ignite = ignite;
        this.mreg = mmgr.registry(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME);
        this.sentMsgsCntByTypeMetricFactory = directType -> this.mreg.longAdderMetric(TcpCommunicationMetricsListener.sentMessagesByTypeMetricName(directType), "Total number of messages with given type sent by current node");
        this.rcvdMsgsCntByTypeMetricFactory = directType -> this.mreg.longAdderMetric(TcpCommunicationMetricsListener.receivedMessagesByTypeMetricName(directType), "Total number of messages with given type received by current node");
        this.sentMsgsCntByConsistentIdMetricFactory = consistentId -> (LongAdderMetric)mmgr.registry(MetricUtils.metricName(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString())).findMetric("sentMessagesToNode");
        this.rcvdMsgsCntByConsistentIdMetricFactory = consistentId -> (LongAdderMetric)mmgr.registry(MetricUtils.metricName(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString())).findMetric("receivedMessagesFromNode");
        this.sentBytesMetric = this.mreg.longAdderMetric("sentBytes", "Total number of bytes sent by current node");
        this.rcvdBytesMetric = this.mreg.longAdderMetric("receivedBytes", "Total number of bytes received by current node");
        this.sentMsgsMetric = this.mreg.longAdderMetric("sentMessagesCount", "Total number of messages sent by current node");
        this.rcvdMsgsMetric = this.mreg.longAdderMetric("receivedMessagesCount", "Total number of messages received by current node");
        mmgr.addMetricRegistryCreationListener(mreg -> {
            if (!mreg.name().startsWith(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME + ".")) {
                return;
            }
            ((MetricRegistry)mreg).longAdderMetric("sentMessagesToNode", "Total number of messages sent by current node to the given node");
            ((MetricRegistry)mreg).longAdderMetric("receivedMessagesFromNode", "Total number of messages received by current node from the given node");
        });
    }

    public MetricRegistry metricRegistry() {
        return this.mreg;
    }

    public void onMessageSent(Message msg, Object consistentId) {
        assert (msg != null);
        assert (consistentId != null);
        if (msg instanceof GridIoMessage) {
            msg = ((GridIoMessage)msg).message();
            this.updateMessageTypeMap(msg);
            this.sentMsgsMetric.increment();
            this.threadMetrics.get().onMessageSent(msg, consistentId);
        }
    }

    public void onMessageReceived(Message msg, Object consistentId) {
        assert (msg != null);
        assert (consistentId != null);
        if (msg instanceof GridIoMessage) {
            msg = ((GridIoMessage)msg).message();
            this.updateMessageTypeMap(msg);
            this.rcvdMsgsMetric.increment();
            this.threadMetrics.get().onMessageReceived(msg, consistentId);
        }
    }

    public int sentMessagesCount() {
        int res0 = (int)this.sentMsgsMetric.value();
        return res0 < 0 ? Integer.MAX_VALUE : res0;
    }

    public long sentBytesCount() {
        return this.sentBytesMetric.value();
    }

    public int receivedMessagesCount() {
        int res0 = (int)this.rcvdMsgsMetric.value();
        return res0 < 0 ? Integer.MAX_VALUE : res0;
    }

    public long receivedBytesCount() {
        return this.rcvdBytesMetric.value();
    }

    public Map<String, Long> receivedMessagesByType() {
        return this.collectMessagesCountByType("receivedMessagesByType.");
    }

    public Map<UUID, Long> receivedMessagesByNode() {
        return this.collectMessagesCountByNodeId("receivedMessagesFromNode");
    }

    public Map<String, Long> sentMessagesByType() {
        return this.collectMessagesCountByType("sentMessagesByType.");
    }

    public Map<UUID, Long> sentMessagesByNode() {
        return this.collectMessagesCountByNodeId("sentMessagesToNode");
    }

    protected Map<String, Long> collectMessagesCountByType(String prefix) {
        HashMap<String, Long> res = new HashMap<String, Long>();
        prefix = MetricUtils.metricName(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME, prefix);
        for (Metric metric : this.mreg) {
            String typeName;
            if (!metric.name().startsWith(prefix)) continue;
            short directType = Short.parseShort(metric.name().substring(prefix.length()));
            Map<Short, String> msgTypMap0 = this.msgTypMap;
            if (msgTypMap0 == null || (typeName = msgTypMap0.get(directType)) == null) continue;
            res.put(typeName, ((LongMetric)metric).value());
        }
        return res;
    }

    protected Map<UUID, Long> collectMessagesCountByNodeId(String metricName) {
        HashMap<UUID, Long> res = new HashMap<UUID, Long>();
        Map<String, UUID> nodesMapping = this.ignite.cluster().nodes().stream().collect(Collectors.toMap(node -> node.consistentId().toString(), ClusterNode::id));
        String mregPrefix = TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME + ".";
        for (ReadOnlyMetricRegistry mreg : this.mmgr) {
            String nodeConsIdStr;
            UUID nodeId;
            if (!mreg.name().startsWith(mregPrefix) || (nodeId = nodesMapping.get(nodeConsIdStr = mreg.name().substring(mregPrefix.length()))) == null) continue;
            res.put(nodeId, ((LongMetric)mreg.findMetric(metricName)).value());
        }
        return res;
    }

    public void resetMetrics() {
        this.rcvdMsgsMetric.reset();
        this.sentMsgsMetric.reset();
        this.sentBytesMetric.reset();
        this.rcvdBytesMetric.reset();
        for (Metric metric : this.mreg) {
            if (metric.name().startsWith("sentMessagesByType")) {
                metric.reset();
                continue;
            }
            if (!metric.name().startsWith("receivedMessagesByType")) continue;
            metric.reset();
        }
        for (ReadOnlyMetricRegistry mreg : this.mmgr) {
            if (!mreg.name().startsWith(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME + ".")) continue;
            mreg.findMetric("sentMessagesToNode").reset();
            mreg.findMetric("receivedMessagesFromNode").reset();
        }
    }

    public void onNodeLeft(Object consistentId) {
        for (ThreadMetrics threadMetrics : this.allMetrics) {
            threadMetrics.sentMsgsMetricsByConsistentId = new HashMap<Object, LongAdderMetric>();
            threadMetrics.rcvdMsgsMetricsByConsistentId = new HashMap<Object, LongAdderMetric>();
        }
        this.mmgr.remove(MetricUtils.metricName(TcpCommunicationSpi.COMMUNICATION_METRICS_GROUP_NAME, consistentId.toString()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateMessageTypeMap(Message msg) {
        short typeId = msg.directType();
        Map<Short, String> msgTypMap0 = this.msgTypMap;
        if (msgTypMap0 == null || !msgTypMap0.containsKey(typeId)) {
            Object object = this.msgTypMapMux;
            synchronized (object) {
                if (this.msgTypMap == null) {
                    msgTypMap0 = new HashMap<Short, String>();
                    msgTypMap0.put(typeId, msg.getClass().getName());
                    this.msgTypMap = msgTypMap0;
                } else if (!this.msgTypMap.containsKey(typeId)) {
                    msgTypMap0 = new HashMap<Short, String>(this.msgTypMap);
                    msgTypMap0.put(typeId, msg.getClass().getName());
                    this.msgTypMap = msgTypMap0;
                }
            }
        }
    }

    public static String sentMessagesByTypeMetricName(Short directType) {
        return MetricUtils.metricName("sentMessagesByType", directType.toString());
    }

    public static String receivedMessagesByTypeMetricName(Short directType) {
        return MetricUtils.metricName("receivedMessagesByType", directType.toString());
    }

    private class ThreadMetrics {
        private final Map<Short, LongAdderMetric> sentMsgsMetricsByType = new HashMap<Short, LongAdderMetric>();
        private final Map<Short, LongAdderMetric> rcvdMsgsMetricsByType = new HashMap<Short, LongAdderMetric>();
        public volatile Map<Object, LongAdderMetric> sentMsgsMetricsByConsistentId = new HashMap<Object, LongAdderMetric>();
        public volatile Map<Object, LongAdderMetric> rcvdMsgsMetricsByConsistentId = new HashMap<Object, LongAdderMetric>();

        private ThreadMetrics() {
        }

        private void onMessageSent(Message msg, Object consistentId) {
            this.sentMsgsMetricsByType.computeIfAbsent(msg.directType(), TcpCommunicationMetricsListener.this.sentMsgsCntByTypeMetricFactory).increment();
            this.sentMsgsMetricsByConsistentId.computeIfAbsent(consistentId, TcpCommunicationMetricsListener.this.sentMsgsCntByConsistentIdMetricFactory).increment();
        }

        private void onMessageReceived(Message msg, Object consistentId) {
            this.rcvdMsgsMetricsByType.computeIfAbsent(msg.directType(), TcpCommunicationMetricsListener.this.rcvdMsgsCntByTypeMetricFactory).increment();
            this.rcvdMsgsMetricsByConsistentId.computeIfAbsent(consistentId, TcpCommunicationMetricsListener.this.rcvdMsgsCntByConsistentIdMetricFactory).increment();
        }
    }
}

