package com.github.kaizen4j.redis.listener;

import com.github.kaizen4j.redis.connection.DelayMessage;
import com.github.kaizen4j.redis.connection.MessageConfig;
import com.github.kaizen4j.redis.message.RedisMessageTemplate;
import com.github.kaizen4j.thread.ThreadPoolTaskExecutorBuilder;
import com.github.kaizen4j.util.JsonUtils;
import com.github.kaizen4j.util.ObjectUtils;
import com.google.common.base.Preconditions;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;

/* loaded from: input_file:com/github/kaizen4j/redis/listener/RedisDelayMessageListenerContainer.class */
/**
 * Container that polls Redis for delay messages and hands them to the registered
 * {@link DelayMessageListener}s.
 *
 * <p>For every topic that has at least one listener, {@link #start()} submits one long-running
 * polling task to an internal "listen" executor. Each task evaluates the Lua script loaded from
 * {@value #FETCH_TOPIC_SCRIPT_PATH} to obtain a batch of message ids, resolves each id to its body
 * through the {@link RedisMessageTemplate}, and dispatches the resulting {@link DelayMessage} to
 * every listener of that topic on the task executor.
 *
 * <p>Required collaborators ({@link #setMessageConfig}, {@link #setRedisMessageTemplate}) and at
 * least one listener ({@link #setMessageListeners}) must be supplied before
 * {@link #afterPropertiesSet()} is invoked.
 */
public class RedisDelayMessageListenerContainer implements InitializingBean, DisposableBean, SmartLifecycle {
    /** Classpath location of the Lua script used to fetch message ids for a topic. */
    private static final String FETCH_TOPIC_SCRIPT_PATH = "script/FETCH.lua";
    private static final Logger logger = LoggerFactory.getLogger(RedisDelayMessageListenerContainer.class);
    /** Default pause (ms) between polls when the previous poll returned nothing. */
    private static final long DEFAULT_WAIT_TIME = TimeUnit.SECONDS.toMillis(1);
    /** Pause (ms) every polling task takes before its first poll. */
    private static final long DEFAULT_INIT_WAIT = TimeUnit.SECONDS.toMillis(30);

    /** Runs one polling loop per topic; created in {@link #afterPropertiesSet()}. */
    private ThreadPoolTaskExecutor listenExecutor;
    /** Runs the listener callbacks; a default one is built when none is injected. */
    private Executor taskExecutor;
    private MessageConfig messageConfig;
    private RedisMessageTemplate redisMessageTemplate;
    /** Fetch script text with the {@code #delayMills} placeholder already substituted. */
    private String script;
    /** Lifecycle flag; read by the polling loops to decide whether to keep going. */
    private volatile boolean running = false;
    /** Listeners grouped by serialized topic name. */
    private final Map<ByteArrayWrapper, Collection<DelayMessageListener>> topicListeners = new ConcurrentHashMap<>();
    private RedisSerializer<String> serializer = RedisSerializer.string();
    private long recoveryInterval = DEFAULT_WAIT_TIME;
    /** Stateless dispatcher shared by all polling loops. */
    private final DelayMessageListener dispatcher = new DispatchMessageListener();

    /**
     * Fans a fetched message out to every listener registered for its channel, unless the message
     * has already been acknowledged by all of them, in which case it is removed instead.
     */
    private class DispatchMessageListener implements DelayMessageListener {
        private DispatchMessageListener() {
        }

        /**
         * Dispatches {@code delayMessage} to the listeners of its channel.
         *
         * @param delayMessage the message to deliver
         * @param bArr         channel bytes handed to the listeners; when {@code null} the
         *                     message's own channel bytes are used
         */
        @Override
        public void onMessage(DelayMessage delayMessage, @Nullable byte[] bArr) {
            Collection<DelayMessageListener> listeners = topicListeners.get(new ByteArrayWrapper(delayMessage.getChannel()));
            if (CollectionUtils.isEmpty(listeners)) {
                return;
            }
            try {
                if (removeDelayMessageIfNecessary(delayMessage, listeners)) {
                    logger.info("DelayMessage has consumed with id [{}] body [{}] on topic [{}]",
                            delayMessage.getMessageId(), delayMessage.getBodyString(), delayMessage.getChannelString());
                    return;
                }
            } catch (Exception e) {
                // A failed ack check must not prevent delivery; fall through and dispatch.
                logger.error("Remove DelayMessage if necessary error: {}", delayMessage.getMessageId(), e);
            }
            // Copy the caller's array so listeners running asynchronously never see later mutations.
            byte[] channel = bArr != null ? bArr.clone() : delayMessage.getChannel();
            for (DelayMessageListener listener : listeners) {
                taskExecutor.execute(() -> processMessage(listener, delayMessage, channel));
            }
        }

