/*
 * Decompiled with CFR 0.152.
 */
package cn.ponfee.disjob.alert;

import cn.ponfee.disjob.alert.configuration.AlerterProperties;
import cn.ponfee.disjob.alert.sender.AlertSender;
import cn.ponfee.disjob.common.base.SingletonClassConstraint;
import cn.ponfee.disjob.common.collect.SlidingWindow;
import cn.ponfee.disjob.common.concurrent.NamedThreadFactory;
import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.core.alert.AlertEvent;
import cn.ponfee.disjob.core.base.GroupInfoService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.event.EventListener;

public class Alerter
extends SingletonClassConstraint
implements DisposableBean {
    public static final String KEY_PREFIX = "disjob.alert";
    public static final String ENABLED_KEY_EXPRESSION = "${disjob.alert.enabled:true}";
    public static final String SENDER_CONFIG_KEY_PREFIX = "disjob.alert.sender";
    public static final String USER_RECIPIENT_MAPPER_BEAN_NAME_PREFIX = "disjob.alert.user_recipient_mapper";
    private static final Logger LOG = LoggerFactory.getLogger(Alerter.class);
    private final AlerterProperties config;
    private final GroupInfoService groupInfoService;
    private final ThreadPoolExecutor alarmAsyncExecutor;
    private final ThreadPoolExecutor noticeAsyncExecutor;
    private final ConcurrentHashMap<String, SlidingWindow> rateLimiterMap = new ConcurrentHashMap();

    public Alerter(AlerterProperties config, GroupInfoService groupInfoService) {
        this.config = Objects.requireNonNull(config, "Alerter config cannot be null.");
        this.groupInfoService = Objects.requireNonNull(groupInfoService, "Group info service cannot be null.");
        this.alarmAsyncExecutor = Alerter.createThreadPoolExecutor(config);
        this.noticeAsyncExecutor = Alerter.createThreadPoolExecutor(config);
    }

    @EventListener
    public void onAlertEvent(AlertEvent event) {
        try {
            this.alert(event);
        }
        catch (Throwable t) {
            LOG.warn("Alert event occur error: " + event, t);
        }
    }

    public void destroy() throws Exception {
        ThreadPoolExecutors.shutdown((ExecutorService)this.noticeAsyncExecutor, (int)this.config.getSendThreadPool().getAwaitTerminationSeconds());
        ThreadPoolExecutors.shutdown((ExecutorService)this.alarmAsyncExecutor, (int)this.config.getSendThreadPool().getAwaitTerminationSeconds());
    }

    private static ThreadPoolExecutor createThreadPoolExecutor(AlerterProperties config) {
        AlerterProperties.SendThreadPool pool = config.getSendThreadPool();
        return ThreadPoolExecutors.builder().corePoolSize(pool.getCorePoolSize()).maximumPoolSize(pool.getMaximumPoolSize()).workQueue(new LinkedBlockingQueue(pool.getQueueCapacity())).keepAliveTimeSeconds((long)pool.getKeepAliveTimeSeconds()).allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeOut()).threadFactory((ThreadFactory)NamedThreadFactory.builder().prefix("alert_async_send_thread").build()).rejectedHandler((task, executor) -> LOG.warn("Alert event be discard: {}", (Object)((AlertTask)task).event)).build();
    }

    private void alert(AlertEvent event) {
        Object[] channels = this.config.getTypeChannelsMap().get(event.getAlertType());
        if (ArrayUtils.isEmpty((Object[])channels)) {
            return;
        }
        Set alertUsers = this.groupInfoService.getAlertUsers(event.getGroup());
        String webhook = this.groupInfoService.getWebhook(event.getGroup());
        if (CollectionUtils.isEmpty((Collection)alertUsers) && StringUtils.isBlank((CharSequence)webhook)) {
            return;
        }
        ThreadPoolExecutor executor = event.getAlertType().isAlarm() ? this.alarmAsyncExecutor : this.noticeAsyncExecutor;
        executor.execute(new AlertTask(event, (String[])channels, alertUsers, webhook));
    }

    private class AlertTask
    implements Runnable {
        private final AlertEvent event;
        private final String[] channels;
        private final Set<String> alertUsers;
        private final String webhook;

        AlertTask(AlertEvent event, String[] channels, Set<String> alertUsers, String webhook) {
            this.event = event;
            this.channels = channels;
            this.alertUsers = alertUsers;
            this.webhook = webhook;
        }

        @Override
        public void run() {
            AlerterProperties.SendRateLimit sendRateLimit = Alerter.this.config.getSendRateLimit();
            SlidingWindow slidingWindow = Alerter.this.rateLimiterMap.computeIfAbsent(this.event.buildRateLimitKey(), key -> new SlidingWindow(sendRateLimit.getMaxRequests(), sendRateLimit.getWindowSizeInMillis()));
            if (!slidingWindow.tryAcquire()) {
                LOG.warn("Alert event rate limited: {}", (Object)this.event);
                return;
            }
            Arrays.stream(this.channels).map(AlertSender::get).filter(Objects::nonNull).forEach(sender -> {
                try {
                    sender.send(this.event, this.alertUsers, this.webhook);
                }
                catch (Throwable t) {
                    LOG.error("Alert event send error: " + this.event, t);
                }
            });
        }
    }
}

