/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.chat.service.impl;

import cn.kinyun.wework.sdk.entity.chat.ChatMsg;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.kuaike.common.errorcode.CommonErrorCode;
import com.kuaike.common.errorcode.UniverseErrorCode;
import com.kuaike.common.exception.BusinessException;
import com.kuaike.scrm.chat.service.ConversationService;
import com.kuaike.scrm.chat.service.FileService;
import com.kuaike.scrm.chat.service.MessageConsumerService;
import com.kuaike.scrm.chat.service.StoreMessageService;
import com.kuaike.scrm.chat.service.WeworkMessageService;
import com.kuaike.scrm.common.service.FollowRecordMsgService;
import com.kuaike.scrm.common.service.dto.FollowRecordMsg;
import com.kuaike.scrm.common.utils.JacksonUtils;
import com.kuaike.scrm.common.utils.KafkaClientUtils;
import com.kuaike.scrm.common.utils.NamedThreadFactory;
import com.kuaike.scrm.common.utils.ThreadPoolMonitorUtils;
import com.kuaike.scrm.dal.agent.mapper.AgentDecryptWeworkContactIdMapper;
import com.kuaike.scrm.dal.agent.mapper.AgentDecryptWeworkUserIdMapper;
import com.kuaike.scrm.dal.wework.entity.WeworkCorp;
import com.kuaike.scrm.dal.wework.mapper.WeworkCorpMapper;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumerServiceImpl
implements MessageConsumerService {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerServiceImpl.class);
    @Autowired
    private ConversationService conversationService;
    @Autowired
    private FileService fileService;
    @Autowired
    private WeworkMessageService weworkMessageService;
    @Autowired
    private StoreMessageService storeMessageService;
    @Autowired
    private KafkaClientUtils kafkaClient;
    @Value(value="${kafka.topic.quality_check}")
    private String qualityCheckTopic;
    @Autowired
    private FollowRecordMsgService followRecordMsgService;
    @Resource
    private WeworkCorpMapper weworkCorpMapper;
    @Resource
    private AgentDecryptWeworkContactIdMapper agentDecryptWeworkContactIdMapper;
    @Resource
    private AgentDecryptWeworkUserIdMapper agentDecryptWeworkUserIdMapper;
    private ExecutorService messageService = Executors.newFixedThreadPool(15, (ThreadFactory)new NamedThreadFactory("UpMessageConsumer"));

    @PostConstruct
    public void init() {
        ThreadPoolMonitorUtils.addToMonitor((ExecutorService)this.messageService);
    }

    @Override
    public void handleMessage(ChatMsg chatMsg) {
        this.weworkMessageService.handleRevokeMessage(chatMsg);
        this.conversationService.handleConversation(chatMsg);
        this.fileService.handleMsgFile(chatMsg);
        try {
            this.kafkaClient.sendMessage(this.qualityCheckTopic, chatMsg.getConversationId(), JacksonUtils.getInstance().writeValueAsString((Object)chatMsg));
        }
        catch (Exception e) {
            log.error("\u53d1\u9001\u6d88\u606f\u5230\u6d88\u606f\u8d28\u68c0topic:{}\u5931\u8d25,", (Object)this.qualityCheckTopic, (Object)e);
        }
        WeworkCorp weworkCorp = this.weworkCorpMapper.getByBizId(chatMsg.getBizId());
        if (weworkCorp != null && weworkCorp.getIsNewDkf() == 1) {
            Map map = this.agentDecryptWeworkContactIdMapper.queryByDecryptContactIds(chatMsg.getBizId(), chatMsg.getTolist());
            if (MapUtils.isNotEmpty((Map)map)) {
                Collection contactIds = map.values();
                chatMsg.setTolist((List)Lists.newArrayList(contactIds));
            }
            if (StringUtils.isNotBlank((CharSequence)chatMsg.getFrom())) {
                String userId = this.agentDecryptWeworkUserIdMapper.queryByDecryptUserId(chatMsg.getBizId(), chatMsg.getFrom());
                chatMsg.setFrom(userId);
            }
        }
        this.followRecordMsgService.followRecordToKafka(FollowRecordMsg.fromChatMsg((ChatMsg)chatMsg));
    }

    @Override
    public void handleMessages(List<ChatMsg> chatMsgList) throws IOException {
        if (CollectionUtils.isEmpty(chatMsgList)) {
            return;
        }
        log.info("handleMessages:{}", (Object)JSON.toJSONString(chatMsgList));
        this.storeMessageService.batchWriteMsgAsync(chatMsgList);
        CountDownLatch countDownLatch = new CountDownLatch(chatMsgList.size());
        for (ChatMsg chatMsg : chatMsgList) {
            log.info("wechatMessage: RequestId:{},wechatId:{},TalkerId:{}", new Object[]{chatMsg.getMsgId(), chatMsg.getConversationId(), chatMsg.getFrom()});
            this.messageService.execute(() -> {
                try {
                    this.handleMessage(chatMsg);
                }
                catch (Exception e) {
                    log.error("handleMessages error,msg:{}", (Object)chatMsg.getMsgId(), (Object)e);
                }
                finally {
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await(5L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            log.error("Interrupted", (Throwable)e);
            Thread.currentThread().interrupt();
            throw new BusinessException((UniverseErrorCode)CommonErrorCode.BUSINESS_ERROR);
        }
    }
}

