/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.common.component;

import com.google.common.base.Preconditions;
import com.kuaike.scrm.common.component.KafkaMsgHandler;
import com.kuaike.scrm.common.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseKafkaConsumerImpl {
    private static final Logger log = LoggerFactory.getLogger(BaseKafkaConsumerImpl.class);
    private static final int MAX_TIME = 3600000;
    protected boolean isRunning;
    protected Thread thread;
    protected Integer consumerPollMs;
    protected KafkaClientUtils kafkaClientUtils;
    protected KafkaMsgHandler<String, String> msgHandler;
    protected Consumer<String, String> consumer;
    private String groupId;
    private String topic;
    private static final Long CONSUME_INTERVAL_TIME_LIMIT = 300000L;

    public BaseKafkaConsumerImpl() {
    }

    public BaseKafkaConsumerImpl(Integer consumerPollMs, KafkaClientUtils kafkaClientUtils, KafkaMsgHandler<String, String> msgHandler, String topic) {
        this.consumerPollMs = consumerPollMs;
        this.kafkaClientUtils = kafkaClientUtils;
        this.msgHandler = msgHandler;
        this.topic = topic;
    }

    public void doStart() {
        Preconditions.checkArgument((boolean)StringUtils.isNotBlank((CharSequence)this.getTopic()), (Object)"topic can not be null");
        Preconditions.checkArgument((boolean)Objects.nonNull(this.getMsgHandler()), (Object)"msgHandler  can not be null");
        Preconditions.checkArgument((boolean)Objects.nonNull(this.getKafkaClientUtils()), (Object)"kafkaClientUtils  can not be null");
        Preconditions.checkArgument((boolean)Objects.nonNull(this.getConsumerPollMs()), (Object)"consumerPollMs  can not be null");
        this.createConsumer();
        this.consumer.subscribe(Collections.singleton(this.getTopic()));
        this.start();
    }

    public void start() {
        this.isRunning = true;
        if (this.thread == null) {
            Runnable task = () -> {
                long lastTimeCommit = -1L;
                while (this.isRunning) {
                    if (this.loop()) {
                        lastTimeCommit = System.currentTimeMillis();
                        continue;
                    }
                    long curTime = System.currentTimeMillis();
                    if (curTime - lastTimeCommit <= 3600000L) continue;
                    log.info("heart beat commit lastTimeCommit: {}, currentTime: {}", (Object)lastTimeCommit, (Object)curTime);
                    this.consumer.commitAsync();
                    lastTimeCommit = curTime;
                }
            };
            this.thread = new Thread(task);
            this.thread.setName(this.getTopic());
            this.thread.start();
        }
    }

    public void stop() {
        this.isRunning = false;
        this.thread = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean loop() {
        boolean isCommit = false;
        try {
            Long pullStartTime = null;
            ConsumerRecords records = this.consumer.poll(Duration.ofMillis(this.consumerPollMs.intValue()));
            pullStartTime = System.currentTimeMillis();
            if (records == null || records.isEmpty()) {
                return isCommit;
            }
            int totalRecordByte = 0;
            for (ConsumerRecord record : records) {
                totalRecordByte += record.serializedValueSize();
            }
            int recordSize = records.count();
            for (ConsumerRecord record : records) {
                long timestamp = Optional.ofNullable(record.timestamp()).orElse(System.currentTimeMillis());
                long begin = System.currentTimeMillis();
                long consumeInterval = begin - timestamp;
                long currentHandleTime = begin - (Objects.isNull(pullStartTime) ? 0L : pullStartTime);
                if (consumeInterval > CONSUME_INTERVAL_TIME_LIMIT) {
                    log.warn("\u6d88\u8d39\u65f6\u95f4\u95f4\u9694\u8fc7\u5927:topic:{}; offset:{}; delayTime:{}ms,recordSize:{}; totalRecordByte:{}, currentHandleTime:{}", new Object[]{record.topic(), record.offset(), consumeInterval, recordSize, totalRecordByte, currentHandleTime});
                }
                try {
                    this.msgHandler.onMessage((ConsumerRecord<String, String>)record);
                }
                catch (Exception e) {
                    log.error("Kafka\u6d88\u606f\u5904\u7406\u5931\u8d25", (Throwable)e);
                }
                finally {
                    this.consumer.commitSync();
                    isCommit = true;
                }
            }
        }
        catch (Exception e) {
            log.error("Kafka\u6d88\u606f\u5904\u7406\u5931\u8d25", (Throwable)e);
        }
        return isCommit;
    }

    public void createConsumer() {
        if (StringUtils.isNotBlank((CharSequence)this.groupId)) {
            log.info("groupid ==========:{}", (Object)this.groupId);
            this.consumer = this.kafkaClientUtils.buildConsumer(this.groupId, null);
        } else {
            this.consumer = this.kafkaClientUtils.buildConsumer();
        }
    }

    public void setConsumerPollMs(Integer consumerPollMs) {
        this.consumerPollMs = consumerPollMs;
    }

    public Integer getConsumerPollMs() {
        return this.consumerPollMs;
    }

    public void setKafkaClientUtils(KafkaClientUtils kafkaClientUtils) {
        this.kafkaClientUtils = kafkaClientUtils;
    }

    public KafkaClientUtils getKafkaClientUtils() {
        return this.kafkaClientUtils;
    }

    public void setMsgHandler(KafkaMsgHandler<String, String> msgHandler) {
        this.msgHandler = msgHandler;
    }

    public KafkaMsgHandler<String, String> getMsgHandler() {
        return this.msgHandler;
    }

    public String getGroupId() {
        return this.groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}

