package com.kuaike.scrm.chat.service;

import cn.kinyun.wework.sdk.entity.chat.ChatMsg;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.kuaike.scrm.common.utils.JacksonUtils;
import com.kuaike.scrm.dal.qualityCheck.mapper.QualityCheckLogMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.BlockingQueueConsumer;
import org.apache.curator.framework.recipes.queue.DistributedDelayQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;

/* loaded from: input_file:com/kuaike/scrm/chat/service/ChatMsgDelayQueueManager.class */
public class ChatMsgDelayQueueManager {

    @Autowired
    private CuratorFramework client;
    private DistributedDelayQueue<ChatMsg> queue;
    private BlockingQueueConsumer<ChatMsg> blockingQueueConsumer;
    private Thread consumerThread;
    private AtomicBoolean consumerThreadRun = new AtomicBoolean(true);

    @Autowired
    private QualityCheckLogMapper qualityCheckLogMapper;

    @Autowired
    private StringRedisTemplate redisTemplate;
    private static final Logger log = LoggerFactory.getLogger(ChatMsgDelayQueueManager.class);
    private static String path = "/queue/scrm/0001";

    @PostConstruct
    public void init() {
        this.blockingQueueConsumer = new BlockingQueueConsumer<>(new ConnectionStateListener() { // from class: com.kuaike.scrm.chat.service.ChatMsgDelayQueueManager.1
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                ChatMsgDelayQueueManager.log.info("stage change, stage={}", connectionState);
            }
        });
        this.queue = QueueBuilder.builder(this.client, this.blockingQueueConsumer, new QueueSerializer<ChatMsg>() { // from class: com.kuaike.scrm.chat.service.ChatMsgDelayQueueManager.2
            public byte[] serialize(ChatMsg chatMsg) {
                try {
                    return JacksonUtils.getInstance().writeValueAsString(chatMsg).getBytes(StandardCharsets.UTF_8);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                    return null;
                }
            }

            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public ChatMsg m3deserialize(byte[] bArr) {
                try {
                    return (ChatMsg) JacksonUtils.getInstance().readValue(new String(bArr, StandardCharsets.UTF_8), ChatMsg.class);
                } catch (IOException e) {
                    e.printStackTrace();
                    return null;
                }
            }
        }, path).buildDelayQueue();
        try {
            this.queue.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
        this.consumerThread = new Thread(() -> {
            while (this.consumerThreadRun.get()) {
                try {
                    dealRecallMsg((ChatMsg) this.blockingQueueConsumer.take());
                } catch (InterruptedException e2) {
                    e2.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
        });
        this.consumerThread.start();
        log.info("chatMsg delay queue start");
    }

    public void dealRecallMsg(ChatMsg chatMsg) {
        String preMsgId = chatMsg.getRevoke().getPreMsgId();
        log.info("dealRecallMsg msgId={},preMsgId={}", chatMsg.getMsgId(), preMsgId);
        List queryByMsgId = this.qualityCheckLogMapper.queryByMsgId(preMsgId);
        if (CollectionUtils.isNotEmpty(queryByMsgId)) {
            List list = (List) queryByMsgId.stream().filter(qualityCheckLog -> {
                return qualityCheckLog.getIsRevoke().intValue() == 0;
            }).map(qualityCheckLog2 -> {
                return qualityCheckLog2.getId();
            }).collect(Collectors.toList());
            if (!CollectionUtils.isNotEmpty(list)) {
                log.info("msg have been recall, exit. msgId={},preMsgId={}", chatMsg.getMsgId(), preMsgId);
                return;
            } else {
                this.qualityCheckLogMapper.updateRevoke(list);
                log.info("recall msg success msgId={},preMsgId={}", chatMsg.getMsgId(), preMsgId);
                return;
            }
        }
        Long increment = this.redisTemplate.opsForValue().increment(preMsgId);
        this.redisTemplate.expire(preMsgId, 120L, TimeUnit.SECONDS);
        if (increment.longValue() > 3) {
            log.info("重试3次后未找到需要撤回的消息,msgId={},revokeMsgId={}, exit", chatMsg.getMsgId(), preMsgId);
        } else {
            put(chatMsg);
            log.info("recall msg put into delayQueue, msgId={}, preMsgId={},第{}次", new Object[]{chatMsg.getMsgId(), preMsgId, increment});
        }
    }

    @PreDestroy
    public void close() {
        try {
            this.queue.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.consumerThreadRun.getAndSet(false);
    }

    public void put(ChatMsg chatMsg) {
        try {
            this.queue.put(chatMsg, System.currentTimeMillis() + 5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
