package com.kuaike.scrm.common.component;

import com.google.common.base.Preconditions;
import com.kuaike.scrm.common.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kuaike/scrm/common/component/BaseKafkaConsumerImpl.class */
/**
 * Base Kafka consumer that subscribes to a single topic and dispatches every
 * polled record to a {@link KafkaMsgHandler} on a dedicated worker thread.
 *
 * <p>Typical usage: configure the instance through the constructor or the
 * setters, then call {@link #doStart()}. Call {@link #stop()} to ask the worker
 * thread to terminate.
 *
 * <p>Thread-safety: {@code isRunning} and {@code thread} are volatile so that a
 * {@link #stop()} issued from another thread is observed by the worker. The
 * underlying {@link Consumer} is only touched by the worker thread once it has
 * been started.
 *
 * <p>NOTE(review): the consumer is never closed by this class; after
 * {@link #stop()} it stays open so that {@link #start()} can resume polling.
 * Owners that discard the instance should confirm whether the consumer needs
 * to be closed elsewhere.
 */
public class BaseKafkaConsumerImpl {
    private static final Logger log = LoggerFactory.getLogger(BaseKafkaConsumerImpl.class);

    /** Record age (now minus record timestamp), in milliseconds, above which a warning is logged. */
    private static final long CONSUME_INTERVAL_TIME_LIMIT = 300000L;

    /** Run flag polled by the worker; volatile so a stop request from another thread is visible. */
    protected volatile boolean isRunning;

    /** The currently registered worker thread, or {@code null} when none is registered. */
    protected volatile Thread thread;

    /** Poll timeout in milliseconds passed to {@link Consumer#poll(Duration)}. */
    protected Integer consumerPollMs;

    /** Factory used by {@link #createConsumer()} to build the consumer. */
    protected KafkaClientUtils kafkaClientUtils;

    /** Callback invoked once for every polled record. */
    protected KafkaMsgHandler<String, String> msgHandler;

    /** The consumer polled by the worker thread; created by {@link #createConsumer()}. */
    protected Consumer<String, String> consumer;

    /** Optional consumer group id; when blank the factory's no-arg builder is used. */
    private String groupId;

    /** Topic to subscribe to; also used as the worker thread name. */
    private String topic;

    /** Creates an unconfigured instance; populate it through the setters before {@link #doStart()}. */
    public BaseKafkaConsumerImpl() {
    }

    /**
     * Creates a configured instance.
     *
     * @param num             poll timeout in milliseconds
     * @param kafkaClientUtils factory used to build the consumer
     * @param kafkaMsgHandler  callback invoked for every polled record
     * @param str             topic to subscribe to
     */
    public BaseKafkaConsumerImpl(Integer num, KafkaClientUtils kafkaClientUtils, KafkaMsgHandler<String, String> kafkaMsgHandler, String str) {
        this.consumerPollMs = num;
        this.kafkaClientUtils = kafkaClientUtils;
        this.msgHandler = kafkaMsgHandler;
        this.topic = str;
    }

    /**
     * Validates the configuration, creates the consumer, subscribes it to the
     * configured topic and starts the worker thread.
     *
     * @throws IllegalArgumentException if the topic is blank, or the handler,
     *                                  the client utils or the poll timeout is {@code null}
     */
    public void doStart() {
        Preconditions.checkArgument(StringUtils.isNotBlank(getTopic()), "topic can not be null");
        Preconditions.checkArgument(Objects.nonNull(getMsgHandler()), "msgHandler  can not be null");
        Preconditions.checkArgument(Objects.nonNull(getKafkaClientUtils()), "kafkaClientUtils  can not be null");
        Preconditions.checkArgument(Objects.nonNull(getConsumerPollMs()), "consumerPollMs  can not be null");
        createConsumer();
        this.consumer.subscribe(Collections.singleton(getTopic()));
        start();
    }

    /**
     * Sets the run flag and, if no worker thread is registered, creates and
     * starts one named after the topic.
     */
    public void start() {
        this.isRunning = true;
        if (this.thread != null) {
            return;
        }
        Thread worker = new Thread(this::runWorker);
        // Name the thread before publishing it: if naming fails (null topic) the
        // field stays null, so a later start() can still create a worker.
        worker.setName(getTopic());
        this.thread = worker;
        worker.start();
    }

    /**
     * Requests the worker thread to stop. The worker leaves its loop after the
     * poll/handling pass that is currently in progress; this method does not
     * wait for it.
     */
    public void stop() {
        this.isRunning = false;
        this.thread = null;
    }

    /**
     * Worker body: keeps polling while the run flag is set and this thread is
     * still the registered worker. The identity check makes a worker that was
     * stopped exit even when {@link #start()} flips the flag back before the
     * worker re-reads it, so a superseded worker stops using the consumer.
     */
    private void runWorker() {
        while (this.isRunning && this.thread == Thread.currentThread()) {
            loop();
        }
    }

