/*
 * Decompiled with CFR 0.152.
 */
package org.mydotey.kbear.client;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.mydotey.codec.Codec;
import org.mydotey.codec.json.JacksonJsonCodec;
import org.mydotey.java.CloseableExtension;
import org.mydotey.java.ObjectExtension;
import org.mydotey.java.collection.CollectionExtension;
import org.mydotey.kbear.client.KafkaMetaHolder;
import org.mydotey.kbear.client.KafkaMetaManager;
import org.mydotey.kbear.meta.Cluster;
import org.mydotey.kbear.meta.ConsumerGroup;
import org.mydotey.kbear.meta.ConsumerGroupId;
import org.mydotey.kbear.meta.Topic;
import org.mydotey.kbear.route.Client;
import org.mydotey.kbear.route.FetchConsumerRouteRequest;
import org.mydotey.kbear.route.FetchConsumerRouteResponse;
import org.mydotey.kbear.route.FetchProducerRouteRequest;
import org.mydotey.kbear.route.FetchProducerRouteResponse;
import org.mydotey.kbear.route.Route;
import org.mydotey.kbear.route.RouteServiceClient;
import org.mydotey.rpc.client.http.HttpLoadBalancer;
import org.mydotey.rpc.client.http.HttpServiceClientConfig;
import org.mydotey.rpc.client.http.RandomLoadBalancer;
import org.mydotey.rpc.client.http.apache.async.DynamicPoolingNHttpClientProvider;
import org.mydotey.rpc.client.http.apache.sync.DynamicPoolingHttpClientProvider;
import org.mydotey.scf.ConfigurationManager;
import org.mydotey.scf.Property;
import org.mydotey.scf.PropertyChangeEvent;
import org.mydotey.scf.facade.StringProperties;
import org.mydotey.scf.type.TypeConverter;
import org.mydotey.scf.type.string.StringInplaceConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link KafkaMetaManager} implementation that keeps a {@link KafkaMetaHolder} snapshot in sync
 * with a remote route service.
 *
 * <p>Producers (keyed by topic id) and consumers (keyed by {@link ConsumerGroupId}) register a
 * change listener. On registration the routing metadata for that key is fetched once (with
 * retries); afterwards a daemon thread re-fetches the metadata for all registered keys at a
 * configurable interval and runs the listener of every key whose topic / consumer group, route or
 * cluster differs from the previous snapshot.
 *
 * <p>The published holder is never modified in place: every update builds a new holder (fresh or
 * cloned), calls {@code immutable()} on it and then swaps the volatile reference. Registration,
 * unregistration and the periodic refresh all run while holding this object's monitor.
 */
public class DefaultKafkaMetaManager implements KafkaMetaManager {
    private static final Logger _logger = LoggerFactory.getLogger(DefaultKafkaMetaManager.class);

    private final Client _client;
    private final ConfigurationManager _configurationManager;
    private final Property<String, List<String>> _metaServiceUrls;
    private final Property<String, Integer> _metaUpdateInterval;
    private final Property<String, Integer> _metaUpdateRetryTimes;
    private final Property<String, Integer> _metaUpdateRetryInterval;
    private final ConcurrentHashMap<String, Runnable> _producerListeners;
    private final ConcurrentHashMap<ConsumerGroupId, Runnable> _consumerListeners;
    private volatile KafkaMetaHolder _metaHolder;
    private volatile boolean _closed;
    private final Thread _metaUpdater;
    private volatile RouteServiceClient _routeServiceClient;

    /**
     * Creates a manager that identifies itself to the route service with a default
     * {@link Client}.
     *
     * @param configurationManager source of the {@code kafka.meta.*} properties; must not be null
     * @throws IllegalArgumentException if {@code kafka.meta.service.url} is not configured
     */
    public DefaultKafkaMetaManager(ConfigurationManager configurationManager) {
        this(configurationManager, null);
    }

