/*
 * Decompiled with CFR 0.152.
 */
package cn.kinyun.customer.center.sal.biz.operator.executor;

import cn.kinyun.customer.center.sal.biz.operator.executor.OperatorNoticeDto;
import cn.kinyun.customer.center.sal.biz.operator.executor.OperatorResultService;
import com.google.common.collect.Maps;
import com.kuaike.common.utils.JacksonUtil;
import com.kuaike.scrm.common.dto.OperatorResult;
import com.kuaike.scrm.common.enums.OperatorResultStatus;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Runs operator computations on a bounded worker pool and keeps a periodic heartbeat for
 * every computation that is still in flight.
 *
 * <p>Life cycle of one request, as implemented by {@link #executeOperator}:
 * <ol>
 *   <li>the request id is registered in the heartbeat map;</li>
 *   <li>a new {@link OperatorResult} with status {@code WAITING} is saved;</li>
 *   <li>the caller supplied {@link Callable} is invoked;</li>
 *   <li>the result is updated to {@code FINISH} or {@code FAIL} and a notice is published
 *       to the configured Kafka topic;</li>
 *   <li>the request id is removed from the heartbeat map.</li>
 * </ol>
 *
 * <p>Every 30 seconds a single scheduler thread walks the heartbeat map and calls
 * {@code updateResult} on the stored result of each request that is still running.
 *
 * <p>NOTE(review): nothing in this class shuts the two executors down; confirm whether
 * a shutdown hook exists elsewhere or should be added here.
 * <p>NOTE(review): the heartbeat re-submits the result object it has just queried. If the
 * worker finishes between that query and the heartbeat's {@code updateResult}, the
 * heartbeat may write the older state back - confirm how {@code updateResult} merges.
 */
@Component
public class OperatorTaskService {
    private static final Logger log = LoggerFactory.getLogger(OperatorTaskService.class);

    /** Initial delay and period of the heartbeat sweep, in seconds. */
    private static final long HEARTBEAT_INTERVAL_SECONDS = 30L;

    /**
     * A stored result whose status is at or above this value is no longer heart-beaten.
     * NOTE(review): presumably 2 and above are the terminal states of
     * {@link OperatorResultStatus} - confirm against the enum.
     */
    private static final int TERMINAL_STATUS_THRESHOLD = 2;

    /** Request id mapped to the time of its latest heartbeat; shared by worker and scheduler threads. */
    private final Map<String, Date> requestIdLatestHeartbeatMap = Maps.newConcurrentMap();

    /** Single thread that performs the periodic heartbeat sweep. */
    private final ScheduledExecutorService taskHeartbeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

    /** Fixed pool of 8 workers with a queue of 100 pending operator tasks. */
    private final ExecutorService executorService = new ThreadPoolExecutor(8, 8, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));

    @Autowired
    private OperatorResultService operatorResultService;
    @Value(value="${kafka.topic.operatorResultNotice}")
    private String operatorResultNoticeTopic;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Starts the periodic heartbeat sweep once the bean has been constructed.
     */
    @PostConstruct
    public void init() {
        log.info("init OperatorTaskService");
        this.taskHeartbeatScheduledExecutorService.scheduleAtFixedRate(
                this::sweepHeartbeats, HEARTBEAT_INTERVAL_SECONDS, HEARTBEAT_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Registers a request as in flight so that the heartbeat sweep picks it up.
     *
     * @param requestId id of the running request; the backing concurrent map rejects
     *                  {@code null} keys with a {@link NullPointerException}
     */
    public void addTaskHeartbeat(String requestId) {
        this.requestIdLatestHeartbeatMap.put(requestId, new Date());
        log.info("addTaskHeartbeat with requestId={}", requestId);
    }

    /**
     * Stops heart-beating a request. A {@code null} id is ignored apart from the log line.
     *
     * @param requestId id of the request to remove, may be {@code null}
     */
    public void removeTaskHeartbeat(String requestId) {
        if (Objects.nonNull(requestId)) {
            this.requestIdLatestHeartbeatMap.remove(requestId);
        }
        log.info("removeTaskHeartbeat with requestId={}", requestId);
    }

    /**
     * Submits an operator computation to the worker pool.
     *
     * @param bizId     business id stored on the persisted result
     * @param requestId id under which the result is stored and the notice is keyed
     * @param callable  computation producing the list, member type and customer type
     *                  that are copied onto the persisted result
     * @throws java.util.concurrent.RejectedExecutionException if all workers are busy and
     *         the pending queue is full (the pool uses the default rejection policy)
     */
    public void executeOperator(Long bizId, String requestId, Callable<OperatorResult> callable) {
        this.executorService.submit(() -> this.runOperator(bizId, requestId, callable));
    }

    /**
     * One pass over the heartbeat map. Each entry is handled independently so that a
     * failure for one request neither skips the others nor escapes to the scheduler.
     */
    private void sweepHeartbeats() {
        log.info("requestIdLatestHeartbeatMap size={}", this.requestIdLatestHeartbeatMap.size());
        this.requestIdLatestHeartbeatMap.forEach(this::refreshHeartbeat);
    }

    /**
     * Refreshes the heartbeat of a single request, or drops the request from the map when
     * its stored result is missing or already has a terminal status.
     *
     * @param requestId  id of the request being checked
     * @param latestTime time of the previous heartbeat, used for logging only
     */
    private void refreshHeartbeat(String requestId, Date latestTime) {
        try {
            OperatorResult operatorResult = this.operatorResultService.queryByRequestId(requestId);
            if (Objects.isNull(operatorResult)) {
                this.removeTaskHeartbeat(requestId);
                log.info("operatorResult is not exist in es,remove from map,requestId={}", requestId);
                return;
            }
            if (Objects.nonNull(operatorResult.getStatus()) && operatorResult.getStatus() >= TERMINAL_STATUS_THRESHOLD) {
                this.removeTaskHeartbeat(requestId);
                log.info("operatorResult was finished,remove from map,requestId={}", requestId);
                return;
            }
            Date newTime = new Date();
            this.operatorResultService.updateResult(operatorResult);
            // replace (not put): the worker may have removed the id while the update was
            // running, and a finished request must not be re-inserted.
            this.requestIdLatestHeartbeatMap.replace(requestId, newTime);
            log.info("operator heartbeat requestId={}, latestTime={}, newTime={}", requestId, latestTime, newTime);
        }
        catch (Exception e) {
            // An exception escaping a scheduleAtFixedRate task suppresses every later run,
            // so it has to be contained here.
            log.error("operator heartbeat with error,requestId={} ", requestId, e);
        }
    }

    /**
     * Body of a worker task: saves the initial result, runs the computation, stores the
     * outcome and publishes the notice. Never throws; failures are persisted and logged.
     */
    private void runOperator(Long bizId, String requestId, Callable<OperatorResult> callable) {
        // Built before the try block so that the failure path always has a populated result.
        OperatorResult result = new OperatorResult();
        this.buildNewResult(bizId, requestId, result);
        try {
            this.addTaskHeartbeat(requestId);
            this.operatorResultService.saveResult(result);
            OperatorResult calcResult = callable.call();
            if (Objects.isNull(calcResult)) {
                throw new IllegalStateException("operator callable returned null, requestId=" + requestId);
            }
            result.setStatus(OperatorResultStatus.FINISH.getStatus());
            result.setList(calcResult.getList());
            result.setMemberType(calcResult.getMemberType());
            result.setCustomerType(calcResult.getCustomerType());
            this.operatorResultService.updateResult(result);
            this.sendNotice(requestId, OperatorResultStatus.FINISH.getStatus(), null);
        }
        catch (Exception e) {
            log.error("submit operator with error,requestId={} ", requestId, e);
            this.markFailed(requestId, result, e);
        }
        finally {
            this.removeTaskHeartbeat(requestId);
        }
    }

    /**
     * Persists the FAIL state and publishes the FAIL notice. The two steps are guarded
     * separately so that a storage error does not prevent the notice from being sent, and
     * so that neither error disappears inside the task's Future.
     */
    private void markFailed(String requestId, OperatorResult result, Exception cause) {
        // Some exceptions (e.g. NullPointerException) carry no message; fall back to toString().
        String failReason = Objects.nonNull(cause.getMessage()) ? cause.getMessage() : cause.toString();
        result.setStatus(OperatorResultStatus.FAIL.getStatus());
        result.setFailReason(failReason);
        try {
            this.operatorResultService.updateResult(result);
        }
        catch (Exception e) {
            log.error("update failed operator result with error,requestId={} ", requestId, e);
        }
        try {
            this.sendNotice(requestId, OperatorResultStatus.FAIL.getStatus(), failReason);
        }
        catch (Exception e) {
            log.error("send failed operator notice with error,requestId={} ", requestId, e);
        }
    }

    /**
     * Publishes the outcome of a request to the configured Kafka topic, keyed by request id
     * and carrying the serialized {@link OperatorNoticeDto} as the value.
     *
     * @param requestId  id of the request, used as the record key
     * @param status     status value to report
     * @param failReason failure description, or {@code null} on success
     */
    private void sendNotice(String requestId, Integer status, String failReason) {
        log.info("send operator status to kafka: requestId={},status={},failReason={}", requestId, status, failReason);
        OperatorNoticeDto operatorNoticeDto = new OperatorNoticeDto();
        operatorNoticeDto.setFailReason(failReason);
        operatorNoticeDto.setTime(new Date());
        operatorNoticeDto.setStatus(status);
        operatorNoticeDto.setRequestId(requestId);
        this.kafkaTemplate.send(this.operatorResultNoticeTopic, requestId, JacksonUtil.obj2Str(operatorNoticeDto));
    }

    /**
     * Fills a freshly created result with its initial WAITING state.
     *
     * @param bizId     business id to store
     * @param requestId request id to store
     * @param result    result object to populate in place
     */
    private void buildNewResult(Long bizId, String requestId, OperatorResult result) {
        result.setStatus(OperatorResultStatus.WAITING.getStatus());
        result.setBizId(bizId);
        result.setCreateTime(new Date());
        result.setRequestId(requestId);
    }
}

