package com.kuaike.skynet.logic.consumer;

import com.kuaike.common.entity.WechatMessage;
import com.kuaike.skynet.logic.context.ReplyContext;
import com.kuaike.skynet.logic.context.ReplyContextUtil;
import com.kuaike.skynet.logic.dispatcher.AcceptFriendDispatcher;
import com.kuaike.skynet.logic.dispatcher.WechatMessageDispatcher;
import com.kuaike.skynet.logic.service.common.utils.JacksonUtils;
import com.kuaike.skynet.logic.service.common.utils.KafkaClientUtils;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/kuaike/skynet/logic/consumer/WechatMessageConsumer.class */
public class WechatMessageConsumer {
    private static final Logger log = LoggerFactory.getLogger(WechatMessageConsumer.class);

    @Autowired
    private KafkaClientUtils kafkaClientUtils;
    private Consumer<String, String> consumer;

    @Value("${kafkaMessage.consumer.wechatMsg.topic}")
    private String topic;

    @Autowired
    private WechatMessageDispatcher wechatMessageDispatcher;

    @Autowired
    private AcceptFriendDispatcher acceptFriendDispatcher;
    private JacksonUtils jacksonUtils = new JacksonUtils();

    @PostConstruct
    public void init() {
        this.consumer = this.kafkaClientUtils.buildConsumer();
        this.consumer.subscribe(Collections.singleton(this.topic));
    }

    public void start() {
        log.info("Start wechat message consumer");
        while (true) {
            try {
                try {
                    Iterator it = this.consumer.poll(100L).iterator();
                    while (it.hasNext()) {
                        process((WechatMessage) this.jacksonUtils.getObjectMapper().readValue((String) ((ConsumerRecord) it.next()).value(), WechatMessage.class));
                    }
                    try {
                        this.consumer.commitAsync();
                    } catch (Throwable th) {
                    }
                } catch (Throwable th2) {
                    log.error("e {}", th2);
                    try {
                        this.consumer.commitAsync();
                    } catch (Throwable th3) {
                    }
                }
            } catch (Throwable th4) {
                try {
                    this.consumer.commitAsync();
                } catch (Throwable th5) {
                }
                throw th4;
            }
        }
    }

    private void process(WechatMessage wechatMessage) {
        if (Objects.nonNull(wechatMessage.getIsImport()) && wechatMessage.getIsImport().booleanValue()) {
            log.info("ignore import message, requestId:{}", wechatMessage.getRequestId());
            return;
        }
        initContext(wechatMessage);
        if (this.acceptFriendDispatcher.isAcceptFriendMessage(wechatMessage)) {
            this.acceptFriendDispatcher.dispatchMessage(wechatMessage);
        }
        try {
            this.wechatMessageDispatcher.dispatch(wechatMessage);
        } catch (Exception e) {
            log.error("handler message failed, message={}", wechatMessage, e);
        }
        ReplyContextUtil.remove();
    }

    private void initContext(WechatMessage wechatMessage) {
        ReplyContext replyContext = ReplyContextUtil.get();
        replyContext.setRequestId(wechatMessage.getRequestId());
        replyContext.setMsgSvrId(wechatMessage.getMsgSvrId());
        replyContext.setWechatId(wechatMessage.getWechatId());
        replyContext.setTalkerType(wechatMessage.getTalkerType());
        replyContext.setCreateTime(wechatMessage.getCreateTime());
    }
}
