package com.baidu.hugegraph.computer.core.sender;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.common.exception.TransportException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.network.TransportClient;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.concurrent.BarrierEvent;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/sender/QueuedMessageSender.class */
public class QueuedMessageSender implements MessageSender {
    public static final Logger LOG;
    private static final String NAME = "send-executor";
    private final WorkerChannel[] channels;
    private final Thread sendExecutor = new Thread(new Sender(), NAME);
    private final BarrierEvent anyQueueNotEmptyEvent = new BarrierEvent();
    private final BarrierEvent anyClientNotBusyEvent = new BarrierEvent();
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/baidu/hugegraph/computer/core/sender/QueuedMessageSender$Sender.class */
    private class Sender implements Runnable {
        private Sender() {
        }

        @Override // java.lang.Runnable
        public void run() {
            QueuedMessageSender.LOG.info("The send-executor is running");
            Thread currentThread = Thread.currentThread();
            while (!currentThread.isInterrupted()) {
                try {
                    int i = 0;
                    int i2 = 0;
                    for (WorkerChannel workerChannel : QueuedMessageSender.this.channels) {
                        QueuedMessage peek = workerChannel.queue.peek();
                        if (peek == null) {
                            i++;
                        } else if (workerChannel.doSend(peek)) {
                            workerChannel.queue.take();
                        } else {
                            i2++;
                        }
                    }
                    int length = QueuedMessageSender.this.channels.length;
                    if (i >= length) {
                        QueuedMessageSender.LOG.debug("The send executor was blocked to wait any queue not empty");
                        QueuedMessageSender.this.waitAnyQueueNotEmpty();
                    }
                    if (i2 >= length) {
                        QueuedMessageSender.LOG.debug("The send executor was blocked to wait any client not busy");
                        QueuedMessageSender.this.waitAnyClientNotBusy();
                    }
                } catch (TransportException e) {
                    throw new ComputerException("Failed to send message", e);
                } catch (InterruptedException e2) {
                    currentThread.interrupt();
                    if (QueuedMessageSender.this.activeClientCount() > 0) {
                        throw new ComputerException("Interrupted when waiting for message queue not empty");
                    }
                }
            }
            QueuedMessageSender.LOG.info("The send-executor is terminated");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baidu/hugegraph/computer/core/sender/QueuedMessageSender$WorkerChannel.class */
    public static class WorkerChannel {
        private final int workerId;
        private final MessageQueue queue;
        private final TransportClient client;
        private final AtomicReference<CompletableFuture<Void>> futureRef = new AtomicReference<>();
        static final /* synthetic */ boolean $assertionsDisabled;

        public WorkerChannel(int i, MessageQueue messageQueue, TransportClient transportClient) {
            this.workerId = i;
            this.queue = messageQueue;
            this.client = transportClient;
        }

        public CompletableFuture<Void> newFuture() {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            if (this.futureRef.compareAndSet(null, completableFuture)) {
                return completableFuture;
            }
            throw new ComputerException("The origin future must be null");
        }

        public void resetFuture(CompletableFuture<Void> completableFuture) {
            if (!this.futureRef.compareAndSet(completableFuture, null)) {
                throw new ComputerException("Failed to reset futureRef, expect future object is %s, but some thread modified it", completableFuture);
            }
        }

        public boolean doSend(QueuedMessage queuedMessage) throws TransportException, InterruptedException {
            switch (queuedMessage.type()) {
                case START:
                    sendStartMessage();
                    return true;
                case FINISH:
                    sendFinishMessage();
                    return true;
                default:
                    return sendDataMessage(queuedMessage);
            }
        }

        public void sendStartMessage() throws TransportException {
            this.client.startSessionAsync().whenComplete((r5, th) -> {
                CompletableFuture<Void> completableFuture = this.futureRef.get();
                if (!$assertionsDisabled && completableFuture == null) {
                    throw new AssertionError();
                }
                if (th != null) {
                    QueuedMessageSender.LOG.info("Failed to start session connected to {}", this);
                    completableFuture.completeExceptionally(th);
                } else {
                    QueuedMessageSender.LOG.info("Start session connected to {}", this);
                    completableFuture.complete(null);
                }
            });
        }

        public void sendFinishMessage() throws TransportException {
            this.client.finishSessionAsync().whenComplete((r5, th) -> {
                CompletableFuture<Void> completableFuture = this.futureRef.get();
                if (!$assertionsDisabled && completableFuture == null) {
                    throw new AssertionError();
                }
                if (th != null) {
                    QueuedMessageSender.LOG.info("Failed to finish session connected to {}", this);
                    completableFuture.completeExceptionally(th);
                } else {
                    QueuedMessageSender.LOG.info("Finish session connected to {}", this);
                    completableFuture.complete(null);
                }
            });
        }

        public boolean sendDataMessage(QueuedMessage queuedMessage) throws TransportException {
            return this.client.send(queuedMessage.type(), queuedMessage.partitionId(), queuedMessage.buffer());
        }

        public String toString() {
            return String.format("workerId=%s(remoteAddress=%s)", Integer.valueOf(this.workerId), this.client.remoteAddress());
        }

        static {
            $assertionsDisabled = !QueuedMessageSender.class.desiredAssertionStatus();
        }
    }

    public QueuedMessageSender(Config config) {
        this.channels = new WorkerChannel[((Integer) config.get(ComputerOptions.JOB_WORKERS_COUNT)).intValue()];
    }

    public void init() {
        for (WorkerChannel workerChannel : this.channels) {
            E.checkNotNull(workerChannel, "channel");
        }
        this.sendExecutor.start();
    }

    public void close() {
        this.sendExecutor.interrupt();
        try {
            this.sendExecutor.join();
        } catch (InterruptedException e) {
            throw new ComputerException("Interrupted when waiting for send-executor to stop", e);
        }
    }

    public void addWorkerClient(int i, TransportClient transportClient) {
        BarrierEvent barrierEvent = this.anyQueueNotEmptyEvent;
        barrierEvent.getClass();
        this.channels[channelId(i)] = new WorkerChannel(i, new MessageQueue(barrierEvent::signal), transportClient);
        LOG.info("Add client {} for worker {}", transportClient.connectionId(), Integer.valueOf(i));
    }

    @Override // com.baidu.hugegraph.computer.core.sender.MessageSender
    public CompletableFuture<Void> send(int i, MessageType messageType) throws InterruptedException {
        WorkerChannel workerChannel = this.channels[channelId(i)];
        CompletableFuture<Void> newFuture = workerChannel.newFuture();
        newFuture.whenComplete((r5, th) -> {
            workerChannel.resetFuture(newFuture);
        });
        workerChannel.queue.put(new QueuedMessage(-1, messageType, null));
        return newFuture;
    }

    @Override // com.baidu.hugegraph.computer.core.sender.MessageSender
    public void send(int i, QueuedMessage queuedMessage) throws InterruptedException {
        this.channels[channelId(i)].queue.put(queuedMessage);
    }

    public Runnable notBusyNotifier() {
        BarrierEvent barrierEvent = this.anyClientNotBusyEvent;
        barrierEvent.getClass();
        return barrierEvent::signal;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitAnyQueueNotEmpty() {
        try {
            this.anyQueueNotEmptyEvent.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            this.anyQueueNotEmptyEvent.reset();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitAnyClientNotBusy() {
        try {
            try {
                this.anyClientNotBusyEvent.await();
                this.anyClientNotBusyEvent.reset();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ComputerException("Interrupted when waiting any client not busy");
            }
        } catch (Throwable th) {
            this.anyClientNotBusyEvent.reset();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int activeClientCount() {
        int i = 0;
        for (WorkerChannel workerChannel : this.channels) {
            if (workerChannel.client.sessionActive()) {
                i++;
            }
        }
        return i;
    }

    private static int channelId(int i) {
        if ($assertionsDisabled || i > 0) {
            return i - 1;
        }
        throw new AssertionError();
    }

    static {
        $assertionsDisabled = !QueuedMessageSender.class.desiredAssertionStatus();
        LOG = Log.logger((Class<?>) QueuedMessageSender.class);
    }
}
