package cn.kinyun.crm.sal.leads.service.impl;

import cn.kinyun.crm.common.enums.LeadsType;
import cn.kinyun.crm.common.utils.IdGen;
import cn.kinyun.crm.dal.leads.dto.LeadsSimpleDto;
import cn.kinyun.crm.dal.leads.mapper.AbandonLibMapper;
import cn.kinyun.crm.dal.leads.mapper.DeptLibMapper;
import cn.kinyun.crm.dal.leads.mapper.LeadsBindingInfoMapper;
import cn.kinyun.crm.dal.leads.mapper.PublicLibMapper;
import cn.kinyun.crm.dal.util.BizTableContext;
import cn.kinyun.crm.sal.leads.dto.BindingRecordDto;
import cn.kinyun.crm.sal.leads.service.LeadsBindingRecordEsService;
import com.google.common.collect.Lists;
import com.kuaike.common.errorcode.CommonErrorCode;
import com.kuaike.common.exception.BusinessException;
import com.kuaike.common.utils.JacksonUtil;
import com.kuaike.common.utils.JsonUtil;
import com.kuaike.scrm.common.dto.CurrentUserInfo;
import com.kuaike.scrm.common.utils.LoginUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * Elasticsearch-backed implementation of {@link LeadsBindingRecordEsService}.
 *
 * <p>Binding records are written to and read from the index configured by
 * {@code crm.elasticsearch.customerBindingRecord}, using the document type {@code "doc"}.
 * Write operations are best-effort: I/O failures are logged and not propagated to the caller.
 */
@Service
public class LeadsBindingRecordEsServiceImpl implements LeadsBindingRecordEsService {
    private static final Logger log = LoggerFactory.getLogger(LeadsBindingRecordEsServiceImpl.class);

    /** Number of rows fetched from a mapper per page during {@link #syncBindingRecord(String)}. */
    private static final int PAGE_SIZE = 3000;

    /** Number of leads ids handled per search/bulk round trip in {@link #updateBindingEndTime}. */
    private static final int UPDATE_BATCH_SIZE = 500;

    /** Maximum number of records returned by {@link #selectBindingRecordsByLeadsIds}. */
    private static final int QUERY_LIMIT = 10;

    /** Document type used for every index, update and typed search request. */
    private static final String DOC_TYPE = "doc";

    @Value("${crm.elasticsearch.customerBindingRecord}")
    private String index;

    @Value("${permission.token}")
    private String permissionToken;

    @Resource
    private RestHighLevelClient restHighLevelClient;

    @Resource
    private IdGen idGen;

    @Resource
    private DeptLibMapper deptLibMapper;

    @Resource
    private AbandonLibMapper abandonLibMapper;

    @Resource
    private PublicLibMapper publicLibMapper;

    @Resource
    private LeadsBindingInfoMapper leadsBindingInfoMapper;

    /**
     * Indexes a single binding record.
     *
     * <p>An {@link IOException} from the client is logged and swallowed.
     *
     * @param bindingRecordDto the record to index
     */
    @Override
    public void add(BindingRecordDto bindingRecordDto) {
        try {
            this.restHighLevelClient.index(newIndexRequest(bindingRecordDto), new Header[0]);
        } catch (IOException e) {
            log.error("add,params:{},es插入异常", bindingRecordDto, e);
        }
    }

