package com.baijia.tianxiao.sal.common.impl;

import com.baijia.tianxiao.sal.common.CustomType;
import com.baijia.tianxiao.sal.common.api.ConsultUserStudentSynchService;
import com.baijia.tianxiao.sal.common.api.abstracts.AbstractTaskData;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/baijia/tianxiao/sal/common/impl/ConsultUserStudentSynchServiceImpl.class */
public class ConsultUserStudentSynchServiceImpl implements ConsultUserStudentSynchService {
    private static final Logger log = LoggerFactory.getLogger(ConsultUserStudentSynchServiceImpl.class);
    public static final String TASK_OVER_SIGNAL = "[]fd4354rfjjj_Rezar_gfirewncjfladsf___Over_fdsfds";
    private static final int THREAD_SIZE = 80;
    private final ArrayBlockingQueue<String> taskQueues = new ArrayBlockingQueue<>(100);
    private final ConcurrentHashMap<String, AbstractTaskData> markCache = new ConcurrentHashMap<>(100, 0.2f);
    private ExecutorService executorService = Executors.newFixedThreadPool(81);
    private final Lock lock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/baijia/tianxiao/sal/common/impl/ConsultUserStudentSynchServiceImpl$SyncSameTask.class */
    public static class SyncSameTask implements Runnable {
        private BlockingQueue<String> taskQueues;
        private ConcurrentHashMap<String, AbstractTaskData> markCache;

        public SyncSameTask(BlockingQueue<String> blockingQueue, ConcurrentHashMap<String, AbstractTaskData> concurrentHashMap) {
            this.taskQueues = blockingQueue;
            this.markCache = concurrentHashMap;
        }

        @Override // java.lang.Runnable
        public void run() {
            ConsultUserStudentSynchServiceImpl.log.info("current is run {} ", Thread.currentThread().getName());
            while (!Thread.currentThread().isInterrupted()) {
                AbstractTaskData abstractTaskData = null;
                try {
                    try {
                        String take = this.taskQueues.take();
                        ConsultUserStudentSynchServiceImpl.log.info("take key : {} ", take);
                        if (take != null && take.equals(ConsultUserStudentSynchServiceImpl.TASK_OVER_SIGNAL)) {
                            ConsultUserStudentSynchServiceImpl.log.info("receiver a signal to stop current thread :{}  ", Thread.currentThread().getName());
                            if (0 != 0) {
                                abstractTaskData.setEnd(true);
                                return;
                            }
                            return;
                        }
                        AbstractTaskData abstractTaskData2 = this.markCache.get(take);
                        abstractTaskData2.run();
                        if (abstractTaskData2 != null) {
                            abstractTaskData2.setEnd(true);
                        }
                    } catch (InterruptedException e) {
                        ConsultUserStudentSynchServiceImpl.log.warn("interrupt while consume AbstractTaskData");
                        if (0 != 0) {
                            abstractTaskData.setEnd(true);
                            return;
                        }
                        return;
                    }
                } catch (Throwable th) {
                    if (0 != 0) {
                        abstractTaskData.setEnd(true);
                    }
                    throw th;
                }
            }
        }
    }

    public ConsultUserStudentSynchServiceImpl() {
        initTaskExecutors();
    }

