package com.github.hackerwin7.mysql.tracker.kafka.driver.consumer;

import com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf;
import com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaMetaMsg;
import com.github.hackerwin7.mysql.tracker.tracker.parser.LogEventConvert;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/hackerwin7/mysql/tracker/kafka/driver/consumer/KafkaReceiver.class */
public class KafkaReceiver extends Thread {
    private Logger logger;
    private KafkaConf conf;
    private List<String> replicaBrokers;
    private List<Integer> replicaPorts;
    public static int retry = 3;
    private int MAXLEN;
    private SimpleConsumer consumer;
    public BlockingQueue<KafkaMetaMsg> msgQueue;
    public boolean isFetch;

    public KafkaReceiver(KafkaConf kafkaConf) {
        this.logger = LoggerFactory.getLogger(KafkaReceiver.class);
        this.replicaBrokers = new ArrayList();
        this.replicaPorts = new ArrayList();
        this.MAXLEN = 10000;
        this.msgQueue = new LinkedBlockingQueue(this.MAXLEN);
        this.isFetch = true;
        this.conf = kafkaConf;
    }

    public KafkaReceiver(KafkaConf kafkaConf, int i) {
        this.logger = LoggerFactory.getLogger(KafkaReceiver.class);
        this.replicaBrokers = new ArrayList();
        this.replicaPorts = new ArrayList();
        this.MAXLEN = 10000;
        this.msgQueue = new LinkedBlockingQueue(this.MAXLEN);
        this.isFetch = true;
        this.conf = kafkaConf;
        this.MAXLEN = i;
    }

    public PartitionMetadata findLeader(List<String> list, int i, String str, int i2) {
        PartitionMetadata partitionMetadata = null;
        Iterator<String> it = list.iterator();
        loop0: while (true) {
            if (!it.hasNext()) {
                break;
            }
            Iterator it2 = new SimpleConsumer(it.next(), i, 100000, LogEventConvert.SMALLINT_MAX_VALUE, "leader").send(new TopicMetadataRequest(Collections.singletonList(str))).topicsMetadata().iterator();
            while (it2.hasNext()) {
                for (PartitionMetadata partitionMetadata2 : ((TopicMetadata) it2.next()).partitionsMetadata()) {
                    if (partitionMetadata2.partitionId() == i2) {
                        partitionMetadata = partitionMetadata2;
                        break loop0;
                    }
                }
            }
        }
        if (partitionMetadata != null) {
            this.replicaBrokers.clear();
            Iterator it3 = partitionMetadata.replicas().iterator();
            while (it3.hasNext()) {
                this.replicaBrokers.add(((Broker) it3.next()).host());
            }
        }
        return partitionMetadata;
    }

    public PartitionMetadata findLeader(List<String> list, List<Integer> list2, String str, int i) {
        PartitionMetadata partitionMetadata = null;
        int i2 = 0;
        loop0: while (true) {
            if (i2 > list.size() - 1) {
                break;
            }
            Iterator it = new SimpleConsumer(list.get(i2), list2.get(i2).intValue(), 100000, LogEventConvert.SMALLINT_MAX_VALUE, "leader").send(new TopicMetadataRequest(Collections.singletonList(str))).topicsMetadata().iterator();
            while (it.hasNext()) {
                for (PartitionMetadata partitionMetadata2 : ((TopicMetadata) it.next()).partitionsMetadata()) {
                    if (partitionMetadata2.partitionId() == i) {
                        partitionMetadata = partitionMetadata2;
                        break loop0;
                    }
                }
            }
            i2++;
        }
        if (partitionMetadata != null) {
            this.replicaBrokers.clear();
            this.replicaPorts.clear();
            for (Broker broker : partitionMetadata.replicas()) {
                this.replicaBrokers.add(broker.host());
                this.replicaPorts.add(Integer.valueOf(broker.port()));
            }
        }
        return partitionMetadata;
    }