    /**
     * Indexes the given records with a single bulk request.
     *
     * <p>A null or empty list is ignored, because an empty bulk request would only be rejected
     * by the client. Any exception raised while sending the request is logged and swallowed.
     *
     * @param list the records to index; may be null or empty
     */
    @Override
    public void batchAdd(List<BindingRecordDto> list) {
        if (CollectionUtils.isEmpty(list)) {
            log.warn("batchAdd, empty record list, nothing to index");
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        for (BindingRecordDto record : list) {
            bulkRequest.add(newIndexRequest(record));
        }
        try {
            log.info("bulk response:{}", this.restHighLevelClient.bulk(bulkRequest, new Header[0]).status());
        } catch (Exception e) {
            log.error("batchAdd, 批量插入绑定记录失败", e);
        }
    }

    /**
     * Sets {@code bindingEndTime} on the records of the given leads that do not have one yet.
     *
     * <p>The ids are processed in batches of {@value #UPDATE_BATCH_SIZE}. An {@link IOException}
     * in one batch is logged and the remaining batches are still processed.
     *
     * @param bizId          value matched against the {@code bizId} field
     * @param leadsIds       values matched against the {@code leadsId} field; nothing happens when null or empty
     * @param bindingEndTime end time to store, written as epoch milliseconds; nothing happens when null
     */
    @Override
    public void updateBindingEndTime(Long bizId, List<Long> leadsIds, Date bindingEndTime) {
        if (CollectionUtils.isEmpty(leadsIds)) {
            log.warn("updateBindingEndTime,leadsId为空");
            return;
        }
        if (bindingEndTime == null) {
            log.warn("updateBindingEndTime,bindingEndTime为空");
            return;
        }
        long endMillis = bindingEndTime.getTime();
        for (List<Long> batch : Lists.partition(leadsIds, UPDATE_BATCH_SIZE)) {
            try {
                closeOpenBindings(bizId, batch, endMillis);
            } catch (IOException e) {
                log.error("updateBindingEndTime, bizId:{}, failed to update bindingEndTime for a batch of {} leads",
                        bizId, batch.size(), e);
            }
        }
    }

    /**
     * Returns the most recent binding records of one lead, newest first.
     *
     * <p>At most {@value #QUERY_LIMIT} records are returned, sorted by {@code createTime}
     * descending. Hits that cannot be deserialized are logged and skipped. Any failure of the
     * search itself is logged and results in an empty list.
     *
     * @param bizId   value matched against the {@code bizId} field
     * @param leadsId value matched against the {@code leadsId} field
     * @return the matching records, or an empty list when nothing matches or the search fails
     * @throws BusinessException when either argument is null
     */
    @Override
    public List<BindingRecordDto> selectBindingRecordsByLeadsIds(Long bizId, Long leadsId) {
        if (bizId == null || leadsId == null) {
            throw new BusinessException(CommonErrorCode.PARAM_ERROR, "请求参数不合法");
        }
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("bizId", bizId))
                .must(QueryBuilders.termQuery("leadsId", leadsId)));
        source.sort("createTime", SortOrder.DESC);
        source.size(QUERY_LIMIT);
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.types(new String[]{DOC_TYPE});
        searchRequest.source(source);
        try {
            SearchHit[] hits = this.restHighLevelClient.search(searchRequest, new Header[0]).getHits().getHits();
            if (hits == null || hits.length == 0) {
                return Collections.emptyList();
            }
            List<BindingRecordDto> records = new ArrayList<>(hits.length);
            for (SearchHit hit : hits) {
                try {
                    records.add((BindingRecordDto) JacksonUtil.str2Obj(hit.getSourceAsString(), BindingRecordDto.class));
                } catch (IOException e) {
                    log.error("解析绑定记录失败：", e);
                }
            }
            return records;
        } catch (Exception e) {
            log.error("Failed query binding record from es,", e);
            return Collections.emptyList();
        }
    }

    /**
     * Copies the rows of the department, abandon, public and personal libraries into the index.
     *
     * <p>The four sources are read page by page on four separate threads, and this method
     * blocks until all of them have finished. A worker that fails logs the error and still
     * releases the latch, so a failure in one source cannot block the caller forever.
     *
     * @param token permission token; must equal the configured {@code permission.token}
     * @throws BusinessException when the token is blank or does not match
     */
    @Override
    public void syncBindingRecord(String token) {
        long startedAt = System.currentTimeMillis();
        CurrentUserInfo currentUser = LoginUtils.getCurrentUser();
        // The token is a shared secret and is deliberately kept out of the log.
        log.info("syncBindingRecord,operatorId:{}", currentUser.getId());
        if (StringUtils.isBlank(token)) {
            throw new BusinessException(CommonErrorCode.PARAM_ERROR, "请求参数不能为空");
        }
        // Constant-time comparison so response timing does not reveal how much of the token matched.
        boolean tokenMatches = MessageDigest.isEqual(
                this.permissionToken.getBytes(StandardCharsets.UTF_8),
                token.getBytes(StandardCharsets.UTF_8));
        if (!tokenMatches) {
            throw new BusinessException(CommonErrorCode.PARAM_ERROR, "请求参数不合法");
        }
        Long bizId = currentUser.getBizId();
        CountDownLatch latch = new CountDownLatch(4);

        startWorker("sync-binding-dept-lib", bizId, latch, () -> syncLibrary("dept_id", "部门库",
                this.deptLibMapper::selectMaxId,
                this.deptLibMapper::selectByMaxIdAndLimit,
                row -> BindingRecordDto.buildBindingRecord(bizId, row.getLeadsId(), row.getBindingDeptId(),
                        row.getCreateTime(), Integer.valueOf(LeadsType.DEPT_LIB.getValue()))));

        startWorker("sync-binding-abandon-lib", bizId, latch, () -> syncLibrary("abandon_lib", "废弃库",
                this.abandonLibMapper::selectMaxId,
                this.abandonLibMapper::selectByMaxIdAndLimit,
                row -> BindingRecordDto.buildBindingRecord(bizId, row.getLeadsId(), -1L,
                        row.getCreateTime(), Integer.valueOf(LeadsType.ABANDON_LIB.getValue()))));

        startWorker("sync-binding-public-lib", bizId, latch, () -> syncLibrary("public_lib", "公海",
                this.publicLibMapper::selectMaxId,
                this.publicLibMapper::selectByMaxIdAndLimit,
                row -> BindingRecordDto.buildBindingRecord(bizId, row.getLeadsId(), -1L,
                        row.getCreateTime(), Integer.valueOf(LeadsType.PUBLIC_LIB.getValue()))));

        startWorker("sync-binding-leads-binding-info", bizId, latch, () -> syncLibrary("leads_binding_info", "个人库",
                this.leadsBindingInfoMapper::selectMaxId,
                this.leadsBindingInfoMapper::selectByMaxIdAndLimit,
                row -> BindingRecordDto.buildBindingRecord(bizId, row.getLeadsId(), -1L,
                        row.getCreateTime(), row.getCustomerType())));

        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("syncBindingRecord, countDownLatch发生异常:", e);
        }
        log.info("totalTime:{}", System.currentTimeMillis() - startedAt);
    }

    /**
     * Starts a named thread that runs {@code task} with the biz id bound to {@link BizTableContext}.
     * The context is cleared and the latch released even when the task throws.
     */
    private void startWorker(String threadName, Long bizId, CountDownLatch latch, Runnable task) {
        Thread worker = new Thread(() -> {
            try {
                BizTableContext.putBizId(bizId);
                task.run();
            } catch (RuntimeException e) {
                log.error("syncBindingRecord, worker {} failed", threadName, e);
            } finally {
                BizTableContext.clear();
                latch.countDown();
            }
        }, threadName);
        worker.start();
    }

    /**
     * Pages through one source by ascending id and bulk-indexes every page.
     * Stops when a page is empty or when its last id equals the max id read before paging began.
     *
     * @param tag           short source name used in the end-of-loop log line
     * @param label         source label used in the completion log line
     * @param maxIdSupplier reads the max id of the source
     * @param pageLoader    loads up to {@code limit} rows following the given id
     * @param toRecord      converts a source row into a binding record
     */
    private void syncLibrary(String tag, String label, Supplier<Long> maxIdSupplier,
            BiFunction<Long, Integer, List<LeadsSimpleDto>> pageLoader,
            Function<LeadsSimpleDto, BindingRecordDto> toRecord) {
        long startedAt = System.currentTimeMillis();
        Long maxId = maxIdSupplier.get();
        List<LeadsSimpleDto> page = pageLoader.apply(0L, PAGE_SIZE);
        while (CollectionUtils.isNotEmpty(page)) {
            List<BindingRecordDto> records = new ArrayList<>(page.size());
            for (LeadsSimpleDto row : page) {
                records.add(toRecord.apply(row));
            }
            batchAdd(records);
            Long lastId = page.get(page.size() - 1).getId();
            if (lastId.equals(maxId)) {
                log.info("{},pageMaxId:{}, maxId:{}相等，结束循环", tag, lastId, maxId);
                break;
            }
            page = pageLoader.apply(lastId, PAGE_SIZE);
        }
        log.info("{}绑定信息同步到es完成,耗时:{}", label, System.currentTimeMillis() - startedAt);
    }

    /**
     * Finds the records of one batch of leads that have no {@code bindingEndTime} and sets it
     * with a single bulk update.
     *
     * @throws IOException when the search, the document building or the bulk call fails
     */
    private void closeOpenBindings(Long bizId, List<Long> leadsIds, long endMillis) throws IOException {
        SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(QueryBuilders.boolQuery()
                .must(QueryBuilders.termsQuery("leadsId", leadsIds))
                .must(QueryBuilders.termQuery("bizId", bizId))
                .mustNot(QueryBuilders.existsQuery("bindingEndTime")));
        // NOTE(review): the hit count is capped at the number of ids, which only covers every
        // open record if each lead has at most one - confirm against how records are written.
        source.size(leadsIds.size());
        SearchRequest searchRequest = new SearchRequest(new String[]{this.index});
        searchRequest.source(source);

        SearchHits searchHits = this.restHighLevelClient.search(searchRequest, new Header[0]).getHits();
        if (searchHits == null) {
            return;
        }
        SearchHit[] hits = searchHits.getHits();
        if (hits == null || hits.length == 0) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        for (SearchHit hit : hits) {
            UpdateRequest updateRequest = new UpdateRequest(this.index, DOC_TYPE, hit.getId());
            updateRequest.doc(XContentFactory.jsonBuilder().startObject().field("bindingEndTime", endMillis).endObject());
            bulkRequest.add(updateRequest);
        }
        log.info("bulkResponse:{}", JsonUtil.toStr(this.restHighLevelClient.bulk(bulkRequest, new Header[0])));
    }

    /** Builds an index request for the record, using an id obtained from {@link IdGen}. */
    private IndexRequest newIndexRequest(BindingRecordDto bindingRecordDto) {
        IndexRequest indexRequest = new IndexRequest(this.index, DOC_TYPE, this.idGen.getNum());
        indexRequest.source(JacksonUtil.toMap(bindingRecordDto), XContentType.JSON);
        return indexRequest;
    }
}
