package cn.kinyun.customer.center.sal.biz.operator.executor;

import com.google.common.collect.Maps;
import com.kuaike.common.utils.JacksonUtil;
import com.kuaike.scrm.common.dto.OperatorResult;
import com.kuaike.scrm.common.enums.OperatorResultStatus;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Runs operator computations asynchronously and keeps a periodic "heartbeat" for every
 * request that is still in flight.
 *
 * <p>Each submitted request is registered in an in-memory map keyed by requestId. A
 * single-threaded scheduler walks that map at a fixed rate: entries whose stored result is
 * missing or already finished are dropped, all others have their stored result re-written
 * through {@link OperatorResultService#updateResult} and their heartbeat timestamp refreshed.
 * When a computation ends (successfully or not) the final status is persisted and a notice is
 * published to the configured Kafka topic.
 *
 * <p>NOTE(review): neither executor is shut down anywhere in this class; consider adding a
 * {@code @PreDestroy} hook if the application context can be closed without exiting the JVM.
 */
@Component
/* loaded from: input_file:cn/kinyun/customer/center/sal/biz/operator/executor/OperatorTaskService.class */
public class OperatorTaskService {
    private static final Logger log = LoggerFactory.getLogger(OperatorTaskService.class);

    /** Initial delay and period of the heartbeat job, in seconds. */
    private static final long HEARTBEAT_INTERVAL_SECONDS = 30L;

    /**
     * Smallest status value the heartbeat job treats as "finished".
     * NOTE(review): presumably corresponds to FINISH/FAIL in OperatorResultStatus — confirm.
     */
    private static final int FINISHED_STATUS_MIN = 2;

    /** requestId -> time of the most recent heartbeat for tasks that are still tracked. */
    private final Map<String, Date> requestIdLatestHeartbeatMap = Maps.newConcurrentMap();

    private final ScheduledExecutorService taskHeartbeatScheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor();

    /** Fixed pool of 8 workers with a bounded queue of 100 pending tasks. */
    private final ExecutorService executorService =
            new ThreadPoolExecutor(8, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));

    @Autowired
    private OperatorResultService operatorResultService;

    @Value("${kafka.topic.operatorResultNotice}")
    private String operatorResultNoticeTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Starts the periodic heartbeat job once the bean has been constructed.
     */
    @PostConstruct
    public void init() {
        log.info("init OperatorTaskService");
        this.taskHeartbeatScheduledExecutorService.scheduleAtFixedRate(
                this::refreshHeartbeats,
                HEARTBEAT_INTERVAL_SECONDS,
                HEARTBEAT_INTERVAL_SECONDS,
                TimeUnit.SECONDS);
    }

    /**
     * One tick of the heartbeat job: visits every tracked request.
     *
     * <p>Nothing is allowed to escape from here, because an exception thrown by a task passed to
     * {@code scheduleAtFixedRate} suppresses all subsequent executions.
     */
    private void refreshHeartbeats() {
        try {
            log.info("requestIdLatestHeartbeatMap size={}", this.requestIdLatestHeartbeatMap.size());
            this.requestIdLatestHeartbeatMap.forEach((requestId, latestTime) -> {
                try {
                    refreshHeartbeat(requestId, latestTime);
                } catch (Exception e) {
                    // Keep going: one failing request must not block the others.
                    log.error("operator heartbeat failed,requestId={}", requestId, e);
                }
            });
        } catch (Exception e) {
            log.error("operator heartbeat job failed", e);
        }
    }

    /**
     * Refreshes or evicts the heartbeat of a single request.
     *
     * @param requestId  id of the tracked request
     * @param latestTime heartbeat time currently stored for the request
     */
    private void refreshHeartbeat(String requestId, Date latestTime) {
        OperatorResult stored = this.operatorResultService.queryByRequestId(requestId);
        if (Objects.isNull(stored)) {
            removeTaskHeartbeat(requestId);
            log.info("operatorResult is not exist in es,remove from map,requestId={}", requestId);
            return;
        }
        if (Objects.nonNull(stored.getStatus()) && stored.getStatus().intValue() >= FINISHED_STATUS_MIN) {
            removeTaskHeartbeat(requestId);
            log.info("operatorResult was finished,remove from map,requestId={}", requestId);
            return;
        }
        Date newTime = new Date();
        // NOTE(review): this writes back the record that was just read; if the worker finishes
        // between the read and this write the stored status could be overwritten — confirm
        // updateResult semantics.
        this.operatorResultService.updateResult(stored);
        // replace() instead of put(): do not re-add an entry the worker removed in the meantime.
        this.requestIdLatestHeartbeatMap.replace(requestId, newTime);
        log.info("operator heartbeat requestId={}, latestTime={}, newTime={}", requestId, latestTime, newTime);
    }

    /**
     * Starts tracking a request, stamping it with the current time.
     *
     * @param str requestId to track; must not be {@code null} (the backing concurrent map
     *            rejects null keys)
     */
    public void addTaskHeartbeat(String str) {
        this.requestIdLatestHeartbeatMap.put(str, new Date());
        log.info("addTaskHeartbeat with requestId={}", str);
    }

    /**
     * Stops tracking a request. A {@code null} id is ignored apart from being logged.
     *
     * @param str requestId to stop tracking
     */
    public void removeTaskHeartbeat(String str) {
        if (Objects.nonNull(str)) {
            this.requestIdLatestHeartbeatMap.remove(str);
        }
        log.info("removeTaskHeartbeat with requestId={}", str);
    }

    /**
     * Publishes the outcome of a request to the operator-result Kafka topic, keyed by requestId.
     *
     * @param str  requestId
     * @param num  status value to report
     * @param str2 failure reason, or {@code null} when there is none
     */
    private void sendNotice(String str, Integer num, String str2) {
        log.info("send operator status to kafka: requestId={},status={},failReason={}", str, num, str2);
        OperatorNoticeDto operatorNoticeDto = new OperatorNoticeDto();
        operatorNoticeDto.setFailReason(str2);
        operatorNoticeDto.setTime(new Date());
        operatorNoticeDto.setStatus(num);
        operatorNoticeDto.setRequestId(str);
        this.kafkaTemplate.send(this.operatorResultNoticeTopic, str, JacksonUtil.obj2Str(operatorNoticeDto));
    }

    /**
     * Submits an operator computation to the worker pool.
     *
     * <p>The submission itself may be rejected by the pool when its bounded queue is full; that
     * exception is propagated to the caller unchanged.
     *
     * @param l        business id stored on the result record
     * @param str      requestId identifying this computation
     * @param callable computation producing the operator result
     */
    public void executeOperator(Long l, String str, Callable<OperatorResult> callable) {
        this.executorService.submit(() -> {
            try {
                runOperator(l, str, callable);
            } finally {
                // Always stop the heartbeat, whatever happened above.
                removeTaskHeartbeat(str);
            }
        });
    }

    /**
     * Executes one computation on a worker thread: persists a WAITING record, runs the callable,
     * then persists FINISH or FAIL and publishes the matching notice.
     */
    private void runOperator(Long bizId, String requestId, Callable<OperatorResult> callable) {
        // Built before the try block so the failure handler always has a record to update.
        OperatorResult record = new OperatorResult();
        buildNewResult(bizId, requestId, record);
        try {
            addTaskHeartbeat(requestId);
            this.operatorResultService.saveResult(record);
            OperatorResult computed = callable.call();
            if (Objects.isNull(computed)) {
                throw new IllegalStateException("operator returned null result");
            }
            record.setStatus(OperatorResultStatus.FINISH.getStatus());
            record.setList(computed.getList());
            record.setMemberType(computed.getMemberType());
            record.setCustomerType(computed.getCustomerType());
            this.operatorResultService.updateResult(record);
            sendNotice(requestId, OperatorResultStatus.FINISH.getStatus(), null);
        } catch (Exception e) {
            log.error("submit operator with error,requestId={} ", requestId, e);
            record.setStatus(OperatorResultStatus.FAIL.getStatus());
            record.setFailReason(e.getMessage());
            this.operatorResultService.updateResult(record);
            sendNotice(requestId, OperatorResultStatus.FAIL.getStatus(), e.getMessage());
        }
    }

    /**
     * Initialises a fresh result record in WAITING status with the current time as create time.
     *
     * @param l              business id
     * @param str            requestId
     * @param operatorResult record to populate
     */
    private void buildNewResult(Long l, String str, OperatorResult operatorResult) {
        operatorResult.setStatus(OperatorResultStatus.WAITING.getStatus());
        operatorResult.setBizId(l);
        operatorResult.setCreateTime(new Date());
        operatorResult.setRequestId(str);
    }
}
