/*
 * Decompiled with CFR 0.152.
 */
package org.mydotey.kbear.client;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.mydotey.java.ObjectExtension;
import org.mydotey.java.StringExtension;
import org.mydotey.kbear.client.ConsumerHolder;
import org.mydotey.kbear.client.ConsumerRestartListener;
import org.mydotey.kbear.client.KafkaConsumerConfig;
import org.mydotey.kbear.client.KafkaMetaHolder;
import org.mydotey.kbear.client.KafkaMetaManager;
import org.mydotey.kbear.meta.Cluster;
import org.mydotey.kbear.meta.ConsumerGroup;
import org.mydotey.kbear.meta.ConsumerGroupId;
import org.mydotey.kbear.meta.Topic;
import org.mydotey.kbear.route.Route;
import org.mydotey.scf.ConfigurationManager;
import org.mydotey.scf.Property;
import org.mydotey.scf.facade.StringProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerProxy<K, V>
implements Consumer<K, V> {
    // Class-wide SLF4J logger.
    private static Logger _logger = LoggerFactory.getLogger(ConsumerProxy.class);
    // Sentinel thread id; the methods below use the literal -1L for "no owning thread"
    // (presumably this constant inlined by the compiler — TODO confirm).
    protected static final long NO_CURRENT_THREAD = -1L;
    // Not referenced anywhere else in this class as shown.
    protected static final String JMX_PREFIX = "kafka.consumer";
    // Source of routing metadata; also used to register/unregister per-topic consumers.
    private KafkaMetaManager _metaManager;
    // Template configuration that is cloned for every per-topic consumer.
    private KafkaConsumerConfig _kafkaConsumerConfig;
    // Value of "client.id" from the config, or "" when blank.
    private String _clientId;
    // Value of "group.id" from the config; required to be non-blank.
    private String _groupName;
    // One holder (and therefore one underlying consumer) per topic id.
    private ConcurrentHashMap<String, ConsumerHolder> _consumerHolders;
    // Runs per-consumer fan-out tasks and consumer restarts.
    private ExecutorService _executorService;
    // Delay in milliseconds before retrying the close of an old consumer that is still in use.
    Property<String, Integer> _destroyDelay;
    // Schedules the delayed close retries.
    private ScheduledExecutorService _scheduledExecutorService;
    // Reflective handle to the private "records" field of ConsumerRecords.
    private Field _recordsField;
    // Set to true by doClose(); checked before every guarded operation.
    private volatile boolean _closed;
    // Id of the thread currently using the proxy, or -1 when none.
    private AtomicLong _currentThread;
    // Re-entrancy depth of the owning thread; ownership is released when it drops to 0.
    private AtomicInteger _refcount;
    // Monitor guarding creation, removal and restart of consumer holders.
    private Object _addRemoveLock;

    /**
     * Builds a proxy that manages one underlying Kafka consumer per topic.
     *
     * <p>Reads {@code client.id} (blank becomes the empty string) and the mandatory
     * {@code group.id} from the supplied config, creates the daemon-thread executors, reads the
     * destroy-delay property (default 1000, negative values rejected by the value filter) and
     * obtains reflective access to the {@code records} field of {@link ConsumerRecords}.
     *
     * @param configurationManager source of the destroy-delay property; must not be null
     * @param metaManager          routing metadata manager; must not be null
     * @param kafkaConsumerConfig  template consumer configuration; must not be null
     * @throws UnsupportedOperationException if {@code ConsumerRecords} has no accessible
     *                                       {@code records} field
     */
    public ConsumerProxy(ConfigurationManager configurationManager, KafkaMetaManager metaManager, KafkaConsumerConfig kafkaConsumerConfig) {
        ObjectExtension.requireNonNull((Object)configurationManager, (String)"configurationManager");
        ObjectExtension.requireNonNull((Object)metaManager, (String)"metaManager");
        ObjectExtension.requireNonNull((Object)kafkaConsumerConfig, (String)"kafkaConsumerConfig");

        String configuredClientId = kafkaConsumerConfig.getProperties().getProperty("client.id");
        this._clientId = StringExtension.isBlank((String)configuredClientId) ? "" : configuredClientId;
        this._groupName = kafkaConsumerConfig.getProperties().getProperty("group.id");
        ObjectExtension.requireNonBlank((String)this._groupName, (String)"group.id");

        this._metaManager = metaManager;
        this._kafkaConsumerConfig = kafkaConsumerConfig;
        this._consumerHolders = new ConcurrentHashMap<>();

        // Worker threads are daemon threads numbered per proxy instance.
        AtomicInteger executorThreadSeq = new AtomicInteger();
        this._executorService = Executors.newCachedThreadPool(runnable -> {
            Thread worker = new Thread(runnable);
            worker.setDaemon(true);
            worker.setName("kafka.consumer-proxy.executor-" + this._clientId + "-" + executorThreadSeq.getAndIncrement());
            return worker;
        });

        StringProperties properties = new StringProperties(configurationManager);
        this._destroyDelay = properties.getIntProperty((Object)"kafka.consumer-proxy.consumer-destroy-delay", Integer.valueOf(1000), v -> v < 0 ? null : v);

        this._scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread scheduler = new Thread(runnable, "kafka.consumer-proxy.scheduled-executor");
            scheduler.setDaemon(true);
            return scheduler;
        });

        try {
            Field recordsField = ConsumerRecords.class.getDeclaredField("records");
            recordsField.setAccessible(true);
            this._recordsField = recordsField;
        }
        catch (NoSuchFieldException | SecurityException e) {
            throw new UnsupportedOperationException("ConsumerRecords has not a field 'records'", e);
        }

        this._currentThread = new AtomicLong(NO_CURRENT_THREAD);
        this._refcount = new AtomicInteger();
        this._addRemoveLock = new Object();
    }

    public Set<TopicPartition> assignment() {
        return this.runWithoutConcurrency(() -> {
            HashSet result = new HashSet();
            this._consumerHolders.values().forEach((? super T c) -> result.addAll(c.getConsumer().assignment()));
            return Collections.unmodifiableSet(result);
        });
    }

    /**
     * Returns the topic ids for which an underlying consumer currently exists.
     *
     * @return an unmodifiable copy of the holder map's key set
     * @throws IllegalStateException if the proxy is closed
     */
    public Set<String> subscription() {
        return this.runWithoutConcurrency(() -> {
            Set<String> topicIds = new HashSet<>(this._consumerHolders.keySet());
            return Collections.unmodifiableSet(topicIds);
        });
    }

    public void subscribe(Collection<String> topics) {
        ObjectExtension.requireNonNull(topics, (String)"topics");
        this.runWithoutConcurrency(() -> {
            Object object = this._addRemoveLock;
            synchronized (object) {
                this.removeNotUsed(topics);
                topics.forEach((? super T t) -> {
                    ConsumerHolder consumerHolder = this.getOrAddConsumer((String)t);
                    consumerHolder.getConsumer().subscribe(Arrays.asList(t));
                    consumerHolder.setConsumerRebalanceListener(null);
                    consumerHolder.setAssignments(null);
                });
            }
        });
    }

    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        ObjectExtension.requireNonNull(topics, (String)"topics");
        ObjectExtension.requireNonNull((Object)callback, (String)"callback");
        this.runWithoutConcurrency(() -> {
            Object object = this._addRemoveLock;
            synchronized (object) {
                this.removeNotUsed(topics);
                topics.forEach((? super T t) -> {
                    ConsumerHolder consumerHolder = this.getOrAddConsumer((String)t);
                    consumerHolder.getConsumer().subscribe(Arrays.asList(t), callback);
                    consumerHolder.setConsumerRebalanceListener(callback);
                    consumerHolder.setAssignments(null);
                });
            }
        });
    }

    public void assign(Collection<TopicPartition> partitions) {
        ObjectExtension.requireNonNull(partitions, (String)"partitions");
        this.runWithoutConcurrency(() -> {
            Object object = this._addRemoveLock;
            synchronized (object) {
                if (partitions.isEmpty()) {
                    this.unsubscribe();
                    return;
                }
                Map<String, Collection<TopicPartition>> byTopic = this.toMap(partitions);
                this.removeNotUsed(byTopic.keySet());
                byTopic.entrySet().forEach((? super T e) -> {
                    ConsumerHolder consumerHolder = this.getOrAddConsumer((String)e.getKey());
                    consumerHolder.getConsumer().assign((Collection)e.getValue());
                    consumerHolder.setAssignments((Collection)e.getValue());
                    consumerHolder.setConsumerRebalanceListener(null);
                });
            }
        });
    }

    /**
     * Pattern subscription is not supported by this proxy.
     *
     * @param pattern  ignored
     * @param callback ignored
     * @throws UnsupportedOperationException always
     */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        throw new UnsupportedOperationException("Pattern subscription is not supported by ConsumerProxy");
    }

    /**
     * Pattern subscription is not supported by this proxy.
     *
     * @param pattern ignored
     * @throws UnsupportedOperationException always
     */
    public void subscribe(Pattern pattern) {
        throw new UnsupportedOperationException("Pattern subscription is not supported by ConsumerProxy");
    }

    /**
     * Removes (unregisters and closes) every underlying consumer.
     *
     * @throws IllegalStateException if the proxy is closed
     */
    public void unsubscribe() {
        this.runWithoutConcurrency(() -> {
            synchronized (this._addRemoveLock) {
                for (String topicId : this._consumerHolders.keySet()) {
                    this.removeConsumer(topicId);
                }
            }
        });
    }

    /**
     * Polls every underlying consumer in parallel and merges the records into one result.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the blocking {@code poll} itself happens
     * outside the lock. Failures of individual consumers are logged by {@code forEach} and
     * their records are simply absent from the result.
     *
     * @param timeout poll timeout in milliseconds passed to each consumer; must not be negative
     * @return the merged records of all consumers
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // record maps are extracted reflectively as raw types
    public ConsumerRecords<K, V> poll(long timeout) {
        if (timeout < 0L) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return this.runWithoutConcurrency(() -> {
            Map result = new HashMap();
            this.forEach((t, c) -> {
                Map polled = this.toMap(c.getConsumer().poll(timeout));
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (result) {
                    result.putAll(polled);
                }
            });
            return new ConsumerRecords(result);
        });
    }

    /**
     * Polls every underlying consumer in parallel and merges the records into one result.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the blocking {@code poll} itself happens
     * outside the lock. Failures of individual consumers are logged by {@code forEach} and
     * their records are simply absent from the result.
     *
     * @param timeout poll timeout passed to each consumer; must be non-null and non-negative
     * @return the merged records of all consumers
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // record maps are extracted reflectively as raw types
    public ConsumerRecords<K, V> poll(Duration timeout) {
        ObjectExtension.requireNonNull((Object)timeout, (String)"timeout");
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return this.runWithoutConcurrency(() -> {
            Map result = new HashMap();
            this.forEach((t, c) -> {
                Map polled = this.toMap(c.getConsumer().poll(timeout));
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (result) {
                    result.putAll(polled);
                }
            });
            return new ConsumerRecords(result);
        });
    }

    /**
     * Synchronously commits on every underlying consumer, in parallel via {@code forEach}.
     * Per-consumer failures are logged by {@code forEach} rather than propagated.
     *
     * @throws IllegalStateException if the proxy is closed
     */
    public void commitSync() {
        BiConsumer<String, ConsumerHolder> commit = (topicId, holder) -> holder.getConsumer().commitSync();
        this.runWithoutConcurrency(() -> this.forEach(commit));
    }

    /**
     * Synchronously commits on every underlying consumer with the given timeout, in parallel
     * via {@code forEach}. Per-consumer failures are logged rather than propagated.
     *
     * @param timeout commit timeout; must be non-null and non-negative
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    public void commitSync(Duration timeout) {
        ObjectExtension.requireNonNull((Object)timeout, (String)"timeout");
        boolean negative = timeout.toMillis() < 0L;
        if (negative) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        BiConsumer<String, ConsumerHolder> commit = (topicId, holder) -> holder.getConsumer().commitSync(timeout);
        this.runWithoutConcurrency(() -> this.forEach(commit));
    }

    /**
     * Synchronously commits the given offsets, grouped by topic, on the consumers that exist
     * for those topics. Topics without a consumer are skipped by the {@code forEach} filter.
     *
     * @param offsets offsets to commit; must not be null
     * @throws IllegalStateException if the proxy is closed
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsByTopic = this.toMap(offsets);
        BiConsumer<String, ConsumerHolder> commit =
            (topicId, holder) -> holder.getConsumer().commitSync(offsetsByTopic.get(topicId));
        this.runWithoutConcurrency(() -> this.forEach(offsetsByTopic::containsKey, commit));
    }

    /**
     * Synchronously commits the given offsets with a timeout, grouped by topic, on the
     * consumers that exist for those topics. Topics without a consumer are skipped.
     *
     * @param offsets offsets to commit; must not be null
     * @param timeout commit timeout; must be non-null and non-negative
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        ObjectExtension.requireNonNull((Object)timeout, (String)"timeout");
        boolean negative = timeout.toMillis() < 0L;
        if (negative) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsByTopic = this.toMap(offsets);
        BiConsumer<String, ConsumerHolder> commit =
            (topicId, holder) -> holder.getConsumer().commitSync(offsetsByTopic.get(topicId), timeout);
        this.runWithoutConcurrency(() -> this.forEach(offsetsByTopic::containsKey, commit));
    }

    public void commitAsync() {
        this.runWithoutConcurrency(() -> this._consumerHolders.values().forEach((? super T c) -> c.getConsumer().commitAsync()));
    }

    public void commitAsync(OffsetCommitCallback callback) {
        this.runWithoutConcurrency(() -> this._consumerHolders.values().forEach((? super T c) -> c.getConsumer().commitAsync(callback)));
    }

    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        ObjectExtension.requireNonNull(offsets, (String)"offsets");
        this.runWithoutConcurrency(() -> {
            HashMap<String, Map> byTopic = new HashMap<String, Map>();
            offsets.forEach((? super K tp, ? super V oam) -> byTopic.computeIfAbsent(tp.topic(), k -> new HashMap()).put(tp, oam));
            byTopic.forEach((? super K t, ? super V os) -> this._consumerHolders.get(t).getConsumer().commitAsync(os, callback));
        });
    }

    /**
     * Seeks the consumer of the partition's topic to {@code offset}, creating that consumer
     * if it does not exist yet.
     *
     * @param partition partition to seek; must not be null
     * @param offset    target offset; must not be negative
     * @throws IllegalArgumentException if {@code offset} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    public void seek(TopicPartition partition, long offset) {
        ObjectExtension.requireNonNull((Object)partition, (String)"partition");
        boolean negative = offset < 0L;
        if (negative) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        this.runWithoutConcurrency(() -> {
            Consumer delegate = this.getOrAddConsumer(partition.topic()).getConsumer();
            delegate.seek(partition, offset);
        });
    }

    public void seekToBeginning(Collection<TopicPartition> partitions) {
        Map<String, Collection<TopicPartition>> byTopic = this.toMap(partitions);
        this.runWithoutConcurrency(() -> byTopic.forEach((? super K t, ? super V ps) -> this.getOrAddConsumer((String)t).getConsumer().seekToBeginning(ps)));
    }

    public void seekToEnd(Collection<TopicPartition> partitions) {
        Map<String, Collection<TopicPartition>> byTopic = this.toMap(partitions);
        this.runWithoutConcurrency(() -> byTopic.forEach((? super K t, ? super V ps) -> this.getOrAddConsumer((String)t).getConsumer().seekToEnd(ps)));
    }

    /**
     * Returns the position reported by the consumer of the partition's topic, creating that
     * consumer if it does not exist yet.
     *
     * @param partition partition to query; must not be null
     * @return the delegate consumer's position for {@code partition}
     * @throws IllegalStateException if the proxy is closed
     */
    public long position(TopicPartition partition) {
        ObjectExtension.requireNonNull((Object)partition, (String)"partition");
        return this.runWithoutConcurrency(() -> {
            Consumer delegate = this.getOrAddConsumer(partition.topic()).getConsumer();
            return delegate.position(partition);
        });
    }

    /**
     * Returns the position reported by the consumer of the partition's topic within the
     * given timeout, creating that consumer if it does not exist yet.
     *
     * @param partition partition to query; must not be null
     * @param timeout   timeout forwarded unchanged to the delegate consumer
     * @return the delegate consumer's position for {@code partition}
     * @throws IllegalStateException if the proxy is closed
     */
    public long position(TopicPartition partition, Duration timeout) {
        ObjectExtension.requireNonNull((Object)partition, (String)"partition");
        return this.runWithoutConcurrency(() -> {
            Consumer delegate = this.getOrAddConsumer(partition.topic()).getConsumer();
            return delegate.position(partition, timeout);
        });
    }

    /**
     * Returns the committed offset reported by the consumer of the partition's topic,
     * creating that consumer if it does not exist yet.
     *
     * @param partition partition to query; must not be null
     * @return whatever the delegate consumer returns for {@code partition}
     * @throws IllegalStateException if the proxy is closed
     */
    public OffsetAndMetadata committed(TopicPartition partition) {
        ObjectExtension.requireNonNull((Object)partition, (String)"partition");
        return this.runWithoutConcurrency(() -> {
            Consumer delegate = this.getOrAddConsumer(partition.topic()).getConsumer();
            return delegate.committed(partition);
        });
    }

    /**
     * Returns the committed offset reported by the consumer of the partition's topic within
     * the given timeout, creating that consumer if it does not exist yet.
     *
     * @param partition partition to query; must not be null
     * @param timeout   timeout forwarded unchanged to the delegate consumer
     * @return whatever the delegate consumer returns for {@code partition}
     * @throws IllegalStateException if the proxy is closed
     */
    public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
        ObjectExtension.requireNonNull((Object)partition, (String)"partition");
        return this.runWithoutConcurrency(() -> {
            Consumer delegate = this.getOrAddConsumer(partition.topic()).getConsumer();
            return delegate.committed(partition, timeout);
        });
    }

    public Map<MetricName, ? extends Metric> metrics() {
        HashMap result = new HashMap();
        this._consumerHolders.forEach((? super K t, ? super V c) -> result.putAll(c.getConsumer().metrics()));
        return Collections.unmodifiableMap(result);
    }

    /**
     * Returns partition metadata for {@code topic} from that topic's consumer, creating the
     * consumer if it does not exist yet.
     *
     * @param topic topic id; must not be null
     * @return the delegate consumer's partition list
     * @throws IllegalStateException if the proxy is closed
     */
    public List<PartitionInfo> partitionsFor(String topic) {
        ObjectExtension.requireNonNull((Object)topic, (String)"topic");
        return this.runWithoutConcurrency(() -> {
            Consumer delegate = this.getOrAddConsumer(topic).getConsumer();
            return delegate.partitionsFor(topic);
        });
    }

    /**
     * Returns partition metadata for {@code topic} from that topic's consumer within the
     * given timeout, creating the consumer if it does not exist yet.
     *
     * @param topic   topic id; must not be null
     * @param timeout lookup timeout; must be non-null and non-negative
     * @return the delegate consumer's partition list
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        ObjectExtension.requireNonNull((Object)topic, (String)"topic");
        ObjectExtension.requireNonNull((Object)timeout, (String)"timeout");
        boolean negative = timeout.toMillis() < 0L;
        if (negative) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return this.runWithoutConcurrency(() -> {
            Consumer delegate = this.getOrAddConsumer(topic).getConsumer();
            return delegate.partitionsFor(topic, timeout);
        });
    }

    /**
     * Lists topics from every underlying consumer in parallel and merges the answers.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the remote call happens outside the lock.
     *
     * @return an unmodifiable merged topic map
     * @throws IllegalStateException if the proxy is closed
     */
    public Map<String, List<PartitionInfo>> listTopics() {
        return this.runWithoutConcurrency(() -> {
            Map<String, List<PartitionInfo>> results = new HashMap<>();
            this.forEach((t, c) -> {
                Map<String, List<PartitionInfo>> listed = c.getConsumer().listTopics();
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (results) {
                    results.putAll(listed);
                }
            });
            return Collections.unmodifiableMap(results);
        });
    }

    /**
     * Lists topics from every underlying consumer in parallel, with a timeout, and merges
     * the answers.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the remote call happens outside the lock.
     *
     * @param timeout lookup timeout; must be non-null and non-negative
     * @return an unmodifiable merged topic map
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return this.runWithoutConcurrency(() -> {
            Map<String, List<PartitionInfo>> results = new HashMap<>();
            this.forEach((t, c) -> {
                Map<String, List<PartitionInfo>> listed = c.getConsumer().listTopics(timeout);
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (results) {
                    results.putAll(listed);
                }
            });
            return Collections.unmodifiableMap(results);
        });
    }

    /**
     * Collects the paused partitions of every underlying consumer in parallel.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result set is only updated under its monitor.
     *
     * @return an unmodifiable union of all paused partitions
     * @throws IllegalStateException if the proxy is closed
     */
    public Set<TopicPartition> paused() {
        return this.runWithoutConcurrency(() -> {
            Set<TopicPartition> results = new HashSet<>();
            this.forEach((t, c) -> {
                Set<TopicPartition> pausedHere = c.getConsumer().paused();
                // HashSet is not thread-safe and this lambda runs on several executor threads.
                synchronized (results) {
                    results.addAll(pausedHere);
                }
            });
            return Collections.unmodifiableSet(results);
        });
    }

    public void pause(Collection<TopicPartition> partitions) {
        Map<String, Collection<TopicPartition>> byTopic = this.toMap(partitions);
        this.runWithoutConcurrency(() -> byTopic.forEach((? super K t, ? super V ps) -> this._consumerHolders.get(t).getConsumer().pause(ps)));
    }

    public void resume(Collection<TopicPartition> partitions) {
        Map<String, Collection<TopicPartition>> byTopic = this.toMap(partitions);
        this.runWithoutConcurrency(() -> byTopic.forEach((? super K t, ? super V ps) -> this._consumerHolders.get(t).getConsumer().resume(ps)));
    }

    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        ObjectExtension.requireNonNull(timestampsToSearch, (String)"timestampsToSearch");
        return this.runWithoutConcurrency(() -> {
            HashMap byTopic = new HashMap();
            timestampsToSearch.forEach((? super K tp, ? super V ts) -> byTopic.computeIfAbsent(tp.topic(), k -> new HashMap()).put(tp, ts));
            HashMap result = new HashMap();
            this.forEach(byTopic::containsKey, (t, c) -> result.putAll(c.getConsumer().offsetsForTimes((Map)byTopic.get(t))));
            return Collections.unmodifiableMap(result);
        });
    }

    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        ObjectExtension.requireNonNull(timestampsToSearch, (String)"timestampsToSearch");
        Objects.requireNonNull(timeout, "timeout");
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return this.runWithoutConcurrency(() -> {
            HashMap byTopic = new HashMap();
            timestampsToSearch.forEach((? super K tp, ? super V ts) -> byTopic.computeIfAbsent(tp.topic(), k -> new HashMap()).put(tp, ts));
            HashMap result = new HashMap();
            this.forEach(byTopic::containsKey, (t, c) -> result.putAll(c.getConsumer().offsetsForTimes((Map)byTopic.get(t), timeout)));
            return Collections.unmodifiableMap(result);
        });
    }

    /**
     * Fetches the beginning offsets of the given partitions from the existing consumers of
     * their topics, in parallel. Topics without a consumer are skipped.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the lookup happens outside the lock.
     *
     * @param partitions partitions to query; must not be null
     * @return an unmodifiable merged offset map
     * @throws IllegalStateException if the proxy is closed
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        Map<String, Collection<TopicPartition>> map = this.toMap(partitions);
        return this.runWithoutConcurrency(() -> {
            Map<TopicPartition, Long> result = new HashMap<>();
            this.forEach(map::containsKey, (t, c) -> {
                Map<TopicPartition, Long> found = c.getConsumer().beginningOffsets(map.get(t));
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (result) {
                    result.putAll(found);
                }
            });
            return Collections.unmodifiableMap(result);
        });
    }

    /**
     * Fetches the beginning offsets of the given partitions, with a timeout, from the
     * existing consumers of their topics, in parallel. Topics without a consumer are skipped.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the lookup happens outside the lock.
     *
     * @param partitions partitions to query; must not be null
     * @param timeout    lookup timeout; must be non-null and non-negative
     * @return an unmodifiable merged offset map
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        Map<String, Collection<TopicPartition>> map = this.toMap(partitions);
        return this.runWithoutConcurrency(() -> {
            Map<TopicPartition, Long> result = new HashMap<>();
            this.forEach(map::containsKey, (t, c) -> {
                Map<TopicPartition, Long> found = c.getConsumer().beginningOffsets(map.get(t), timeout);
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (result) {
                    result.putAll(found);
                }
            });
            return Collections.unmodifiableMap(result);
        });
    }

    /**
     * Fetches the end offsets of the given partitions from the existing consumers of their
     * topics, in parallel. Topics without a consumer are skipped.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the lookup happens outside the lock.
     *
     * @param partitions partitions to query; must not be null
     * @return an unmodifiable merged offset map
     * @throws IllegalStateException if the proxy is closed
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        Map<String, Collection<TopicPartition>> map = this.toMap(partitions);
        return this.runWithoutConcurrency(() -> {
            Map<TopicPartition, Long> result = new HashMap<>();
            this.forEach(map::containsKey, (t, c) -> {
                Map<TopicPartition, Long> found = c.getConsumer().endOffsets(map.get(t));
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (result) {
                    result.putAll(found);
                }
            });
            return Collections.unmodifiableMap(result);
        });
    }

    /**
     * Fetches the end offsets of the given partitions, with a timeout, from the existing
     * consumers of their topics, in parallel. Topics without a consumer are skipped.
     *
     * <p>{@code forEach} runs one task per consumer on the executor service, so the shared
     * result map is only updated under its monitor; the lookup happens outside the lock.
     *
     * @param partitions partitions to query; must not be null
     * @param timeout    lookup timeout; must be non-null and non-negative
     * @return an unmodifiable merged offset map
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the proxy is closed
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        Objects.requireNonNull(timeout, "timeout");
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        Map<String, Collection<TopicPartition>> map = this.toMap(partitions);
        return this.runWithoutConcurrency(() -> {
            Map<TopicPartition, Long> result = new HashMap<>();
            this.forEach(map::containsKey, (t, c) -> {
                Map<TopicPartition, Long> found = c.getConsumer().endOffsets(map.get(t), timeout);
                // HashMap is not thread-safe and this lambda runs on several executor threads.
                synchronized (result) {
                    result.putAll(found);
                }
            });
            return Collections.unmodifiableMap(result);
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the proxy: removes every underlying consumer, then shuts down the executors.
     * Calling it on an already closed proxy is a no-op.
     *
     * @throws ConcurrentModificationException if another thread is using the proxy
     */
    public void close() {
        this.acquire();
        try {
            if (this._closed) {
                return;
            }
            synchronized (this._addRemoveLock) {
                // Re-check under the lock before tearing anything down.
                if (!this._closed) {
                    this._consumerHolders.keySet().forEach(this::removeConsumer);
                    this.doClose();
                }
            }
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(long timeout, TimeUnit unit) {
        block8: {
            if (timeout < 0L) {
                throw new IllegalArgumentException("timeout cannot be negative");
            }
            ObjectExtension.requireNonNull((Object)((Object)unit), (String)"unit");
            this.acquire();
            try {
                if (this._closed) break block8;
                Object object = this._addRemoveLock;
                synchronized (object) {
                    if (!this._closed) {
                        ((ConcurrentHashMap.KeySetView)this._consumerHolders.keySet()).forEach((? super K t) -> this.removeConsumer((String)t, timeout, unit));
                        this.doClose();
                    }
                }
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(Duration timeout) {
        block8: {
            ObjectExtension.requireNonNull((Object)"timeout", (String)"timeout");
            if (timeout.toMillis() < 0L) {
                throw new IllegalArgumentException("The timeout cannot be negative.");
            }
            this.acquire();
            try {
                if (this._closed) break block8;
                Object object = this._addRemoveLock;
                synchronized (object) {
                    if (!this._closed) {
                        ((ConcurrentHashMap.KeySetView)this._consumerHolders.keySet()).forEach((? super K t) -> this.removeConsumer((String)t, timeout));
                        this.doClose();
                    }
                }
            }
            finally {
                this.release();
            }
        }
    }

    /**
     * Marks the proxy as closed, logs the fact and initiates shutdown of both executors.
     * The public {@code close} overloads call this while holding {@code _addRemoveLock}.
     */
    protected void doClose() {
        // Flip the flag first so guarded operations start rejecting calls.
        this._closed = true;
        _logger.info("consumer closed");
        this._executorService.shutdown();
        this._scheduledExecutorService.shutdown();
    }

    /**
     * Wakes up every underlying consumer via {@code forEach}. This method does not go
     * through the single-thread guard.
     */
    public void wakeup() {
        BiConsumer<String, ConsumerHolder> wake = (topicId, holder) -> holder.getConsumer().wakeup();
        this.forEach(wake);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the holder for {@code topicId}, creating and registering a new one under
     * {@code _addRemoveLock} when none exists.
     *
     * @param topicId topic id to look up
     * @return the existing or newly created holder
     */
    protected ConsumerHolder getOrAddConsumer(String topicId) {
        ConsumerHolder existing = this._consumerHolders.get(topicId);
        if (existing != null) {
            return existing;
        }
        synchronized (this._addRemoveLock) {
            // Look again now that the lock is held; another thread may have created it.
            ConsumerHolder holder = this._consumerHolders.get(topicId);
            if (holder != null) {
                return holder;
            }
            holder = this.newConsumerHolder(topicId);
            this._consumerHolders.put(topicId, holder);
            _logger.info("consumer created: {}", (Object)new ConsumerGroupId(this._groupName, topicId));
            return holder;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every consumer whose topic id is not contained in {@code topics}.
     *
     * @param topics topic ids that should be kept
     */
    protected void removeNotUsed(Collection<String> topics) {
        synchronized (this._addRemoveLock) {
            Set<String> stale = new HashSet<>(this._consumerHolders.keySet());
            stale.removeAll(topics);
            for (String topicId : stale) {
                this.removeConsumer(topicId);
            }
        }
    }

    /**
     * Removes the consumer for {@code topicId}, closing it with the no-argument
     * {@code close()}.
     *
     * @param topicId topic id whose consumer is removed
     */
    protected void removeConsumer(String topicId) {
        java.util.function.Consumer<Consumer> closer = consumer -> consumer.close();
        this.removeConsumer(topicId, closer);
    }

    /**
     * Removes the consumer for {@code topicId}, closing it with the given timeout.
     *
     * @param topicId topic id whose consumer is removed
     * @param timeout close timeout
     * @param unit    unit of {@code timeout}
     */
    protected void removeConsumer(String topicId, long timeout, TimeUnit unit) {
        java.util.function.Consumer<Consumer> closer = consumer -> consumer.close(timeout, unit);
        this.removeConsumer(topicId, closer);
    }

    /**
     * Removes the consumer for {@code topicId}, closing it with the given timeout.
     *
     * @param topicId topic id whose consumer is removed
     * @param timeout close timeout
     */
    protected void removeConsumer(String topicId, Duration timeout) {
        java.util.function.Consumer<Consumer> closer = consumer -> consumer.close(timeout);
        this.removeConsumer(topicId, closer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes the holder for {@code topicId} under {@code _addRemoveLock}, unregisters it
     * from the meta manager and closes its consumer with {@code closer}. Does nothing when
     * no holder exists for the topic.
     *
     * @param topicId topic id whose consumer is removed
     * @param closer  action that closes the underlying consumer
     */
    protected void removeConsumer(String topicId, java.util.function.Consumer<Consumer> closer) {
        synchronized (this._addRemoveLock) {
            ConsumerHolder removed = this._consumerHolders.remove(topicId);
            if (removed != null) {
                ConsumerGroupId groupId = new ConsumerGroupId(this._groupName, topicId);
                this._metaManager.unregisterConsumer(groupId);
                closer.accept(removed.getConsumer());
                _logger.info("Consumer removed: {}, config: {}", (Object)groupId, removed.getConfig());
            }
        }
    }

    /**
     * Registers a restart callback for the topic's consumer group, resolves its route,
     * cluster and topic metadata, and builds a holder around a freshly created consumer.
     *
     * @param topicId topic id to create a consumer for
     * @return a new holder; it is not added to the holder map here
     * @throws IllegalArgumentException if no route exists for the consumer group
     */
    protected ConsumerHolder newConsumerHolder(String topicId) {
        ConsumerGroupId groupId = new ConsumerGroupId(this._groupName, topicId);
        // The callback restarts this topic's consumer asynchronously on the executor service.
        this._metaManager.registerConsumer(groupId, () -> this._executorService.submit(() -> this.restartConsumer(topicId)));

        KafkaMetaHolder meta = this._metaManager.getMetaHolder();
        Route route = meta.getConsumerGroupRoutes().get(groupId);
        if (route == null) {
            throw new IllegalArgumentException("ConsumerGroup not found: " + groupId);
        }

        ConsumerGroup group = meta.getConsumerGroups().get(groupId);
        Cluster cluster = meta.getClusters().get(route.getClusterId());
        Topic topic = meta.getTopics().get(route.getTopicId());
        KafkaConsumerConfig effectiveConfig = this.constructConsumerConfig(this._kafkaConsumerConfig, group, topic, cluster);
        Consumer created = this.newConsumer(groupId, effectiveConfig);
        return new ConsumerHolder(groupId, effectiveConfig, route, created);
    }

    /**
     * Clones the template config and overrides {@code bootstrap.servers} (from the cluster
     * meta) and {@code client.id} (derived from the consumer group id).
     *
     * @param consumerConfig template configuration to clone
     * @param consumerGroup  consumer group whose id feeds the client id
     * @param topic          not used by this implementation
     * @param cluster        cluster whose meta supplies {@code bootstrap.servers}
     * @return the adjusted clone
     */
    protected KafkaConsumerConfig constructConsumerConfig(KafkaConsumerConfig consumerConfig, ConsumerGroup consumerGroup, Topic topic, Cluster cluster) {
        KafkaConsumerConfig copy = (KafkaConsumerConfig)consumerConfig.clone();
        copy.getProperties().setProperty("bootstrap.servers", cluster.getMeta().get("bootstrap.servers"));
        copy.getProperties().setProperty("client.id", this.constructClientId(consumerGroup.getId()));
        return copy;
    }

    /**
     * Builds the client id of an underlying consumer as
     * {@code <clientId>_<topicId>_<groupName>}.
     *
     * @param consumerGroupId group id supplying the topic id and group name
     * @return the composed client id
     */
    protected String constructClientId(ConsumerGroupId consumerGroupId) {
        StringBuilder id = new StringBuilder();
        id.append(this._clientId);
        id.append("_").append(consumerGroupId.getTopicId());
        id.append("_").append(consumerGroupId.getGroupName());
        return id.toString();
    }

    /**
     * Logs the creation and instantiates a {@link KafkaConsumer} from the given config's
     * properties and deserializers.
     *
     * @param consumerGroupId     id used only for logging
     * @param kafkaConsumerConfig configuration of the new consumer
     * @return the new consumer
     */
    protected Consumer newConsumer(ConsumerGroupId consumerGroupId, KafkaConsumerConfig kafkaConsumerConfig) {
        _logger.info("Consumer {} created with config: {}", (Object)consumerGroupId, (Object)kafkaConsumerConfig);
        Consumer created = new KafkaConsumer(kafkaConsumerConfig.getProperties(), kafkaConsumerConfig.getKeyDeserializer(), kafkaConsumerConfig.getValueDeserializer());
        return created;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Replaces the consumer of {@code topicId} with a fresh one and restores its previous
     * subscription state on the new consumer.
     *
     * <p>Under {@code _addRemoveLock}: unregisters the consumer group, removes the old holder
     * (returning if there is none), notifies the restart listener, closes the old consumer,
     * creates a new holder and re-applies either the manual assignment, the subscription
     * with rebalance listener, or the plain subscription. Listener failures are logged and
     * do not abort the restart.
     *
     * @param topicId topic id whose consumer is restarted
     */
    protected void restartConsumer(String topicId) {
        Object object = this._addRemoveLock;
        synchronized (object) {
            ConsumerGroupId consumerGroupId = new ConsumerGroupId(this._groupName, topicId);
            this._metaManager.unregisterConsumer(consumerGroupId);
            ConsumerHolder oldConsumerHolder = this._consumerHolders.remove(topicId);
            if (oldConsumerHolder == null) {
                return;
            }
            ConsumerRestartListener consumerRestartListener = oldConsumerHolder.getConfig().getConsumerRestartListener();
            if (consumerRestartListener != null) {
                try {
                    consumerRestartListener.beforeRestart(consumerGroupId.clone(), oldConsumerHolder.getConsumer());
                }
                catch (Exception e) {
                    _logger.error("consumerRestartListener.beforeRestart failed for consumer: " + consumerGroupId, (Throwable)e);
                }
            }
            this.closeConsumer(oldConsumerHolder);
            ConsumerHolder newConsumerHolder = this.getOrAddConsumer(topicId);
            if (oldConsumerHolder.getAssignments() != null) {
                newConsumerHolder.setAssignments(oldConsumerHolder.getAssignments());
                // Re-apply the assignment to the NEW consumer; the old one was just closed.
                newConsumerHolder.getConsumer().assign(oldConsumerHolder.getAssignments());
            } else if (oldConsumerHolder.getConsumerRebalanceListener() != null) {
                newConsumerHolder.setConsumerRebalanceListener(oldConsumerHolder.getConsumerRebalanceListener());
                newConsumerHolder.getConsumer().subscribe(Arrays.asList(topicId), oldConsumerHolder.getConsumerRebalanceListener());
            } else {
                newConsumerHolder.getConsumer().subscribe(Arrays.asList(topicId));
            }
            if (consumerRestartListener != null) {
                try {
                    consumerRestartListener.afterRestart(consumerGroupId, newConsumerHolder.getConsumer());
                }
                catch (Exception e) {
                    _logger.error("consumerRestartListener.afterRestart failed for consumer: " + consumerGroupId, (Throwable)e);
                }
            }
        }
    }

    /**
     * Closes the consumer of an old holder, retrying later if it is still in use.
     *
     * <p>A {@link ConcurrentModificationException} from {@code close()} schedules another
     * attempt after the configured destroy delay; any other failure is logged as a warning
     * and not retried.
     *
     * @param consumerHolder holder whose consumer is closed
     */
    protected void closeConsumer(ConsumerHolder consumerHolder) {
        try {
            consumerHolder.getConsumer().close();
            _logger.info("Old consumer removed: {}, config: {}", (Object)consumerHolder.getConsumerGroupId(), consumerHolder.getConfig());
        }
        catch (ConcurrentModificationException e) {
            // The consumer is being used by another thread right now; try again after the delay.
            _logger.info("Old consumer {} is used currently, delay {} ms to destroy", (Object)consumerHolder.getConsumerGroupId(), this._destroyDelay.getValue());
            this._scheduledExecutorService.schedule(() -> this.closeConsumer(consumerHolder), (long)((Integer)this._destroyDelay.getValue()).intValue(), TimeUnit.MILLISECONDS);
        }
        catch (Throwable e) {
            // Best effort: a failed close of the old consumer is only logged.
            _logger.warn("Close old consumer " + consumerHolder.getConsumerGroupId() + " failed", e);
        }
    }

    protected Map<String, Collection<TopicPartition>> toMap(Collection<TopicPartition> partitions) {
        ObjectExtension.requireNonNull(partitions, (String)"partitions");
        HashMap<String, Collection<TopicPartition>> byTopic = new HashMap<String, Collection<TopicPartition>>();
        partitions.forEach((? super T p) -> byTopic.computeIfAbsent(p.topic(), k -> new ArrayList()).add(p));
        return byTopic;
    }

    /**
     * Extracts the per-partition record map from a {@link ConsumerRecords} instance by
     * reading its {@code records} field reflectively.
     *
     * @param records records object to unwrap; must not be null
     * @return the value of the {@code records} field
     * @throws UnsupportedOperationException if the field cannot be read
     */
    protected Map<TopicPartition, List<ConsumerRecord>> toMap(ConsumerRecords records) {
        ObjectExtension.requireNonNull((Object)records, (String)"records");
        Object fieldValue;
        try {
            fieldValue = this._recordsField.get(records);
        }
        catch (IllegalAccessException | IllegalArgumentException e) {
            throw new UnsupportedOperationException(e);
        }
        return (Map)fieldValue;
    }

    protected Map<String, Map<TopicPartition, OffsetAndMetadata>> toMap(Map<TopicPartition, OffsetAndMetadata> offsets) {
        ObjectExtension.requireNonNull(offsets, (String)"offsets");
        HashMap<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<String, Map<TopicPartition, OffsetAndMetadata>>();
        offsets.forEach((? super K tp, ? super V os) -> result.computeIfAbsent(tp.topic(), k -> new HashMap()).put(tp, os));
        return result;
    }

    /**
     * Runs {@code biConsumer} for every topic that currently has a consumer holder.
     *
     * @param biConsumer action receiving the topic id and its holder
     */
    protected void forEach(BiConsumer<String, ConsumerHolder> biConsumer) {
        Predicate<String> everyTopic = topicId -> true;
        this.forEach(everyTopic, biConsumer);
    }

    protected void forEach(Predicate<String> predicate, BiConsumer<String, ConsumerHolder> biConsumer) {
        Collection topics = this._consumerHolders.keySet().stream().filter(predicate).collect(Collectors.toSet());
        CountDownLatch latch = new CountDownLatch(topics.size());
        topics.forEach((? super T t) -> this._executorService.execute(() -> {
            try {
                biConsumer.accept((String)t, this._consumerHolders.get(t));
            }
            catch (WakeupException e) {
                _logger.info("foreach was waken up");
            }
            catch (Throwable e) {
                _logger.error("foreach failed for consumer", e);
            }
            finally {
                latch.countDown();
            }
        }));
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            throw new InterruptException(e);
        }
    }

    /**
     * Acquires the single-thread guard and verifies that the proxy is still open; the guard
     * is released again before the exception is thrown.
     *
     * @throws IllegalStateException if the proxy has been closed
     * @throws ConcurrentModificationException if another thread is using the proxy
     */
    protected void acquireAndEnsureOpen() {
        this.acquire();
        if (!this._closed) {
            return;
        }
        this.release();
        throw new IllegalStateException("This consumer has already been closed.");
    }

    /**
     * Claims the proxy for the calling thread, allowing re-entrant calls from the thread
     * that already owns it, and increments the re-entrancy counter.
     *
     * @throws ConcurrentModificationException if a different thread currently owns the proxy
     */
    protected void acquire() {
        long callerId = Thread.currentThread().getId();
        boolean alreadyOwner = callerId == this._currentThread.get();
        if (!alreadyOwner) {
            // Ownership can only be taken when no thread holds it.
            boolean claimed = this._currentThread.compareAndSet(NO_CURRENT_THREAD, callerId);
            if (!claimed) {
                throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
            }
        }
        this._refcount.incrementAndGet();
    }

    /**
     * Decrements the re-entrancy counter and gives up thread ownership when it reaches zero.
     */
    protected void release() {
        int remaining = this._refcount.decrementAndGet();
        if (remaining != 0) {
            return;
        }
        this._currentThread.set(NO_CURRENT_THREAD);
    }

    /**
     * Runs {@code action} while holding the single-thread guard, after verifying the proxy
     * is open. The guard is released even when the action throws.
     *
     * @param action work to run
     * @throws IllegalStateException if the proxy has been closed
     * @throws ConcurrentModificationException if another thread is using the proxy
     */
    protected void runWithoutConcurrency(Runnable action) {
        this.acquireAndEnsureOpen();
        try {
            action.run();
        }
        finally {
            this.release();
        }
    }

    /**
     * Evaluates {@code supplier} while holding the single-thread guard, after verifying the
     * proxy is open. The guard is released even when the supplier throws.
     *
     * @param supplier computation to evaluate
     * @param <T>      result type
     * @return the supplier's result
     * @throws IllegalStateException if the proxy has been closed
     * @throws ConcurrentModificationException if another thread is using the proxy
     */
    protected <T> T runWithoutConcurrency(Supplier<T> supplier) {
        this.acquireAndEnsureOpen();
        try {
            return supplier.get();
        }
        finally {
            this.release();
        }
    }
}

