package com.github.hackerwin7.jlib.utils.drivers.kafka.utils;

import com.github.hackerwin7.jlib.utils.drivers.kafka.consumer.KafkaSimpleConsumer;
import com.github.hackerwin7.jlib.utils.drivers.kafka.data.TopicInfo;
import com.github.hackerwin7.jlib.utils.drivers.zk.ZkClient;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/github/hackerwin7/jlib/utils/drivers/kafka/utils/KafkaUtils.class */
/**
 * Looks up Kafka topic information (brokers per partition, earliest and latest
 * offsets) using the broker registry stored in ZooKeeper and Kafka's
 * {@code SimpleConsumer} API.
 *
 * <p>Instances hold mutable lookup state and make no attempt at thread-safety.
 */
public class KafkaUtils {
    private static final Logger logger = Logger.getLogger(KafkaUtils.class);

    public static final String ZK_TOPICS_PATH = "/brokers/topics";
    public static final String ZK_IDS_PATH = "/brokers/ids";
    public static final long SLEEPING_TIME = 3000;

    /** ZooKeeper connect string (first comma-separated token of the constructor argument). */
    private final String zkconn;
    /** Root path under which the Kafka znodes live; {@code "/"} when none was given. */
    private final String zkroot;
    private final ZkClient zk;

    /** Broker id (znode name under {@link #ZK_IDS_PATH}) mapped to {@code host:port}. */
    private final Map<String, String> idsBrokers = new HashMap<String, String>();
    /** {@code host:port} of every registered broker, used as seeds for leader lookup. */
    private final List<String> brokerSeeds = new ArrayList<String>();
    /** {@code host:port} of the replicas of the partition most recently resolved by {@link #findLeader}. */
    private final List<String> replicaBrokers = new ArrayList<String>();

    /**
     * Connects to ZooKeeper and loads the registered brokers.
     *
     * @param str comma-separated string: the first token is the ZooKeeper connect
     *            string, every further token is one segment of the root path
     *            (e.g. {@code "host:2181,kafka"} gives root {@code "/kafka"})
     * @throws Exception if {@code str} is null or contains no token, or if the
     *                   ZooKeeper access fails
     */
    public KafkaUtils(String str) throws Exception {
        String[] parts = StringUtils.split(str, ",");
        // StringUtils.split returns null for null input and an empty array for "".
        if (parts == null || parts.length == 0) {
            throw new Exception("error format zk connection string = " + str);
        }
        this.zkconn = parts[0];
        if (parts.length == 1) {
            this.zkroot = "/";
        } else {
            StringBuilder root = new StringBuilder();
            for (int i = 1; i < parts.length; i++) {
                root.append('/').append(parts[i]);
            }
            this.zkroot = root.toString();
        }
        this.zk = new ZkClient(this.zkconn);

        String idsPath = zkPath(ZK_IDS_PATH);
        for (String brokerId : this.zk.getChildren(idsPath)) {
            JSONObject brokerJson = JSONObject.fromObject(this.zk.get(idsPath + "/" + brokerId));
            String hostPort = brokerJson.getString("host") + ":" + brokerJson.getString("port");
            this.idsBrokers.put(brokerId, hostPort);
            this.brokerSeeds.add(hostPort);
        }
    }

    /**
     * Prefixes an absolute sub-path with the configured root without producing a
     * doubled slash when the root is {@code "/"}.
     */
    private String zkPath(String subPath) {
        return "/".equals(this.zkroot) ? subPath : this.zkroot + subPath;
    }

    /**
     * Collects, for every partition of a topic, the replica brokers and the
     * earliest and latest available offsets.
     *
     * @param str topic name
     * @return the populated topic description
     * @throws Exception if ZooKeeper access fails, no leader can be found for a
     *                   partition, or an offset request fails
     */
    public TopicInfo findTopicInfo(String str) throws Exception {
        String topicPath = zkPath(ZK_TOPICS_PATH) + "/" + str;
        String topicJson = this.zk.get(topicPath);
        List<String> partitionNames = this.zk.getChildren(topicPath + "/partitions");

        HashMap<Integer, String> partitionBrokers = new HashMap<Integer, String>();
        HashMap<Integer, Long> beginOffsets = new HashMap<Integer, Long>();
        HashMap<Integer, Long> endOffsets = new HashMap<Integer, Long>();
        for (String partitionName : partitionNames) {
            int partition = Integer.parseInt(partitionName);
            partitionBrokers.put(partition, findPartitionBrokerStr(topicJson, partitionName));
            SimpleConsumer consumer = findConsumer(this.brokerSeeds, str, partition);
            try {
                beginOffsets.put(partition, getMinOffset(consumer, str, partition));
                endOffsets.put(partition, getMaxOffset(consumer, str, partition));
            } finally {
                // The consumer is only needed for the two offset requests; release its socket.
                consumer.close();
            }
        }

        TopicInfo info = TopicInfo.createBuilder()
                .topic(str)
                .zks(this.zkconn + this.zkroot)
                .brokers(this.idsBrokers.toString())
                .build();
        info.putBrokersAll(partitionBrokers);
        info.putBeginoffsetsAll(beginOffsets);
        info.putEndOffsetsAll(endOffsets);
        return info;
    }

