package com.baijia.databus.demo;

import com.aliyun.drc.client.message.DataMessage;
import com.aliyun.drc.clusterclient.ClusterClient;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;

/* loaded from: input_file:com/baijia/databus/demo/DtsConsumer.class */
public class DtsConsumer {
    private static final Logger logger = LoggerFactory.getLogger(DtsConsumer.class);

    @Autowired
    private ClusterClient client;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Value("${dts.channel}")
    private String channel;
    private final String insertUpdateInfoiSql = "insert into test.sz_dts_test (app_type,pos, database_name, table_name, record_id, event_type, before_record, after_record) values (2,?, ?, ?, ?, ?, ?, ?);";
    private List<DataMessage.Record.Type> concernTypes = Arrays.asList(DataMessage.Record.Type.INSERT, DataMessage.Record.Type.UPDATE, DataMessage.Record.Type.DELETE, DataMessage.Record.Type.REPLACE);

    @PostConstruct
    public void init() throws Exception {
        logger.info("DtsConsumer init.");
        ClusterListener clusterListener = new ClusterListener() { // from class: com.baijia.databus.demo.DtsConsumer.1
            ObjectMapper mapper = new ObjectMapper();

            public void notify(List<ClusterMessage> list) throws Exception {
                final ArrayList arrayList = new ArrayList();
                for (ClusterMessage clusterMessage : list) {
                    if (DtsConsumer.this.concernTypes.contains(clusterMessage.getRecord().getOpt())) {
                        TableColumnChange tableColumnChange = new TableColumnChange();
                        tableColumnChange.setDatabaseName(clusterMessage.getRecord().getDbname());
                        tableColumnChange.setTableName(clusterMessage.getRecord().getTablename());
                        tableColumnChange.setEventType(Integer.valueOf(DtsConsumer.this.concernTypes.indexOf(clusterMessage.getRecord().getOpt()) + 1));
                        tableColumnChange.setPos(Long.valueOf(Long.parseLong(clusterMessage.getRecord().getCheckpoint().split("@")[0])));
                        HashMap hashMap = new HashMap();
                        for (DataMessage.Record.Field field : clusterMessage.getRecord().getFieldList()) {
                            hashMap.put(field.getFieldname(), field.getValue() == null ? "" : field.getValue().toString("utf8"));
                            if (field.isPrimary()) {
                                try {
                                    tableColumnChange.setRecordId(Long.valueOf(Long.parseLong(field.getValue().toString("utf8"))));
                                } catch (Exception e) {
                                    tableColumnChange.setRecordId(0L);
                                }
                            }
                        }
                        tableColumnChange.setAfterRecord(this.mapper.writeValueAsString(hashMap));
                        arrayList.add(tableColumnChange);
                    }
                    clusterMessage.ackAsConsumed();
                }
                if (arrayList.size() > 0) {
                    DtsConsumer.logger.info("【DTS】 Add datas to db, size {}", Integer.valueOf(arrayList.size()));
                    DtsConsumer.this.jdbcTemplate.batchUpdate("insert into test.sz_dts_test (app_type,pos, database_name, table_name, record_id, event_type, before_record, after_record) values (2,?, ?, ?, ?, ?, ?, ?);", new BatchPreparedStatementSetter() { // from class: com.baijia.databus.demo.DtsConsumer.1.1
                        public void setValues(PreparedStatement preparedStatement, int i) throws SQLException {
                            int i2 = 1 + 1;
                            preparedStatement.setLong(1, ((TableColumnChange) arrayList.get(i)).getPos().longValue());
                            int i3 = i2 + 1;
                            preparedStatement.setString(i2, ((TableColumnChange) arrayList.get(i)).getDatabaseName());
                            int i4 = i3 + 1;
                            preparedStatement.setString(i3, ((TableColumnChange) arrayList.get(i)).getTableName());
                            int i5 = i4 + 1;
                            preparedStatement.setLong(i4, ((TableColumnChange) arrayList.get(i)).getRecordId().longValue());
                            int i6 = i5 + 1;
                            preparedStatement.setInt(i5, ((TableColumnChange) arrayList.get(i)).getEventType().intValue());
                            int i7 = i6 + 1;
                            preparedStatement.setString(i6, "{}");
                            int i8 = i7 + 1;
                            preparedStatement.setString(i7, ((TableColumnChange) arrayList.get(i)).getAfterRecord());
                        }

                        public int getBatchSize() {
                            return arrayList.size();
                        }
                    });
                }
            }

            public void noException(Exception exc) {
                DtsConsumer.logger.error("Error", exc);
            }
        };
        logger.info("Add ConcurrentLinstener {}", clusterListener);
        this.client.addConcurrentListener(clusterListener);
        logger.info("Ask For GUID {}", this.channel);
        this.client.askForGUID(this.channel);
        logger.info("Start client");
        this.client.start();
    }
}
