package com.github.sonus21.rqueue.listener;

import com.github.sonus21.rqueue.core.Job;
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
import com.github.sonus21.rqueue.core.RqueueMessage;
import com.github.sonus21.rqueue.core.middleware.HandlerMiddleware;
import com.github.sonus21.rqueue.core.middleware.Middleware;
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
import com.github.sonus21.rqueue.models.db.MessageMetadata;
import com.github.sonus21.rqueue.models.enums.ExecutionStatus;
import com.github.sonus21.rqueue.models.enums.MessageStatus;
import com.github.sonus21.rqueue.utils.Constants;
import com.github.sonus21.rqueue.utils.QueueThreadPool;
import java.util.Collections;
import java.util.List;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;

/* loaded from: input_file:com/github/sonus21/rqueue/listener/RqueueExecutor.class */
class RqueueExecutor extends MessageContainerBase {
    private final PostProcessingHandler postProcessingHandler;
    private final QueueThreadPool queueThreadPool;
    private final RqueueMessage rqueueMessage;
    private final RqueueBeanProvider beanProvider;
    private final QueueDetail queueDetail;
    private final List<Middleware> middlewareList;
    private boolean updatedToProcessing;
    private JobImpl job;
    private ExecutionStatus status;
    private Throwable error;
    private int failureCount;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RqueueExecutor(RqueueBeanProvider rqueueBeanProvider, RqueueMessageListenerContainer.QueueStateMgr queueStateMgr, List<Middleware> list, PostProcessingHandler postProcessingHandler, RqueueMessage rqueueMessage, QueueDetail queueDetail, QueueThreadPool queueThreadPool) {
        super(LoggerFactory.getLogger(RqueueExecutor.class), queueDetail.getName(), queueStateMgr);
        this.middlewareList = list;
        this.postProcessingHandler = postProcessingHandler;
        this.beanProvider = rqueueBeanProvider;
        this.queueThreadPool = queueThreadPool;
        this.rqueueMessage = rqueueMessage;
        this.queueDetail = queueDetail;
    }

    private Object getUserMessage() {
        try {
            return RqueueMessageUtils.convertMessageToObject((Message<String>) MessageBuilder.createMessage(this.rqueueMessage.getMessage(), RqueueMessageHeaders.buildMessageHeaders(this.queueDetail.getName(), this.rqueueMessage, null, null, this.rqueueMessage.getMessageHeaders())), this.beanProvider.getRqueueMessageHandler().getMessageConverter());
        } catch (Exception e) {
            log(Level.DEBUG, "Unable to convert message {}", e, this.rqueueMessage.getMessage());
            return this.rqueueMessage.getMessage();
        }
    }

    private void init() {
        this.job = new JobImpl(this.beanProvider.getRqueueConfig(), this.beanProvider.getRqueueMessageMetadataService(), this.beanProvider.getRqueueJobDao(), this.beanProvider.getRqueueMessageTemplate(), this.beanProvider.getRqueueLockManager(), this.queueDetail, this.beanProvider.getRqueueMessageMetadataService().getOrCreateMessageMetadata(this.rqueueMessage), this.rqueueMessage, getUserMessage(), this.postProcessingHandler);
        this.failureCount = this.job.getRqueueMessage().getFailureCount();
    }

    private int getMaxRetryCount() {
        return this.job.getRqueueMessage().getRetryCount() == null ? this.job.getQueueDetail().getNumRetry() : this.job.getRqueueMessage().getRetryCount().intValue();
    }

    private void updateCounter(boolean z) {
        RqueueMetricsCounter rqueueMetricsCounter = this.beanProvider.getRqueueMetricsCounter();
        if (rqueueMetricsCounter == null) {
            return;
        }
        if (z) {
            rqueueMetricsCounter.updateFailureCount(this.job.getQueueDetail().getName());
        } else {
            rqueueMetricsCounter.updateExecutionCount(this.job.getQueueDetail().getName());
        }
    }

    private long maxExecutionTime() {
        return this.job.getQueueDetail().getVisibilityTimeout() - 1000;
    }

    private long getMaxProcessingTime() {
        return System.currentTimeMillis() + maxExecutionTime();
    }

    private boolean isMessageDeleted() {
        MessageMetadata messageMetadata = this.job.getMessageMetadata();
        if (!messageMetadata.isDeleted()) {
            messageMetadata.merge(this.beanProvider.getRqueueMessageMetadataService().getOrCreateMessageMetadata(this.job.getRqueueMessage()));
        }
        boolean isDeleted = messageMetadata.isDeleted();
        if (isDeleted) {
            if (this.rqueueMessage.isPeriodic()) {
                log(Level.INFO, "Periodic Message {} having period {} has been deleted", null, this.rqueueMessage.getId(), Long.valueOf(this.rqueueMessage.getPeriod()));
            } else {
                log(Level.INFO, "Message {} has been deleted", null, this.rqueueMessage.getId());
            }
        }
        return isDeleted;
    }

    private boolean shouldIgnore() {
        return !this.beanProvider.getPreExecutionMessageProcessor().process((Job) this.job);
    }

    private boolean isOldMessage() {
        return (this.job.getMessageMetadata().getRqueueMessage() == null || this.job.getMessageMetadata().getRqueueMessage().getQueuedTime() == this.job.getRqueueMessage().getQueuedTime()) ? false : true;
    }

    private int getRetryCount() {
        int maxRetryCount = getMaxRetryCount();
        return this.beanProvider.getRqueueConfig().getRetryPerPoll() == -1 ? maxRetryCount : Math.min(this.beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetryCount);
    }