    /**
     * Performs one poll and dispatches every returned record to the handler.
     * Any {@link Exception} is logged and swallowed so the worker keeps
     * running; an {@link Error} propagates and ends the worker thread.
     */
    private void loop() {
        try {
            ConsumerRecords<String, String> records =
                    this.consumer.poll(Duration.ofMillis(this.consumerPollMs.longValue()));
            long polledAtMs = System.currentTimeMillis();
            if (records == null || records.isEmpty()) {
                return;
            }
            int totalValueBytes = 0;
            for (ConsumerRecord<String, String> message : records) {
                totalValueBytes += message.serializedValueSize();
            }
            int recordCount = records.count();
            for (ConsumerRecord<String, String> message : records) {
                warnIfDelayed(message, polledAtMs, recordCount, totalValueBytes);
                handleAndCommit(message);
            }
        } catch (Exception e) {
            log.error("Kafka消息处理失败", e);
        }
    }

    /**
     * Logs a warning when the record's timestamp is older than
     * {@link #CONSUME_INTERVAL_TIME_LIMIT} relative to the current time.
     *
     * @param message         the record about to be handled
     * @param polledAtMs      wall-clock time at which the poll returned
     * @param recordCount     number of records in the polled batch
     * @param totalValueBytes sum of {@code serializedValueSize()} over the batch
     */
    private void warnIfDelayed(ConsumerRecord<String, String> message, long polledAtMs, int recordCount, int totalValueBytes) {
        long nowMs = System.currentTimeMillis();
        long delayMs = nowMs - message.timestamp();
        if (delayMs > CONSUME_INTERVAL_TIME_LIMIT) {
            long batchElapsedMs = nowMs - polledAtMs;
            log.warn("消费时间间隔过大:topic:{}; offset:{}; delayTime:{}ms,recordSize:{}; totalRecordByte:{}, currentHandleTime:{}",
                    message.topic(), message.offset(), delayMs, recordCount, totalValueBytes, batchElapsedMs);
        }
    }

    /**
     * Hands one record to the handler and then triggers an asynchronous offset
     * commit, whether or not the handler failed. Handler exceptions are logged
     * and do not abort the rest of the batch.
     *
     * <p>NOTE(review): the no-arg {@code commitAsync()} is invoked after every
     * record, as before; per the Kafka client docs it commits the positions of
     * the last poll rather than this record's offset — confirm this is the
     * intended delivery semantics.
     */
    private void handleAndCommit(ConsumerRecord<String, String> message) {
        try {
            this.msgHandler.onMessage(message);
        } catch (Exception e) {
            log.error("Kafka消息处理失败", e);
        } finally {
            this.consumer.commitAsync();
        }
    }

    /**
     * Builds the consumer through {@link KafkaClientUtils}: with the configured
     * group id when it is not blank, otherwise with the factory's no-arg builder.
     */
    public void createConsumer() {
        if (StringUtils.isNotBlank(this.groupId)) {
            log.info("groupid ==========:{}", this.groupId);
            // The meaning of the second argument is defined by KafkaClientUtils; null is passed as before.
            this.consumer = this.kafkaClientUtils.buildConsumer(this.groupId, null);
        } else {
            this.consumer = this.kafkaClientUtils.buildConsumer();
        }
    }

    /** @param num poll timeout in milliseconds */
    public void setConsumerPollMs(Integer num) {
        this.consumerPollMs = num;
    }

    /** @return poll timeout in milliseconds, or {@code null} if not configured */
    public Integer getConsumerPollMs() {
        return this.consumerPollMs;
    }

    /** @param kafkaClientUtils factory used to build the consumer */
    public void setKafkaClientUtils(KafkaClientUtils kafkaClientUtils) {
        this.kafkaClientUtils = kafkaClientUtils;
    }

    /** @return factory used to build the consumer, or {@code null} if not configured */
    public KafkaClientUtils getKafkaClientUtils() {
        return this.kafkaClientUtils;
    }

    /** @param kafkaMsgHandler callback invoked for every polled record */
    public void setMsgHandler(KafkaMsgHandler<String, String> kafkaMsgHandler) {
        this.msgHandler = kafkaMsgHandler;
    }

    /** @return callback invoked for every polled record, or {@code null} if not configured */
    public KafkaMsgHandler<String, String> getMsgHandler() {
        return this.msgHandler;
    }

    /** @return consumer group id, or {@code null} if not configured */
    public String getGroupId() {
        return this.groupId;
    }

    /** @param str consumer group id; blank selects the factory's no-arg builder */
    public void setGroupId(String str) {
        this.groupId = str;
    }

    /** @return topic to subscribe to, or {@code null} if not configured */
    public String getTopic() {
        return this.topic;
    }

    /** @param str topic to subscribe to */
    public void setTopic(String str) {
        this.topic = str;
    }
}
