/*
 * Decompiled with CFR 0.152.
 */
package org.openksavi.sponge.core.engine.processing.decomposed;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.openksavi.sponge.EventProcessorAdapter;
import org.openksavi.sponge.ProcessorAdapter;
import org.openksavi.sponge.core.engine.processing.BaseMainProcessingUnit;
import org.openksavi.sponge.core.engine.processing.BaseProcessingUnit;
import org.openksavi.sponge.core.engine.processing.decomposed.DecomposedQueue;
import org.openksavi.sponge.core.event.ProcessorControlEvent;
import org.openksavi.sponge.engine.Engine;
import org.openksavi.sponge.engine.ProcessableThreadPool;
import org.openksavi.sponge.engine.ProcessorType;
import org.openksavi.sponge.engine.ThreadPool;
import org.openksavi.sponge.engine.event.EventQueue;
import org.openksavi.sponge.engine.processing.MainProcessingUnit;
import org.openksavi.sponge.event.ControlEvent;
import org.openksavi.sponge.event.Event;
import org.openksavi.sponge.util.Processable;

/**
 * Main processing unit that separates event intake from event handling by means of a
 * {@link DecomposedQueue}.
 *
 * <p>Writer workers (see {@link #createWorker()}) turn every incoming event into one
 * {@code (processor adapter, event)} pair per matching processor and put those pairs into the
 * decomposed queue. Reader workers take the pairs back out and run the matching handler on the
 * worker thread pool executor, releasing the pair in the queue once the handler has finished.
 */
public class DecomposedQueueMainProcessingUnit extends BaseMainProcessingUnit {

    /** Timeout, in milliseconds, of a single poll of the decomposed queue by a reader worker. */
    private static final long GET_ITERATION_TIMEOUT_MILLIS = 100L;

    protected ProcessableThreadPool listenerThreadPool;
    protected ProcessableThreadPool decomposedQueueThreadPool;
    protected ThreadPool workerThreadPool;
    protected ThreadPool asyncEventSetProcessorThreadPool;
    private DecomposedQueue<EventProcessorAdapter<?>> decomposedQueue;

    /**
     * Creates the processing unit together with its decomposed queue. The queue capacity and the
     * "allow concurrent event type processing by event set processors" flag are read from the
     * engine default parameters.
     *
     * @param name the processing unit name.
     * @param engine the engine.
     * @param inQueue the input event queue.
     * @param outQueue the output event queue.
     */
    public DecomposedQueueMainProcessingUnit(String name, Engine engine, EventQueue inQueue, EventQueue outQueue) {
        super(name, engine, inQueue, outQueue);
        this.setDecomposedQueue(new DecomposedQueue<>(engine.getDefaultParameters().getDecomposedQueueCapacity(),
                engine.getDefaultParameters().getAllowConcurrentEventTypeProcessingByEventSetProcessors()));
    }

    /**
     * Replaces the decomposed queue and installs it as the event processor registration listener
     * of this unit.
     *
     * @param decomposedQueue the new decomposed queue.
     */
    public void setDecomposedQueue(DecomposedQueue<EventProcessorAdapter<?>> decomposedQueue) {
        this.decomposedQueue = decomposedQueue;
        this.setEventProcessorRegistrationListener(decomposedQueue);
    }

    /**
     * Starts the handlers, creates the four thread pools and then starts the two processable
     * pools: the decomposed queue readers first, the listeners second.
     */
    @Override
    public void doStartup() {
        this.startupHandlers();
        this.asyncEventSetProcessorThreadPool = this.getThreadPoolManager().createMainProcessingUnitAsyncEventSetProcessorThreadPool();
        this.workerThreadPool = this.getThreadPoolManager().createMainProcessingUnitWorkerThreadPool();
        this.decomposedQueueThreadPool =
                this.getThreadPoolManager().createMainProcessingUnitDecomposedQueueThreadPool(new DecomposedQueueReaderProcessable());
        this.listenerThreadPool = this.getThreadPoolManager().createMainProcessingUnitListenerThreadPool((MainProcessingUnit)this);
        this.getThreadPoolManager().startupProcessableThreadPool(this.decomposedQueueThreadPool);
        this.getThreadPoolManager().startupProcessableThreadPool(this.listenerThreadPool);
    }

    /**
     * Shuts the thread pools down in the order listener, decomposed queue reader, worker and
     * asynchronous event set processor, and finally shuts down the handlers.
     */
    @Override
    public void doShutdown() {
        this.getThreadPoolManager().shutdownThreadPool((ThreadPool)this.listenerThreadPool);
        this.getThreadPoolManager().shutdownThreadPool((ThreadPool)this.decomposedQueueThreadPool);
        this.getThreadPoolManager().shutdownThreadPool(this.workerThreadPool);
        this.getThreadPoolManager().shutdownThreadPool(this.asyncEventSetProcessorThreadPool);
        this.shutdownHandlers();
    }

    /**
     * Creates a worker that passes events to {@link #processEvent(Event)}, i.e. a writer of the
     * decomposed queue.
     *
     * @return a new writer worker.
     */
    public Runnable createWorker() {
        return new DecomposedQueueWriterLoopWorker((Processable)this);
    }

