package com.kuaike.skynet.manager.common.service.impl;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.kuaike.common.enums.WechatMsgType;
import com.kuaike.common.sqlbuilder.dto.PageDto;
import com.kuaike.skynet.manager.common.constants.Conf;
import com.kuaike.skynet.manager.common.dto.operate.OpRule;
import com.kuaike.skynet.manager.common.service.WechatMessageCommonService;
import com.kuaike.skynet.manager.common.utils.DateUtil;
import com.kuaike.skynet.manager.dal.wechat.dto.TalkerMsgCountDto;
import com.kuaike.skynet.manager.dal.wechat.dto.WechatFileListDto;
import com.kuaike.skynet.manager.dal.wechat.dto.WechatFileQueryParams;
import com.kuaike.skynet.manager.dal.wechat.dto.WechatMessageDetailDto;
import com.kuaike.skynet.manager.dal.wechat.dto.WechatMessageDetailQueryParams;
import com.kuaike.skynet.manager.dal.wechat.entity.WechatFile;
import com.kuaike.skynet.manager.dal.wechat.entity.WechatFileDto;
import com.kuaike.skynet.manager.dal.wechat.entity.WechatMessage;
import com.kuaike.skynet.manager.dal.wechat.enums.TalkerType;
import com.kuaike.skynet.manager.dal.wechat.mapper.WechatAccountMapper;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.Header;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.cardinality.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/kuaike/skynet/manager/common/service/impl/WechatMessageCommonServiceImpl.class */
public class WechatMessageCommonServiceImpl implements WechatMessageCommonService {

