package com.baijia.yunying.databus.dal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.io.InputStream;
import java.lang.Thread;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

@Component
/* loaded from: input_file:com/baijia/yunying/databus/dal/DatabusEngine.class */
public class DatabusEngine {
    private static final Logger logger = LoggerFactory.getLogger(DatabusEngine.class);
    protected static CanalConnector connector;
    private static Producer<byte[], byte[]> producer;

    @Value("${canal.destination:'example'}")
    protected String destination;
    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { // from class: com.baijia.yunying.databus.dal.DatabusEngine.1
        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            DatabusEngine.logger.error("parse events has an error", th);
        }
    };
    protected Thread thread = null;
    protected volatile boolean running = false;

    @Value("${canal.zk.host:'172.16.3.4:2181'}")
    private String zkHost = "172.16.3.4:2181,172.16.3.4:2182,172.16.3.4:2183";

    @Value("${canal.batch.size:1000}")
    private int batchSize = 1000;

    @Value("${canal.max_sleep_time:5000}")
    private long maxSleepTime = 5000;

    @Value("${kafak.config_file:'kafka.properties'}")
    private String kafkaPoperties = "kafka.properties";

    @PostConstruct
    public void init() throws Exception {
        connector = CanalConnectors.newClusterConnector(this.zkHost, this.destination, "", "");
        Assert.notNull(connector, "Connector is null");
        Properties properties = new Properties();
        try {
            InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(this.kafkaPoperties);
            Throwable th = null;
            try {
                try {
                    properties.load(resourceAsStream);
                    if (resourceAsStream != null) {
                        if (0 != 0) {
                            try {
                                resourceAsStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            resourceAsStream.close();
                        }
                    }
                    producer = new KafkaProducer(properties);
                    this.running = true;
                    process();
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            logger.error("Error while try to resolve kafka properties", e);
            throw e;
        }
    }

    @PreDestroy
    public void close() throws Exception {
        connector.disconnect();
        producer.close();
    }

    protected void process() {
        long j = 1;
        while (this.running) {
            try {
                try {
                    MDC.put("destination", this.destination);
                    connector.connect();
                    connector.subscribe();
                    while (this.running) {
                        Message withoutAck = connector.getWithoutAck(this.batchSize);
                        long id = withoutAck.getId();
                        int size = withoutAck.getEntries().size();
                        if (id == -1 || size == 0) {
                            logger.warn("Can't get any message, will sleep and retry");
                            try {
                                TimeUnit.MILLISECONDS.sleep(j);
                                j *= 2;
                                if (j > this.maxSleepTime) {
                                    j = 1;
                                }
                            } catch (Exception e) {
                            }
                        } else {
                            List<CanalEntry.Entry> entries = withoutAck.getEntries();
                            final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
                            for (CanalEntry.Entry entry : entries) {
                                if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                                    producer.send(new ProducerRecord(entry.getHeader().getSchemaName(), entry.getHeader().getTableName().getBytes(), entry.toByteArray()), new Callback() { // from class: com.baijia.yunying.databus.dal.DatabusEngine.2
                                        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                                            if (exc != null) {
                                                atomicBoolean.set(false);
                                            }
                                        }
                                    });
                                }
                            }
                            if (atomicBoolean.get()) {
                                connector.ack(id);
                            } else {
                                logger.error("Errow while try to process msg, batchId {}, message {}", Long.valueOf(id), withoutAck);
                            }
                        }
                        connector.ack(id);
                    }
                    connector.disconnect();
                    MDC.remove("destination");
                } catch (Exception e2) {
                    logger.error("process error!", e2);
                    connector.disconnect();
                    MDC.remove("destination");
                }
            } catch (Throwable th) {
                connector.disconnect();
                MDC.remove("destination");
                throw th;
            }
        }
    }

    public static void main(String[] strArr) throws InterruptedException {
        final ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("databus-spring.xml");
        classPathXmlApplicationContext.start();
        System.out.println(classPathXmlApplicationContext.getBeansOfType(DatabusEngine.class));
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: com.baijia.yunying.databus.dal.DatabusEngine.3
            @Override // java.lang.Runnable
            public void run() {
                DatabusEngine.logger.info("Stop databus gracefully");
                classPathXmlApplicationContext.close();
            }
        }));
        classPathXmlApplicationContext.registerShutdownHook();
    }
}
