/*
 * Decompiled with CFR 0.152.
 */
package com.baijia.tianxiao.dal.es.dao.impl;

import com.baijia.tianxiao.dal.es.dao.EsStudentsDao;
import com.baijia.tianxiao.dal.es.dao.impl.AbstractEsBaseDao;
import com.baijia.tianxiao.dal.org.dao.OrgStudentDao;
import com.baijia.tianxiao.dal.org.po.OrgStudent;
import com.baijia.tianxiao.sqlbuilder.dto.PageDto;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
public class EsStudentDaoImpl
extends AbstractEsBaseDao
implements EsStudentsDao {
    private static final Logger log = LoggerFactory.getLogger(EsStudentDaoImpl.class);
    @Autowired
    OrgStudentDao orgStudentDao;

    @Override
    public Map<String, Object> getStudentByIndexTypeId(String index, String type, String id) {
        TransportClient client = this.getClient();
        GetResponse response = (GetResponse)client.prepareGet(index, type, id).get();
        return response.getSource();
    }

    @Override
    @Transactional(readOnly=true)
    public void importAllOrgStudentFromDbToEs(String index, String type) {
        long time = System.currentTimeMillis();
        BulkProcessor bulkProcessor = BulkProcessor.builder((Client)this.getClient(), (BulkProcessor.Listener)new BulkProcessor.Listener(){

            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("going to bulk, executionId:{}, numOfActions:{}", (Object)executionId, (Object)request.numberOfActions());
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                log.info("bulk ok, executionId:{}, numOfActions:{}, costs:{}ms, hasFailures:{}", new Object[]{executionId, request.numberOfActions(), response.getTookInMillis(), response.hasFailures()});
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("bulk failed!, error:{}", failure);
            }
        }).setConcurrentRequests(3).setBulkActions(10000).setBulkSize(new ByteSizeValue(10L, ByteSizeUnit.MB)).build();
        PageDto page = new PageDto();
        page.setPageNum(Integer.valueOf(1));
        page.setPageSize(Integer.valueOf(3000));
        List students = this.orgStudentDao.getByPage(page, new String[0]);
        while (students.size() > 0) {
            for (OrgStudent orgStudent : students) {
                bulkProcessor.add(((IndexRequest)new IndexRequest().index(index)).type(type).id(orgStudent.getId().toString()).source(objectMapper.writeValueAsBytes((Object)orgStudent)));
            }
            page.setPageNum(Integer.valueOf(page.getPageNum() + 1));
            students = this.orgStudentDao.getByPage(page, new String[0]);
        }
        bulkProcessor.flush();
        bulkProcessor.close();
        log.info("import {} data costs:{}", (Object)page.getCount(), (Object)(System.currentTimeMillis() - time));
    }
}

