/*
 * Decompiled with CFR 0.152.
 */
package org.xbib.catalog.entities;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xbib.catalog.entities.Worker;
import org.xbib.catalog.entities.WorkerPool;
import org.xbib.catalog.entities.WorkerPoolListener;

/**
 * Skeleton of a {@link WorkerPool} that runs a fixed number of {@link Worker}s on a private
 * thread pool and feeds them requests through a fair {@link SynchronousQueue}.
 *
 * <p>Life cycle: the pool starts in the closed state, {@link #open()} starts the workers,
 * {@link #submit(Object)} hands single requests to whichever worker is waiting, and
 * {@link #close()} sends the poison element to every worker, shuts the executor down and
 * notifies the optional listener.
 *
 * <p>Subclasses supply the workers and the poison element through {@code newWorker()} and
 * {@code getPoison()}; both are called here but declared outside this file (presumably by
 * {@link WorkerPool}).
 *
 * @param <R> the request type
 */
public abstract class AbstractWorkerPool<R> implements WorkerPool<R>, AutoCloseable {
    /** Seconds {@link #close()} waits for the executor to terminate when no value is given. */
    private static final int DEFAULT_WAIT_SECONDS = 30;

    /**
     * Upper bound, in milliseconds, for a single hand-off attempt on the queue. After each
     * attempt the number of live workers is re-checked, so a caller can never stay blocked on
     * a queue that no worker reads any more.
     */
    private static final long HANDOFF_TIMEOUT_MILLIS = 50L;

    /** Hand-off point between submitters and workers; holds no elements itself. */
    private final BlockingQueue<R> queue;
    private final ThreadPoolWorkerExecutor executor;
    private final int workerCount;
    private final int waitSeconds;
    /** {@code true} until {@link #open()} is called and again once {@link #close()} ran. */
    private final AtomicBoolean isClosed;
    /** Counted down once by every worker when it leaves its run loop. */
    private final CountDownLatch latch;
    /** Failures collected from workers, keyed by the runnable that failed. */
    private final Map<Runnable, Throwable> exceptions;
    /** Optional callback informed by {@link #close()}; may be {@code null}. */
    private final WorkerPoolListener<WorkerPool<R>> listener;
    private final AtomicLong counter;

    /**
     * Creates a pool with one worker per available processor, no listener and the default
     * termination wait.
     */
    public AbstractWorkerPool() {
        this(Runtime.getRuntime().availableProcessors());
    }

    /**
     * Creates a pool without listener and with the default termination wait.
     *
     * @param workerCount number of workers (and threads) to run
     */
    public AbstractWorkerPool(int workerCount) {
        this(workerCount, null);
    }

    /**
     * Creates a pool with the default termination wait.
     *
     * @param workerCount number of workers (and threads) to run
     * @param listener callback informed on close, or {@code null}
     */
    public AbstractWorkerPool(int workerCount, WorkerPoolListener<WorkerPool<R>> listener) {
        this(workerCount, listener, DEFAULT_WAIT_SECONDS);
    }

    /**
     * Creates a pool. No worker is started until {@link #open()} is called.
     *
     * @param workerCount number of workers (and threads) to run
     * @param listener callback informed on close, or {@code null}
     * @param waitSeconds seconds {@link #close()} waits for the executor to terminate
     */
    public AbstractWorkerPool(int workerCount, WorkerPoolListener<WorkerPool<R>> listener, int waitSeconds) {
        this.workerCount = workerCount;
        this.waitSeconds = waitSeconds;
        this.listener = listener;
        this.queue = new SynchronousQueue<>(true);
        this.executor = new ThreadPoolWorkerExecutor(workerCount);
        this.isClosed = new AtomicBoolean(true);
        this.latch = new CountDownLatch(workerCount);
        this.exceptions = new ConcurrentHashMap<>();
        this.counter = new AtomicLong();
    }