    /**
     * Creates a manager, performs the initial wiring and starts the background refresh thread.
     *
     * <p>Properties read (all via {@link StringProperties}):
     * <ul>
     *   <li>{@code kafka.meta.service.url} - list of route service URLs, required;</li>
     *   <li>{@code kafka.meta.update.interval} - sleep between refreshes in milliseconds,
     *       default 60000;</li>
     *   <li>{@code kafka.meta.update.retry.times} - attempts per on-demand sync, default 3;</li>
     *   <li>{@code kafka.meta.update.retry.interval} - sleep between attempts in milliseconds,
     *       default 10.</li>
     * </ul>
     * The trailing lambdas map empty / out-of-range values to {@code null}; presumably the
     * property then falls back to its default - confirm against the scf library.
     *
     * @param configurationManager source of the properties above; must not be null
     * @param client               client identity sent with every request; a private clone is
     *                             kept, or a new {@link Client} is created when this is null
     * @throws IllegalArgumentException if {@code kafka.meta.service.url} is not configured
     */
    public DefaultKafkaMetaManager(ConfigurationManager configurationManager, Client client) {
        ObjectExtension.requireNonNull((Object)configurationManager, "configurationManager");
        this._configurationManager = configurationManager;
        this._client = client == null ? new Client() : client.clone();

        StringProperties stringProperties = new StringProperties(this._configurationManager);
        String key = "kafka.meta.service.url";
        this._metaServiceUrls = stringProperties.getListProperty((Object)key, null, (TypeConverter)StringInplaceConverter.DEFAULT, v -> CollectionExtension.isEmpty((Collection)v) ? null : v);
        if (CollectionExtension.isEmpty((Collection)((Collection)this._metaServiceUrls.getValue()))) {
            throw new IllegalArgumentException("property is not configured: " + key);
        }
        this._metaUpdateInterval = stringProperties.getIntProperty((Object)"kafka.meta.update.interval", Integer.valueOf(60000), v -> v < 0 ? null : v);
        this._metaUpdateRetryTimes = stringProperties.getIntProperty((Object)"kafka.meta.update.retry.times", Integer.valueOf(3), v -> v < 1 ? null : v);
        this._metaUpdateRetryInterval = stringProperties.getIntProperty((Object)"kafka.meta.update.retry.interval", Integer.valueOf(10), v -> v < 0 ? null : v);

        this._producerListeners = new ConcurrentHashMap<>();
        this._consumerListeners = new ConcurrentHashMap<>();

        // Start from an empty snapshot so getMetaHolder() never returns null.
        this._metaHolder = new KafkaMetaHolder();
        this._metaHolder.immutable();

        this._routeServiceClient = new RouteServiceClient(this.newServiceClientConfig((List)this._metaServiceUrls.getValue()));
        // Rebuild the client whenever the configured URL list changes.
        this._metaServiceUrls.addChangeListener(this::updateRouteServiceClient);

        this._metaUpdater = new Thread(this::updateMeta, "kafka.meta.update.executor");
        this._metaUpdater.setDaemon(true);
        this._metaUpdater.start();
    }

    /** Returns the configuration manager this instance was created with. */
    protected ConfigurationManager getConfigurationManager() {
        return this._configurationManager;
    }

    /** Returns the client identity sent with every route request. */
    protected Client getClient() {
        return this._client;
    }

    /** Returns the most recently published metadata snapshot. */
    @Override
    public KafkaMetaHolder getMetaHolder() {
        return this._metaHolder;
    }

    /**
     * Registers (or replaces) the change listener for a topic and, if the current snapshot has
     * no route for it yet, fetches the route synchronously with retries.
     *
     * @param topicId  topic to track; must not be null
     * @param onChange callback run by the periodic refresh when the topic's metadata changes;
     *                 must not be null
     */
    @Override
    public synchronized void registerProducer(String topicId, Runnable onChange) {
        ObjectExtension.requireNonNull((Object)topicId, "topicId");
        ObjectExtension.requireNonNull((Object)onChange, "onChange");
        this._producerListeners.put(topicId, onChange);
        if (!this._metaHolder.getTopicRoutes().containsKey(topicId)) {
            this.trySync(() -> this.syncProducer(topicId));
        }
    }

