package com.baidu.hugegraph.computer.core.sender;

import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.id.Id;
import com.baidu.hugegraph.computer.core.graph.partition.Partitioner;
import com.baidu.hugegraph.computer.core.graph.value.Value;
import com.baidu.hugegraph.computer.core.graph.vertex.Vertex;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.receiver.MessageStat;
import com.baidu.hugegraph.computer.core.sort.sorting.SortManager;
import com.baidu.hugegraph.util.Log;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/sender/MessageSendManager.class */
/**
 * Manager that writes vertices, edges and messages into per-partition
 * {@link WriteBuffers} and hands full buffers over to the {@link SortManager}
 * and the {@link MessageSender}.
 *
 * <p>Each record is routed to the buffer of the partition returned by
 * {@link Partitioner#partitionId}. When that buffer reports that it reached
 * its threshold it is switched for sorting, sorted asynchronously and the
 * sorted result is sent to the worker returned by
 * {@link Partitioner#workerId} for that partition.
 *
 * <p>A failure of an asynchronous sort/send task is remembered (first one
 * wins) and rethrown as a {@link ComputerException} by the next call to one of
 * the {@code send*} methods or by {@link #finishSend(MessageType)}.
 */
public class MessageSendManager implements Manager {
    public static final Logger LOG = Log.logger(MessageSendManager.class);
    public static final String NAME = "message_send";

    /** Maximum time, in seconds, to wait for each pending sort/send task when finishing. */
    private static final long SORT_FUTURE_TIMEOUT_SECONDS = 300L;

    private final MessageSendBuffers buffers;
    private final Partitioner partitioner;
    private final SortManager sortManager;
    private final MessageSender sender;
    /** First failure raised by an asynchronous sort/send task, or {@code null} if none. */
    private final AtomicReference<Throwable> exception = new AtomicReference<>();
    private final TransportConf transportConf;

    /**
     * Creates the manager.
     *
     * @param computerContext context used to create the send buffers, the
     *                        partitioner (from {@code WORKER_PARTITIONER})
     *                        and the transport configuration
     * @param sortManager     sorts a buffer before it is sent
     * @param messageSender   sends sorted buffers and control messages to workers
     */
    public MessageSendManager(ComputerContext computerContext, SortManager sortManager, MessageSender messageSender) {
        this.buffers = new MessageSendBuffers(computerContext);
        this.partitioner = (Partitioner) computerContext.config().createObject(ComputerOptions.WORKER_PARTITIONER);
        this.transportConf = TransportConf.wrapConfig(computerContext.config());
        this.sortManager = sortManager;
        this.sender = messageSender;
    }

    /**
     * @return the constant {@link #NAME}
     */
    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public String name() {
        return NAME;
    }

    /**
     * Initializes the partitioner with the given config.
     *
     * @param config configuration passed on to {@link Partitioner#init}
     */
    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public void init(Config config) {
        this.partitioner.init(config);
    }

    /**
     * Does nothing; this manager holds nothing that it closes itself.
     *
     * @param config unused
     */
    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public void close(Config config) {
        // Intentionally empty
    }

    /**
     * Writes a vertex into the buffer of the partition its id maps to.
     *
     * @param vertex the vertex to write
     * @throws ComputerException if an earlier asynchronous sort/send failed,
     *                           or if writing the vertex fails with an I/O error
     */
    public void sendVertex(Vertex vertex) {
        checkException();
        WriteBuffers buffer = sortIfTargetBufferIsFull(vertex.id(), MessageType.VERTEX);
        try {
            buffer.writeVertex(vertex);
        } catch (IOException e) {
            throw new ComputerException("Failed to write vertex '%s'", e, vertex.id());
        }
    }

    /**
     * Writes the edges of a vertex into the buffer of the partition the
     * vertex id maps to.
     *
     * @param vertex the vertex whose edges are written
     * @throws ComputerException if an earlier asynchronous sort/send failed,
     *                           or if writing the edges fails with an I/O error
     */
    public void sendEdge(Vertex vertex) {
        checkException();
        WriteBuffers buffer = sortIfTargetBufferIsFull(vertex.id(), MessageType.EDGE);
        try {
            buffer.writeEdges(vertex);
        } catch (IOException e) {
            throw new ComputerException("Failed to write edges of vertex '%s'", e, vertex.id());
        }
    }

