package com.baijia.databus.demo;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.InvalidProtocolBufferException;
import java.lang.Thread;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;

/* loaded from: input_file:com/baijia/databus/demo/KafkaConsumer.class */
public class KafkaConsumer {

    @Autowired
    private Consumer<String, CanalEntry.Entry> consumer;

    @Autowired
    private JdbcTemplate jdbcTemplate;
    private List<String> topic;
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    static ClassPathXmlApplicationContext context;
    private Thread thread = null;
    private boolean running = false;
    private final String insertUpdateInfoiSql = "insert into test.sz_canal_test (app_type,pos, database_name, table_name, record_id, event_type, before_record, after_record) values (1, ?,?, ?, ?, ?, ?, ?);";
    private long maxSleepTime = 5000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baijia/databus/demo/KafkaConsumer$SaveOffsetsOnRebalance.class */
    public static class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        private Consumer<String, CanalEntry.Entry> consumer;

        public SaveOffsetsOnRebalance(Consumer<String, CanalEntry.Entry> consumer) {
            this.consumer = consumer;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            this.consumer.commitSync();
            KafkaConsumer.logger.info("【Kafka Consumer】 kafka patitions have been revoked. pations size {}, values {}", Integer.valueOf(collection.size()), collection);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            KafkaConsumer.logger.info("【Kafka Consumer】 kafka patitions have been assigned. pations size {}, value {}", Integer.valueOf(collection.size()), collection);
        }
    }

    public static void main(String[] strArr) {
        context = new ClassPathXmlApplicationContext("classpath:spring.xml");
        ((KafkaConsumer) context.getBean("consumerTest")).start();
    }

    public KafkaConsumer(String... strArr) {
        this.topic = Arrays.asList(strArr);
    }

    public void start() {
        this.thread = new Thread(new Runnable() { // from class: com.baijia.databus.demo.KafkaConsumer.1
            @Override // java.lang.Runnable
            public void run() {
                KafkaConsumer.this.process();
            }
        });
        this.thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.baijia.databus.demo.KafkaConsumer.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                KafkaConsumer.logger.error("parse events has an error", th);
            }
        });
        this.thread.start();
        this.running = true;
    }

    public void stop() {
        context.close();
        if (this.running) {
            this.running = false;
            if (this.thread != null) {
                try {
                    this.thread.join();
                } catch (InterruptedException e) {
                }
            }
            this.consumer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void process() {
        long j = 1;
        this.consumer.subscribe(this.topic, new SaveOffsetsOnRebalance(this.consumer));
        while (this.running) {
            while (this.running) {
                try {
                    ConsumerRecords poll = this.consumer.poll(1000L);
                    if (poll.isEmpty()) {
                        logger.warn("Empty records, will sleep {}ms and retry.", Long.valueOf(j));
                        try {
                            TimeUnit.MILLISECONDS.sleep(j);
                            j *= 2;
                            if (j > this.maxSleepTime) {
                                j = 1;
                            }
                        } catch (Exception e) {
                        }
                    } else {
                        j = 1;
                        logger.info("Record size {}", Integer.valueOf(poll.count()));
                        final ArrayList arrayList = new ArrayList();
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            arrayList.addAll(parseRowDataEntry((CanalEntry.Entry) ((ConsumerRecord) it.next()).value()));
                        }
                        this.consumer.commitSync();
                        if (!arrayList.isEmpty()) {
                            logger.info("【KAFKA】Insert change log to db, size {}.", Integer.valueOf(arrayList.size()));
                            this.jdbcTemplate.batchUpdate("insert into test.sz_canal_test (app_type,pos, database_name, table_name, record_id, event_type, before_record, after_record) values (1, ?,?, ?, ?, ?, ?, ?);", new BatchPreparedStatementSetter() { // from class: com.baijia.databus.demo.KafkaConsumer.3
                                public void setValues(PreparedStatement preparedStatement, int i) throws SQLException {
                                    TableColumnChange tableColumnChange = (TableColumnChange) arrayList.get(i);
                                    int i2 = 1 + 1;
                                    preparedStatement.setLong(1, tableColumnChange.getPos().longValue());
                                    int i3 = i2 + 1;
                                    preparedStatement.setString(i2, tableColumnChange.getDatabaseName());
                                    int i4 = i3 + 1;
                                    preparedStatement.setString(i3, tableColumnChange.getTableName());
                                    int i5 = i4 + 1;
                                    preparedStatement.setLong(i4, tableColumnChange.getRecordId().longValue());
                                    int i6 = i5 + 1;
                                    preparedStatement.setInt(i5, tableColumnChange.getEventType().intValue());
                                    int i7 = i6 + 1;
                                    preparedStatement.setString(i6, tableColumnChange.getBeforeRecord());
                                    int i8 = i7 + 1;
                                    preparedStatement.setString(i7, tableColumnChange.getAfterRecord());
                                }

                                public int getBatchSize() {
                                    return arrayList.size();
                                }
                            });
                        }
                        this.consumer.unsubscribe();
                        this.consumer.subscribe(this.topic, new SaveOffsetsOnRebalance(this.consumer));
                    }
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        }
    }

    private List<TableColumnChange> parseRowDataEntry(CanalEntry.Entry entry) throws JsonProcessingException, InvalidProtocolBufferException {
        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return Collections.emptyList();
        }
        CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        ArrayList arrayList = new ArrayList(parseFrom.getRowDatasCount());
        ObjectMapper objectMapper = new ObjectMapper();
        for (CanalEntry.RowData rowData : parseFrom.getRowDatasList()) {
            TableColumnChange tableColumnChange = new TableColumnChange();
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            String str = "";
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                hashMap.put(column.getName(), column.getValue());
                if (column.getIsKey()) {
                    str = column.getValue();
                }
            }
            for (CanalEntry.Column column2 : rowData.getAfterColumnsList()) {
                hashMap2.put(column2.getName(), column2.getValue());
                if (column2.getIsKey()) {
                    str = column2.getValue();
                }
            }
            tableColumnChange.setDatabaseName(entry.getHeader().getSchemaName());
            tableColumnChange.setTableName(entry.getHeader().getTableName());
            try {
                tableColumnChange.setRecordId(Long.valueOf(Long.parseLong(str)));
            } catch (Exception e) {
                tableColumnChange.setRecordId(0L);
            }
            tableColumnChange.setPos(Long.valueOf(entry.getHeader().getLogfileOffset()));
            tableColumnChange.setEventType(Integer.valueOf(parseFrom.getEventType().getNumber()));
            tableColumnChange.setBeforeRecord(objectMapper.writeValueAsString(hashMap));
            tableColumnChange.setAfterRecord(objectMapper.writeValueAsString(hashMap2));
            arrayList.add(tableColumnChange);
        }
        return arrayList;
    }
}
