/*
 * Decompiled with CFR 0.152.
 */
package org.piangles.gateway.events;

import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.piangles.backbone.services.Locator;
import org.piangles.backbone.services.config.DefaultConfigProvider;
import org.piangles.backbone.services.logging.LoggingService;
import org.piangles.backbone.services.msg.Event;
import org.piangles.backbone.services.msg.Topic;
import org.piangles.core.resources.ConsumerProperties;
import org.piangles.core.resources.KafkaMessagingSystem;
import org.piangles.core.resources.ResourceException;
import org.piangles.core.resources.ResourceManager;
import org.piangles.core.util.abstractions.ConfigProvider;
import org.piangles.core.util.coding.JSON;
import org.piangles.gateway.client.ClientDetails;
import org.piangles.gateway.events.EventDispatcher;

/**
 * Background thread that polls a Kafka consumer for a single client and hands
 * every decoded {@link Event} (paired with the {@link Topic} it arrived on) to
 * an {@link EventDispatcher}.
 *
 * <p>The consumer is created lazily: nothing is polled until
 * {@link #topicsHaveChanged()} has been called at least once. Every later call
 * causes the current consumer to be closed and a new one to be created from
 * the shared {@link ConsumerProperties} on the next loop iteration.
 *
 * <p>The loop ends when {@link #markForStopping()} is called, when the thread
 * is interrupted while idling, or when more than {@code MAX_ERROR_LIMIT}
 * exceptions have been caught over the lifetime of the thread.
 *
 * <p>The consumer is only ever touched from this thread's {@link #run()}.
 */
