package com.github.hackerwin7.mysql.tracker.tracker;

import com.github.hackerwin7.mysql.tracker.filter.FilterMatcher;
import com.github.hackerwin7.mysql.tracker.kafka.driver.producer.KafkaSender;
import com.github.hackerwin7.mysql.tracker.kafka.utils.KafkaConf;
import com.github.hackerwin7.mysql.tracker.monitor.TrackerMonitor;
import com.github.hackerwin7.mysql.tracker.monitor.constants.JDMysqlTrackerMonitorType;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.DirectLogFetcherChannel;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.LogContext;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.LogDecoder;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.LogEvent;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.event.QueryLogEvent;
import com.github.hackerwin7.mysql.tracker.mysql.driver.MysqlConnector;
import com.github.hackerwin7.mysql.tracker.mysql.driver.MysqlQueryExecutor;
import com.github.hackerwin7.mysql.tracker.mysql.driver.MysqlUpdateExecutor;
import com.github.hackerwin7.mysql.tracker.mysql.driver.packets.HeaderPacket;
import com.github.hackerwin7.mysql.tracker.mysql.driver.packets.client.BinlogDumpCommandPacket;
import com.github.hackerwin7.mysql.tracker.mysql.driver.utils.PacketManager;
import com.github.hackerwin7.mysql.tracker.protocol.json.JSONConvert;
import com.github.hackerwin7.mysql.tracker.protocol.protobuf.CanalEntry;
import com.github.hackerwin7.mysql.tracker.tracker.common.TableMetaCache;
import com.github.hackerwin7.mysql.tracker.tracker.parser.LogEventConvert;
import com.github.hackerwin7.mysql.tracker.tracker.position.EntryPosition;
import com.github.hackerwin7.mysql.tracker.tracker.utils.TrackerConf;
import com.github.hackerwin7.mysql.tracker.zk.client.ZkExecutor;
import com.github.hackerwin7.mysql.tracker.zk.utils.ZkConf;
import com.google.protobuf.InvalidProtocolBufferException;
import com.jd.bdp.magpie.MagpieExecutor;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.producer.KeyedMessage;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerKafkaZkLocal.class */
public class HandlerKafkaZkLocal implements MagpieExecutor {
    // MySQL connection used for the binlog dump stream; queryExecutor and updateExecutor are built on it in init().
    private MysqlConnector logConnector;
    // MySQL connection handed to TableMetaCache in init().
    private MysqlConnector tableConnector;
    // MySQL connection behind realQuery (used by the fetch monitor to read the master's current position).
    private MysqlConnector realConnector;
    private MysqlQueryExecutor queryExecutor;
    private MysqlUpdateExecutor updateExecutor;
    private MysqlQueryExecutor realQuery;
    private TableMetaCache tableMetaCache;
    private LogEventConvert eventConvert;
    // Job identifier; pushed into the config before initConfFile() and read back afterwards in init().
    private String jobId;
    // Kafka producer configured from TrackerConf.brokerList / kafkaPort / topic / acks.
    private KafkaSender msgSender;
    // Kafka producer used for monitor and exception reports (topic config.phKaTopic).
    private KafkaSender phMonitorSender;
    private ZkExecutor zkExecutor;
    // Bounded queue (capacity config.queuesize) that the Fetcher thread fills with parsed entries.
    private BlockingQueue<CanalEntry.Entry> entryQueue;
    // Copied from config.logfile / config.offset in init().
    private String logfile;
    private long offset;
    private FilterMatcher fm;
    private long startTime;
    // Worker objects created in init(); where they are started/scheduled is outside the visible part of the file.
    Fetcher fetcher;
    Timer timer;
    Minuter minter;
    Timer htimer;
    HeartBeat heartBeat;
    private TrackerMonitor monitor;
    private List<CanalEntry.Entry> entryList;
    private List<KeyedMessage<String, byte[]>> messageList;
    private Logger logger = LoggerFactory.getLogger(HandlerKafkaZkLocal.class);
    private TrackerConf config = new TrackerConf();
    // Copied from config.batchId / config.inId in init().
    private long batchId = 0;
    private long inBatchId = 0;
    // Reset to 0 in init(); set to 1 by the fetcher, heartbeat, minute task and init retry loops
    // on failure (their log messages describe this as "reload the job").
    private int globalFetchThread = 0;
    private LogEvent globalXidEvent = null;
    // The globalXid* / globalBinlogName values are what the Minuter task writes to ZooKeeper.
    private CanalEntry.Entry globalXidEntry = null;
    private String globalBinlogName = "null";
    private long globalXidBatchId = -1;
    private long globalXidInBatchId = -1;
    private LogEvent lastEvent = null;
    private CanalEntry.Entry lastEntry = null;
    private String binlog = null;
    // Set to false when the Fetcher loop ends without an exception; checked by HeartBeat.
    private boolean fetchSurvival = true;
    // NOTE(review): the counters below are not used in the visible part of the file —
    // presumably per-phase timing statistics; confirm against the rest of the class.
    private long trackerWhileTime = 0;
    private long trackerAvroTime = 0;
    private long whileStart = 0;
    private long whileEnd = 0;
    private long avroStart = 0;
    private long avroEnd = 0;
    private long trackerPersisTime = 0;
    private long trackerMonitorTime = 0;
    private long persisStart = 0;
    private long persisEnd = 0;
    private long monitorStart = 0;
    private long monitorEnd = 0;
    private long trackerSwitchTime = 0;
    private long switchStart = 0;
    private long switchEnd = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerKafkaZkLocal$Fetcher.class */
    /**
     * Thread that sends a binlog dump request to MySQL, decodes the events it receives and puts
     * the parsed entries on the enclosing handler's {@code entryQueue}.
     *
     * <p>While running it also schedules a {@link FetchMonitorMin} task that periodically reports
     * fetch statistics. If the fetch loop fails (and the thread was not killed on purpose) the
     * enclosing handler's {@code globalFetchThread} flag is set to 1.
     */
    public class Fetcher extends Thread {
        private DirectLogFetcherChannel fetcher;
        private LogDecoder decoder;
        private LogContext context;
        private LogEvent event;
        /** Last entry put on the queue; written by this thread and read by the monitor timer thread. */
        public volatile CanalEntry.Entry fetchLast;
        private Logger logger = LoggerFactory.getLogger(Fetcher.class);
        /** Statistics for the current batch (reset every {@code config.batchsize} entries). */
        private TrackerMonitor monitor = new TrackerMonitor();
        /** Statistics for the current monitor period (reset by {@link FetchMonitorMin}). */
        private TrackerMonitor minuteMonitor = new TrackerMonitor();
        public FetchMonitorMin timerMonitor = new FetchMonitorMin();
        public Timer timer = new Timer();
        /** Set from outside to stop the loop; volatile because it is read by the fetch thread. */
        public volatile boolean iskilled = false;

        /**
         * Periodic task that logs the fetch statistics of the last period, computes how far the
         * fetcher lags behind the master's current binlog position and sends the result to the
         * monitor Kafka topic.
         */
        public class FetchMonitorMin extends TimerTask {
            private Logger logger = LoggerFactory.getLogger(FetchMonitorMin.class);

            FetchMonitorMin() {
            }

            /**
             * Parses the run of digits at the end of {@code str} (for example the sequence number
             * at the end of a binlog file name).
             *
             * @param str text ending in a number
             * @return the trailing number; 0 is a valid result
             * @throws NumberFormatException if {@code str} does not end in a digit or the number
             *         does not fit in a {@code long}
             */
            private long getRearNum(String str) {
                int digitsStart = str.length();
                while (digitsStart > 0 && Character.isDigit(str.charAt(digitsStart - 1))) {
                    digitsStart--;
                }
                if (digitsStart == str.length()) {
                    throw new NumberFormatException("no trailing number in \"" + str + "\"");
                }
                // Parse only the digit suffix; a value of 0 must not be mistaken for "not found".
                return Long.parseLong(str.substring(digitsStart));
            }

