package com.baijia.databus;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.baijia.databus.AbstractProcessor;
import com.google.protobuf.InvalidProtocolBufferException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/baijia/databus/KafkaDispatcher.class */
public class KafkaDispatcher {
    private static final Logger log = LoggerFactory.getLogger(KafkaDispatcher.class);
    private Thread databusDispatcherThread;
    private boolean running;

    @Autowired
    private DatabusKafkaConsumer consumer;

    @Autowired
    private ApplicationContext applicationContext;

    @Value("${kafka.session.timeout.ms:30000}")
    private long kafkaSessionTimeoutMs;

    @Value("${heartbeat.interval.ms:3000}")
    private long heartbeatIntervalMs;
    private static final boolean RETRY_MODE = false;
    private static final String ID_COLUMN = "ID";
    private Map<String, List<Processor>> registedMap = new ConcurrentHashMap();
    private Map<TopicPartition, OffsetAndMetadata> commitedOffsetByTopicPartition = new ConcurrentHashMap();
    private Set<String> pauseTopics = new HashSet();
    private Map<String, List<Future<AbstractProcessor.ProcessResult>>> processResultsByTopic = new HashMap();
    private Map<String, Long> processingOffsetByTopic = new ConcurrentHashMap();
    private Map<String, List<ChangedRow>> changedRowsByTopic = new ConcurrentHashMap();
    private long maxSleepTime = 3000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baijia/databus/KafkaDispatcher$SaveOffsetsOnRebalance.class */
    public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        private SaveOffsetsOnRebalance() {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaDispatcher.log.info("[Kafka Consumer] kafka partitions have been revoked. partitions size {}, values {}", Integer.valueOf(collection.size()), collection);
            KafkaDispatcher.this.consumer.commitAsync(KafkaDispatcher.this.commitedOffsetByTopicPartition, null);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            KafkaDispatcher.log.info("[Kafka Consumer] kafka partitions have been assigned. partitions size {}, value {}", Integer.valueOf(collection.size()), collection);
            KafkaDispatcher.this.pauseTopics.clear();
            KafkaDispatcher.this.initTopicPartitionAndOffsets();
        }
    }

    public void start() {
        this.consumer.subscribe(new ArrayList(this.registedMap.keySet()), new SaveOffsetsOnRebalance());
        this.databusDispatcherThread = new Thread(new Runnable() { // from class: com.baijia.databus.KafkaDispatcher.1
            @Override // java.lang.Runnable
            public void run() {
                KafkaDispatcher.this.process();
            }
        }, "Kafka-consumer-thread");
        this.databusDispatcherThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.baijia.databus.KafkaDispatcher.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                KafkaDispatcher.log.error("parse events has an error", th);
            }
        });
        this.running = true;
        this.databusDispatcherThread.start();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { // from class: com.baijia.databus.KafkaDispatcher.3
            @Override // java.lang.Runnable
            public void run() {
                KafkaDispatcher.this.stop();
            }
        }));
    }

    public void stop() {
        if (this.running) {
            this.running = false;
            if (this.databusDispatcherThread != null) {
                try {
                    this.databusDispatcherThread.join();
                } catch (InterruptedException e) {
                }
            }
            try {
                this.consumer.commitSync(this.commitedOffsetByTopicPartition);
                this.consumer.close();
            } catch (Throwable th) {
                this.consumer.close();
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initTopicPartitionAndOffsets() {
        for (TopicPartition topicPartition : this.consumer.assignment()) {
            long position = this.consumer.position(topicPartition) + 1;
            log.info("Init offset {} for new partition {}.", Long.valueOf(position), topicPartition);
            this.commitedOffsetByTopicPartition.put(topicPartition, new OffsetAndMetadata(position));
        }
    }

    private void checkProcessorProgress() {
        Future<AbstractProcessor.ProcessResult> onChanged;
        for (String str : this.processResultsByTopic.keySet()) {
            if (!this.processResultsByTopic.get(str).isEmpty()) {
                for (Future<AbstractProcessor.ProcessResult> future : this.processResultsByTopic.get(str)) {
                    try {
                        if (future.isDone()) {
                            this.processResultsByTopic.get(str).remove(future);
                            if (!future.get().success && (onChanged = future.get().processor.onChanged(this.changedRowsByTopic.get(str), false, future.get().processor)) != null) {
                                this.processResultsByTopic.get(str).add(onChanged);
                            }
                        }
                    } catch (InterruptedException | ExecutionException e) {
                        log.warn("Error while try to get process result", e);
                    }
                }
                if (this.processResultsByTopic.get(str).isEmpty()) {
                    this.changedRowsByTopic.remove(str);
                    this.pauseTopics.remove(str);
                    log.info("[Kafka Consumer] All processors of topic {} have process all messages successfully, will resume if necessary and commit", str);
                    this.consumer.resume(new TopicPartition[]{new TopicPartition(str, RETRY_MODE)});
                    log.info("[Kafka Consume] Update commited offset of topic {} from {} to {}", new Object[]{str, this.commitedOffsetByTopicPartition.get(new TopicPartition(str, RETRY_MODE)), Long.valueOf(this.processingOffsetByTopic.get(str).longValue() + 1)});
                    this.commitedOffsetByTopicPartition.put(new TopicPartition(str, RETRY_MODE), new OffsetAndMetadata(this.processingOffsetByTopic.get(str).longValue() + 1));
                    log.info("[Kafka Consumer] Commit topicOffset async {}", this.commitedOffsetByTopicPartition);
                    this.consumer.commitAsync(this.commitedOffsetByTopicPartition, null);
                } else if (!this.pauseTopics.contains(str)) {
                    this.pauseTopics.add(str);
                    log.info("[Kafka Consumer] Topic {}  has slow processor, will pause", str);
                    this.consumer.pause(new TopicPartition[]{new TopicPartition(str, RETRY_MODE)});
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void process() {
        long j = 1;
        new ConsumerRecords(Collections.emptyMap());
        while (this.running) {
            try {
                ConsumerRecords<String, CanalEntry.Entry> poll = this.consumer.poll(100);
                if (poll.isEmpty()) {
                    log.trace("Empty records, will sleep {} ms and retry.", Long.valueOf(j));
                    try {
                        TimeUnit.MILLISECONDS.sleep(j);
                        j *= 2;
                        if (j > this.maxSleepTime) {
                            log.info("[Kafka Consumer] Empty records, will sleep {} ms and retry.", Long.valueOf(j));
                            j = 1;
                        }
                    } catch (Exception e) {
                    }
                } else {
                    j = 1;
                    log.info("[Kafka Consumer] Changed partition since last polling, size {}", Integer.valueOf(poll.count()));
                    Map<String, List<ChangedRow>> parseKafkaRecord = parseKafkaRecord(poll);
                    for (String str : parseKafkaRecord.keySet()) {
                        log.trace("Dispatch topic {} with data {}", str, parseKafkaRecord.get(str));
                        log.info("[Kafka Consumer] Dispatch message with size {} of topic {}.", Integer.valueOf(parseKafkaRecord.get(str).size()), str);
                        dispatch(str, parseKafkaRecord.get(str));
                    }
                    this.changedRowsByTopic.putAll(parseKafkaRecord);
                }
                checkProcessorProgress();
            } catch (Exception e2) {
                log.error("Error while process, will retry, ", e2);
            }
        }
    }

    private Map<String, List<ChangedRow>> parseKafkaRecord(ConsumerRecords<String, CanalEntry.Entry> consumerRecords) throws InvalidProtocolBufferException {
        HashMap hashMap = new HashMap();
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            if (updateOffset(consumerRecord.topic(), consumerRecord.offset()) && ((CanalEntry.Entry) consumerRecord.value()).getEntryType() == CanalEntry.EntryType.ROWDATA) {
                if (!hashMap.containsKey(consumerRecord.topic())) {
                    hashMap.put(consumerRecord.topic(), new ArrayList());
                }
                ((List) hashMap.get(consumerRecord.topic())).addAll(parseEntry((CanalEntry.Entry) consumerRecord.value(), consumerRecord.offset()));
            }
        }
        return hashMap;
    }

    private boolean updateOffset(String str, long j) {
        if (this.processingOffsetByTopic.containsKey(str) && this.processingOffsetByTopic.get(str).longValue() >= j) {
            return false;
        }
        this.processingOffsetByTopic.put(str, Long.valueOf(j));
        return true;
    }

    private List<ChangedRow> parseEntry(CanalEntry.Entry entry, long j) throws InvalidProtocolBufferException {
        CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        ArrayList arrayList = new ArrayList(parseFrom.getRowDatasCount());
        for (CanalEntry.RowData rowData : parseFrom.getRowDatasList()) {
            ChangedRow changedRow = new ChangedRow();
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            String str = RETRY_MODE;
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                hashMap.put(column.getName().toUpperCase(), column.getValue());
                if (ID_COLUMN.equalsIgnoreCase(column.getName())) {
                    str = column.getValue();
                }
            }
            for (CanalEntry.Column column2 : rowData.getAfterColumnsList()) {
                hashMap2.put(column2.getName().toUpperCase(), column2.getValue());
                if (ID_COLUMN.equalsIgnoreCase(column2.getName())) {
                    str = column2.getValue();
                }
            }
            changedRow.setNewValue(hashMap2);
            changedRow.setOldValue(hashMap);
            changedRow.setChangedType(entry.getHeader().getEventType());
            changedRow.setDb(entry.getHeader().getSchemaName());
            changedRow.setTable(entry.getHeader().getTableName());
            changedRow.setTimestamp(entry.getHeader().getExecuteTime());
            changedRow.setOffset(j);
            if (StringUtils.isNotBlank(str) || NumberUtils.isDigits(str)) {
                changedRow.setId(Long.parseLong(str));
            } else {
                log.warn("Can't parse ID from {}, Entry {}", str, ToStringBuilder.reflectionToString(entry, ToStringStyle.SHORT_PREFIX_STYLE));
            }
            arrayList.add(changedRow);
        }
        return arrayList;
    }

    public synchronized void subscribe(String str, Processor processor) {
        Objects.requireNonNull(str, "Topic can't be null!");
        Objects.requireNonNull(processor, "Processor can't be null!");
        log.info("Subscribe received, topic {}, processor {}", str, processor);
        if (!this.registedMap.containsKey(str)) {
            this.registedMap.put(str, new CopyOnWriteArrayList());
        }
        if (this.registedMap.get(str).contains(processor)) {
            log.info("Processor {} of topic {} aleady exists.", processor, str);
        } else {
            this.registedMap.get(str).add(processor);
            this.consumer.subscribe(new ArrayList(this.registedMap.keySet()));
        }
    }

    public synchronized void unsubcribe(String str, Processor processor) {
        Objects.requireNonNull(str, "Topic can't be null!");
        Objects.requireNonNull(processor, "Processor can't be null!");
        log.info("Unsubscribe received, topic {}, processor {}", str, processor);
        this.registedMap.get(str).remove(processor);
        if (this.registedMap.get(str).isEmpty()) {
            this.processingOffsetByTopic.remove(str);
            this.changedRowsByTopic.remove(str);
            this.pauseTopics.remove(str);
            this.processResultsByTopic.remove(str);
            this.consumer.subscribe(new ArrayList(this.registedMap.keySet()));
        }
    }

    private void dispatch(String str, List<ChangedRow> list) {
        if (!this.registedMap.containsKey(str)) {
            log.warn("No processor regist for topic {}", str);
            this.commitedOffsetByTopicPartition.put(new TopicPartition(str, RETRY_MODE), new OffsetAndMetadata(this.processingOffsetByTopic.get(str).longValue() + 1));
            return;
        }
        List<Processor> list2 = this.registedMap.get(str);
        ArrayList arrayList = new ArrayList(list2.size());
        for (Processor processor : list2) {
            arrayList.add(processor.onChanged(list, true, processor));
        }
        if (!this.processResultsByTopic.containsKey(str)) {
            this.processResultsByTopic.put(str, new CopyOnWriteArrayList());
        }
        this.processResultsByTopic.get(str).addAll(arrayList);
    }

    public void initRegist() {
        Map beansOfType = this.applicationContext.getBeansOfType(Processor.class);
        if (!CollectionUtils.isEmpty(beansOfType)) {
            for (Processor processor : beansOfType.values()) {
                subscribe(processor.topic(), processor);
            }
        }
        start();
    }

    public DatabusKafkaConsumer getConsumer() {
        return this.consumer;
    }

    public void setConsumer(DatabusKafkaConsumer databusKafkaConsumer) {
        this.consumer = databusKafkaConsumer;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public ApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    public long getKafkaSessionTimeoutMs() {
        return this.kafkaSessionTimeoutMs;
    }

    public void setKafkaSessionTimeoutMs(long j) {
        this.kafkaSessionTimeoutMs = j;
    }
}