        /**
         * Removes the message when its ack count has reached the number of listeners.
         *
         * @return {@code true} only when the ack count is sufficient and the removal succeeded
         */
        private boolean removeDelayMessageIfNecessary(DelayMessage delayMessage, Collection<DelayMessageListener> listeners) {
            String messageId = delayMessage.getMessageId();
            long ackCount = redisMessageTemplate.getMessageAck(messageId);
            int listenerCount = CollectionUtils.size(listeners);
            if (logger.isDebugEnabled()) {
                logger.debug("Get DelayMessage ack [{}] listener size [{}] with id [{}]", ackCount, listenerCount, messageId);
            }
            if (ackCount < listenerCount) {
                return false;
            }
            return redisMessageTemplate.removeMessage(delayMessage.getChannelString(), messageId);
        }
    }

    /** Sets the configuration providing the fetch size and the retry delay. Required. */
    public void setMessageConfig(MessageConfig messageConfig) {
        this.messageConfig = messageConfig;
    }

    /** Sets the template used for all Redis access. Required. */
    public void setRedisMessageTemplate(RedisMessageTemplate redisMessageTemplate) {
        this.redisMessageTemplate = redisMessageTemplate;
    }

    /** Sets the executor that runs listener callbacks; optional, a default is built otherwise. */
    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor;
    }

    /**
     * Sets the pause, in milliseconds, between polls of a topic when the previous poll returned no
     * message or failed.
     */
    public void setRecoveryInterval(long j) {
        this.recoveryInterval = j;
    }

    /**
     * Registers the given listeners on their topics. A {@code null} or empty map is ignored.
     *
     * @param map listener to topics mapping
     * @throws IllegalArgumentException if a listener is {@code null} or has no topics
     */
    public void setMessageListeners(Map<? extends DelayMessageListener, Collection<? extends Topic>> map) {
        if (MapUtils.isEmpty(map)) {
            return;
        }
        map.forEach(this::addListener);
    }

    /** Adds {@code listener} to the listener set of each of {@code topics}. */
    private void addListener(DelayMessageListener listener, Collection<? extends Topic> topics) {
        Assert.notNull(listener, "A valid listener is required");
        Assert.notEmpty(topics, "At least one topic is required");
        for (Topic topic : topics) {
            ByteArrayWrapper key = new ByteArrayWrapper(this.serializer.serialize(topic.getTopic()));
            // computeIfAbsent keeps the create-and-register step atomic on the concurrent map.
            this.topicListeners.computeIfAbsent(key, ignored -> new CopyOnWriteArraySet<>()).add(listener);
            if (logger.isDebugEnabled()) {
                logger.debug("Adding listener [{}] on topic [{}]", listener, topic.getTopic());
            }
        }
    }

    /** Submits one polling task per registered topic to the listen executor. */
    private void lazyListen() {
        for (ByteArrayWrapper wrappedTopic : this.topicListeners.keySet()) {
            String topic = unwrap(wrappedTopic);
            this.listenExecutor.execute(() -> pollTopic(topic));
        }
    }

    /**
     * Polling loop for a single topic. Waits {@link #DEFAULT_INIT_WAIT} ms, then polls until the
     * container is stopped or the thread is interrupted. After an empty or failed poll the loop
     * pauses for the recovery interval; after a non-empty poll it polls again immediately.
     */
    private void pollTopic(String topic) {
        try {
            Thread.sleep(DEFAULT_INIT_WAIT);
            while (this.running) {
                boolean fetched;
                try {
                    fetched = listenTopic(topic);
                } catch (RuntimeException e) {
                    // Keep the loop alive: a single failed poll must not end polling for the topic.
                    logger.error("Poll DelayMessage on topic [{}] failed, retry in [{}] ms", topic, this.recoveryInterval, e);
                    fetched = false;
                }
                if (!fetched) {
                    Thread.sleep(this.recoveryInterval);
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so the owning executor can observe the interruption.
            Thread.currentThread().interrupt();
            if (logger.isDebugEnabled()) {
                logger.debug("Polling on topic [{}] interrupted", topic);
            }
        } catch (RuntimeException e) {
            logger.error("Polling on topic [{}] terminated unexpectedly", topic, e);
        }
    }

    /**
     * Performs one poll of {@code topic}: fetches a batch of message ids and, for each id, either
     * dispatches the message (body present) or removes it (body blank).
     *
     * @return {@code true} if the poll returned at least one message id
     */
    private boolean listenTopic(String topic) {
        List<String> messageIds = doListen(topic);
        if (logger.isDebugEnabled()) {
            logger.debug("Listen DelayMessage on topic [{}] ids {}", topic, JsonUtils.toJson(messageIds));
        }
        for (String messageId : messageIds) {
            String messageBody = this.redisMessageTemplate.getMessageBody(messageId);
            if (logger.isDebugEnabled()) {
                logger.debug("Get DelayMessage body [{}] with id [{}]", messageBody, messageId);
            }
            if (StringUtils.isBlank(messageBody)) {
                boolean removed = this.redisMessageTemplate.removeMessage(topic, messageId);
                logger.info("DelayMessage body has expired on topic [{}] delete with id [{}] result [{}]", topic, messageId, removed);
                continue;
            }
            DelayMessage delayMessage = new DelayMessage(topic, messageBody, Duration.ZERO);
            delayMessage.setMessageId(messageId);
            this.dispatcher.onMessage(delayMessage, ObjectUtils.toBytes(topic));
        }
        return CollectionUtils.isNotEmpty(messageIds);
    }

    /**
     * Evaluates the fetch script for {@code topic} and deserializes the returned entries with the
     * template's value serializer.
     *
     * <p>The script receives one key (the topic) followed by three arguments: the current time in
     * milliseconds, the literal {@code "0"}, and the configured fetch size. How the script
     * interprets them is defined by {@value #FETCH_TOPIC_SCRIPT_PATH}, not here.
     *
     * @return the fetched message ids; an empty list when nothing was returned or the call failed
     */
    private List<String> doListen(String topic) {
        try {
            byte[] topicKey = ObjectUtils.toBytes(topic);
            byte[] nowMillis = String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8);
            byte[] zero = BigDecimal.ZERO.toPlainString().getBytes(StandardCharsets.UTF_8);
            byte[] fetchSize = String.valueOf(this.messageConfig.getFetchSize()).getBytes(StandardCharsets.UTF_8);
            Optional<List<byte[]>> fetched = eval(this.script, ReturnType.MULTI, 1, topicKey, nowMillis, zero, fetchSize);
            if (fetched.isPresent()) {
                RedisSerializer<?> valueSerializer = this.redisMessageTemplate.getRedisTemplate().getValueSerializer();
                return fetched.get().stream()
                        .map(raw -> (String) valueSerializer.deserialize(raw))
                        .collect(Collectors.toList());
            }
        } catch (Exception e) {
            logger.error("Listen DelayMessage on topic failed [{}]", topic, e);
        }
        return Collections.emptyList();
    }

    /**
     * Evaluates a Lua script on a Redis connection obtained from the template.
     *
     * @param scriptText  script source
     * @param returnType  expected Redis return type
     * @param numKeys     how many leading entries of {@code keysAndArgs} are keys
     * @param keysAndArgs keys followed by arguments
     * @return the script result, or empty when the result is {@code null} or the call failed
     */
    private <T> Optional<T> eval(String scriptText, ReturnType returnType, int numKeys, byte[]... keysAndArgs) {
        try {
            byte[] scriptBytes = ObjectUtils.toBytes(scriptText);
            // Typed explicitly so the call binds to execute(RedisCallback) unambiguously.
            RedisCallback<T> callback = connection -> connection.eval(scriptBytes, returnType, numKeys, keysAndArgs);
            return Optional.ofNullable(this.redisMessageTemplate.getRedisTemplate().execute(callback));
        } catch (Exception e) {
            logger.error("Eval fetch script failed [{}] ", scriptText, e);
            return Optional.empty();
        }
    }

    /** Decodes a wrapped topic key back to its UTF-8 string form. */
    private String unwrap(ByteArrayWrapper wrappedTopic) {
        return StringUtils.toEncodedString(wrappedTopic.getArray(), StandardCharsets.UTF_8);
    }

    /**
     * Delivers a message to a single listener. Invoked on the task executor; subclasses may
     * override to customize delivery.
     */
    protected void processMessage(DelayMessageListener delayMessageListener, DelayMessage delayMessage, byte[] bArr) {
        executeListener(delayMessageListener, delayMessage, bArr);
    }

    /**
     * Invokes the listener and logs, rather than propagates, any exception it throws.
     *
     * @param delayMessageListener listener to invoke
     * @param delayMessage         message to deliver
     * @param bArr                 channel bytes passed through to the listener
     */
    protected void executeListener(DelayMessageListener delayMessageListener, DelayMessage delayMessage, byte[] bArr) {
        try {
            logger.info("MessageListener [{}] consume DelayMessage [{}] on topic [{}]",
                    delayMessageListener.getClass(), delayMessage.getBodyString(), delayMessage.getChannelString());
            delayMessageListener.onMessage(delayMessage, bArr);
        } catch (Exception e) {
            logger.error("MessageListener onMessage error：{}，{}", new String(bArr, StandardCharsets.UTF_8), delayMessage, e);
        }
    }

    /**
     * Stops the container and destroys its executors. The task executor is destroyed whenever it
     * is a {@link ThreadPoolTaskExecutor}; the listen executor is destroyed if it was created.
     */
    @Override
    public void destroy() {
        stop();
        if (this.taskExecutor instanceof ThreadPoolTaskExecutor) {
            ((ThreadPoolTaskExecutor) this.taskExecutor).destroy();
            if (logger.isDebugEnabled()) {
                logger.debug("Stopped internally-managed task executor");
            }
        }
        // listenExecutor is only created in afterPropertiesSet(); guard against an uninitialized bean.
        if (this.listenExecutor != null) {
            this.listenExecutor.destroy();
            if (logger.isDebugEnabled()) {
                logger.debug("Stopped internally-managed listen executor");
            }
        }
    }

    /**
     * Validates the configuration, creates default executors where none were supplied, and loads
     * the fetch script, replacing its {@code #delayMills} placeholder with the configured retry
     * delay in milliseconds.
     *
     * @throws IllegalStateException    if no listener is registered or the script cannot be loaded
     * @throws IllegalArgumentException if the message config or the template is missing
     */
    @Override
    public void afterPropertiesSet() {
        Preconditions.checkState(MapUtils.isNotEmpty(this.topicListeners), "A valid listener and one topic is required");
        Assert.notNull(this.messageConfig, "MessageConfig must not be null!");
        Assert.notNull(this.redisMessageTemplate, "RedisMessageTemplate must not be null!");
        if (this.taskExecutor == null) {
            this.taskExecutor = new ThreadPoolTaskExecutorBuilder().threadNamePrefix("DelayMessageListenerExecutor-").build();
        }
        if (this.listenExecutor == null) {
            // One core thread per topic: every polling loop occupies a thread for its whole life.
            this.listenExecutor = new ThreadPoolTaskExecutorBuilder().corePoolSize(this.topicListeners.size()).threadNamePrefix("DelayTopicListenerExecutor-").build();
        }
        // try-with-resources so the classpath stream is closed even when reading fails.
        try (InputStream scriptStream = new ClassPathResource(FETCH_TOPIC_SCRIPT_PATH).getInputStream()) {
            String retryDelayMillis = String.valueOf(Duration.ofSeconds(this.messageConfig.getRetryDelaySeconds()).toMillis());
            this.script = IOUtils.toString(scriptStream, StandardCharsets.UTF_8).replace("#delayMills", retryDelayMillis);
            if (logger.isDebugEnabled()) {
                logger.debug("Initial fetch script done [{}]", this.script);
            }
        } catch (Exception e) {
            throw new IllegalStateException("Initial fetch script failed", e);
        }
    }

    /** Returns {@link Integer#MAX_VALUE} as this container's lifecycle phase. */
    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    /** Starts polling on all registered topics; does nothing if already running. */
    @Override
    public void start() {
        if (this.running) {
            return;
        }
        this.running = true;
        lazyListen();
        if (logger.isDebugEnabled()) {
            logger.debug("Started RedisDelayMessageListenerContainer");
        }
    }

    /**
     * Clears the running flag. Polling loops check the flag before each poll, so a loop finishes
     * its current poll or pause and then exits.
     */
    @Override
    public void stop() {
        if (isRunning()) {
            this.running = false;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Stopped RedisDelayMessageListenerContainer");
        }
    }

    /** Returns whether the container has been started and not yet stopped. */
    @Override
    public boolean isRunning() {
        return this.running;
    }
}
