package org.sourcelab.kafka.webview.ui.manager.socket;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.SessionIdentifier;
import org.sourcelab.kafka.webview.ui.manager.kafka.SocketKafkaConsumer;
import org.sourcelab.kafka.webview.ui.manager.kafka.WebKafkaConsumerFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.config.FilterDefinition;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResult;
import org.sourcelab.kafka.webview.ui.model.View;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;

/* loaded from: input_file:BOOT-INF/classes/org/sourcelab/kafka/webview/ui/manager/socket/WebSocketConsumersManager.class */
public class WebSocketConsumersManager implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) WebSocketConsumersManager.class);
    private final Map<ConsumerKey, ConsumerEntry> consumers = new ConcurrentHashMap();
    private final WebKafkaConsumerFactory webKafkaConsumerFactory;
    private final SimpMessagingTemplate simpMessagingTemplate;
    private final ThreadPoolExecutor threadPoolExecutor;

    /* loaded from: input_file:BOOT-INF/classes/org/sourcelab/kafka/webview/ui/manager/socket/WebSocketConsumersManager$ConsumerEntry.class */
    private static class ConsumerEntry {
        private final SocketKafkaConsumer socketKafkaConsumer;
        private boolean shouldStop = false;
        private boolean isPaused = false;

        public ConsumerEntry(SocketKafkaConsumer socketKafkaConsumer) {
            this.socketKafkaConsumer = socketKafkaConsumer;
        }

        public Optional<KafkaResult> nextResult() {
            return this.isPaused ? Optional.empty() : this.socketKafkaConsumer.nextResult();
        }

        public synchronized boolean isShouldStop() {
            return this.shouldStop;
        }

        public synchronized void requestStop() {
            this.socketKafkaConsumer.requestStop();
            this.shouldStop = true;
        }

        public synchronized void requestPause() {
            this.isPaused = true;
        }

        public synchronized void requestResume() {
            this.isPaused = false;
        }
    }

    /* loaded from: input_file:BOOT-INF/classes/org/sourcelab/kafka/webview/ui/manager/socket/WebSocketConsumersManager$ConsumerKey.class */
    private static class ConsumerKey {
        private final long viewId;
        private final long userId;
        private final String sessionId;

        public ConsumerKey(long j, SessionIdentifier sessionIdentifier) {
            this.viewId = j;
            this.userId = sessionIdentifier.getUserId();
            this.sessionId = sessionIdentifier.getSessionId();
        }

        public long getViewId() {
            return this.viewId;
        }

        public long getUserId() {
            return this.userId;
        }

        public String getSessionId() {
            return this.sessionId;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ConsumerKey consumerKey = (ConsumerKey) obj;
            if (this.viewId == consumerKey.viewId && this.userId == consumerKey.userId) {
                return this.sessionId.equals(consumerKey.sessionId);
            }
            return false;
        }

        public int hashCode() {
            return (31 * ((31 * ((int) (this.viewId ^ (this.viewId >>> 32)))) + ((int) (this.userId ^ (this.userId >>> 32))))) + this.sessionId.hashCode();
        }
    }

    public WebSocketConsumersManager(WebKafkaConsumerFactory webKafkaConsumerFactory, SimpMessagingTemplate simpMessagingTemplate, int i) {
        this.webKafkaConsumerFactory = webKafkaConsumerFactory;
        this.simpMessagingTemplate = simpMessagingTemplate;
        this.threadPoolExecutor = new ThreadPoolExecutor(i, i, 5L, TimeUnit.MINUTES, new LinkedBlockingQueue(100));
    }

    public void addNewConsumer(View view, Collection<FilterDefinition> collection, StartingPosition startingPosition, SessionIdentifier sessionIdentifier) {
        synchronized (this.consumers) {
            ConsumerKey consumerKey = new ConsumerKey(view.getId().longValue(), sessionIdentifier);
            if (this.consumers.containsKey(consumerKey)) {
                throw new RuntimeException("Consumer already exists!");
            }
            SocketKafkaConsumer createWebSocketClient = this.webKafkaConsumerFactory.createWebSocketClient(view, collection, startingPosition, sessionIdentifier);
            this.consumers.put(consumerKey, new ConsumerEntry(createWebSocketClient));
            this.threadPoolExecutor.execute(createWebSocketClient);
            logger.info("Added new web socket consumer, now has {}/{} running consumers", Integer.valueOf(this.threadPoolExecutor.getActiveCount()), Integer.valueOf(this.threadPoolExecutor.getMaximumPoolSize()));
        }
    }

    public void removeConsumersForSessionId(String str) {
        synchronized (this.consumers) {
            for (Map.Entry<ConsumerKey, ConsumerEntry> entry : this.consumers.entrySet()) {
                if (entry.getKey().getSessionId().equals(str)) {
                    entry.getValue().requestStop();
                }
            }
        }
    }

    public void pauseConsumer(long j, SessionIdentifier sessionIdentifier) {
        synchronized (this.consumers) {
            ConsumerKey consumerKey = new ConsumerKey(j, sessionIdentifier);
            if (this.consumers.containsKey(consumerKey)) {
                this.consumers.get(consumerKey).requestPause();
            }
        }
    }

    public void resumeConsumer(long j, SessionIdentifier sessionIdentifier) {
        synchronized (this.consumers) {
            ConsumerKey consumerKey = new ConsumerKey(j, sessionIdentifier);
            if (this.consumers.containsKey(consumerKey)) {
                this.consumers.get(consumerKey).requestResume();
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            boolean z = false;
            ArrayList arrayList = new ArrayList();
            for (Map.Entry<ConsumerKey, ConsumerEntry> entry : this.consumers.entrySet()) {
                try {
                    ConsumerKey key = entry.getKey();
                    ConsumerEntry value = entry.getValue();
                    if (value.isShouldStop()) {
                        arrayList.add(key);
                    } else {
                        Optional<KafkaResult> nextResult = value.nextResult();
                        if (nextResult.isPresent()) {
                            z = true;
                            String str = "/topic/view/" + key.getViewId() + "/" + key.getUserId();
                            SimpMessageHeaderAccessor create = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
                            create.setSessionId(key.getSessionId());
                            create.setLeaveMutable(true);
                            this.simpMessagingTemplate.convertAndSendToUser(key.getSessionId(), str, nextResult.get(), create.getMessageHeaders());
                        }
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), (Throwable) e);
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.consumers.remove((ConsumerKey) it.next());
                logger.info("Removed web socket consumer, now has ~ {}/{} running consumers", Integer.valueOf(this.threadPoolExecutor.getActiveCount()), Integer.valueOf(this.threadPoolExecutor.getMaximumPoolSize()));
            }
            if (!z) {
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e2) {
                    this.threadPoolExecutor.shutdown();
                    return;
                }
            }
        }
    }
}