    /**
     * Writes a message into the buffer of the partition the target id maps to.
     *
     * @param id    id used to select the partition and written with the message
     * @param value the message value
     * @throws ComputerException if an earlier asynchronous sort/send failed,
     *                           or if writing the message fails with an I/O error
     */
    public void sendMessage(Id id, Value value) {
        checkException();
        WriteBuffers buffer = sortIfTargetBufferIsFull(id, MessageType.MSG);
        try {
            buffer.writeMessage(id, value);
        } catch (IOException e) {
            throw new ComputerException("Failed to write message", e);
        }
    }

    /**
     * Resets the written-message statistics of every buffer, then sends a
     * {@link MessageType#START} control message to every target worker and
     * waits for all of them to complete.
     *
     * @param messageType the type about to be sent; only used for logging here
     * @throws ComputerException if sending or waiting fails, times out or is interrupted
     */
    public void startSend(MessageType messageType) {
        Map<Integer, WriteBuffers> all = this.buffers.all();
        all.values().forEach(WriteBuffers::resetMessageWritten);
        sendControlMessageToWorkers(targetWorkerIds(all), MessageType.START);
        LOG.info("Start sending message(type={})", messageType);
    }

    /**
     * Sorts and sends whatever is left in the buffers, waits for those tasks,
     * then sends a {@link MessageType#FINISH} control message to every target
     * worker and waits for all of them to complete.
     *
     * @param messageType the type of the data remaining in the buffers
     * @throws ComputerException if any sort/send task or control message
     *                           fails, times out or is interrupted
     */
    public void finishSend(MessageType messageType) {
        Map<Integer, WriteBuffers> all = this.buffers.all();
        MessageStat stat = sortAndSendLastBuffer(all, messageType);
        sendControlMessageToWorkers(targetWorkerIds(all), MessageType.FINISH);
        LOG.info("Finish sending message(type={},count={},bytes={})",
                 messageType, stat.messageCount(), stat.messageBytes());
    }

    /**
     * Returns the written-message statistics of one buffer.
     *
     * @param i key of the buffer in the send buffers (the partition id, as
     *          used by the send methods)
     * @return the value of {@code messageWritten()} for that buffer
     */
    public MessageStat messageStat(int i) {
        return this.buffers.get(i).messageWritten();
    }

    /**
     * Maps every partition id that has a buffer to its worker id.
     *
     * @param all buffers keyed by partition id
     * @return the distinct worker ids of all keys
     */
    private Set<Integer> targetWorkerIds(Map<Integer, WriteBuffers> all) {
        return all.keySet().stream()
                  .map(this.partitioner::workerId)
                  .collect(Collectors.toSet());
    }

    /**
     * Looks up the buffer for the partition of {@code id}; if it reached its
     * threshold, switches it for sorting and starts an asynchronous
     * sort-then-send before returning it.
     *
     * @param id          id used to select the partition
     * @param messageType type of the data held by the buffer
     * @return the buffer the caller should write into
     */
    private WriteBuffers sortIfTargetBufferIsFull(Id id, MessageType messageType) {
        int partitionId = this.partitioner.partitionId(id);
        WriteBuffers buffer = this.buffers.get(partitionId);
        if (buffer.reachThreshold()) {
            buffer.switchForSorting();
            // Result is not awaited here; failures are recorded in this.exception
            sortThenSend(partitionId, messageType, buffer);
        }
        return buffer;
    }

