/*
 * Decompiled with CFR 0.152.
 */
package com.baijiayun.duanxunbao.common.kafka;

import com.baijiayun.duanxunbao.common.kafka.KafkaClientUtils;
import com.baijiayun.duanxunbao.common.kafka.KafkaMsgHandler;
import com.baijiayun.duanxunbao.common.utils.NamedThreadFactory;
import com.baijiayun.duanxunbao.common.utils.ThreadPoolMonitorUtils;
import com.baijiayun.duanxunbao.common.utils.TraceIdUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of a single-topic Kafka consumer.
 *
 * <p>One dedicated thread polls the consumer. Every polled batch is split into pages of
 * {@code poolSize} records; the records of a page are handled concurrently on a thread pool and
 * the poll thread waits for the whole page before moving on. Offsets are committed synchronously
 * once every page of the batch has been handled.
 *
 * <p>The Kafka consumer is only ever used from the poll thread (or, when no poll thread is alive,
 * from the thread calling {@link #stop()}).
 */
public class BaseKafkaConsumerImpl {
    private static final Logger log = LoggerFactory.getLogger(BaseKafkaConsumerImpl.class);

    /** Maximum time, in milliseconds, without a commit before an idle "heart beat" commit is sent. */
    private static final long MAX_TIME = 3600000L;

    /** Record age, in milliseconds, above which a slow-consumption warning is logged. */
    private static final long CONSUME_INTERVAL_TIME_LIMIT = 300000L;

    /** Name of the record header that carries the trace id. */
    private static final String TRACE_ID_HEADER = "B-Request-ID";

    protected AtomicBoolean isRunning = new AtomicBoolean(false);
    protected Thread thread;
    private ExecutorService executorService;
    protected String poolName;
    protected int poolSize = 1;
    protected Integer consumerPollMs;
    protected KafkaClientUtils kafkaClientUtils;
    protected KafkaMsgHandler<String, String> msgHandler;
    protected Consumer<String, String> consumer;
    private String groupId;
    private String topic;

    /**
     * Validates the configuration, creates the consumer and the thread pool, subscribes to the
     * topic, starts the poll thread and registers a JVM shutdown hook that calls {@link #stop()}.
     *
     * @throws IllegalArgumentException if the topic is blank, if the message handler, the client
     *     utils or the poll interval is missing, or if the pool size is not positive
     */
    public void doStart() {
        Preconditions.checkArgument(StringUtils.isNotBlank(this.getTopic()), "topic can not be null");
        Preconditions.checkArgument(Objects.nonNull(this.getMsgHandler()), "msgHandler  can not be null");
        Preconditions.checkArgument(Objects.nonNull(this.getKafkaClientUtils()), "kafkaClientUtils  can not be null");
        Preconditions.checkArgument(Objects.nonNull(this.getConsumerPollMs()), "consumerPollMs  can not be null");
        // Fail before any resource is created: a non-positive size would otherwise only be
        // rejected by the thread pool constructor, after the consumer has been built and subscribed.
        Preconditions.checkArgument(this.getPoolSize() > 0, "poolSize must be greater than 0");
        this.createConsumer();
        this.consumer.subscribe(Collections.singleton(this.getTopic()));
        this.createThreadPool();
        this.start();
        Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    /**
     * Marks the consumer as running and, if no poll thread exists yet, creates and starts one
     * named after the topic.
     */
    public void start() {
        this.isRunning.set(true);
        if (this.thread == null) {
            Thread worker = new Thread(this::runAndRelease);
            worker.setName(this.getTopic());
            worker.setUncaughtExceptionHandler((t, e) -> log.error("Kafka\u6d88\u606f\u5904\u7406\u5931\u8d25", e));
            this.thread = worker;
            worker.start();
        }
    }

    /**
     * Requests the poll loop to stop.
     *
     * <p>When a poll thread is alive, it finishes its current iteration and then releases the
     * thread pool and the consumer itself. Shutting the pool down from here while the poll thread
     * is still submitting work would make the pool drop those tasks, and the poll thread would
     * wait on them forever. When no poll thread is alive the resources are released directly.
     */
    public void stop() {
        this.isRunning.set(false);
        Thread worker = this.thread;
        this.thread = null;
        if (worker != null && worker.isAlive()) {
            // The poll thread owns the consumer; it cleans up when its loop exits.
            return;
        }
        this.releaseResources();
    }

    /**
     * Poll loop: keeps calling the poll/handle/commit cycle while the running flag is set.
     *
     * <p>When nothing has been committed for more than {@link #MAX_TIME} milliseconds, an
     * asynchronous commit is issued. Exceptions are logged and the loop continues.
     */
    public void run() {
        // -1 means "no commit yet", so the first empty poll triggers an immediate heart beat commit.
        long lastTimeCommit = -1L;
        while (this.isRunning.get()) {
            try {
                if (this.loop()) {
                    lastTimeCommit = System.currentTimeMillis();
                    continue;
                }
                long curTime = System.currentTimeMillis();
                if (curTime - lastTimeCommit <= MAX_TIME) {
                    continue;
                }
                log.info("heart beat commit lastTimeCommit: {}, currentTime: {}", lastTimeCommit, curTime);
                this.consumer.commitAsync();
                lastTimeCommit = curTime;
            }
            catch (Exception e) {
                log.error("Kafka\u6d88\u606f\u5904\u7406\u5931\u8d25", e);
            }
        }
    }

    /** Entry point of the poll thread: runs the poll loop and always releases resources afterwards. */
    private void runAndRelease() {
        try {
            this.run();
        }
        finally {
            this.releaseResources();
        }
    }

    /** Shuts down the thread pool and closes the consumer; tolerates resources that were never created. */
    private void releaseResources() {
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.consumer != null) {
            try {
                this.consumer.close();
            }
            catch (Exception e) {
                log.warn("failed to close kafka consumer, topic: {}", this.getTopic(), e);
            }
        }
    }

    /**
     * Polls once, handles every returned record page by page and commits the offsets.
     *
     * @return {@code true} if records were handled and committed, {@code false} if the poll
     *     returned nothing
     */
    private boolean loop() {
        ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(this.consumerPollMs.intValue()));
        if (records == null || records.isEmpty()) {
            return false;
        }
        long pullStartTime = System.currentTimeMillis();
        List<ConsumerRecord<String, String>> all = Lists.newArrayListWithCapacity(records.count());
        for (ConsumerRecord<String, String> record : records) {
            all.add(record);
        }
        // Each page holds at most poolSize records, i.e. at most one record per pool thread.
        for (List<ConsumerRecord<String, String>> page : Lists.partition(all, this.getPoolSize())) {
            this.executeAndWait(page, pullStartTime);
        }
        this.consumer.commitSync();
        return true;
    }

    /**
     * Submits one task per record to the thread pool and blocks until all of them have finished.
     *
     * @param records the records of one page
     * @param pullStartTime time, in epoch milliseconds, at which the enclosing batch was received
     */
    private void executeAndWait(List<ConsumerRecord<String, String>> records, long pullStartTime) {
        long start = System.currentTimeMillis();
        List<FutureTask<Object>> taskList = Lists.newArrayListWithCapacity(records.size());
        for (ConsumerRecord<String, String> record : records) {
            FutureTask<Object> task = new FutureTask<Object>(() -> this.handleRecord(record, pullStartTime), null);
            this.executorService.submit(task);
            taskList.add(task);
        }
        for (FutureTask<Object> task : taskList) {
            try {
                task.get();
            }
            catch (InterruptedException e) {
                log.error("Kafka\u6d88\u606f\u5904\u7406\u5931\u8d25", e);
                // Restore the interrupt flag for the code further up the stack.
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                log.error("Kafka\u6d88\u606f\u5904\u7406\u5931\u8d25", e);
            }
        }
        log.info("\u5f00\u542f\u4e2a{}\u7ebf\u7a0b\u5904\u7406{}\u6761\u6d88\u606f\uff0c\u8017\u65f6:{}ms", taskList.size(), records.size(), System.currentTimeMillis() - start);
    }

    /**
     * Handles a single record: binds a trace id, warns about stale records and delegates to the
     * message handler. Exceptions are logged, not propagated; the trace id is always removed.
     *
     * @param record the record to handle
     * @param pullStartTime time, in epoch milliseconds, at which the enclosing batch was received
     */
    private void handleRecord(ConsumerRecord<String, String> record, long pullStartTime) {
        try {
            Header header = record.headers().lastHeader(TRACE_ID_HEADER);
            byte[] traceId = header == null ? null : header.value();
            if (traceId != null) {
                // Decode with a fixed charset instead of the platform default.
                TraceIdUtils.setTraceId(new String(traceId, java.nio.charset.StandardCharsets.UTF_8));
            } else {
                // No header, or a header without a value: fall back to a fresh trace id.
                TraceIdUtils.genAndSetTraceId();
            }
            long begin = System.currentTimeMillis();
            long consumeInterval = begin - record.timestamp();
            long currentHandleTime = begin - pullStartTime;
            if (consumeInterval > CONSUME_INTERVAL_TIME_LIMIT) {
                log.warn("\u6d88\u8d39\u65f6\u95f4\u95f4\u9694\u8fc7\u5927:topic:{}; offset:{}; delayTime:{}ms, currentHandleTime:{}", record.topic(), record.offset(), consumeInterval, currentHandleTime);
            }
            this.msgHandler.onMessage(record);
        }
        catch (Exception e) {
            log.error("Kafka\u6d88\u606f\u5904\u7406\u5931\u8d25", e);
        }
        finally {
            TraceIdUtils.removeTraceId();
        }
    }

    /**
     * Builds the Kafka consumer through {@code kafkaClientUtils}, passing the configured group id
     * when one is set and using the no-argument builder otherwise.
     */
    public void createConsumer() {
        if (StringUtils.isNotBlank(this.groupId)) {
            log.info("groupid ==========:{}", this.groupId);
            this.consumer = this.kafkaClientUtils.buildConsumer(this.groupId, null);
        } else {
            this.consumer = this.kafkaClientUtils.buildConsumer();
        }
    }

    /**
     * Creates the fixed-size handler thread pool and registers it with the thread pool monitor.
     *
     * <p>The pool is named after {@code poolName}, or after the topic when the pool name is blank.
     * Its queue holds twice the pool size; once the queue is full, tasks run on the submitting thread.
     */
    public void createThreadPool() {
        int size = this.getPoolSize();
        String name = this.getPoolName();
        if (StringUtils.isBlank(name)) {
            name = this.getTopic();
        }
        this.executorService = new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(size * 2), new NamedThreadFactory(name), new ThreadPoolExecutor.CallerRunsPolicy());
        ThreadPoolMonitorUtils.addToMonitor(name, this.executorService);
    }

    /** @param poolName name of the handler thread pool; the topic is used when blank */
    public void setPoolName(String poolName) {
        this.poolName = poolName;
    }

    /** @return the configured handler thread pool name, possibly {@code null} */
    public String getPoolName() {
        return this.poolName;
    }

    /** @return the number of handler threads, which is also the page size of a batch */
    public int getPoolSize() {
        return this.poolSize;
    }

    /** @param poolSize the number of handler threads; must be positive when {@link #doStart()} runs */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /** @param consumerPollMs the poll timeout in milliseconds */
    public void setConsumerPollMs(Integer consumerPollMs) {
        this.consumerPollMs = consumerPollMs;
    }

    /** @return the poll timeout in milliseconds, possibly {@code null} when not configured */
    public Integer getConsumerPollMs() {
        return this.consumerPollMs;
    }

    /** @param kafkaClientUtils the factory used to build the consumer */
    public void setKafkaClientUtils(KafkaClientUtils kafkaClientUtils) {
        this.kafkaClientUtils = kafkaClientUtils;
    }

    /** @return the factory used to build the consumer */
    public KafkaClientUtils getKafkaClientUtils() {
        return this.kafkaClientUtils;
    }

    /** @param msgHandler the handler invoked for every consumed record */
    public void setMsgHandler(KafkaMsgHandler<String, String> msgHandler) {
        this.msgHandler = msgHandler;
    }

    /** @return the handler invoked for every consumed record */
    public KafkaMsgHandler<String, String> getMsgHandler() {
        return this.msgHandler;
    }

    /** @return the consumer group id, possibly {@code null} */
    public String getGroupId() {
        return this.groupId;
    }

    /** @param groupId the consumer group id; when blank the consumer is built without one */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /** @return the topic this consumer subscribes to */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the topic this consumer subscribes to */
    public void setTopic(String topic) {
        this.topic = topic;
    }
}

