/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.datax.core.taskgroup;

import com.alibaba.datax.common.exception.CommonErrorCode;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.dataxservice.face.domain.enums.State;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide singleton that tracks, per task id, how many records a task has
 * read so far and when that number last increased. When a report shows no
 * increase for longer than {@link #EXPIRED_TIME}, the reported
 * {@link Communication} is marked {@code FAILED} with a
 * {@code TASK_HUNG_EXPIRED} error.
 */
public class TaskMonitor {
    private static final Logger LOG = LoggerFactory.getLogger(TaskMonitor.class);
    private static final TaskMonitor instance = new TaskMonitor();

    /** Hang threshold in milliseconds: 172,800,000 ms = 48 hours. */
    private static long EXPIRED_TIME = 172800000L;

    /** Task id -> progress snapshot for every task currently being monitored. */
    private final ConcurrentHashMap<Integer, TaskCommunication> tasks =
            new ConcurrentHashMap<Integer, TaskCommunication>();

    private TaskMonitor() {
    }

    /**
     * @return the single shared monitor instance
     */
    public static TaskMonitor getInstance() {
        return instance;
    }

    /**
     * Starts monitoring a task unless it is already finished or already registered.
     *
     * @param taskid        id of the task to monitor
     * @param communication the task's current communication; its total read-record
     *                      count becomes the initial baseline
     */
    public void registerTask(Integer taskid, Communication communication) {
        if (communication.isFinished()) {
            return;
        }
        this.tasks.putIfAbsent(taskid, new TaskCommunication(taskid, communication));
    }

    /**
     * Stops monitoring the given task; a no-op when the task is not registered.
     *
     * @param taskid id of the task to forget
     */
    public void removeTask(Integer taskid) {
        this.tasks.remove(taskid);
    }

    /**
     * Feeds a fresh communication snapshot for a task into the monitor.
     * Finished communications are ignored. If the task is unknown, a warning is
     * logged and the task is registered with this snapshot as its baseline
     * (no hang check is performed on that first report).
     *
     * @param taskid        id of the reporting task
     * @param communication the task's current communication; may be switched to
     *                      {@code FAILED} by the hang check
     */
    public void report(Integer taskid, Communication communication) {
        if (communication.isFinished()) {
            return;
        }
        // Look the entry up exactly once: a separate containsKey()/get() pair can
        // race with removeTask() and dereference null.
        TaskCommunication tracked = this.tasks.get(taskid);
        if (tracked == null) {
            LOG.warn("unexpected: taskid({}) missed.", taskid);
            this.tasks.putIfAbsent(taskid, new TaskCommunication(taskid, communication));
        } else {
            tracked.report(communication);
        }
    }

    /**
     * @param taskid id of the task to look up
     * @return the progress snapshot for the task, or {@code null} if it is not registered
     */
    public TaskCommunication getTaskCommunication(Integer taskid) {
        return this.tasks.get(taskid);
    }

    /**
     * Progress snapshot of a single task: the highest total read-record count
     * seen so far, when that count last grew, and when the task last reported.
     */
    public static class TaskCommunication {
        private final Integer taskid;
        /** Highest total read-record count observed so far. */
        private long lastAllReadRecords;
        /** Wall-clock millis at which {@link #lastAllReadRecords} last increased. */
        private long lastUpdateComunicationTS;
        /** Wall-clock millis of the most recent report (or of construction). */
        private long ttl;

        private TaskCommunication(Integer taskid, Communication communication) {
            this.taskid = taskid;
            this.lastAllReadRecords = CommunicationTool.getTotalReadRecords(communication);
            long now = System.currentTimeMillis();
            this.ttl = now;
            this.lastUpdateComunicationTS = now;
        }

        /**
         * Records a new report. If the total read-record count grew, the baseline
         * and its timestamp are advanced; otherwise, if the baseline is older than
         * the hang threshold, the communication is marked {@code FAILED} with a
         * {@code TASK_HUNG_EXPIRED} exception attached.
         *
         * @param communication the task's current communication
         */
        public void report(Communication communication) {
            this.ttl = System.currentTimeMillis();
            // Read the counter once so the comparison and the stored value agree.
            long totalReadRecords = CommunicationTool.getTotalReadRecords(communication);
            if (totalReadRecords > this.lastAllReadRecords) {
                this.lastAllReadRecords = totalReadRecords;
                this.lastUpdateComunicationTS = this.ttl;
            } else if (this.isExpired(this.lastUpdateComunicationTS)) {
                communication.setState(State.FAILED);
                communication.setTimestamp(this.ttl);
                communication.setThrowable(DataXException.asDataXException((ErrorCode)CommonErrorCode.TASK_HUNG_EXPIRED, String.format("task(%s) hung expired [allReadRecord(%s), elased(%s)]", this.taskid, this.lastAllReadRecords, this.ttl - this.lastUpdateComunicationTS)));
            }
        }

        /**
         * @param lastUpdateComunicationTS wall-clock millis of the last progress
         * @return {@code true} if more than {@code EXPIRED_TIME} ms have passed since then
         */
        private boolean isExpired(long lastUpdateComunicationTS) {
            return System.currentTimeMillis() - lastUpdateComunicationTS > EXPIRED_TIME;
        }

        /** @return id of the task this snapshot belongs to */
        public Integer getTaskid() {
            return this.taskid;
        }

        /** @return highest total read-record count observed so far */
        public long getLastAllReadRecords() {
            return this.lastAllReadRecords;
        }

        /** @return wall-clock millis at which the read-record count last increased */
        public long getLastUpdateComunicationTS() {
            return this.lastUpdateComunicationTS;
        }

        /** @return wall-clock millis of the most recent report (or of construction) */
        public long getTtl() {
            return this.ttl;
        }
    }
}

