/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.chat.service.impl;

import cn.kinyun.wework.sdk.api.chat.ChatApi;
import cn.kinyun.wework.sdk.entity.chat.ChatData;
import cn.kinyun.wework.sdk.entity.chat.ChatMsg;
import cn.kinyun.wework.sdk.entity.chat.EncryptChatData;
import cn.kinyun.wework.sdk.enums.TalkerType;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import com.kuaike.scrm.chat.dto.ChatDataReq;
import com.kuaike.scrm.chat.service.ChatConfigService;
import com.kuaike.scrm.chat.service.PullChatMsgService;
import com.kuaike.scrm.chat.service.StoreMessageService;
import com.kuaike.scrm.chat.utils.ChatMessageUtil;
import com.kuaike.scrm.common.dto.StopWatchDto;
import com.kuaike.scrm.common.utils.JacksonUtils;
import com.kuaike.scrm.common.utils.KafkaClientUtils;
import com.kuaike.scrm.common.utils.NamedThreadFactory;
import com.kuaike.scrm.common.utils.ThreadPoolMonitorUtils;
import com.kuaike.scrm.dal.chat.dto.ChatConfigDto;
import com.kuaike.scrm.dal.wework.entity.WeworkCorp;
import com.kuaike.scrm.dal.wework.mapper.WeworkCorpMapper;
import java.io.File;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Service;