    /**
     * Removes the listener for a topic and publishes a new snapshot without the topic's route.
     *
     * @param topicId topic to stop tracking; must not be null
     */
    @Override
    public synchronized void unregisterProducer(String topicId) {
        ObjectExtension.requireNonNull((Object)topicId, "topicId");
        this._producerListeners.remove(topicId);
        KafkaMetaHolder metaHolder = this._metaHolder.clone();
        metaHolder.getTopicRoutes().remove(topicId);
        metaHolder.immutable();
        this._metaHolder = metaHolder;
    }

    /**
     * Registers (or replaces) the change listener for a consumer group and, if the current
     * snapshot has no route for it yet, fetches the route synchronously with retries.
     *
     * @param consumerGroupId consumer group to track; must not be null. A clone is used as the
     *                        listener key.
     * @param onChange        callback run by the periodic refresh when the group's metadata
     *                        changes; must not be null
     */
    @Override
    public synchronized void registerConsumer(ConsumerGroupId consumerGroupId, Runnable onChange) {
        ObjectExtension.requireNonNull((Object)consumerGroupId, "consumerGroupId");
        ObjectExtension.requireNonNull((Object)onChange, "onChange");
        this._consumerListeners.put(consumerGroupId.clone(), onChange);
        if (!this._metaHolder.getConsumerGroupRoutes().containsKey(consumerGroupId)) {
            this.trySync(() -> this.syncConsumer(consumerGroupId));
        }
    }

    /**
     * Removes the listener for a consumer group and publishes a new snapshot without the group
     * and its route.
     *
     * @param consumerGroupId consumer group to stop tracking; must not be null
     */
    @Override
    public synchronized void unregisterConsumer(ConsumerGroupId consumerGroupId) {
        ObjectExtension.requireNonNull((Object)consumerGroupId, "consumerGroupId");
        this._consumerListeners.remove(consumerGroupId);
        KafkaMetaHolder metaHolder = this._metaHolder.clone();
        metaHolder.getConsumerGroups().remove(consumerGroupId);
        metaHolder.getConsumerGroupRoutes().remove(consumerGroupId);
        metaHolder.immutable();
        this._metaHolder = metaHolder;
    }

    /**
     * Runs {@code syncAction} until it returns {@code true} or the configured number of attempts
     * ({@code kafka.meta.update.retry.times}) is used up, sleeping
     * {@code kafka.meta.update.retry.interval} milliseconds between attempts.
     *
     * <p>If the calling thread is interrupted while sleeping, retrying stops and the thread's
     * interrupt status is restored so that the caller can still observe the interruption.
     *
     * @param syncAction action to attempt; returns {@code true} on success
     */
    protected void trySync(Supplier<Boolean> syncAction) {
        int i = 0;
        while (i < (Integer)this._metaUpdateRetryTimes.getValue() && !syncAction.get().booleanValue()) {
            // Only sleep when another attempt will follow; otherwise fall through to the loop
            // condition, which ends the loop.
            if (++i < (Integer)this._metaUpdateRetryTimes.getValue()) {
                try {
                    Thread.sleep(((Integer)this._metaUpdateRetryInterval.getValue()).intValue());
                } catch (InterruptedException e) {
                    _logger.warn("retry interrupted, ignore", e);
                    // Catching InterruptedException clears the flag; set it again.
                    Thread.currentThread().interrupt();
                    break;
                }
                _logger.info("retry at time {}", (Object)i);
            }
        }
    }