            /**
             * Encodes the distance between two binlog positions as {@code <fileDiff>.<offset>}.
             *
             * <p>The integer part is the difference between the trailing numbers of the two file
             * names. The fractional digits are the offset difference when both positions are in
             * the same file (clamped at 0), otherwise the first position's raw offset.
             *
             * @param str  first binlog file name (the master's, as called from {@link #run()})
             * @param j    offset inside {@code str}
             * @param str2 second binlog file name (the fetcher's)
             * @param j2   offset inside {@code str2}
             * @return the encoded delay value
             * @throws Exception if a file name has no trailing number
             */
            private double getDelayNum(String str, long j, String str2, long j2) throws Exception {
                long fileDiff = getRearNum(str) - getRearNum(str2);
                long offsetPart = j - j2;
                if (fileDiff != 0) {
                    offsetPart = j;
                }
                if (offsetPart < 0) {
                    offsetPart = 0;
                }
                return Double.parseDouble(fileDiff + "." + offsetPart);
            }

            /**
             * Logs and publishes the statistics of the last period, then clears them. Any failure
             * is logged and otherwise ignored so that the timer keeps running.
             */
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    this.logger.info("==============> per minute fetch monitor:");
                    this.logger.info("---> fetch number of entry:" + Fetcher.this.minuteMonitor.fetchNum + " entries");
                    this.logger.info("---> fetch sum size :" + (Fetcher.this.minuteMonitor.batchSize / HandlerKafkaZkLocal.this.config.mbUnit) + " MB");
                    EntryPosition masterPos = HandlerKafkaZkLocal.this.findPosFromMysqlNow(HandlerKafkaZkLocal.this.realQuery);
                    // Read the shared field once so the null check and the uses see the same entry.
                    CanalEntry.Entry lastFetched = Fetcher.this.fetchLast;
                    Fetcher.this.minuteMonitor.delayNum = 0.0d;
                    if (lastFetched != null && masterPos != null) {
                        Fetcher.this.minuteMonitor.delayNum = getDelayNum(masterPos.getJournalName(), masterPos.getPosition().longValue(), lastFetched.getHeader().getLogfileName(), lastFetched.getHeader().getLogfileOffset());
                    }
                    this.logger.info("---> fetch delay num :" + Fetcher.this.minuteMonitor.delayNum);
                    String json = JSONConvert.JrdwMonitorVoToJson(Fetcher.this.minuteMonitor.toJrdwMonitorOnline(20001, HandlerKafkaZkLocal.this.jobId)).toString();
                    // Either position may be missing (e.g. nothing fetched yet); do not dereference it blindly.
                    String masterDesc = masterPos == null ? "unknown" : masterPos.getJournalName() + "#" + masterPos.getPosition();
                    String fetchDesc = lastFetched == null ? "unknown" : lastFetched.getHeader().getLogfileName() + "#" + lastFetched.getHeader().getLogfileOffset();
                    this.logger.info("fetch monitor : " + json + ";master:" + masterDesc + ";fetch:" + fetchDesc);
                    HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, json.getBytes("UTF-8")));
                    Fetcher.this.minuteMonitor.clear();
                } catch (Exception e) {
                    this.logger.error("per minute fetch monitor failed : " + e.getMessage(), e);
                }
            }
        }

        Fetcher() {
        }

        /**
         * Initializes the dump connection, schedules the monitor task and runs the fetch loop
         * until the channel stops delivering data, {@link #iskilled} is set, or an error occurs.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                init();
                int fetchedInBatch = 0;
                this.timer.schedule(this.timerMonitor, 1000L, HandlerKafkaZkLocal.this.config.monitorsec * 1000);
                while (this.fetcher.fetch()) {
                    if (fetchedInBatch == 0) {
                        this.monitor.fetchStart = System.currentTimeMillis();
                    }
                    this.event = this.decoder.decode(this.fetcher, this.context);
                    if (this.event == null) {
                        this.logger.warn("fetched event is null...");
                        continue;
                    }
                    CanalEntry.Entry parsed = HandlerKafkaZkLocal.this.eventConvert.parse(this.event);
                    if (parsed == null) {
                        continue;
                    }
                    HandlerKafkaZkLocal.this.entryQueue.put(parsed);
                    this.fetchLast = parsed;
                    fetchedInBatch++;
                    this.minuteMonitor.fetchNum++;
                    this.monitor.batchSize += this.event.getEventLen();
                    this.minuteMonitor.batchSize += this.event.getEventLen();
                    if (fetchedInBatch >= HandlerKafkaZkLocal.this.config.batchsize) {
                        this.monitor.fetchEnd = System.currentTimeMillis();
                        this.logger.info("===================================> fetch thread : ");
                        this.logger.info("---> fetch during time : " + (this.monitor.fetchEnd - this.monitor.fetchStart) + " ms");
                        this.logger.info("---> fetch number : " + fetchedInBatch + " events");
                        this.logger.info("---> fetch sum size : " + (this.monitor.batchSize / HandlerKafkaZkLocal.this.config.mbUnit) + " MB");
                        this.monitor.clear();
                        fetchedInBatch = 0;
                    }
                    if (this.iskilled) {
                        break;
                    }
                }
                // Loop ended without an exception: let the heartbeat know the fetcher is gone.
                HandlerKafkaZkLocal.this.fetchSurvival = false;
            } catch (Exception e) {
                if (this.iskilled) {
                    return;
                }
                handleFetchFailure(e);
            }
        }

        /**
         * Reports and logs a fetch failure and flags the job via {@code globalFetchThread}.
         * When the message contains "errno = 1236" the persisted position node of this job is
         * deleted from ZooKeeper first.
         */
        private void handleFetchFailure(Exception e) {
            String message = e.getMessage();
            reportException(message);
            this.logger.error("fetch thread error : " + message, e);
            // getMessage() may be null (e.g. InterruptedException); that must not abort the handler.
            if (message != null && message.contains("errno = 1236")) {
                try {
                    HandlerKafkaZkLocal.this.zkExecutor.delete(HandlerKafkaZkLocal.this.config.persisPath + "/" + HandlerKafkaZkLocal.this.jobId);
                } catch (Exception zkError) {
                    this.logger.error("delete persisted position failed : " + zkError.getMessage(), zkError);
                }
            }
            HandlerKafkaZkLocal.this.globalFetchThread = 1;
        }

        /** Sends an exception report to the monitor topic on a separate thread (best effort). */
        private void reportException(final String message) {
            new Thread(new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        TrackerMonitor trackerMonitor = new TrackerMonitor();
                        trackerMonitor.exMsg = message;
                        HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, JSONConvert.JrdwMonitorVoToJson(trackerMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8")));
                    } catch (Exception sendError) {
                        Fetcher.this.logger.error("send exception monitor failed : " + sendError.getMessage(), sendError);
                    }
                }
            }).start();
        }

        /**
         * Prepares the dump session: obtains the start position from {@code findPosFromZk()},
         * applies the session settings, writes the binlog dump command to the log connector's
         * channel and creates the fetcher, decoder and context used by {@link #run()}.
         *
         * @throws Exception if no start position is available or any step fails
         */
        private void init() throws Exception {
            EntryPosition startPos = HandlerKafkaZkLocal.this.findPosFromZk();
            if (startPos == null) {
                throw new Exception("zk position is error...");
            }
            this.logger.info("set the binlog configuration for the binlog dump");
            HandlerKafkaZkLocal.this.updateExecutor.update("set wait_timeout=9999999");
            HandlerKafkaZkLocal.this.updateExecutor.update("set net_write_timeout=1800");
            HandlerKafkaZkLocal.this.updateExecutor.update("set net_read_timeout=1800");
            HandlerKafkaZkLocal.this.updateExecutor.update("set names 'binary'");
            HandlerKafkaZkLocal.this.updateExecutor.update("set @master_binlog_checksum= '@@global.binlog_checksum'");
            HandlerKafkaZkLocal.this.updateExecutor.update("SET @mariadb_slave_capability='4'");
            this.logger.info("send the binlog dump packet to mysql , let mysql set up a binlog dump thread in mysql");
            BinlogDumpCommandPacket dumpCommand = new BinlogDumpCommandPacket();
            dumpCommand.binlogFileName = startPos.getJournalName();
            dumpCommand.binlogPosition = startPos.getPosition().longValue();
            dumpCommand.slaveServerId = HandlerKafkaZkLocal.this.config.slaveId;
            byte[] body = dumpCommand.toBytes();
            HeaderPacket header = new HeaderPacket();
            header.setPacketBodyLength(body.length);
            header.setPacketSequenceNumber((byte) 0);
            PacketManager.write(HandlerKafkaZkLocal.this.logConnector.getChannel(), new ByteBuffer[]{ByteBuffer.wrap(header.toBytes()), ByteBuffer.wrap(body)});
            this.fetcher = new DirectLogFetcherChannel(HandlerKafkaZkLocal.this.logConnector.getReceiveBufferSize());
            this.fetcher.start(HandlerKafkaZkLocal.this.logConnector.getChannel());
            this.decoder = new LogDecoder(0, LogEvent.ENUM_END_EVENT);
            this.context = new LogContext();
        }

        /** Cancels the monitor task and its timer; does not stop the fetch loop itself. */
        public void shutdown() {
            this.timerMonitor.cancel();
            this.timer.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerKafkaZkLocal$HeartBeat.class */
    /**
     * Timer task that checks the MySQL connections, both Kafka producers, ZooKeeper and the
     * fetch thread; on the first failed check it logs the reason and sets
     * {@code globalFetchThread} to 1.
     */
    public class HeartBeat extends TimerTask {
        private Logger logger = LoggerFactory.getLogger(HeartBeat.class);

        HeartBeat() {
        }

        /** Runs all checks in order and flags the job when one of them fails. */
        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            this.logger.info("=================================> check assembly heartbeats......");
            String problem = firstProblem();
            if (problem == null) {
                return;
            }
            this.logger.info(problem);
            HandlerKafkaZkLocal.this.globalFetchThread = 1;
        }

        /**
         * Evaluates the checks in their fixed order and stops at the first one that fails.
         *
         * @return the log message for the failed check, or {@code null} when everything is fine
         */
        private String firstProblem() {
            HandlerKafkaZkLocal handler = HandlerKafkaZkLocal.this;
            boolean channelsUp = handler.logConnector.isConnected()
                    && handler.tableConnector.isConnected()
                    && handler.realConnector.isConnected();
            // The probe connection is only attempted when the three existing channels look healthy.
            if (!channelsUp || !isMysqlConnected()) {
                return "mysql connection loss, reload the job ......";
            }
            if (!handler.msgSender.isConnected()) {
                return "kafka producer connection loss, reload the job ......";
            }
            if (!handler.phMonitorSender.isConnected()) {
                return "phoenix kafka producer connection loss, reload the job ......";
            }
            if (!handler.zkExecutor.isConnected()) {
                return "zookeeper connection loss, reload the job ......";
            }
            if (!handler.fetchSurvival) {
                return "fetch thread had been dead, reload the job ......";
            }
            return null;
        }

        /**
         * Opens and immediately closes a fresh connection to the configured MySQL server.
         *
         * @return {@code false} if connecting or disconnecting raised an {@link IOException}
         */
        private boolean isMysqlConnected() {
            boolean reachable;
            try {
                TrackerConf conf = HandlerKafkaZkLocal.this.config;
                MysqlConnector probe = new MysqlConnector(new InetSocketAddress(conf.address, conf.myPort), conf.username, conf.password);
                probe.connect();
                probe.disconnect();
                reachable = true;
            } catch (IOException e) {
                reachable = false;
            }
            return reachable;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerKafkaZkLocal$Minuter.class */
    public class Minuter extends TimerTask {
        private Logger logger = LoggerFactory.getLogger(Minuter.class);

        Minuter() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            long j;
            String str;
            Calendar calendar = Calendar.getInstance();
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm");
            SimpleDateFormat simpleDateFormat2 = new SimpleDateFormat("yyyy-MM-dd");
            String format = simpleDateFormat.format(calendar.getTime());
            String format2 = simpleDateFormat2.format(calendar.getTime());
            String str2 = format.split(":")[0];
            if (HandlerKafkaZkLocal.this.globalXidEntry != null) {
                j = HandlerKafkaZkLocal.this.globalXidEntry.getHeader().getLogfileOffset() + HandlerKafkaZkLocal.this.globalXidEntry.getHeader().getEventLength();
                str = HandlerKafkaZkLocal.this.globalBinlogName + ":" + j + ":" + HandlerKafkaZkLocal.this.globalXidBatchId + ":" + HandlerKafkaZkLocal.this.globalXidInBatchId;
            } else {
                j = -1;
                str = HandlerKafkaZkLocal.this.globalBinlogName + ":-1:" + HandlerKafkaZkLocal.this.globalXidBatchId + ":" + HandlerKafkaZkLocal.this.globalXidInBatchId;
            }
            try {
                if (!HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2)) {
                    HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2, format2);
                }
                if (!HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2)) {
                    HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2, str2);
                }
                if (!HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format)) {
                    HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format, format);
                }
                if (HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format + "/" + HandlerKafkaZkLocal.this.jobId)) {
                    HandlerKafkaZkLocal.this.zkExecutor.set(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format + "/" + HandlerKafkaZkLocal.this.jobId, str);
                } else {
                    HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format + "/" + HandlerKafkaZkLocal.this.jobId, str);
                }
            } catch (Exception e) {
                final String message = e.getMessage();
                new Thread(new Runnable() { // from class: com.github.hackerwin7.mysql.tracker.tracker.HandlerKafkaZkLocal.Minuter.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            TrackerMonitor trackerMonitor = new TrackerMonitor();
                            trackerMonitor.exMsg = message;
                            HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, JSONConvert.JrdwMonitorVoToJson(trackerMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8")));
                        } catch (Exception e2) {
                            e2.printStackTrace();
                        }
                    }
                }).start();
                e.printStackTrace();
                this.logger.error("minute time err: " + e.getMessage());
                this.logger.error(e.getMessage());
                boolean z = false;
                int i = 0;
                while (!z) {
                    if (i >= HandlerKafkaZkLocal.this.config.retrys) {
                        HandlerKafkaZkLocal.this.globalFetchThread = 1;
                        return;
                    }
                    i++;
                    try {
                        if (!HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2)) {
                            HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2, format2);
                        }
                        if (!HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2)) {
                            HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2, str2);
                        }
                        if (!HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format)) {
                            HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format, format);
                        }
                        if (HandlerKafkaZkLocal.this.zkExecutor.exists(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format + "/" + HandlerKafkaZkLocal.this.jobId)) {
                            HandlerKafkaZkLocal.this.zkExecutor.set(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format + "/" + HandlerKafkaZkLocal.this.jobId, str);
                        } else {
                            HandlerKafkaZkLocal.this.zkExecutor.create(HandlerKafkaZkLocal.this.config.minutePath + "/" + format2 + "/" + str2 + "/" + format + "/" + HandlerKafkaZkLocal.this.jobId, str);
                        }
                        z = true;
                    } catch (Exception e2) {
                        final String message2 = e2.getMessage();
                        new Thread(new Runnable() { // from class: com.github.hackerwin7.mysql.tracker.tracker.HandlerKafkaZkLocal.Minuter.2
                            @Override // java.lang.Runnable
                            public void run() {
                                try {
                                    TrackerMonitor trackerMonitor = new TrackerMonitor();
                                    trackerMonitor.exMsg = message2;
                                    HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, JSONConvert.JrdwMonitorVoToJson(trackerMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8")));
                                } catch (Exception e3) {
                                    e3.printStackTrace();
                                }
                            }
                        }).start();
                        this.logger.error("retrying...... Exception:" + e2.getMessage());
                        HandlerKafkaZkLocal.this.delay(3);
                    }
                }
            }
            this.logger.info("===================================> per minute thread :");
            this.logger.info("---> binlog file is " + HandlerKafkaZkLocal.this.globalBinlogName + ",position is :" + j + "; batch id is :" + HandlerKafkaZkLocal.this.globalXidBatchId + ",in batch id is :" + HandlerKafkaZkLocal.this.globalXidInBatchId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerKafkaZkLocal$RetryTimesOutException.class */
    /**
     * Thrown by the retry loops in this class once the configured number of attempts
     * ({@code config.retrys}) has been used up.
     */
    public class RetryTimesOutException extends Exception {
        /**
         * @param message detail message, available through {@link #getMessage()}
         */
        public RetryTimesOutException(String message) {
            super(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sleeps for the given number of seconds.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early and the thread's
     * interrupt status is set again so that callers (and later blocking calls) can observe it.
     *
     * @param i number of seconds to wait
     */
    public void delay(int i) {
        try {
            Thread.sleep(i * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the flag; restore it instead of losing the request.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sleeps for the given number of milliseconds.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early and the thread's
     * interrupt status is set again so that callers (and later blocking calls) can observe it.
     *
     * @param i number of milliseconds to wait
     */
    private void delayMin(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Catching InterruptedException clears the flag; restore it instead of losing the request.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Loads the configuration and builds every component the handler needs.
     *
     * <p>Order matters: the monitor Kafka producer is connected first because the failure
     * reports of the later steps are sent through it. Then the three MySQL connections are
     * opened (with retries), the executors, table-meta cache and entry queue are created, the
     * data Kafka producer and ZooKeeper are connected (ZooKeeper with retries), the ZooKeeper
     * paths are initialized and finally the worker objects and buffers are created.
     *
     * @throws RetryTimesOutException if MySQL or ZooKeeper cannot be connected within
     *         {@code config.retrys} attempts ({@code globalFetchThread} is set to 1 first)
     * @throws Exception if any other initialization step fails
     */
    private void init() throws Exception {
        this.logger.info("initializing......");
        this.config.jobId = this.jobId;
        this.config.initConfFile();
        this.jobId = this.config.jobId;
        this.logfile = this.config.logfile;
        this.offset = this.config.offset;
        this.batchId = this.config.batchId;
        this.inBatchId = this.config.inId;

        // Monitor producer: KafkaConf keeps these settings in static fields, so they are set
        // right before the sender that uses them is created and connected.
        KafkaConf monitorConf = new KafkaConf();
        KafkaConf.brokerList = this.config.phKaBrokerList;
        KafkaConf.port = this.config.phKaPort;
        KafkaConf.topic = this.config.phKaTopic;
        KafkaConf.acks = this.config.phKaAcks;
        this.phMonitorSender = new KafkaSender(monitorConf);
        this.phMonitorSender.connect();

        this.logConnector = new MysqlConnector(new InetSocketAddress(this.config.address, this.config.myPort), this.config.username, this.config.password);
        this.tableConnector = new MysqlConnector(new InetSocketAddress(this.config.address, this.config.myPort), this.config.username, this.config.password);
        this.realConnector = new MysqlConnector(new InetSocketAddress(this.config.address, this.config.myPort), this.config.username, this.config.password);
        connectMysqlWithRetry();

        this.queryExecutor = new MysqlQueryExecutor(this.logConnector);
        this.updateExecutor = new MysqlUpdateExecutor(this.logConnector);
        this.realQuery = new MysqlQueryExecutor(this.realConnector);
        this.tableMetaCache = new TableMetaCache(this.tableConnector);
        this.entryQueue = new LinkedBlockingQueue<>(this.config.queuesize);

        // Data producer: same static-field mechanism, now with the tracker's own Kafka settings.
        KafkaConf dataConf = new KafkaConf();
        KafkaConf.brokerList = TrackerConf.brokerList;
        KafkaConf.port = TrackerConf.kafkaPort;
        KafkaConf.topic = TrackerConf.topic;
        KafkaConf.acks = TrackerConf.acks;
        this.msgSender = new KafkaSender(dataConf);
        this.msgSender.connect();

        ZkConf zkConf = new ZkConf();
        zkConf.zkServers = this.config.zkServers;
        this.zkExecutor = new ZkExecutor(zkConf);
        connectZkWithRetry();
        initZk();

        this.fm = new FilterMatcher(this.config.filterRegex);
        this.eventConvert = new LogEventConvert();
        this.eventConvert.setTableMetaCache(this.tableMetaCache);
        this.eventConvert.setCharset(this.config.charset);
        LogEventConvert.filterMap.putAll(TrackerConf.filterMap);

        this.startTime = System.currentTimeMillis();
        this.globalFetchThread = 0;
        this.fetcher = new Fetcher();
        this.timer = new Timer();
        this.minter = new Minuter();
        this.htimer = new Timer();
        this.heartBeat = new HeartBeat();
        this.monitor = new TrackerMonitor();
        this.entryList = new ArrayList<>();
        this.messageList = new ArrayList<>();
    }

    /**
     * Connects the three MySQL connectors, retrying every 5 seconds on {@link IOException}.
     * After {@code config.retrys} failed attempts sets {@code globalFetchThread} to 1 and throws.
     */
    private void connectMysqlWithRetry() throws RetryTimesOutException {
        for (int attempt = 0; attempt < this.config.retrys; attempt++) {
            try {
                this.logConnector.connect();
                this.tableConnector.connect();
                this.realConnector.connect();
                return;
            } catch (IOException e) {
                reportInitFailure(e.getMessage());
                this.logger.error("connect mysql failed ... retry to connect it...", e);
                delay(5);
            }
        }
        this.globalFetchThread = 1;
        throw new RetryTimesOutException("reload job......");
    }

    /**
     * Connects the ZooKeeper executor, retrying every 3 seconds on any exception.
     * After {@code config.retrys} failed attempts sets {@code globalFetchThread} to 1 and throws.
     */
    private void connectZkWithRetry() throws RetryTimesOutException {
        for (int attempt = 0; attempt < this.config.retrys; attempt++) {
            try {
                this.zkExecutor.connect();
                return;
            } catch (Exception e) {
                reportInitFailure(e.getMessage());
                this.logger.error("connect zk failed , retrying......", e);
                delay(3);
            }
        }
        this.globalFetchThread = 1;
        throw new RetryTimesOutException("reload job......");
    }

    /** Sends an exception report to the monitor topic on a separate thread (best effort). */
    private void reportInitFailure(final String message) {
        new Thread(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    TrackerMonitor trackerMonitor = new TrackerMonitor();
                    trackerMonitor.exMsg = message;
                    HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, JSONConvert.JrdwMonitorVoToJson(trackerMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8")));
                } catch (Exception sendError) {
                    HandlerKafkaZkLocal.this.logger.error("send exception monitor failed : " + sendError.getMessage(), sendError);
                }
            }
        }).start();
    }

    /**
     * Makes sure the root, persistence and minute nodes configured in
     * {@code config} exist in ZooKeeper, creating each missing one with an
     * empty payload.
     *
     * <p>Any failure is reported to the monitor topic on a separate thread,
     * logged, and followed by {@code delay(3)} before the whole check is
     * attempted again. Once {@code config.retrys} attempts have been used,
     * {@code globalFetchThread} is set to 1 and the method gives up.
     *
     * @throws RetryTimesOutException when the attempt budget is exhausted
     * @throws Exception whatever {@code delay} may propagate
     */
    private void initZk() throws Exception {
        int attempts = 0;
        while (true) {
            if (attempts >= this.config.retrys) {
                this.globalFetchThread = 1;
                throw new RetryTimesOutException("reload job......");
            }
            attempts++;
            try {
                final String[] requiredPaths = {this.config.rootPath, this.config.persisPath, this.config.minutePath};
                for (String path : requiredPaths) {
                    if (!this.zkExecutor.exists(path)) {
                        this.zkExecutor.create(path, "");
                    }
                }
                return;
            } catch (Exception failure) {
                final String reason = failure.getMessage();
                Runnable reporter = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TrackerMonitor exceptionMonitor = new TrackerMonitor();
                            exceptionMonitor.exMsg = reason;
                            byte[] payload = JSONConvert.JrdwMonitorVoToJson(exceptionMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8");
                            HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, payload));
                        } catch (Exception sendFailure) {
                            sendFailure.printStackTrace();
                        }
                    }
                };
                new Thread(reporter).start();
                this.logger.error("retrying...... Exception:" + failure.getMessage());
                delay(3);
            }
        }
    }

    /**
     * Asks MySQL for its current binlog position by running
     * {@code show master status} on {@code queryExecutor}.
     *
     * <p>The first two returned field values are used as the binlog file name
     * and the offset. Every failure (query error, empty or too short result,
     * unparsable offset) is logged and reported as {@code null}; the method
     * never throws, so callers must check the result.
     *
     * @return the current master position, or {@code null} if it could not be
     *         determined
     */
    private EntryPosition findPosFromMysqlNow() {
        try {
            List<String> fieldValues = this.queryExecutor.query("show master status").getFieldValues();
            // Two values are required below: index 0 (file name) and index 1 (offset).
            if (CollectionUtils.isEmpty(fieldValues) || fieldValues.size() < 2) {
                this.logger.error("show master status failed");
                return null;
            }
            return new EntryPosition(fieldValues.get(0), Long.valueOf(fieldValues.get(1)));
        } catch (Exception e) {
            this.logger.error("show master status failed: " + e.getMessage(), e);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks MySQL for its current binlog position by running
     * {@code show master status} on the supplied executor.
     *
     * <p>The first two returned field values are used as the binlog file name
     * and the offset. Every failure (query error, empty or too short result,
     * unparsable offset) is logged and reported as {@code null}; the method
     * never throws, so callers must check the result.
     *
     * @param mysqlQueryExecutor executor to run the statement on; {@code null}
     *                           yields {@code null}
     * @return the current master position, or {@code null} if it could not be
     *         determined
     */
    public EntryPosition findPosFromMysqlNow(MysqlQueryExecutor mysqlQueryExecutor) {
        if (mysqlQueryExecutor == null) {
            return null;
        }
        try {
            List<String> fieldValues = mysqlQueryExecutor.query("show master status").getFieldValues();
            // Two values are required below: index 0 (file name) and index 1 (offset).
            if (CollectionUtils.isEmpty(fieldValues) || fieldValues.size() < 2) {
                this.logger.error("show master status failed");
                return null;
            }
            return new EntryPosition(fieldValues.get(0), Long.valueOf(fieldValues.get(1)));
        } catch (Exception e) {
            this.logger.error("show master status error!!!", e);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Determines the binlog position the tracker should start from.
     *
     * <p>Lookup order:
     * <ol>
     *   <li>the value stored in ZooKeeper under {@code config.persisPath/jobId},
     *       expected as {@code file:offset:batchId:inBatchId}; on success
     *       {@code batchId} and {@code inBatchId} are restored from it;</li>
     *   <li>if nothing is stored and {@code offset > 0}, the configured
     *       {@code logfile}/{@code offset};</li>
     *   <li>otherwise the live position from {@code show master status}, with
     *       {@code batchId} and {@code inBatchId} reset to 0.</li>
     * </ol>
     *
     * <p>A stored value that does not have exactly four fields is deleted from
     * ZooKeeper. Every failure is logged and reported as {@code null}; the
     * method never throws.
     *
     * @return the start position, or {@code null} if none could be determined
     */
    public EntryPosition findPosFromZk() {
        this.logger.info("finding position......");
        try {
            String zkPath = this.config.persisPath + "/" + this.jobId;
            String stored = this.zkExecutor.get(zkPath);
            if (stored == null || stored.equals("")) {
                if (this.offset > 0) {
                    this.logger.info("find mysql position from configuration......");
                    EntryPosition configured = new EntryPosition(this.logfile, Long.valueOf(this.offset));
                    this.logger.info("start position :" + configured.getBinlogPosFileName() + ":" + configured.getPosition() + ":" + this.batchId + ":" + this.inBatchId);
                    return configured;
                }
                this.logger.info("find mysql show master status......");
                EntryPosition current = findPosFromMysqlNow();
                this.batchId = 0L;
                this.inBatchId = 0L;
                // findPosFromMysqlNow() reports failure as null; do not dereference it.
                if (current == null) {
                    this.logger.error("can not find start position from mysql show master status");
                    return null;
                }
                this.logger.info("start position :" + current.getBinlogPosFileName() + ":" + current.getPosition() + ":" + this.batchId + ":" + this.inBatchId);
                return current;
            }
            String[] parts = stored.split(":");
            if (parts.length != 4) {
                this.zkExecutor.delete(zkPath);
                this.logger.error("zk position format is error...");
                return null;
            }
            this.logger.info("find zk position......");
            // Parse every numeric field before touching instance state so that a
            // malformed value cannot leave batchId/inBatchId half-updated.
            long storedOffset = Long.parseLong(parts[1]);
            long storedBatchId = Long.parseLong(parts[2]);
            long storedInBatchId = Long.parseLong(parts[3]);
            EntryPosition entryPosition = new EntryPosition(parts[0], Long.valueOf(storedOffset));
            this.batchId = storedBatchId;
            this.inBatchId = storedInBatchId;
            this.logger.info("start position :" + entryPosition.getBinlogPosFileName() + ":" + entryPosition.getPosition() + ":" + this.batchId + ":" + this.inBatchId);
            return entryPosition;
        } catch (NumberFormatException e) {
            this.logger.error("zk position format is error...", e);
            return null;
        } catch (Exception e) {
            this.logger.error("zk client error : " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * Starts the tracker for the given job: runs {@code init()}, starts the
     * fetcher, schedules the minute and heartbeat tasks, and publishes this
     * host's address to the monitor topic on a separate thread.
     *
     * <p>A {@link RetryTimesOutException} raised during start-up is logged and
     * swallowed; any other exception propagates.
     *
     * @param str job id; stored in {@code jobId} before anything else happens
     * @throws Exception any start-up failure other than
     *                   {@link RetryTimesOutException}
     */
    public void prepare(String str) throws Exception {
        this.logger.info("preparing......");
        this.jobId = str;
        try {
            init();
            this.fetchSurvival = true;
            this.fetcher.start();
            this.timer.schedule(this.minter, 1000L, this.config.minsec * 1000);
            this.htimer.schedule(this.heartBeat, 1000L, this.config.heartsec * 1000);
            final String localIp = InetAddress.getLocalHost().getHostAddress();
            Runnable ipReporter = new Runnable() {
                @Override
                public void run() {
                    try {
                        TrackerMonitor ipMonitor = new TrackerMonitor();
                        ipMonitor.ip = localIp;
                        byte[] payload = JSONConvert.JrdwMonitorVoToJson(ipMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.IP_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8");
                        HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, payload));
                    } catch (Exception sendFailure) {
                        sendFailure.printStackTrace();
                    }
                }
            };
            new Thread(ipReporter).start();
            this.logger.info("start the tracker successfully......");
            delay(3);
        } catch (RetryTimesOutException retriesExhausted) {
            this.logger.error(retriesExhausted.getMessage());
        }
    }

    /**
     * One iteration of the persistence loop: drains parsed entries from
     * {@code entryQueue}, turns the ones that pass the table filter into Kafka
     * messages, flushes the accumulated batch when a count, size or time
     * threshold is reached, records the confirmed position, and emits
     * monitoring statistics.
     *
     * <p>If {@code globalFetchThread} is 1 on entry, the iteration only
     * triggers {@code reload(jobId)} and returns.
     *
     * <p>NOTE(review): the method is presumably invoked repeatedly by the
     * hosting framework; the "switch time" bookkeeping measures the gap
     * between the end of one call and the start of the next. Confirm against
     * the caller.
     *
     * @throws Exception propagated from queue access, reload, persistence or
     *                   position confirmation
     */
    public void run() throws Exception {
        // Nothing queued: back off via delayMin(100) before continuing.
        if (this.entryQueue.isEmpty()) {
            delayMin(100);
        }
        // globalFetchThread == 1 is the "reload requested" flag set by failure paths.
        if (this.globalFetchThread == 1) {
            this.globalFetchThread = 0;
            this.logger.error("connect loss or position is error!!! reload......");
            reload(this.jobId);
            delay(5);
            return;
        }
        // Time elapsed since the previous call finished (switchStart is set at the very end).
        this.switchEnd = System.currentTimeMillis();
        if (this.switchStart > 0) {
            this.trackerSwitchTime += this.switchEnd - this.switchStart;
        }
        this.whileStart = System.currentTimeMillis();
        // Drain the queue until it is empty or the batch limits are reached.
        while (!this.entryQueue.isEmpty()) {
            CanalEntry.Entry take = this.entryQueue.take();
            if (take != null) {
                // lastEntry is updated for every taken entry, including ones the filter drops below.
                this.lastEntry = take;
                TrackerConf trackerConf = this.config;
                // An empty filter map accepts everything; otherwise the "schema.table" key must be present.
                if (TrackerConf.filterMap.size() == 0 || isInMap(take.getHeader().getSchemaName() + "." + take.getHeader().getTableName())) {
                    // Rebuild the entry stamped with the current batch id, in-batch id and this tracker's address.
                    CanalEntry.Entry m76build = CanalEntry.Entry.newBuilder().setHeader(take.getHeader()).setEntryType(take.getEntryType()).setStoreValue(take.getStoreValue()).setBatchId(this.batchId).setInId(this.inBatchId).setIp(this.config.address).m76build();
                    this.inBatchId++;
                    // An end entry closes the batch: restart the in-batch counter and advance the batch id.
                    if (isEndEntry(m76build)) {
                        this.inBatchId = 0L;
                        this.batchId++;
                    }
                    byte[] byteArray = m76build.toByteArray();
                    this.monitor.batchSize += byteArray.length;
                    TrackerConf trackerConf2 = this.config;
                    this.messageList.add(new KeyedMessage<>(TrackerConf.topic, (Object) null, byteArray));
                }
                // Stop draining once the message count or the accumulated size reaches its limit.
                if (this.messageList.size() >= this.config.batchsize || this.monitor.batchSize / this.config.mbUnit >= this.config.spacesize) {
                    break;
                }
            }
        }
        this.whileEnd = System.currentTimeMillis();
        this.trackerWhileTime += this.whileEnd - this.whileStart;
        this.persisStart = System.currentTimeMillis();
        // Snapshot the position-related state of the most recent entry for logging below.
        if (this.lastEntry != null) {
            this.binlog = this.lastEntry.getHeader().getLogfileName();
            this.globalBinlogName = this.binlog;
            this.globalXidEntry = this.lastEntry;
            this.globalXidBatchId = this.batchId;
            this.globalXidInBatchId = this.inBatchId;
        }
        // Flush when the count limit, the size limit, or the time interval since startTime is exceeded.
        if (this.messageList.size() >= this.config.batchsize || this.monitor.batchSize / this.config.mbUnit >= this.config.spacesize || System.currentTimeMillis() - this.startTime > this.config.timeInterval * 1000) {
            // Nothing to send. Note that this early return also skips the timing updates at the end of the method.
            if (this.messageList.size() == 0) {
                return;
            }
            this.monitor.persisNum = this.messageList.size();
            this.monitor.delayTime = System.currentTimeMillis() - this.lastEntry.getHeader().getExecuteTime();
            // -1 from persisteKeyMsg is treated as a send failure: request a reload and keep messageList untouched.
            if (persisteKeyMsg(this.messageList) == -1) {
                this.logger.info("persistence the data failed !!! reloading ......");
                this.globalFetchThread = 1;
                return;
            } else {
                // Position is confirmed only after the batch was sent.
                confirmPos(this.lastEntry);
                this.messageList.clear();
            }
        }
        this.persisEnd = System.currentTimeMillis();
        this.trackerPersisTime += this.persisEnd - this.persisStart;
        this.monitorStart = System.currentTimeMillis();
        // persisNum > 0 means a batch was flushed in this call: log statistics and publish them.
        if (this.monitor.persisNum > 0) {
            this.monitor.persistenceStart = this.startTime;
            this.monitor.persistenceEnd = System.currentTimeMillis();
            this.logger.info("===================================> persistence thread / monitor:");
            this.logger.info("---> persistence deal during time:" + (this.monitor.persistenceEnd - this.monitor.persistenceStart) + " ms");
            // The accumulated timers are reset right after being logged.
            this.logger.info("---> !!!!debug , while time : " + this.trackerWhileTime);
            this.trackerWhileTime = 0L;
            this.logger.info("---> !!!!debug , persis time : " + this.trackerPersisTime);
            this.trackerPersisTime = 0L;
            this.logger.info("---> !!!!debug , monitor time : " + this.trackerMonitorTime);
            this.trackerMonitorTime = 0L;
            this.logger.info("---> !!!!debug , switch time : " + this.trackerSwitchTime);
            this.trackerSwitchTime = 0L;
            this.logger.info("---> blocking queue size:" + this.entryQueue.size());
            this.logger.info("---> send time :" + (this.monitor.sendEnd - this.monitor.sendStart) + " ms");
            this.logger.info("---> parser delay time:" + this.monitor.delayTime + " ms");
            this.logger.info("---> the number of entry list: " + this.monitor.persisNum + " entries");
            this.logger.info("---> entry list to bytes sum size is " + (this.monitor.batchSize / this.config.mbUnit) + " MB");
            if (this.lastEntry != null) {
                this.logger.info("---> position info: binlog file is " + this.globalBinlogName + ",position is :" + (this.lastEntry.getHeader().getLogfileOffset() + this.lastEntry.getHeader().getEventLength()) + "; batch id is :" + this.globalXidBatchId + ",in batch id is :" + this.globalXidInBatchId);
            }
            // Send a copy so that monitor.clear() below cannot race with the sender thread.
            final TrackerMonitor cloneDeep = this.monitor.cloneDeep();
            new Thread(new Runnable() { // from class: com.github.hackerwin7.mysql.tracker.tracker.HandlerKafkaZkLocal.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        // NOTE(review): 20002 is an undocumented monitor type code here; confirm against JDMysqlTrackerMonitorType.
                        String jSONObject = JSONConvert.JrdwMonitorVoToJson(cloneDeep.toJrdwMonitorOnline(20002, HandlerKafkaZkLocal.this.jobId)).toString();
                        HandlerKafkaZkLocal.this.logger.info("monitor :" + jSONObject);
                        HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, jSONObject.getBytes("UTF-8")));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            this.monitor.clear();
            // Restart the flush interval measured by the time-based threshold above.
            this.startTime = System.currentTimeMillis();
        }
        this.monitorEnd = System.currentTimeMillis();
        this.trackerMonitorTime += this.monitorEnd - this.monitorStart;
        // Start measuring the gap until the next call.
        this.switchStart = System.currentTimeMillis();
    }

    /**
     * Tells whether the given key is registered in the shared table filter.
     *
     * @param str lookup key; callers in this class pass {@code schema.table}
     * @return {@code true} if {@code TrackerConf.filterMap} contains the key
     */
    private boolean isInMap(String str) {
        return TrackerConf.filterMap.containsKey(str);
    }

    /**
     * Serializes the given entries and hands them to {@code msgSender} in one
     * call.
     *
     * <p>The serialized size of every entry is added to
     * {@code monitor.batchSize}. The serialization phase is timed into
     * {@code monitor.persistenceStart/End} and the send phase into
     * {@code monitor.hbaseWriteStart/End}. Nothing is sent for an empty list.
     *
     * @param list entries to serialize and send
     */
    private void persisteData(List<CanalEntry.Entry> list) {
        this.monitor.persistenceStart = System.currentTimeMillis();
        List<byte[]> payloads = new ArrayList<>();
        for (CanalEntry.Entry entry : list) {
            // Keep the serialized form in a local so its length can be accounted for.
            byte[] bytes = entry.toByteArray();
            payloads.add(bytes);
            this.monitor.batchSize += bytes.length;
        }
        this.monitor.persistenceEnd = System.currentTimeMillis();
        this.monitor.hbaseWriteStart = System.currentTimeMillis();
        if (!payloads.isEmpty()) {
            this.msgSender.send(payloads);
        }
        this.monitor.hbaseWriteEnd = System.currentTimeMillis();
    }

    /**
     * Sends the given keyed messages through {@code msgSender} and records the
     * wall-clock start and end of the send in {@code monitor.sendStart} and
     * {@code monitor.sendEnd}.
     *
     * @param list messages to send
     * @return the status returned by {@code msgSender.sendKeyMsg}; the caller
     *         in this class treats {@code -1} as failure
     */
    private int persisteKeyMsg(List<KeyedMessage<String, byte[]>> list) {
        this.monitor.sendStart = System.currentTimeMillis();
        final int status = this.msgSender.sendKeyMsg(list, this.phMonitorSender, this.config);
        this.monitor.sendEnd = System.currentTimeMillis();
        return status;
    }

    /**
     * Stores the position of the given log event in ZooKeeper under
     * {@code config.persisPath/jobId}, formatted as
     * {@code file:logPos:batchId:inBatchId}.
     *
     * <p>The node is created if it does not exist yet. If the first write
     * fails, the failure is reported to the monitor topic and the write is
     * retried with {@code set} up to {@code config.retrys} times, waiting
     * {@code delay(3)} after each failed retry. When the retries are used up,
     * {@code globalFetchThread} is set to 1 and the method returns normally.
     *
     * @param logEvent event whose log position is stored; {@code null} is a no-op
     * @param str      binlog file name used as the first field
     * @throws Exception whatever {@code delay} may propagate
     */
    private void confirmPos(LogEvent logEvent, String str) throws Exception {
        if (logEvent == null) {
            return;
        }
        final String position = str + ":" + logEvent.getLogPos() + ":" + this.batchId + ":" + this.inBatchId;
        try {
            final String node = this.config.persisPath + "/" + this.jobId;
            if (!this.zkExecutor.exists(node)) {
                this.zkExecutor.create(node, position);
            } else {
                this.zkExecutor.set(node, position);
            }
            return;
        } catch (Exception firstFailure) {
            final String firstReason = firstFailure.getMessage();
            Runnable firstReporter = new Runnable() {
                @Override
                public void run() {
                    try {
                        TrackerMonitor exceptionMonitor = new TrackerMonitor();
                        exceptionMonitor.exMsg = firstReason;
                        byte[] payload = JSONConvert.JrdwMonitorVoToJson(exceptionMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8");
                        HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, payload));
                    } catch (Exception sendFailure) {
                        sendFailure.printStackTrace();
                    }
                }
            };
            new Thread(firstReporter).start();
            this.logger.error(firstFailure.getMessage());
        }
        // Only reached after the first write failed: bounded retries with set().
        for (int attempt = 0; ; attempt++) {
            if (attempt >= this.config.retrys) {
                this.globalFetchThread = 1;
                return;
            }
            try {
                this.zkExecutor.set(this.config.persisPath + "/" + this.jobId, position);
                return;
            } catch (Exception retryFailure) {
                final String retryReason = retryFailure.getMessage();
                Runnable retryReporter = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TrackerMonitor exceptionMonitor = new TrackerMonitor();
                            exceptionMonitor.exMsg = retryReason;
                            byte[] payload = JSONConvert.JrdwMonitorVoToJson(exceptionMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8");
                            HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, payload));
                        } catch (Exception sendFailure) {
                            sendFailure.printStackTrace();
                        }
                    }
                };
                new Thread(retryReporter).start();
                this.logger.error("retrying...... Exception:" + retryFailure.getMessage());
                delay(3);
            }
        }
    }

    /**
     * Stores the end position of the given entry in ZooKeeper under
     * {@code config.persisPath/jobId}, formatted as
     * {@code file:offset:batchId:inBatchId}, where the file name comes from
     * the entry header and the offset is the header's log file offset plus
     * its event length.
     *
     * <p>The node is created if it does not exist yet. If the first write
     * fails, the failure is reported to the monitor topic and the write is
     * retried with {@code set} up to {@code config.retrys} times, waiting
     * {@code delay(3)} after each failed retry. When the retries are used up,
     * {@code globalFetchThread} is set to 1 and the method returns normally.
     *
     * @param entry entry whose end position is stored; {@code null} is a no-op
     * @throws Exception whatever {@code delay} may propagate
     */
    private void confirmPos(CanalEntry.Entry entry) throws Exception {
        if (entry == null) {
            return;
        }
        final String position = entry.getHeader().getLogfileName() + ":" + (entry.getHeader().getLogfileOffset() + entry.getHeader().getEventLength()) + ":" + this.batchId + ":" + this.inBatchId;
        try {
            final String node = this.config.persisPath + "/" + this.jobId;
            if (!this.zkExecutor.exists(node)) {
                this.zkExecutor.create(node, position);
            } else {
                this.zkExecutor.set(node, position);
            }
            return;
        } catch (Exception firstFailure) {
            final String firstReason = firstFailure.getMessage();
            Runnable firstReporter = new Runnable() {
                @Override
                public void run() {
                    try {
                        TrackerMonitor exceptionMonitor = new TrackerMonitor();
                        exceptionMonitor.exMsg = firstReason;
                        byte[] payload = JSONConvert.JrdwMonitorVoToJson(exceptionMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8");
                        HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, payload));
                    } catch (Exception sendFailure) {
                        sendFailure.printStackTrace();
                    }
                }
            };
            new Thread(firstReporter).start();
            this.logger.error(firstFailure.getMessage());
        }
        // Only reached after the first write failed: bounded retries with set().
        for (int attempt = 0; ; attempt++) {
            if (attempt >= this.config.retrys) {
                this.globalFetchThread = 1;
                return;
            }
            try {
                this.zkExecutor.set(this.config.persisPath + "/" + this.jobId, position);
                return;
            } catch (Exception retryFailure) {
                final String retryReason = retryFailure.getMessage();
                Runnable retryReporter = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            TrackerMonitor exceptionMonitor = new TrackerMonitor();
                            exceptionMonitor.exMsg = retryReason;
                            byte[] payload = JSONConvert.JrdwMonitorVoToJson(exceptionMonitor.toJrdwMonitorOnline(JDMysqlTrackerMonitorType.EXCEPTION_MONITOR, HandlerKafkaZkLocal.this.jobId)).toString().getBytes("UTF-8");
                            HandlerKafkaZkLocal.this.phMonitorSender.sendKeyMsg(new KeyedMessage<>(HandlerKafkaZkLocal.this.config.phKaTopic, (Object) null, payload));
                        } catch (Exception sendFailure) {
                            sendFailure.printStackTrace();
                        }
                    }
                };
                new Thread(retryReporter).start();
                this.logger.error("retrying...... Exception:" + retryFailure.getMessage());
                delay(3);
            }
        }
    }

    /**
     * Stores the end position of the given entry in ZooKeeper under
     * {@code config.persisPath/jobId}, formatted as
     * {@code file:offset:batchId:inBatchId}, where the file name is supplied
     * by the caller and the offset is the header's log file offset plus its
     * event length.
     *
     * <p>The node is created if it does not exist yet. If the first write
     * fails, it is retried with {@code set} on the same job node up to
     * {@code config.retrys} times, waiting {@code delay(3)} after each failed
     * retry. When the retries are used up, {@code globalFetchThread} is set to
     * 1 and the method returns normally.
     *
     * @param entry entry whose end position is stored; {@code null} is a no-op
     * @param str   binlog file name used as the first field
     * @throws Exception whatever {@code delay} may propagate
     */
    private void confirmPos(CanalEntry.Entry entry, String str) throws Exception {
        if (entry == null) {
            return;
        }
        String position = str + ":" + (entry.getHeader().getLogfileOffset() + entry.getHeader().getEventLength()) + ":" + this.batchId + ":" + this.inBatchId;
        // Job-specific node; both the first write and every retry must target it.
        String node = this.config.persisPath + "/" + this.jobId;
        try {
            if (this.zkExecutor.exists(node)) {
                this.zkExecutor.set(node, position);
            } else {
                this.zkExecutor.create(node, position);
            }
        } catch (Exception e) {
            this.logger.error(e.getMessage());
            boolean written = false;
            int attempts = 0;
            while (!written) {
                if (attempts >= this.config.retrys) {
                    this.globalFetchThread = 1;
                    return;
                }
                attempts++;
                try {
                    // Write to the job node, not to the persisPath parent.
                    this.zkExecutor.set(node, position);
                    written = true;
                } catch (Exception e2) {
                    this.logger.error("retrying...... Exception:" + e2.getMessage());
                    delay(3);
                }
            }
        }
    }

    /**
     * Decides whether a binlog event terminates a unit of work.
     *
     * <p>An event of header type 16 always does. An event of header type 2
     * does unless its query text ends with {@code LogEventConvert.BEGIN}
     * (case-insensitive). Every other type does not.
     *
     * <p>NOTE(review): 16 and 2 presumably are the XID and QUERY event type
     * codes of the MySQL binlog format; confirm against the LogEvent constants.
     *
     * @param logEvent event to classify
     * @return {@code true} if the event ends a unit of work
     */
    private boolean isEndEvent(LogEvent logEvent) {
        if (logEvent.getHeader().getType() == 16) {
            return true;
        }
        if (logEvent.getHeader().getType() != 2) {
            return false;
        }
        String query = ((QueryLogEvent) logEvent).getQuery();
        return !StringUtils.endsWithIgnoreCase(query, LogEventConvert.BEGIN);
    }

    /**
     * Decides whether an entry closes the current batch.
     *
     * <p>A {@code TRANSACTIONEND} entry always does. Otherwise the store value
     * is parsed as a {@code RowChange}; the entry closes the batch only when
     * that row change is flagged as DDL and the entry type is {@code ROWDATA}.
     * A store value that cannot be parsed is reported on stderr and treated as
     * not closing the batch.
     *
     * @param entry entry to classify
     * @return {@code true} if the entry ends a batch
     */
    private boolean isEndEntry(CanalEntry.Entry entry) {
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            return true;
        }
        try {
            boolean ddl = CanalEntry.RowChange.parseFrom(entry.getStoreValue()).getIsDdl();
            return ddl && entry.getEntryType() == CanalEntry.EntryType.ROWDATA;
        } catch (InvalidProtocolBufferException parseFailure) {
            parseFailure.printStackTrace();
            return false;
        }
    }

    /**
     * Pause hook; currently does nothing.
     *
     * <p>NOTE(review): presumably required by the executor interface this
     * class implements; confirm against the class declaration.
     *
     * @param str job id (unused)
     * @throws Exception never thrown by the current implementation
     */
    public void pause(String str) throws Exception {
    }

    /**
     * Restarts the tracker by closing it and preparing it again.
     *
     * <p>The argument is not used: both steps operate on the job id already
     * stored in {@code jobId}.
     *
     * @param str job id (ignored)
     * @throws Exception propagated from {@code close} or {@code prepare}
     */
    public void reload(String str) throws Exception {
        final String currentJob = this.jobId;
        close(currentJob);
        prepare(currentJob);
    }

    /**
     * Stops the tracker and releases everything it holds: the fetcher, the
     * scheduled tasks and their timers, the three MySQL connectors, the Kafka
     * sender, the ZooKeeper client, and finally the configuration.
     *
     * <p>Components that were never created are skipped, so the method is safe
     * to call after a start-up that failed part-way. A failure in one step is
     * logged and does not prevent the remaining steps; the first failure is
     * rethrown once every step has been attempted.
     *
     * @param str job id (unused)
     * @throws Exception the first failure raised by any release step
     */
    public void close(String str) throws Exception {
        this.logger.info("closing......");
        Exception firstFailure = null;
        try {
            if (this.fetcher != null) {
                this.fetcher.iskilled = true;
                this.fetcher.shutdown();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("fetcher", e, firstFailure);
        }
        try {
            if (this.minter != null) {
                this.minter.cancel();
            }
            if (this.heartBeat != null) {
                this.heartBeat.cancel();
            }
            if (this.timer != null) {
                this.timer.cancel();
            }
            if (this.htimer != null) {
                this.htimer.cancel();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("timers", e, firstFailure);
        }
        try {
            if (this.logConnector != null) {
                this.logConnector.disconnect();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("logConnector", e, firstFailure);
        }
        try {
            if (this.realConnector != null) {
                this.realConnector.disconnect();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("realConnector", e, firstFailure);
        }
        try {
            if (this.tableConnector != null) {
                this.tableConnector.disconnect();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("tableConnector", e, firstFailure);
        }
        try {
            if (this.msgSender != null) {
                this.msgSender.close();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("msgSender", e, firstFailure);
        }
        try {
            if (this.zkExecutor != null) {
                this.zkExecutor.close();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("zkExecutor", e, firstFailure);
        }
        try {
            if (this.config != null) {
                this.config.clear();
            }
        } catch (Exception e) {
            firstFailure = noteCloseFailure("config", e, firstFailure);
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Logs a failed release step and keeps the earliest failure for rethrowing. */
    private Exception noteCloseFailure(String component, Exception failure, Exception firstFailure) {
        this.logger.error("close " + component + " failed: " + failure.getMessage(), failure);
        return firstFailure != null ? firstFailure : failure;
    }
}