@Service
public class PullChatMsgServiceImpl
implements PullChatMsgService,
InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(PullChatMsgServiceImpl.class);
    @Autowired
    private ChatConfigService chatConfigService;
    @Autowired
    private StoreMessageService storeMessageService;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private KafkaClientUtils kafkaClient;
    @Value(value="${kafka.topic.chat_msg_crm}")
    private String chatMsgTopic;
    private final ExecutorService executeService;
    @Autowired
    private WeworkCorpMapper weworkCorpMapper;
    private static final long EXECUTE_SEC = 300L;
    private static final long FETCH_ONE_TIME = 2L;
    private static final String PULL_MESSAGE_KEY_FORMAT = "crm:chat:pull:message:%s";
    private static final String PULL_SEQ_KEY_FORMAT = "crm:chat:pull:seq:%s";

    PullChatMsgServiceImpl() {
        int processors = Runtime.getRuntime().availableProcessors();
        int minSize = processors * 2;
        int maxSize = processors * 20;
        this.executeService = new ThreadPoolExecutor(minSize, maxSize, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(1000), (ThreadFactory)new NamedThreadFactory("PullMsgExecuteService"), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private static String getPullMessageKey(String corpId) {
        return String.format(PULL_MESSAGE_KEY_FORMAT, corpId);
    }

    private static String getPullSeqKey(String corpId) {
        return String.format(PULL_SEQ_KEY_FORMAT, corpId);
    }

    public void afterPropertiesSet() {
        ThreadPoolMonitorUtils.addToMonitor((ExecutorService)this.executeService);
    }

    @Override
    public void pullAllCorpMessage() {
        log.info("pullAllCorpMessage start");
        List<ChatConfigDto> allChatConfig = this.chatConfigService.getConfigCache();
        if (CollectionUtils.isEmpty(allChatConfig)) {
            log.warn("not found effective chat config");
            return;
        }
        List corpList = this.weworkCorpMapper.selectAllAuthed();
        if (CollectionUtils.isEmpty((Collection)corpList)) {
            log.warn("not found effective corp");
            return;
        }
        Set bizIds = corpList.stream().map(WeworkCorp::getBizId).collect(Collectors.toSet());
        Map<String, List<ChatConfigDto>> configMap = allChatConfig.stream().filter(it -> bizIds.contains(it.getBizId())).collect(Collectors.groupingBy(ChatConfigDto::getCorpId));
        if (MapUtils.isEmpty(configMap)) {
            log.warn("not found effective chat config");
            return;
        }
        ArrayList<String> corpIdList = new ArrayList<String>(configMap.keySet());
        for (String corpId : corpIdList) {
            List<ChatConfigDto> chatConfigDtos = configMap.get(corpId);
            String key = PullChatMsgServiceImpl.getPullMessageKey(corpId);
            ValueOperations opsForValue = this.redisTemplate.opsForValue();
            if (Boolean.TRUE.equals(this.redisTemplate.hasKey((Object)key))) {
                log.info("\u8be5\u5546\u6237\u6b63\u5728\u62c9\u53d6\u6570\u636e\uff0c\u8df3\u8fc7\u3002corpId:{}", (Object)corpId);
                continue;
            }
            this.executeService.execute(() -> {
                try {
                    Boolean isSuccess = opsForValue.setIfAbsent((Object)key, (Object)corpId, 302L, TimeUnit.SECONDS);
                    if (Boolean.FALSE.equals(isSuccess)) {
                        log.info("\u53e6\u4e00\u4e2a\u7ebf\u7a0b\u6b63\u5728\u62c9\u53d6\u5546\u6237\u6570\u636e\uff0c\u8df3\u8fc7\u3002corpId:{}", (Object)corpId);
                        return;
                    }
                    log.info("try pull message corpId:{}, result:{}, key:{}", new Object[]{corpId, isSuccess, key});
                }
                catch (Exception e) {
                    log.error("pullMessage error", (Throwable)e);
                    return;
                }
                long startTime = System.currentTimeMillis();
                try {
                    this.pullMessage(chatConfigDtos, this.deadLine(300L));
                    log.info("pullMessage time:{}", (Object)(System.currentTimeMillis() - startTime));
                }
                catch (Exception e) {
                    log.error("pullMessage error:{}", (Object)JSON.toJSONString((Object)chatConfigDtos), (Object)e);
                }
                finally {
                    log.info("release lock corpId:{}", (Object)corpId);
                    this.redisTemplate.delete((Object)key);
                }
                log.info("pullMessage finished, corpId:{}, time:{}", (Object)corpId, (Object)(System.currentTimeMillis() - startTime));
            });
        }
    }

    private long deadLine(long durationSec) {
        return System.currentTimeMillis() + durationSec * 1000L;
    }

    private ChatData getChatData(ChatApi chatApi, ChatDataReq req) {
        try {
            return chatApi.getChatdata(req.getSeq(), req.getLimit());
        }
        catch (Exception e) {
            log.error("getChatdata with error,chatApi={},req={}", (Object)chatApi, (Object)req);
            throw e;
        }
    }

    private ChatDataReq buildChatDataReq(String corpId) {
        long seq;
        ChatDataReq req = new ChatDataReq();
        long beginTime = System.currentTimeMillis();
        ValueOperations opsForValue = this.redisTemplate.opsForValue();
        Object seqStr = opsForValue.get((Object)PullChatMsgServiceImpl.getPullSeqKey(corpId));
        if (Objects.isNull(seqStr)) {
            seq = this.storeMessageService.findLastMsg(corpId);
            opsForValue.set((Object)PullChatMsgServiceImpl.getPullSeqKey(corpId), (Object)Long.toString(seq));
            log.info("select from es:{}", (Object)seq);
        } else {
            seq = Long.parseLong((String)seqStr);
            log.info("select from redis:{}", (Object)seq);
        }
        req.setSeq(seq);
        log.info("find findLastMsg time:{}", (Object)(System.currentTimeMillis() - beginTime));
        req.setLimit(1000);
        return req;
    }

    private void recordPullSeq(String corpId, Long seq) {
        Preconditions.checkArgument((boolean)StringUtils.isNotBlank((CharSequence)corpId), (Object)"corpId not null");
        Preconditions.checkArgument((boolean)Objects.nonNull(seq), (Object)"seq not null");
        ValueOperations opsForValue = this.redisTemplate.opsForValue();
        opsForValue.set((Object)PullChatMsgServiceImpl.getPullSeqKey(corpId), (Object)seq.toString());
        log.info("recordPullSeq seq:{}, corpId:{}", (Object)seq, (Object)corpId);
    }

    private ChatMsg decrypt(ChatApi chatApi, EncryptChatData encryptChatData) throws GeneralSecurityException {
        return chatApi.decrypt(encryptChatData);
    }

    private boolean pullMessageInternal(ChatConfigDto chatConfigDto, ChatApi chatApi, ChatDataReq req) {
        log.info("pullMessageInternal corpId={}, req:{}", (Object)chatConfigDto.getCorpId(), (Object)req);
        StopWatchDto stopWatchDto = new StopWatchDto("PullMessage", true, log);
        stopWatchDto.start("getChatData");
        ChatData chatdata = this.getChatData(chatApi, req);
        stopWatchDto.stop();
        if (Objects.isNull(chatdata)) {
            log.info("chatdata is null");
            stopWatchDto.print();
            return false;
        }
        List chatDataList = chatdata.getChatdata();
        if (CollectionUtils.isEmpty((Collection)chatDataList)) {
            log.info("chatDataList is empty");
            stopWatchDto.print();
            return false;
        }
        stopWatchDto.start("decrypt and send msg");
        for (EncryptChatData encryptChatData : chatDataList) {
            ChatMsg chatMsg;
            req.setSeq(encryptChatData.getSeq());
            try {
                chatMsg = this.decrypt(chatApi, encryptChatData);
            }
            catch (Exception e) {
                log.error("decrypt error:{}", (Object)JSON.toJSONString((Object)encryptChatData), (Object)e);
                continue;
            }
            this.buildChatMsg(chatMsg, encryptChatData, chatConfigDto.getCorpId(), chatConfigDto.getBizId());
            try {
                log.info("send message:{}", (Object)chatMsg.getMsgId());
                this.kafkaClient.sendMessage(this.chatMsgTopic, chatMsg.getConversationId(), JacksonUtils.getInstance().writeValueAsString((Object)chatMsg));
            }
            catch (Exception e) {
                log.error("sendMessage to kafka error:{}", (Object)JSON.toJSONString((Object)encryptChatData), (Object)e);
            }
        }
        stopWatchDto.stop();
        stopWatchDto.print();
        return chatDataList.size() > 500;
    }

    @Override
    public void pullMessageManual(List<ChatConfigDto> chatConfigDtos, Long startSeq, Long endSeq, boolean sendToKafka) {
        ChatDataReq req = new ChatDataReq();
        req.setSeq(startSeq);
        this.reSetLimit(endSeq, req);
        ChatConfigDto chatConfigDto = chatConfigDtos.get(0);
        while (req.getSeq() < endSeq) {
            this.reSetLimit(endSeq, req);
            this.batchPullMsg(chatConfigDtos, sendToKafka, req, chatConfigDto);
        }
    }

    private void reSetLimit(Long endSeq, ChatDataReq req) {
        if (endSeq - req.getSeq() >= 1000L) {
            req.setLimit(1000);
        } else {
            req.setLimit((int)(endSeq - req.getSeq()));
        }
    }

    private void batchPullMsg(List<ChatConfigDto> chatConfigDtos, boolean sendToKafka, ChatDataReq req, ChatConfigDto chatConfigDto) {
        List chatDataList;
        ChatApi chatApi;
        try {
            chatApi = this.getChatApi(chatConfigDtos);
            ChatData chatdata = this.getChatData(chatApi, req);
            chatDataList = chatdata.getChatdata();
        }
        catch (Exception e) {
            log.error("getChatMsg with error", (Throwable)e);
            return;
        }
        if (CollectionUtils.isEmpty((Collection)chatDataList)) {
            log.info("chatDataList is empty");
            return;
        }
        String corpId = chatConfigDto.getCorpId();
        Long bizId = chatConfigDto.getBizId();
        for (EncryptChatData encryptChatData : chatDataList) {
            ChatMsg chatMsg;
            try {
                chatMsg = this.decrypt(chatApi, encryptChatData);
            }
            catch (Exception e) {
                req.setSeq(encryptChatData.getSeq());
                log.error("decrypt error:{}", (Object)JSON.toJSONString((Object)encryptChatData), (Object)e);
                continue;
            }
            this.buildChatMsg(chatMsg, encryptChatData, corpId, bizId);
            if (sendToKafka) {
                try {
                    this.kafkaClient.sendMessage(this.chatMsgTopic, chatMsg.getConversationId(), JacksonUtils.getInstance().writeValueAsString((Object)chatMsg));
                }
                catch (Exception e) {
                    log.error("sendMessage to kafka error:{}", (Object)JSON.toJSONString((Object)encryptChatData), (Object)e);
                }
            }
            log.info("send message:{}", (Object)chatMsg.getMsgId());
            req.setSeq(encryptChatData.getSeq());
        }
    }

    private void pullMessage(List<ChatConfigDto> chatConfigDtos, Long deadLine) {
        try {
            String corpId = chatConfigDtos.get(0).getCorpId();
            chatConfigDtos.forEach(ChatConfigDto::validate);
            ChatDataReq req = this.buildChatDataReq(corpId);
            ChatApi chatApi = this.getChatApi(chatConfigDtos);
            Long preSeq = req.getSeq();
            while (System.currentTimeMillis() < deadLine) {
                boolean needContinue = false;
                try {
                    needContinue = this.pullMessageInternal(chatConfigDtos.get(0), chatApi, req);
                }
                catch (Exception e) {
                    if (req.getSeq() > preSeq) {
                        this.recordPullSeq(corpId, req.getSeq());
                    }
                    log.error("pullMessageInternal with error", (Throwable)e);
                }
                if (!needContinue) break;
                if (req.getSeq() <= preSeq) continue;
                this.recordPullSeq(corpId, req.getSeq());
            }
            if (req.getSeq() > preSeq) {
                this.recordPullSeq(corpId, req.getSeq());
            }
        }
        catch (Exception e) {
            log.error("pullMessage error:{}", (Object)JSON.toJSONString(chatConfigDtos), (Object)e);
        }
    }

    private void buildChatMsg(ChatMsg chatMsg, EncryptChatData encryptChatData, String corpId, Long bizId) {
        chatMsg.setSeq(encryptChatData.getSeq());
        chatMsg.setCorpId(corpId);
        chatMsg.setBizId(bizId);
        chatMsg.setUpdateTime(Long.valueOf(new Date().getTime()));
        chatMsg.setTalkerType(Integer.valueOf(StringUtils.isNotBlank((CharSequence)chatMsg.getRoomId()) ? TalkerType.CHATROOM.getValue() : TalkerType.CONTACT.getValue()));
        chatMsg.setExternalContact(Boolean.valueOf(chatMsg.getMsgId().contains("external")));
        chatMsg.setConversationId(ChatMsg.getConversationId((ChatMsg)chatMsg));
    }

    @Override
    public Optional<File> fetchFile(String corpId, Long bizId, String sdkfiledid, String msgId, String fileName) {
        List<ChatConfigDto> chatConfigDtos = this.chatConfigService.selectChatConfigDtoByCorpId(bizId, corpId);
        if (Objects.isNull(chatConfigDtos)) {
            return Optional.empty();
        }
        try {
            ChatApi chatApi = this.getChatApi(chatConfigDtos);
            File file = this.getFileDetail(chatApi, sdkfiledid, fileName);
            return Optional.ofNullable(file);
        }
        catch (Exception e) {
            log.error("fetchFile error:{},e", (Object)msgId, (Object)e);
            return Optional.empty();
        }
    }

    private ChatApi getChatApi(List<ChatConfigDto> chatConfigDtos) throws IOException {
        return ChatMessageUtil.getChatApi(chatConfigDtos);
    }

    public File getFileDetail(ChatApi chatApi, String sdkfileid, String fileName) {
        fileName = StringUtils.isBlank((CharSequence)fileName) ? "tmpfile" : fileName;
        return chatApi.getMediadata(sdkfileid, UUID.randomUUID().toString().replace("-", "") + "-" + fileName);
    }
}