    /**
     * Tells whether processor control events are forwarded to the given processor.
     *
     * @param processorAdapter the processor adapter.
     * @return {@code true} if the adapter type is a trigger, rule, correlator, rule group or
     *         correlator group.
     */
    protected boolean supportsControlEventForProcessor(ProcessorAdapter<?> processorAdapter) {
        ProcessorType type = processorAdapter.getType();
        return type == ProcessorType.TRIGGER || type == ProcessorType.RULE || type == ProcessorType.CORRELATOR
                || type == ProcessorType.RULE_GROUP || type == ProcessorType.CORRELATOR_GROUP;
    }

    /**
     * Puts the event into the decomposed queue, once per target processor.
     *
     * <p>A control event is enqueued only when it is a {@link ProcessorControlEvent} whose
     * processor adapter is an {@link EventProcessorAdapter} accepted by
     * {@link #supportsControlEventForProcessor(ProcessorAdapter)}; any other control event is
     * dropped here. An ordinary event is enqueued for every processor returned by
     * {@code getEventProcessors} for the event name and is counted in the engine statistics.
     *
     * @param event the event.
     * @return {@code false} for every control event; for an ordinary event {@code true} if no
     *         processor was found for its name (so nothing has been enqueued), {@code false}
     *         otherwise. NOTE(review): the caller presumably treats {@code true} as "not handled
     *         by any processor" - confirm against the event loop worker in the base class.
     * @throws InterruptedException if interrupted while waiting for room in the decomposed queue.
     */
    public boolean processEvent(Event event) throws InterruptedException {
        if (event instanceof ControlEvent) {
            if (event instanceof ProcessorControlEvent) {
                ProcessorAdapter<?> processorAdapter = ((ProcessorControlEvent)event).getProcessorAdapter();
                if (processorAdapter instanceof EventProcessorAdapter && this.supportsControlEventForProcessor(processorAdapter)) {
                    EventProcessorAdapter<?> target = (EventProcessorAdapter<?>)processorAdapter;
                    this.putIntoDecomposedQueue(new ImmutablePair<>(target, event));
                }
            }
            return false;
        }

        this.getEngine().getStatisticsManager().startTimeMeasurementIfNotStartedYet();
        Set<AtomicReference<EventProcessorAdapter<?>>> adapterRs = this.getEventProcessors(event.getName());
        for (AtomicReference<EventProcessorAdapter<?>> adapterR : adapterRs) {
            this.putIntoDecomposedQueue(new ImmutablePair<>(adapterR.get(), event));
        }
        this.getEngine().getStatisticsManager().incrementTimeMeasurementEventCount();
        return adapterRs.isEmpty();
    }

    /**
     * Puts the entry into the decomposed queue, retrying until the queue accepts it. Between two
     * attempts the calling thread sleeps for the "internal queue blocking put sleep" interval
     * taken from the engine default parameters.
     *
     * @param entry the {@code (processor adapter, event)} pair.
     * @throws InterruptedException if interrupted while sleeping between attempts.
     */
    protected void putIntoDecomposedQueue(Pair<EventProcessorAdapter<?>, Event> entry) throws InterruptedException {
        while (!this.decomposedQueue.put(entry)) {
            TimeUnit.MILLISECONDS.sleep(this.getEngine().getDefaultParameters().getInternalQueueBlockingPutSleep());
        }
    }

    /**
     * @return always {@code false}.
     */
    public boolean supportsConcurrentListenerThreadPool() {
        return false;
    }

    /**
     * @return the decomposed queue currently used by this unit.
     */
    public DecomposedQueue<EventProcessorAdapter<?>> getDecomposedQueue() {
        return this.decomposedQueue;
    }

    /**
     * @return the thread pool whose executor runs the event handlers; {@code null} until
     *         {@link #doStartup()} has created it.
     */
    public ThreadPool getWorkerThreadPool() {
        return this.workerThreadPool;
    }

    /**
     * @return the asynchronous event set processor thread pool; {@code null} until
     *         {@link #doStartup()} has created it.
     */
    public ThreadPool getAsyncEventSetProcessorThreadPool() {
        return this.asyncEventSetProcessorThreadPool;
    }

    /**
     * @return the thread pool running the decomposed queue readers; {@code null} until
     *         {@link #doStartup()} has created it.
     */
    public ProcessableThreadPool getDecomposedQueueThreadPool() {
        return this.decomposedQueueThreadPool;
    }

