package com.kuaike.scrm.chat.service;

import cn.kinyun.wework.sdk.entity.chat.ChatMsg;
import cn.kinyun.wework.sdk.utils.JacksonUtils;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.kuaike.common.errorcode.CommonErrorCode;
import com.kuaike.common.exception.BusinessException;
import com.kuaike.scrm.common.utils.AutoCreateIndexUtil;
import com.kuaike.scrm.common.utils.NamedThreadFactory;
import com.kuaike.scrm.common.utils.ThreadPoolMonitorUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.Header;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Persists WeWork chat messages into Elasticsearch and looks up the most recent
 * stored message sequence for a corp.
 *
 * <p>Bulk writes are split into fixed-size chunks and executed on a dedicated, bounded
 * thread pool. The pool uses {@link ThreadPoolExecutor.CallerRunsPolicy}, so when the
 * pool and its queue are saturated the submitting thread executes the chunk itself.
 */
@Component
public class StoreMessageService {
    private static final Logger log = LoggerFactory.getLogger(StoreMessageService.class);

    /** Elasticsearch mapping type used for both reads and writes. */
    private static final String DOC_TYPE = "doc";

    /** Maximum number of messages sent in a single bulk request. */
    private static final int BATCH_SIZE = 100;

    /** Capacity of the work queue in front of the store thread pool. */
    private static final int QUEUE_CAPACITY = 10;

    /** Keep-alive, in seconds, for threads above the core pool size. */
    private static final long KEEP_ALIVE_SECONDS = 10L;

    @Value("${scrm.elasticsearch.index.weworkMessage:test_chat_message_alias}")
    private String indexAlias;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Autowired
    private AutoCreateIndexUtil indexUtil;

    /** Core pool size: one thread per available processor; the maximum is twice that. */
    private final int size = Runtime.getRuntime().availableProcessors();

    private final ExecutorService storeService = new ThreadPoolExecutor(
            this.size,
            this.size * 2,
            KEEP_ALIVE_SECONDS,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<Runnable>(QUEUE_CAPACITY),
            new NamedThreadFactory("storeService"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    /** Registers the store thread pool with the thread-pool monitor after injection. */
    @PostConstruct
    public void init() {
        ThreadPoolMonitorUtils.addToMonitor(this.storeService);
    }

    /**
     * Finds the highest {@code seq} among the stored messages of a corp.
     *
     * <p>Queries the configured index alias for documents whose {@code corpId.keyword}
     * equals the given id, sorted by {@code seq} descending, and reads the first hit.
     *
     * @param str the corp id to search for
     * @return the {@code seq} of the latest stored message, or {@code 0} when no message
     *         is found or when the search or the parsing of the hit fails (failures are
     *         logged, not propagated)
     */
    public long findLastMsg(String str) {
        BoolQueryBuilder corpFilter = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("corpId.keyword", str));

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(corpFilter);
        searchSourceBuilder.sort("seq", SortOrder.DESC);
        searchSourceBuilder.size(1);
        searchSourceBuilder.from(0);

        SearchRequest searchRequest = new SearchRequest(new String[]{this.indexAlias});
        searchRequest.types(new String[]{DOC_TYPE});
        searchRequest.source(searchSourceBuilder);
        log.debug("searchSourceBuilder:{}", searchSourceBuilder);

        SearchHit[] hits;
        try {
            hits = this.restHighLevelClient.search(searchRequest, new Header[0]).getHits().getHits();
        } catch (Exception e) {
            log.error("findLastMsg error corpId:{}", str, e);
            return 0L;
        }
        if (hits.length == 0) {
            return 0L;
        }

        try {
            Long seq = ((ChatMsg) JacksonUtils.readValue(hits[0].getSourceAsString(), ChatMsg.class)).getSeq();
            log.info("last msg seq:{}", seq);
            // A null seq raises here and is handled like any other unreadable hit.
            return seq.longValue();
        } catch (Exception e) {
            log.error("findLastMsg search corpId:{}", str, e);
            return 0L;
        }
    }

    /**
     * Writes the given messages to Elasticsearch in chunks of {@value #BATCH_SIZE},
     * running the chunks concurrently on the store thread pool.
     *
     * <p>Despite the name, this method blocks until every chunk has completed.
     *
     * @param list the messages to store; {@code null} or empty is a no-op
     * @throws java.util.concurrent.CompletionException if any chunk fails; the failure
     *         is also logged
     */
    public void batchWriteMsgAsync(List<ChatMsg> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        List<List<ChatMsg>> chunks = Lists.partition(list, BATCH_SIZE);
        List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(chunks.size());
        for (List<ChatMsg> chunk : chunks) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    batchWriteMsg(chunk);
                } catch (IOException e) {
                    // Log the root cause here: the BusinessException below does not carry it.
                    log.error("bulk write failed, chunk size:{}", chunk.size(), e);
                    throw new BusinessException(CommonErrorCode.BUSINESS_ERROR);
                }
            }, this.storeService));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((ignored, th) -> {
            if (th != null) {
                log.error("store msg error", th);
            }
        }).join();
    }

    /**
     * Sends one bulk index request containing the given messages.
     *
     * <p>Each message is indexed under its message id into the index that
     * {@link AutoCreateIndexUtil#getIndexName} returns for the message date. Messages that
     * cannot be serialized to JSON are logged and left out of the request.
     *
     * @param list the messages of a single chunk
     * @throws IOException if the bulk call to Elasticsearch fails
     */
    private void batchWriteMsg(List<ChatMsg> list) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (ChatMsg chatMsg : list) {
            // Prefer msgTime; fall back to time when msgTime is absent.
            Date msgDate = chatMsg.getMsgTime() != null
                    ? new Date(chatMsg.getMsgTime().longValue())
                    : new Date(chatMsg.getTime().longValue());
            IndexRequest indexRequest = new IndexRequest(this.indexUtil.getIndexName(msgDate), DOC_TYPE, chatMsg.getMsgId());
            try {
                indexRequest.source(JacksonUtils.writeValueAsString(chatMsg), XContentType.JSON);
                bulkRequest.add(indexRequest);
            } catch (JsonProcessingException e) {
                log.error("json error:{}", JSON.toJSONString(chatMsg), e);
            }
        }
        if (bulkRequest.numberOfActions() == 0) {
            // Every message failed serialization; an empty bulk request is rejected as invalid.
            return;
        }

        BulkResponse bulk = this.restHighLevelClient.bulk(bulkRequest, new Header[0]);
        log.info("bulk response:{}", bulk.status());
        if (!bulk.hasFailures()) {
            return;
        }
        for (BulkItemResponse item : bulk) {
            if (item.isFailed()) {
                log.error("write message id:{}, error:{}", item.getId(), item.getFailureMessage());
            }
        }
    }
}
