package com.baidu.hugegraph.computer.core.sort.sorting;

import com.baidu.hugegraph.computer.core.combiner.Combiner;
import com.baidu.hugegraph.computer.core.combiner.EdgeValueCombiner;
import com.baidu.hugegraph.computer.core.combiner.MessageValueCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.combiner.VertexValueCombiner;
import com.baidu.hugegraph.computer.core.common.ComputerContext;
import com.baidu.hugegraph.computer.core.common.exception.ComputerException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.io.RandomAccessOutput;
import com.baidu.hugegraph.computer.core.manager.Manager;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.sender.WriteBuffers;
import com.baidu.hugegraph.computer.core.sort.Sorter;
import com.baidu.hugegraph.computer.core.sort.SorterImpl;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.store.hgkvfile.entry.KvEntry;
import com.baidu.hugegraph.util.ExecutorUtil;
import com.baidu.hugegraph.util.Log;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;

/* loaded from: input_file:com/baidu/hugegraph/computer/core/sort/sorting/SortManager.class */
/**
 * Base manager that runs buffer sorting and merging on a dedicated
 * fixed-size thread pool.
 *
 * <p>Subclasses supply the manager {@link #name()} and the
 * {@link #threadPrefix()} used for the pool; they may also override
 * {@link #threadNum(Config)} to size the pool differently.
 *
 * <p>NOTE(review): the constructor invokes the overridable methods
 * {@link #threadNum(Config)} and {@link #threadPrefix()}, so subclass
 * implementations must not rely on their own fields being initialized.
 */
public abstract class SortManager implements Manager {
    public static final Logger LOG = Log.logger((Class<?>) SortManager.class);

    /** Maximum time, in milliseconds, to wait for the pool to drain on close. */
    private static final long SHUTDOWN_TIMEOUT_MS = 5000L;

    private final ComputerContext context;
    private final ExecutorService sortExecutor;
    private final Sorter sorter;
    /** Initial capacity of the output buffer each sort task writes into. */
    private final int capacity;
    /** Threshold handed to the sub-kv combining flusher (used for EDGE messages). */
    private final int flushThreshold;

    /**
     * Creates the sort thread pool and the underlying {@link Sorter} from
     * the configuration held by the given context.
     *
     * @param computerContext context providing the configuration
     */
    public SortManager(ComputerContext computerContext) {
        this.context = computerContext;
        Config config = computerContext.config();
        this.sortExecutor = ExecutorUtil.newFixedThreadPool(threadNum(config).intValue(), threadPrefix());
        this.sorter = new SorterImpl(config);
        this.capacity = ((Integer) config.get(ComputerOptions.WORKER_WRITE_BUFFER_INIT_CAPACITY)).intValue();
        this.flushThreshold = ((Integer) config.get(ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX)).intValue();
    }

    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public abstract String name();

    /**
     * @return the name prefix passed to the thread-pool factory
     */
    protected abstract String threadPrefix();

    /**
     * Returns the number of threads in the sort pool. The default reads
     * {@link ComputerOptions#SORT_THREAD_NUMS}.
     *
     * @param config configuration to read the value from
     * @return the pool size
     */
    public Integer threadNum(Config config) {
        return (Integer) config.get(ComputerOptions.SORT_THREAD_NUMS);
    }

    /** No initialization is required; resources are created in the constructor. */
    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public void init(Config config) {
    }