    private boolean queueInactive() {
        return !isQueueActive(this.job.getQueueDetail().getName());
    }

    private ExecutionStatus getStatus() {
        if (queueInactive()) {
            return ExecutionStatus.QUEUE_INACTIVE;
        }
        if (shouldIgnore()) {
            return ExecutionStatus.IGNORED;
        }
        if (isMessageDeleted()) {
            return ExecutionStatus.DELETED;
        }
        if (isOldMessage()) {
            return ExecutionStatus.OLD_MESSAGE;
        }
        return null;
    }

    private void updateToProcessing() {
        if (this.updatedToProcessing) {
            return;
        }
        this.updatedToProcessing = true;
        this.job.updateMessageStatus(MessageStatus.PROCESSING);
    }

    private void logExecutionTimeWarning(long j, long j2, ExecutionStatus executionStatus) {
        if (System.currentTimeMillis() > j) {
            log(Level.WARN, "Message listener is taking longer time [Queue: {}, TaskStatus: {}] MaxAllowedTime: {}, ExecutionTime: {}", null, this.job.getQueueDetail().getName(), executionStatus, Long.valueOf(maxExecutionTime()), Long.valueOf(System.currentTimeMillis() - j2));
        }
    }

    private void begin() {
        this.job.execute();
        this.error = null;
        this.status = getStatus();
    }

    private void end() {
        if (this.status == null) {
            this.job.updateExecutionStatus(ExecutionStatus.FAILED, this.error);
        } else {
            this.job.updateExecutionStatus(this.status, this.error);
        }
    }

    private void callMiddlewares(int i, List<Middleware> list, Job job) throws Exception {
        if (i == list.size()) {
            new HandlerMiddleware(this.beanProvider.getRqueueMessageHandler()).handle(job, null);
        } else {
            list.get(i).handle(job, () -> {
                callMiddlewares(i + 1, list, job);
                return null;
            });
        }
    }

    private void processMessage() throws Exception {
        if (this.middlewareList == null) {
            callMiddlewares(0, Collections.emptyList(), this.job);
        } else {
            callMiddlewares(0, this.middlewareList, this.job);
        }
        this.status = ExecutionStatus.SUCCESSFUL;
    }

    private void execute() {
        try {
            updateToProcessing();
            updateCounter(false);
            processMessage();
        } catch (MessagingException e) {
            updateCounter(true);
            this.failureCount++;
            this.error = e;
        } catch (Exception e2) {
            updateCounter(true);
            this.failureCount++;
            this.error = e2;
            log(Level.ERROR, "Message execution failed, RqueueMessage: {}", e2, this.job.getRqueueMessage());
        }
    }

    private void handleMessage() {
        long maxProcessingTime = getMaxProcessingTime();
        long currentTimeMillis = System.currentTimeMillis();
        int retryCount = getRetryCount();
        int i = 1;
        do {
            log(Level.DEBUG, "Attempt {} message: {}", null, Integer.valueOf(i), this.job.getMessage());
            begin();
            if (this.status == null) {
                execute();
            }
            retryCount--;
            i++;
            end();
            if (retryCount <= 0 || this.status != null) {
                break;
            }
        } while (System.currentTimeMillis() < maxProcessingTime);
        this.postProcessingHandler.handle(this.job, this.status == null ? ExecutionStatus.FAILED : this.status, this.failureCount);
        logExecutionTimeWarning(maxProcessingTime, currentTimeMillis, this.status);
    }

    private long getTtlForScheduledMessageKey(RqueueMessage rqueueMessage) {
        long visibilityTimeout = (2 * this.job.getQueueDetail().getVisibilityTimeout()) / 1000;
        long processAt = (rqueueMessage.getProcessAt() - System.currentTimeMillis()) / 1000;
        if (processAt > 0) {
            visibilityTimeout += processAt;
        }
        return visibilityTimeout;
    }

    private String getScheduledMessageKey(RqueueMessage rqueueMessage) {
        return String.format("%s%s%s%ssch%s%d", this.job.getQueueDetail().getQueueName(), Constants.REDIS_KEY_SEPARATOR, this.job.getRqueueMessage().getId(), Constants.REDIS_KEY_SEPARATOR, Constants.REDIS_KEY_SEPARATOR, Long.valueOf(rqueueMessage.getProcessAt()));
    }

    private void schedulePeriodicMessage() {
        if (isMessageDeleted()) {
            return;
        }
        RqueueMessage build = this.job.getRqueueMessage().toBuilder().processAt(this.job.getRqueueMessage().nextProcessAt()).build();
        log(Level.DEBUG, "Schedule periodic message: {} Status: {}", null, this.job.getRqueueMessage(), this.beanProvider.getRqueueMessageTemplate().scheduleMessage(this.job.getQueueDetail().getScheduledQueueName(), getScheduledMessageKey(build), build, Long.valueOf(getTtlForScheduledMessageKey(build))));
    }

    private void handlePeriodicMessage() {
        schedulePeriodicMessage();
        handleMessage();
    }

    private void handle() {
        try {
            if (this.job.getRqueueMessage().isPeriodic()) {
                handlePeriodicMessage();
            } else {
                handleMessage();
            }
        } finally {
            this.queueThreadPool.release();
        }
    }

    @Override // com.github.sonus21.rqueue.utils.RetryableRunnable
    public void start() {
        try {
            init();
            handle();
        } catch (Exception e) {
            log(Level.WARN, "Executor init failed Msg: {}", e, this.rqueueMessage);
            release(this.postProcessingHandler, this.queueThreadPool, this.queueDetail, this.rqueueMessage);
        }
    }
}
