/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.engine.backend.orchestration.spi;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.search.engine.environment.thread.spi.ThreadPoolProvider;
import org.hibernate.search.engine.logging.impl.Log;
import org.hibernate.search.engine.reporting.FailureContext;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.util.common.AssertionFailure;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.impl.Futures;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

public final class BatchingExecutor<W extends WorkSet<? super P>, P extends WorkProcessor> {
    private final Log log = (Log)LoggerFactory.make(Log.class, (MethodHandles.Lookup)MethodHandles.lookup());
    private final String name;
    private final P processor;
    private final FailureHandler failureHandler;
    private final int maxTasksPerBatch;
    private final BlockingQueue<W> workQueue;
    private final List<W> workBuffer;
    private final AtomicReference<ProcessingStatus> processingStatus;
    private ExecutorService executorService;
    private ScheduledExecutorService scheduledExecutorService;
    private volatile CompletableFuture<?> completionFuture;
    private volatile ScheduledFuture<?> scheduledNextProcessing;

    public BatchingExecutor(String name, P processor, int maxTasksPerBatch, boolean fair, FailureHandler failureHandler) {
        this.name = name;
        this.processor = processor;
        this.failureHandler = failureHandler;
        this.maxTasksPerBatch = maxTasksPerBatch;
        this.workQueue = new ArrayBlockingQueue<W>(maxTasksPerBatch, fair);
        this.workBuffer = new ArrayList<W>(maxTasksPerBatch);
        this.processingStatus = new AtomicReference<ProcessingStatus>(ProcessingStatus.IDLE);
    }

    public synchronized void start(ThreadPoolProvider threadPoolProvider) {
        this.log.startingExecutor(this.name);
        this.executorService = threadPoolProvider.newFixedThreadPool(1, this.name);
        this.scheduledExecutorService = threadPoolProvider.getSharedScheduledThreadPool();
    }

    public synchronized void stop() {
        this.log.stoppingExecutor(this.name);
        try (Closer closer = new Closer();){
            closer.push(ExecutorService::shutdownNow, (Object)this.executorService);
            this.executorService = null;
            this.workQueue.clear();
            if (this.completionFuture != null) {
                this.completionFuture.cancel(false);
                this.completionFuture = null;
            }
        }
    }

    public void submit(W workset) throws InterruptedException {
        if (this.executorService == null) {
            throw new AssertionFailure("Attempt to submit a workset to executor '" + this.name + "', which is stopped There is probably a bug in Hibernate Search, please report it.");
        }
        this.workQueue.put(workset);
        this.ensureProcessingRunning();
    }

    public CompletableFuture<?> getCompletion() {
        CompletableFuture<?> future = this.completionFuture;
        if (future == null) {
            return CompletableFuture.completedFuture(null);
        }
        return future;
    }

    private void ensureProcessingRunning() {
        if (!this.processingStatus.compareAndSet(ProcessingStatus.IDLE, ProcessingStatus.RUNNING)) {
            return;
        }
        try {
            if (this.scheduledNextProcessing != null) {
                this.scheduledNextProcessing.cancel(false);
                this.scheduledNextProcessing = null;
            }
            if (this.completionFuture == null) {
                this.completionFuture = new CompletableFuture();
            }
            this.executorService.submit(this::process);
        }
        catch (Throwable e) {
            try {
                CompletableFuture<?> future = this.completionFuture;
                this.completionFuture = null;
                this.processingStatus.set(ProcessingStatus.IDLE);
                future.completeExceptionally(e);
            }
            catch (Throwable e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    private void process() {
        try {
            this.workBuffer.clear();
            this.workQueue.drainTo(this.workBuffer, this.maxTasksPerBatch);
            if (!this.workBuffer.isEmpty()) {
                this.processBatch(this.workBuffer);
            }
        }
        catch (Throwable e) {
            FailureContext.Builder contextBuilder = FailureContext.builder();
            contextBuilder.throwable(e);
            contextBuilder.failingOperation("Work processing in executor '" + this.name + "'");
            this.failureHandler.handle(contextBuilder.build());
        }
        finally {
            try {
                if (this.workQueue.isEmpty()) {
                    this.handleCompletion();
                }
                this.processingStatus.set(ProcessingStatus.IDLE);
                if (!this.workQueue.isEmpty()) {
                    this.ensureProcessingRunning();
                }
            }
            catch (Throwable e) {
                FailureContext.Builder contextBuilder = FailureContext.builder();
                contextBuilder.throwable(e);
                contextBuilder.failingOperation("Handling post-execution in executor '" + this.name + "'");
                this.failureHandler.handle(contextBuilder.build());
            }
        }
    }

    private void processBatch(List<W> works) {
        this.processor.beginBatch();
        for (WorkSet workset : works) {
            try {
                workset.submitTo(this.processor);
            }
            catch (Throwable e) {
                workset.markAsFailed(e);
            }
        }
        CompletableFuture<?> batchFuture = this.processor.endBatch();
        Futures.unwrappedExceptionJoin(batchFuture);
    }

    private void handleCompletion() {
        long delay = 0L;
        try {
            delay = this.processor.completeOrDelay();
        }
        catch (Throwable e) {
            FailureContext.Builder contextBuilder = FailureContext.builder();
            contextBuilder.throwable(e);
            contextBuilder.failingOperation("Calling processor.complete() in executor '" + this.name + "'");
            this.failureHandler.handle(contextBuilder.build());
        }
        if (delay <= 0L) {
            CompletableFuture<?> justFinishedQueueFuture = this.completionFuture;
            this.completionFuture = null;
            justFinishedQueueFuture.complete(null);
        } else {
            this.scheduledNextProcessing = this.scheduledExecutorService.schedule(this::ensureProcessingRunning, delay, TimeUnit.MILLISECONDS);
        }
    }

    public static enum ProcessingStatus {
        IDLE,
        RUNNING;

    }

    public static interface WorkSet<P extends WorkProcessor> {
        public void submitTo(P var1);

        public void markAsFailed(Throwable var1);
    }

    public static interface WorkProcessor {
        public void beginBatch();

        public CompletableFuture<?> endBatch();

        public long completeOrDelay();
    }
}

