package cn.hangar.agpflow.engine.mq;

import cn.hangar.agp.platform.core.app.AppContext;
import cn.hangar.agp.platform.core.mq.CallBack;
import cn.hangar.agp.platform.core.mq.MQManager;
import cn.hangar.agpflow.engine.ServiceContext;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service;

@Service("defaultBussMQService")
/* loaded from: input_file:cn/hangar/agpflow/engine/mq/BussMQService.class */
public class BussMQService implements IBussMQService {
    MQManager mqManager;
    ConsumerPoolManager consumerPoolManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cn/hangar/agpflow/engine/mq/BussMQService$ConsumerPoolManager.class */
    public class ConsumerPoolManager {
        MQManager mqManager;
        private static final int CORE_POOL_SIZE = 4;
        private static final int MAX_POOL_SIZE = 1000;
        private static final int KEEP_ALIVE_TIME = 0;
        private static final int WORK_QUEUE_SIZE = 10000;
        final RejectedExecutionHandler handler;
        final ThreadPoolExecutor threadPool;
        ScheduledFuture<?> taskHandler;
        boolean start = false;
        final Runnable doThread = new Runnable() { // from class: cn.hangar.agpflow.engine.mq.BussMQService.ConsumerPoolManager.2
            @Override // java.lang.Runnable
            public void run() {
                runTopic(BussMQService.this.getTopic(PriorityType.High));
                runTopic(BussMQService.this.getTopic(PriorityType.Midd));
                runTopic(BussMQService.this.getTopic(PriorityType.Low));
            }

            private void runTopic(String str) {
                int maximumPoolSize = ConsumerPoolManager.this.threadPool.getMaximumPoolSize() - ConsumerPoolManager.this.threadPool.getActiveCount();
                while (!ConsumerPoolManager.this.mqManager.isEmpty(str).booleanValue() && maximumPoolSize > 0) {
                    for (Object obj : ConsumerPoolManager.this.mqManager.poll(str, 1000L)) {
                        if (obj instanceof MQMessageInfo) {
                            ConsumerPoolManager.this.threadPool.execute(new HandleThread((MQMessageInfo) obj, AppContext.Current()));
                            maximumPoolSize--;
                        } else {
                            System.out.println("未知消息类型：" + obj);
                        }
                    }
                }
            }
        };
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:cn/hangar/agpflow/engine/mq/BussMQService$ConsumerPoolManager$HandleThread.class */
        public class HandleThread implements Runnable {
            private MQMessageInfo msg;
            private AppContext context;

            public HandleThread(MQMessageInfo mQMessageInfo, AppContext appContext) {
                this.msg = mQMessageInfo;
            }

            @Override // java.lang.Runnable
            public void run() {
                try {
                    AppContext.setCurrent(this.context);
                    MQMessageInfo mQMessageInfo = (MQMessageInfo) MQMessageInfo.class.cast(this.msg);
                    IMQHandle createMQHandle = MQHelper.createMQHandle(mQMessageInfo);
                    if (createMQHandle == null) {
                        System.out.println("未知操作：" + mQMessageInfo.getMessageId());
                        return;
                    }
                    createMQHandle.handle(this.msg.getArgs());
                    if (mQMessageInfo.getEndMessages() != null && !mQMessageInfo.getEndMessages().isEmpty()) {
                        IBussMQService iBussMQService = (IBussMQService) ServiceContext.findService(IBussMQService.class);
                        Iterator<MQMessageInfo> it = mQMessageInfo.getEndMessages().iterator();
                        while (it.hasNext()) {
                            iBussMQService.send(it.next());
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        public ConsumerPoolManager(MQManager mQManager) {
            this.mqManager = mQManager;
            this.handler = new RejectedExecutionHandler() { // from class: cn.hangar.agpflow.engine.mq.BussMQService.ConsumerPoolManager.1
                @Override // java.util.concurrent.RejectedExecutionHandler
                public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                }
            };
            this.threadPool = new ThreadPoolExecutor(4, MAX_POOL_SIZE, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
        }

        public void run() {
            synchronized (this) {
                if (this.start) {
                    return;
                }
                this.start = true;
            }
        }

        public void stop() {
            synchronized (this) {
                this.start = false;
                this.scheduler.shutdown();
            }
        }
    }

    private MQManager getMQManager() {
        if (this.mqManager == null) {
            this.mqManager = (MQManager) ServiceContext.getContext().find(MQManager.class);
        }
        return this.mqManager;
    }

    @Override // cn.hangar.agpflow.engine.mq.IBussMQService
    public void init(Map map) {
        getMQManager().init(map);
    }

    @Override // cn.hangar.agpflow.engine.mq.IBussMQService
    public void send(MQMessageInfo mQMessageInfo) {
        getMQManager().send(getTopic(mQMessageInfo), mQMessageInfo);
        startConsumer();
    }

    private String getTopic(MQMessageInfo mQMessageInfo) {
        Integer priority = mQMessageInfo.getPriority();
        return priority == null ? "Priority0" : "Priority" + priority;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getTopic(PriorityType priorityType) {
        Integer valueOf = Integer.valueOf(priorityType.ordinal());
        return valueOf == null ? "Priority0" : "Priority" + valueOf;
    }

    @Override // cn.hangar.agpflow.engine.mq.IBussMQService
    public void send(MQMessageInfo mQMessageInfo, CallBack callBack) {
        getMQManager().send(getTopic(mQMessageInfo), mQMessageInfo, callBack);
        startConsumer();
    }

    public void startConsumer() {
        if (this.consumerPoolManager == null) {
            this.consumerPoolManager = new ConsumerPoolManager(getMQManager());
        }
        this.consumerPoolManager.run();
    }
}