    /**
     * Loop worker that takes entries from the decomposed queue and hands each of them over to the
     * worker thread pool executor.
     */
    protected class DecomposedQueueReaderWorker
    extends BaseProcessingUnit.LoopWorker {
        /** Result of the most recent poll of the decomposed queue; {@code null} if it yielded nothing. */
        private Pair<EventProcessorAdapter<?>, Event> lastEntry;

        public DecomposedQueueReaderWorker(DecomposedQueueReaderProcessable processable) {
            super(processable);
        }

        /**
         * The loop goes on while the unit is new, starting or running. While the unit is stopping
         * it goes on only as long as the previous poll still returned an entry. An interrupted
         * thread always ends the loop.
         */
        @Override
        public boolean shouldContinueLoop() {
            boolean active = DecomposedQueueMainProcessingUnit.this.isNewOrStartingOrRunning();
            boolean draining = DecomposedQueueMainProcessingUnit.this.isStopping() && this.lastEntry != null;
            return (active || draining) && !Thread.currentThread().isInterrupted();
        }

        /**
         * Polls the decomposed queue until an entry shows up or the loop should end, then submits
         * the entry for asynchronous processing.
         *
         * @return {@code true} if an entry has been submitted or an error has been reported to the
         *         engine; {@code false} if the loop ended without any entry.
         * @throws InterruptedException if interrupted, or if an {@link InterruptedException} is
         *         the root cause of another failure.
         */
        @Override
        public boolean runIteration() throws InterruptedException {
            try {
                while (this.shouldContinueLoop()) {
                    Pair<EventProcessorAdapter<?>, Event> entry =
                            DecomposedQueueMainProcessingUnit.this.decomposedQueue.get(GET_ITERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                    this.lastEntry = entry;
                    if (entry == null) {
                        // Nothing was returned by this poll; re-check the loop condition and poll again.
                        continue;
                    }
                    this.dispatchToWorker(entry);
                    return true;
                }
                return false;
            }
            catch (InterruptedException e) {
                // Must not be reported as an error by the generic handler below.
                throw e;
            }
            catch (Throwable e) {
                // instanceof is false for null, so a missing root cause needs no separate check.
                Throwable rootCause = ExceptionUtils.getRootCause(e);
                if (rootCause instanceof InterruptedException) {
                    throw (InterruptedException)rootCause;
                }
                DecomposedQueueMainProcessingUnit.this.getEngine().handleError("runIteration", e);
                return true;
            }
        }

        /**
         * Submits the handling of the entry to the worker thread pool executor. While the executor
         * rejects the task, the calling thread sleeps for the "internal queue blocking put sleep"
         * interval and tries again.
         *
         * @param entry the {@code (processor adapter, event)} pair taken from the decomposed queue.
         * @throws InterruptedException if interrupted while sleeping between attempts.
         */
        private void dispatchToWorker(Pair<EventProcessorAdapter<?>, Event> entry) throws InterruptedException {
            EventProcessorAdapter<?> adapter = entry.getLeft();
            Event event = entry.getRight();
            while (true) {
                try {
                    CompletableFuture.runAsync(() -> {
                        try {
                            DecomposedQueueMainProcessingUnit.this.getHandler(adapter.getType()).processEvent(adapter, event);
                        }
                        catch (Throwable e) {
                            DecomposedQueueMainProcessingUnit.this.getEngine().handleError("WorkerThread", e);
                        }
                    }, DecomposedQueueMainProcessingUnit.this.workerThreadPool.getExecutor()).handle((result, exception) -> {
                        // Release the entry whether the task completed normally or exceptionally.
                        DecomposedQueueMainProcessingUnit.this.decomposedQueue.release(entry);
                        return null;
                    });
                    return;
                }
                catch (RejectedExecutionException e) {
                    TimeUnit.MILLISECONDS.sleep(DecomposedQueueMainProcessingUnit.this.getEngine().getDefaultParameters().getInternalQueueBlockingPutSleep());
                }
            }
        }
    }

    /**
     * Processable given to the decomposed queue thread pool; every worker it creates is a
     * {@link DecomposedQueueReaderWorker}.
     */
    public class DecomposedQueueReaderProcessable
    implements Processable {
        public Runnable createWorker() {
            return new DecomposedQueueReaderWorker(this);
        }

        @Override
        public String toString() {
            return DecomposedQueueMainProcessingUnit.this.getName() + ".DecomposedQueueReader";
        }
    }

    /**
     * Event loop worker that writes into the decomposed queue by delegating every event to
     * {@link DecomposedQueueMainProcessingUnit#processEvent(Event)}.
     */
    protected class DecomposedQueueWriterLoopWorker
    extends BaseProcessingUnit.EventLoopWorker {
        public DecomposedQueueWriterLoopWorker(Processable processable) {
            super(processable);
        }

        /**
         * The loop goes on while the unit is new, starting or running. While the unit is stopping
         * it goes on only as long as {@code getLastEvent()} is not {@code null}. An interrupted
         * thread always ends the loop.
         */
        @Override
        public boolean shouldContinueLoop() {
            boolean active = DecomposedQueueMainProcessingUnit.this.isNewOrStartingOrRunning();
            boolean draining = DecomposedQueueMainProcessingUnit.this.isStopping() && this.getLastEvent() != null;
            return (active || draining) && !Thread.currentThread().isInterrupted();
        }

        @Override
        public boolean processEvent(Event event) throws InterruptedException {
            return DecomposedQueueMainProcessingUnit.this.processEvent(event);
        }
    }
}