    /**
     * Starts the workers. Only the call that flips the pool from closed to open creates and
     * submits workers; calls on an already open pool do nothing.
     *
     * <p>NOTE(review): {@link #close()} shuts the executor down and the latch is never reset,
     * so re-opening a pool after it has been closed does not look supported.
     *
     * @return this pool
     */
    @Override
    public WorkerPool<R> open() {
        if (this.isClosed.compareAndSet(true, false)) {
            for (int i = 0; i < this.workerCount; ++i) {
                Worker<R> worker = this.newWorker();
                this.executor.submit(new Wrapper(worker));
            }
        }
        return this;
    }

    /**
     * Returns the counter object owned by this pool. The pool itself never modifies it.
     *
     * @return the shared counter
     */
    @Override
    public AtomicLong getCounter() {
        return this.counter;
    }

    /**
     * Returns the queue the workers take their requests from.
     *
     * @return the hand-off queue
     */
    @Override
    public BlockingQueue<R> getQueue() {
        return this.queue;
    }

    /**
     * Hands a request to a worker, waiting until one is ready to take it.
     *
     * <p>The request is silently ignored when it equals the poison element or when the pool is
     * closed (which includes a pool that was never opened).
     *
     * @param request the request to process, must not be {@code null}
     * @throws UncheckedIOException if no worker is alive any more, or if the calling thread is
     *         interrupted while waiting (the interrupt flag is restored)
     */
    @Override
    public void submit(R request) {
        if (request.equals(this.getPoison())) {
            return;
        }
        if (this.isClosed.get()) {
            return;
        }
        try {
            // Bounded hand-off attempts instead of a single blocking put(): if the last worker
            // dies while we wait, the latch check below reports it instead of hanging forever.
            do {
                if (this.latch.getCount() == 0L) {
                    throw new UncheckedIOException(new IOException("no worker available"));
                }
            } while (!this.queue.offer(request, HANDOFF_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UncheckedIOException(new IOException(e));
        }
    }

    /**
     * Closes the pool: offers the poison element until every worker has left its run loop,
     * shuts the executor down, waits up to the configured number of seconds for termination
     * and finally informs the listener (success if no exception was recorded, failure
     * otherwise). Only the call that flips the pool from open to closed does any work.
     *
     * @throws UncheckedIOException if the calling thread was interrupted; the workers are still
     *         stopped first and the interrupt flag is restored
     */
    @Override
    public void close() {
        if (!this.isClosed.compareAndSet(false, true)) {
            return;
        }
        // An interrupt must not abort the poisoning (workers would be left running), but it
        // must not stay set during the loop either, or every blocking call would fail at once
        // and the loop would spin. Remember it and restore the flag afterwards.
        boolean interrupted = false;
        try {
            while (this.latch.getCount() > 0L) {
                try {
                    // Timed offer: re-check the latch regularly so a worker that exits without
                    // taking a poison element cannot leave this thread blocked.
                    this.queue.offer(this.getPoison(), HANDOFF_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        try {
            this.executor.shutdown();
            this.executor.awaitTermination(this.waitSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new UncheckedIOException(new IOException(e));
        } finally {
            if (this.listener != null) {
                if (this.exceptions.isEmpty()) {
                    this.listener.success(this);
                } else {
                    this.listener.failure(this, this.exceptions);
                }
            }
        }
    }

    /**
     * Returns the latch that tracks how many workers have not yet left their run loop.
     *
     * @return the worker latch
     */
    public CountDownLatch getLatch() {
        return this.latch;
    }

    /**
     * Returns the live map of failures recorded so far, keyed by the runnable that failed.
     *
     * @return the recorded exceptions
     */
    public Map<Runnable, Throwable> getExceptions() {
        return this.exceptions;
    }

    /**
     * Run loop around a single {@link Worker}: takes requests from the queue and executes them
     * until the poison element arrives or an error occurs.
     */
    private class Wrapper implements Runnable {
        private final Logger logger = Logger.getLogger(Worker.class.getName());
        private final Worker<R> worker;
        /** Number of requests this worker executed successfully. */
        private int counter;

        private Wrapper(Worker<R> worker) {
            this.worker = worker;
        }

        /**
         * Processes requests until poisoned. Any failure is logged and recorded in the pool's
         * exception map; non-interrupt failures are additionally rethrown wrapped in an
         * {@link UncheckedIOException}. In every case the pool latch is counted down.
         */
        @Override
        public void run() {
            R request = null;
            try {
                this.logger.log(Level.INFO, () -> MessageFormat.format("start of worker {0}", this.worker));
                while (true) {
                    request = AbstractWorkerPool.this.getQueue().take();
                    if (AbstractWorkerPool.this.getPoison().equals(request)) {
                        break;
                    }
                    this.worker.execute(request);
                    ++this.counter;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.logger.log(Level.WARNING, e.getMessage(), e);
                AbstractWorkerPool.this.exceptions.put(this, e);
            } catch (AssertionError | Exception e) {
                this.logger.log(Level.SEVERE, e.getMessage(), e);
                AbstractWorkerPool.this.exceptions.put(this, e);
                if (AbstractWorkerPool.this.isClosed.get()) {
                    // The pool is closing: absorb one pending element (normally the poison
                    // meant for this worker) so that the closing thread can proceed.
                    try {
                        AbstractWorkerPool.this.getQueue().poll(1L, TimeUnit.MINUTES);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        this.logger.log(Level.WARNING, e2.getMessage(), e2);
                    }
                }
                throw new UncheckedIOException(new IOException(e));
            } finally {
                AbstractWorkerPool.this.latch.countDown();
                if (AbstractWorkerPool.this.exceptions.containsKey(this) && AbstractWorkerPool.this.latch.getCount() == 0L) {
                    // No worker is left to take anything: release producers that are
                    // currently blocked on the hand-off queue; the drained elements are dropped.
                    this.logger.log(Level.INFO, "last worker exited with error, draining queue");
                    ArrayList<R> drained = new ArrayList<>();
                    AbstractWorkerPool.this.getQueue().drainTo(drained);
                }
                if (AbstractWorkerPool.this.getPoison().equals(request)) {
                    this.logger.log(Level.INFO, () -> MessageFormat.format("end of worker {0} {1}", this.worker, "(completed, " + this.counter + " requests)"));
                } else {
                    this.logger.log(Level.SEVERE, () -> MessageFormat.format("end of worker {0} {1}", this.worker, "(abnormal termination after " + this.counter + " requests)"));
                }
            }
        }
    }

    /**
     * Fixed-size thread pool that records the failure of any task it ran in the enclosing
     * pool's exception map.
     */
    private class ThreadPoolWorkerExecutor extends ThreadPoolExecutor {
        private final Logger logger;

        /**
         * @param nThreads core and maximum number of threads
         */
        ThreadPoolWorkerExecutor(int nThreads) {
            super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            this.logger = Logger.getLogger(ThreadPoolWorkerExecutor.class.getName());
        }

        /**
         * Determines why a task ended. Tasks handed in through {@code submit} are futures that
         * capture their own exception, so when no termination cause is passed in, a completed
         * future is queried for it. Any failure found is logged and stored under the runnable.
         *
         * @param runnable the task that has completed
         * @param terminationCause the exception that ended the task, or {@code null}
         */
        @Override
        protected void afterExecute(Runnable runnable, Throwable terminationCause) {
            super.afterExecute(runnable, terminationCause);
            Throwable throwable = terminationCause;
            if (throwable == null && runnable instanceof Future) {
                try {
                    Future<?> future = (Future<?>) runnable;
                    if (future.isDone()) {
                        future.get();
                    }
                } catch (CancellationException ce) {
                    this.logger.log(Level.FINEST, ce.getMessage(), ce);
                    throwable = ce;
                } catch (ExecutionException ee) {
                    this.logger.log(Level.FINEST, ee.getMessage(), ee);
                    throwable = ee.getCause();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    this.logger.log(Level.FINEST, ie.getMessage(), ie);
                }
            }
            if (throwable != null) {
                this.logger.log(Level.SEVERE, throwable.getMessage(), throwable);
                AbstractWorkerPool.this.exceptions.put(runnable, throwable);
            }
        }
    }
}

