package com.kuaike.scrm.chat.service.impl;

import cn.kinyun.wework.sdk.api.chat.ChatApi;
import cn.kinyun.wework.sdk.entity.chat.ChatData;
import cn.kinyun.wework.sdk.entity.chat.ChatMsg;
import cn.kinyun.wework.sdk.entity.chat.EncryptChatData;
import cn.kinyun.wework.sdk.enums.TalkerType;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import com.kuaike.scrm.chat.dto.ChatDataReq;
import com.kuaike.scrm.chat.service.ChatConfigService;
import com.kuaike.scrm.chat.service.PullChatMsgService;
import com.kuaike.scrm.chat.service.StoreMessageService;
import com.kuaike.scrm.chat.utils.ChatMessageUtil;
import com.kuaike.scrm.common.dto.StopWatchDto;
import com.kuaike.scrm.common.utils.JacksonUtils;
import com.kuaike.scrm.common.utils.KafkaClientUtils;
import com.kuaike.scrm.common.utils.NamedThreadFactory;
import com.kuaike.scrm.common.utils.ThreadPoolMonitorUtils;
import com.kuaike.scrm.dal.chat.dto.ChatConfigDto;
import java.io.File;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/kuaike/scrm/chat/service/impl/PullChatMsgServiceImpl.class */
public class PullChatMsgServiceImpl implements PullChatMsgService, InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(PullChatMsgServiceImpl.class);

    @Autowired
    private ChatConfigService chatConfigService;

    @Autowired
    private StoreMessageService storeMessageService;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private KafkaClientUtils kafkaClient;

    @Value("${kafka.topic.chat_msg_crm}")
    private String chatMsgTopic;
    private ExecutorService executeService = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, new LinkedBlockingDeque(10), new NamedThreadFactory("PullMsgExecuteService"), new ThreadPoolExecutor.DiscardPolicy());
    private static final long EXECUTE_SEC = 300;
    private static final int DURATION_SEC = 30;
    private static final String PULL_MESSAGE_KEY_FORMAT = "crm:chat:pull:message:%s";
    private static final String PULL_SEQ_KEY_FORMAT = "crm:chat:pull:seq:%s";

    private static String getPullMessageKey(String str) {
        return String.format(PULL_MESSAGE_KEY_FORMAT, str);
    }

    private static String getPullSeqKey(String str) {
        return String.format(PULL_SEQ_KEY_FORMAT, str);
    }

    public void afterPropertiesSet() {
        ThreadPoolMonitorUtils.addToMonitor(this.executeService);
    }

    @Override // com.kuaike.scrm.chat.service.PullChatMsgService
    public void pullAllCorpMessage() {
        log.info("pullAllCorpMessage start");
        List<ChatConfigDto> configCache = this.chatConfigService.getConfigCache();
        if (CollectionUtils.isEmpty(configCache)) {
            log.warn("not found effective chat config");
            return;
        }
        Map map = (Map) configCache.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getCorpId();
        }));
        List<String> list = (List) map.keySet().stream().collect(Collectors.toList());
        Collections.shuffle(list);
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for (String str : list) {
            List list2 = (List) map.get(str);
            this.executeService.execute(() -> {
                try {
                    try {
                        long currentTimeMillis2 = System.currentTimeMillis();
                        String pullMessageKey = getPullMessageKey(str);
                        Boolean ifAbsent = this.redisTemplate.opsForValue().setIfAbsent(pullMessageKey, str, EXECUTE_SEC, TimeUnit.SECONDS);
                        log.info("try pull message corpId:{}, result:{}, key:{}", new Object[]{str, ifAbsent, pullMessageKey});
                        if (!ifAbsent.booleanValue()) {
                            countDownLatch.countDown();
                            return;
                        }
                        try {
                            try {
                                long currentTimeMillis3 = System.currentTimeMillis();
                                pullMessage(list2, Long.valueOf(deadLine(DURATION_SEC)));
                                log.info("pullMessage time:{}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis3));
                                log.info("release lock corpId:{}", str);
                                this.redisTemplate.delete(pullMessageKey);
                            } catch (Exception e) {
                                log.error("pullMessage error:{}", JSON.toJSONString(list2), e);
                                log.info("release lock corpId:{}", str);
                                this.redisTemplate.delete(pullMessageKey);
                            }
                            log.info("pullMessage and lock time:{}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis2));
                            countDownLatch.countDown();
                        } catch (Throwable th) {
                            log.info("release lock corpId:{}", str);
                            this.redisTemplate.delete(pullMessageKey);
                            throw th;
                        }
                    } catch (Throwable th2) {
                        countDownLatch.countDown();
                        throw th2;
                    }
                } catch (Exception e2) {
                    log.error("pullMessage error", e2);
                    countDownLatch.countDown();
                }
            });
        }
        try {
            countDownLatch.await(301L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        log.info("total execute time:{}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private long deadLine(int i) {
        return System.currentTimeMillis() + (i * 1000);
    }

    private ChatData getChatData(ChatApi chatApi, ChatDataReq chatDataReq) throws Exception {
        try {
            return chatApi.getChatdata(chatDataReq.getSeq(), chatDataReq.getLimit());
        } catch (Exception e) {
            log.error("getChatdata with error,chatApi={},req={}", chatApi, chatDataReq);
            throw e;
        }
    }

    private ChatDataReq buildChatDataReq(String str) {
        Long valueOf;
        ChatDataReq chatDataReq = new ChatDataReq();
        long currentTimeMillis = System.currentTimeMillis();
        ValueOperations opsForValue = this.redisTemplate.opsForValue();
        Object obj = opsForValue.get(getPullSeqKey(str));
        if (Objects.isNull(obj)) {
            valueOf = Long.valueOf(this.storeMessageService.findLastMsg(str));
            opsForValue.set(getPullSeqKey(str), valueOf.toString());
            log.info("select from es:{}", valueOf);
        } else {
            valueOf = Long.valueOf((String) obj);
            log.info("select from redis:{}", valueOf);
        }
        chatDataReq.setSeq(valueOf);
        log.info("find findLastMsg time:{}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        chatDataReq.setLimit(1000);
        return chatDataReq;
    }

    private void recordPullSeq(String str, Long l) {
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "corpId not null");
        Preconditions.checkArgument(Objects.nonNull(l), "seq not null");
        this.redisTemplate.opsForValue().set(getPullSeqKey(str), l.toString());
        log.info("recordPullSeq seq:{}, corpId:{}", l, str);
    }

    private ChatMsg decrypt(ChatApi chatApi, EncryptChatData encryptChatData) throws GeneralSecurityException {
        return chatApi.decrypt(encryptChatData);
    }

    private boolean pullMessageInternal(ChatConfigDto chatConfigDto, ChatApi chatApi, ChatDataReq chatDataReq) throws Exception {
        log.info("pullMessageInternal chatConfigDto={}, chatApi={}, req:{}", new Object[]{JSON.toJSONString(chatConfigDto), JSON.toJSONString(chatApi), JSON.toJSONString(chatDataReq)});
        StopWatchDto stopWatchDto = new StopWatchDto("PullMessage", true, log);
        stopWatchDto.start("getChatData");
        ChatData chatData = getChatData(chatApi, chatDataReq);
        stopWatchDto.stop();
        if (Objects.isNull(chatData)) {
            log.info("chatdata is null");
            stopWatchDto.print();
            return false;
        }
        List<EncryptChatData> chatdata = chatData.getChatdata();
        if (CollectionUtils.isEmpty(chatdata)) {
            log.info("chatDataList is empty");
            stopWatchDto.print();
            return false;
        }
        stopWatchDto.start("decrypt and send msg");
        for (EncryptChatData encryptChatData : chatdata) {
            try {
                ChatMsg decrypt = decrypt(chatApi, encryptChatData);
                buildChatMsg(decrypt, encryptChatData, chatConfigDto.getCorpId(), chatConfigDto.getBizId());
                try {
                    this.kafkaClient.sendMessage(this.chatMsgTopic, decrypt.getConversationId(), JacksonUtils.getInstance().writeValueAsString(decrypt));
                    log.info("send message:{}", decrypt.getMsgId());
                    chatDataReq.setSeq(encryptChatData.getSeq());
                } catch (Exception e) {
                    chatDataReq.setSeq(encryptChatData.getSeq());
                    log.error("sendMessage to kafka error:{}", JSON.toJSONString(encryptChatData), e);
                }
            } catch (Exception e2) {
                chatDataReq.setSeq(encryptChatData.getSeq());
                log.error("decrypt error:{}", JSON.toJSONString(encryptChatData), e2);
            }
        }
        stopWatchDto.stop();
        stopWatchDto.print();
        return chatdata.size() > 500;
    }

    @Override // com.kuaike.scrm.chat.service.PullChatMsgService
    public void pullMessageManual(List<ChatConfigDto> list, Long l, Long l2, boolean z) {
        ChatDataReq chatDataReq = new ChatDataReq();
        chatDataReq.setSeq(l);
        reSetLimit(l2, chatDataReq);
        ChatConfigDto chatConfigDto = list.get(0);
        while (chatDataReq.getSeq().longValue() < l2.longValue()) {
            reSetLimit(l2, chatDataReq);
            batchPullMsg(list, z, chatDataReq, chatConfigDto);
        }
    }

    private void reSetLimit(Long l, ChatDataReq chatDataReq) {
        if (l.longValue() - chatDataReq.getSeq().longValue() >= 1000) {
            chatDataReq.setLimit(1000);
        } else {
            chatDataReq.setLimit(Integer.valueOf((int) (l.longValue() - chatDataReq.getSeq().longValue())));
        }
    }

    private void batchPullMsg(List<ChatConfigDto> list, boolean z, ChatDataReq chatDataReq, ChatConfigDto chatConfigDto) {
        try {
            ChatApi chatApi = getChatApi(list);
            List<EncryptChatData> chatdata = getChatData(chatApi, chatDataReq).getChatdata();
            if (CollectionUtils.isEmpty(chatdata)) {
                log.info("chatDataList is empty");
                return;
            }
            String corpId = chatConfigDto.getCorpId();
            Long bizId = chatConfigDto.getBizId();
            for (EncryptChatData encryptChatData : chatdata) {
                try {
                    ChatMsg decrypt = decrypt(chatApi, encryptChatData);
                    buildChatMsg(decrypt, encryptChatData, corpId, bizId);
                    if (z) {
                        try {
                            this.kafkaClient.sendMessage(this.chatMsgTopic, decrypt.getConversationId(), JacksonUtils.getInstance().writeValueAsString(decrypt));
                        } catch (Exception e) {
                            chatDataReq.setSeq(encryptChatData.getSeq());
                            log.error("sendMessage to kafka error:{}", JSON.toJSONString(encryptChatData), e);
                        }
                    }
                    log.info("send message:{}", decrypt.getMsgId());
                    chatDataReq.setSeq(encryptChatData.getSeq());
                } catch (Exception e2) {
                    chatDataReq.setSeq(encryptChatData.getSeq());
                    log.error("decrypt error:{}", JSON.toJSONString(encryptChatData), e2);
                }
            }
        } catch (Exception e3) {
            log.error("getChatMsg with error", e3);
        }
    }

    private void pullMessage(List<ChatConfigDto> list, Long l) {
        try {
            String corpId = list.get(0).getCorpId();
            list.forEach((v0) -> {
                v0.validate();
            });
            ChatDataReq buildChatDataReq = buildChatDataReq(corpId);
            ChatApi chatApi = getChatApi(list);
            Long seq = buildChatDataReq.getSeq();
            while (System.currentTimeMillis() < l.longValue()) {
                boolean z = false;
                try {
                    z = pullMessageInternal(list.get(0), chatApi, buildChatDataReq);
                } catch (Exception e) {
                    if (buildChatDataReq.getSeq().longValue() > seq.longValue()) {
                        recordPullSeq(corpId, buildChatDataReq.getSeq());
                    }
                    log.error("pullMessageInternal with error,chatConfig={},req={}", new Object[]{list.get(0), chatApi, e});
                }
                if (!z) {
                    break;
                } else if (buildChatDataReq.getSeq().longValue() > seq.longValue()) {
                    recordPullSeq(corpId, buildChatDataReq.getSeq());
                }
            }
            if (buildChatDataReq.getSeq().longValue() > seq.longValue()) {
                recordPullSeq(corpId, buildChatDataReq.getSeq());
            }
        } catch (Exception e2) {
            log.error("pullMessage error:{}", JSON.toJSONString(list), e2);
        }
    }

    private void buildChatMsg(ChatMsg chatMsg, EncryptChatData encryptChatData, String str, Long l) {
        chatMsg.setSeq(encryptChatData.getSeq());
        chatMsg.setCorpId(str);
        chatMsg.setBizId(l);
        chatMsg.setUpdateTime(Long.valueOf(new Date().getTime()));
        chatMsg.setTalkerType(Integer.valueOf(StringUtils.isNotBlank(chatMsg.getRoomId()) ? TalkerType.CHATROOM.getValue() : TalkerType.CONTACT.getValue()));
        chatMsg.setExternalContact(Boolean.valueOf(chatMsg.getMsgId().contains("external")));
        chatMsg.setConversationId(ChatMsg.getConversationId(chatMsg));
    }

    @Override // com.kuaike.scrm.chat.service.PullChatMsgService
    public Optional<File> fetchFile(String str, Long l, String str2, String str3, String str4) {
        List<ChatConfigDto> selectChatConfigDtoByCorpId = this.chatConfigService.selectChatConfigDtoByCorpId(l, str);
        if (Objects.isNull(selectChatConfigDtoByCorpId)) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(getFileDetail(getChatApi(selectChatConfigDtoByCorpId), str2, str4));
        } catch (Exception e) {
            log.error("fetchFile error:{},e", str3, e);
            return Optional.empty();
        }
    }

    private ChatApi getChatApi(List<ChatConfigDto> list) throws Exception {
        return ChatMessageUtil.getChatApi(list);
    }

    public File getFileDetail(ChatApi chatApi, String str, String str2) throws Exception {
        return chatApi.getMediadata(str, UUID.randomUUID().toString().replace("-", "") + "-" + (StringUtils.isBlank(str2) ? "tmpfile" : str2));
    }
}