    @Value("${skynet.elasticsearch.wechatMessageIndex}")
    private String index;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Autowired
    private WechatAccountMapper wechatAccountMapper;
    private static final Logger log = LoggerFactory.getLogger(WechatMessageCommonServiceImpl.class);
    private static final Date beforeSixMonthDay = DateUtils.addMonths(new Date(), -6);

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public WechatMessage selectByPrimaryKeyFromES(String str) {
        SearchResponse searchResponse;
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "消息id不能为空");
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("requestId.keyword", str));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        searchRequest.source(searchSourceBuilder);
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            searchResponse = null;
        }
        if (Objects.isNull(searchResponse)) {
            return null;
        }
        SearchHit[] hits = searchResponse.getHits().getHits();
        if (hits.length > 0) {
            return (WechatMessage) JSON.parseObject(hits[0].getSourceAsString(), WechatMessage.class);
        }
        return null;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public List<WechatMessage> queryListByCreateTimeCount(String str, String str2, Date date, Boolean bool, Integer num, Boolean bool2) {
        log.info("queryListByCreateTimeCount params: wechatId:{}, talkerId:{}, createTime:{}, direction:{}, count:{}, isDesc:{}", new Object[]{str, str2, date, bool, num, bool2});
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "wechatId不能为空");
        Preconditions.checkArgument(StringUtils.isNotBlank(str2), "talkerId不能为空");
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("wechatId.keyword", str)).must(QueryBuilders.matchQuery("talkerId.keyword", str2));
        if (Objects.nonNull(date) && Objects.nonNull(bool)) {
            must.must(bool.booleanValue() ? QueryBuilders.rangeQuery("createTime").gt(Long.valueOf(date.getTime())) : QueryBuilders.rangeQuery("createTime").lt(Long.valueOf(date.getTime())));
        }
        ValuesSourceAggregationBuilder field = AggregationBuilders.count(OpRule.COUNT_KEY).field("id");
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        if (bool2.booleanValue()) {
            searchSourceBuilder.sort("createTime", SortOrder.DESC);
        } else if (bool.booleanValue()) {
            searchSourceBuilder.sort("createTime", SortOrder.ASC);
        } else {
            searchSourceBuilder.sort("createTime", SortOrder.DESC);
        }
        searchSourceBuilder.sort("msgSvrId.keyword", SortOrder.ASC);
        searchSourceBuilder.size(num.intValue());
        searchSourceBuilder.from(0);
        searchSourceBuilder.aggregation(field);
        return (List) executeQueryAndDealResult(searchRequest, searchSourceBuilder).getLeft();
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public List<WechatMessage> queryListByImgPath(String str) {
        log.info("queryListByImgPath params, imgPath:{}", str);
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "路径不能为空");
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("imgPath.keyword", str));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        return (List) executeQueryAndDealResult(searchRequest, searchSourceBuilder).getLeft();
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Pair<List<WechatMessage>, Long> queryPageList(String str, String str2, String str3, Integer num, Integer num2) {
        log.info("queryPageList params, wechatId:{}, talkerId:{}, query:{}, skipResults:{}, pageSize:{}", new Object[]{str, str2, str3, num, num2});
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "wechatId不能为空");
        Preconditions.checkArgument(StringUtils.isNotBlank(str2), "talkerId不能为空");
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("wechatId.keyword", str)).must(QueryBuilders.matchQuery("talkerId.keyword", str2)).must(QueryBuilders.matchQuery("type", Integer.valueOf(WechatMsgType.TEXT.getValue())));
        if (StringUtils.isNotBlank(str3)) {
            must.must(QueryBuilders.matchPhraseQuery("message", str3));
        }
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        searchSourceBuilder.sort("createTime", SortOrder.ASC);
        searchSourceBuilder.sort("msgSvrId.keyword", SortOrder.ASC);
        searchSourceBuilder.size(num2.intValue());
        searchSourceBuilder.from(num.intValue());
        return executeQueryAndDealResult(searchRequest, searchSourceBuilder);
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Map<String, Integer> getWechatIdMessageMemberCountMap(Collection<String> collection, Date date, Date date2, Set<String> set) {
        SearchResponse searchResponse;
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(collection), "wechatChatRoomIds不能为空");
        Preconditions.checkArgument(Objects.nonNull(date), "开始时间不能为空");
        Preconditions.checkArgument(Objects.nonNull(date2), "结束不能为空");
        BoolQueryBuilder mustNot = QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("talkerId.keyword", collection)).must(QueryBuilders.rangeQuery("createTime").gte(date)).must(QueryBuilders.rangeQuery("createTime").lte(date2)).must(QueryBuilders.matchQuery("talkerType", Integer.valueOf(TalkerType.CHATROOM.getValue()))).mustNot(QueryBuilders.matchQuery("type", Integer.valueOf(WechatMsgType.WECHAT_SYSTEM.getValue())));
        if (CollectionUtils.isNotEmpty(set)) {
            mustNot.mustNot(QueryBuilders.termsQuery("talkerId.keyword", set));
        }
        TermsAggregationBuilder valueType = AggregationBuilders.terms("groupRoom").field("talkerId.keyword").valueType(ValueType.STRING);
        valueType.subAggregation(AggregationBuilders.cardinality("distinctMemberCount").field("chatroomTalkerId.keyword"));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(mustNot);
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(valueType);
        searchRequest.source(searchSourceBuilder);
        log.debug("source:{}", searchRequest.toString());
        searchRequest.source(searchSourceBuilder);
        log.debug("searchSourceBuilder:{}", searchSourceBuilder.toString());
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            searchResponse = null;
        }
        if (Objects.isNull(searchResponse)) {
            return Collections.emptyMap();
        }
        HashMap newHashMap = Maps.newHashMap();
        if (Objects.isNull(searchResponse)) {
            return newHashMap;
        }
        Terms terms = searchResponse.getAggregations().get("groupRoom");
        if (Objects.isNull(terms)) {
            return newHashMap;
        }
        List<Terms.Bucket> buckets = terms.getBuckets();
        if (CollectionUtils.isNotEmpty(buckets)) {
            newHashMap = Maps.newHashMapWithExpectedSize(buckets.size());
            for (Terms.Bucket bucket : buckets) {
                newHashMap.put(bucket.getKeyAsString(), Integer.valueOf(Long.valueOf(bucket.getAggregations().get("distinctMemberCount").getValue()).intValue()));
            }
        }
        return newHashMap;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Long queryCountByMsgTypesWechatIds(Collection<Integer> collection, Collection<String> collection2, Date date, Date date2) {
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(collection2), "wechatIds不能为空");
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(collection), "msgTypes不能为空");
        Preconditions.checkArgument(Objects.nonNull(date), "开始时间不能为空");
        Preconditions.checkArgument(Objects.nonNull(date2), "结束不能为空");
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("wechatId.keyword", collection2)).must(QueryBuilders.termsQuery("type", collection)).must(QueryBuilders.rangeQuery("createTime").gte(date)).must(QueryBuilders.rangeQuery("createTime").lte(date2));
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctmsgSvrIdCount").field("msgSvrId.keyword");
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder size = new SearchSourceBuilder().size(0);
        size.query(must);
        size.aggregation(field);
        searchRequest.source(size);
        SearchResponse searchResponse = null;
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            e.printStackTrace();
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            Throwables.throwIfUnchecked(e);
        }
        if (Objects.isNull(searchResponse)) {
            return 0L;
        }
        return Long.valueOf(searchResponse.getAggregations().get("distinctmsgSvrIdCount").getValue());
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public List<TalkerMsgCountDto> queryTalkerCountByWechatIdMessageQuery(String str, String str2) {
        SearchResponse searchResponse;
        log.info("queryTalkerCountByWechatIdMessageQuery params, wechatId:{}, messageQuery:{}", str, str2);
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "wechatId不能为空");
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(beforeSixMonthDay)).must(QueryBuilders.termQuery("wechatId.keyword", str)).must(QueryBuilders.termQuery("type", WechatMsgType.TEXT.getValue()));
        if (StringUtils.isNotBlank(str2)) {
            must.must(QueryBuilders.matchPhraseQuery("message", str2));
        }
        TermsAggregationBuilder order = AggregationBuilders.terms("groupTalkerId").field("talkerId.keyword").order(BucketOrder.aggregation("lastMsgSendTime", false));
        order.subAggregation(AggregationBuilders.max("lastMsgSendTime").field("createTime"));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(order);
        searchRequest.source(searchSourceBuilder);
        log.debug("queryTalkerCountByWechatIdMessageQuery:{}", searchSourceBuilder.toString());
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("es search error:", e);
            searchResponse = null;
        }
        ArrayList newArrayList = Lists.newArrayList();
        if (Objects.isNull(searchResponse) || Objects.isNull(searchResponse.getAggregations())) {
            return newArrayList;
        }
        Terms terms = searchResponse.getAggregations().get("groupTalkerId");
        if (Objects.isNull(terms)) {
            return newArrayList;
        }
        List<Terms.Bucket> buckets = terms.getBuckets();
        if (CollectionUtils.isNotEmpty(buckets)) {
            for (Terms.Bucket bucket : buckets) {
                TalkerMsgCountDto talkerMsgCountDto = new TalkerMsgCountDto();
                String keyAsString = bucket.getKeyAsString();
                Long valueOf = Long.valueOf(bucket.getDocCount());
                String valueAsString = bucket.getAggregations().get("lastMsgSendTime").getValueAsString();
                talkerMsgCountDto.setCount(Integer.valueOf(valueOf.intValue()));
                talkerMsgCountDto.setTalkerId(keyAsString);
                talkerMsgCountDto.setLastMsgSendTime(formateDate(valueAsString));
                newArrayList.add(talkerMsgCountDto);
            }
        }
        return newArrayList;
    }

    private static Date formateDate(String str) {
        try {
            return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS Z").parse(str.replace("Z", " UTC"));
        } catch (ParseException e) {
            e.printStackTrace();
            log.error("转换时间失败！", e);
            return null;
        }
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Pair<List<WechatMessageDetailDto>, Long> queryMessageList(WechatMessageDetailQueryParams wechatMessageDetailQueryParams) {
        SearchResponse searchResponse;
        log.info("queryMessageList queryParams:{}", JSON.toJSONString(wechatMessageDetailQueryParams));
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        if (CollectionUtils.isNotEmpty(wechatMessageDetailQueryParams.getWechatIdSet())) {
            boolQuery.must(QueryBuilders.termsQuery("wechatId.keyword", wechatMessageDetailQueryParams.getWechatIdSet()));
        }
        if (CollectionUtils.isNotEmpty(wechatMessageDetailQueryParams.getTypes())) {
            boolQuery.must(QueryBuilders.termsQuery("type", wechatMessageDetailQueryParams.getTypes()));
        }
        if (Objects.nonNull(wechatMessageDetailQueryParams.getEndTime())) {
            boolQuery.must(QueryBuilders.rangeQuery("createTime").lte(wechatMessageDetailQueryParams.getEndTime()));
        }
        if (Objects.nonNull(wechatMessageDetailQueryParams.getStartTime())) {
            boolQuery.must(QueryBuilders.rangeQuery("createTime").gte(wechatMessageDetailQueryParams.getStartTime()));
        }
        if (CollectionUtils.isNotEmpty(wechatMessageDetailQueryParams.getTalkerIdSet())) {
            boolQuery.must(QueryBuilders.boolQuery().should(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("isSend", true)).must(QueryBuilders.termsQuery("wechatId.keyword", Sets.newHashSet(wechatMessageDetailQueryParams.getTalkerIdSet())))).should(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("isSend", false)).must(QueryBuilders.termQuery("talkerType", TalkerType.CONTACT.getValue())).must(QueryBuilders.termsQuery("talkerId.keyword", Sets.newHashSet(wechatMessageDetailQueryParams.getTalkerIdSet())))).should(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("isSend", false)).must(QueryBuilders.termQuery("talkerType", TalkerType.CHATROOM.getValue())).must(QueryBuilders.termsQuery("chatroomTalkerId.keyword", Sets.newHashSet(wechatMessageDetailQueryParams.getTalkerIdSet())))));
        }
        boolQuery.mustNot(QueryBuilders.termsQuery("talkerId.keyword", Conf.FILTER_OFFICAL_ACCOUNT));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQuery);
        if (Objects.nonNull(wechatMessageDetailQueryParams.getPageDto())) {
            searchSourceBuilder.size(wechatMessageDetailQueryParams.getPageDto().getPageSize().intValue());
            searchSourceBuilder.from(wechatMessageDetailQueryParams.getPageDto().getOffset());
        } else {
            searchSourceBuilder.size(1000);
            searchSourceBuilder.from(0);
        }
        String str = "";
        String sortColumn = wechatMessageDetailQueryParams.getSortColumn();
        if (StringUtils.isBlank(sortColumn)) {
            str = "createTime";
        } else if (StringUtils.isNotBlank(sortColumn)) {
            str = "wechatAlias".equals(sortColumn) ? "nickName.keyword" : "typeDesc".equals(sortColumn) ? "type" : "createTime";
        }
        if (StringUtils.equals("1", wechatMessageDetailQueryParams.getSort())) {
            searchSourceBuilder.sort(str, SortOrder.ASC);
        } else {
            searchSourceBuilder.sort(str, SortOrder.DESC);
        }
        searchRequest.source(searchSourceBuilder);
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            searchResponse = null;
        }
        ArrayList newArrayList = Lists.newArrayList();
        if (Objects.isNull(searchResponse)) {
            return new ImmutablePair(newArrayList, 0L);
        }
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits();
        Stream.of((Object[]) hits.getHits()).forEach(searchHit -> {
            String sourceAsString = searchHit.getSourceAsString();
            String str2 = (String) searchHit.getSourceAsMap().get("requestId");
            WechatMessageDetailDto wechatMessageDetailDto = (WechatMessageDetailDto) JSON.parseObject(sourceAsString, WechatMessageDetailDto.class);
            wechatMessageDetailDto.setWechatMessageId(str2);
            newArrayList.add(wechatMessageDetailDto);
        });
        return new ImmutablePair(newArrayList, Long.valueOf(totalHits));
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Set<Long> syncFile2Es(List<WechatFile> list) {
        HashSet newHashSet = Sets.newHashSet();
        BulkRequest bulkRequest = new BulkRequest();
        int i = 0;
        for (WechatFile wechatFile : list) {
            WechatFileDto wechatFileDto = new WechatFileDto();
            wechatFileDto.setKey(wechatFile.getFileKey());
            wechatFileDto.setFileUrl(wechatFile.getFileUrl());
            wechatFileDto.setFileName(wechatFile.getFileName());
            wechatFileDto.setType(wechatFile.getType().byteValue());
            wechatFileDto.setVideoCover(wechatFile.getVideoCover());
            wechatFileDto.initFileTypeStr();
            wechatFileDto.setRequestId(wechatFile.getRequestId());
            List<String> searchFile = searchFile(wechatFileDto);
            wechatFileDto.setRequestId((String) null);
            if (searchFile.size() > 0) {
                Iterator<String> it = searchFile.iterator();
                while (it.hasNext()) {
                    UpdateRequest updateRequest = new UpdateRequest(this.index, "doc", it.next());
                    updateRequest.doc(JSON.toJSONString(wechatFileDto), XContentType.JSON);
                    bulkRequest.add(updateRequest);
                    i++;
                }
                newHashSet.add(wechatFile.getId());
            } else {
                log.warn("syncFile2Es error: no search response,imgPath={}, requestId:{}", wechatFileDto.getKey(), wechatFile.getRequestId());
            }
        }
        if (i > 0) {
            try {
                this.restHighLevelClient.bulk(bulkRequest, new Header[0]);
            } catch (IOException e) {
                log.error("syncFile2Es error: bulk fail,syncIds={}", newHashSet);
                newHashSet.clear();
            }
        }
        return newHashSet;
    }

    private List<String> searchFile(WechatFileDto wechatFileDto) {
        SearchResponse searchResponse;
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        BoolQueryBuilder mustNot = QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery("fileName"));
        if (StringUtils.isNotBlank(wechatFileDto.getRequestId())) {
            mustNot.must(QueryBuilders.termQuery("requestId.keyword", wechatFileDto.getRequestId()));
        } else {
            mustNot.must(QueryBuilders.matchQuery("imgPath.keyword", wechatFileDto.getKey()));
        }
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(mustNot);
        searchRequest.source(searchSourceBuilder);
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            searchResponse = null;
        }
        if (Objects.isNull(searchResponse)) {
            return Collections.emptyList();
        }
        SearchHit[] hits = searchResponse.getHits().getHits();
        ArrayList newArrayList = Lists.newArrayList();
        for (SearchHit searchHit : hits) {
            newArrayList.add(searchHit.getId());
        }
        return newArrayList;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Pair<List<WechatFileListDto>, Long> queryFileByNameAndType(WechatFileQueryParams wechatFileQueryParams, boolean z) {
        Set queryBusinessCustomerAllWechatAccount = this.wechatAccountMapper.queryBusinessCustomerAllWechatAccount(wechatFileQueryParams.getBusinessCustomerId());
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("isSend", z)).must(QueryBuilders.termsQuery("fileTypeStr", wechatFileQueryParams.getFileTypes()));
        if (z) {
            if (StringUtils.isNotEmpty(wechatFileQueryParams.getSender())) {
                must.must(QueryBuilders.termQuery("wechatId", wechatFileQueryParams.getSender()));
            } else {
                must.must(QueryBuilders.termsQuery("wechatId", wechatFileQueryParams.getManageWechatIdSet()));
            }
            if (StringUtils.isNotEmpty(wechatFileQueryParams.getReceiver())) {
                must.must(QueryBuilders.termQuery("talkerId", wechatFileQueryParams.getReceiver()));
            }
        } else {
            if (StringUtils.isNotEmpty(wechatFileQueryParams.getSender())) {
                must.must(QueryBuilders.boolQuery().should(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("talkerType", TalkerType.CONTACT.getValue())).must(QueryBuilders.termsQuery("talkerId", Sets.newHashSet(new String[]{wechatFileQueryParams.getSender()})))).should(QueryBuilders.boolQuery().must(QueryBuilders.termQuery("talkerType", TalkerType.CHATROOM.getValue())).must(QueryBuilders.termsQuery("chatroomTalkerId", Sets.newHashSet(new String[]{wechatFileQueryParams.getSender()})))));
            } else {
                must.mustNot(QueryBuilders.termsQuery("talkerId", queryBusinessCustomerAllWechatAccount));
            }
            if (StringUtils.isNotEmpty(wechatFileQueryParams.getReceiver())) {
                must.must(QueryBuilders.termQuery("wechatId", wechatFileQueryParams.getReceiver()));
            } else {
                must.must(QueryBuilders.termsQuery("wechatId", wechatFileQueryParams.getManageWechatIdSet()));
            }
        }
        if (StringUtils.isNotEmpty(wechatFileQueryParams.getFileName())) {
            must.must(QueryBuilders.matchPhraseQuery("fileName", wechatFileQueryParams.getFileName()));
        }
        if (wechatFileQueryParams.getStartTime() != null) {
            must.must(QueryBuilders.rangeQuery("createTime").gte(wechatFileQueryParams.getStartTime()));
        }
        if (wechatFileQueryParams.getEndTime() != null) {
            must.must(QueryBuilders.rangeQuery("createTime").lte(wechatFileQueryParams.getEndTime()));
        }
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        searchSourceBuilder.sort("createTime", SortOrder.DESC);
        searchSourceBuilder.size(wechatFileQueryParams.getPageDto().getPageSize().intValue());
        searchSourceBuilder.from((wechatFileQueryParams.getPageDto().getPageNum().intValue() - 1) * wechatFileQueryParams.getPageDto().getPageSize().intValue());
        Pair<List<WechatMessage>, Long> executeQueryAndDealResult = executeQueryAndDealResult(searchRequest, searchSourceBuilder);
        List list = (List) executeQueryAndDealResult.getLeft();
        ArrayList newArrayList = Lists.newArrayList();
        list.forEach(wechatMessage -> {
            newArrayList.add(buildWechatFileListDto(wechatMessage, z));
        });
        return new ImmutablePair(newArrayList, executeQueryAndDealResult.getRight());
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public List<WechatMessage> queryUrlByKeys(Set<String> set) {
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        TermsQueryBuilder termsQuery = QueryBuilders.termsQuery("imgPath.keyword", set);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(Conf.DEV_WECHAT_MAX_SELECT_COUNT_PER_TASK);
        searchSourceBuilder.query(termsQuery);
        return (List) executeQueryAndDealResult(searchRequest, searchSourceBuilder).getLeft();
    }

    private WechatFileListDto buildWechatFileListDto(WechatMessage wechatMessage, boolean z) {
        WechatFileListDto wechatFileListDto = new WechatFileListDto();
        wechatFileListDto.setFileName(wechatMessage.getFileName());
        wechatFileListDto.setUrl(wechatMessage.getFileUrl());
        wechatFileListDto.setSendTime(wechatMessage.getCreateTime());
        if (z) {
            wechatFileListDto.setSenderId(wechatMessage.getWechatId());
        } else {
            wechatFileListDto.setReceiverId(wechatMessage.getWechatId());
        }
        wechatFileListDto.setTalkerType(wechatMessage.getTalkerType().toString());
        wechatFileListDto.setTalkerId(wechatMessage.getTalkerId());
        wechatFileListDto.setChatRoomTalkerId(wechatMessage.getChatroomTalkerId());
        wechatFileListDto.setVideoCover(wechatMessage.getVideoCover());
        return wechatFileListDto;
    }

    private Pair<List<WechatMessage>, Long> executeQueryAndDealResult(SearchRequest searchRequest, SearchSourceBuilder searchSourceBuilder) {
        SearchResponse searchResponse;
        searchRequest.source(searchSourceBuilder);
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            searchResponse = null;
        }
        ArrayList newArrayList = Lists.newArrayList();
        if (Objects.isNull(searchResponse)) {
            return new ImmutablePair(newArrayList, 0L);
        }
        SearchHits hits = searchResponse.getHits();
        SearchHit[] hits2 = hits.getHits();
        long totalHits = hits.getTotalHits();
        Stream.of((Object[]) hits2).forEach(searchHit -> {
            newArrayList.add((WechatMessage) JSON.parseObject(searchHit.getSourceAsString(), WechatMessage.class));
        });
        return new ImmutablePair(newArrayList, Long.valueOf(totalHits));
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Map<String, List<String>> selectAllPrivateChatByDay(Date date, Date date2, Set<String> set) {
        SearchResponse searchResponse;
        HashMap newHashMap;
        Preconditions.checkArgument(Objects.nonNull(date), "beginTime is null");
        Preconditions.checkArgument(Objects.nonNull(date2), "endTime is null");
        try {
            log.info("selectAllPrivateChatByDay beginTime:{}, endTime:{}", DateUtil.dateToDateString(date, DateUtil.yyyy_MM_dd_HH_mm_ss_EN), DateUtil.dateToDateString(date2, DateUtil.yyyy_MM_dd_HH_mm_ss_EN));
        } catch (Exception e) {
            log.error("selectAllPrivateChatByDay 打印日志失败,Exception: {}", e);
        }
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        BoolQueryBuilder mustNot = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(date2)).must(QueryBuilders.termQuery("talkerType", 1)).mustNot(QueryBuilders.termsQuery("type", Lists.newArrayList(new Integer[]{570425393, 285212721}))).mustNot(QueryBuilders.termQuery("talkerId.keyword", "weixin"));
        if (CollectionUtils.isNotEmpty(set)) {
            mustNot.mustNot(QueryBuilders.termsQuery("talkerId.keyword", set));
        }
        TermsAggregationBuilder size = AggregationBuilders.terms("allWechatAggre").field("wechatId.keyword").size(100000);
        size.subAggregation(AggregationBuilders.terms("talkerAggre").field("talkerId.keyword").size(100000));
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(mustNot);
        searchSourceBuilder.size(0);
        searchSourceBuilder.aggregation(size);
        searchRequest.source(searchSourceBuilder);
        log.debug("selectAllPrivateChatByDay-------------------");
        log.debug("source:{}", searchRequest.toString());
        searchRequest.source(searchSourceBuilder);
        log.debug("searchSourceBuilder:{}", searchSourceBuilder.toString());
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e2) {
            log.error("Search es failed, searchRequest={}", searchRequest, e2);
            searchResponse = null;
        }
        if (Objects.isNull(searchResponse)) {
            return Collections.emptyMap();
        }
        Terms terms = searchResponse.getAggregations().get("allWechatAggre");
        if (Objects.isNull(terms)) {
            return Maps.newHashMap();
        }
        List<Terms.Bucket> buckets = terms.getBuckets();
        if (CollectionUtils.isNotEmpty(buckets)) {
            newHashMap = Maps.newHashMapWithExpectedSize(buckets.size());
            for (Terms.Bucket bucket : buckets) {
                String keyAsString = bucket.getKeyAsString();
                List buckets2 = bucket.getAggregations().get("talkerAggre").getBuckets();
                if (CollectionUtils.isNotEmpty(buckets2)) {
                    ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(buckets2.size());
                    buckets2.forEach(bucket2 -> {
                        newArrayListWithExpectedSize.add(bucket2.getKeyAsString());
                    });
                    newHashMap.put(keyAsString, newArrayListWithExpectedSize);
                } else {
                    newHashMap.put(keyAsString, Lists.newArrayList());
                }
            }
        } else {
            newHashMap = Maps.newHashMap();
        }
        return newHashMap;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public List<WechatMessage> queryWechatDayMessageDetail(String str, List<String> list, Date date, Date date2, PageDto pageDto, Set<String> set) {
        SearchResponse searchResponse;
        Preconditions.checkArgument(Objects.nonNull(pageDto), "分页参数不能为空");
        Preconditions.checkArgument(Objects.nonNull(str), "wechatId is null");
        Preconditions.checkArgument(Objects.nonNull(date), "beginTime is null");
        Preconditions.checkArgument(Objects.nonNull(date2), "endTime is null");
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.must(QueryBuilders.termQuery("wechatId.keyword", str));
        boolQuery.must(QueryBuilders.termsQuery("talkerId.keyword", list));
        boolQuery.must(QueryBuilders.termQuery("talkerType", TalkerType.CONTACT.getValue()));
        boolQuery.must(QueryBuilders.rangeQuery("createTime").gt(Long.valueOf(date.getTime())).lt(Long.valueOf(date2.getTime())));
        if (CollectionUtils.isNotEmpty(set)) {
            boolQuery.mustNot(QueryBuilders.termsQuery("talkerId.keyword", set));
        }
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQuery);
        searchSourceBuilder.from(pageDto.getOffset());
        searchSourceBuilder.size(pageDto.getPageSize().intValue());
        searchSourceBuilder.sort("createTime", SortOrder.ASC);
        searchSourceBuilder.fetchSource(new String[]{"requestId", "wechatId", "isSend", "talkerId", "createTime"}, new String[0]);
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        searchRequest.source(searchSourceBuilder);
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("Search query wechat message failed, searchRequest={}", searchRequest, e);
            searchResponse = null;
        }
        if (Objects.isNull(searchResponse)) {
            return Collections.emptyList();
        }
        SearchHits hits = searchResponse.getHits();
        if (hits.getTotalHits() <= 0) {
            return new ArrayList();
        }
        pageDto.setCount(Integer.valueOf(Long.valueOf(hits.getTotalHits()).intValue()));
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(Long.valueOf(hits.getTotalHits()).intValue());
        Stream.of((Object[]) hits.getHits()).forEach(searchHit -> {
            newArrayListWithExpectedSize.add((WechatMessage) JSON.parseObject(searchHit.getSourceAsString(), WechatMessage.class));
        });
        return newArrayListWithExpectedSize;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public WechatMessage selectByMsgServerId(String str, String str2, String str3) {
        SearchResponse searchResponse;
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "消息msgServerId不能为空");
        BoolQueryBuilder mustNot = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("msgSvrId.keyword", str)).must(QueryBuilders.termQuery("wechatId.keyword", str3)).mustNot(QueryBuilders.termQuery("requestId.keyword", str2));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(mustNot);
        searchRequest.source(searchSourceBuilder);
        try {
            searchResponse = this.restHighLevelClient.search(searchRequest, new Header[0]);
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            searchResponse = null;
        }
        if (Objects.isNull(searchResponse)) {
            return null;
        }
        SearchHit[] hits = searchResponse.getHits().getHits();
        if (hits.length > 0) {
            return (WechatMessage) JSON.parseObject(hits[0].getSourceAsString(), WechatMessage.class);
        }
        return null;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Boolean updateMessage(WechatMessage wechatMessage) {
        log.info("updateMessage with:{}", JSON.toJSONString(wechatMessage));
        boolean z = false;
        HashMap hashMap = new HashMap();
        if (Objects.nonNull(wechatMessage.getIsRevoked())) {
            hashMap.put("isRevoked", wechatMessage.getIsRevoked());
        }
        if (Objects.nonNull(wechatMessage.getSendStatus())) {
            hashMap.put("sendStatus", wechatMessage.getSendStatus());
        }
        if (Objects.nonNull(wechatMessage.getTalkerType())) {
            hashMap.put("talkerType", wechatMessage.getTalkerType());
        }
        if (StringUtils.isNotBlank(wechatMessage.getTalkerId())) {
            hashMap.put("talkerId", wechatMessage.getTalkerId());
        }
        if (StringUtils.isNotBlank(wechatMessage.getOriginRequestId())) {
            hashMap.put("originRequestId", wechatMessage.getOriginRequestId());
        }
        if (MapUtils.isEmpty(hashMap)) {
            return false;
        }
        UpdateRequest doc = new UpdateRequest(this.index, "doc", wechatMessage.getRequestId()).doc(hashMap);
        doc.retryOnConflict(3);
        try {
            if (this.restHighLevelClient.update(doc, new Header[0]).getResult() == DocWriteResponse.Result.UPDATED) {
                z = true;
                log.info("updateMessage success requestId:{}", wechatMessage.getRequestId());
            }
        } catch (IOException e) {
            log.error("updateMessage failed, requestId={}", wechatMessage.getRequestId(), e);
            e.printStackTrace();
        }
        return Boolean.valueOf(z);
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Optional<WechatMessage> queryAddFriendBeforeMessage(String str, String str2, Date date, String str3) {
        log.info("querAddFriendMessage params: wechatId:{}, talkerId:{}, createTime:{}", new Object[]{str, str2, date});
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "wechatId不能为空");
        Preconditions.checkArgument(StringUtils.isNotBlank(str2), "talkerId不能为空");
        Preconditions.checkArgument(Objects.nonNull(date), "createTime not null");
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("wechatId.keyword", str)).must(QueryBuilders.matchQuery("talkerId.keyword", str2));
        must.must(QueryBuilders.rangeQuery("createTime").lte(Long.valueOf(date.getTime())));
        must.must(QueryBuilders.termQuery("type", WechatMsgType.TIPS.getValue()));
        must.mustNot(QueryBuilders.termQuery("requestId.keyword", str3));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        searchSourceBuilder.sort("createTime", SortOrder.DESC);
        searchSourceBuilder.size(1);
        searchSourceBuilder.from(0);
        List list = (List) executeQueryAndDealResult(searchRequest, searchSourceBuilder).getLeft();
        return CollectionUtils.isEmpty(list) ? Optional.empty() : Optional.ofNullable(list.get(0));
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public List<WechatMessage> queryWaitJoinFriendMsg(Date date, Date date2) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.must(QueryBuilders.rangeQuery("createTime").gte(date).lt(date2));
        boolQuery.must(QueryBuilders.termQuery("type", WechatMsgType.TEXT.getValue()));
        boolQuery.must(QueryBuilders.termQuery("talkerType", 1));
        boolQuery.must(QueryBuilders.matchPhraseQuery("message", "opcode")).must(QueryBuilders.matchPhraseQuery("message", "ticket")).must(QueryBuilders.matchPhraseQuery("message", "scene"));
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQuery);
        searchSourceBuilder.sort("createTime", SortOrder.DESC);
        searchRequest.source(searchSourceBuilder);
        log.debug("searchSourceBuilder:{}", searchSourceBuilder.toString());
        return (List) executeQueryAndDealResult(searchRequest, searchSourceBuilder).getLeft();
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Map<String, Integer> getChatRoomSpeakerCount(Collection<String> collection, Date date, Date date2) {
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctTalker").field("chatroomTalkerId.keyword");
        HashMap newHashMap = Maps.newHashMap();
        for (String str : collection) {
            BoolQueryBuilder mustNot = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(Long.valueOf(date2.getTime()))).must(QueryBuilders.termQuery("talkerType", 2)).must(QueryBuilders.existsQuery("chatroomTalkerId")).must(QueryBuilders.termQuery("talkerId.keyword", str)).mustNot(QueryBuilders.termQuery("chatroomTalkerId.keyword", str));
            SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
            searchRequest.types(new String[]{"doc"});
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
            searchSourceBuilder.query(mustNot);
            searchSourceBuilder.aggregation(field);
            searchSourceBuilder.size(NumberUtils.INTEGER_ZERO.intValue());
            searchRequest.source(searchSourceBuilder);
            log.info("searchSourceBuilder:{}", searchSourceBuilder.toString());
            try {
                newHashMap.put(str, Integer.valueOf((int) this.restHighLevelClient.search(searchRequest, new Header[0]).getAggregations().get("distinctTalker").getValue()));
            } catch (IOException e) {
                log.error("Search es failed, searchRequest={}", searchRequest, e);
                newHashMap.put(str, NumberUtils.INTEGER_ZERO);
            }
        }
        return newHashMap;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Map<String, Integer> getChatRoomMsgCount(Collection<String> collection, Date date, Date date2) {
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctMsgSvrId").field("msgSvrId.keyword");
        HashMap newHashMap = Maps.newHashMap();
        SearchRequest searchRequest = null;
        for (String str : collection) {
            try {
                BoolQueryBuilder mustNot = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(Long.valueOf(date2.getTime()))).must(QueryBuilders.termQuery("talkerType", 2)).must(QueryBuilders.existsQuery("chatroomTalkerId")).must(QueryBuilders.termQuery("talkerId.keyword", str)).mustNot(QueryBuilders.termQuery("chatroomTalkerId.keyword", str));
                searchRequest = new SearchRequest(new String[]{this.index});
                searchRequest.types(new String[]{"doc"});
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchSourceBuilder.aggregation(field);
                searchSourceBuilder.query(mustNot);
                searchSourceBuilder.size(NumberUtils.INTEGER_ZERO.intValue());
                searchRequest.source(searchSourceBuilder);
                log.info("searchSourceBuilder:{}", searchSourceBuilder.toString());
                newHashMap.put(str, Integer.valueOf((int) this.restHighLevelClient.search(searchRequest, new Header[0]).getAggregations().get("distinctMsgSvrId").getValue()));
            } catch (IOException e) {
                log.error("Search es failed, searchRequest={}", searchRequest, e);
                newHashMap.put(str, NumberUtils.INTEGER_ZERO);
            }
        }
        return newHashMap;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public Map<String, Integer> getChatRoomRobotMsgCount(Collection<String> collection, Collection<String> collection2, Date date, Date date2) {
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctMsgSvrId").field("msgSvrId.keyword");
        HashMap newHashMap = Maps.newHashMap();
        SearchRequest searchRequest = null;
        for (String str : collection) {
            try {
                BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(Long.valueOf(date2.getTime()))).must(QueryBuilders.termQuery("talkerType", 2)).must(QueryBuilders.existsQuery("chatroomTalkerId")).must(QueryBuilders.termsQuery("chatroomTalkerId.keyword", collection2)).must(QueryBuilders.termQuery("talkerId.keyword", str));
                searchRequest = new SearchRequest(new String[]{this.index});
                searchRequest.types(new String[]{"doc"});
                SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
                searchSourceBuilder.query(must);
                searchSourceBuilder.size(NumberUtils.INTEGER_ZERO.intValue());
                searchSourceBuilder.aggregation(field);
                searchRequest.source(searchSourceBuilder);
                log.info("searchSourceBuilder:{}", searchSourceBuilder.toString());
                newHashMap.put(str, Integer.valueOf((int) this.restHighLevelClient.search(searchRequest, new Header[0]).getAggregations().get("distinctMsgSvrId").getValue()));
            } catch (IOException e) {
                log.error("Search es failed, searchRequest={}", searchRequest, e);
                newHashMap.put(str, NumberUtils.INTEGER_ZERO);
            }
        }
        return newHashMap;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public int[] queryByTalkerIdAndMemberIdsAndTime(String str, String str2, Date date, Date date2) {
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(Long.valueOf(date2.getTime()))).must(QueryBuilders.termsQuery("chatroomTalkerId.keyword", new String[]{str2})).must(QueryBuilders.termQuery("talkerId.keyword", str));
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctMsgSvrId").field("msgSvrId.keyword");
        DateHistogramAggregationBuilder minDocCount = AggregationBuilders.dateHistogram("dateAggr").field("createTime").dateHistogramInterval(DateHistogramInterval.DAY).format(DateUtil.YYYY_MM_DD_EN).minDocCount(1L);
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        searchSourceBuilder.size(NumberUtils.INTEGER_ZERO.intValue());
        searchSourceBuilder.aggregation(field).aggregation(minDocCount);
        searchRequest.source(searchSourceBuilder);
        log.info("searchSourceBuilder:{}", searchSourceBuilder.toString());
        int[] iArr = new int[2];
        try {
            SearchResponse search = this.restHighLevelClient.search(searchRequest, new Header[0]);
            iArr[0] = (int) search.getAggregations().get("distinctMsgSvrId").getValue();
            iArr[1] = search.getAggregations().get("dateAggr").getBuckets().size();
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
        }
        return iArr;
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public int getTotalTalkerCountByChatRoomIdsAndTime(Set<String> set, Date date, Date date2) {
        BoolQueryBuilder mustNot = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(Long.valueOf(date2.getTime()))).must(QueryBuilders.termQuery("talkerType", 2)).must(QueryBuilders.existsQuery("chatroomTalkerId")).must(QueryBuilders.termsQuery("talkerId.keyword", set)).mustNot(QueryBuilders.termsQuery("chatroomTalkerId.keyword", set));
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctTalker").field("chatroomTalkerId.keyword");
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(mustNot);
        searchSourceBuilder.aggregation(field);
        searchSourceBuilder.size(NumberUtils.INTEGER_ZERO.intValue());
        searchRequest.source(searchSourceBuilder);
        log.info("searchSourceBuilder:{}", searchSourceBuilder.toString());
        try {
            return (int) this.restHighLevelClient.search(searchRequest, new Header[0]).getAggregations().get("distinctTalker").getValue();
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            return NumberUtils.INTEGER_ZERO.intValue();
        }
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public int getTotalMsgCountByChatRoomIdsAndTime(Set<String> set, Date date, Date date2) {
        BoolQueryBuilder mustNot = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(Long.valueOf(date2.getTime()))).must(QueryBuilders.termQuery("talkerType", 2)).must(QueryBuilders.existsQuery("chatroomTalkerId")).must(QueryBuilders.termsQuery("talkerId.keyword", set)).mustNot(QueryBuilders.termsQuery("chatroomTalkerId.keyword", set));
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctMsgSvrId").field("msgSvrId.keyword");
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.aggregation(field);
        searchSourceBuilder.query(mustNot);
        searchSourceBuilder.size(NumberUtils.INTEGER_ZERO.intValue());
        searchRequest.source(searchSourceBuilder);
        log.info("searchSourceBuilder:{}", searchSourceBuilder.toString());
        try {
            return (int) this.restHighLevelClient.search(searchRequest, new Header[0]).getAggregations().get("distinctMsgSvrId").getValue();
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            return NumberUtils.INTEGER_ZERO.intValue();
        }
    }

    @Override // com.kuaike.skynet.manager.common.service.WechatMessageCommonService
    public int getTotalRobotMsgByChatRoomIdsAndTime(Set<String> set, Set<String> set2, Date date, Date date2) {
        BoolQueryBuilder must = QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery("createTime").gte(Long.valueOf(date.getTime())).lte(Long.valueOf(date2.getTime()))).must(QueryBuilders.termQuery("talkerType", 2)).must(QueryBuilders.existsQuery("chatroomTalkerId")).must(QueryBuilders.termsQuery("chatroomTalkerId.keyword", set2)).must(QueryBuilders.termsQuery("talkerId.keyword", set));
        CardinalityAggregationBuilder field = AggregationBuilders.cardinality("distinctMsgSvrId").field("msgSvrId.keyword");
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{"doc"});
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(must);
        searchSourceBuilder.size(NumberUtils.INTEGER_ZERO.intValue());
        searchSourceBuilder.aggregation(field);
        searchRequest.source(searchSourceBuilder);
        log.info("searchSourceBuilder:{}", searchSourceBuilder.toString());
        try {
            return (int) this.restHighLevelClient.search(searchRequest, new Header[0]).getAggregations().get("distinctMsgSvrId").getValue();
        } catch (IOException e) {
            log.error("Search es failed, searchRequest={}", searchRequest, e);
            return NumberUtils.INTEGER_ZERO.intValue();
        }
    }
}