    private void initTaskExecutors() {
        for (int i = 0; i < THREAD_SIZE; i++) {
            this.executorService.execute(new SyncSameTask(this.taskQueues, this.markCache));
        }
        this.executorService.execute(new Runnable() { // from class: com.baijia.tianxiao.sal.common.impl.ConsultUserStudentSynchServiceImpl.1
            @Override // java.lang.Runnable
            public void run() {
                ConsultUserStudentSynchServiceImpl.log.info("clear threa is run ");
                while (!ConsultUserStudentSynchServiceImpl.this.executorService.isShutdown() && !Thread.currentThread().isInterrupted()) {
                    Enumeration keys = ConsultUserStudentSynchServiceImpl.this.markCache.keys();
                    while (keys.hasMoreElements()) {
                        String str = (String) keys.nextElement();
                        ConsultUserStudentSynchServiceImpl.this.lock.lock();
                        try {
                            try {
                                AbstractTaskData abstractTaskData = (AbstractTaskData) ConsultUserStudentSynchServiceImpl.this.markCache.get(str);
                                if (abstractTaskData.isEnd() && abstractTaskData.isExpired()) {
                                    ConsultUserStudentSynchServiceImpl.log.info("remove the execute over and expired taskData which mark is : {}  ", abstractTaskData.getMark());
                                    ConsultUserStudentSynchServiceImpl.this.markCache.remove(str);
                                } else {
                                    ConsultUserStudentSynchServiceImpl.log.info("the task which mark :{} not execute over and expired", abstractTaskData.getMark());
                                }
                                ConsultUserStudentSynchServiceImpl.this.lock.unlock();
                            } catch (Exception e) {
                                e.printStackTrace();
                                ConsultUserStudentSynchServiceImpl.this.lock.unlock();
                            }
                        } catch (Throwable th) {
                            ConsultUserStudentSynchServiceImpl.this.lock.unlock();
                            throw th;
                        }
                    }
                    try {
                        TimeUnit.SECONDS.sleep(20L);
                    } catch (InterruptedException e2) {
                        ConsultUserStudentSynchServiceImpl.log.debug("encount a InterruptedException while clear endTaskData ");
                    }
                }
            }
        });
    }

    @Override // com.baijia.tianxiao.sal.common.api.ConsultUserStudentSynchService
    public void syncConsultAndStudentDataProplem(Long l, String str, CustomType customType, AbstractTaskData abstractTaskData) {
        log.info("sync the same mobile between consultUser with student");
        if (abstractTaskData == null) {
            return;
        }
        abstractTaskData.setCustomId(l);
        abstractTaskData.setMobile(str);
        abstractTaskData.setCustomType(customType);
        abstractTaskData.setStartTime(System.currentTimeMillis());
        AbstractTaskData putIfAbsent = this.markCache.putIfAbsent(abstractTaskData.getMark(), abstractTaskData);
        log.info("putIfAbsent is : {} with customeId : {} ", putIfAbsent, l);
        if (putIfAbsent != null) {
            if (putIfAbsent == abstractTaskData) {
                log.info("same taskData is in queue already:{} ", abstractTaskData.getMark());
                return;
            }
            if (!putIfAbsent.isEnd()) {
                log.info("current task has add to queue and not execute over :{}", abstractTaskData.getMark());
                return;
            }
            if (!putIfAbsent.isExpired()) {
                log.info("current task has add to queue throught execute over but not expired :{}", abstractTaskData.getMark());
                return;
            }
            this.lock.lock();
            try {
                try {
                    log.info("current task has execute over and expired but not remove by clear thread , so will reset new task to execute:{}", abstractTaskData.getMark());
                    this.markCache.put(abstractTaskData.getMark(), abstractTaskData);
                    this.lock.unlock();
                } catch (Exception e) {
                    e.printStackTrace();
                    this.lock.unlock();
                }
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        }
        try {
            this.taskQueues.add(abstractTaskData.getMark());
            log.info("add new task to execute : {} ", abstractTaskData.getMark());
        } catch (Exception e2) {
            log.info("can not insert new sync task caus by full task queue ,and will execute at next request time ", e2.getCause());
        }
    }

    @Override // com.baijia.tianxiao.sal.common.api.ConsultUserStudentSynchService
    public void resStart() {
        stopSyncTasks();
        initTaskExecutors();
    }

    @Override // com.baijia.tianxiao.sal.common.api.ConsultUserStudentSynchService
    public void stopSyncTasks() {
        log.info("shutdown executorService and clear queue");
        List<Runnable> shutdownNow = this.executorService.shutdownNow();
        this.taskQueues.clear();
        for (int i = 0; i < shutdownNow.size(); i++) {
            this.taskQueues.add(TASK_OVER_SIGNAL);
        }
        log.info("task executor shutdown over ");
    }
}