    public long getLastOffset(SimpleConsumer simpleConsumer, String str, int i, long j, String str2) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(str, i);
        HashMap hashMap = new HashMap();
        hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(j, 1));
        OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new OffsetRequest(hashMap, kafka.api.OffsetRequest.CurrentVersion(), str2));
        if (!offsetsBefore.hasError()) {
            return offsetsBefore.offsets(str, i)[0];
        }
        this.logger.error("Error fetching data Offset Data the Broker. Reason: " + ((int) offsetsBefore.errorCode(str, i)));
        return -1L;
    }

    public String findNewLeader(String str, String str2, int i, int i2) throws Exception {
        boolean z;
        for (int i3 = 0; i3 < retry; i3++) {
            PartitionMetadata findLeader = findLeader(this.replicaBrokers, i2, str2, i);
            if (findLeader == null) {
                z = true;
            } else if (findLeader.leader() == null) {
                z = true;
            } else {
                if (!str.equalsIgnoreCase(findLeader.leader().host()) || i3 != 0) {
                    return findLeader.leader().host();
                }
                z = true;
            }
            if (z) {
                delay(1);
            }
        }
        this.logger.error("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    public String findNewLeader(String str, String str2, int i) throws Exception {
        boolean z;
        for (int i2 = 0; i2 < retry; i2++) {
            PartitionMetadata findLeader = findLeader(this.replicaBrokers, this.replicaPorts, str2, i);
            if (findLeader == null) {
                z = true;
            } else if (findLeader.leader() == null) {
                z = true;
            } else {
                if (!str.equalsIgnoreCase(findLeader.leader().host()) || i2 != 0) {
                    return findLeader.leader().host();
                }
                z = true;
            }
            if (z) {
                delay(1);
            }
        }
        this.logger.error("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        KafkaConf kafkaConf = this.conf;
        List<String> list = KafkaConf.brokerSeeds;
        KafkaConf kafkaConf2 = this.conf;
        List<Integer> list2 = KafkaConf.portList;
        KafkaConf kafkaConf3 = this.conf;
        String str = KafkaConf.topic;
        KafkaConf kafkaConf4 = this.conf;
        PartitionMetadata findLeader = findLeader(list, list2, str, KafkaConf.partition);
        if (findLeader == null) {
            this.logger.error("Can't find metadata for Topic and Partition. Existing");
            return;
        }
        if (findLeader.leader() == null) {
            this.logger.error("Can't find Leader for Topic and Partition. Existing");
            return;
        }
        String host = findLeader.leader().host();
        int port = findLeader.leader().port();
        StringBuilder append = new StringBuilder().append("client_");
        KafkaConf kafkaConf5 = this.conf;
        StringBuilder append2 = append.append(KafkaConf.topic);
        KafkaConf kafkaConf6 = this.conf;
        String sb = append2.append(KafkaConf.partition).toString();
        this.consumer = new SimpleConsumer(host, port, 100000, LogEventConvert.SMALLINT_MAX_VALUE, sb);
        SimpleConsumer simpleConsumer = this.consumer;
        KafkaConf kafkaConf7 = this.conf;
        String str2 = KafkaConf.topic;
        KafkaConf kafkaConf8 = this.conf;
        long lastOffset = getLastOffset(simpleConsumer, str2, KafkaConf.partition, kafka.api.OffsetRequest.LatestTime(), sb);
        int i = 0;
        while (this.isFetch) {
            if (this.consumer == null) {
                this.consumer = new SimpleConsumer(host, port, 100000, LogEventConvert.SMALLINT_MAX_VALUE, sb);
            }
            FetchRequestBuilder clientId = new FetchRequestBuilder().clientId(sb);
            KafkaConf kafkaConf9 = this.conf;
            String str3 = KafkaConf.topic;
            KafkaConf kafkaConf10 = this.conf;
            KafkaConf kafkaConf11 = this.conf;
            FetchResponse fetch = this.consumer.fetch(clientId.addFetch(str3, KafkaConf.partition, lastOffset, KafkaConf.readBufferSize).build());
            if (fetch.hasError()) {
                i++;
                KafkaConf kafkaConf12 = this.conf;
                String str4 = KafkaConf.topic;
                KafkaConf kafkaConf13 = this.conf;
                short errorCode = fetch.errorCode(str4, KafkaConf.partition);
                this.logger.warn("Error fetching data from the Broker:" + host + " Reason: " + ((int) errorCode));
                if (i > 5) {
                    this.logger.error("5 errors occurred existing the fetching");
                    return;
                }
                if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                    SimpleConsumer simpleConsumer2 = this.consumer;
                    KafkaConf kafkaConf14 = this.conf;
                    String str5 = KafkaConf.topic;
                    KafkaConf kafkaConf15 = this.conf;
                    lastOffset = getLastOffset(simpleConsumer2, str5, KafkaConf.partition, kafka.api.OffsetRequest.LatestTime(), sb);
                } else {
                    this.consumer.close();
                    this.consumer = null;
                    try {
                        KafkaConf kafkaConf16 = this.conf;
                        String str6 = KafkaConf.topic;
                        KafkaConf kafkaConf17 = this.conf;
                        host = findNewLeader(host, str6, KafkaConf.partition);
                    } catch (Exception e) {
                        this.logger.error("find lead broker failed");
                        e.printStackTrace();
                        return;
                    }
                }
            } else {
                i = 0;
                long j = 0;
                KafkaConf kafkaConf18 = this.conf;
                String str7 = KafkaConf.topic;
                KafkaConf kafkaConf19 = this.conf;
                Iterator it = fetch.messageSet(str7, KafkaConf.partition).iterator();
                while (it.hasNext()) {
                    MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                    long offset = messageAndOffset.offset();
                    if (offset < lastOffset) {
                        this.logger.info("Found an old offset: " + offset + " Expecting: " + lastOffset);
                    } else {
                        lastOffset = messageAndOffset.nextOffset();
                        ByteBuffer payload = messageAndOffset.message().payload();
                        byte[] bArr = new byte[payload.limit()];
                        payload.get(bArr);
                        try {
                            this.msgQueue.put(new KafkaMetaMsg(bArr, messageAndOffset.offset()));
                        } catch (InterruptedException e2) {
                            this.logger.error(e2.getMessage());
                            e2.printStackTrace();
                        }
                        j++;
                    }
                }
                if (j == 0) {
                    delay(1);
                }
            }
        }
    }

    private void delay(int i) {
        try {
            Thread.sleep(i * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /*  JADX ERROR: NullPointerException in pass: RegionMakerVisitor
        java.lang.NullPointerException: Cannot invoke "java.util.List.isEmpty()" because "s" is null
        	at jadx.core.utils.BlockUtils.getNextBlock(BlockUtils.java:411)
        	at jadx.core.dex.visitors.regions.RegionMaker.traverse(RegionMaker.java:172)
        	at jadx.core.dex.visitors.regions.RegionMaker.makeRegion(RegionMaker.java:91)
        	at jadx.core.dex.visitors.regions.RegionMaker.processIf(RegionMaker.java:735)
        	at jadx.core.dex.visitors.regions.RegionMaker.traverse(RegionMaker.java:152)
        	at jadx.core.dex.visitors.regions.RegionMaker.makeRegion(RegionMaker.java:91)
        	at jadx.core.dex.visitors.regions.RegionMaker.processExcHandler(RegionMaker.java:1110)
        	at jadx.core.dex.visitors.regions.RegionMaker.processTryCatchBlocks(RegionMaker.java:1046)
        	at jadx.core.dex.visitors.regions.RegionMakerVisitor.visit(RegionMakerVisitor.java:55)
        */
    public boolean isConnected() {
        /*
            r8 = this;
            r0 = 0
            r9 = r0
            r0 = 0
            r10 = r0
        L4:
            r0 = r10
            r1 = r8
            com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf r1 = r1.conf     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            java.util.List<java.lang.String> r1 = com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf.brokerSeeds     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            int r1 = r1.size()     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r2 = 1
            int r1 = r1 - r2
            if (r0 > r1) goto L6e
            kafka.javaapi.consumer.SimpleConsumer r0 = new kafka.javaapi.consumer.SimpleConsumer     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r1 = r0
            r2 = r8
            com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf r2 = r2.conf     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            java.util.List<java.lang.String> r2 = com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf.brokerSeeds     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r3 = r10
            java.lang.Object r2 = r2.get(r3)     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            java.lang.String r2 = (java.lang.String) r2     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r3 = r8
            com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf r3 = r3.conf     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            java.util.List<java.lang.Integer> r3 = com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf.portList     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r4 = r10
            java.lang.Object r3 = r3.get(r4)     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            java.lang.Integer r3 = (java.lang.Integer) r3     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            int r3 = r3.intValue()     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r4 = 100000(0x186a0, float:1.4013E-40)
            r5 = 65536(0x10000, float:9.1835E-41)
            java.lang.String r6 = "heartBeat"
            r1.<init>(r2, r3, r4, r5, r6)     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r9 = r0
            r0 = r8
            com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf r0 = r0.conf     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            java.lang.String r0 = com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf.topic     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            java.util.List r0 = java.util.Collections.singletonList(r0)     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r11 = r0
            kafka.javaapi.TopicMetadataRequest r0 = new kafka.javaapi.TopicMetadataRequest     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r1 = r0
            r2 = r11
            r1.<init>(r2)     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r12 = r0
            r0 = r9
            r1 = r12
            kafka.javaapi.TopicMetadataResponse r0 = r0.send(r1)     // Catch: java.lang.Exception -> L74 java.lang.Throwable -> L80
            r13 = r0
            int r10 = r10 + 1
            goto L4
        L6e:
            r0 = jsr -> L88
        L71:
            goto L94
        L74:
            r10 = move-exception
            r0 = r10
            r0.printStackTrace()     // Catch: java.lang.Throwable -> L80
            r0 = 0
            r11 = r0
            r0 = jsr -> L88
        L7e:
            r1 = r11
            return r1
        L80:
            r14 = move-exception
            r0 = jsr -> L88
        L85:
            r1 = r14
            throw r1
        L88:
            r15 = r0
            r0 = r9
            if (r0 == 0) goto L92
            r0 = r9
            r0.close()
        L92:
            ret r15
        L94:
            r1 = 1
            return r1
        */
        throw new UnsupportedOperationException("Method not decompiled: com.github.hackerwin7.mysql.tracker.kafka.driver.consumer.KafkaReceiver.isConnected():boolean");
    }
}
