package com.baijia.databus.app;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.MoreObjects;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.InputStream;
import java.lang.Thread;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/baijia/databus/app/CanalClient.class */
public class CanalClient {
    /** Class-wide SLF4J logger. */
    protected static final Logger logger = LoggerFactory.getLogger(CanalClient.class);
    /** Line separator used when assembling the multi-line log templates below. */
    protected static final String SEP = SystemUtils.LINE_SEPARATOR;
    /** Pattern handed to {@link SimpleDateFormat} for timestamps in debug output. */
    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    /** Loop flag read by {@code process()}; raised by {@code start()} and cleared by {@code stop()}. */
    protected volatile boolean running;
    /** Uncaught-exception handler installed on the processing thread; it logs the throwable. */
    protected Thread.UncaughtExceptionHandler handler;
    /** The processing thread created by {@code start()}; {@code null} until then. */
    protected Thread thread;
    /** Canal connector that batches are fetched from. */
    protected CanalConnector connector;
    /** Log template for the per-batch summary; assigned in the static initializer. */
    protected static String context_format;
    /** Log template for a row-data entry; assigned in the static initializer. */
    protected static String row_format;
    /** Log template for a transaction begin/end entry; assigned in the static initializer. */
    protected static String transaction_format;
    /** Destination name; used for the MDC "destination" key, the thread name and the meter name. */
    protected String destination;
    /** Prefix prepended to {@code schema.table} to form the Kafka topic; read from "kafka.topic.prefix". */
    protected String kafkaTopicPrefix;
    /** Registry used to create the inbound-message meter; must be set before the processing thread runs. */
    private MetricRegistry metricRegistry;
    /** When true (and debug logging is enabled) every entry of a batch is dumped; read from "print.entry.if.debug". */
    private boolean printEntryIfDebug;
    /** Kafka producer created in {@code start()} from kafka.properties and closed in {@code stop()}. */
    private Producer<byte[], byte[]> producer;
    /** Upper bound, in milliseconds, for the doubling idle sleep in {@code process()}. */
    private long maxSleepTime;
    /** Batch size passed to {@code CanalConnector.getWithoutAck}. */
    private int batchSize;

    /**
     * Creates a client for the given destination with no connector yet.
     * A connector has to be supplied through {@link #setConnector} before {@link #start()}.
     *
     * @param str the destination name
     */
    public CanalClient(String str) {
        this(str, null);
    }

