package com.dangdang.ddframe.job.plugin.job.type;

import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.internal.job.AbstractDataFlowElasticJob;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dangdang/ddframe/job/plugin/job/type/AbstractThroughputDataFlowElasticJob.class */
public abstract class AbstractThroughputDataFlowElasticJob<T> extends AbstractDataFlowElasticJob<T, JobExecutionMultipleShardingContext> {
    private static final Logger log = LoggerFactory.getLogger(AbstractThroughputDataFlowElasticJob.class);
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    @Override // com.dangdang.ddframe.job.internal.job.AbstractElasticJob
    protected final void executeJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        if (isStreamingProcess()) {
            executeStreamingJob(jobExecutionMultipleShardingContext);
        } else {
            executeOneOffJob(jobExecutionMultipleShardingContext);
        }
    }

    private void executeStreamingJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<T> fetchDataWithLog = fetchDataWithLog(jobExecutionMultipleShardingContext);
        while (true) {
            List<T> list = fetchDataWithLog;
            if (null == list || list.isEmpty() || isStoped() || getShardingService().isNeedSharding()) {
                return;
            }
            concurrentProcessData(jobExecutionMultipleShardingContext, list);
            fetchDataWithLog = fetchDataWithLog(jobExecutionMultipleShardingContext);
        }
    }

    private void executeOneOffJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<T> fetchDataWithLog = fetchDataWithLog(jobExecutionMultipleShardingContext);
        if (null == fetchDataWithLog || fetchDataWithLog.isEmpty()) {
            return;
        }
        concurrentProcessData(jobExecutionMultipleShardingContext, fetchDataWithLog);
    }

    private List<T> fetchDataWithLog(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<T> fetchData = fetchData(jobExecutionMultipleShardingContext);
        log.debug("Elastic job: fetch data size: {}.", Integer.valueOf(fetchData != null ? fetchData.size() : 0));
        return fetchData;
    }

    private void concurrentProcessData(final JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext, List<T> list) {
        int concurrentDataProcessThreadCount = getConfigService().getConcurrentDataProcessThreadCount();
        if (concurrentDataProcessThreadCount <= 1 || list.size() <= concurrentDataProcessThreadCount) {
            processDataWithStatistics(jobExecutionMultipleShardingContext, list);
            return;
        }
        List<List> partition = Lists.partition(list, list.size() / concurrentDataProcessThreadCount);
        final CountDownLatch countDownLatch = new CountDownLatch(partition.size());
        for (final List list2 : partition) {
            this.executorService.submit(new Runnable() { // from class: com.dangdang.ddframe.job.plugin.job.type.AbstractThroughputDataFlowElasticJob.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AbstractThroughputDataFlowElasticJob.this.processDataWithStatistics(jobExecutionMultipleShardingContext, list2);
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                }
            });
        }
        latchAwait(countDownLatch);
    }
}