public final class EventListener
extends Thread {
    private static final String COMPONENT_ID = "1a465968-c647-4fac-9d25-fbd70fa86fee";
    /** Milliseconds to idle per loop iteration while no consumer exists yet. */
    private static final int DEFAULT_SLEEP_TIME = 786;
    /** Milliseconds a single {@code poll} call may block waiting for records. */
    private static final int DEFAULT_WAIT_TIME = 100;
    /** Number of caught exceptions tolerated before the listener gives up. */
    private static final int MAX_ERROR_LIMIT = 10;

    // NOTE(review): the explicit (Object)/(Throwable) casts at the logger call sites come
    // from the decompiled source; they are kept in case LoggingService declares
    // overloads - confirm before removing them.
    private final LoggingService logger = Locator.getInstance().getLoggingService();
    private final ClientDetails clientDetails;
    private final KafkaMessagingSystem kms;
    private final ConsumerProperties consumerProps;
    private final EventDispatcher eventDispatcher;
    /** Current consumer; {@code null} until topics are first announced and after every close. */
    private KafkaConsumer<String, String> consumer;
    /** Exceptions caught so far; never reset, so the limit applies to the thread's lifetime. */
    private int errorCount = 0;
    private final AtomicBoolean topicsHaveChanged = new AtomicBoolean(false);
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /**
     * Creates a listener; no consumer is created until {@link #topicsHaveChanged()} is called.
     *
     * @param clientDetails   client this listener serves (used in log messages)
     * @param consumerProps   consumer configuration; also used as the lock guarding topic changes
     * @param eventDispatcher receiver of every decoded batch of events
     * @throws ResourceException if the Kafka messaging system cannot be obtained
     */
    public EventListener(ClientDetails clientDetails, ConsumerProperties consumerProps, EventDispatcher eventDispatcher) throws ResourceException {
        this.clientDetails = clientDetails;
        this.consumerProps = consumerProps;
        this.eventDispatcher = eventDispatcher;
        this.kms = ResourceManager.getInstance().getKafkaMessagingSystem((ConfigProvider)new DefaultConfigProvider("GatewayService", COMPONENT_ID));
    }

    /**
     * Poll / dispatch loop. Each iteration either polls the current consumer or,
     * when none exists yet, idles briefly; it then recreates the consumer if a
     * topic change has been flagged. The consumer is always closed on exit.
     */
    @Override
    public void run() {
        this.logger.info((Object)("Started listening for events for: " + this.clientDetails));
        boolean interrupted = false;
        while (!this.stopRequested.get()) {
            try {
                if (this.consumer != null) {
                    this.pollAndDispatch();
                } else {
                    Thread.sleep(DEFAULT_SLEEP_TIME);
                }
                this.recreateConsumerIfTopicsChanged();
            }
            catch (InterruptedException e) {
                // An interrupt is a request to stop, not a polling failure: leave the
                // loop instead of burning through the error budget.
                this.logger.info((Object)("Interrupted while waiting for a consumer, stopping listener for: " + this.clientDetails));
                interrupted = true;
                break;
            }
            catch (Exception e) {
                this.logger.error((Object)"Exception while polling / composingEvent:", (Throwable)e);
                ++this.errorCount;
                if (this.errorCount <= MAX_ERROR_LIMIT) continue;
                this.logger.fatal((Object)("Event listener crossed the maximum limit of error : " + this.clientDetails));
                break;
            }
        }
        this.closeConsumer();
        if (interrupted) {
            // Restored only after the close: KafkaConsumer.close() is documented to fail
            // with InterruptException when the calling thread is already interrupted.
            Thread.currentThread().interrupt();
        }
        this.logger.info((Object)("Stopped listening for events for: " + this.clientDetails));
    }

    /**
     * Flags that the topic set has changed so that {@link #run()} rebuilds the
     * consumer on its next iteration. The flag is set while holding the
     * {@code consumerProps} monitor, the same lock the run loop takes before
     * reading the topics.
     */
    void topicsHaveChanged() {
        synchronized (this.consumerProps) {
            this.topicsHaveChanged.set(true);
        }
    }

    /**
     * Asks the loop in {@link #run()} to finish; the flag is checked once per
     * iteration, so the thread stops after the current iteration completes.
     */
    public void markForStopping() {
        this.logger.info((Object)("Stop listening for events requested for: " + this.clientDetails));
        this.stopRequested.set(true);
    }

    /**
     * Polls the current consumer once and, if any records arrived, decodes each
     * record value into an {@link Event} and dispatches the whole batch.
     */
    private void pollAndDispatch() throws Exception {
        ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(DEFAULT_WAIT_TIME));
        if (records.count() == 0) {
            return;
        }
        HashMap<Event, Topic> eventTopicMap = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // NOTE(review): getBytes() uses the platform default charset; confirm which
            // charset the JSON decoder expects before pinning one here.
            Event event = (Event)JSON.getDecoder().decode(record.value().getBytes(), Event.class);
            eventTopicMap.put(event, new Topic(record.topic(), record.partition()));
        }
        this.eventDispatcher.dispatchAllEvents(eventTopicMap);
    }

    /**
     * Replaces the consumer when a topic change has been flagged. The flag is
     * cleared only after the new consumer has been created, so a failed
     * creation is retried on the next loop iteration.
     */
    private void recreateConsumerIfTopicsChanged() throws Exception {
        synchronized (this.consumerProps) {
            if (!this.topicsHaveChanged.get()) {
                return;
            }
            this.logger.info((Object)("Topics have changed. Creating consumer for Topics: " + this.consumerProps.getTopics()));
            this.closeConsumer();
            this.consumer = this.kms.createConsumer(this.consumerProps);
            this.topicsHaveChanged.set(false);
        }
    }

    /**
     * Closes the current consumer, if any, on a best-effort basis (failures are
     * logged, not rethrown) and always drops the reference so that a closed
     * consumer is never polled again.
     */
    private void closeConsumer() {
        if (this.consumer == null) {
            return;
        }
        try {
            this.logger.debug((Object)("Closing existing EventListener->Consumer for: " + this.clientDetails));
            this.consumer.close();
        }
        catch (Exception e) {
            this.logger.warn((Object)("Unable to close KafkaConsumer because of: " + e.getMessage()), (Throwable)e);
        }
        finally {
            this.consumer = null;
        }
    }
}

