/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.skynet.logic.consumer;

import com.kuaike.common.entity.WechatMessage;
import com.kuaike.skynet.logic.context.ReplyContext;
import com.kuaike.skynet.logic.context.ReplyContextUtil;
import com.kuaike.skynet.logic.dispatcher.AcceptFriendDispatcher;
import com.kuaike.skynet.logic.dispatcher.WechatMessageDispatcher;
import com.kuaike.skynet.logic.service.common.utils.JacksonUtils;
import com.kuaike.skynet.logic.service.common.utils.KafkaClientUtils;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class WechatMessageConsumer {
    private static final Logger log = LoggerFactory.getLogger(WechatMessageConsumer.class);
    @Autowired
    private KafkaClientUtils kafkaClientUtils;
    private Consumer<String, String> consumer;
    @Value(value="${kafkaMessage.consumer.wechatMsg.topic}")
    private String topic;
    @Autowired
    private WechatMessageDispatcher wechatMessageDispatcher;
    @Autowired
    private AcceptFriendDispatcher acceptFriendDispatcher;
    private JacksonUtils jacksonUtils = new JacksonUtils();

    @PostConstruct
    public void init() {
        this.consumer = this.kafkaClientUtils.buildConsumer();
        this.consumer.subscribe(Collections.singleton(this.topic));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() {
        log.info("Start wechat message consumer");
        block11: while (true) {
            try {
                ConsumerRecords records = this.consumer.poll(100L);
                Iterator iterator = records.iterator();
                while (true) {
                    if (!iterator.hasNext()) continue block11;
                    ConsumerRecord record = (ConsumerRecord)iterator.next();
                    WechatMessage wechatMessage = (WechatMessage)this.jacksonUtils.getObjectMapper().readValue((String)record.value(), WechatMessage.class);
                    this.process(wechatMessage);
                }
            }
            catch (Throwable e) {
                log.error("e {}", e);
                continue;
            }
            finally {
                try {
                    this.consumer.commitAsync();
                }
                catch (Throwable records) {}
                continue;
            }
            break;
        }
    }

    private void process(WechatMessage wechatMessage) {
        if (Objects.nonNull(wechatMessage.getIsImport()) && wechatMessage.getIsImport().booleanValue()) {
            log.info("ignore import message, requestId:{}", (Object)wechatMessage.getRequestId());
            return;
        }
        this.initContext(wechatMessage);
        if (this.acceptFriendDispatcher.isAcceptFriendMessage(wechatMessage)) {
            this.acceptFriendDispatcher.dispatchMessage(wechatMessage);
        }
        try {
            this.wechatMessageDispatcher.dispatch(wechatMessage);
        }
        catch (Exception e) {
            log.error("handler message failed, message={}", (Object)wechatMessage, (Object)e);
        }
        ReplyContextUtil.remove();
    }

    private void initContext(WechatMessage wechatMessage) {
        ReplyContext ctx = ReplyContextUtil.get();
        ctx.setRequestId(wechatMessage.getRequestId());
        ctx.setMsgSvrId(wechatMessage.getMsgSvrId());
        ctx.setWechatId(wechatMessage.getWechatId());
        ctx.setTalkerType(wechatMessage.getTalkerType());
        ctx.setCreateTime(wechatMessage.getCreateTime());
    }
}