    /**
     * Sorts a buffer asynchronously and, once sorted, passes the result to the
     * sender for the worker owning the partition. Any failure is logged and
     * stored as the pending exception if none is stored yet.
     *
     * @param partitionId  partition the buffer belongs to
     * @param messageType  type of the data in the buffer
     * @param writeBuffers buffer to sort
     * @return a future that completes when the sorted buffer was handed to the sender
     */
    private Future<?> sortThenSend(int partitionId, MessageType messageType, WriteBuffers writeBuffers) {
        int workerId = this.partitioner.workerId(partitionId);
        return this.sortManager.sort(messageType, writeBuffers).thenAccept(sorted -> {
            writeBuffers.finishSorting();
            try {
                this.sender.send(workerId, new QueuedMessage(partitionId, messageType, sorted));
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executing thread and keep the cause
                Thread.currentThread().interrupt();
                throw new ComputerException("Interrupted when waiting to put buffer into queue", e);
            }
        }).whenComplete((ignored, failure) -> {
            if (failure != null) {
                LOG.error("Failed to sort buffer or put sorted buffer into queue", failure);
                // Only the first failure is kept
                this.exception.compareAndSet(null, failure);
            }
        });
    }

    /**
     * Starts a sort-then-send for every non-empty buffer, accumulates the
     * written-message statistics of all buffers, and waits for the started
     * tasks to finish.
     *
     * @param map         buffers keyed by partition id
     * @param messageType type of the data in the buffers
     * @return the sum of {@code messageWritten()} over all buffers
     * @throws ComputerException if an asynchronous failure is already pending,
     *                           or a task fails, times out or the wait is interrupted
     */
    private MessageStat sortAndSendLastBuffer(Map<Integer, WriteBuffers> map, MessageType messageType) {
        MessageStat total = new MessageStat();
        List<Future<?>> pending = new ArrayList<>(map.size());
        for (Map.Entry<Integer, WriteBuffers> entry : map.entrySet()) {
            int partitionId = entry.getKey();
            WriteBuffers buffer = entry.getValue();
            if (!buffer.isEmpty()) {
                buffer.prepareSorting();
                pending.add(sortThenSend(partitionId, messageType, buffer));
            }
            // Statistics are counted for every buffer, empty or not
            total.increase(buffer.messageWritten());
        }
        checkException();
        try {
            for (Future<?> future : pending) {
                future.get(SORT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ComputerException("Failed to wait for sorting task to finished", e);
        } catch (ExecutionException e) {
            throw new ComputerException("Failed to wait for sorting task to finished", e);
        } catch (TimeoutException e) {
            throw new ComputerException("Timed out to wait for sorting task to finished", e);
        }
        return total;
    }

    /**
     * Sends a control message to each worker and waits for every send to
     * complete. The wait timeout per future is
     * {@code timeoutFinishSession()} for {@link MessageType#FINISH} and
     * {@code timeoutSyncRequest()} otherwise, in milliseconds.
     *
     * @param set         ids of the workers to notify
     * @param messageType control message type to send
     * @throws ComputerException if sending is interrupted, or waiting fails,
     *                           times out or is interrupted
     */
    private void sendControlMessageToWorkers(Set<Integer> set, MessageType messageType) {
        List<CompletableFuture<?>> pending = new ArrayList<>();
        try {
            for (int workerId : set) {
                pending.add(this.sender.send(workerId, messageType));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ComputerException("Interrupted when waiting to send message async", e);
        }

        long timeout;
        if (messageType == MessageType.FINISH) {
            timeout = this.transportConf.timeoutFinishSession();
        } else {
            timeout = this.transportConf.timeoutSyncRequest();
        }

        try {
            for (CompletableFuture<?> future : pending) {
                future.get(timeout, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ComputerException("Failed to wait for controling message(%s) to finished", e, messageType);
        } catch (ExecutionException e) {
            throw new ComputerException("Failed to wait for controling message(%s) to finished", e, messageType);
        } catch (TimeoutException e) {
            throw new ComputerException("Timeout(%sms) to wait for controling message(%s) to finished",
                                        e, timeout, messageType);
        }
    }

    /**
     * Rethrows a recorded asynchronous failure, if any.
     *
     * @throws ComputerException wrapping the recorded failure
     */
    private void checkException() {
        Throwable failure = this.exception.get();
        if (failure != null) {
            throw new ComputerException("Failed to send message", failure);
        }
    }
}
