package com.kuaike.wework.msg.common.utils.exec;

import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:com/kuaike/wework/msg/common/utils/exec/TaskJobExecutor.class */
public class TaskJobExecutor {
    private static final Logger log = LoggerFactory.getLogger(TaskJobExecutor.class);
    private static int availableCore = Runtime.getRuntime().availableProcessors();
    private static int blockSize = 2048;
    public static final ExecutorService executorService = Executors.newWorkStealingPool(availableCore * 4);
    private static final ConcurrentHashMap<String, AbstractTaskJob> runningJob = new ConcurrentHashMap<>(1024);
    private static final ConcurrentHashMap<String, AbstractTaskJob> blockedJob = new ConcurrentHashMap<>(1024);
    private static final ArrayBlockingQueue<AbstractTaskJob> needRunningJob = Queues.newArrayBlockingQueue(2048);
    private static final ArrayBlockingQueue<AbstractTaskJob> finishedJob = Queues.newArrayBlockingQueue(1024);
    private static volatile AtomicBoolean run = new AtomicBoolean(true);
    static final Thread mainDispatcher = new Thread(() -> {
        Object obj = null;
        int i = 0;
        while (run.get()) {
            try {
                AbstractTaskJob take = needRunningJob.take();
                if (runningJob.containsKey(take.getWechatId())) {
                    needRunningJob.put(take);
                    if (take.equals(obj)) {
                        i++;
                        if (i >= 2) {
                            Thread.sleep(200L);
                        }
                    } else {
                        obj = take;
                        i = 0;
                    }
                } else {
                    runningJob.put(take.getWechatId(), take);
                    take.subJobList().stream().forEach(taskJobRun -> {
                        executorService.submit(taskJobRun);
                    });
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "job任务分发器");
    static final Thread backgroudScaner = new Thread(() -> {
        while (run.get()) {
            try {
                AbstractTaskJob take = finishedJob.take();
                runningJob.remove(take.getWechatId());
                blockedJob.remove(take.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, "任务完成调度器");

    public static synchronized void addTaskJob(AbstractTaskJob abstractTaskJob) {
        if (abstractTaskJob == null) {
            return;
        }
        if (abstractTaskJob.totalSubJob.get() != abstractTaskJob.jobRunnableList.size()) {
            log.error("error taskJob={}", abstractTaskJob);
            return;
        }
        if (blockedJob.containsKey(abstractTaskJob.getId())) {
            log.warn("task have been add to pool,id={},taskId={},wechatId={},", new Object[]{abstractTaskJob.getId(), abstractTaskJob.getTaskId(), abstractTaskJob.getWechatId()});
            return;
        }
        blockedJob.put(abstractTaskJob.getId(), abstractTaskJob);
        try {
            needRunningJob.put(abstractTaskJob);
        } catch (InterruptedException e) {
            log.error("push fail ", e);
        }
        log.info("put taskJob id={}", abstractTaskJob.getId());
    }

    @PostConstruct
    public void start() {
        mainDispatcher.setPriority(10);
        backgroudScaner.setPriority(10);
        mainDispatcher.start();
        log.info("mainDispatcher start");
        backgroudScaner.start();
        log.info("backgroudScaner start");
    }

    public static synchronized void addFinishedJob(AbstractTaskJob abstractTaskJob) {
        try {
            finishedJob.put(abstractTaskJob);
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.error("put to finished queue fail", e);
        }
    }

    @PreDestroy
    public void close() {
        run.compareAndSet(true, false);
    }

    public static int blockJobSize() {
        return blockedJob.size();
    }

    public static int runningJobSize() {
        return runningJob.size();
    }

    public static List<String> blockJobs() {
        ArrayList newArrayList = Lists.newArrayList();
        Enumeration<String> keys = blockedJob.keys();
        while (keys.hasMoreElements()) {
            newArrayList.add(keys.nextElement());
        }
        return newArrayList;
    }

    public static List<String> runningJobs() {
        ArrayList newArrayList = Lists.newArrayList();
        Enumeration<String> keys = runningJob.keys();
        while (keys.hasMoreElements()) {
            newArrayList.add(keys.nextElement());
        }
        return newArrayList;
    }
}
