/*
 * Decompiled with CFR 0.152.
 */
package ch.admin.bit.jeap.processcontext.domain.processinstance;

import ch.admin.bit.jeap.processcontext.domain.PcsConfigProperties;
import ch.admin.bit.jeap.processcontext.domain.message.Message;
import ch.admin.bit.jeap.processcontext.domain.message.MessageRepository;
import ch.admin.bit.jeap.processcontext.domain.port.InternalMessageProducer;
import ch.admin.bit.jeap.processcontext.domain.port.MetricsListener;
import ch.admin.bit.jeap.processcontext.domain.processinstance.MessageReferenceMessageDTO;
import ch.admin.bit.jeap.processcontext.domain.processinstance.NotFoundException;
import ch.admin.bit.jeap.processcontext.domain.processinstance.ProcessContextFactory;
import ch.admin.bit.jeap.processcontext.domain.processinstance.ProcessData;
import ch.admin.bit.jeap.processcontext.domain.processinstance.ProcessInstance;
import ch.admin.bit.jeap.processcontext.domain.processinstance.ProcessInstanceRepository;
import ch.admin.bit.jeap.processcontext.domain.processinstance.ProcessSnapshotService;
import ch.admin.bit.jeap.processcontext.domain.processinstance.TaskInstance;
import ch.admin.bit.jeap.processcontext.domain.processinstance.TaskPlanningException;
import ch.admin.bit.jeap.processcontext.domain.processtemplate.MessageReference;
import ch.admin.bit.jeap.processcontext.domain.processtemplate.ProcessTemplate;
import ch.admin.bit.jeap.processcontext.domain.processtemplate.ProcessTemplateRepository;
import ch.admin.bit.jeap.processcontext.domain.processtemplate.TaskCardinality;
import ch.admin.bit.jeap.processcontext.domain.processtemplate.TaskLifecycle;
import ch.admin.bit.jeap.processcontext.domain.processtemplate.TaskType;
import ch.admin.bit.jeap.processcontext.domain.processupdate.ProcessUpdate;
import ch.admin.bit.jeap.processcontext.domain.processupdate.ProcessUpdateQueryRepository;
import ch.admin.bit.jeap.processcontext.domain.processupdate.ProcessUpdateRepository;
import ch.admin.bit.jeap.processcontext.domain.processupdate.ProcessUpdateType;
import ch.admin.bit.jeap.processcontext.domain.tx.Transactions;
import ch.admin.bit.jeap.processcontext.plugin.api.condition.TaskInstantiationCondition;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Timed;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.chrono.ChronoZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import lombok.Generated;
import net.logstash.logback.argument.StructuredArguments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class ProcessInstanceService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ProcessInstanceService.class);
    private final InternalMessageProducer internalMessageProducer;
    private final ProcessUpdateQueryRepository processUpdateQueryRepository;
    private final ProcessInstanceRepository processInstanceRepository;
    private final ProcessTemplateRepository processTemplateRepository;
    private final MessageRepository messageRepository;
    private final ProcessUpdateRepository processUpdateRepository;
    private final ProcessSnapshotService processSnapshotService;
    private final Transactions transactions;
    private final MetricsListener metricsListener;
    private final PcsConfigProperties pcsConfigProperties;

    public void createProcessInstance(String originProcessId, String processTemplateName, Set<ProcessData> processData) {
        this.transactions.withinNewTransaction(() -> {
            if (!this.processInstanceRepository.existsByOriginProcessId(originProcessId)) {
                this.createFromTemplate(originProcessId, processTemplateName, processData);
                ProcessUpdate processUpdate = ProcessUpdate.processCreated().originProcessId(originProcessId).build();
                this.processUpdateRepository.save(processUpdate);
            }
        });
        this.internalMessageProducer.produceProcessContextOutdatedEventSynchronously(originProcessId);
    }

    private ProcessInstance createFromTemplate(String originProcessId, String processTemplateName, Set<ProcessData> processData) {
        ProcessTemplate processTemplate = this.processTemplateRepository.findByName(processTemplateName).orElseThrow(NotFoundException.templateNotFound(processTemplateName, originProcessId));
        ProcessInstance processInstance = ProcessInstance.startProcess(originProcessId, processTemplate, processData);
        log.info("Creating process {} with origin process ID {} from template {}", new Object[]{processInstance.getId(), originProcessId, processTemplateName});
        this.metricsListener.processInstanceCreated(processTemplateName);
        return this.processInstanceRepository.save(processInstance);
    }

    @Timed(value="jeap_pcs_update_process_state", description="Update process state", percentiles={0.5, 0.8, 0.95, 0.99})
    public void updateProcessState(String originProcessId) {
        List<ProcessUpdate> processUpdates = this.processUpdateQueryRepository.findByOriginProcessIdAndHandledFalse(originProcessId).stream().sorted(Comparator.comparingInt(pu -> pu.getProcessUpdateType().getPriority())).toList();
        if (processUpdates.isEmpty()) {
            this.migrateProcessInstanceTemplateIfNeeded(originProcessId);
        }
        Lists.partition(processUpdates, (int)this.pcsConfigProperties.getProcessInstanceUpdateApplicationBatchSize()).forEach(batch -> this.processUpdates(originProcessId, (List<ProcessUpdate>)batch));
        if (this.transactions.withinNewTransactionWithResult(() -> this.processInstanceRepository.existsByOriginProcessId(originProcessId)).booleanValue()) {
            this.internalMessageProducer.produceProcessContextStateChangedEventSynchronously(originProcessId);
        }
        this.metricsListener.timed("jeap_pcs_late_correlate_message", Collections.emptyMap(), () -> this.correlateMessagesByProcessData(originProcessId));
    }

    private void migrateProcessInstanceTemplateIfNeeded(String originProcessId) {
        this.transactions.withinNewTransaction(() -> this.processInstanceRepository.findProcessInstanceTemplate(originProcessId).ifPresent(processInstanceTemplate -> this.processTemplateRepository.findByName(processInstanceTemplate.getTemplateName()).ifPresent(template -> {
            if (!template.getTemplateHash().equals(processInstanceTemplate.getTemplateHash())) {
                this.processInstanceRepository.findByOriginProcessIdLoadingMessages(originProcessId).ifPresent(ProcessInstance::applyTemplateMigrationIfChanged);
            }
        })));
    }

    private void processUpdates(String originProcessId, List<ProcessUpdate> processUpdates) {
        if (processUpdates.isEmpty()) {
            return;
        }
        this.metricsListener.timed("pcs_process_batch_update", Map.of("batchSize", this.getBatchSizeGroup(processUpdates)), () -> this.handleProcessUpdateBatch(originProcessId, processUpdates));
    }

    private void handleProcessUpdateBatch(String originProcessId, List<ProcessUpdate> processUpdates) {
        List remainingUpdates = this.transactions.withinNewTransactionWithTxStatusAndResult(txStatus -> {
            try {
                ProcessInstance processInstance = this.createOrLoadProcessInstance(originProcessId, processUpdates).orElse(null);
                if (processInstance == null) {
                    log.debug("Process instance for process " + String.valueOf(StructuredArguments.keyValue((String)"originProcessId", (Object)originProcessId)) + " not found, won't handle the corresponding process updates (yet).");
                    return List.of();
                }
                processInstance.applyTemplateMigrationIfChanged();
                this.processUpdates(processInstance, processUpdates);
                this.markHandled(processUpdates);
                this.metricsListener.processUpdateProcessed(processInstance.getProcessTemplate(), true, processUpdates.size());
                return List.of();
            }
            catch (ProcessUpdateFailedException pufe) {
                txStatus.setRollbackOnly();
                ProcessTemplate processTemplate = this.createOrLoadProcessInstance(originProcessId, processUpdates).map(ProcessInstance::getProcessTemplate).orElse(null);
                this.metricsListener.processUpdateProcessed(processTemplate, false, 1);
                this.failUpdate(pufe.getFailedProcessUpdate(), pufe);
                return processUpdates.stream().filter(u -> u != pufe.getFailedProcessUpdate()).toList();
            }
        });
        if (!remainingUpdates.isEmpty()) {
            if (remainingUpdates.size() < processUpdates.size()) {
                this.processUpdates(originProcessId, (List<ProcessUpdate>)remainingUpdates);
            } else {
                String msg = String.format("This should not have happened. There must be less remaining updates (%s) than initial updates (%s).", remainingUpdates.size(), processUpdates.size());
                throw new IllegalStateException(msg);
            }
        }
    }

    private Optional<ProcessInstance> createOrLoadProcessInstance(String originProcessId, List<ProcessUpdate> processUpdates) {
        return processUpdates.stream().filter(update -> update.getProcessUpdateType() == ProcessUpdateType.CREATE_PROCESS).findFirst().map(createProcessUpdate -> {
            try {
                return Optional.of(this.createOrGetProcessInstance((ProcessUpdate)createProcessUpdate));
            }
            catch (Exception e) {
                throw ProcessUpdateFailedException.createProcessUpdateFailed(createProcessUpdate, e);
            }
        }).orElse(this.processInstanceRepository.findByOriginProcessIdLoadingMessages(originProcessId));
    }

    private ProcessInstance createOrGetProcessInstance(ProcessUpdate processUpdate) {
        String originProcessId = processUpdate.getOriginProcessId();
        Optional<ProcessInstance> pio = this.processInstanceRepository.findByOriginProcessIdLoadingMessages(originProcessId);
        return pio.orElseGet(() -> this.createFromTemplate(originProcessId, processUpdate.getParams(), Set.of()));
    }

    private void processUpdates(ProcessInstance processInstance, List<ProcessUpdate> processUpdates) {
        for (ProcessUpdate processUpdate : processUpdates) {
            try {
                this.applyProcessUpdateAndReEvaluateProcessInstance(processInstance, processUpdate);
            }
            catch (Exception e) {
                throw ProcessUpdateFailedException.createProcessUpdateFailed(processUpdate, e);
            }
        }
    }

    private void failUpdate(ProcessUpdate processUpdate, Exception e) {
        log.error("Failed to apply process update to process " + String.valueOf(StructuredArguments.keyValue((String)"originProcessId", (Object)processUpdate.getOriginProcessId())), (Throwable)e);
        this.transactions.withinNewTransaction(() -> this.processUpdateRepository.markHandlingFailed(processUpdate.getId()));
        this.metricsListener.processUpdateFailed(processUpdate, e);
    }

    private void markHandled(Collection<ProcessUpdate> processUpdates) {
        processUpdates.forEach(update -> this.processUpdateRepository.markHandled(update.getId()));
    }

    private void applyProcessUpdateAndReEvaluateProcessInstance(ProcessInstance processInstance, ProcessUpdate update) {
        ZonedDateTime timestamp = ZonedDateTime.now();
        Optional<UUID> reference = update.getMessageReference();
        if (reference.isPresent()) {
            Message message = this.messageRepository.findById(reference.get()).orElseThrow(NotFoundException.messageNotFound(reference.get(), processInstance.getOriginProcessId()));
            MessageReferenceMessageDTO messageReferenceMessageDTO = processInstance.addMessage(message);
            this.metricsListener.timed("pcs_process_single_update", Map.of("updateType", update.getProcessUpdateType().name()), () -> this.updateProcessInstance(processInstance, update, messageReferenceMessageDTO, message));
            timestamp = message.getMessageCreatedAt();
        }
        processInstance.evaluateCompletedTasksAndReachedMilestones(timestamp);
        this.createProcessSnapshotIfTriggered(processInstance);
    }

    private void createProcessSnapshotIfTriggered(ProcessInstance processInstance) {
        Set<String> triggeredSnapshots = processInstance.evaluateSnapshotConditions();
        if (!triggeredSnapshots.isEmpty()) {
            this.processInstanceRepository.flush();
            this.processSnapshotService.createAndStoreSnapshot(processInstance);
            triggeredSnapshots.forEach(processInstance::registerSnapshot);
        }
    }

    private void updateProcessInstance(ProcessInstance processInstance, ProcessUpdate processUpdate, MessageReferenceMessageDTO messageReferenceMessageDTO, Message message) {
        switch (processUpdate.getProcessUpdateType()) {
            case CREATE_PROCESS: 
            case DOMAIN_EVENT: {
                this.createMessageTasks(processUpdate, processInstance, messageReferenceMessageDTO, message);
                this.completeObservationTasks(processUpdate, processInstance, messageReferenceMessageDTO, message);
                processInstance.evaluateCompletedTasks(messageReferenceMessageDTO, message.getMessageCreatedAt());
                processInstance.evaluateRelations();
                processInstance.evaluateProcessRelations(message);
                break;
            }
            case PROCESS_CREATED: {
                processInstance.evaluateRelations();
                processInstance.evaluateProcessRelations(message);
                break;
            }
            default: {
                throw new RuntimeException("Update Type " + String.valueOf((Object)processUpdate.getProcessUpdateType()) + " does not exist");
            }
        }
    }

    private void createMessageTasks(ProcessUpdate processUpdate, ProcessInstance processInstance, MessageReferenceMessageDTO messageReferenceMessageDTO, Message message) {
        processInstance.getProcessTemplate().getTaskTypes().stream().filter(taskType -> processUpdate.getMessageName().equals(taskType.getPlannedByDomainEvent())).forEach(taskType -> {
            TaskInstantiationCondition instantiationCondition = taskType.getInstantiationCondition();
            if (instantiationCondition == null || instantiationCondition.instantiate(ProcessContextFactory.createMessage(messageReferenceMessageDTO))) {
                if (taskType.getCardinality() == TaskCardinality.SINGLE_INSTANCE) {
                    this.planSingleInstanceDomainEventTaskIfNotExists(processInstance, messageReferenceMessageDTO, (TaskType)taskType, message.getMessageId(), message.getMessageCreatedAt(), message.getId());
                } else {
                    messageReferenceMessageDTO.getRelatedOriginTaskIds().forEach(originTaskId -> processInstance.planDomainEventTask((TaskType)taskType, (String)originTaskId, message.getMessageCreatedAt(), message.getId()));
                }
            }
        });
    }

    private void planSingleInstanceDomainEventTaskIfNotExists(ProcessInstance processInstance, MessageReferenceMessageDTO eventReference, TaskType taskType, String eventId, ZonedDateTime timestamp, UUID messageId) {
        String taskId;
        List<TaskInstance> existingTaskInstancesForTaskType = processInstance.getTasks().stream().filter(task -> task.getTaskType().isPresent()).filter(task -> task.getTaskType().get().equals(taskType)).toList();
        Set<String> relatedOriginTaskIds = eventReference.getRelatedOriginTaskIds();
        if (relatedOriginTaskIds.size() > 1) {
            throw TaskPlanningException.singleInstanceTaskMultipleIds(taskType, relatedOriginTaskIds);
        }
        String string = taskId = relatedOriginTaskIds.isEmpty() ? eventId : relatedOriginTaskIds.iterator().next();
        if (!existingTaskInstancesForTaskType.isEmpty()) {
            boolean isNewTaskId = existingTaskInstancesForTaskType.stream().noneMatch(instance -> instance.getOriginTaskId().equals(taskId));
            if (isNewTaskId) {
                throw TaskPlanningException.createTaskAlreadyPlanned(taskId, taskType, existingTaskInstancesForTaskType);
            }
            return;
        }
        processInstance.planDomainEventTask(taskType, taskId, timestamp, messageId);
    }

    private void completeObservationTasks(ProcessUpdate processUpdate, ProcessInstance processInstance, MessageReferenceMessageDTO messageReferenceMessageDTO, Message message) {
        processInstance.getProcessTemplate().getTaskTypes().stream().filter(taskType -> TaskLifecycle.OBSERVED == taskType.getLifecycle() && processUpdate.getMessageName().equals(taskType.getObservesDomainEvent())).forEach(taskType -> {
            TaskInstantiationCondition instantiationCondition = taskType.getInstantiationCondition();
            if (instantiationCondition == null || instantiationCondition.instantiate(ProcessContextFactory.createMessage(messageReferenceMessageDTO))) {
                processInstance.addObservationTask((TaskType)taskType, message.getMessageId(), message.getMessageCreatedAt(), message.getId());
            }
        });
    }

    private void correlateMessagesByProcessData(String originProcessId) {
        boolean newEventsCorrelated = this.transactions.withinNewTransactionWithResult(() -> {
            Optional<ProcessInstance> processInstance = this.processInstanceRepository.findByOriginProcessIdLoadingMessages(originProcessId);
            if (processInstance.isPresent()) {
                return this.correlateMessagesByProcessData(processInstance.get());
            }
            log.info("Process with originProcessId '{}' not found. There is no message to correlate.", (Object)originProcessId);
            return false;
        });
        if (newEventsCorrelated) {
            this.internalMessageProducer.produceProcessContextOutdatedEventSynchronously(originProcessId);
        }
    }

    private boolean correlateMessagesByProcessData(ProcessInstance processInstance) {
        ZonedDateTime lastCorrelationAt = processInstance.getLastCorrelationAt() != null ? processInstance.getLastCorrelationAt() : ZonedDateTime.of(LocalDateTime.MIN, ZoneId.systemDefault());
        List<ProcessData> processDataList = processInstance.getProcessData().stream().filter(pd -> pd.getCreatedAt().isAfter(lastCorrelationAt)).toList();
        log.debug("Found {} processData created after the last correlation of this process instance at {}", (Object)processDataList.size(), (Object)lastCorrelationAt.format(DateTimeFormatter.ISO_DATE_TIME));
        ArrayList<Message> eventsToCorrelate = new ArrayList<Message>();
        List<UUID> alreadyCorrelatedEventIds = processInstance.getMessageReferences().stream().map(MessageReferenceMessageDTO::getMessageId).toList();
        for (ProcessData processData : processDataList) {
            Set<MessageReference> messageReferences = processInstance.getProcessTemplate().getDomainEventReferencesCorrelatedBy(processData.getKey());
            log.debug("Found {} messageReferences with processDataKey {}", (Object)messageReferences.size(), (Object)processData.getKey());
            for (MessageReference messageReference : messageReferences) {
                List<Message> messageRepositoryEventsToCorrelate = processData.getRole() != null ? (alreadyCorrelatedEventIds.isEmpty() ? this.messageRepository.findMessagesToCorrelate(messageReference.getMessageName(), processInstance.getProcessTemplateName(), messageReference.getCorrelatedByProcessData().getMessageDataKey(), processData.getValue(), processData.getRole()) : this.messageRepository.findMessagesToCorrelate(messageReference.getMessageName(), processInstance.getProcessTemplateName(), messageReference.getCorrelatedByProcessData().getMessageDataKey(), processData.getValue(), processData.getRole(), alreadyCorrelatedEventIds)) : (alreadyCorrelatedEventIds.isEmpty() ? this.messageRepository.findMessagesToCorrelate(messageReference.getMessageName(), processInstance.getProcessTemplateName(), messageReference.getCorrelatedByProcessData().getMessageDataKey(), processData.getValue()) : this.messageRepository.findMessagesToCorrelate(messageReference.getMessageName(), processInstance.getProcessTemplateName(), messageReference.getCorrelatedByProcessData().getMessageDataKey(), processData.getValue(), alreadyCorrelatedEventIds));
                log.debug("Found {} old messages with messageName {}, messageDataKey {}, value {}, role {}", new Object[]{messageRepositoryEventsToCorrelate.size(), messageReference.getMessageName(), messageReference.getCorrelatedByProcessData().getMessageDataKey(), processData.getValue(), processData.getRole()});
                eventsToCorrelate.addAll(messageRepositoryEventsToCorrelate);
            }
        }
        log.debug("Found {} old messages in the db to correlate with this process instance", (Object)eventsToCorrelate.size());
        for (Message message : eventsToCorrelate) {
            ProcessUpdate processUpdate = ProcessUpdate.messageReceived().originProcessId(processInstance.getOriginProcessId()).messageReference(message.getId()).messageName(message.getMessageName()).idempotenceId(message.getIdempotenceId()).build();
            this.processUpdateRepository.save(processUpdate);
        }
        processInstance.correlatedAt(processDataList.stream().map(ProcessData::getCreatedAt).max(ChronoZonedDateTime::compareTo).orElse(ZonedDateTime.now()));
        return !eventsToCorrelate.isEmpty();
    }

    private String getBatchSizeGroup(List<ProcessUpdate> processUpdates) {
        int num = processUpdates.size();
        if (num <= 5) {
            return Integer.toString(num);
        }
        if (num <= 10) {
            return "5<n<11";
        }
        if (num <= 25) {
            return "10<n<26";
        }
        if (num <= 50) {
            return "25<n<51";
        }
        if (num <= 100) {
            return "50<n<100";
        }
        return "100<n";
    }

    @Generated
    public ProcessInstanceService(InternalMessageProducer internalMessageProducer, ProcessUpdateQueryRepository processUpdateQueryRepository, ProcessInstanceRepository processInstanceRepository, ProcessTemplateRepository processTemplateRepository, MessageRepository messageRepository, ProcessUpdateRepository processUpdateRepository, ProcessSnapshotService processSnapshotService, Transactions transactions, MetricsListener metricsListener, PcsConfigProperties pcsConfigProperties) {
        this.internalMessageProducer = internalMessageProducer;
        this.processUpdateQueryRepository = processUpdateQueryRepository;
        this.processInstanceRepository = processInstanceRepository;
        this.processTemplateRepository = processTemplateRepository;
        this.messageRepository = messageRepository;
        this.processUpdateRepository = processUpdateRepository;
        this.processSnapshotService = processSnapshotService;
        this.transactions = transactions;
        this.metricsListener = metricsListener;
        this.pcsConfigProperties = pcsConfigProperties;
    }

    private static class ProcessUpdateFailedException
    extends RuntimeException {
        private final transient ProcessUpdate failedProcessUpdate;

        private ProcessUpdateFailedException(ProcessUpdate processUpdate, String message, Throwable cause) {
            super(message, cause);
            this.failedProcessUpdate = processUpdate;
        }

        private static ProcessUpdateFailedException createProcessUpdateFailed(ProcessUpdate processUpdate, Throwable cause) {
            String message = String.format("Failed creating process with originId %s for a create process update for the template %s and the message %s.", processUpdate.getOriginProcessId(), processUpdate.getParams(), processUpdate.getMessageName());
            return new ProcessUpdateFailedException(processUpdate, message, cause);
        }

        @Generated
        public ProcessUpdate getFailedProcessUpdate() {
            return this.failedProcessUpdate;
        }
    }
}

