/*
 * Decompiled with CFR 0.152.
 */
package cn.stylefeng.roses.kernel.sync.modular.cc;

import cn.stylefeng.roses.kernel.sync.core.util.CustomSpringContextHolder;
import cn.stylefeng.roses.kernel.sync.modular.ew.base.AbstractEntryWrapper;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;

public class AbstractCanalClient {
    protected static final Logger logger = LoggerFactory.getLogger(AbstractCanalClient.class);
    protected volatile boolean running = false;
    protected Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
    protected Thread thread = null;
    protected CanalConnector connector;
    protected String destination;

    public AbstractCanalClient(String destination) {
        this(destination, null);
    }

    public AbstractCanalClient(String destination, CanalConnector connector) {
        this.destination = destination;
        this.connector = connector;
    }

    public void setConnector(CanalConnector connector) {
        this.connector = connector;
    }

    protected void start() {
        Assert.notNull((Object)this.connector, (String)"connector is null");
        this.thread = new Thread(() -> this.process());
        this.thread.setUncaughtExceptionHandler(this.handler);
        this.running = true;
        this.thread.start();
    }

    protected void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        if (this.thread != null) {
            try {
                this.thread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        MDC.remove((String)"destination");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void process() {
        int batchSize = 5120;
        while (this.running) {
            try {
                MDC.put((String)"destination", (String)this.destination);
                this.connector.connect();
                this.connector.subscribe();
                while (this.running) {
                    Message message = this.connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId != -1L && size != 0) {
                        this.processEntry(message.getEntries());
                    }
                    this.connector.ack(batchId);
                }
            }
            catch (Exception e) {
                logger.error("processEntrys error!", (Throwable)e);
            }
            finally {
                this.connector.disconnect();
                MDC.remove((String)"destination");
            }
        }
    }

    protected void processEntry(List<CanalEntry.Entry> entrys) {
        List<AbstractEntryWrapper> entryProcessors = CustomSpringContextHolder.getBeanOfType(AbstractEntryWrapper.class);
        if (entryProcessors == null || entryProcessors.size() == 0) {
            return;
        }
        for (AbstractEntryWrapper entryProcessor : entryProcessors) {
            entryProcessor.processEntrys(entrys);
        }
    }
}

