/*
 * Decompiled with CFR 0.152.
 */
package org.openksavi.sponge.core.engine.processing;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.openksavi.sponge.EventProcessorAdapter;
import org.openksavi.sponge.SpongeException;
import org.openksavi.sponge.core.engine.BaseEngineModule;
import org.openksavi.sponge.core.engine.processing.EventProcessorRegistrationListener;
import org.openksavi.sponge.core.util.Utils;
import org.openksavi.sponge.engine.Engine;
import org.openksavi.sponge.engine.ThreadPoolManager;
import org.openksavi.sponge.engine.event.EventQueue;
import org.openksavi.sponge.engine.processing.ProcessingUnit;
import org.openksavi.sponge.event.Event;
import org.openksavi.sponge.shaded.com.google.common.cache.CacheBuilder;
import org.openksavi.sponge.shaded.com.google.common.cache.CacheLoader;
import org.openksavi.sponge.shaded.com.google.common.cache.LoadingCache;
import org.openksavi.sponge.util.Processable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseProcessingUnit<T extends EventProcessorAdapter<?>>
extends BaseEngineModule
implements ProcessingUnit<T> {
    private static final Logger logger = LoggerFactory.getLogger(BaseProcessingUnit.class);
    public static final long GET_ITERATION_TIMEOUT = 100L;
    private EventQueue inQueue;
    private EventQueue outQueue;
    private Map<String, Set<AtomicReference<T>>> eventPatternProcessorMap = Collections.synchronizedMap(new HashMap());
    private LoadingCache<String, Set<AtomicReference<T>>> eventNameProcessorsCache;
    private Map<String, AtomicReference<T>> registeredProcessorAdapterMap = Collections.synchronizedMap(new LinkedHashMap());
    protected EventProcessorRegistrationListener<T> eventProcessorRegistrationListener;
    protected Lock lock = new ReentrantLock(true);

    public BaseProcessingUnit(String name, Engine engine, EventQueue inQueue, EventQueue outQueue) {
        super(name, engine);
        this.inQueue = inQueue;
        this.outQueue = outQueue;
        long cacheExpireTime = engine.getDefaultParameters().getProcessingUnitEventProcessorCacheExpireTime();
        if (cacheExpireTime >= 0L) {
            CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder();
            if (cacheExpireTime > 0L) {
                builder.expireAfterAccess(cacheExpireTime, TimeUnit.MILLISECONDS);
            }
            this.eventNameProcessorsCache = builder.build(new CacheLoader<String, Set<AtomicReference<T>>>(){

                @Override
                public Set<AtomicReference<T>> load(String eventName) throws Exception {
                    return BaseProcessingUnit.this.resolveEventProcessors(eventName);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Set<AtomicReference<T>> resolveEventProcessors(String eventName) {
        this.lock.lock();
        try {
            LinkedHashSet<AtomicReference<T>> result = new LinkedHashSet<AtomicReference<T>>();
            this.eventPatternProcessorMap.keySet().stream().filter(pattern -> this.getEngine().getPatternMatcher().matches(pattern, eventName)).forEachOrdered(pattern -> {
                Set<AtomicReference<T>> internalList = this.eventPatternProcessorMap.get(pattern);
                if (internalList != null) {
                    result.addAll(internalList);
                }
            });
            LinkedHashSet<AtomicReference<T>> linkedHashSet = result;
            return linkedHashSet;
        }
        finally {
            this.lock.unlock();
        }
    }

    protected Set<AtomicReference<T>> getEventProcessors(String eventName) {
        this.lock.lock();
        try {
            Set<AtomicReference<T>> set = this.eventNameProcessorsCache != null ? this.eventNameProcessorsCache.get(eventName) : this.resolveEventProcessors(eventName);
            return set;
        }
        catch (ExecutionException e) {
            throw Utils.wrapException(this.getClass().getSimpleName(), e.getCause() != null ? e.getCause() : e);
        }
        finally {
            this.lock.unlock();
        }
    }

    public Map<String, T> getRegisteredProcessorAdapterMap() {
        return this.registeredProcessorAdapterMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> (EventProcessorAdapter)((AtomicReference)entry.getValue()).get()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addProcessor(T processor) {
        this.lock.lock();
        try {
            this.invalidateCache();
            AtomicReference<T> registered = this.registeredProcessorAdapterMap.get(processor.getName());
            if (registered != null) {
                EventProcessorAdapter oldProcessor = (EventProcessorAdapter)registered.get();
                if (!oldProcessor.getType().equals((Object)processor.getType())) {
                    throw new SpongeException("An event processor of different type has been already registered with the name: " + processor.getName());
                }
                List<String> processorEventNamesList = Arrays.asList(processor.getEventNames());
                for (String oldEventName : oldProcessor.getEventNames()) {
                    if (processorEventNamesList.contains(oldEventName)) continue;
                    this.eventPatternProcessorMap.get(oldEventName).remove(registered);
                }
                registered.set(processor);
                this.processorChanged(oldProcessor, processor);
            } else {
                registered = new AtomicReference<T>(processor);
                this.registeredProcessorAdapterMap.put(processor.getName(), registered);
            }
            for (String eventName : processor.getEventNames()) {
                Set<AtomicReference<T>> processorList = this.eventPatternProcessorMap.get(eventName);
                if (processorList == null) {
                    processorList = new CopyOnWriteArraySet<AtomicReference<T>>();
                    this.eventPatternProcessorMap.put(eventName, processorList);
                }
                processorList.add(registered);
            }
            if (this.eventProcessorRegistrationListener != null) {
                this.eventProcessorRegistrationListener.onProcessorAdded(processor);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    protected void processorChanged(T oldProcessor, T newProcessor) {
    }

    public void removeAllProcessors() {
        this.lock.lock();
        try {
            this.invalidateCache();
            for (Set<AtomicReference<AtomicReference>> set : this.eventPatternProcessorMap.values()) {
                set.forEach(processor -> ((EventProcessorAdapter)processor.get()).clear());
            }
            this.eventPatternProcessorMap.clear();
            this.registeredProcessorAdapterMap.clear();
        }
        finally {
            this.lock.unlock();
        }
    }

    protected void clearUnusedEventMapping() {
        this.lock.lock();
        try {
            this.invalidateCache();
            this.eventPatternProcessorMap.entrySet().removeIf(entry -> ((Set)entry.getValue()).isEmpty());
        }
        finally {
            this.lock.unlock();
        }
    }

    public void removeProcessor(String name) {
        this.lock.lock();
        try {
            this.invalidateCache();
            this.eventPatternProcessorMap.values().forEach(eventProcessors -> this.doRemoveProcessor((Set<AtomicReference<T>>)eventProcessors, name, true));
            this.registeredProcessorAdapterMap.remove(name);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void doRemoveProcessor(Set<AtomicReference<T>> processorList, String removedProcessorName, boolean clear) {
        this.lock.lock();
        try {
            List<AtomicReference> toRemove = processorList.stream().filter(processor -> ((EventProcessorAdapter)processor.get()).getName() != null && ((EventProcessorAdapter)processor.get()).getName().equals(removedProcessorName)).collect(Collectors.toList());
            toRemove.forEach(processor -> {
                if (clear) {
                    ((EventProcessorAdapter)processor.get()).clear();
                }
            });
            processorList.removeAll(toRemove);
            if (this.eventProcessorRegistrationListener != null) {
                toRemove.forEach(processor -> this.eventProcessorRegistrationListener.onProcessorRemoved((EventProcessorAdapter)processor.get()));
            }
            this.registeredProcessorAdapterMap.remove(removedProcessorName);
        }
        finally {
            this.lock.unlock();
        }
    }

    public boolean existsProcessor(String name) {
        return this.registeredProcessorAdapterMap.containsKey(name);
    }

    protected EventQueue getInQueue() {
        return this.inQueue;
    }

    protected EventQueue getOutQueue() {
        return this.outQueue;
    }

    public EventProcessorRegistrationListener<T> getEventProcessorRegistrationListener() {
        return this.eventProcessorRegistrationListener;
    }

    public void setEventProcessorRegistrationListener(EventProcessorRegistrationListener<T> eventProcessorRegistrationListener) {
        this.eventProcessorRegistrationListener = eventProcessorRegistrationListener;
    }

    protected ThreadPoolManager getThreadPoolManager() {
        return this.getEngine().getThreadPoolManager();
    }

    public void invalidateCache() {
        if (this.eventNameProcessorsCache != null) {
            this.eventNameProcessorsCache.invalidateAll();
        }
    }

    public abstract class EventLoopWorker
    extends LoopWorker {
        private Event lastEvent;

        public EventLoopWorker(Processable processable) {
            super(processable);
        }

        protected Event getLastEvent() {
            return this.lastEvent;
        }

        protected Event getInEvent() throws InterruptedException {
            while (this.shouldContinueLoop()) {
                Event event;
                this.lastEvent = event = BaseProcessingUnit.this.getInQueue().get(100L);
                if (event == null) continue;
                return event;
            }
            return null;
        }

        public abstract boolean processEvent(Event var1) throws InterruptedException;

        @Override
        public boolean runIteration() throws InterruptedException {
            Event event = this.getInEvent();
            if (event == null) {
                return false;
            }
            if (this.processEvent(event)) {
                BaseProcessingUnit.this.getOutQueue().put(event);
            }
            return true;
        }
    }

    public abstract class LoopWorker
    implements Runnable {
        private Processable processable;

        protected LoopWorker(Processable processable) {
            this.processable = processable;
        }

        public Processable getProcessable() {
            return this.processable;
        }

        public abstract boolean runIteration() throws InterruptedException;

        public abstract boolean shouldContinueLoop();

        @Override
        public final void run() {
            while (this.shouldContinueLoop()) {
                try {
                    if (this.runIteration()) continue;
                    return;
                }
                catch (InterruptedException e) {
                    if (!this.handleInterruptedException(e)) continue;
                    break;
                }
                catch (Exception e) {
                    Throwable cause = ExceptionUtils.getRootCause((Throwable)e);
                    if (cause instanceof InterruptedException) {
                        if (!this.handleInterruptedException((InterruptedException)cause)) continue;
                        break;
                    }
                    BaseProcessingUnit.this.getEngine().handleError(this.getMessageSource(), (Throwable)e);
                }
            }
        }

        private boolean handleInterruptedException(InterruptedException e) {
            if (BaseProcessingUnit.this.isStopping() || BaseProcessingUnit.this.isTerminated()) {
                return true;
            }
            logger.warn(this.createMessage("Interrupted"));
            return false;
        }

        private final String createMessage(String msg) {
            return this.getMessageSource() + ": " + msg;
        }

        private final String getMessageSource() {
            return this.processable.toString();
        }
    }
}

