/*
 * Decompiled with CFR 0.152.
 */
package org.piangles.gateway.events;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.piangles.backbone.services.Locator;
import org.piangles.backbone.services.config.DefaultConfigProvider;
import org.piangles.backbone.services.logging.LoggingService;
import org.piangles.backbone.services.msg.Event;
import org.piangles.backbone.services.msg.Topic;
import org.piangles.core.resources.ConsumerProperties;
import org.piangles.core.resources.KafkaMessagingSystem;
import org.piangles.core.resources.ResourceException;
import org.piangles.core.resources.ResourceManager;
import org.piangles.core.util.abstractions.ConfigProvider;
import org.piangles.gateway.client.ClientDetails;
import org.piangles.gateway.events.EventDispatcher;
import org.piangles.gateway.events.EventListener;
import org.piangles.gateway.events.EventProcessor;
import org.piangles.gateway.events.EventRouter;
import org.piangles.gateway.events.KafkaConsumerManager;

/**
 * Owns the Kafka consumer and the {@link EventListener} thread for a single
 * client, and dispatches the events handed to it to the {@link EventProcessor}
 * chosen by {@link EventRouter}.
 *
 * <p>Topic subscriptions are kept in a map of {@link Topic} to trace id.
 * Changing the subscriptions only marks the listener for restart; the actual
 * restart happens at the end of the next {@link #dispatchAllEvents(Map)} call
 * or when {@link #restart()} is invoked explicitly.
 *
 * <p>All public methods are {@code synchronized} on this instance.
 */
public class EventProcessingManager implements EventDispatcher {
    private static final String COMPONENT_ID = "1a465968-c647-4fac-9d25-fbd70fa86fee";

    private final LoggingService logger = Locator.getInstance().getLoggingService();
    private final ClientDetails clientDetails;
    /** Currently subscribed topics mapped to the trace id stamped on their events. */
    private final Map<Topic, UUID> topicTraceIdMap;
    private final KafkaMessagingSystem kms;

    /** Set whenever the subscription set changes; cleared by {@link #start()}. */
    private boolean restartEventListener = true;
    private KafkaConsumer<String, String> consumer = null;
    private EventListener eventListener = null;

    /**
     * Creates a manager for the given client with no topic subscriptions.
     *
     * @param clientDetails the client on whose behalf events are consumed and processed
     * @throws ResourceException if the Kafka messaging system cannot be obtained
     */
    public EventProcessingManager(ClientDetails clientDetails) throws ResourceException {
        this.clientDetails = clientDetails;
        this.topicTraceIdMap = new HashMap<>();
        this.kms = ResourceManager.getInstance().getKafkaMessagingSystem((ConfigProvider)new DefaultConfigProvider("GatewayService", COMPONENT_ID));
    }

    /**
     * Stops the current listener (if any) and starts a new one for the
     * currently subscribed topics.
     */
    public synchronized void restart() {
        this.stop();
        this.start();
    }

    /**
     * Hands the current consumer to {@link KafkaConsumerManager} for closing
     * and marks the current listener, if one exists, for stopping.
     */
    public synchronized void stop() {
        // NOTE(review): consumer is still null when no listener has been started yet
        // (e.g. the first restart()); confirm closeOrMarkForClose tolerates null.
        KafkaConsumerManager.getInstance().closeOrMarkForClose(this.consumer);
        if (this.eventListener != null) {
            this.eventListener.markForStopping();
        }
    }

    /**
     * Adds the given topics (with their trace ids) to the subscription set and
     * flags the listener for restart.
     *
     * @param topicTraceIdMap topics to subscribe to, mapped to their trace ids
     */
    public synchronized void subscribeToTopics(Map<Topic, UUID> topicTraceIdMap) {
        this.logger.info((Object)("Subscribing to " + topicTraceIdMap.keySet()));
        this.topicTraceIdMap.putAll(topicTraceIdMap);
        this.restartEventListener = true;
    }

    /**
     * Removes the given topics from the subscription set and flags the
     * listener for restart.
     *
     * @param topics topics to unsubscribe from
     */
    public synchronized void unsubscribeTopics(List<Topic> topics) {
        this.logger.info((Object)("Unsubscribing to " + topics));
        // Remove each topic individually; passing the list itself as the key
        // never matches a Topic entry and would leave the subscriptions intact.
        for (Topic topic : topics) {
            this.topicTraceIdMap.remove(topic);
        }
        this.restartEventListener = true;
    }

    /**
     * Processes every event in the given map with the processor selected by
     * {@link EventRouter}; events without a processor are skipped. A failure
     * while processing one event is logged and does not stop the remaining
     * events. Afterwards the listener is restarted if the subscriptions
     * changed since it was last started.
     *
     * @param toBeDispactedTopicEventMap events to dispatch, mapped to the topic they belong to
     * @throws Exception declared by the {@code @Override}n method
     */
    @Override
    public synchronized void dispatchAllEvents(Map<Event, Topic> toBeDispactedTopicEventMap) throws Exception {
        for (Map.Entry<Event, Topic> entry : toBeDispactedTopicEventMap.entrySet()) {
            Event event = entry.getKey();
            try {
                EventProcessor processor = EventRouter.getInstance().getProcessor(event);
                if (processor == null) {
                    continue;
                }
                // Stamp the event with the trace id registered for its topic.
                event.setTraceId(this.topicTraceIdMap.get(entry.getValue()));
                processor.process(this.clientDetails, event);
            }
            catch (Exception e) {
                this.logger.error((Object)("Unexpected Eror while processing Event : " + event), (Throwable)e);
            }
        }
        if (this.restartEventListener) {
            this.restart();
        }
    }

    /**
     * Clears the restart flag and, if at least one topic is subscribed,
     * creates a consumer for the subscribed topics, registers it with
     * {@link KafkaConsumerManager} and runs a new {@link EventListener} on its
     * own thread. With no topics subscribed it only logs that fact.
     */
    private void start() {
        this.restartEventListener = false;
        if (this.topicTraceIdMap.isEmpty()) {
            this.logger.info((Object)("No topics to listen for: " + this.clientDetails));
            return;
        }

        ConsumerProperties consumerProps = new ConsumerProperties(this.clientDetails.getSessionDetails().getUserId());
        // NOTE(review): passing consumerProps as the first constructor argument looks like
        // decompiler output for a qualified inner-class creation
        // (consumerProps.new Topic(...)) - confirm against ConsumerProperties.
        List<ConsumerProperties.Topic> modifiedTopics = this.topicTraceIdMap.keySet().stream()
                .map(topic -> new ConsumerProperties.Topic(consumerProps, topic.getTopicName(), topic.getPartition(), topic.shouldReadEarliest()))
                .collect(Collectors.toList());
        consumerProps.setTopics(modifiedTopics);

        this.consumer = this.kms.createConsumer(consumerProps);
        KafkaConsumerManager.getInstance().addNewConsumer(this.consumer);
        this.eventListener = new EventListener(this.clientDetails, this.consumer, this);
        Thread thread = new Thread(this.eventListener);
        thread.start();
    }
}

