package com.baijia.storm.sun.pipe.kafka.dispatcher;

import com.baijia.storm.sun.common.runner.AbstractRunnerLifeCycle;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;

/* loaded from: input_file:com/baijia/storm/sun/pipe/kafka/dispatcher/KafkaMessageDispatcher.class */
public class KafkaMessageDispatcher extends AbstractRunnerLifeCycle implements MessageDispatcher<String, byte[]> {
    private static final Logger log = LoggerFactory.getLogger(KafkaMessageDispatcher.class);
    private ExecutorService executorService;
    private KafkaConsumer<byte[], byte[]> consumer;
    private Map<String, List<MessageProcessor<String, byte[]>>> registerMap = new ConcurrentHashMap();
    private Set<String> pauseTopics = new HashSet();
    private Map<String, List<Future<Boolean>>> processResultsByTopic = new HashMap();
    private Thread thread = null;
    private Thread.UncaughtExceptionHandler exceptionHandler = (thread, th) -> {
        log.error("Thread {} is stopped due to uncaught exception {}", thread.getName(), ExceptionUtils.getStackTrace(th));
        super.stop();
    };
    private long maxSleepTime = 3000;

    public KafkaMessageDispatcher(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.executorService = null;
        this.consumer = null;
        this.consumer = kafkaConsumer;
        this.executorService = Executors.newCachedThreadPool();
    }

    public void start() {
        if (this.consumer == null) {
            throw new IllegalStateException("kafka consumer is initialized yet.");
        }
        super.start();
        Thread thread = new Thread(this::process, "kafka-message-dispatcher");
        thread.setUncaughtExceptionHandler(this.exceptionHandler);
        thread.start();
        log.info("Kafka message dispatcher thread started.");
    }

    @PreDestroy
    public void stop() {
        super.stop();
        if (this.thread != null) {
            this.thread.interrupt();
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        log.info("Kafka message dispatcher thread stopped.");
    }

    private void process() {
        long j = 1;
        this.consumer.subscribe(new ArrayList(this.registerMap.keySet()));
        while (this.running) {
            try {
                ConsumerRecords poll = this.consumer.poll(100);
                if (poll.isEmpty()) {
                    log.trace("Empty records, will sleep {} ms and retry.", Long.valueOf(j));
                    try {
                        TimeUnit.MILLISECONDS.sleep(j);
                        j *= 2;
                        if (j > this.maxSleepTime) {
                            log.debug("Empty records, will sleep {} ms and retry.", Long.valueOf(j));
                            j = 1;
                        }
                    } catch (InterruptedException e) {
                        log.info("process thread was interrupted by other thread while sleeping.");
                        Thread.currentThread().interrupt();
                    }
                } else {
                    j = 1;
                    log.info("Changed partition since last polling: {},", poll.partitions());
                    HashMap hashMap = new HashMap();
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        ((List) hashMap.computeIfAbsent(consumerRecord.topic(), str -> {
                            return new ArrayList();
                        })).add((byte[]) consumerRecord.value());
                    }
                    hashMap.entrySet().forEach(entry -> {
                        dispatch((String) entry.getKey(), (List<byte[]>) entry.getValue());
                    });
                }
                checkProcessorProgress();
            } catch (Exception e2) {
                log.error("Error while process, will retry, ", e2);
            }
        }
    }

    private void checkProcessorProgress() {
        for (String str : this.processResultsByTopic.keySet()) {
            if (!this.processResultsByTopic.get(str).isEmpty()) {
                for (Future<Boolean> future : this.processResultsByTopic.get(str)) {
                    if (future.isDone()) {
                        this.processResultsByTopic.get(str).remove(future);
                        try {
                            future.get();
                        } catch (InterruptedException | ExecutionException e) {
                            log.warn("Error while try to get process result", e);
                        }
                    }
                }
                if (this.processResultsByTopic.get(str).isEmpty()) {
                    this.pauseTopics.remove(str);
                    log.info("All processors of topic {} have process all messages successfully, will resume if necessary and commit", str);
                    this.consumer.resume(new TopicPartition[]{new TopicPartition(str, 0)});
                } else if (!this.pauseTopics.contains(str)) {
                    this.pauseTopics.add(str);
                    log.info("Topic {}  has slow processor, will pause", str);
                    this.consumer.pause(new TopicPartition[]{new TopicPartition(str, 0)});
                }
            }
        }
    }

    @Override // com.baijia.storm.sun.pipe.kafka.dispatcher.MessageDispatcher
    public int dispatch(String str, List<byte[]> list) {
        if (!this.registerMap.containsKey(str)) {
            log.warn("no processor registered for topic {}", str);
            return 0;
        }
        List<MessageProcessor<String, byte[]>> list2 = this.registerMap.get(str);
        ArrayList arrayList = new ArrayList(list2.size());
        for (MessageProcessor<String, byte[]> messageProcessor : list2) {
            arrayList.add(this.executorService.submit(() -> {
                messageProcessor.accept(str, list);
                return true;
            }));
        }
        if (!this.processResultsByTopic.containsKey(str)) {
            this.processResultsByTopic.put(str, new CopyOnWriteArrayList());
        }
        this.processResultsByTopic.get(str).addAll(arrayList);
        return arrayList.size();
    }

    @Override // com.baijia.storm.sun.pipe.kafka.dispatcher.MessageDispatcher
    public synchronized boolean register(String str, MessageProcessor<String, byte[]> messageProcessor) {
        Objects.requireNonNull(str, "topic can't be null!");
        Objects.requireNonNull(messageProcessor, "processor can't be null!");
        log.info("processor {} will subscribe topic {}", messageProcessor, str);
        if (!this.registerMap.containsKey(str)) {
            this.registerMap.put(str, new CopyOnWriteArrayList());
        }
        if (this.registerMap.get(str).contains(messageProcessor)) {
            log.info("Processor {} of topic {} aleady exists.", messageProcessor, str);
            return true;
        }
        this.registerMap.get(str).add(messageProcessor);
        this.consumer.subscribe(new ArrayList(this.registerMap.keySet()));
        return true;
    }

    @Override // com.baijia.storm.sun.pipe.kafka.dispatcher.MessageDispatcher
    public synchronized boolean unregister(String str, MessageProcessor<String, byte[]> messageProcessor) {
        Objects.requireNonNull(str, "Topic can't be null!");
        Objects.requireNonNull(messageProcessor, "Processor can't be null!");
        log.info("Unsubscribe received, topic {}, processor {}", str, messageProcessor);
        this.registerMap.get(str).remove(messageProcessor);
        if (!this.registerMap.get(str).isEmpty()) {
            return true;
        }
        this.pauseTopics.remove(str);
        this.processResultsByTopic.remove(str);
        this.consumer.subscribe(new ArrayList(this.registerMap.keySet()));
        return true;
    }

    public void init(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            throw new IllegalStateException("context should be initialized with spring application context");
        }
        for (MessageProcessor messageProcessor : applicationContext.getBeansOfType(MessageProcessor.class).values()) {
            if (messageProcessor instanceof KafkaMessageProcessor) {
                KafkaMessageProcessor kafkaMessageProcessor = (KafkaMessageProcessor) messageProcessor;
                register(kafkaMessageProcessor.topic(), (MessageProcessor<String, byte[]>) kafkaMessageProcessor);
            }
        }
    }
}
