/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.routing.correlation;

import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.store.ObjectAlreadyExistsException;
import org.mule.runtime.api.store.ObjectDoesNotExistException;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.routing.RoutingException;
import org.mule.runtime.core.api.store.PartitionableObjectStore;
import org.mule.runtime.core.api.util.StringMessageUtils;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.context.notification.RoutingNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.execution.ErrorHandlingExecutionTemplate;
import org.mule.runtime.core.routing.EventGroup;
import org.mule.runtime.core.routing.correlation.CorrelationTimeoutException;
import org.mule.runtime.core.routing.correlation.EventCorrelatorCallback;
import org.mule.runtime.core.util.monitor.Expirable;
import org.mule.runtime.core.util.monitor.ExpiryMonitor;
import org.mule.runtime.core.util.store.DeserializationPostInitialisable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects incoming {@link Event}s into {@link EventGroup}s keyed by correlation id, asks an
 * {@link EventCorrelatorCallback} whether a group is ready to be aggregated, and expires groups
 * that have been waiting longer than the configured timeout.
 *
 * <p>Groups are kept in a {@link PartitionableObjectStore} under two partitions derived from the
 * store prefix (see {@link #getEventGroupsPartitionKey()} and
 * {@link #getExpiredAndDispatchedPartitionKey()}); ids of groups that were already aggregated or
 * expired are remembered in {@link #processedGroups}.
 *
 * <p>Accesses to {@link #processedGroups}, group removal and the add/aggregate step of
 * {@link #process(Event)} are serialized on {@link #groupsLock}.
 */
public class EventCorrelator
implements Startable,
Stoppable {
    protected final transient Logger logger = LoggerFactory.getLogger(EventCorrelator.class);
    public static final String NO_CORRELATION_ID = "no-id";
    /** Delay, in milliseconds, between two runs of the expired-group monitor. */
    private static final long DELAY_TIME = 10L;
    /** One day in milliseconds; the retention window used for expired/dispatched bookkeeping. */
    private static final long ONE_DAY_IN_MILLIS = TimeUnit.DAYS.toMillis(1L);
    protected final Object groupsLock = new Object();
    protected ObjectStore<Long> processedGroups = null;
    // NOTE(review): start() only skips monitoring when timeout == 0, so this -1 default (if never
    // overridden through setTimeout) makes every group look expired immediately — confirm intent.
    private long timeout = -1L;
    private boolean failOnTimeout = true;
    private final MuleContext muleContext;
    private final EventCorrelatorCallback callback;
    private final Processor timeoutMessageProcessor;
    private final PartitionableObjectStore correlatorStore;
    private final String storePrefix;
    private Scheduler scheduler;
    private ExpiringGroupMonitoringRunnable expiringGroupRunnable;
    private final String name;
    private final FlowConstruct flowConstruct;

    /**
     * Creates a correlator.
     *
     * @param callback decides how groups are created, when they are complete and how they are aggregated; must not be null
     * @param timeoutMessageProcessor receives the aggregated event of a timed-out group when {@code failOnTimeout} is false;
     *        also reported as the failing processor in {@link RoutingException}s raised here. May be null.
     * @param muleContext the owning context; must not be null
     * @param flowConstruct the flow this correlator belongs to; its name is used to name the correlator; must not be null
     * @param correlatorStore partitioned store holding the event groups and the expired/dispatched markers
     * @param storePrefix prefix used to build the partition names
     * @param processedGroups store of ids of groups that were already aggregated or expired
     * @throws IllegalArgumentException if {@code callback}, {@code muleContext} or {@code flowConstruct} is null
     */
    public EventCorrelator(EventCorrelatorCallback callback, Processor timeoutMessageProcessor, MuleContext muleContext, FlowConstruct flowConstruct, PartitionableObjectStore correlatorStore, String storePrefix, ObjectStore<Long> processedGroups) {
        if (callback == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("EventCorrelatorCallback").getMessage());
        }
        if (muleContext == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MuleContext").getMessage());
        }
        // flowConstruct is dereferenced right below; fail with the same kind of error as the other arguments.
        if (flowConstruct == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("FlowConstruct").getMessage());
        }
        this.callback = callback;
        this.muleContext = muleContext;
        this.timeoutMessageProcessor = timeoutMessageProcessor;
        this.name = String.format("%s.event.correlator", flowConstruct.getName());
        this.flowConstruct = flowConstruct;
        this.correlatorStore = correlatorStore;
        this.storePrefix = storePrefix;
        this.processedGroups = processedGroups;
    }

    /**
     * Expires the group with the given id right away if the store holds it; otherwise just records
     * the id as processed.
     *
     * @param groupId correlation id of the group to expire
     * @throws MuleException whatever {@link #handleGroupExpiry(EventGroup)} raises, or a
     *         {@link MessagingException} wrapping a store failure
     */
    public void forceGroupExpiry(String groupId) throws MuleException {
        try {
            if (correlatorStore.retrieve(groupId, getEventGroupsPartitionKey()) != null) {
                handleGroupExpiry(getEventGroup(groupId));
            } else {
                addProcessedGroup(groupId);
            }
        }
        catch (ObjectStoreException e) {
            throw new MessagingException(null, e);
        }
    }

    /**
     * Adds the event to the group identified by its correlation id, creating the group when needed,
     * and aggregates the group once the callback reports it complete.
     *
     * @param event the incoming event
     * @return the aggregated event when this event completed its group; {@code null} when the group
     *         is still incomplete or when the group was already processed (the event is dropped and
     *         a routing notification is fired)
     * @throws RoutingException if the store fails or the callback fails to aggregate
     */
    public Event process(Event event) throws RoutingException {
        final String groupId = event.getCorrelationId();
        traceIncomingEvent(event, groupId);

        EventGroup group;
        try {
            if (isGroupAlreadyProcessed(groupId)) {
                logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. GroupCorrelation Id is: {}. Dropping event", groupId);
                // 1304 is a RoutingNotification action code inlined by the decompiler — TODO restore the named constant.
                muleContext.fireNotification(new RoutingNotification(event.getMessage(), event.getContext().getOriginatingConnectorName(), 1304));
                return null;
            }
            group = getEventGroup(groupId);
            if (group == null) {
                EventGroup newGroup = callback.createEventGroup(event, groupId);
                newGroup.initEventsStore(correlatorStore);
                group = addEventGroup(newGroup);
            }
        }
        catch (ObjectStoreException e) {
            throw new RoutingException(timeoutMessageProcessor, e);
        }

        // NOTE(review): the lookup above runs outside groupsLock; if another thread removes the group
        // between addEventGroup's store attempt and its re-read, group may be null here — confirm
        // whether a retry is wanted.
        synchronized (groupsLock) {
            logger.debug("Adding event to aggregator group: {}", groupId);
            try {
                group.addEvent(event);
            }
            catch (ObjectStoreException e) {
                throw new RoutingException(timeoutMessageProcessor, e);
            }
            if (!callback.shouldAggregateEvents(group)) {
                return null;
            }
            Event returnEvent;
            try {
                returnEvent = callback.aggregateEvents(group);
            }
            catch (RoutingException routingException) {
                // The group is dropped even when aggregation fails; a failure of the cleanup itself
                // takes precedence over the aggregation failure.
                discardGroup(group);
                throw routingException;
            }
            // No further events are accepted for this group once it has been aggregated.
            discardGroup(group);
            return returnEvent;
        }
    }

    /** Emits the trace-level description of an incoming event; never lets a logging problem escape. */
    private void traceIncomingEvent(Event event, String groupId) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        try {
            logger.trace(String.format("Received async reply message for correlationID: %s%n%s%n%s", groupId, StringMessageUtils.truncate(StringMessageUtils.toString(event.getMessage().getPayload().getValue()), 200, false), event.getMessage().toString()));
        }
        catch (Exception ignored) {
            // Diagnostic output only: a payload that cannot be rendered must not fail routing.
        }
    }

    /** Removes the group from the store and clears its events, reporting store failures as routing failures. */
    private void discardGroup(EventGroup group) throws RoutingException {
        try {
            removeEventGroup(group);
            group.clear();
        }
        catch (ObjectStoreException e) {
            throw new RoutingException(timeoutMessageProcessor, e);
        }
    }

    /**
     * Loads a group from the event-groups partition, initialising it after deserialization when
     * required and attaching the correlator store as its events store.
     *
     * @param groupId id of the group
     * @return the group, or {@code null} when the store has no such group
     * @throws ObjectStoreException if the store fails or post-deserialization initialisation fails
     */
    protected EventGroup getEventGroup(Serializable groupId) throws ObjectStoreException {
        try {
            EventGroup eventGroup = (EventGroup) correlatorStore.retrieve(groupId, getEventGroupsPartitionKey());
            if (eventGroup == null) {
                // forceGroupExpiry already treats a null retrieve result as "no group"; do the same
                // here instead of failing with a NullPointerException.
                return null;
            }
            if (!eventGroup.isInitialised()) {
                try {
                    DeserializationPostInitialisable.Implementation.init(eventGroup, muleContext);
                }
                catch (Exception e) {
                    throw new ObjectStoreException(e);
                }
            }
            eventGroup.initEventsStore(correlatorStore);
            return eventGroup;
        }
        catch (ObjectDoesNotExistException e) {
            return null;
        }
    }

    /**
     * Stores a new group, or returns the group already stored under the same id.
     *
     * @param group the group to add
     * @return {@code group} when it was stored; otherwise whatever {@link #getEventGroup(Serializable)}
     *         finds under the same id (possibly {@code null})
     * @throws ObjectStoreException if the store fails
     */
    protected EventGroup addEventGroup(EventGroup group) throws ObjectStoreException {
        final Serializable groupId = (Serializable) group.getGroupId();
        try {
            correlatorStore.store(groupId, group, getEventGroupsPartitionKey());
            return group;
        }
        catch (ObjectAlreadyExistsException e) {
            // Look up with the same key that was used to store; no narrowing to String.
            return getEventGroup(groupId);
        }
    }

    /**
     * Removes the group from the store and records its id as processed, unless the id is already
     * recorded as processed.
     *
     * @param group the group to remove
     * @throws ObjectStoreException if the store fails
     */
    protected void removeEventGroup(EventGroup group) throws ObjectStoreException {
        final Object groupId = group.getGroupId();
        synchronized (groupsLock) {
            if (!isGroupAlreadyProcessed(groupId)) {
                correlatorStore.remove((Serializable) groupId, getEventGroupsPartitionKey());
                addProcessedGroup(groupId);
            }
        }
    }

    /**
     * Records the id as processed, stamped with the current time.
     *
     * @param id group id
     * @throws ObjectStoreException if the store fails
     */
    protected void addProcessedGroup(Object id) throws ObjectStoreException {
        synchronized (groupsLock) {
            processedGroups.store((Serializable) id, Long.valueOf(System.currentTimeMillis()));
        }
    }

    /**
     * @param id group id
     * @return whether the id is recorded in {@link #processedGroups}
     * @throws ObjectStoreException if the store fails
     */
    protected boolean isGroupAlreadyProcessed(Object id) throws ObjectStoreException {
        synchronized (groupsLock) {
            return processedGroups.contains((Serializable) id);
        }
    }

    /** @return whether an expired group raises a {@link CorrelationTimeoutException} instead of being forwarded */
    public boolean isFailOnTimeout() {
        return failOnTimeout;
    }

    public void setFailOnTimeout(boolean failOnTimeout) {
        this.failOnTimeout = failOnTimeout;
    }

    /** @return the group timeout in milliseconds, as compared against the group creation time by the monitor */
    public long getTimeout() {
        return timeout;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Expires a group: removes it from the store, then either fails with a
     * {@link CorrelationTimeoutException} (when {@code failOnTimeout} is set) or aggregates what was
     * collected and hands it to the timeout message processor.
     *
     * <p>In the forwarding case a group created more than one day ago is silently skipped, and a
     * group whose id is already present in the expired-and-dispatched partition is discarded with a
     * warning rather than dispatched again.
     *
     * @param group the expired group
     * @throws MuleException on timeout failure, store failure, missing timeout processor, or any
     *         failure while aggregating/dispatching (wrapped in a {@link MessagingException})
     */
    protected void handleGroupExpiry(EventGroup group) throws MuleException {
        try {
            removeEventGroup(group);
        }
        catch (ObjectStoreException e) {
            throw new DefaultMuleException(e);
        }

        if (isFailOnTimeout()) {
            Event messageCollectionEvent = group.getMessageCollectionEvent();
            // 1303 is a RoutingNotification action code inlined by the decompiler — TODO restore the named constant.
            muleContext.fireNotification(new RoutingNotification(messageCollectionEvent.getMessage(), null, 1303));
            try {
                group.clear();
            }
            catch (ObjectStoreException e) {
                // Clearing is best-effort here; the timeout below is still reported.
                logger.warn("Failed to clear group with id {} since underlying ObjectStore threw Exception:{}", group.getGroupId(), e.getMessage(), e);
            }
            throw new CorrelationTimeoutException(CoreMessages.correlationTimedOut(group.getGroupId()));
        }

        if (logger.isDebugEnabled()) {
            logger.debug(MessageFormat.format("Aggregator expired, but ''failOnTimeOut'' is false. Forwarding {0} events out of {1} total for group ID: {2}", group.size(), group.expectedSize().map(Object::toString).orElse("<not set>"), group.getGroupId()));
        }

        try {
            if (group.getCreated() + ONE_DAY_IN_MILLIS >= System.currentTimeMillis()) {
                Event newEvent = Event.builder(callback.aggregateEvents(group)).build();
                group.clear();
                final Serializable groupId = (Serializable) group.getGroupId();
                if (!correlatorStore.contains(groupId, getExpiredAndDispatchedPartitionKey())) {
                    if (timeoutMessageProcessor == null) {
                        throw new MessagingException(CoreMessages.createStaticMessage(MessageFormat.format("Group {0} timed out, but no timeout message processor was configured.", group.getGroupId())), newEvent);
                    }
                    timeoutMessageProcessor.process(newEvent);
                    // Remember the dispatch so the same group is not forwarded twice.
                    correlatorStore.store(groupId, group.getCreated(), getExpiredAndDispatchedPartitionKey());
                } else {
                    logger.warn(MessageFormat.format("Discarding group {0}", group.getGroupId()));
                }
            }
        }
        catch (MessagingException me) {
            // Already carries the event context; do not wrap it a second time.
            throw me;
        }
        catch (Exception e) {
            throw new MessagingException(group.getMessageCollectionEvent(), e);
        }
    }

    /**
     * Starts the expired-group monitor on a dedicated single-task scheduler, unless the timeout is 0.
     *
     * @throws MuleException declared by {@link Startable}
     */
    public void start() throws MuleException {
        logger.info("Starting event correlator: {}", name);
        if (timeout != 0L) {
            scheduler = muleContext.getSchedulerService().customScheduler(muleContext.getSchedulerBaseConfig().withName(name).withMaxConcurrentTasks(1).withShutdownTimeout(0L, TimeUnit.MILLISECONDS));
            expiringGroupRunnable = new ExpiringGroupMonitoringRunnable();
            scheduler.scheduleWithFixedDelay(expiringGroupRunnable, 0L, DELAY_TIME, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops the scheduler and disposes the monitor created by {@link #start()}, if any.
     *
     * @throws MuleException declared by {@link Stoppable}
     */
    public void stop() throws MuleException {
        logger.info("Stopping event correlator: {}", name);
        if (scheduler != null) {
            scheduler.stop();
            scheduler = null;
        }
        if (expiringGroupRunnable != null) {
            expiringGroupRunnable.dispose();
            expiringGroupRunnable = null;
        }
    }

    /** @return the partition holding the creation times of groups that expired and were dispatched */
    protected String getExpiredAndDispatchedPartitionKey() {
        return storePrefix + ".expiredAndDispatchedGroups";
    }

    /** @return the partition holding the live event groups */
    protected String getEventGroupsPartitionKey() {
        return storePrefix + ".eventGroups";
    }

    /**
     * Periodic task that expires timed-out groups ({@link #run()}), and that also registers itself
     * with an {@link ExpiryMonitor} to purge old expired-and-dispatched markers ({@link #expired()}).
     */
    private final class ExpiringGroupMonitoringRunnable
    implements Runnable,
    Expirable,
    Disposable {
        private final ExpiryMonitor expiryMonitor;

        public ExpiringGroupMonitoringRunnable() {
            this.expiryMonitor = new ExpiryMonitor(name, TimeUnit.MINUTES.toMillis(1L), muleContext, true);
            this.expiryMonitor.addExpirable(30L, TimeUnit.MINUTES, this);
        }

        /**
         * Removes expired-and-dispatched markers whose recorded time is more than one day old.
         * Store failures are logged and end the sweep.
         */
        @Override
        public void expired() {
            try {
                final String partition = getExpiredAndDispatchedPartitionKey();
                // The store is a raw type, so keys are iterated as Object and cast individually.
                for (Object key : correlatorStore.allKeys(partition)) {
                    Serializable groupId = (Serializable) key;
                    Long recordedTime = (Long) correlatorStore.retrieve(groupId, partition);
                    // A missing timestamp cannot be aged; skip it rather than fail on unboxing.
                    if (recordedTime == null || recordedTime + ONE_DAY_IN_MILLIS >= System.currentTimeMillis()) {
                        continue;
                    }
                    correlatorStore.remove(groupId, partition);
                    logger.warn(MessageFormat.format("Discarding group {0}", groupId));
                }
            }
            catch (ObjectStoreException e) {
                logger.warn("Expiration of objects failed due to ObjectStoreException {}.", e.toString(), e);
            }
        }

        /**
         * On the primary polling instance only: finds groups older than the configured timeout and
         * expires each of them through the flow's error-handling execution template.
         */
        @Override
        public void run() {
            if (!muleContext.isPrimaryPollingInstance()) {
                return;
            }
            for (EventGroup group : findTimedOutGroups()) {
                expireGroup(group);
            }
        }

        /** Collects the timed-out groups; on a store failure returns those found so far. */
        private ArrayList<EventGroup> findTimedOutGroups() {
            ArrayList<EventGroup> timedOut = new ArrayList<>(1);
            try {
                // The store is a raw type, so keys are iterated as Object and cast individually.
                for (Object key : correlatorStore.allKeys(getEventGroupsPartitionKey())) {
                    EventGroup group = getEventGroup((Serializable) key);
                    if (group != null && group.getCreated() + getTimeout() < System.currentTimeMillis()) {
                        timedOut.add(group);
                    }
                }
            }
            catch (ObjectStoreException e) {
                // Logged without a stack trace: this task is rescheduled every DELAY_TIME ms, so a
                // persistent store failure would otherwise flood the log.
                logger.warn("expiry failed dues to ObjectStoreException {}", e.toString());
            }
            return timedOut;
        }

        /** Expires one group, routing failures to the appropriate exception listener. */
        private void expireGroup(EventGroup group) {
            ErrorHandlingExecutionTemplate executionTemplate = ErrorHandlingExecutionTemplate.createErrorHandlingExecutionTemplate(muleContext, flowConstruct, flowConstruct.getExceptionListener());
            try {
                executionTemplate.execute(() -> {
                    handleGroupExpiry(group);
                    return null;
                });
            }
            catch (MessagingException ignored) {
                // Presumably already handled by the execution template's exception listener — TODO confirm.
            }
            catch (Exception e) {
                muleContext.getExceptionListener().handleException(e);
            }
        }

        /** Disposes the expiry monitor created in the constructor. */
        @Override
        public void dispose() {
            if (this.expiryMonitor != null) {
                this.expiryMonitor.dispose();
            }
        }
    }
}

