package com.dangdang.ddframe.job.plugin.job.type;

import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;
import com.dangdang.ddframe.job.api.JobExecutionSingleShardingContext;
import com.dangdang.ddframe.job.internal.job.AbstractDataFlowElasticJob;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/dangdang/ddframe/job/plugin/job/type/AbstractSequenceDataFlowElasticJob.class */
public abstract class AbstractSequenceDataFlowElasticJob<T> extends AbstractDataFlowElasticJob<T, JobExecutionSingleShardingContext> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSequenceDataFlowElasticJob.class);
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    @Override // com.dangdang.ddframe.job.internal.job.AbstractElasticJob
    protected final void executeJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        if (isStreamingProcess()) {
            executeStreamingJob(jobExecutionMultipleShardingContext);
        } else {
            executeOneOffJob(jobExecutionMultipleShardingContext);
        }
    }

    private void executeStreamingJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        Map<Integer, List<T>> concurrentFetchData = concurrentFetchData(jobExecutionMultipleShardingContext);
        while (true) {
            Map<Integer, List<T>> map = concurrentFetchData;
            if (map.isEmpty() || isStoped() || getShardingService().isNeedSharding()) {
                return;
            }
            concurrentProcessData(jobExecutionMultipleShardingContext, map);
            concurrentFetchData = concurrentFetchData(jobExecutionMultipleShardingContext);
        }
    }

    private void executeOneOffJob(JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        Map<Integer, List<T>> concurrentFetchData = concurrentFetchData(jobExecutionMultipleShardingContext);
        if (concurrentFetchData.isEmpty()) {
            return;
        }
        concurrentProcessData(jobExecutionMultipleShardingContext, concurrentFetchData);
    }

    private Map<Integer, List<T>> concurrentFetchData(final JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext) {
        List<Integer> shardingItems = jobExecutionMultipleShardingContext.getShardingItems();
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(shardingItems.size());
        final CountDownLatch countDownLatch = new CountDownLatch(shardingItems.size());
        Iterator<Integer> it = shardingItems.iterator();
        while (it.hasNext()) {
            final int intValue = it.next().intValue();
            this.executorService.submit(new Runnable() { // from class: com.dangdang.ddframe.job.plugin.job.type.AbstractSequenceDataFlowElasticJob.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        List<T> fetchData = AbstractSequenceDataFlowElasticJob.this.fetchData(jobExecutionMultipleShardingContext.createJobExecutionSingleShardingContext(intValue));
                        if (null != fetchData && !fetchData.isEmpty()) {
                            concurrentHashMap.put(Integer.valueOf(intValue), fetchData);
                        }
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        latchAwait(countDownLatch);
        log.debug("Elastic job: fetch data size: {}.", Integer.valueOf(concurrentHashMap != null ? concurrentHashMap.size() : 0));
        return concurrentHashMap;
    }

    private void concurrentProcessData(final JobExecutionMultipleShardingContext jobExecutionMultipleShardingContext, Map<Integer, List<T>> map) {
        final CountDownLatch countDownLatch = new CountDownLatch(map.size());
        for (final Map.Entry<Integer, List<T>> entry : map.entrySet()) {
            this.executorService.submit(new Runnable() { // from class: com.dangdang.ddframe.job.plugin.job.type.AbstractSequenceDataFlowElasticJob.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AbstractSequenceDataFlowElasticJob.this.processDataWithStatistics(jobExecutionMultipleShardingContext.createJobExecutionSingleShardingContext(((Integer) entry.getKey()).intValue()), (List) entry.getValue());
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        countDownLatch.countDown();
                        throw th;
                    }
                }
            });
        }
        latchAwait(countDownLatch);
    }
}
