/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.chat.service;

import cn.kinyun.wework.sdk.entity.chat.ChatMsg;
import cn.kinyun.wework.sdk.utils.JacksonUtils;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.kuaike.common.errorcode.CommonErrorCode;
import com.kuaike.common.errorcode.UniverseErrorCode;
import com.kuaike.common.exception.BusinessException;
import com.kuaike.scrm.common.utils.AutoCreateIndexUtil;
import com.kuaike.scrm.common.utils.NamedThreadFactory;
import com.kuaike.scrm.common.utils.ThreadPoolMonitorUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.Header;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Stores WeWork chat messages in Elasticsearch and looks up the highest stored
 * sequence number for a corp.
 *
 * <p>Writes are split into fixed-size batches and executed on a dedicated bounded
 * thread pool. The pool uses {@link ThreadPoolExecutor.CallerRunsPolicy}, so when
 * its queue is full the submitting thread runs the batch itself.
 */
@Component
public class StoreMessageService {
    private static final Logger log = LoggerFactory.getLogger(StoreMessageService.class);

    /** Maximum number of messages sent in a single bulk request. */
    private static final int BATCH_SIZE = 100;

    /** Elasticsearch mapping type used for both reads and writes. */
    private static final String DOC_TYPE = "doc";

    @Value(value="${scrm.elasticsearch.index.weworkMessage:test_chat_message_alias}")
    private String indexAlias;
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    @Autowired
    private AutoCreateIndexUtil indexUtil;
    private final int size = Runtime.getRuntime().availableProcessors();
    private final ExecutorService storeService = new ThreadPoolExecutor(this.size, this.size * 2, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(10), (ThreadFactory)new NamedThreadFactory("storeService"), new ThreadPoolExecutor.CallerRunsPolicy());

    /** Registers the write pool with the thread-pool monitor once the bean is constructed. */
    @PostConstruct
    public void init() {
        ThreadPoolMonitorUtils.addToMonitor(this.storeService);
    }

    /**
     * Returns the highest {@code seq} stored for the given corp.
     *
     * <p>This lookup is best-effort: a failed search, an unparsable document, or a
     * document without a {@code seq} are logged and reported as {@code 0}, exactly
     * like "no message stored yet".
     *
     * @param corpId corp identifier matched against the {@code corpId.keyword} field
     * @return the largest stored sequence number, or {@code 0} if none could be determined
     */
    public long findLastMsg(String corpId) {
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("corpId.keyword", corpId));

        // Only the single document with the largest seq is needed.
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder);
        searchSourceBuilder.sort("seq", SortOrder.DESC);
        searchSourceBuilder.size(1);
        searchSourceBuilder.from(0);

        SearchRequest searchRequest = new SearchRequest(new String[]{this.indexAlias});
        searchRequest.types(new String[]{DOC_TYPE});
        searchRequest.source(searchSourceBuilder);
        log.debug("searchSourceBuilder:{}", searchSourceBuilder);

        SearchResponse searchResponse;
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        }
        catch (Exception e) {
            log.error("findLastMsg error corpId:{}", corpId, e);
            return 0L;
        }

        SearchHit[] searchHits = searchResponse.getHits().getHits();
        if (searchHits.length == 0) {
            return 0L;
        }

        String sourceAsString = searchHits[0].getSourceAsString();
        try {
            ChatMsg chatMsg = (ChatMsg)JacksonUtils.readValue(sourceAsString, ChatMsg.class);
            Long seq = chatMsg == null ? null : chatMsg.getSeq();
            if (seq == null) {
                // Avoid relying on an unboxing NullPointerException to reach the fallback.
                log.warn("findLastMsg: last msg has no seq, corpId:{}", corpId);
                return 0L;
            }
            log.info("last msg seq:{}", seq);
            return seq;
        }
        catch (Exception e) {
            log.error("findLastMsg search corpId:{}", corpId, e);
        }
        return 0L;
    }

    /**
     * Writes the messages to Elasticsearch in batches of {@value #BATCH_SIZE}, running
     * the batches on the write pool, and blocks until every batch has finished.
     *
     * <p>A {@code null} or empty list is a no-op.
     *
     * @param chatMsgList messages to store
     * @throws java.util.concurrent.CompletionException if any batch fails; an I/O failure
     *         surfaces as a {@link BusinessException} cause
     */
    public void batchWriteMsgAsync(List<ChatMsg> chatMsgList) {
        if (CollectionUtils.isEmpty(chatMsgList)) {
            return;
        }
        List<List<ChatMsg>> partitions = Lists.partition(chatMsgList, BATCH_SIZE);
        List<CompletableFuture<Void>> futures = new ArrayList<>(partitions.size());
        for (List<ChatMsg> batch : partitions) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    this.batchWriteMsg(batch);
                }
                catch (IOException e) {
                    // The BusinessException below does not carry the cause, so log it here
                    // before it is lost.
                    log.error("batchWriteMsg io error, batch size:{}", batch.size(), e);
                    throw new BusinessException((UniverseErrorCode)CommonErrorCode.BUSINESS_ERROR);
                }
            }, this.storeService));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((r, t) -> {
                    if (t != null) {
                        log.error("store msg error", t);
                    }
                })
                .join();
    }

    /**
     * Indexes one batch of messages with a single bulk request and logs the outcome of
     * every item.
     *
     * <p>Messages that cannot be serialized to JSON are logged and skipped. If nothing
     * is left to send, no request is issued.
     *
     * @param chatMsgList the batch to index
     * @throws IOException if the bulk call fails
     */
    private void batchWriteMsg(List<ChatMsg> chatMsgList) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (ChatMsg chatMsg : chatMsgList) {
            // The target index name is derived from the message time, falling back to
            // getTime() when getMsgTime() is absent.
            Date msgTime = chatMsg.getMsgTime() != null ? new Date(chatMsg.getMsgTime()) : new Date(chatMsg.getTime());
            IndexRequest indexRequest = new IndexRequest(this.indexUtil.getIndexName(msgTime), DOC_TYPE, chatMsg.getMsgId());
            try {
                indexRequest.source(JacksonUtils.writeValueAsString(chatMsg), XContentType.JSON);
            }
            catch (JsonProcessingException e) {
                log.error("json error:{}", JSON.toJSONString(chatMsg), e);
                continue;
            }
            bulkRequest.add(indexRequest);
        }
        if (bulkRequest.numberOfActions() == 0) {
            // An empty bulk request is rejected by request validation; nothing to write.
            log.warn("bulk request is empty, skip. input size:{}", chatMsgList.size());
            return;
        }
        BulkResponse bulkResponse = this.restHighLevelClient.bulk(bulkRequest, new Header[0]);
        log.info("bulk response:{}", bulkResponse.status());
        for (BulkItemResponse response : bulkResponse) {
            if (response.isFailed()) {
                log.error("write message id:{}, error:{}", response.getId(), response.getFailureMessage());
                continue;
            }
            log.info("write message id:{} success", response.getId());
        }
    }
}

