/*
 * Decompiled with CFR 0.152.
 */
package org.piangles.gateway.events;

import java.time.Duration;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.piangles.backbone.services.Locator;
import org.piangles.backbone.services.logging.LoggingService;
import org.piangles.backbone.services.msg.Event;
import org.piangles.backbone.services.msg.Topic;
import org.piangles.core.util.coding.JSON;
import org.piangles.gateway.client.ClientDetails;
import org.piangles.gateway.events.EventDispatcher;

public final class EventListener
implements Runnable {
    private static final int DEFAULT_WAIT_TIME = 100;
    private static final int MAX_ERROR_LIMIT = 10;
    private LoggingService logger = Locator.getInstance().getLoggingService();
    private ClientDetails clientDetails = null;
    private KafkaConsumer<String, String> consumer = null;
    private EventDispatcher eventDispatcher = null;
    private int errorCount = 0;
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    public EventListener(ClientDetails clientDetails, KafkaConsumer<String, String> consumer, EventDispatcher eventDispatcher) {
        this.clientDetails = clientDetails;
        this.consumer = consumer;
        this.eventDispatcher = eventDispatcher;
    }

    @Override
    public void run() {
        this.logger.info((Object)("Started listening for events for: " + this.clientDetails));
        while (!this.stopRequested.get()) {
            try {
                ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
                HashMap<Event, Topic> eventTopicMap = new HashMap<Event, Topic>();
                for (ConsumerRecord record : records) {
                    Event event = (Event)JSON.getDecoder().decode(((String)record.value()).getBytes(), Event.class);
                    eventTopicMap.put(event, new Topic(record.topic(), record.partition()));
                }
                this.eventDispatcher.dispatchAllEvents(eventTopicMap);
            }
            catch (Exception e) {
                this.logger.error((Object)"Exception while polling / composingEvent:", (Throwable)e);
                ++this.errorCount;
                if (this.errorCount <= 10) continue;
                this.logger.fatal((Object)("Event listener crossed the maximum limit of error : " + this.clientDetails));
                break;
            }
        }
        this.logger.info((Object)("Stopped listening for events for: " + this.clientDetails));
    }

    public void markForStopping() {
        this.logger.info((Object)("Stop listening for events requested for: " + this.clientDetails));
        this.stopRequested.set(true);
    }
}

