/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.utils.exec;

import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.kuaike.scrm.utils.exec.AbstractTaskJob;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class TaskJobExecutor {
    private static final Logger log = LoggerFactory.getLogger(TaskJobExecutor.class);
    private static int availableCore = Runtime.getRuntime().availableProcessors();
    private static int blockSize = 2048;
    public static final ExecutorService executorService = Executors.newWorkStealingPool(availableCore * 4);
    private static final ConcurrentHashMap<String, AbstractTaskJob> runningJob = new ConcurrentHashMap(1024);
    private static final ConcurrentHashMap<String, AbstractTaskJob> blockedJob = new ConcurrentHashMap(1024);
    private static final ArrayBlockingQueue<AbstractTaskJob> needRunningJob = Queues.newArrayBlockingQueue((int)2048);
    private static final ArrayBlockingQueue<AbstractTaskJob> finishedJob = Queues.newArrayBlockingQueue((int)1024);
    private static volatile AtomicBoolean run = new AtomicBoolean(true);
    static final Thread mainDispatcher = new Thread(() -> {
        AbstractTaskJob latestJob = null;
        int count = 0;
        while (run.get()) {
            try {
                AbstractTaskJob taskJob = needRunningJob.take();
                if (runningJob.containsKey(taskJob.getWechatId())) {
                    needRunningJob.put(taskJob);
                    if (taskJob.equals(latestJob)) {
                        if (++count < 2) continue;
                        Thread.sleep(200L);
                        continue;
                    }
                    latestJob = taskJob;
                    count = 0;
                    continue;
                }
                runningJob.put(taskJob.getWechatId(), taskJob);
                taskJob.subJobList().stream().forEach(a -> executorService.submit((Runnable)a));
            }
            catch (InterruptedException e) {
                log.error("interrupt e", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
    }, "job\u4efb\u52a1\u5206\u53d1\u5668");
    static final Thread backgroudScaner = new Thread(() -> {
        while (run.get()) {
            try {
                AbstractTaskJob taskJob = finishedJob.take();
                runningJob.remove(taskJob.getWechatId());
                blockedJob.remove(taskJob.getId());
            }
            catch (InterruptedException e) {
                log.error("interrupt e", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        }
    }, "\u4efb\u52a1\u5b8c\u6210\u8c03\u5ea6\u5668");

    public static synchronized void addTaskJob(AbstractTaskJob taskJob) {
        if (taskJob == null) {
            return;
        }
        if (taskJob.totalSubJob.get() != taskJob.jobRunnableList.size()) {
            log.error("error taskJob={}", (Object)taskJob);
            return;
        }
        if (blockedJob.containsKey(taskJob.getId())) {
            log.warn("task have been add to pool,id={},taskId={},wechatId={},", new Object[]{taskJob.getId(), taskJob.getTaskId(), taskJob.getWechatId()});
            return;
        }
        blockedJob.put(taskJob.getId(), taskJob);
        try {
            needRunningJob.put(taskJob);
        }
        catch (InterruptedException e) {
            log.error("push fail ", (Throwable)e);
            Thread.currentThread().interrupt();
        }
        log.info("put taskJob id={}", (Object)taskJob.getId());
    }

    @PostConstruct
    public void start() {
        mainDispatcher.setPriority(10);
        backgroudScaner.setPriority(10);
        mainDispatcher.start();
        log.info("mainDispatcher start");
        backgroudScaner.start();
        log.info("backgroudScaner start");
    }

    public static synchronized void addFinishedJob(AbstractTaskJob taskJob) {
        try {
            finishedJob.put(taskJob);
        }
        catch (InterruptedException e) {
            log.error("put to finished queue fail", (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    @PreDestroy
    public void close() {
        run.compareAndSet(true, false);
    }

    public static int blockJobSize() {
        return blockedJob.size();
    }

    public static int runningJobSize() {
        return runningJob.size();
    }

    public static List<String> blockJobs() {
        ArrayList keys = Lists.newArrayList();
        Enumeration<String> enumeration = blockedJob.keys();
        while (enumeration.hasMoreElements()) {
            keys.add(enumeration.nextElement());
        }
        return keys;
    }

    public static List<String> runningJobs() {
        ArrayList keys = Lists.newArrayList();
        Enumeration<String> enumeration = runningJob.keys();
        while (enumeration.hasMoreElements()) {
            keys.add(enumeration.nextElement());
        }
        return keys;
    }
}

