package com.github.hackerwin7.mysql.tracker.tracker;

import com.github.hackerwin7.mysql.tracker.filter.FilterMatcher;
import com.github.hackerwin7.mysql.tracker.hbase.driver.HBaseOperator;
import com.github.hackerwin7.mysql.tracker.monitor.TrackerMonitor;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.DirectLogFetcherChannel;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.LogContext;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.LogDecoder;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.LogEvent;
import com.github.hackerwin7.mysql.tracker.mysql.dbsync.event.QueryLogEvent;
import com.github.hackerwin7.mysql.tracker.mysql.driver.MysqlConnector;
import com.github.hackerwin7.mysql.tracker.mysql.driver.MysqlQueryExecutor;
import com.github.hackerwin7.mysql.tracker.mysql.driver.MysqlUpdateExecutor;
import com.github.hackerwin7.mysql.tracker.mysql.driver.packets.HeaderPacket;
import com.github.hackerwin7.mysql.tracker.mysql.driver.packets.client.BinlogDumpCommandPacket;
import com.github.hackerwin7.mysql.tracker.mysql.driver.utils.PacketManager;
import com.github.hackerwin7.mysql.tracker.protocol.avro.EventEntryAvro;
import com.github.hackerwin7.mysql.tracker.protocol.json.ConfigJson;
import com.github.hackerwin7.mysql.tracker.protocol.protobuf.CanalEntry;
import com.github.hackerwin7.mysql.tracker.tracker.common.TableMetaCache;
import com.github.hackerwin7.mysql.tracker.tracker.parser.LogEventConvert;
import com.github.hackerwin7.mysql.tracker.tracker.position.EntryPosition;
import com.github.hackerwin7.mysql.tracker.tracker.utils.TrackerConfiger;
import com.google.protobuf.InvalidProtocolBufferException;
import com.jd.bdp.magpie.MagpieExecutor;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.sf.json.JSONObject;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerNoParserMagpieHBase.class */
public class HandlerNoParserMagpieHBase implements MagpieExecutor {
    private MysqlConnector connector;
    private MysqlQueryExecutor queryExecutor;
    private MysqlUpdateExecutor updateExecutor;
    private MysqlConnector realConnector;
    private MysqlQueryExecutor realQuery;
    private MysqlConnector connectorTable;
    private TableMetaCache tableMetaCache;
    LogEventConvert eventParser;
    private TrackerConfiger configer;
    private EntryPosition startPosition;
    private HBaseOperator hbaseOP;
    private BlockingQueue<LogEvent> eventQueue;
    private boolean running;
    private long startTime;
    private List<LogEvent> eventList;
    private TrackerMonitor monitor;
    FetchThread takeData;
    PerminTimer minTask;
    Timer timer;
    private String jobId;
    private FilterMatcher fm;
    private Logger logger = LoggerFactory.getLogger(HandlerNoParserMagpieHBase.class);
    private int batchsize = 100000;
    private double secondsize = 1.0d;
    private int secondPer = 60;
    private final int MAXQUEUE = 30000;
    private LogEvent globalEvent = null;
    private LogEvent globalXidEvent = null;
    private String globalBinlogName = null;
    private byte[] globalEventRowKey = null;
    private byte[] globalXidEventRowKey = null;
    private byte[] globalEntryRowKey = null;
    private int globalFetchThread = 0;

    /* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerNoParserMagpieHBase$FetchThread.class */
    class FetchThread extends Thread {
        private DirectLogFetcherChannel fetcher;
        private LogDecoder decoder;
        private LogContext context;
        private LogEvent event;
        private Logger logger = LoggerFactory.getLogger(FetchThread.class);
        private TrackerMonitor monitor = new TrackerMonitor();

        FetchThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                preRun();
                int i = 0;
                while (this.fetcher.fetch()) {
                    if (i == 0) {
                        this.monitor.fetchStart = System.currentTimeMillis();
                    }
                    this.event = this.decoder.decode(this.fetcher, this.context);
                    if (this.event == null) {
                        this.logger.warn("fetched event is null!!!");
                    } else {
                        i++;
                        this.monitor.batchSize += this.event.getEventLen();
                        try {
                            if (this.event != null) {
                                HandlerNoParserMagpieHBase.this.eventQueue.put(this.event);
                            }
                            if (i % 10000 == 0) {
                                this.monitor.fetchEnd = System.currentTimeMillis();
                                this.logger.info("======> fetch thread : ");
                                this.logger.info("---> fetch during time : " + (this.monitor.fetchEnd - this.monitor.fetchStart));
                                this.logger.info("---> fetch number : " + i);
                                this.logger.info("---> fetch sum size : " + this.monitor.batchSize);
                                this.monitor.clear();
                                i = 0;
                            }
                        } catch (InterruptedException e) {
                            this.logger.error("eventQueue and entryQueue add data failed!!!");
                            throw new InterruptedIOException();
                        }
                    }
                }
            } catch (IOException e2) {
                this.logger.error("fetch data failed!!! the IOException is " + e2.getMessage());
                e2.printStackTrace();
                if (e2.getMessage().contains("errno = 1236")) {
                    try {
                        HandlerNoParserMagpieHBase.this.hbaseOP.deleteHBaseData(new Delete(Bytes.toBytes(HandlerNoParserMagpieHBase.this.hbaseOP.trackerRowKey)), HandlerNoParserMagpieHBase.this.hbaseOP.getCheckpointSchemaName());
                        HandlerNoParserMagpieHBase.this.close(HandlerNoParserMagpieHBase.this.jobId);
                    } catch (Exception e3) {
                        this.logger.error("delete the checkpoint row key failed ...... msg : " + e3.getMessage());
                    }
                    HandlerNoParserMagpieHBase.this.globalFetchThread = 1;
                }
            }
        }

        public void preRun() throws IOException {
            this.logger.info("set the binlog configuration for the binlog dump");
            HandlerNoParserMagpieHBase.this.updateExecutor.update("set wait_timeout=9999999");
            HandlerNoParserMagpieHBase.this.updateExecutor.update("set net_write_timeout=1800");
            HandlerNoParserMagpieHBase.this.updateExecutor.update("set net_read_timeout=1800");
            HandlerNoParserMagpieHBase.this.updateExecutor.update("set names 'binary'");
            HandlerNoParserMagpieHBase.this.updateExecutor.update("set @master_binlog_checksum= '@@global.binlog_checksum'");
            HandlerNoParserMagpieHBase.this.updateExecutor.update("SET @mariadb_slave_capability='4'");
            this.logger.info("send the binlog dump packet to mysql , let mysql set up a binlog dump thread in mysql");
            BinlogDumpCommandPacket binlogDumpCommandPacket = new BinlogDumpCommandPacket();
            binlogDumpCommandPacket.binlogFileName = HandlerNoParserMagpieHBase.this.startPosition.getJournalName();
            binlogDumpCommandPacket.binlogPosition = HandlerNoParserMagpieHBase.this.startPosition.getPosition().longValue();
            binlogDumpCommandPacket.slaveServerId = HandlerNoParserMagpieHBase.this.configer.getSlaveId().longValue();
            byte[] bytes = binlogDumpCommandPacket.toBytes();
            HeaderPacket headerPacket = new HeaderPacket();
            headerPacket.setPacketBodyLength(bytes.length);
            headerPacket.setPacketSequenceNumber((byte) 0);
            PacketManager.write(HandlerNoParserMagpieHBase.this.connector.getChannel(), new ByteBuffer[]{ByteBuffer.wrap(headerPacket.toBytes()), ByteBuffer.wrap(bytes)});
            this.logger.info("initialize the mysql.dbsync class");
            this.fetcher = new DirectLogFetcherChannel(HandlerNoParserMagpieHBase.this.connector.getReceiveBufferSize());
            this.fetcher.start(HandlerNoParserMagpieHBase.this.connector.getChannel());
            this.decoder = new LogDecoder(0, LogEvent.ENUM_END_EVENT);
            this.context = new LogContext();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/hackerwin7/mysql/tracker/tracker/HandlerNoParserMagpieHBase$PerminTimer.class */
    public class PerminTimer extends TimerTask {
        private Logger logger = LoggerFactory.getLogger(PerminTimer.class);

        PerminTimer() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            if (HandlerNoParserMagpieHBase.this.globalBinlogName == null || HandlerNoParserMagpieHBase.this.globalXidEvent == null || HandlerNoParserMagpieHBase.this.globalXidEventRowKey == null) {
                return;
            }
            String str = HandlerNoParserMagpieHBase.this.hbaseOP.trackerRowKey + "##" + new SimpleDateFormat("yyyy-MM-dd HH:mm").format(Calendar.getInstance().getTime());
            Put put = new Put(Bytes.toBytes(str));
            String str2 = HandlerNoParserMagpieHBase.this.globalBinlogName + ":" + HandlerNoParserMagpieHBase.this.globalXidEvent.getLogPos();
            String valueOf = String.valueOf(Long.valueOf(Bytes.toLong(HandlerNoParserMagpieHBase.this.globalXidEventRowKey)));
            put.add(HandlerNoParserMagpieHBase.this.hbaseOP.getFamily(), Bytes.toBytes(HandlerNoParserMagpieHBase.this.hbaseOP.binlogXidCol), Bytes.toBytes(str2));
            put.add(HandlerNoParserMagpieHBase.this.hbaseOP.getFamily(), Bytes.toBytes(HandlerNoParserMagpieHBase.this.hbaseOP.eventXidCol), Bytes.toBytes(valueOf));
            try {
                HandlerNoParserMagpieHBase.this.hbaseOP.putHBaseData(put, HandlerNoParserMagpieHBase.this.hbaseOP.getCheckpointSchemaName());
            } catch (IOException e) {
                this.logger.error("per minute persistence failed!!!");
                e.printStackTrace();
            }
            this.logger.info("======> per minute persistence the binlog xid and event xid to checkpoint : ");
            this.logger.info("---> row kye is " + str + ",col is :" + str2 + "; col is :" + valueOf);
        }
    }

    public HandlerNoParserMagpieHBase(TrackerConfiger trackerConfiger) {
        this.configer = trackerConfiger;
    }

    public HandlerNoParserMagpieHBase(File file) throws IOException {
        if (!file.exists()) {
            this.logger.error("properties file is not found !!! can not load the task!!!");
            System.exit(1);
            return;
        }
        BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
        Properties properties = new Properties();
        properties.load(bufferedInputStream);
        this.configer = new TrackerConfiger();
        this.configer.setAddress(properties.getProperty("mysql.address"));
        this.configer.setPort(Integer.valueOf(properties.getProperty("mysql.port")).intValue());
        this.configer.setUsername(properties.getProperty("mysql.usr"));
        this.configer.setPassword(properties.getProperty("mysql.psd"));
        this.configer.setSlaveId(Long.valueOf(properties.getProperty("mysql.slaveId")));
        this.configer.setHbaseRootDir(properties.getProperty("hbase.rootdir"));
        this.configer.setHbaseDistributed(properties.getProperty("hbase.cluster.distributed"));
        this.configer.setHbaseZkQuorum(properties.getProperty("hbase.zookeeper.quorum"));
        this.configer.setHbaseZkPort(properties.getProperty("hbase.zookeeper.property.clientPort"));
        this.configer.setDfsSocketTimeout(properties.getProperty("dfs.socket.timeout"));
    }

    public void prepare(String str) throws Exception {
        boolean z;
        this.jobId = str;
        JSONObject json = new ConfigJson(str).getJson();
        if (json != null) {
            JSONObject jSONObject = json.getJSONObject("info").getJSONObject("content");
            this.configer.setUsername(jSONObject.getString("Username"));
            this.configer.setPassword(jSONObject.getString("Password"));
            this.configer.setAddress(jSONObject.getString("Address"));
            this.configer.setPort(jSONObject.getInt("Port"));
            this.configer.setSlaveId(Long.valueOf(jSONObject.getLong("SlaveId")));
            this.configer.setHbaseRootDir(jSONObject.getString("HbaseRootDir"));
            this.configer.setHbaseDistributed(jSONObject.getString("HbaseDistributed"));
            this.configer.setHbaseZkQuorum(jSONObject.getString("HbaseZkQuorum"));
            this.configer.setHbaseZkPort(jSONObject.getString("HbaseZkPort"));
            this.configer.setDfsSocketTimeout(jSONObject.getString("DfsSocketTimeout"));
        }
        this.logger.info("starting the  tracker ......");
        do {
            this.connector = new MysqlConnector(new InetSocketAddress(this.configer.getAddress(), this.configer.getPort()), this.configer.getUsername(), this.configer.getPassword());
            this.connectorTable = new MysqlConnector(new InetSocketAddress(this.configer.getAddress(), this.configer.getPort()), this.configer.getUsername(), this.configer.getPassword());
            this.realConnector = new MysqlConnector(new InetSocketAddress(this.configer.getAddress(), this.configer.getPort()), this.configer.getUsername(), this.configer.getPassword());
            try {
                this.connector.connect();
                this.connectorTable.connect();
                this.realConnector.connect();
                z = true;
            } catch (IOException e) {
                this.logger.error("connector connect failed or connectorTable connect failed");
                e.printStackTrace();
                this.logger.error("the mysql " + this.configer.getAddress() + " is not available ...");
                z = false;
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                }
            }
        } while (!z);
        this.queryExecutor = new MysqlQueryExecutor(this.connector);
        this.updateExecutor = new MysqlUpdateExecutor(this.connector);
        this.realQuery = new MysqlQueryExecutor(this.realConnector);
        this.hbaseOP = new HBaseOperator(str);
        this.hbaseOP.getConf().set("hbase.rootdir", this.configer.getHbaseRootDir());
        this.hbaseOP.getConf().set("hbase.cluster.distributed", this.configer.getHbaseDistributed());
        this.hbaseOP.getConf().set("hbase.zookeeper.quorum", this.configer.getHbaseZkQuorum());
        this.hbaseOP.getConf().set("hbase.zookeeper.property.clientPort", this.configer.getHbaseZkPort());
        this.hbaseOP.getConf().set("dfs.socket.timeout", this.configer.getDfsSocketTimeout());
        this.hbaseOP.connect();
        this.logger.info("find start position");
        this.startPosition = findStartPosition();
        if (this.startPosition == null) {
            throw new NullPointerException("start position is null");
        }
        this.tableMetaCache = new TableMetaCache(this.connectorTable);
        this.eventParser = new LogEventConvert();
        this.eventParser.setTableMetaCache(this.tableMetaCache);
        this.eventQueue = new LinkedBlockingQueue(30000);
        this.logger.info("start the tracker thread to dump the binlog data from mysql...");
        this.globalFetchThread = 0;
        this.takeData = new FetchThread();
        this.takeData.start();
        this.logger.info("start the minute thread to save the position per minute as checkpoint...");
        this.minTask = new PerminTimer();
        this.timer = new Timer();
        this.timer.schedule(this.minTask, 1000L, this.secondPer * 1000);
        this.startTime = new Date().getTime();
        this.eventList = new ArrayList();
        this.monitor = new TrackerMonitor();
        this.logger.info("tracker is started successfully......");
        this.fm = new FilterMatcher(this.configer.getFilterRegex());
    }

    private EntryPosition findStartPosition() throws IOException {
        EntryPosition findHBaseStartPosition = findHBaseStartPosition();
        if (findHBaseStartPosition == null) {
            this.logger.info("file position load failed , get the position from mysql!");
            findHBaseStartPosition = findMysqlStartPosition();
        } else {
            this.logger.info("file position loaded!");
        }
        return findHBaseStartPosition;
    }

    private EntryPosition findHBaseStartPosition() throws IOException {
        EntryPosition entryPosition = null;
        Get get = new Get(Bytes.toBytes(this.hbaseOP.trackerRowKey));
        get.addFamily(this.hbaseOP.getFamily());
        for (KeyValue keyValue : this.hbaseOP.getHBaseData(get, this.hbaseOP.getCheckpointSchemaName()).raw()) {
            byte[] value = keyValue.getValue();
            if (value != null) {
                String str = new String(value);
                if (str.contains(":")) {
                    String[] split = str.split(":");
                    entryPosition = new EntryPosition(split[0], Long.valueOf(split[1]));
                } else {
                    this.globalEventRowKey = Bytes.toBytes(Long.valueOf(Bytes.toString(value)).longValue());
                }
            }
        }
        return entryPosition;
    }

    private EntryPosition findMysqlStartPosition() throws IOException {
        List<String> fieldValues = this.queryExecutor.query("show master status").getFieldValues();
        if (CollectionUtils.isEmpty(fieldValues)) {
            throw new NullPointerException("show master status failed!");
        }
        EntryPosition entryPosition = new EntryPosition(fieldValues.get(0), Long.valueOf(fieldValues.get(1)));
        Long l = 0L;
        this.globalEventRowKey = Bytes.toBytes(l.longValue());
        return entryPosition;
    }

    public void reload(String str) {
    }

    public void pause(String str) throws Exception {
    }

    public void close(String str) throws Exception {
        this.minTask.cancel();
        this.timer.cancel();
        this.connector.disconnect();
        this.connectorTable.disconnect();
        this.realConnector.disconnect();
        this.hbaseOP.disconnect();
    }

    public void run() throws Exception {
        if (this.globalFetchThread == 1) {
            this.globalFetchThread = 0;
            throw new Exception("restart the magpie executor !!! ");
        }
        while (!this.eventQueue.isEmpty()) {
            try {
                LogEvent take = this.eventQueue.take();
                if (take != null) {
                    this.eventList.add(take);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (this.eventList.size() >= this.batchsize) {
                break;
            }
        }
        if (this.eventList.size() >= this.batchsize || new Date().getTime() - this.startTime > this.secondsize * 1000.0d) {
            this.monitor.persisNum = this.eventList.size();
            try {
                writeHBaseEvent();
            } catch (Exception e2) {
                this.logger.error("persistence event list error msg : " + e2.getMessage());
                e2.printStackTrace();
            }
            if (existXid(this.eventList)) {
                try {
                    writeHBaseCheckpointXid();
                } catch (IOException e3) {
                    this.logger.error("persistence xid pos error");
                    e3.printStackTrace();
                }
            }
            this.eventList.clear();
            this.startTime = new Date().getTime();
        }
        this.monitor.persistenceEnd = System.currentTimeMillis();
        if (this.monitor.persisNum > 0) {
            this.logger.info("---> persistence deal during time : " + (this.monitor.persistenceEnd - this.monitor.persistenceStart));
            this.logger.info("---> write hbase during time : " + (this.monitor.hbaseWriteEnd - this.monitor.hbaseWriteStart));
            this.logger.info("---> entry list to bytes sum size is " + this.monitor.batchSize);
            this.logger.info("---> the number if entry list is " + this.monitor.persisNum);
            this.monitor.clear();
        }
    }

    private void writeHBaseEvent() throws IOException {
        byte[] bArr = this.globalEventRowKey;
        ArrayList arrayList = new ArrayList();
        LogEvent logEvent = null;
        CanalEntry.Entry entry = null;
        CanalEntry.Entry entry2 = null;
        this.monitor.persistenceStart = System.currentTimeMillis();
        for (LogEvent logEvent2 : this.eventList) {
            logEvent = logEvent2;
            try {
                entry2 = this.eventParser.parse(logEvent2);
            } catch (Exception e) {
                this.logger.error("parse to entry failed!!! msg : " + e.getMessage());
                e.printStackTrace();
            }
            if (entry2 != null && this.fm.isMatch(entry2.getHeader().getSchemaName() + "." + entry2.getHeader().getTableName())) {
                if (entry2 != null && entry2.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                    entry = entry2;
                }
                this.globalBinlogName = this.eventParser.getBinlogFileName();
                if (entry2 != null) {
                    Put put = new Put(bArr);
                    byte[] bytesFromEntryToAvro = getBytesFromEntryToAvro(entry2);
                    this.monitor.batchSize += bytesFromEntryToAvro.length;
                    put.add(this.hbaseOP.getFamily(), Bytes.toBytes(this.hbaseOP.eventBytesCol), bytesFromEntryToAvro);
                    arrayList.add(put);
                    bArr = Bytes.toBytes(Bytes.toLong(bArr) + 1);
                    if (isEndEvent(logEvent2)) {
                        this.globalXidEvent = logEvent2;
                        this.globalXidEventRowKey = bArr;
                    }
                }
            }
        }
        this.monitor.persistenceEnd = System.currentTimeMillis();
        if (logEvent != null && entry != null) {
            if (this.eventList.size() > 0) {
                this.logger.info("======> persistence the " + this.eventList.size() + " events  the batched last column is " + getEntryCol(entry));
            }
            logInfoEvent(logEvent);
            logInfoBatchEvent(logEvent);
        }
        this.monitor.hbaseWriteStart = System.currentTimeMillis();
        this.hbaseOP.putHBaseData(arrayList, this.hbaseOP.getEventBytesSchemaName());
        this.monitor.hbaseWriteEnd = System.currentTimeMillis();
        this.globalEventRowKey = bArr;
    }

    private boolean existXid(List<LogEvent> list) {
        Iterator<LogEvent> it = list.iterator();
        while (it.hasNext()) {
            if (isEndEvent(it.next())) {
                return true;
            }
        }
        return false;
    }

    private boolean isEndEvent(LogEvent logEvent) {
        if (logEvent.getHeader().getType() != 16) {
            return logEvent.getHeader().getType() == 2 && !StringUtils.endsWithIgnoreCase(((QueryLogEvent) logEvent).getQuery(), LogEventConvert.BEGIN);
        }
        return true;
    }

    private void writeHBaseCheckpointXid() throws IOException {
        Put put = new Put(Bytes.toBytes(this.hbaseOP.trackerRowKey));
        String str = this.globalBinlogName + ":" + this.globalXidEvent.getLogPos();
        String valueOf = String.valueOf(Long.valueOf(Bytes.toLong(this.globalXidEventRowKey)));
        put.add(this.hbaseOP.getFamily(), Bytes.toBytes(this.hbaseOP.binlogXidCol), Bytes.toBytes(str));
        put.add(this.hbaseOP.getFamily(), Bytes.toBytes(this.hbaseOP.eventXidCol), Bytes.toBytes(valueOf));
        this.hbaseOP.putHBaseData(put, this.hbaseOP.getCheckpointSchemaName());
    }

    private long getDelay(LogEvent logEvent) {
        return new Date().getTime() - logEvent.getWhen();
    }

    private void logInfoBatchEvent(LogEvent logEvent) {
        if (logEvent != null) {
            try {
                List<String> fieldValues = this.realQuery.query("show master status").getFieldValues();
                if (CollectionUtils.isEmpty(fieldValues)) {
                    throw new NullPointerException("show master status failed!");
                }
                this.logger.info("---> batch over stock : " + (new EntryPosition(fieldValues.get(0), Long.valueOf(fieldValues.get(1))).getPosition().longValue() - logEvent.getLogPos()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private void logInfoEvent(LogEvent logEvent) {
        if (logEvent != null) {
            try {
                this.logger.info("--->get event : " + LogEvent.getTypeName(logEvent.getHeader().getType()) + ", now pos: " + (logEvent.getLogPos() - logEvent.getEventLen()) + ", next pos: " + logEvent.getLogPos() + ", binlog file : " + this.eventParser.getBinlogFileName() + ", delay time : " + getDelay(logEvent) + ", type : " + getEventType(logEvent));
                if (logEvent.getHeader().getType() == 2) {
                    this.logger.info(", sql : " + ((QueryLogEvent) logEvent).getQuery());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private String getEntryCol(CanalEntry.Entry entry) {
        String str = "";
        try {
            CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            if (parseFrom.getRowDatasList().size() > 0) {
                CanalEntry.RowData rowDatas = parseFrom.getRowDatas(0);
                if (rowDatas.getAfterColumnsList().size() > 0) {
                    str = rowDatas.getAfterColumns(0).getName() + " ## " + rowDatas.getAfterColumns(0).getValue();
                }
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        return str;
    }

    private String getEventType(LogEvent logEvent) {
        return LogEvent.getTypeName(logEvent.getHeader().getType());
    }

    private void delaySec(int i) {
        try {
            Thread.sleep(i * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private String getEntryType(CanalEntry.Entry entry) {
        try {
            switch (CanalEntry.RowChange.parseFrom(entry.getStoreValue()).getEventType()) {
                case INSERT:
                    return "INSERT";
                case UPDATE:
                    return "UPDATE";
                case DELETE:
                    return "DELETE";
                case CREATE:
                    return "CREATE";
                case ALTER:
                    return "ALTER";
                case ERASE:
                    return "ERASE";
                case QUERY:
                    return "QUERY";
                case TRUNCATE:
                    return "TRUNCATE";
                case RENAME:
                    return "RENAME";
                case CINDEX:
                    return "CINDEX";
                case DINDEX:
                    return "DINDEX";
                default:
                    return "UNKNOWN";
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            return "NULL";
        }
    }

    private EventEntryAvro entryToAvro(CanalEntry.Entry entry) {
        EventEntryAvro eventEntryAvro = new EventEntryAvro();
        eventEntryAvro.setDbName(entry.getHeader().getSchemaName());
        eventEntryAvro.setSchema$(entry.getHeader().getSchemaName());
        eventEntryAvro.setTableName(entry.getHeader().getTableName());
        eventEntryAvro.setOperation(getEntryType(entry));
        eventEntryAvro.setDbOptTimestamp(Long.valueOf(entry.getHeader().getExecuteTime()));
        eventEntryAvro.setDmlHBaseOptTimestamp(Long.valueOf(new Date().getTime()));
        try {
            CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            if (parseFrom.getIsDdl()) {
                eventEntryAvro.setDdlSql(parseFrom.getSql());
            } else {
                eventEntryAvro.setDdlSql("");
            }
            eventEntryAvro.setError("");
            for (CanalEntry.RowData rowData : parseFrom.getRowDatasList()) {
                if (parseFrom.getEventType() == CanalEntry.EventType.DELETE) {
                    List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                    HashMap hashMap = new HashMap();
                    HashMap hashMap2 = new HashMap();
                    for (CanalEntry.Column column : beforeColumnsList) {
                        hashMap2.put(column.getName(), column.getValue());
                        if (column.getIsKey()) {
                            hashMap.put(column.getName(), column.getValue());
                        }
                    }
                    eventEntryAvro.setCurrent(hashMap);
                    eventEntryAvro.setSource(hashMap2);
                } else if (parseFrom.getEventType() == CanalEntry.EventType.INSERT) {
                    List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                    HashMap hashMap3 = new HashMap();
                    HashMap hashMap4 = new HashMap();
                    for (CanalEntry.Column column2 : afterColumnsList) {
                        hashMap3.put(column2.getName(), column2.getValue());
                    }
                    eventEntryAvro.setSource(hashMap4);
                    eventEntryAvro.setCurrent(hashMap3);
                } else if (parseFrom.getEventType() == CanalEntry.EventType.UPDATE) {
                    List<CanalEntry.Column> beforeColumnsList2 = rowData.getBeforeColumnsList();
                    List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
                    HashMap hashMap5 = new HashMap();
                    HashMap hashMap6 = new HashMap();
                    int i = 0;
                    int i2 = 0;
                    while (true) {
                        if (i <= afterColumnsList2.size() - 1 || i2 <= beforeColumnsList2.size() - 1) {
                            if (i <= afterColumnsList2.size() - 1) {
                                hashMap6.put(afterColumnsList2.get(i).getName(), afterColumnsList2.get(i).getValue());
                            }
                            if (i2 <= beforeColumnsList2.size() - 1) {
                                hashMap5.put(beforeColumnsList2.get(i2).getName(), beforeColumnsList2.get(i2).getValue());
                            }
                            i++;
                            i2++;
                        }
                    }
                } else {
                    HashMap hashMap7 = new HashMap();
                    eventEntryAvro.setCurrent(new HashMap());
                    eventEntryAvro.setSource(hashMap7);
                }
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        return eventEntryAvro;
    }

    private byte[] getBytesFromAvro(EventEntryAvro eventEntryAvro) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, (BinaryEncoder) null);
        try {
            new SpecificDatumWriter(EventEntryAvro.getClassSchema()).write(eventEntryAvro, binaryEncoder);
            binaryEncoder.flush();
            byteArrayOutputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return byteArrayOutputStream.toByteArray();
    }

    private EventEntryAvro getAvroFromBytes(byte[] bArr) {
        EventEntryAvro eventEntryAvro = null;
        try {
            eventEntryAvro = (EventEntryAvro) new SpecificDatumReader(EventEntryAvro.getClassSchema()).read((Object) null, DecoderFactory.get().binaryDecoder(bArr, (BinaryDecoder) null));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return eventEntryAvro;
    }

    private byte[] getBytesFromEntryToAvro(CanalEntry.Entry entry) {
        return getBytesFromAvro(entryToAvro(entry));
    }
}