    /**
     * Creates a client for the given destination and connector and applies the
     * built-in defaults (idle sleep cap 2000 ms, fetch batch size 128).
     *
     * @param str            the destination name; also stored in the MDC under "destination"
     * @param canalConnector the connector to read from, may be {@code null}
     */
    public CanalClient(String str, CanalConnector canalConnector) {
        // Identity of this client.
        this.destination = str;
        this.connector = canalConnector;

        // Lifecycle state: nothing is running until start() is called.
        this.running = false;
        this.thread = null;

        // Values that are filled in later by setters / process().
        this.kafkaTopicPrefix = null;
        this.metricRegistry = null;
        this.printEntryIfDebug = false;

        // Tunables.
        this.maxSleepTime = 2000L;
        this.batchSize = 128;

        // Anything escaping the processing thread is logged rather than lost.
        this.handler = new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread failedThread, Throwable cause) {
                logger.error("parse events has an error", cause);
            }
        };

        MDC.put("destination", str);
    }

    /**
     * Starts the client: loads {@code kafka.properties} from the context class loader,
     * creates the Kafka producer and launches the processing thread that runs
     * {@link #process()}.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     *
     * @throws Exception if the connector is missing, {@code kafka.properties} cannot be
     *                   found or read, or the producer cannot be created; the failure is
     *                   logged before being rethrown
     */
    public void start() throws Exception {
        Properties properties = new Properties();
        try {
            // Check the connector first so a misconfigured client fails before a
            // producer (and its resources) has been allocated.
            Assert.notNull(this.connector, "connector is null");

            // try-with-resources closes the stream even when load() fails.
            try (InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("kafka.properties")) {
                if (resourceAsStream == null) {
                    // Report a missing file explicitly instead of an anonymous NullPointerException.
                    throw new IllegalStateException("kafka.properties was not found on the classpath");
                }
                properties.load(resourceAsStream);
            }

            this.producer = new KafkaProducer<byte[], byte[]>(properties);
            this.thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    CanalClient.this.process();
                }
            }, String.format("%s-process-thread", this.destination));
            this.thread.setUncaughtExceptionHandler(this.handler);

            // Raise the flag BEFORE starting the worker: process() loops on
            // "while (running)" and would exit immediately if it observed false.
            this.running = true;
            this.thread.start();
        } catch (Exception e) {
            logger.error("Error while try to resolve kafka properties", e);
            throw e;
        }
    }

    /**
     * Stops the client if it is running: clears the running flag, waits for the
     * processing thread to finish, closes the Kafka producer and removes the
     * "destination" MDC entry. Does nothing when the client is not running.
     *
     * <p>If the calling thread is interrupted while waiting for the processing thread,
     * the remaining cleanup still runs and the interrupt status is restored afterwards.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     */
    public void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;

        boolean interrupted = false;
        try {
            if (this.thread != null) {
                try {
                    this.thread.join();
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is re-asserted once cleanup is done so
                    // the producer is closed without a pending interrupt on this thread.
                    interrupted = true;
                }
            }
            this.producer.close();
            MDC.remove("destination");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Main loop of the processing thread.
     *
     * <p>Loads {@code databus-app.properties} (topic prefix, batch-management size and
     * timeout, entry-dump flag), then repeatedly connects to Canal, fetches batches
     * without acknowledging them, forwards every valid row-data entry to Kafka and asks
     * the {@code SlidingWindowBatchManager} to acknowledge what it can. When a fetch
     * returns nothing the loop sleeps, doubling the sleep each time and resetting to
     * 1 ms once it exceeds {@code maxSleepTime}.
     *
     * <p>Any exception inside the fetch loop is logged and followed by a reconnect as
     * long as the client is still running. After the loop ends, the last Kafka send is
     * awaited and the connector is disconnected (only when at least one record was sent).
     *
     * <p>Checked and runtime exceptions are logged, never propagated. Note that the
     * outermost handler uses the "resolve databus-app.properties" message for every
     * failure it sees, not just configuration problems.
     */
    protected void process() {
        long idleSleepMillis = 1;
        Properties properties = new Properties();
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Meter meter = this.metricRegistry.meter(String.format("%s.message.in", this.destination));
        try {
            // try-with-resources closes the stream even when load() fails.
            try (InputStream resourceAsStream = contextClassLoader.getResourceAsStream("databus-app.properties")) {
                if (resourceAsStream == null) {
                    throw new IllegalStateException("databus-app.properties was not found on the classpath");
                }
                properties.load(resourceAsStream);
            }
            this.kafkaTopicPrefix = properties.getProperty("kafka.topic.prefix", "");
            // Missing keys fall back to -1 instead of failing inside Integer.parseInt(null).
            int batchManagementSize = intProperty(properties, "batch.management.size", -1);
            int batchManagementTimeout = intProperty(properties, "batch.management.timeout", -1);
            this.printEntryIfDebug = Boolean.parseBoolean(properties.getProperty("print.entry.if.debug"));

            SlidingWindowBatchManager slidingWindowBatchManager = new SlidingWindowBatchManager(this.connector, batchManagementSize, batchManagementTimeout, this.metricRegistry, this.destination);
            Future<?> lastSend = null;
            long batchId = -1;

            // Outer loop: (re)connect. Inner loop: fetch and forward batches.
            while (this.running) {
                try {
                    MDC.put("destination", this.destination);
                    this.connector.connect();
                    this.connector.subscribe();
                    slidingWindowBatchManager.setConnector(this.connector);
                    while (this.running) {
                        String loggerId = UUID.randomUUID().toString();
                        MDC.put("logger_id", loggerId);
                        slidingWindowBatchManager.waitForRollbackIfNecessary();
                        slidingWindowBatchManager.checkTimeout();
                        if (!this.connector.checkValid()) {
                            this.connector.connect();
                            this.connector.subscribe();
                        }
                        Message message = this.connector.getWithoutAck(this.batchSize);
                        batchId = message.getId();
                        List<CanalEntry.Entry> entries = message.getEntries();
                        int size = entries.size();

                        if (batchId == -1 || size == 0) {
                            // Nothing to do: log, ack an empty-but-valid batch, back off.
                            if (idleSleepMillis * 2 > this.maxSleepTime) {
                                logger.info("[CANAL] Can't get any change, will sleep {}ms.", idleSleepMillis);
                            }
                            logger.debug("[CANAL] Can't get any change, will sleep {}ms.", idleSleepMillis);
                            if (batchId >= 0) {
                                try {
                                    slidingWindowBatchManager.tryAck(batchId);
                                } catch (InterruptedException interrupted) {
                                    Thread.currentThread().interrupt();
                                }
                            }
                            TimeUnit.MILLISECONDS.sleep(idleSleepMillis);
                            idleSleepMillis *= 2;
                            if (idleSleepMillis > this.maxSleepTime) {
                                idleSleepMillis = 1;
                            }
                            MDC.remove("logger_id");
                            continue;
                        }

                        idleSleepMillis = 1;
                        int rowDataCount = 0;
                        logDebugInfoIfNecessory(message, batchId, size);
                        slidingWindowBatchManager.registerBatch(batchId, entries.size(), loggerId);
                        int nextIndex = 0;
                        for (CanalEntry.Entry entry : entries) {
                            logger.debug("got batch {}, bachIndex {}", batchId, nextIndex);
                            meter.mark();
                            int batchIndex = nextIndex++;
                            if (slidingWindowBatchManager.isRollingback()) {
                                logger.debug("previous batch is under rollback, skip...");
                                slidingWindowBatchManager.setProcessStatus(batchId, batchIndex, true);
                            } else if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                                rowDataCount++;
                                try {
                                    String topic = this.kafkaTopicPrefix + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
                                    if (StringUtils.isBlank(entry.getHeader().getSchemaName()) || StringUtils.isBlank(entry.getHeader().getTableName()) || !isKafkaTopicIsValid(topic)) {
                                        logger.debug("Topic [ {} ] is invalid, will ignore it.", topic);
                                        slidingWindowBatchManager.setProcessStatus(batchId, batchIndex, true);
                                    } else {
                                        lastSend = this.producer.send(new ProducerRecord<byte[], byte[]>(topic, entry.toByteArray()), new KafkaProducerCallback(slidingWindowBatchManager, batchId, batchIndex, true, entry));
                                        // DDL statements are additionally mirrored to the "ddl" topic.
                                        if (CanalEntry.RowChange.parseFrom(entry.getStoreValue()).getIsDdl()) {
                                            lastSend = this.producer.send(new ProducerRecord<byte[], byte[]>("ddl", entry.toByteArray()), new KafkaProducerCallback(slidingWindowBatchManager, batchId, batchIndex, false, entry));
                                        }
                                    }
                                } catch (Exception sendFailure) {
                                    // NOTE(review): the entry's process status is not set on this path;
                                    // presumably the batch manager's timeout/rollback covers it - confirm.
                                    logger.warn("Something happen while try to send data to kafka", sendFailure);
                                }
                            } else {
                                slidingWindowBatchManager.setProcessStatus(batchId, batchIndex, true);
                                logger.debug("batch {}, bachIndex {} is not rowdata, mark as processed directly.", batchId, batchIndex);
                            }
                        }
                        logger.info("[CANAL] New batch [{}] size {}, rowdata size {} has been processed.", new Object[]{Long.valueOf(batchId), Integer.valueOf(entries.size()), Integer.valueOf(rowDataCount)});
                        slidingWindowBatchManager.tryAck(batchId);
                        MDC.remove("logger_id");
                    }
                } catch (Exception loopFailure) {
                    logger.error("process error!, {}", ExceptionUtils.getFullStackTrace(loopFailure));
                }
            }

            // Shutdown: wait for the last send, ack what is left, then disconnect.
            if (lastSend != null && this.connector.checkValid()) {
                try {
                    lastSend.get();
                    slidingWindowBatchManager.tryAck(batchId);
                    slidingWindowBatchManager.waitForRollbackIfNecessary();
                } catch (Exception drainFailure) {
                    logger.error("Error while wait all batchId to be committed. {}", ExceptionUtils.getFullStackTrace(drainFailure));
                } finally {
                    if (this.connector.checkValid()) {
                        this.connector.disconnect();
                    }
                }
            }
            logger.info("## Canal client for destination {} is stopped.", this.destination);
        } catch (Exception failure) {
            logger.error("Error while try to resolve databus-app.properties", failure);
        }
    }

    /**
     * Reads an integer property, returning {@code defaultValue} when the key is absent or blank.
     *
     * @throws NumberFormatException if a value is present but is not a valid integer
     */
    private static int intProperty(Properties properties, String key, int defaultValue) {
        String raw = properties.getProperty(key);
        if (StringUtils.isBlank(raw)) {
            return defaultValue;
        }
        return Integer.parseInt(raw.trim());
    }

    /**
     * Emits diagnostic output for a batch, but only when debug logging is enabled:
     * always the batch summary, and additionally every entry when
     * {@code printEntryIfDebug} is set.
     *
     * @param message the fetched batch
     * @param j       the batch id
     * @param i       the number of entries in the batch
     */
    private void logDebugInfoIfNecessory(Message message, long j, int i) {
        if (!logger.isDebugEnabled()) {
            return;
        }
        printSummary(message, j, i);
        if (!this.printEntryIfDebug) {
            return;
        }
        printEntry(message.getEntries());
    }

    /**
     * Logs a one-block summary of a batch using {@code context_format}: batch id,
     * entry count, summed event length, the current time, and the binlog positions
     * of the first and last entry (both {@code null} for an empty batch).
     *
     * @param message the fetched batch
     * @param j       the batch id
     * @param i       the number of entries in the batch
     */
    private void printSummary(Message message, long j, int i) {
        long totalEventLength = 0;
        for (CanalEntry.Entry entry : message.getEntries()) {
            totalEventLength += entry.getHeader().getEventLength();
        }

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            int lastIndex = message.getEntries().size() - 1;
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(lastIndex));
        }

        String now = new SimpleDateFormat(DATE_FORMAT).format(new Date());
        Object[] arguments = {
            Long.valueOf(j),
            Integer.valueOf(i),
            Long.valueOf(totalEventLength),
            now,
            startPosition,
            endPosition
        };
        logger.info(context_format, arguments);
    }

    /**
     * Renders an entry's binlog position as
     * {@code logfileName:logfileOffset:executeTime(formatted executeTime)}.
     *
     * @param entry the entry whose header is described
     * @return the position string
     */
    protected String buildPositionForDump(CanalEntry.Entry entry) {
        long executeTime = entry.getHeader().getExecuteTime();
        String readableTime = new SimpleDateFormat(DATE_FORMAT).format(new Date(executeTime));
        StringBuilder position = new StringBuilder();
        position.append(entry.getHeader().getLogfileName());
        position.append(":").append(entry.getHeader().getLogfileOffset());
        position.append(":").append(executeTime);
        position.append("(").append(readableTime).append(")");
        return position.toString();
    }

    /**
     * Logs every transaction-begin, transaction-end and row-data entry of a batch.
     * Other entry types are skipped. For row data, either the SQL text (query / DDL)
     * or the columns of each row are printed: the "before" image for deletes, the
     * "after" image otherwise.
     *
     * @param list the entries to dump
     * @throws RuntimeException if an entry's stored value cannot be parsed or printed
     */
    protected void printEntry(List<CanalEntry.Entry> list) {
        for (CanalEntry.Entry entry : list) {
            // Delay between the binlog execute time and now, in milliseconds.
            long delayMillis = System.currentTimeMillis() - entry.getHeader().getExecuteTime();
            CanalEntry.EntryType entryType = entry.getEntryType();

            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN) {
                try {
                    CanalEntry.TransactionBegin begin = CanalEntry.TransactionBegin.parseFrom(entry.getStoreValue());
                    logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayMillis)});
                    logger.info(" BEGIN ----> Thread id: {}", Long.valueOf(begin.getThreadId()));
                } catch (InvalidProtocolBufferException parseFailure) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), parseFailure);
                }
            } else if (entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                try {
                    CanalEntry.TransactionEnd end = CanalEntry.TransactionEnd.parseFrom(entry.getStoreValue());
                    logger.info("----------------\n");
                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
                    logger.info(transaction_format, new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayMillis)});
                } catch (InvalidProtocolBufferException parseFailure) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), parseFailure);
                }
            } else if (entryType == CanalEntry.EntryType.ROWDATA) {
                try {
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    CanalEntry.EventType eventType = rowChange.getEventType();
                    logger.info(row_format, new Object[]{entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayMillis)});

                    boolean hasSqlOnly = eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl();
                    if (hasSqlOnly) {
                        logger.info(" sql ----> " + rowChange.getSql() + SEP);
                    } else {
                        boolean isDelete = eventType == CanalEntry.EventType.DELETE;
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            // Deleted rows only have a "before" image; everything else shows "after".
                            printColumn(isDelete ? rowData.getBeforeColumnsList() : rowData.getAfterColumnsList());
                        }
                    }
                } catch (Exception failure) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), failure);
                }
            }
        }
    }

    /**
     * Logs one line per column in the form
     * {@code name : value    type=mysqlType[    update=true]} followed by a line separator.
     *
     * @param list the columns to print
     */
    protected void printColumn(List<CanalEntry.Column> list) {
        for (CanalEntry.Column column : list) {
            StringBuilder line = new StringBuilder()
                .append(column.getName())
                .append(" : ")
                .append(column.getValue())
                .append("    type=")
                .append(column.getMysqlType());
            // The update marker is only shown for columns flagged as updated.
            if (column.getUpdated()) {
                line.append("    update=").append(column.getUpdated());
            }
            logger.info(line.append(SEP).toString());
        }
    }

    /** Allowed topic characters and length; compiled once because the check runs for every row-data entry. */
    private static final Pattern KAFKA_TOPIC_PATTERN = Pattern.compile("[a-zA-Z0-9\\._\\-]{1,249}");

    /**
     * Checks whether a string is acceptable as a Kafka topic name: non-empty, not
     * {@code "."} or {@code ".."}, at most 249 characters, and made up only of ASCII
     * letters, digits, dots, underscores and hyphens.
     *
     * @param str the candidate topic name, may be {@code null}
     * @return {@code true} if the name is valid
     */
    private static boolean isKafkaTopicIsValid(String str) {
        if (str == null || str.equals(".") || str.equals("..")) {
            return false;
        }
        // The empty string is rejected by the {1,249} length bound.
        return KAFKA_TOPIC_PATTERN.matcher(str).matches();
    }

    /**
     * Replaces the Canal connector used by this client.
     * NOTE(review): the field is not volatile and no synchronization is applied;
     * presumably this is meant to be called before {@code start()} - confirm.
     *
     * @param canalConnector the connector to read batches from
     */
    public void setConnector(CanalConnector canalConnector) {
        this.connector = canalConnector;
    }

    /**
     * Sets the metric registry. {@code process()} dereferences it unconditionally to
     * create the inbound-message meter, so it must be set before the processing
     * thread runs.
     *
     * @param metricRegistry the registry used for meters
     */
    public void setMetricRegistry(MetricRegistry metricRegistry) {
        this.metricRegistry = metricRegistry;
    }

    static {
        context_format = null;
        row_format = null;
        transaction_format = null;
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;
        row_format = SEP + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms" + SEP;
        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;
    }
}
