/*
 * Decompiled with CFR 0.152.
 */
package org.qubership.atp.mia.service;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.qubership.atp.integration.configuration.model.notification.Notification;
import org.qubership.atp.integration.configuration.service.NotificationService;
import org.qubership.atp.mia.config.SseProperties;
import org.qubership.atp.mia.exceptions.MiaException;
import org.qubership.atp.mia.kafka.producers.MiaExecutionFinishProducer;
import org.qubership.atp.mia.model.impl.ExecutionResponse;
import org.qubership.atp.mia.model.sse.SseEventType;
import org.qubership.atp.mia.model.sse.SsePingRunnable;
import org.qubership.atp.mia.service.AtpUserService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
public class SseEmitterService {
    private static final Logger log = LoggerFactory.getLogger(SseEmitterService.class);
    private final AtpUserService atpUserService;
    private final MiaExecutionFinishProducer miaExecutionFinishProducer;
    private final NotificationService notificationService;
    private final Map<UUID, SseEmitter> sseEmitters = new ConcurrentHashMap<UUID, SseEmitter>();
    private final Map<UUID, SsePingRunnable> ssePings = new ConcurrentHashMap<UUID, SsePingRunnable>();
    private final ExecutorService ssePingsExecutorService = Executors.newCachedThreadPool();
    private final SseProperties sseProperties;

    public SseEmitter generateAndConfigureEmitter(UUID sseId, String token) {
        SseEmitter emitter = new SseEmitter(this.sseProperties.getSseEmitterTimeout());
        this.sseEmitters.put(sseId, emitter);
        emitter.onError(throwable -> {
            log.error("Error while executing emitter", throwable);
            this.complete(sseId);
        });
        emitter.onCompletion(() -> this.complete(sseId));
        emitter.onTimeout(() -> {
            this.complete(sseId);
            this.prepareAndSendSseEmitterExpiredNotification(token);
        });
        SsePingRunnable ping = new SsePingRunnable(emitter, sseId, this.sseProperties.getSseEmitterPingTimeout());
        this.ssePings.put(sseId, ping);
        this.ssePingsExecutorService.execute(ping);
        return emitter;
    }

    public SseEmitter getEmitter(UUID sseId) {
        return this.sseEmitters.getOrDefault(sseId, null);
    }

    public void sendError(UUID sseId, MiaException e) {
        String message = e.getMessage();
        log.error("Exception occurred while sending a response throw emitter: {}", (Object)message, (Object)e);
        ExecutionResponse response = new ExecutionResponse();
        response.setSseId(sseId);
        response.setFinalMessage(true);
        response.setError(e);
        this.sendEventWithExecutionResult(response);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendEventWithExecutionResult(SseEmitter emitter, ExecutionResponse executionResponse) {
        log.debug("Sending sse event for sseId = {}", (Object)executionResponse.getSseId());
        SseEmitter.SseEventBuilder executionEvent = SseEmitter.event().name(SseEventType.EXECUTION_FINISHED.name()).data((Object)executionResponse, MediaType.APPLICATION_JSON);
        try {
            log.info("Send response:\n{}", (Object)executionResponse);
            emitter.send(executionEvent);
        }
        catch (Exception e) {
            log.error("ERROR during sending sse event for sseId {}", (Object)executionResponse.getSseId(), (Object)e);
        }
        finally {
            if (executionResponse.isFinalMessage()) {
                emitter.complete();
            }
        }
    }

    public void sendEventWithExecutionResult(ExecutionResponse response) {
        int tryNumber = 1;
        do {
            SseEmitter sseEmitter;
            if ((sseEmitter = this.getEmitter(response.getSseId())) != null) {
                this.sendEventWithExecutionResult(sseEmitter, response);
                break;
            }
            if (!this.miaExecutionFinishProducer.isMock() && this.miaExecutionFinishProducer.executionFinishEventSend(response)) break;
            log.error("Not possible to sent response due to emitter absent and kafka is turned off (try number {}/3)", (Object)tryNumber);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        } while (++tryNumber <= 3);
    }

    public void updateResponseAndSendToEmitter(ExecutionResponse response, UUID sseId, boolean isFinal, int order) {
        if (response != null && sseId != null) {
            response.setSseId(sseId);
            response.setFinalMessage(isFinal);
            response.setOrder(order);
            this.sendEventWithExecutionResult(response);
        }
    }

    private void complete(UUID sseId) {
        SsePingRunnable ping = this.ssePings.get(sseId);
        if (ping != null) {
            ping.shutdown();
        }
        this.sseEmitters.remove(sseId);
        this.ssePings.remove(sseId);
    }

    private void prepareAndSendSseEmitterExpiredNotification(String token) {
        Notification notification = new Notification("SSE emitter is expired. Please establish new connection.", Notification.Type.INFO, this.atpUserService.getUserIdFromToken(token));
        this.notificationService.sendNotification(notification);
    }

    public SseEmitterService(AtpUserService atpUserService, MiaExecutionFinishProducer miaExecutionFinishProducer, NotificationService notificationService, SseProperties sseProperties) {
        this.atpUserService = atpUserService;
        this.miaExecutionFinishProducer = miaExecutionFinishProducer;
        this.notificationService = notificationService;
        this.sseProperties = sseProperties;
    }
}

