package com.baidu.hugegraph.computer.core.receiver;

import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.network.buffer.ManagedBuffer;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.sort.sorting.SortManager;
import com.baidu.hugegraph.computer.core.store.SuperstepFileGenerator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;
import com.baidu.hugegraph.computer.core.util.FileUtil;
import com.baidu.hugegraph.util.Log;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/receiver/MessageRecvPartition.class */
public abstract class MessageRecvPartition {
    public static final Logger LOG;
    private MessageRecvBuffers recvBuffers;
    private MessageRecvBuffers sortBuffers;
    private final SortManager sortManager;
    private List<String> outputFiles;
    private final SuperstepFileGenerator fileGenerator;
    private final boolean withSubKv;
    private final int mergeFileNum;
    private long totalBytes;
    private final AtomicReference<Throwable> exception;
    static final /* synthetic */ boolean $assertionsDisabled;

    public MessageRecvPartition(Config config, SuperstepFileGenerator superstepFileGenerator, SortManager sortManager, boolean z) {
        this.fileGenerator = superstepFileGenerator;
        this.sortManager = sortManager;
        this.withSubKv = z;
        long longValue = ((Long) config.get(ComputerOptions.WORKER_RECEIVED_BUFFERS_BYTES_LIMIT)).longValue();
        long longValue2 = ((Long) config.get(ComputerOptions.WORKER_WAIT_SORT_TIMEOUT)).longValue();
        this.mergeFileNum = ((Integer) config.get(ComputerOptions.HGKV_MERGE_FILES_NUM)).intValue();
        this.recvBuffers = new MessageRecvBuffers(longValue, longValue2);
        this.sortBuffers = new MessageRecvBuffers(longValue, longValue2);
        this.outputFiles = new ArrayList();
        this.totalBytes = 0L;
        this.exception = new AtomicReference<>();
    }

    public synchronized void addBuffer(ManagedBuffer managedBuffer) {
        this.totalBytes += managedBuffer.length();
        this.recvBuffers.addBuffer(managedBuffer);
        if (this.recvBuffers.full()) {
            this.sortBuffers.waitSorted();
            swapReceiveAndSortBuffers();
            flushSortBuffersAsync();
        }
    }

    public synchronized PeekableIterator<KvEntry> iterator() {
        flushAllBuffersAndWaitSorted();
        mergeOutputFilesIfNeeded();
        return this.outputFiles.size() == 0 ? PeekableIterator.emptyIterator() : this.sortManager.iterator(this.outputFiles, this.withSubKv);
    }

    public synchronized long totalBytes() {
        return this.totalBytes;
    }

    public synchronized MessageStat messageStat() {
        return new MessageStat(0L, this.totalBytes);
    }

    protected abstract OuterSortFlusher outerSortFlusher();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String type();

    private void flushAllBuffersAndWaitSorted() {
        this.sortBuffers.waitSorted();
        if (this.recvBuffers.totalBytes() > 0) {
            swapReceiveAndSortBuffers();
            flushSortBuffersAsync();
            this.sortBuffers.waitSorted();
        }
        checkException();
    }

    private void flushSortBuffersAsync() {
        String nextPath = this.fileGenerator.nextPath(type());
        mergeBuffersAsync(this.sortBuffers, nextPath);
        this.outputFiles.add(nextPath);
    }

    private void mergeBuffersAsync(MessageRecvBuffers messageRecvBuffers, String str) {
        checkException();
        this.sortManager.mergeBuffers(messageRecvBuffers.buffers(), str, this.withSubKv, outerSortFlusher()).whenComplete((r6, th) -> {
            if (th != null) {
                LOG.error("Failed to merge buffers", th);
                this.exception.compareAndSet(null, th);
            }
            messageRecvBuffers.signalSorted();
        });
    }

    private void swapReceiveAndSortBuffers() {
        if (!$assertionsDisabled && this.recvBuffers.totalBytes() <= 0) {
            throw new AssertionError();
        }
        MessageRecvBuffers messageRecvBuffers = this.recvBuffers;
        this.recvBuffers = this.sortBuffers;
        this.sortBuffers = messageRecvBuffers;
        this.recvBuffers.prepareSort();
    }

    private void mergeOutputFilesIfNeeded() {
        if (this.outputFiles.size() <= 1) {
            return;
        }
        int i = this.mergeFileNum;
        List<String> genOutputFileNames = genOutputFileNames(1);
        this.sortManager.mergeInputs(this.outputFiles, genOutputFileNames, this.withSubKv, outerSortFlusher());
        FileUtil.deleteFilesQuietly(this.outputFiles);
        this.outputFiles = genOutputFileNames;
    }

    private List<String> genOutputFileNames(int i) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(this.fileGenerator.nextPath(type()));
        }
        return arrayList;
    }

    private void checkException() {
        Throwable th = this.exception.get();
        if (th != null) {
            throw new ComputerException(th.getMessage(), th);
        }
    }

    static {
        $assertionsDisabled = !MessageRecvPartition.class.desiredAssertionStatus();
        LOG = Log.logger((Class<?>) MessageRecvPartition.class);
    }
}
