/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.commons.concurrent;

import cn.ponfee.commons.concurrent.ThreadPoolExecutors;
import cn.ponfee.commons.concurrent.ThreadPoolMonitor;
import cn.ponfee.commons.concurrent.Threads;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

/**
 * Collects elements into an in-memory queue and hands them to a
 * {@link BatchProcessor} in batches from a dedicated background thread.
 *
 * <p>A batch is flushed when it reaches {@code batchSize} elements, when the
 * flush period has elapsed and at least one element is pending, or when the
 * processor has been stopped and elements remain. If the backlog grows beyond
 * one and a half batches while a full batch is being flushed, a thread pool is
 * created and subsequent batches are submitted to it instead of being
 * processed on the batch thread itself.
 *
 * @param <T> the element type
 */
public final class AsyncBatchProcessor<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchProcessor.class);

    private final AsyncBatchThread<T> async;

    /**
     * Creates a processor with a period of 100 ms, a batch size of 200 and a
     * maximum pool size of 2.
     *
     * @param processor the callback that receives each batch
     */
    public AsyncBatchProcessor(BatchProcessor<T> processor) {
        this(processor, 100, 200, 2);
    }

    /**
     * Creates a processor and immediately starts its background thread.
     *
     * @param processor        the callback that receives each batch, must not be null
     * @param periodTimeMillis the flush period in milliseconds, at least 9
     * @param batchSize        the maximum number of elements per batch, must be positive
     * @param maximumPoolSize  the maximum size of the lazily created worker pool, must be positive
     * @throws IllegalArgumentException if any argument violates its constraint
     *         (assuming Spring's {@code Assert} semantics)
     */
    public AsyncBatchProcessor(BatchProcessor<T> processor, int periodTimeMillis, int batchSize, int maximumPoolSize) {
        this.async = new AsyncBatchThread<>(processor, periodTimeMillis, batchSize, maximumPoolSize);
    }

    /**
     * Enqueues a single element.
     *
     * @param element the element to enqueue; the underlying
     *                {@link LinkedBlockingQueue} rejects {@code null} with a
     *                {@link NullPointerException}
     * @return {@code true} if the element was queued, {@code false} if the
     *         processor is stopped or the queue refused the element
     */
    public boolean put(T element) {
        return !async.stopped.get() && async.queue.offer(element);
    }

    /**
     * Enqueues the given elements in order.
     *
     * @param elements the elements to enqueue
     * @return {@code false} if the processor is stopped, if {@code elements}
     *         is null or empty, or as soon as the queue refuses an element
     *         (elements offered before that one stay queued); {@code true}
     *         otherwise
     */
    public boolean put(T[] elements) {
        if (async.stopped.get() || elements == null || elements.length == 0) {
            return false;
        }
        for (T element : elements) {
            if (!async.queue.offer(element)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Enqueues the given elements in iteration order.
     *
     * @param elements the elements to enqueue
     * @return {@code false} if the processor is stopped, if {@code elements}
     *         is null or empty, or as soon as the queue refuses an element
     *         (elements offered before that one stay queued); {@code true}
     *         otherwise
     */
    public boolean put(List<T> elements) {
        if (async.stopped.get() || elements == null || elements.isEmpty()) {
            return false;
        }
        for (T element : elements) {
            if (!async.queue.offer(element)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Marks the processor as stopped so that further {@code put} calls are
     * refused. Elements already queued are still drained by the background
     * thread before it exits.
     *
     * @return {@code true} if this call performed the transition to stopped,
     *         {@code false} if the processor was already stopped
     */
    public boolean stop() {
        return async.stopped.compareAndSet(false, true);
    }

    /**
     * Stops the processor and polls, sleeping one period between checks,
     * until {@code Threads.isStopped} reports {@code true} for the background
     * thread (presumably meaning the thread has terminated - confirm against
     * {@code Threads}).
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stopAndAwait() throws InterruptedException {
        stop();
        while (!Threads.isStopped(async)) {
            Thread.sleep(async.periodTimeMillis);
        }
    }

    /**
     * Callback that receives each batch of elements.
     *
     * @param <T> the element type
     */
    @FunctionalInterface
    public interface BatchProcessor<T> {

        /**
         * Processes one batch.
         *
         * <p>When batches are processed on the batch thread itself the list is
         * cleared and reused after this method returns, so implementations
         * must not keep a reference to it. Exceptions thrown from this method
         * are logged and the batch is discarded.
         *
         * @param batch the elements of this batch
         * @param end   {@code true} if, at the moment of the call, the
         *              processor was stopped and its queue was empty
         */
        void process(List<T> batch, boolean end);
    }

    /**
     * Background thread that drains the queue and dispatches batches.
     */
    private static class AsyncBatchThread<T> extends Thread {
        private static final int MINIMUM_PERIOD_TIME_MILLIS = 9;

        private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        private final BatchProcessor<T> processor;
        private final int periodTimeMillis;
        private final int sleepTimeMillis;
        private final int batchSize;
        private final int asyncExecuteThreshold;
        private final int maximumPoolSize;

        private AsyncBatchThread(BatchProcessor<T> processor, int periodTimeMillis, int batchSize, int maximumPoolSize) {
            // Fail fast here instead of with a NullPointerException inside the background thread.
            Assert.notNull(processor, "Batch processor cannot be null.");
            Assert.isTrue(
                periodTimeMillis >= MINIMUM_PERIOD_TIME_MILLIS,
                "Period time millis must be greater than or equal to " + MINIMUM_PERIOD_TIME_MILLIS
                    + ", but actual " + periodTimeMillis
            );
            Assert.isTrue(batchSize > 0, "Batch size must be a positive number, but actual " + batchSize);
            Assert.isTrue(maximumPoolSize > 0, "Maximum pool size must be a positive number, but actual " + maximumPoolSize);

            this.processor = processor;
            this.periodTimeMillis = periodTimeMillis;
            // The idle loop wakes up twice per period.
            this.sleepTimeMillis = periodTimeMillis >>> 1;
            this.batchSize = batchSize;
            // Switch to the worker pool once the backlog exceeds 1.5 batches.
            this.asyncExecuteThreshold = batchSize + (batchSize >>> 1);
            this.maximumPoolSize = maximumPoolSize;

            setName("async-batch-processor-thread-" + Integer.toHexString(hashCode()));
            setDaemon(false);
            // Every final field is assigned above, so starting here publishes a fully initialized object.
            start();
        }

        @Override
        public void run() {
            AbstractExecutorService asyncExecutor = null;
            List<T> batch = new ArrayList<>(batchSize);
            int left = batchSize;
            long nextRefreshTimeMillis = 0L;
            try {
                while (true) {
                    if (isEnd()) {
                        if (asyncExecutor != null) {
                            try {
                                Thread.sleep(periodTimeMillis);
                            } catch (InterruptedException e) {
                                LOG.error("Thread#sleep occur error.", e);
                                Thread.currentThread().interrupt();
                            }
                        }
                        // Re-check: a put() racing with stop() may still have queued an element.
                        if (isEnd()) {
                            break;
                        }
                    }

                    if (left > 0 && !queue.isEmpty()) {
                        left -= queue.drainTo(batch, left);
                    }

                    long currentTimeMillis = System.currentTimeMillis();
                    boolean flush = left == 0
                        || (!batch.isEmpty() && (stopped.get() || currentTimeMillis >= nextRefreshTimeMillis));
                    if (flush) {
                        if (asyncExecutor == null && left == 0 && queue.size() > asyncExecuteThreshold) {
                            asyncExecutor = ThreadPoolExecutors.create(
                                1, maximumPoolSize, 300L, 2, "async-batch-processor-worker", ThreadPoolExecutors.ALWAYS_CALLER_RUNS
                            );
                            LOG.info(
                                "Asnyc batch processor created thread pool executor: {}",
                                new ThreadPoolMonitor((ThreadPoolExecutor) asyncExecutor)
                            );
                        }

                        if (asyncExecutor != null) {
                            // Hand the list over to the worker and continue with a fresh one.
                            List<T> data = batch;
                            asyncExecutor.submit(() -> processBatch(data));
                            batch = new ArrayList<>(batchSize);
                        } else {
                            processBatch(batch);
                            batch.clear();
                        }
                        left = batchSize;
                        nextRefreshTimeMillis = currentTimeMillis + periodTimeMillis;
                        continue;
                    }

                    if (stopped.get()) {
                        // Stopped but not finished yet: loop again without sleeping.
                        continue;
                    }
                    try {
                        Thread.sleep(sleepTimeMillis);
                    } catch (InterruptedException e) {
                        LOG.error("Thread#sleep occur error.", e);
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } finally {
                // Whatever the exit reason, refuse further puts: nothing drains the queue any more.
                stopped.set(true);
                // Release the worker pool on every exit path, including interruption.
                if (asyncExecutor != null) {
                    ThreadPoolExecutors.shutdown(asyncExecutor);
                }
            }
        }

        /**
         * Invokes the processor for one batch, logging instead of propagating
         * any exception so that a failing batch neither kills the batch thread
         * nor vanishes inside an ignored {@code Future}.
         */
        private void processBatch(List<T> data) {
            try {
                processor.process(data, isEnd());
            } catch (Exception e) {
                LOG.error("Async batch processor failed to process batch, size: " + data.size(), e);
            }
        }

        /** Returns whether the processor is stopped and the queue is empty. */
        private boolean isEnd() {
            return stopped.get() && queue.isEmpty();
        }
    }
}

