package cn.kinyun.wework.sdk.common.components;

import cn.kinyun.wework.sdk.common.utils.KafkaClientUtils;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

/* loaded from: input_file:cn/kinyun/wework/sdk/common/components/AbstractTopicConsumer.class */
public abstract class AbstractTopicConsumer implements CommandLineRunner {

    @Value("${scrm.consumer.poll.ms:1000}")
    private Integer consumerPollMs;

    @Autowired
    private KafkaClientUtils kafkaClientUtils;

    @Autowired
    private KafkaProperties kafkaProperties;
    private Consumer<String, String> consumer;
    private Thread thread;
    private boolean isRunning = true;
    private static final Logger log = LoggerFactory.getLogger(AbstractTopicConsumer.class);
    private static final Long CONSUME_INTERVAL_TIME_LIMIT = 300000L;

    public void run(String... strArr) throws Exception {
        Preconditions.checkArgument(StringUtils.isNotBlank(getTopic()), "topic can not be null");
        this.consumer = this.kafkaClientUtils.buildConsumer();
        this.consumer.subscribe(Collections.singleton(getTopic()));
        start();
    }

    public void start() {
        this.isRunning = true;
        if (this.thread == null) {
            this.thread = new Thread(() -> {
                while (this.isRunning) {
                    loop();
                }
            });
            this.thread.setName(getTopic());
            this.thread.start();
        }
    }

    public void stop() {
        this.isRunning = false;
        this.thread = null;
    }

    private void loop() {
        try {
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(this.consumerPollMs.intValue()));
            long currentTimeMillis = System.currentTimeMillis();
            if (poll == null || poll.isEmpty()) {
                return;
            }
            int i = 0;
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                i += ((ConsumerRecord) it.next()).serializedValueSize();
            }
            int count = poll.count();
            Iterator it2 = poll.iterator();
            while (it2.hasNext()) {
                ConsumerRecord<String, String> consumerRecord = (ConsumerRecord) it2.next();
                long longValue = ((Long) Optional.of(Long.valueOf(consumerRecord.timestamp())).orElse(Long.valueOf(System.currentTimeMillis()))).longValue();
                long currentTimeMillis2 = System.currentTimeMillis();
                long j = currentTimeMillis2 - longValue;
                long j2 = currentTimeMillis2 - currentTimeMillis;
                if (j > CONSUME_INTERVAL_TIME_LIMIT.longValue()) {
                    log.warn("消费时间间隔过大:topic:{}; offset:{}; delayTime:{}ms,recordSize:{}; totalRecordByte:{}, currentHandleTime:{}", new Object[]{consumerRecord.topic(), Long.valueOf(consumerRecord.offset()), Long.valueOf(j), Integer.valueOf(count), Integer.valueOf(i), Long.valueOf(j2)});
                }
                consume(consumerRecord);
            }
        } catch (Exception e) {
            log.error("Kafka消息处理失败", e);
        }
    }

    protected void consume(ConsumerRecord<String, String> consumerRecord) {
        try {
            handle(consumerRecord);
        } catch (Exception e) {
            log.error("Kafka消息处理失败", e);
        } finally {
            this.consumer.commitAsync();
        }
    }

    public abstract String getTopic();

    public abstract void handle(ConsumerRecord<String, String> consumerRecord);
}