    /**
     * Resolves the broker ids listed for one partition in the topic znode JSON
     * to a comma-separated {@code host:port} list.
     *
     * @param str  JSON content of the topic znode; must contain a {@code partitions} object
     * @param str2 partition id as it appears as a key in {@code partitions}
     */
    private String findPartitionBrokerStr(String str, String str2) {
        JSONArray brokerIds = JSONObject.fromObject(str).getJSONObject("partitions").getJSONArray(str2);
        List<String> hostPorts = new ArrayList<String>(brokerIds.size());
        for (int i = 0; i < brokerIds.size(); i++) {
            hostPorts.add(this.idsBrokers.get(String.valueOf(brokerIds.getInt(i))));
        }
        return StringUtils.join(hostPorts, ",");
    }

    /**
     * Opens a {@code SimpleConsumer} connected to the leader of a partition.
     * The caller owns the returned consumer and must close it.
     *
     * @throws Exception if no partition metadata or no leader host can be determined
     */
    private SimpleConsumer findConsumer(List<String> list, String str, int i) throws Exception {
        PartitionMetadata metadata = findLeader(list, str, i);
        if (metadata == null) {
            throw new Exception("find leader error......");
        }
        Broker leader = metadata.leader();
        if (leader == null || leader.host() == null) {
            throw new Exception("find leader host error");
        }
        return new SimpleConsumer(leader.host(), leader.port(), KafkaSimpleConsumer.CONSUME_TIME_OUT,
                KafkaSimpleConsumer.CONSUME_BUFFER_SIZE, "fff" + System.currentTimeMillis());
    }

    /**
     * Asks the seed brokers in order for the metadata of one partition and stops
     * at the first broker that returns it. Also refreshes {@link #replicaBrokers}
     * with that partition's replicas (left empty when nothing was found).
     *
     * @param list seed brokers as {@code host:port}
     * @param str  topic name
     * @param i    partition id
     * @return the partition metadata, or {@code null} if no seed broker returned it
     */
    private PartitionMetadata findLeader(List<String> list, String str, int i) {
        PartitionMetadata metadata = null;
        for (String seed : list) {
            metadata = queryPartitionMetadata(seed, str, i);
            if (metadata != null) {
                break;
            }
        }
        this.replicaBrokers.clear();
        if (metadata != null) {
            for (Broker replica : metadata.replicas()) {
                this.replicaBrokers.add(replica.host() + ":" + replica.port());
            }
        }
        return metadata;
    }

    /**
     * Requests topic metadata from a single broker and picks out one partition.
     * Failures are logged and reported as {@code null} so the caller can try the
     * next seed broker.
     */
    private PartitionMetadata queryPartitionMetadata(String seed, String topic, int partition) {
        SimpleConsumer consumer = null;
        try {
            String[] hostPort = StringUtils.split(seed, ":");
            consumer = new SimpleConsumer(hostPort[0], Integer.parseInt(hostPort[1]),
                    KafkaSimpleConsumer.CONSUME_TIME_OUT, KafkaSimpleConsumer.CONSUME_BUFFER_SIZE,
                    "find_leader" + System.currentTimeMillis());
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
            for (TopicMetadata topicMetadata : consumer.send(request).topicsMetadata()) {
                for (PartitionMetadata candidate : topicMetadata.partitionsMetadata()) {
                    if (candidate.partitionId() == partition) {
                        return candidate;
                    }
                }
            }
        } catch (Exception e) {
            logger.error("error communicating with broker = [" + seed + "] to find leader for topic = ["
                    + topic + "], partition = [" + partition + "]", e);
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
        return null;
    }

    /**
     * Returns the earliest available offset of a partition.
     *
     * @param simpleConsumer consumer connected to the partition leader
     * @param str            topic name
     * @param i              partition id
     * @throws Exception if the broker reports an error or returns no offset
     */
    public static long getMinOffset(SimpleConsumer simpleConsumer, String str, int i) throws Exception {
        return fetchOffset(simpleConsumer, str, i, OffsetRequest.EarliestTime());
    }

    /**
     * Returns the latest offset of a partition.
     *
     * @param simpleConsumer consumer connected to the partition leader
     * @param str            topic name
     * @param i              partition id
     * @throws Exception if the broker reports an error or returns no offset
     */
    public static long getMaxOffset(SimpleConsumer simpleConsumer, String str, int i) throws Exception {
        return fetchOffset(simpleConsumer, str, i, OffsetRequest.LatestTime());
    }

    /** Issues a single-offset request for the given time marker and returns the first offset. */
    private static long fetchOffset(SimpleConsumer consumer, String topic, int partition, long whichTime)
            throws Exception {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetResponse response = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
                requestInfo, OffsetRequest.CurrentVersion(), "find offset " + System.currentTimeMillis()));
        if (response.hasError()) {
            throw new Exception("error fetching data offset , reason = " + ((int) response.errorCode(topic, partition)));
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            throw new Exception("no offset returned for topic = [" + topic + "], partition = [" + partition + "]");
        }
        return offsets[0];
    }
}
