/*
 * Decompiled with CFR 0.152.
 */
package org.tiogasolutions.notify.kernel.task;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.tiogasolutions.dev.common.exceptions.ApiConflictException;
import org.tiogasolutions.dev.common.exceptions.ApiNotFoundException;
import org.tiogasolutions.dev.domain.query.ListQueryResult;
import org.tiogasolutions.dev.domain.query.QueryResult;
import org.tiogasolutions.notify.kernel.domain.DomainKernel;
import org.tiogasolutions.notify.kernel.event.EventBus;
import org.tiogasolutions.notify.kernel.event.TaskEventListener;
import org.tiogasolutions.notify.kernel.notification.NotificationDomain;
import org.tiogasolutions.notify.kernel.task.TaskEntity;
import org.tiogasolutions.notify.kernel.task.TaskProcessor;
import org.tiogasolutions.notify.kernel.task.TaskProcessorExecutorStatus;
import org.tiogasolutions.notify.kernel.task.TaskProcessorType;
import org.tiogasolutions.notify.pub.domain.DomainProfile;
import org.tiogasolutions.notify.pub.notification.Notification;
import org.tiogasolutions.notify.pub.route.Destination;
import org.tiogasolutions.notify.pub.task.TaskQuery;
import org.tiogasolutions.notify.pub.task.TaskResponse;
import org.tiogasolutions.notify.pub.task.TaskStatus;