    /**
     * Body of the background refresh thread: sleeps for the configured update interval, then
     * refreshes all metadata while holding this object's monitor. Failures are logged and the
     * loop continues; the loop ends when {@link #close()} has been called or the thread is
     * interrupted while sleeping.
     */
    protected void updateMeta() {
        while (!this._closed) {
            try {
                Thread.sleep(((Integer)this._metaUpdateInterval.getValue()).intValue());
                synchronized (this) {
                    this.doUpdateMeta();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                _logger.error("meta update failed", e);
            }
        }
    }

    /**
     * Fetches routes for every registered topic and consumer group into a fresh holder, then
     * publishes it via {@link #updateMetaHolder(KafkaMetaHolder)}. A request is only sent for a
     * side (producer / consumer) that has at least one registered listener. Exceptions from the
     * route service propagate to the caller.
     */
    protected void doUpdateMeta() {
        KafkaMetaHolder metaHolder = new KafkaMetaHolder();
        if (!this._producerListeners.isEmpty()) {
            FetchProducerRouteRequest request = new FetchProducerRouteRequest();
            request.setClient(this._client);
            request.setTopicIds(new ArrayList<String>(this._producerListeners.keySet()));
            this.mergeProducerRoutes(this._routeServiceClient.fetchProducerRoute(request), metaHolder);
        }
        if (!this._consumerListeners.isEmpty()) {
            FetchConsumerRouteRequest request = new FetchConsumerRouteRequest();
            request.setClient(this._client);
            request.setConsumerGroupIds(new ArrayList<ConsumerGroupId>(this._consumerListeners.keySet()));
            this.mergeConsumerRoutes(this._routeServiceClient.fetchConsumerRoute(request), metaHolder);
        }
        metaHolder.immutable();
        this.updateMetaHolder(metaHolder);
    }

    /**
     * Fetches the route for a single topic and publishes a clone of the current snapshot with
     * the response merged in. Listeners are not notified.
     *
     * @param topicId topic to fetch
     * @return {@code true} on success; {@code false} if any exception occurred (it is logged)
     */
    protected boolean syncProducer(String topicId) {
        try {
            FetchProducerRouteRequest request = new FetchProducerRouteRequest();
            request.setClient(this._client);
            request.setTopicIds(Arrays.asList(topicId));
            FetchProducerRouteResponse response = this._routeServiceClient.fetchProducerRoute(request);
            KafkaMetaHolder metaHolder = this._metaHolder.clone();
            this.mergeProducerRoutes(response, metaHolder);
            metaHolder.immutable();
            this._metaHolder = metaHolder;
            return true;
        } catch (Exception e) {
            _logger.error("sync producer failed", e);
            return false;
        }
    }

    /**
     * Fetches the route for a single consumer group and publishes a clone of the current
     * snapshot with the response merged in. Listeners are not notified.
     *
     * @param consumerGroupId consumer group to fetch
     * @return {@code true} on success; {@code false} if any exception occurred (it is logged)
     */
    protected boolean syncConsumer(ConsumerGroupId consumerGroupId) {
        try {
            FetchConsumerRouteRequest request = new FetchConsumerRouteRequest();
            request.setClient(this._client);
            request.setConsumerGroupIds(Arrays.asList(consumerGroupId));
            FetchConsumerRouteResponse response = this._routeServiceClient.fetchConsumerRoute(request);
            KafkaMetaHolder metaHolder = this._metaHolder.clone();
            this.mergeConsumerRoutes(response, metaHolder);
            metaHolder.immutable();
            this._metaHolder = metaHolder;
            return true;
        } catch (Exception e) {
            _logger.error("sync consumer failed", e);
            return false;
        }
    }

    /** Copies topic routes, clusters and topics from a producer route response into a holder. */
    private void mergeProducerRoutes(FetchProducerRouteResponse response, KafkaMetaHolder metaHolder) {
        response.getTopicIdRoutes().forEach(metaHolder.getTopicRoutes()::put);
        response.getClusters().forEach(metaHolder.getClusters()::put);
        response.getTopics().forEach(metaHolder.getTopics()::put);
    }

    /** Copies group routes, groups, clusters and topics from a consumer route response into a holder. */
    private void mergeConsumerRoutes(FetchConsumerRouteResponse response, KafkaMetaHolder metaHolder) {
        response.getConsumerGroupIdRoutes().forEach(p -> metaHolder.getConsumerGroupRoutes().put(p.getConsumerGroupId(), p.getRoute()));
        response.getConsumerGroups().forEach(c -> metaHolder.getConsumerGroups().put(c.getId(), (ConsumerGroup)c));
        response.getClusters().forEach(metaHolder.getClusters()::put);
        response.getTopics().forEach(metaHolder.getTopics()::put);
    }

    /**
     * Publishes {@code newMetaHolder} and runs the listener of every registered topic and
     * consumer group whose metadata differs between the previous and the new snapshot.
     * Exceptions thrown by a listener are logged and do not stop the remaining listeners.
     *
     * @param newMetaHolder snapshot to publish
     */
    protected void updateMetaHolder(KafkaMetaHolder newMetaHolder) {
        KafkaMetaHolder oldMetaHolder = this._metaHolder;
        this._metaHolder = newMetaHolder;
        this._producerListeners.forEach((topicId, listener) -> {
            if (this.isChanged(topicId, oldMetaHolder, newMetaHolder)) {
                try {
                    _logger.info("Topic metadata changed, notify listener to run: " + topicId);
                    listener.run();
                } catch (Exception e) {
                    _logger.error("Topic metadata change listener failed to run: " + topicId, e);
                }
            }
        });
        this._consumerListeners.forEach((consumerGroupId, listener) -> {
            if (this.isChanged(consumerGroupId, oldMetaHolder, newMetaHolder)) {
                try {
                    _logger.info("Consumer metadata changed, notify listener to run: " + consumerGroupId);
                    listener.run();
                } catch (Exception e) {
                    _logger.error("Consumer metadata change listener failed to run: " + consumerGroupId, e);
                }
            }
        });
    }

    /**
     * Tells whether the topic, its route, or the cluster its route points to differs between two
     * snapshots. A topic that has no route in either snapshot is reported as unchanged.
     *
     * @param topicId       topic to compare
     * @param oldMetaHolder previous snapshot
     * @param newMetaHolder new snapshot
     * @return {@code true} if anything relevant to the topic changed
     */
    protected boolean isChanged(String topicId, KafkaMetaHolder oldMetaHolder, KafkaMetaHolder newMetaHolder) {
        Topic oldTopic = oldMetaHolder.getTopics().get(topicId);
        Topic newTopic = newMetaHolder.getTopics().get(topicId);
        if (!Objects.equals(oldTopic, newTopic)) {
            _logger.info("Topic {} changed from {} to {}", topicId, oldTopic, newTopic);
            return true;
        }
        Route oldRoute = oldMetaHolder.getTopicRoutes().get(topicId);
        Route newRoute = newMetaHolder.getTopicRoutes().get(topicId);
        return this.isRouteOrClusterChanged(topicId, oldRoute, newRoute, oldMetaHolder, newMetaHolder);
    }

    /**
     * Tells whether the consumer group, its route, or the cluster its route points to differs
     * between two snapshots. A group that has no route in either snapshot is reported as
     * unchanged.
     *
     * @param consumerGroupId consumer group to compare
     * @param oldMetaHolder   previous snapshot
     * @param newMetaHolder   new snapshot
     * @return {@code true} if anything relevant to the consumer group changed
     */
    protected boolean isChanged(ConsumerGroupId consumerGroupId, KafkaMetaHolder oldMetaHolder, KafkaMetaHolder newMetaHolder) {
        ConsumerGroup oldConsumerGroup = oldMetaHolder.getConsumerGroups().get(consumerGroupId);
        ConsumerGroup newConsumerGroup = newMetaHolder.getConsumerGroups().get(consumerGroupId);
        if (!Objects.equals(oldConsumerGroup, newConsumerGroup)) {
            _logger.info("ConsumerGroup {} changed from {} to {}", consumerGroupId, oldConsumerGroup, newConsumerGroup);
            return true;
        }
        Route oldRoute = oldMetaHolder.getConsumerGroupRoutes().get(consumerGroupId);
        Route newRoute = newMetaHolder.getConsumerGroupRoutes().get(consumerGroupId);
        return this.isRouteOrClusterChanged(consumerGroupId, oldRoute, newRoute, oldMetaHolder, newMetaHolder);
    }

    /** Compares the routes of one id and, when they are equal and present, the clusters they point to. */
    private boolean isRouteOrClusterChanged(Object id, Route oldRoute, Route newRoute, KafkaMetaHolder oldMetaHolder, KafkaMetaHolder newMetaHolder) {
        if (!Objects.equals(oldRoute, newRoute)) {
            _logger.info("Route for {} changed from {} to {}", id, oldRoute, newRoute);
            return true;
        }
        // The routes are equal here, so a null old route means both are null: there is no
        // cluster to compare, and dereferencing the route would throw a NullPointerException.
        if (oldRoute == null) {
            return false;
        }
        Cluster oldCluster = oldMetaHolder.getClusters().get(oldRoute.getClusterId());
        Cluster newCluster = newMetaHolder.getClusters().get(newRoute.getClusterId());
        if (!Objects.equals(oldCluster, newCluster)) {
            _logger.info("Cluster for {} changed from {} to {}", id, oldCluster, newCluster);
            return true;
        }
        return false;
    }

    /**
     * Builds the HTTP client configuration for the route service: new sync and async client
     * providers, the JSON codec, the REST path map of {@link RouteServiceClient} and a load
     * balancer over {@code serviceUrls}.
     *
     * @param serviceUrls route service URLs to balance over
     * @return a new client configuration
     */
    protected HttpServiceClientConfig newServiceClientConfig(List<String> serviceUrls) {
        DynamicPoolingHttpClientProvider syncClientProvider = new DynamicPoolingHttpClientProvider("kafka.meta.route.client", this._configurationManager);
        DynamicPoolingNHttpClientProvider asyncClientProvider = new DynamicPoolingNHttpClientProvider("kafka.meta.route.async-client", this._configurationManager);
        return new HttpServiceClientConfig.Builder()
            .setProcedureRestPathMap(RouteServiceClient.PROCEDURE_REST_PATH_MAP)
            .setSyncClientProvider((Supplier)syncClientProvider)
            .setAsyncClientProvider((Supplier)asyncClientProvider)
            .setCodec((Codec)JacksonJsonCodec.DEFAULT)
            .setLoadBalancer(this.newLoadBalancer(serviceUrls))
            .build();
    }

    /**
     * Change listener for {@code kafka.meta.service.url}: replaces the route service client with
     * one that balances over the new URL list (reusing the old client's providers, codec and
     * path map), closes the old client and immediately refreshes all metadata. Exceptions from
     * the refresh propagate to the caller.
     *
     * <p>NOTE(review): the old client is closed without holding this object's monitor, so a
     * request that is in flight on another thread may still be using it - confirm that
     * {@code RouteServiceClient} tolerates this.
     *
     * @param e change event carrying the new URL list
     */
    protected void updateRouteServiceClient(PropertyChangeEvent<String, List<String>> e) {
        RouteServiceClient oldClient = this._routeServiceClient;
        HttpServiceClientConfig oldConfig = oldClient.getConfig();
        HttpServiceClientConfig newConfig = new HttpServiceClientConfig.Builder()
            .setProcedureRestPathMap(oldConfig.getProcedureRestPathMap())
            .setSyncClientProvider(oldConfig.getSyncClientProvider())
            .setAsyncClientProvider(oldConfig.getAsyncClientProvider())
            .setCodec(oldConfig.getCodec())
            .setLoadBalancer(this.newLoadBalancer((List)e.getNewValue()))
            .build();
        this._routeServiceClient = new RouteServiceClient(newConfig);
        CloseableExtension.close((AutoCloseable)oldClient);
        synchronized (this) {
            this.doUpdateMeta();
        }
    }

    /**
     * Creates the load balancer used by the route service client.
     *
     * @param serviceUrls route service URLs to balance over
     * @return a {@link RandomLoadBalancer} over {@code serviceUrls}; the two numeric arguments
     *         (300000 and 10000) are presumably time settings in milliseconds - TODO confirm
     *         against the RandomLoadBalancer constructor
     */
    protected HttpLoadBalancer newLoadBalancer(List<String> serviceUrls) {
        return new RandomLoadBalancer(serviceUrls, 300000L, 10000L);
    }

    /**
     * Stops the background refresh thread and closes the route service client together with its
     * sync and async HTTP client providers.
     */
    @Override
    public void close() throws Exception {
        this._closed = true;
        this._metaUpdater.interrupt();
        // Work on one snapshot of the volatile field so that the client being closed and the
        // providers being closed belong to the same client instance.
        RouteServiceClient routeServiceClient = this._routeServiceClient;
        CloseableExtension.close((AutoCloseable)routeServiceClient);
        CloseableExtension.close((AutoCloseable)((Object)routeServiceClient.getConfig().getSyncClientProvider()));
        CloseableExtension.close((AutoCloseable)((Object)routeServiceClient.getConfig().getAsyncClientProvider()));
    }
}

