package com.baidu.hugegraph.computer.core.receiver;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.network.ConnectionId;
import com.baidu.hugegraph.computer.core.network.MessageHandler;
import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.receiver.edge.EdgeMessageRecvPartitions;
import com.baidu.hugegraph.computer.core.receiver.message.ComputeMessageRecvPartitions;
import com.baidu.hugegraph.computer.core.receiver.vertex.VertexMessageRecvPartitions;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.sort.sorting.SortManager;
import com.baidu.hugegraph.computer.core.store.FileManager;
import com.baidu.hugegraph.computer.core.store.SuperstepFileGenerator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/receiver/MessageRecvManager.class */
public class MessageRecvManager implements Manager, MessageHandler {
    public static final String NAME = "message_recv";
    private static final Logger LOG;
    private final ComputerContext context;
    private final FileManager fileManager;
    private final SortManager sortManager;
    private VertexMessageRecvPartitions vertexPartitions;
    private EdgeMessageRecvPartitions edgePartitions;
    private ComputeMessageRecvPartitions messagePartitions;
    private int workerCount;
    private int expectedFinishMessages;
    private CountDownLatch finishMessagesLatch;
    private long waitFinishMessagesTimeout;
    private long superstep = -1;
    static final /* synthetic */ boolean $assertionsDisabled;

    public MessageRecvManager(ComputerContext computerContext, FileManager fileManager, SortManager sortManager) {
        this.context = computerContext;
        this.fileManager = fileManager;
        this.sortManager = sortManager;
    }

    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public String name() {
        return NAME;
    }

    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public void init(Config config) {
        SuperstepFileGenerator superstepFileGenerator = new SuperstepFileGenerator(this.fileManager, -1);
        this.vertexPartitions = new VertexMessageRecvPartitions(this.context, superstepFileGenerator, this.sortManager);
        this.edgePartitions = new EdgeMessageRecvPartitions(this.context, superstepFileGenerator, this.sortManager);
        this.workerCount = ((Integer) config.get(ComputerOptions.JOB_WORKERS_COUNT)).intValue();
        this.expectedFinishMessages = this.workerCount * 2;
        this.finishMessagesLatch = new CountDownLatch(this.expectedFinishMessages);
        this.waitFinishMessagesTimeout = ((Long) config.get(ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT)).longValue();
    }

    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public void beforeSuperstep(Config config, int i) {
        this.messagePartitions = new ComputeMessageRecvPartitions(this.context, new SuperstepFileGenerator(this.fileManager, i), this.sortManager);
        this.expectedFinishMessages = this.workerCount;
        this.finishMessagesLatch = new CountDownLatch(this.expectedFinishMessages);
        this.superstep = i;
        if (this.superstep == 0) {
            if (!$assertionsDisabled && this.vertexPartitions == null) {
                throw new AssertionError();
            }
            this.vertexPartitions.clearOldFiles(-1);
            this.vertexPartitions = null;
            if (!$assertionsDisabled && this.edgePartitions == null) {
                throw new AssertionError();
            }
            this.edgePartitions.clearOldFiles(-1);
            this.edgePartitions = null;
        }
    }

    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public void afterSuperstep(Config config, int i) {
        if (i > 0) {
            this.messagePartitions.clearOldFiles(i - 1);
        }
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportHandler
    public void onChannelActive(ConnectionId connectionId) {
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportHandler
    public void onChannelInactive(ConnectionId connectionId) {
    }

    @Override // com.baidu.hugegraph.computer.core.network.TransportHandler
    public void exceptionCaught(TransportException transportException, ConnectionId connectionId) {
        LOG.warn("Exception caught for connection:{}, root cause:{}", connectionId, transportException);
    }

    public void waitReceivedAllMessages() {
        try {
            if (this.finishMessagesLatch.await(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS)) {
            } else {
                throw new ComputerException("Expect %s finish-messages received in %s ms, %s absence in superstep %s", Integer.valueOf(this.expectedFinishMessages), Long.valueOf(this.waitFinishMessagesTimeout), Long.valueOf(this.finishMessagesLatch.getCount()), Long.valueOf(this.superstep));
            }
        } catch (InterruptedException e) {
            throw new ComputerException("Thread is interrupted while waiting %s finish-messages received in %s ms, %s absence in superstep %s", e, Integer.valueOf(this.expectedFinishMessages), Long.valueOf(this.waitFinishMessagesTimeout), Long.valueOf(this.finishMessagesLatch.getCount()), Long.valueOf(this.superstep));
        }
    }

    @Override // com.baidu.hugegraph.computer.core.network.MessageHandler
    public void handle(MessageType messageType, int i, ManagedBuffer managedBuffer) {
        switch (messageType) {
            case VERTEX:
                this.vertexPartitions.addBuffer(i, managedBuffer);
                return;
            case EDGE:
                this.edgePartitions.addBuffer(i, managedBuffer);
                return;
            case MSG:
                this.messagePartitions.addBuffer(i, managedBuffer);
                return;
            default:
                throw new ComputerException("Unable handle ManagedBuffer with type '%s'", messageType.name());
        }
    }

    @Override // com.baidu.hugegraph.computer.core.network.MessageHandler
    public void onStarted(ConnectionId connectionId) {
        LOG.debug("ConnectionId {} started", connectionId);
    }

    @Override // com.baidu.hugegraph.computer.core.network.MessageHandler
    public void onFinished(ConnectionId connectionId) {
        LOG.debug("ConnectionId {} finished", connectionId);
        this.finishMessagesLatch.countDown();
    }

    public Map<Integer, PeekableIterator<KvEntry>> vertexPartitions() {
        E.checkState(this.vertexPartitions != null, "The vertexPartitions can't be null", new Object[0]);
        return this.vertexPartitions.iterators();
    }

    public Map<Integer, PeekableIterator<KvEntry>> edgePartitions() {
        E.checkState(this.edgePartitions != null, "The edgePartitions can't be null", new Object[0]);
        return this.edgePartitions.iterators();
    }

    public Map<Integer, PeekableIterator<KvEntry>> messagePartitions() {
        E.checkState(this.messagePartitions != null, "The messagePartitions can't be null", new Object[0]);
        ComputeMessageRecvPartitions computeMessageRecvPartitions = this.messagePartitions;
        this.messagePartitions = null;
        return computeMessageRecvPartitions.iterators();
    }

    public Map<Integer, MessageStat> messageStats() {
        waitReceivedAllMessages();
        E.checkState(this.messagePartitions != null, "The messagePartitions can't be null", new Object[0]);
        return this.messagePartitions.messageStats();
    }

    static {
        $assertionsDisabled = !MessageRecvManager.class.desiredAssertionStatus();
        LOG = Log.logger((Class<?>) MessageRecvManager.class);
    }
}
