package com.baijia.databus.monitor.app;

import com.baijia.databus.monitor.common.GroupInformation;
import com.google.common.base.MoreObjects;
import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.Thread;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.coordinator.BaseKey;
import kafka.coordinator.GroupMetadataManager;
import kafka.coordinator.GroupTopicPartition;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/baijia/databus/monitor/app/ConsumerOffsetMonitor.class */
public class ConsumerOffsetMonitor {

    @Autowired
    @Qualifier("offsetConsumer")
    private Consumer<byte[], byte[]> offsetConsumer;

    @Autowired
    private Consumer<byte[], byte[]> logSizeFetcher;

    @Autowired
    private Configuration configuration;

    @Autowired
    private JavaMailSender mailSender;

    @Value("${mail.switch:false}")
    private boolean mailSwitch;

    @Value("${env}")
    private String env;

    @Value("${alert.mail_receiver}")
    private String[] receivers;

    @Value("${kafka.consumer.group.max.lag}")
    private int maxLag;

    @Value("${kafka.expires.group.auto.clean.ms:1800000}")
    private long groupCleanMs;

    @Value("${zookeeper.hosts}")
    private String zkHosts;
    private static final Logger logger = LoggerFactory.getLogger(ConsumerOffsetMonitor.class);
    private static final String CONSUMER_OFFSETS_TOPIC_NAME = "__consumer_offsets";
    private static final String CONFIGS_BASE = "/configs";
    private static final String GROUP_BASE = "/groups";
    private static final String GROUP_PATH = "/groups/%s";
    private static final String GROUP_TOPIC_PARENT = "/groups/%s/topics";
    private static final String GROUP_TOPIC_PARTITION_PARENT = "/groups/%s/topics/%s/partition";
    private static final String GROUP_PARTITION_OFFSET_PATH = "/groups/%s/topics/%s/partition/%d/position";
    private static final String CONSUMER_OFFSET_REPORT_TPL_PATH = "consumer_offset_report.ftl";
    private CuratorFramework zkClient = null;
    private volatile boolean running = false;
    private Thread kafkaListenerThread = null;
    private Map<TopicPartition, Long> partitionSizeMap = new ConcurrentHashMap();
    private Map<String, Map<TopicPartition, Long>> groupPartitionOffsetsMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baijia/databus/monitor/app/ConsumerOffsetMonitor$ReportOnRebalance.class */
    public static class ReportOnRebalance implements ConsumerRebalanceListener {
        private ReportOnRebalance() {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            ConsumerOffsetMonitor.logger.info("[Kafka Consumer] kafka partitions have been revoked. partitions size {}, values {}", Integer.valueOf(collection.size()), collection);
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            ConsumerOffsetMonitor.logger.info("[Kafka Consumer] kafka partitions have been assigned. partitions size {}, value {}", Integer.valueOf(collection.size()), collection);
        }
    }

    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }

    @PostConstruct
    public void start() throws Exception {
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        int indexOf = this.zkHosts.indexOf("/");
        if (indexOf >= 0) {
            String substring = this.zkHosts.substring(indexOf);
            this.zkClient = CuratorFrameworkFactory.builder().connectString(this.zkHosts.substring(0, indexOf)).retryPolicy(exponentialBackoffRetry).build();
            this.zkClient.start();
            if (this.zkClient.checkExists().forPath(substring) == null) {
                this.zkClient.create().creatingParentsIfNeeded().forPath(substring);
            }
            this.zkClient.close();
        }
        this.zkClient = CuratorFrameworkFactory.builder().retryPolicy(exponentialBackoffRetry).connectString(this.zkHosts).build();
        this.zkClient.start();
        createZkPathIfNecessary();
        extractDataFromZk();
        this.kafkaListenerThread = new Thread(new Runnable() { // from class: com.baijia.databus.monitor.app.ConsumerOffsetMonitor.1
            @Override // java.lang.Runnable
            public void run() {
                ConsumerOffsetMonitor.this.watchConsumerOffsets();
            }
        }, "kafka-consumer-listener");
        this.kafkaListenerThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: com.baijia.databus.monitor.app.ConsumerOffsetMonitor.2
            @Override // java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                ConsumerOffsetMonitor.logger.error("Error occur in kafkaListenerThread, {}", ExceptionUtils.getFullStackTrace(th));
            }
        });
        this.running = true;
        this.kafkaListenerThread.start();
    }

    @PreDestroy
    public void stop() {
        this.running = false;
        try {
            this.kafkaListenerThread.join();
        } catch (InterruptedException e) {
        }
        this.offsetConsumer.close();
        this.logSizeFetcher.close();
        if (this.zkClient != null) {
            this.zkClient.close();
        }
    }

    private void createZkPathIfNecessary() throws Exception {
        if (this.zkClient.checkExists().forPath(GROUP_BASE) == null) {
            this.zkClient.create().creatingParentsIfNeeded().forPath(GROUP_BASE);
        }
        if (this.zkClient.checkExists().forPath(CONFIGS_BASE) == null) {
            this.zkClient.create().creatingParentsIfNeeded().forPath(CONFIGS_BASE);
        }
    }

    @Scheduled(cron = "${kafka.log.size.update.cron}")
    public void updateLogSize() {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = this.groupPartitionOffsetsMap.keySet().iterator();
        while (it.hasNext()) {
            arrayList.addAll(this.groupPartitionOffsetsMap.get(it.next()).keySet());
        }
        logger.debug("Preparing update log size.");
        try {
            updatePartitionSize(arrayList);
        } catch (Exception e) {
            logger.error("Error occurs while update partition size, {}", ExceptionUtils.getFullStackTrace(e));
        }
        logger.debug("Log size was updated.");
    }

    @Scheduled(cron = "${kafka.consumer.offsets.report.cron}")
    public void reportGroupOffsets() {
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        try {
            HashSet hashSet = new HashSet((Collection) this.zkClient.getChildren().forPath(GROUP_BASE));
            if (hashSet.size() != this.groupPartitionOffsetsMap.size()) {
                for (String str : this.groupPartitionOffsetsMap.keySet()) {
                    if (!hashSet.contains(str)) {
                        this.groupPartitionOffsetsMap.remove(str);
                        this.partitionSizeMap.remove(str);
                    }
                }
            }
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                String str2 = (String) it.next();
                String str3 = new String((byte[]) this.zkClient.getData().forPath(String.format(GROUP_PATH, str2)));
                if (System.currentTimeMillis() - Long.parseLong(str3.isEmpty() ? "0" : str3) >= this.groupCleanMs) {
                    this.zkClient.delete().deletingChildrenIfNeeded().forPath(String.format(GROUP_PATH, str2));
                    this.groupPartitionOffsetsMap.remove(str2);
                    this.partitionSizeMap.remove(str2);
                }
            }
            for (String str4 : this.groupPartitionOffsetsMap.keySet()) {
                ArrayList arrayList2 = new ArrayList();
                long j = 0;
                for (TopicPartition topicPartition : this.groupPartitionOffsetsMap.get(str4).keySet()) {
                    long longValue = ((Long) MoreObjects.firstNonNull(this.partitionSizeMap.get(topicPartition), 0L)).longValue() - ((Long) MoreObjects.firstNonNull(this.groupPartitionOffsetsMap.get(str4).get(topicPartition), 0L)).longValue();
                    arrayList2.add(new GroupInformation().setGroup(str4).setTopic(topicPartition.topic()).setPartition(Integer.valueOf(topicPartition.partition())).setCurrentOffset(this.groupPartitionOffsetsMap.get(str4).get(topicPartition)).setLogEndOffset(this.partitionSizeMap.get(topicPartition)).setLag(Long.valueOf(longValue)));
                    j += longValue;
                }
                if (j > this.maxLag) {
                    arrayList.add(str4);
                }
                hashMap.put(str4, arrayList2);
            }
        } catch (Exception e) {
            logger.error("Error while getting lag report, {}", ExceptionUtils.getFullStackTrace(e));
        }
        if (arrayList == null || arrayList.size() <= 0) {
            return;
        }
        try {
            String groupInformationRender = groupInformationRender(hashMap, "databus消费者延迟报告", arrayList);
            if (this.mailSwitch) {
                MimeMessage createMimeMessage = this.mailSender.createMimeMessage();
                MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(createMimeMessage);
                mimeMessageHelper.setFrom("databus@baijiahulian.com");
                mimeMessageHelper.setTo(this.receivers);
                mimeMessageHelper.setSubject(String.format("[databus-monitor] Consumer Group Lag Report", this.env));
                mimeMessageHelper.setText(groupInformationRender, true);
                this.mailSender.send(createMimeMessage);
            }
            logger.info(groupInformationRender);
        } catch (MessagingException e2) {
            logger.error("Error while send email. {}", ExceptionUtils.getFullStackTrace(e2));
        } catch (IOException | TemplateException e3) {
            logger.error("Error while render group information, {}", ExceptionUtils.getFullStackTrace(e3));
        }
    }

    public String groupInformationRender(Map<String, List<GroupInformation>> map, String str, List<String> list) throws IOException, TemplateException {
        Template template = this.configuration.getTemplate(CONSUMER_OFFSET_REPORT_TPL_PATH);
        StringWriter stringWriter = new StringWriter();
        HashMap hashMap = new HashMap();
        hashMap.put("groupInfoMap", map);
        hashMap.put("title", str);
        hashMap.put("lagGroups", list);
        hashMap.put("maxLag", Integer.valueOf(this.maxLag));
        template.process(hashMap, stringWriter);
        return stringWriter.toString();
    }

    public void updatePartitionSize(List<TopicPartition> list) {
        this.logSizeFetcher.assign(list);
        this.logSizeFetcher.seekToEnd((TopicPartition[]) list.toArray(new TopicPartition[list.size()]));
        this.logSizeFetcher.seekToEnd(new TopicPartition[0]);
        for (TopicPartition topicPartition : list) {
            this.partitionSizeMap.put(topicPartition, Long.valueOf(this.logSizeFetcher.position(topicPartition)));
        }
    }

    public void watchConsumerOffsets() {
        try {
            this.offsetConsumer.subscribe(Arrays.asList(CONSUMER_OFFSETS_TOPIC_NAME), new ReportOnRebalance());
        } catch (Exception e) {
            logger.error("Error while subscribe topic {}, {}", CONSUMER_OFFSETS_TOPIC_NAME, ExceptionUtils.getFullStackTrace(e));
        }
        while (this.running) {
            try {
                Iterator it = this.offsetConsumer.poll(1000L).iterator();
                while (this.running && it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    byte[] bArr = (byte[]) consumerRecord.key();
                    byte[] bArr2 = (byte[]) consumerRecord.value();
                    try {
                        BaseKey readMessageKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(bArr));
                        if (readMessageKey.key() instanceof GroupTopicPartition) {
                            updateGroupPartitionOffsets((GroupTopicPartition) readMessageKey.key(), GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(bArr2)));
                        } else if (bArr2 != null) {
                            logger.debug("got group message: {}", GroupMetadataManager.readGroupMessageValue((String) readMessageKey.key(), ByteBuffer.wrap(bArr2)));
                        }
                    } catch (Exception e2) {
                        logger.error("Error while extract data from kafka, {}", ExceptionUtils.getFullStackTrace(e2));
                    }
                }
            } catch (Exception e3) {
                logger.error("Error while listening __conusmer_offsets, {}", ExceptionUtils.getFullStackTrace(e3));
            }
        }
    }

    public void updateGroupPartitionOffsets(GroupTopicPartition groupTopicPartition, OffsetAndMetadata offsetAndMetadata) throws Exception {
        TopicAndPartition topicAndPartition = groupTopicPartition.topicPartition();
        String group = groupTopicPartition.group();
        String str = topicAndPartition.topic();
        int partition = topicAndPartition.partition();
        Long valueOf = Long.valueOf(offsetAndMetadata.offset());
        long commitTimestamp = offsetAndMetadata.commitTimestamp();
        logger.debug("groupTopicPartition: {}, offsetAndMetadata: {}\n", groupTopicPartition, offsetAndMetadata);
        Map<TopicPartition, Long> map = (Map) MoreObjects.firstNonNull(this.groupPartitionOffsetsMap.get(group), new HashMap());
        if (!this.groupPartitionOffsetsMap.containsKey(group)) {
            this.groupPartitionOffsetsMap.put(str, map);
        }
        map.put(new TopicPartition(str, partition), valueOf);
        String format = String.format(GROUP_PARTITION_OFFSET_PATH, group, str, Integer.valueOf(partition));
        if (this.zkClient.checkExists().forPath(format) == null) {
            this.zkClient.create().creatingParentsIfNeeded().forPath(format, valueOf.toString().getBytes());
        } else {
            this.zkClient.setData().forPath(format, valueOf.toString().getBytes());
        }
        updateGroupReportTime(group, commitTimestamp);
    }

    private void updateGroupReportTime(String str, long j) throws Exception {
        if (this.zkClient.checkExists().forPath(String.format(GROUP_PATH, str)) != null) {
            this.zkClient.setData().forPath(String.format(GROUP_PATH, str), Long.toString(j).getBytes());
        }
    }

    public void extractDataFromZk() throws Exception {
        List<String> list = (List) this.zkClient.getChildren().forPath(GROUP_BASE);
        logger.info("Extract data from zookeeper...");
        for (String str : list) {
            this.groupPartitionOffsetsMap.put(str, getPartitionOffsetMapByGroupFromZk(str));
        }
        logger.info("got group partition offset information: {}", this.groupPartitionOffsetsMap);
    }

    private Map<TopicPartition, Long> getPartitionOffsetMapByGroupFromZk(String str) throws Exception {
        HashMap hashMap = new HashMap();
        for (String str2 : (List) this.zkClient.getChildren().forPath(String.format(GROUP_TOPIC_PARENT, str))) {
            for (String str3 : (List) this.zkClient.getChildren().forPath(String.format(GROUP_TOPIC_PARTITION_PARENT, str, str2))) {
                try {
                    int parseInt = Integer.parseInt(str3);
                    hashMap.put(new TopicPartition(str2, parseInt), Long.valueOf(Long.parseLong(new String((byte[]) this.zkClient.getData().forPath(String.format(GROUP_PARTITION_OFFSET_PATH, str, str2, Integer.valueOf(parseInt)))))));
                } catch (NumberFormatException | KeeperException.NoNodeException e) {
                    logger.warn("Error while analysis {}/{}", String.format(GROUP_TOPIC_PARTITION_PARENT, str, str2), str3);
                }
            }
        }
        return hashMap;
    }

    public static void main(String[] strArr) {
        new ClassPathXmlApplicationContext("classpath:databus-monitor-spring.xml");
    }
}
