package com.baijia.databus;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.MoreObjects;
import com.google.protobuf.InvalidProtocolBufferException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/baijia/databus/KafkaDispatcher.class */
public class KafkaDispatcher {
    private static final Logger log = LoggerFactory.getLogger(KafkaDispatcher.class);
    private Thread kafkaConsumerThread;
    private boolean running;

    @Autowired
    private DatabusKafkaConsumer consumer;

    @Autowired
    private ApplicationContext applicationContext;

    @Value("${kafka.session.timeout.ms:30000}")
    private long kafkaSessionTimeoutMs;
    private static final String ID_COLUMN = "ID";
    private Map<String, SubscribeEntry> registedMap = new ConcurrentHashMap();
    private long maxSleepTime = 2000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baijia/databus/KafkaDispatcher$SaveOffsetsOnRebalance.class */
    public static class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
        private SaveOffsetsOnRebalance() {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaDispatcher.log.info("[Kafka Consumer] kafka partitions have been revoked. partitions size {}, values {}", Integer.valueOf(collection.size()), collection);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            KafkaDispatcher.log.info("[Kafka Consumer] kafka partitions have been assigned. partitions size {}, value {}", Integer.valueOf(collection.size()), collection);
        }
    }

    public void start() {
        this.kafkaConsumerThread = new Thread(new Runnable() { // from class: com.baijia.databus.KafkaDispatcher.1
            @Override // java.lang.Runnable
            public void run() {
                KafkaDispatcher.this.process();
            }
        }, "Kafka-consumer-thread");
        this.kafkaConsumerThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.baijia.databus.KafkaDispatcher.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                KafkaDispatcher.log.error("parse events has an error", th);
            }
        });
        this.kafkaConsumerThread.start();
        this.running = true;
    }

    public void stop() {
        if (this.running) {
            this.running = false;
            if (this.kafkaConsumerThread != null) {
                try {
                    this.kafkaConsumerThread.join();
                } catch (InterruptedException e) {
                }
            }
            this.consumer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void process() {
        long j = 1;
        int i = 0;
        this.consumer.subscribe(new ArrayList(this.registedMap.keySet()), new SaveOffsetsOnRebalance());
        ConsumerRecords<String, CanalEntry.Entry> consumerRecords = new ConsumerRecords<>(Collections.emptyMap());
        while (this.running) {
            while (this.running) {
                try {
                    if (i <= 0) {
                        for (TopicPartition topicPartition : this.consumer.assignment()) {
                            if (this.consumer.committed(topicPartition) == null) {
                                log.debug("commit offset {} for new partition {}.", Long.valueOf(this.consumer.position(topicPartition)), topicPartition);
                                this.consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(this.consumer.position(topicPartition))));
                            }
                        }
                        consumerRecords = this.consumer.poll(1000L);
                    }
                    if (consumerRecords.isEmpty()) {
                        log.debug("Empty records, will sleep {}ms and retry.", Long.valueOf(j));
                        try {
                            TimeUnit.MILLISECONDS.sleep(j);
                            j *= 2;
                            if (j > this.maxSleepTime) {
                                j = 1;
                            }
                        } catch (Exception e) {
                        }
                    } else {
                        j = 1;
                        log.info("changed partition since last polling, size {}", Integer.valueOf(consumerRecords.count()));
                        Map<String, List<ChangedRow>> parseKafkaRecord = parseKafkaRecord(consumerRecords);
                        for (String str : parseKafkaRecord.keySet()) {
                            log.debug("Dispatch topic {} with data {}", str, parseKafkaRecord.get(str));
                            log.info("Received message size {} of topic {}.", Integer.valueOf(parseKafkaRecord.get(str).size()), str);
                            List<Future<Boolean>> dispatch = dispatch(str, parseKafkaRecord.get(str));
                            TopicPartition topicPartition2 = new TopicPartition(str, 0);
                            Iterator<Future<Boolean>> it = dispatch.iterator();
                            while (it.hasNext()) {
                                Future<Boolean> next = it.next();
                                SubscribeEntry subscribeEntry = this.registedMap.get(str);
                                int indexOf = dispatch.indexOf(next);
                                long j2 = subscribeEntry.getTimeout()[indexOf];
                                int maxRetry = subscribeEntry.getProcessors()[indexOf].maxRetry();
                                while (true) {
                                    if (j2 > 0 || maxRetry > 0) {
                                        j2 -= this.kafkaSessionTimeoutMs / 2;
                                        try {
                                        } catch (InterruptedException e2) {
                                            next.cancel(true);
                                            Thread.currentThread().interrupt();
                                        } catch (ExecutionException e3) {
                                            log.error("Error while executing processor {} which is consuming topic {}, offset {} \n{}", new Object[]{subscribeEntry.getProcessors()[indexOf].getClass().getName(), str, this.consumer.committed(topicPartition2), e3});
                                            log.error("Error while executing processor {} which is consuming topic {}, offset {} \n{}\n\t{}", new Object[]{subscribeEntry.getProcessors()[indexOf].getClass().getName(), str, this.consumer.committed(topicPartition2), e3.getCause(), StringUtils.join(e3.getCause().getStackTrace(), "\n\t")});
                                            log.error("Contents of topic : {}", parseKafkaRecord.get(str));
                                        } catch (TimeoutException e4) {
                                            if (j2 < 0) {
                                            }
                                            log.debug("try to heartbeat with commitSync");
                                            this.consumer.commitSync(Collections.singletonMap(topicPartition2, (OffsetAndMetadata) MoreObjects.firstNonNull(this.consumer.committed(topicPartition2), new OffsetAndMetadata(0L))));
                                            log.debug("heartbeat with commitSync success.");
                                        }
                                        if (next.get(this.kafkaSessionTimeoutMs / 2, TimeUnit.MILLISECONDS).booleanValue()) {
                                            subscribeEntry.getAck()[indexOf] = true;
                                            break;
                                        }
                                        j2 = 0;
                                        maxRetry--;
                                        next = subscribeEntry.getProcessors()[indexOf].onChanged(parseKafkaRecord.get(str));
                                        Logger logger = log;
                                        Object[] objArr = new Object[4];
                                        objArr[0] = subscribeEntry.getProcessors()[indexOf].getClass().getName();
                                        objArr[1] = str;
                                        objArr[2] = this.consumer.committed(topicPartition2);
                                        objArr[3] = maxRetry > 0 ? "true" : "false";
                                        logger.error("UnSuccess while executing processor {} which is consuming topic {} offset {} , retry ? {}", objArr);
                                        if (log.isDebugEnabled()) {
                                            log.error("Contents of topic {}", parseKafkaRecord.get(str));
                                        }
                                    }
                                }
                            }
                            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(this.consumer.position(topicPartition2));
                            log.debug("OffsetAndMetadata {} will be commit", offsetAndMetadata);
                            log.info("Topic {} partition {} offset {} -- {} is going to be submitted.", new Object[]{str, 0, Long.valueOf(((OffsetAndMetadata) MoreObjects.firstNonNull(this.consumer.committed(topicPartition2), new OffsetAndMetadata(0L))).offset()), Long.valueOf(offsetAndMetadata.offset())});
                            this.consumer.commitSync(Collections.singletonMap(topicPartition2, offsetAndMetadata));
                            log.info("Topic {} partition {} offset {} is submitted.", new Object[]{str, 0, Long.valueOf(this.consumer.committed(topicPartition2).offset())});
                        }
                    }
                } catch (CommitFailedException e5) {
                    this.consumer.subscribe(new ArrayList(this.registedMap.keySet()));
                    i++;
                    log.error("Error while commit offset to kafka, will retry", e5);
                    if (i >= 5) {
                        log.error("Failed After retry {} times.", Integer.valueOf(i));
                        i = 0;
                    }
                } catch (Exception e6) {
                    i++;
                    log.error("Error while subscribe to kafka, will retry", e6);
                    if (i >= 5) {
                        log.error("Failed After retry {} times.", Integer.valueOf(i));
                        i = 0;
                    }
                }
            }
            i = 0;
        }
    }

    private Map<String, List<ChangedRow>> parseKafkaRecord(ConsumerRecords<String, CanalEntry.Entry> consumerRecords) throws InvalidProtocolBufferException {
        HashMap hashMap = new HashMap();
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            if (((CanalEntry.Entry) consumerRecord.value()).getEntryType() == CanalEntry.EntryType.ROWDATA) {
                if (!hashMap.containsKey(consumerRecord.topic())) {
                    hashMap.put(consumerRecord.topic(), new ArrayList());
                }
                ((List) hashMap.get(consumerRecord.topic())).addAll(parseEntry((CanalEntry.Entry) consumerRecord.value()));
            }
        }
        return hashMap;
    }

    private List<ChangedRow> parseEntry(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
        CanalEntry.RowChange parseFrom = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        ArrayList arrayList = new ArrayList(parseFrom.getRowDatasCount());
        for (CanalEntry.RowData rowData : parseFrom.getRowDatasList()) {
            ChangedRow changedRow = new ChangedRow();
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            String str = null;
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                hashMap.put(column.getName().toUpperCase(), column.getValue());
                if (ID_COLUMN.equalsIgnoreCase(column.getName())) {
                    str = column.getValue();
                }
            }
            for (CanalEntry.Column column2 : rowData.getAfterColumnsList()) {
                hashMap2.put(column2.getName().toUpperCase(), column2.getValue());
                if (ID_COLUMN.equalsIgnoreCase(column2.getName())) {
                    str = column2.getValue();
                }
            }
            changedRow.setNewValue(hashMap2);
            changedRow.setOldValue(hashMap);
            changedRow.setChangedType(entry.getHeader().getEventType());
            changedRow.setDb(entry.getHeader().getSchemaName());
            changedRow.setTable(entry.getHeader().getTableName());
            changedRow.setTimestamp(entry.getHeader().getExecuteTime());
            if (StringUtils.isNotBlank(str) || NumberUtils.isDigits(str)) {
                changedRow.setId(Long.parseLong(str));
            } else {
                log.warn("Can't parse ID from {}, Entry {}", str, ToStringBuilder.reflectionToString(entry, ToStringStyle.SHORT_PREFIX_STYLE));
            }
            arrayList.add(changedRow);
        }
        return arrayList;
    }

    public synchronized void subscribe(String str, Processor processor, boolean z, long j) {
        Objects.requireNonNull(str, "Topic can't be null!");
        Objects.requireNonNull(processor, "Processor can't be null!");
        log.info("Subscribe received, topic {}, processor {}", str, processor);
        SubscribeEntry subscribeEntry = this.registedMap.get(str);
        if (subscribeEntry == null) {
            SubscribeEntry subscribeEntry2 = new SubscribeEntry();
            subscribeEntry2.setAck(new boolean[]{z});
            subscribeEntry2.setAutoAck(new boolean[]{z});
            subscribeEntry2.setProcessors(new Processor[]{processor});
            subscribeEntry2.setTimeout(new long[]{j});
            this.registedMap.put(str, subscribeEntry2);
            this.consumer.subscribe(new ArrayList(this.registedMap.keySet()));
            return;
        }
        for (Processor processor2 : subscribeEntry.getProcessors()) {
            if (processor2 == processor) {
                log.info("Processor {} of topic {} aleady exists.", processor, str);
                return;
            }
        }
        Processor[] processorArr = new Processor[subscribeEntry.getProcessors().length + 1];
        System.arraycopy(subscribeEntry.getProcessors(), 0, processorArr, 0, subscribeEntry.getProcessors().length);
        processorArr[processorArr.length - 1] = processor;
        subscribeEntry.setProcessors(processorArr);
        boolean[] zArr = new boolean[subscribeEntry.getAck().length + 1];
        System.arraycopy(subscribeEntry.getAck(), 0, zArr, 0, subscribeEntry.getAck().length);
        zArr[zArr.length - 1] = z;
        subscribeEntry.setAck(zArr);
        boolean[] zArr2 = new boolean[subscribeEntry.getAutoAck().length + 1];
        System.arraycopy(subscribeEntry.getAutoAck(), 0, zArr2, 0, subscribeEntry.getAutoAck().length);
        zArr2[zArr2.length - 1] = z;
        subscribeEntry.setAutoAck(zArr2);
        long[] jArr = new long[subscribeEntry.getTimeout().length + 1];
        System.arraycopy(subscribeEntry.getTimeout(), 0, jArr, 0, subscribeEntry.getTimeout().length);
        jArr[jArr.length - 1] = j;
        subscribeEntry.setTimeout(jArr);
    }

    public synchronized void unsubcribe(String str, Processor processor) {
        Objects.requireNonNull(str, "Topic can't be null!");
        Objects.requireNonNull(processor, "Processor can't be null!");
        log.info("Unsubscribe received, topic {}, processor {}", str, processor);
        SubscribeEntry subscribeEntry = this.registedMap.get(str);
        if (subscribeEntry == null) {
            log.info("Can't find regist entry for topic {}, processor {}", str, processor);
            return;
        }
        int i = 0;
        while (true) {
            if (i >= subscribeEntry.getProcessors().length) {
                break;
            }
            if (subscribeEntry.getProcessors()[i] == processor) {
                for (int i2 = i; i2 < subscribeEntry.getProcessors().length - 1; i2++) {
                    subscribeEntry.getProcessors()[i2] = subscribeEntry.getProcessors()[i2 + 1];
                    subscribeEntry.getAutoAck()[i2] = subscribeEntry.getAutoAck()[i2 + 1];
                    subscribeEntry.getAck()[i2] = subscribeEntry.getAck()[i2 + 1];
                    subscribeEntry.getTimeout()[i2] = subscribeEntry.getTimeout()[i2 + 1];
                }
                Processor[] processorArr = new Processor[subscribeEntry.getProcessors().length - 1];
                System.arraycopy(subscribeEntry.getProcessors(), 0, processorArr, 0, processorArr.length);
                subscribeEntry.setProcessors(processorArr);
                boolean[] zArr = new boolean[subscribeEntry.getAutoAck().length - 1];
                System.arraycopy(subscribeEntry.getAutoAck(), 0, zArr, 0, zArr.length);
                subscribeEntry.setAutoAck(zArr);
                boolean[] zArr2 = new boolean[subscribeEntry.getAck().length - 1];
                System.arraycopy(subscribeEntry.getAck(), 0, zArr2, 0, zArr2.length);
                subscribeEntry.setAck(zArr2);
                long[] jArr = new long[subscribeEntry.getTimeout().length - 1];
                System.arraycopy(subscribeEntry.getTimeout(), 0, jArr, 0, jArr.length);
                subscribeEntry.setTimeout(jArr);
            } else {
                i++;
            }
        }
        if (subscribeEntry.getProcessors().length == 0) {
            this.registedMap.remove(str);
            this.consumer.subscribe(new ArrayList(this.registedMap.keySet()));
        }
    }

    private List<Future<Boolean>> dispatch(String str, List<ChangedRow> list) {
        if (!this.registedMap.containsKey(str)) {
            return Collections.emptyList();
        }
        SubscribeEntry subscribeEntry = this.registedMap.get(str);
        ArrayList arrayList = new ArrayList(subscribeEntry.getProcessors().length);
        for (int i = 0; i < subscribeEntry.getProcessors().length; i++) {
            arrayList.add(subscribeEntry.getProcessors()[i].onChanged(list));
        }
        return arrayList;
    }

    public void initRegist() {
        Map beansOfType = this.applicationContext.getBeansOfType(Processor.class);
        if (!CollectionUtils.isEmpty(beansOfType)) {
            for (Processor processor : beansOfType.values()) {
                subscribe(processor.topic(), processor, processor.autoAck(), processor.timeout());
            }
        }
        start();
    }

    public DatabusKafkaConsumer getConsumer() {
        return this.consumer;
    }

    public void setConsumer(DatabusKafkaConsumer databusKafkaConsumer) {
        this.consumer = databusKafkaConsumer;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public ApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    public long getKafkaSessionTimeoutMs() {
        return this.kafkaSessionTimeoutMs;
    }

    public void setKafkaSessionTimeoutMs(long j) {
        this.kafkaSessionTimeoutMs = j;
    }
}
