package com.kuaike.kafka.consumer.template;

import com.kuaike.common.errorcode.CommonErrorCode;
import com.kuaike.common.exception.BusinessException;
import com.kuaike.common.utils.ApiResult;
import com.kuaike.common.utils.ApiResultUtils;
import com.kuaike.common.utils.JacksonUtil;
import com.kuaike.common.utils.JsonUtil;
import com.kuaike.common.utils.MD5Utils;
import com.kuaike.issue.dto.BaseAlarmDto;
import com.kuaike.issue.manage.AlarmManage;
import com.kuaike.kafka.config.KafkaConsumerConfig;
import com.kuaike.kafka.constants.ConsumeStatus;
import com.kuaike.kafka.consumer.ConsumerClientBuilder;
import com.kuaike.kafka.consumer.ManualCommitConsumerClient;
import com.kuaike.kafka.consumer.registry.CompensateConsumerRegistry;
import javax.annotation.PostConstruct;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/kuaike/kafka/consumer/template/AbstractNonSuspendMessageConsumer.class */
public abstract class AbstractNonSuspendMessageConsumer implements NonSuspendMessageConsumer, ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(AbstractNonSuspendMessageConsumer.class);
    private static final int MAX_CONSUME_TIMES = 8;

    @Autowired
    private KafkaConsumerConfig kafkaServerConfig;

    @Autowired
    private MessageConsumeRecordService messageConsumeRecordService;

    @Autowired
    private CompensateConsumerRegistry registry;

    @Autowired
    private AlarmManage alarmManage;
    private volatile ApplicationContext context;

    public abstract String getTopic();

    public abstract String getMessageUniqueKey(ConsumerRecord consumerRecord);

    public abstract String getMessageContent(ConsumerRecord consumerRecord);

    public abstract void doCompensate(String str);

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @EventListener({ContextRefreshedEvent.class})
    @Conditional({ConsumeEnabledCondition.class})
    public void listenForApplicationContextRefreshed() {
        if (((KafkaConsumerConfig) this.context.getBean("kafkaConsumerConfig", KafkaConsumerConfig.class)).isConsumersEnabled()) {
            ManualCommitConsumerClient buildManualCommitConsumer = new ConsumerClientBuilder(this.kafkaServerConfig.getServers(), this.kafkaServerConfig.getGroupId()).fetchMinBytes(this.kafkaServerConfig.getFetchMinBytes()).maxPollRecords(this.kafkaServerConfig.getMaxPollRecords()).maxPollIntervalMs(this.kafkaServerConfig.getMaxPollIntervalMs()).buildManualCommitConsumer();
            Thread thread = new Thread(() -> {
                handleMessage(buildManualCommitConsumer);
            }, "kafka-consumer-" + getTopic());
            thread.setUncaughtExceptionHandler((thread2, th) -> {
                log.warn("Kafka消费线程-" + thread2.getName() + " 异常终止，上下文信息为：" + JsonUtil.toStr(this.kafkaServerConfig), th);
            });
            thread.start();
            log.info("开启对主题-{}的消息监听", getTopic());
        }
    }

    @PostConstruct
    private void init() {
        this.registry.register(getTopic(), this);
    }

    @Override // com.kuaike.kafka.consumer.template.NonSuspendMessageConsumer
    public void compensate(MsgConsumeRecordDto msgConsumeRecordDto) {
        log.info("开始消息补偿，补偿消息记录为：{}", msgConsumeRecordDto.getId());
        doCompensate(msgConsumeRecordDto.getContent());
        log.info("补偿消息成功，id为：{}", msgConsumeRecordDto.getId());
    }

    private void handleMessage(ManualCommitConsumerClient manualCommitConsumerClient) {
        String topic = getTopic();
        log.info("开始监听主题{}的消息", topic);
        manualCommitConsumerClient.consume(topic, consumerRecord -> {
            log.info("收到来自主题-{}的推送消息：{}", topic, JsonUtil.toStr(consumerRecord));
            if (null != consumerRecord.value()) {
                try {
                    try {
                        if (StringUtils.isBlank(getMessageContent(consumerRecord))) {
                            throw new BusinessException(CommonErrorCode.REQUIRE_PARAM, "消息内容不能为空");
                        }
                        MsgConsumeRecordDto before = before(consumerRecord);
                        consume(consumerRecord);
                        before.setConsumeStatus(ConsumeStatus.CONSUME_SUCC.getStatus());
                        after(before, ApiResultUtils.buildApiResult());
                    } catch (Exception e) {
                        ApiResult error = ApiResultUtils.error(CommonErrorCode.SYSTEM_ERROR, e.getMessage());
                        log.error("消息消费失败，错误信息为： content:" + consumerRecord.toString(), e);
                        this.alarmManage.notice(new BaseAlarmDto(e, (String) null, consumerRecord.toString(), (String) null));
                        after(null, error);
                    } catch (BusinessException e2) {
                        ApiResult error2 = ApiResultUtils.error(e2.getErrorCode(), e2.getMessage());
                        log.error("消息消费失败，错误信息为： content:{}, apiResult:{}", consumerRecord.toString(), error2);
                        this.alarmManage.notice(new BaseAlarmDto(e2, (String) null, consumerRecord.toString(), (String) null));
                        after(null, error2);
                    }
                } catch (Throwable th) {
                    after(null, null);
                    throw th;
                }
            }
        });
    }

    private MsgConsumeRecordDto before(ConsumerRecord consumerRecord) {
        String str;
        String topic = getTopic();
        String messageUniqueKey = getMessageUniqueKey(consumerRecord);
        if (StringUtils.isBlank(messageUniqueKey)) {
            throw new BusinessException(CommonErrorCode.REQUIRE_PARAM, "需提供该消息在该主题下的唯一标识");
        }
        MsgConsumeRecordDto byTopicAndUnqTag = this.messageConsumeRecordService.getByTopicAndUnqTag(topic, messageUniqueKey);
        if (byTopicAndUnqTag == null) {
            MsgConsumeRecordDto generateRecord = generateRecord(messageUniqueKey, getMessageContent(consumerRecord));
            generateRecord.setId(Long.valueOf(this.messageConsumeRecordService.insert(generateRecord)));
            log.info("消息插入成功：{}", generateRecord);
            return generateRecord;
        }
        int recvCount = byTopicAndUnqTag.getRecvCount() + NumberUtils.INTEGER_ONE.intValue();
        byTopicAndUnqTag.setRecvCount(recvCount);
        if (byTopicAndUnqTag.getConsumeStatus() == ConsumeStatus.CONSUME_SUCC.getStatus()) {
            str = "消息已经成功消费过，放弃消费";
        } else if (byTopicAndUnqTag.getConsumeStatus() == ConsumeStatus.CONSUMING.getStatus()) {
            str = "消息正在消费中，放弃消费";
        } else {
            if (recvCount <= MAX_CONSUME_TIMES + NumberUtils.INTEGER_ONE.intValue()) {
                return byTopicAndUnqTag;
            }
            str = "超过最大允许重试次数，放弃消费";
        }
        throw new BusinessException(CommonErrorCode.BUSINESS_ERROR, str);
    }

    private void after(MsgConsumeRecordDto msgConsumeRecordDto, ApiResult apiResult) {
        try {
            if (msgConsumeRecordDto != null) {
                if (msgConsumeRecordDto.getConsumeStatus() != ConsumeStatus.CONSUME_SUCC.getStatus() && apiResult.getCode() != NumberUtils.LONG_ZERO.longValue()) {
                    msgConsumeRecordDto.setConsumeStatus(ConsumeStatus.CONSUME_FAILED.getStatus());
                }
                msgConsumeRecordDto.setResult(JacksonUtil.obj2Str(apiResult));
                this.messageConsumeRecordService.update(msgConsumeRecordDto);
            } else {
                log.error("更新消息消费记录表失败：消息为空，放弃插入消息消费记录表");
            }
        } catch (Exception e) {
            log.error("更新消息消费记录表失败：record:{}，apiResul:{}，错误信息:{}", new Object[]{msgConsumeRecordDto, apiResult, ExceptionUtils.getStackTrace(e)});
        }
    }

    private MsgConsumeRecordDto generateRecord(String str, String str2) {
        MsgConsumeRecordDto msgConsumeRecordDto = new MsgConsumeRecordDto();
        msgConsumeRecordDto.setTopic(getTopic());
        msgConsumeRecordDto.setUnqTag(str);
        msgConsumeRecordDto.setConsumeStatus(ConsumeStatus.UNCONSUME.getStatus());
        msgConsumeRecordDto.setContent(str2);
        msgConsumeRecordDto.setMd5Str(MD5Utils.MD5(str2));
        msgConsumeRecordDto.setRecvCount(NumberUtils.INTEGER_ONE.intValue());
        msgConsumeRecordDto.setResult("");
        return msgConsumeRecordDto;
    }
}
