/*
 * Decompiled with CFR 0.152.
 */
package org.piangles.gateway.events;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.piangles.backbone.services.Locator;
import org.piangles.backbone.services.logging.LoggingService;

/**
 * Singleton registry that makes sure a {@link KafkaConsumer} is closed on the
 * thread that registered it.
 *
 * <p>Each consumer is associated with the id of the thread that called
 * {@link #addNewConsumer(KafkaConsumer)}. A close request arriving on that
 * same thread closes the consumer immediately; a request arriving on any
 * other thread only marks the consumer, and the owning thread performs the
 * actual close the next time it calls {@link #closeAnyMarked()}.
 *
 * <p>All public methods are {@code synchronized} on the singleton instance.
 * Note that {@code close()} is invoked while that monitor is held.
 */
public class KafkaConsumerManager {
    private static KafkaConsumerManager self = null;

    private final LoggingService logger = Locator.getInstance().getLoggingService();

    /** Registered (still open) consumers mapped to the id of their owning thread. */
    private final Map<KafkaConsumer<String, String>, Long> consumerThreadMap = new HashMap<KafkaConsumer<String, String>, Long>();

    /**
     * Consumers waiting to be closed, grouped by the id of their owning thread.
     * A list is used so that several consumers owned by the same thread can be
     * pending at once without overwriting each other.
     */
    private final Map<Long, List<KafkaConsumer<String, String>>> closedConsumerMap = new HashMap<Long, List<KafkaConsumer<String, String>>>();

    private KafkaConsumerManager() {
    }

    /**
     * Returns the single shared instance, creating it on first use.
     *
     * @return the process-wide manager instance
     */
    public static final synchronized KafkaConsumerManager getInstance() {
        if (self == null) {
            self = new KafkaConsumerManager();
        }
        return self;
    }

    /**
     * Registers {@code consumer} as owned by the calling thread.
     *
     * @param consumer the consumer to track; {@code null} is ignored
     */
    public synchronized void addNewConsumer(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }
        this.consumerThreadMap.put(consumer, Thread.currentThread().getId());
    }

    /**
     * Closes {@code consumer} immediately when called from its owning thread;
     * otherwise queues it so that the owning thread closes it on its next call
     * to {@link #closeAnyMarked()}.
     *
     * <p>In both cases the consumer is removed from the registry, so it is no
     * longer retained once it has been closed. A consumer that is already
     * queued is left untouched. A consumer this manager knows nothing about
     * has no recorded owner, so it is closed best-effort on the calling thread
     * (any failure is logged, not thrown).
     *
     * @param consumer the consumer to close; {@code null} is ignored
     */
    public synchronized void closeOrMarkForClose(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }

        // Always drop the registry entry: the consumer is either closed now or
        // handed over to the pending map, and must not be retained in both.
        Long ownerThreadId = this.consumerThreadMap.remove(consumer);

        if (ownerThreadId == null) {
            // Either a repeated request for a consumer that is already queued
            // (nothing more to do) or a consumer that was never registered.
            if (!this.isPendingClose(consumer)) {
                this.closeConsumer(consumer);
            }
            return;
        }

        if (ownerThreadId.longValue() == Thread.currentThread().getId()) {
            this.closeConsumer(consumer);
            return;
        }

        List<KafkaConsumer<String, String>> pending = this.closedConsumerMap.get(ownerThreadId);
        if (pending == null) {
            pending = new ArrayList<KafkaConsumer<String, String>>();
            this.closedConsumerMap.put(ownerThreadId, pending);
        }
        pending.add(consumer);
    }

    /**
     * Closes every consumer that other threads have marked for close and that
     * is owned by the calling thread. Does nothing when none are pending.
     */
    public synchronized void closeAnyMarked() {
        List<KafkaConsumer<String, String>> pending = this.closedConsumerMap.remove(Thread.currentThread().getId());
        if (pending == null) {
            return;
        }
        for (KafkaConsumer<String, String> consumer : pending) {
            this.closeConsumer(consumer);
        }
    }

    /** Returns whether {@code consumer} is already queued for close by some owning thread. */
    private boolean isPendingClose(KafkaConsumer<String, String> consumer) {
        for (List<KafkaConsumer<String, String>> pending : this.closedConsumerMap.values()) {
            if (pending.contains(consumer)) {
                return true;
            }
        }
        return false;
    }

    /** Closes {@code consumer} if it is non-null, logging (not propagating) any failure. */
    private void closeConsumer(KafkaConsumer<String, String> consumer) {
        try {
            if (consumer != null) {
                consumer.close();
            }
        }
        catch (Exception e) {
            this.logger.warn((Object)("Unable to close KafkaConsumer because of: " + e.getMessage()), (Throwable)e);
        }
    }
}