@Named
public class TaskProcessorExecutor
implements BeanFactoryAware,
TaskEventListener {
    private static final String NAME = TaskProcessorExecutor.class.getSimpleName();
    private static final Logger log = LoggerFactory.getLogger(TaskProcessorExecutor.class);
    private TaskProcessorExecutorStatus executorStatus;
    private final DomainKernel domainKernel;
    private final Map<TaskProcessorType, TaskProcessor> processorMap = new HashMap<TaskProcessorType, TaskProcessor>();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private ScheduledFuture executorFuture = null;
    private final ExecutorService threadPoolExecutor;

    @Inject
    public TaskProcessorExecutor(DomainKernel domainKernel, EventBus eventBus) {
        this.executorStatus = TaskProcessorExecutorStatus.STOPPED;
        this.domainKernel = domainKernel;
        this.threadPoolExecutor = Executors.newCachedThreadPool();
        ServiceLoader<TaskProcessor> loader = ServiceLoader.load(TaskProcessor.class);
        loader.reload();
        for (TaskProcessor processor : loader) {
            TaskProcessorType type = processor.getType();
            if (this.processorMap.containsKey(type)) {
                String msg = String.format("The processor type \"%s\" has already been registered.", type);
                throw new IllegalArgumentException(msg);
            }
            this.processorMap.put(type, processor);
        }
        eventBus.subscribe(this);
    }

    public TaskProcessorExecutorStatus getExecutorStatus() {
        return this.executorStatus;
    }

    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        for (TaskProcessor processor : this.processorMap.values()) {
            processor.init(beanFactory);
        }
    }

    public synchronized void start() {
        if (this.executorFuture != null) {
            throw new IllegalStateException(NAME + " is already started.");
        }
        this.executorFuture = this.scheduledExecutorService.scheduleWithFixedDelay(this::execute, 15L, 60L, TimeUnit.SECONDS);
        this.executorStatus = TaskProcessorExecutorStatus.IDLE;
        log.info(NAME + " started, now idle.");
    }

    public synchronized void stop() {
        if (this.executorFuture != null) {
            this.executorFuture.cancel(false);
            this.executorFuture = null;
        }
        this.executorStatus = TaskProcessorExecutorStatus.STOPPED;
        log.info(NAME + " stopped.");
    }

    @PreDestroy
    private void shutdown() {
        this.stop();
        this.scheduledExecutorService.shutdown();
    }

    @Override
    public void taskCreated(String domainName, TaskEntity task, Notification notification) {
        try {
            NotificationDomain notificationDomain = this.domainKernel.notificationDomain(domainName);
            HashMap<String, List<TaskEntity>> tasks = new HashMap<String, List<TaskEntity>>();
            tasks.put(task.getDestination().getProvider(), Arrays.asList(task));
            this.processTasksByProvider(notificationDomain, tasks);
        }
        catch (Exception e) {
            log.error("Unexpected exception during processing.", (Throwable)e);
        }
    }

    public void execute() {
        if (this.running.compareAndSet(false, true)) {
            log.debug(NAME + "is executing.");
            this.executorStatus = TaskProcessorExecutorStatus.EXECUTING;
            try {
                List<NotificationDomain> activeNotificationDomains = this.domainKernel.listActiveNotificationDomains();
                activeNotificationDomains.stream().forEach(this::processDomain);
            }
            catch (Exception e) {
                log.error("Unexpected exception during processing.", (Throwable)e);
            }
            finally {
                this.executorStatus = TaskProcessorExecutorStatus.IDLE;
                log.debug(NAME + " finished, now idle.");
                this.running.set(false);
            }
        } else {
            log.debug(NAME + " already running.");
        }
    }

    private void processDomain(NotificationDomain notificationDomain) {
        String domainName = notificationDomain.getDomainName();
        log.debug("Processing all tasks for domain {}.", (Object)domainName);
        ListQueryResult<TaskEntity> pendingTasks = notificationDomain.query(new TaskQuery().setTaskStatus(TaskStatus.PENDING));
        if (pendingTasks.isNotEmpty()) {
            Map<String, List<TaskEntity>> tasksMappedByProvider = this.mapTasksByProvider((QueryResult<TaskEntity>)pendingTasks);
            this.processTasksByProvider(notificationDomain, tasksMappedByProvider);
        }
    }

    private void processTasksByProvider(NotificationDomain notificationDomain, Map<String, List<TaskEntity>> tasksMappedByProvider) {
        for (Map.Entry<String, List<TaskEntity>> entry : tasksMappedByProvider.entrySet()) {
            TaskProcessor processor = this.findTaskProcessor(entry.getKey());
            if (processor == null) {
                log.error("A processor was not found for {}, skipping {} tasks.", (Object)entry.getKey(), (Object)entry.getValue().size());
                return;
            }
            this.processTaskForProvider(notificationDomain, processor, entry.getValue());
        }
    }

    private TaskProcessor findTaskProcessor(String providerName) {
        TaskProcessorType processorType = TaskProcessorType.valueOf(providerName);
        return this.processorMap.get(processorType);
    }

    private Map<String, List<TaskEntity>> mapTasksByProvider(QueryResult<TaskEntity> result) {
        HashMap<String, List<TaskEntity>> map = new HashMap<String, List<TaskEntity>>();
        for (TaskEntity task : result) {
            try {
                Destination destination = task.getDestination();
                String provider = destination.getProvider();
                if (!map.containsKey(provider)) {
                    map.put(provider, new ArrayList());
                }
                ((List)map.get(provider)).add(task);
            }
            catch (NullPointerException e) {
                log.error("Weird bug", (Throwable)e);
            }
        }
        return map;
    }

    private void processTaskForProvider(NotificationDomain notificationDomain, TaskProcessor processor, List<TaskEntity> tasks) {
        log.debug("Processing {} provider's tasks for domain {}.", (Object)processor.getType(), (Object)notificationDomain.getDomainName());
        if (!processor.isReady()) {
            log.warn("The {} provider is not ready to process tasks for the domain {}.", (Object)processor.getType(), (Object)notificationDomain.getDomainName());
            return;
        }
        for (TaskEntity task : tasks) {
            Notification notification = notificationDomain.findNotificationById(task.getNotificationId()).toNotification();
            this.processTask(notificationDomain, processor, task, notification);
        }
    }

    private void processTask(NotificationDomain notificationDomain, TaskProcessor processor, TaskEntity taskEntity, Notification notification) {
        Callable<Void> taskProcessorCallable = () -> {
            String domainName = notificationDomain.getDomainName();
            TaskEntity localTaskEntity = taskEntity;
            try {
                localTaskEntity.sending();
                localTaskEntity = notificationDomain.saveAndReload(localTaskEntity);
            }
            catch (ApiConflictException | ApiNotFoundException ex) {
                localTaskEntity.pending();
                String msg = ex instanceof ApiNotFoundException ? String.format("Cannot find task for domain %s: %s", domainName, localTaskEntity.getLabel()) : String.format("DB conflict processing task for domain %s, (already processed?): %s", domainName, localTaskEntity.getLabel());
                log.info(msg);
                return null;
            }
            catch (Exception ex) {
                log.error("Exception setting task to sending", (Throwable)ex);
                return null;
            }
            String processorName = "n/a";
            try {
                processorName = processor.getType().getCode();
                DomainProfile domainProfile = this.domainKernel.getOrCreateDomain(domainName);
                log.debug("Begin processing task for domain {} with processor {}: {}", new Object[]{domainName, processorName, localTaskEntity.getLabel()});
                TaskResponse taskResponse = processor.processTask(domainProfile, notification, localTaskEntity.toTask());
                localTaskEntity.response(taskResponse);
                notificationDomain.save(localTaskEntity);
                log.debug("Finished processing task for domain {} and processor {} with response action {}: {}", new Object[]{domainName, processorName, taskResponse.getResponseAction(), localTaskEntity.getLabel()});
            }
            catch (Exception e) {
                TaskResponse taskResponse = TaskResponse.fail((String)"Exception thrown from task processor", (Throwable)e);
                localTaskEntity.response(taskResponse);
                notificationDomain.save(localTaskEntity);
                log.error("Exception processing task for domain {} and processor {}: {}", new Object[]{domainName, processorName, localTaskEntity.getLabel(), e});
            }
            return null;
        };
        this.threadPoolExecutor.submit(taskProcessorCallable);
    }
}