    /**
     * Shuts the sort pool down and waits up to {@value #SHUTDOWN_TIMEOUT_MS}
     * milliseconds for running tasks to finish. If the wait is interrupted,
     * a warning is logged and the thread's interrupt status is restored.
     */
    @Override // com.baidu.hugegraph.computer.core.manager.Manager
    public void close(Config config) {
        this.sortExecutor.shutdown();
        try {
            this.sortExecutor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted when waiting sort executor terminated");
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asynchronously sorts the content of the given write buffers on the
     * sort pool.
     *
     * @param messageType  type of the buffered messages; selects the flusher
     * @param writeBuffers buffers whose readable side is sorted
     * @return a future holding a {@link ByteBuffer} that wraps the sorted
     *         bytes from offset 0 up to the output position; it completes
     *         exceptionally with a {@link ComputerException} if sorting fails
     */
    public CompletableFuture<ByteBuffer> sort(MessageType messageType, WriteBuffers writeBuffers) {
        return CompletableFuture.supplyAsync(() -> {
            RandomAccessInput input = writeBuffers.wrapForRead();
            BytesOutput output = IOFactory.createBytesOutput(this.capacity);
            try {
                InnerSortFlusher flusher = createSortFlusher(messageType, output, this.flushThreshold);
                // The flag is true only for EDGE messages, the same type that
                // gets the sub-kv flusher in createSortFlusher.
                boolean isEdge = messageType == MessageType.EDGE;
                this.sorter.sortBuffer(input, flusher, isEdge);
                return ByteBuffer.wrap(output.buffer(), 0, (int) output.position());
            } catch (Exception e) {
                throw new ComputerException("Failed to sort buffers of %s message", e, messageType.name());
            }
        }, this.sortExecutor);
    }

    /**
     * Asynchronously merges the given in-memory inputs into one file on the
     * sort pool.
     *
     * @param list             inputs to merge
     * @param str              path of the output file
     * @param z                forwarded to the sorter; when {@code true} the
     *                         flusher is first told how many sources there are
     * @param outerSortFlusher flusher that writes the merged result
     * @return a future that completes when the merge is done, or
     *         exceptionally with a {@link ComputerException} on failure
     */
    public CompletableFuture<Void> mergeBuffers(List<RandomAccessInput> list, String str, boolean z, OuterSortFlusher outerSortFlusher) {
        return CompletableFuture.runAsync(() -> {
            if (z) {
                outerSortFlusher.sources(list.size());
            }
            try {
                this.sorter.mergeBuffers(list, outerSortFlusher, str, z);
            } catch (Exception e) {
                throw new ComputerException("Failed to merge %s buffers to file '%s'", e, Integer.valueOf(list.size()), str);
            }
        }, this.sortExecutor);
    }

    /**
     * Synchronously merges input files into output files on the calling
     * thread.
     *
     * @param list             paths of the input files
     * @param list2            paths of the output files
     * @param z                forwarded to the sorter; when {@code true} the
     *                         flusher is first told how many sources there are
     * @param outerSortFlusher flusher that writes the merged result
     * @throws ComputerException if the merge fails
     */
    public void mergeInputs(List<String> list, List<String> list2, boolean z, OuterSortFlusher outerSortFlusher) {
        if (z) {
            outerSortFlusher.sources(list.size());
        }
        try {
            this.sorter.mergeInputs(list, outerSortFlusher, list2, z);
        } catch (Exception e) {
            throw new ComputerException("Failed to merge %s files into %s files", e, Integer.valueOf(list.size()), Integer.valueOf(list2.size()));
        }
    }

    /**
     * Opens an iterator over the entries stored in the given files.
     *
     * @param list paths of the files to iterate
     * @param z    forwarded unchanged to the sorter
     * @return a peekable iterator over the entries
     * @throws ComputerException wrapping the {@link IOException} if the
     *                           files cannot be opened
     */
    public PeekableIterator<KvEntry> iterator(List<String> list, boolean z) {
        try {
            return this.sorter.iterator(list, z);
        } catch (IOException e) {
            // Pass the cause along so the underlying I/O failure is not lost.
            throw new ComputerException("Failed to iterate files: '%s'", e, list);
        }
    }

    /**
     * Chooses the flusher for one sort task based on the message type:
     * no combiner gives a plain kv flusher, EDGE gives the sub-kv combining
     * flusher, everything else gives the kv combining flusher.
     *
     * @param type           message type being sorted
     * @param output         destination the flusher writes to
     * @param flushThreshold threshold passed to the sub-kv combining flusher
     * @return the flusher to use
     * @throws ComputerException if the message type is not VERTEX, EDGE or MSG
     */
    private InnerSortFlusher createSortFlusher(MessageType type, RandomAccessOutput output, int flushThreshold) {
        PointerCombiner combiner;
        boolean combineSubKv;
        switch (type) {
            case VERTEX:
                combiner = new VertexValueCombiner(this.context);
                combineSubKv = false;
                break;
            case EDGE:
                combiner = new EdgeValueCombiner(this.context);
                combineSubKv = true;
                break;
            case MSG:
                // May be null when no message combiner class is configured.
                combiner = createMessageCombiner();
                combineSubKv = false;
                break;
            default:
                throw new ComputerException("Unsupported combine message type for %s", type);
        }
        if (combiner == null) {
            return new KvInnerSortFlusher(output);
        }
        if (combineSubKv) {
            return new CombineSubKvInnerSortFlusher(output, combiner, flushThreshold);
        }
        return new CombineKvInnerSortFlusher(output, combiner);
    }

    /**
     * Builds the combiner for MSG messages.
     *
     * @return a {@link MessageValueCombiner}, or {@code null} when
     *         {@link ComputerOptions#WORKER_COMBINER_CLASS} yields no object
     */
    private PointerCombiner createMessageCombiner() {
        // The created instance is used only as a presence check.
        Combiner configured = (Combiner) this.context.config().createObject(ComputerOptions.WORKER_COMBINER_CLASS, false);
        if (configured == null) {
            return null;
        }
        return new MessageValueCombiner(this.context);
    }
}
