/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.kafka.consumer.template;

import com.kuaike.common.errorcode.CommonErrorCode;
import com.kuaike.common.errorcode.UniverseErrorCode;
import com.kuaike.common.exception.BusinessException;
import com.kuaike.common.utils.ApiResult;
import com.kuaike.common.utils.ApiResultUtils;
import com.kuaike.common.utils.JacksonUtil;
import com.kuaike.common.utils.JsonUtil;
import com.kuaike.common.utils.MD5Utils;
import com.kuaike.issue.dto.BaseAlarmDto;
import com.kuaike.issue.manage.AlarmManage;
import com.kuaike.kafka.config.KafkaConsumerConfig;
import com.kuaike.kafka.constants.ConsumeStatus;
import com.kuaike.kafka.consumer.ConsumerClientBuilder;
import com.kuaike.kafka.consumer.ManualCommitConsumerClient;
import com.kuaike.kafka.consumer.registry.CompensateConsumerRegistry;
import com.kuaike.kafka.consumer.template.ConsumeEnabledCondition;
import com.kuaike.kafka.consumer.template.MessageConsumeRecordService;
import com.kuaike.kafka.consumer.template.MsgConsumeRecordDto;
import com.kuaike.kafka.consumer.template.NonSuspendMessageConsumer;
import javax.annotation.PostConstruct;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public abstract class AbstractNonSuspendMessageConsumer
implements NonSuspendMessageConsumer,
ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(AbstractNonSuspendMessageConsumer.class);
    private static final int MAX_CONSUME_TIMES = 8;
    @Autowired
    private KafkaConsumerConfig kafkaServerConfig;
    @Autowired
    private MessageConsumeRecordService messageConsumeRecordService;
    @Autowired
    private CompensateConsumerRegistry registry;
    @Autowired
    private AlarmManage alarmManage;
    private volatile ApplicationContext context;

    public abstract String getTopic();

    public abstract String getMessageUniqueKey(ConsumerRecord var1);

    public abstract String getMessageContent(ConsumerRecord var1);

    public abstract void doCompensate(String var1);

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }

    @EventListener(value={ContextRefreshedEvent.class})
    @Conditional(value={ConsumeEnabledCondition.class})
    public void listenForApplicationContextRefreshed() {
        KafkaConsumerConfig consumerConfig = (KafkaConsumerConfig)this.context.getBean("kafkaConsumerConfig", KafkaConsumerConfig.class);
        if (consumerConfig.isConsumersEnabled()) {
            ManualCommitConsumerClient consumerClient = new ConsumerClientBuilder(this.kafkaServerConfig.getServers(), this.kafkaServerConfig.getGroupId()).fetchMinBytes(this.kafkaServerConfig.getFetchMinBytes()).maxPollRecords(this.kafkaServerConfig.getMaxPollRecords()).maxPollIntervalMs(this.kafkaServerConfig.getMaxPollIntervalMs()).buildManualCommitConsumer();
            String threadName = "kafka-consumer-" + this.getTopic();
            Thread consumerThread = new Thread(() -> this.handleMessage(consumerClient), threadName);
            consumerThread.setUncaughtExceptionHandler((t, ex) -> log.warn("Kafka\u6d88\u8d39\u7ebf\u7a0b-" + t.getName() + " \u5f02\u5e38\u7ec8\u6b62\uff0c\u4e0a\u4e0b\u6587\u4fe1\u606f\u4e3a\uff1a" + JsonUtil.toStr((Object)this.kafkaServerConfig), ex));
            consumerThread.start();
            log.info("\u5f00\u542f\u5bf9\u4e3b\u9898-{}\u7684\u6d88\u606f\u76d1\u542c", (Object)this.getTopic());
        }
    }

    @PostConstruct
    private void init() {
        this.registry.register(this.getTopic(), this);
    }

    @Override
    public void compensate(MsgConsumeRecordDto record) {
        log.info("\u5f00\u59cb\u6d88\u606f\u8865\u507f\uff0c\u8865\u507f\u6d88\u606f\u8bb0\u5f55\u4e3a\uff1a{}", (Object)record.getId());
        this.doCompensate(record.getContent());
        log.info("\u8865\u507f\u6d88\u606f\u6210\u529f\uff0cid\u4e3a\uff1a{}", (Object)record.getId());
    }

    private void handleMessage(ManualCommitConsumerClient client) {
        String topic = this.getTopic();
        log.info("\u5f00\u59cb\u76d1\u542c\u4e3b\u9898{}\u7684\u6d88\u606f", (Object)topic);
        client.consume(topic, record -> {
            log.info("\u6536\u5230\u6765\u81ea\u4e3b\u9898-{}\u7684\u63a8\u9001\u6d88\u606f\uff1a{}", (Object)topic, (Object)JsonUtil.toStr((Object)record));
            if (null != record.value()) {
                ApiResult apiResult = null;
                MsgConsumeRecordDto msgConsumeRecordDto = null;
                String content = "";
                try {
                    content = this.getMessageContent(record);
                    if (StringUtils.isBlank((CharSequence)content)) {
                        throw new BusinessException((UniverseErrorCode)CommonErrorCode.REQUIRE_PARAM, "\u6d88\u606f\u5185\u5bb9\u4e0d\u80fd\u4e3a\u7a7a");
                    }
                    msgConsumeRecordDto = this.before(record);
                    this.consume(record);
                    msgConsumeRecordDto.setConsumeStatus(ConsumeStatus.CONSUME_SUCC.getStatus());
                    apiResult = ApiResultUtils.buildApiResult();
                    this.after(msgConsumeRecordDto, apiResult);
                }
                catch (BusinessException e) {
                    apiResult = ApiResultUtils.error((UniverseErrorCode)e.getErrorCode(), (String)e.getMessage());
                    log.error("\u6d88\u606f\u6d88\u8d39\u5931\u8d25\uff0c\u9519\u8bef\u4fe1\u606f\u4e3a\uff1a content:{}, apiResult:{}", (Object)record.toString(), (Object)apiResult);
                    this.alarmManage.notice(new BaseAlarmDto((Throwable)e, null, record.toString(), null));
                    this.after(msgConsumeRecordDto, apiResult);
                }
                catch (Exception e2) {
                    apiResult = ApiResultUtils.error((UniverseErrorCode)CommonErrorCode.SYSTEM_ERROR, (String)e2.getMessage());
                    log.error("\u6d88\u606f\u6d88\u8d39\u5931\u8d25\uff0c\u9519\u8bef\u4fe1\u606f\u4e3a\uff1a content:" + record.toString(), (Throwable)e2);
                    this.alarmManage.notice(new BaseAlarmDto((Throwable)e2, null, record.toString(), null));
                    this.after(msgConsumeRecordDto, apiResult);
                    {
                        catch (Throwable throwable) {
                            this.after(msgConsumeRecordDto, apiResult);
                            throw throwable;
                        }
                    }
                }
            }
        });
    }

    private MsgConsumeRecordDto before(ConsumerRecord record) {
        String topic = this.getTopic();
        String unqTag = this.getMessageUniqueKey(record);
        if (StringUtils.isBlank((CharSequence)unqTag)) {
            throw new BusinessException((UniverseErrorCode)CommonErrorCode.REQUIRE_PARAM, "\u9700\u63d0\u4f9b\u8be5\u6d88\u606f\u5728\u8be5\u4e3b\u9898\u4e0b\u7684\u552f\u4e00\u6807\u8bc6");
        }
        MsgConsumeRecordDto msgConsumeRecordDto = this.messageConsumeRecordService.getByTopicAndUnqTag(topic, unqTag);
        if (msgConsumeRecordDto != null) {
            String message;
            int recvCount = msgConsumeRecordDto.getRecvCount() + NumberUtils.INTEGER_ONE;
            msgConsumeRecordDto.setRecvCount(recvCount);
            if (msgConsumeRecordDto.getConsumeStatus() == ConsumeStatus.CONSUME_SUCC.getStatus()) {
                message = "\u6d88\u606f\u5df2\u7ecf\u6210\u529f\u6d88\u8d39\u8fc7\uff0c\u653e\u5f03\u6d88\u8d39";
            } else if (msgConsumeRecordDto.getConsumeStatus() == ConsumeStatus.CONSUMING.getStatus()) {
                message = "\u6d88\u606f\u6b63\u5728\u6d88\u8d39\u4e2d\uff0c\u653e\u5f03\u6d88\u8d39";
            } else if (recvCount > 8 + NumberUtils.INTEGER_ONE) {
                message = "\u8d85\u8fc7\u6700\u5927\u5141\u8bb8\u91cd\u8bd5\u6b21\u6570\uff0c\u653e\u5f03\u6d88\u8d39";
            } else {
                return msgConsumeRecordDto;
            }
            throw new BusinessException((UniverseErrorCode)CommonErrorCode.BUSINESS_ERROR, message);
        }
        msgConsumeRecordDto = this.generateRecord(unqTag, this.getMessageContent(record));
        long id = this.messageConsumeRecordService.insert(msgConsumeRecordDto);
        msgConsumeRecordDto.setId(id);
        log.info("\u6d88\u606f\u63d2\u5165\u6210\u529f\uff1a{}", (Object)msgConsumeRecordDto);
        return msgConsumeRecordDto;
    }

    private void after(MsgConsumeRecordDto record, ApiResult apiResult) {
        try {
            if (record != null) {
                if (record.getConsumeStatus() != ConsumeStatus.CONSUME_SUCC.getStatus() && apiResult.getCode() != NumberUtils.LONG_ZERO.longValue()) {
                    record.setConsumeStatus(ConsumeStatus.CONSUME_FAILED.getStatus());
                }
                record.setResult(JacksonUtil.obj2Str((Object)apiResult));
                this.messageConsumeRecordService.update(record);
            } else {
                log.error("\u66f4\u65b0\u6d88\u606f\u6d88\u8d39\u8bb0\u5f55\u8868\u5931\u8d25\uff1a\u6d88\u606f\u4e3a\u7a7a\uff0c\u653e\u5f03\u63d2\u5165\u6d88\u606f\u6d88\u8d39\u8bb0\u5f55\u8868");
            }
        }
        catch (Exception e) {
            log.error("\u66f4\u65b0\u6d88\u606f\u6d88\u8d39\u8bb0\u5f55\u8868\u5931\u8d25\uff1arecord:{}\uff0capiResul:{}\uff0c\u9519\u8bef\u4fe1\u606f:{}", new Object[]{record, apiResult, ExceptionUtils.getStackTrace((Throwable)e)});
        }
    }

    private MsgConsumeRecordDto generateRecord(String seq, String content) {
        MsgConsumeRecordDto record = new MsgConsumeRecordDto();
        record.setTopic(this.getTopic());
        record.setUnqTag(seq);
        record.setConsumeStatus(ConsumeStatus.UNCONSUME.getStatus());
        record.setContent(content);
        record.setMd5Str(MD5Utils.MD5((String)content));
        record.setRecvCount(NumberUtils.INTEGER_ONE);
        record.setResult("");
        return record;
    }
}

