package com.kuaike.scrm.chat.service.impl;

import cn.kinyun.wework.sdk.entity.chat.ChatMsg;
import com.alibaba.fastjson.JSON;
import com.kuaike.common.errorcode.CommonErrorCode;
import com.kuaike.common.exception.BusinessException;
import com.kuaike.scrm.chat.service.ConversationService;
import com.kuaike.scrm.chat.service.FileService;
import com.kuaike.scrm.chat.service.MessageConsumerService;
import com.kuaike.scrm.chat.service.StoreMessageService;
import com.kuaike.scrm.chat.service.WeworkMessageService;
import com.kuaike.scrm.common.service.FollowRecordMsgService;
import com.kuaike.scrm.common.service.dto.FollowRecordMsg;
import com.kuaike.scrm.common.utils.JacksonUtils;
import com.kuaike.scrm.common.utils.KafkaClientUtils;
import com.kuaike.scrm.common.utils.NamedThreadFactory;
import com.kuaike.scrm.common.utils.ThreadPoolMonitorUtils;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * {@link MessageConsumerService} implementation that processes incoming {@link ChatMsg}
 * instances one at a time ({@link #handleMessage(ChatMsg)}) or as a batch fanned out over a
 * fixed-size worker pool ({@link #handleMessages(List)}).
 */
@Service
/* loaded from: input_file:com/kuaike/scrm/chat/service/impl/MessageConsumerServiceImpl.class */
public class MessageConsumerServiceImpl implements MessageConsumerService {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerServiceImpl.class);

    /** Number of worker threads used to process a batch of messages. */
    private static final int WORKER_THREADS = 15;

    /** Maximum time, in minutes, that {@link #handleMessages(List)} waits for a batch to finish. */
    private static final long BATCH_AWAIT_MINUTES = 5L;

    @Autowired
    private ConversationService conversationService;

    @Autowired
    private FileService fileService;

    @Autowired
    private WeworkMessageService weworkMessageService;

    @Autowired
    private StoreMessageService storeMessageService;

    @Autowired
    private KafkaClientUtils kafkaClient;

    @Value("${kafka.topic.quality_check}")
    private String qualityCheckTopic;

    @Autowired
    private FollowRecordMsgService followRecordMsgService;

    /** Worker pool on which the per-message tasks of a batch are executed. */
    private ExecutorService messageService =
            Executors.newFixedThreadPool(WORKER_THREADS, new NamedThreadFactory("UpMessageConsumer"));

    /** Registers the worker pool with {@link ThreadPoolMonitorUtils} once the bean is constructed. */
    @PostConstruct
    public void init() {
        ThreadPoolMonitorUtils.addToMonitor(this.messageService);
    }

    /**
     * Runs the per-message pipeline for a single chat message: revoke handling, conversation
     * handling, file handling, publication to the quality-check topic and, finally, the
     * follow-record hand-off.
     *
     * <p>A failure while publishing to the quality-check topic is logged and does not stop the
     * follow-record step. Exceptions thrown by any other step propagate to the caller.
     *
     * @param chatMsg the message to process
     */
    @Override // com.kuaike.scrm.chat.service.MessageConsumerService
    public void handleMessage(ChatMsg chatMsg) {
        this.weworkMessageService.handleRevokeMessage(chatMsg);
        this.conversationService.handleConversation(chatMsg);
        this.fileService.handleMsgFile(chatMsg);
        try {
            String payload = JacksonUtils.getInstance().writeValueAsString(chatMsg);
            this.kafkaClient.sendMessage(this.qualityCheckTopic, chatMsg.getConversationId(), payload);
        } catch (Exception e) {
            // Best effort: quality-check publication must not block the remaining step.
            log.error("发送消息到消息质检topic:{}失败,", this.qualityCheckTopic, e);
        }
        this.followRecordMsgService.followRecordToKafka(FollowRecordMsg.fromChatMsg(chatMsg));
    }

    /**
     * Processes a batch of chat messages. The batch is first handed to
     * {@link StoreMessageService#batchWriteMsgAsync}, then every message is submitted to the
     * worker pool and run through {@link #handleMessage(ChatMsg)}. The calling thread waits up
     * to {@value #BATCH_AWAIT_MINUTES} minutes for all tasks to finish; if the wait expires a
     * warning is logged and the method returns while the remaining tasks keep running.
     *
     * <p>A {@code null} or empty list is a no-op. An exception raised while processing one
     * message is logged and does not affect the other messages of the batch.
     *
     * @param list the messages to process; may be {@code null} or empty
     * @throws IOException declared by the interface; not thrown directly by this method
     * @throws BusinessException if the calling thread is interrupted while waiting (the
     *         interrupt flag is restored before throwing)
     */
    @Override // com.kuaike.scrm.chat.service.MessageConsumerService
    public void handleMessages(List<ChatMsg> list) throws IOException {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        log.info("handleMessages:{}", JSON.toJSONString(list));
        this.storeMessageService.batchWriteMsgAsync(list);
        int total = list.size();
        CountDownLatch countDownLatch = new CountDownLatch(total);
        for (ChatMsg chatMsg : list) {
            log.info("wechatMessage: RequestId:{},wechatId:{},TalkerId:{}",
                    chatMsg.getMsgId(), chatMsg.getConversationId(), chatMsg.getFrom());
            this.messageService.execute(() -> {
                try {
                    handleMessage(chatMsg);
                } catch (Exception e) {
                    log.error("handleMessages error,msg:{}", chatMsg.getMsgId(), e);
                } finally {
                    // Always release the latch (success, Exception or Error) so the caller never
                    // waits on a task that has already terminated.
                    countDownLatch.countDown();
                }
            });
        }
        try {
            boolean finished = countDownLatch.await(BATCH_AWAIT_MINUTES, TimeUnit.MINUTES);
            if (!finished) {
                // Previously the timeout was silently ignored; make the partial batch visible.
                log.warn("handleMessages timed out after {} minutes, {} of {} messages not yet processed",
                        BATCH_AWAIT_MINUTES, countDownLatch.getCount(), total);
            }
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
            Thread.currentThread().interrupt();
            throw new BusinessException(CommonErrorCode.BUSINESS_ERROR);
        }
    }
}
