package com.kuaike.scrm.common.component;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.kuaike.scrm.common.dto.DynamicEvent;
import com.kuaike.scrm.common.enums.YnEnum;
import com.kuaike.scrm.dal.kafkaadmin.entity.KafkaAdminDynamicTopic;
import com.kuaike.scrm.dal.kafkaadmin.mapper.KafkaAdminDynamicTopicMapper;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/kuaike/scrm/common/component/KafkaAdminServiceImpl.class */
public class KafkaAdminServiceImpl implements KafkaAdminService {

    @Value("${dynamic.topic.temp.white.list:}")
    private String dynamicTopicTempWhiteList;

    @Value("${kafka.create.topic.event}")
    private String createTopicEvent;

    @Value("${kafka.dynamic.event.least.days:1}")
    private Integer dynamicEventLeastDays;

    @Value("${check.available.instance.groupId:}")
    private String groupId;
    private static final String CALLBACK_LOCK_PREFIX = "CALLBACK_";
    private static final String CONSUMER_INSTANCE_GROUP_PREFIX = "consumer_instance_group_prefix_";

    @Autowired
    private AdminClient adminClient;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Resource
    private KafkaAdminDynamicTopicMapper dynamicTopicMapper;

    @Resource
    private LockServiceSupport lockService;
    private static final Logger log = LoggerFactory.getLogger(KafkaAdminServiceImpl.class);
    private static final Long DAYS_CONVERT_MILLSECOND = 86400000L;

    private List<String> getDynamicTopicWhiteList() {
        return StringUtils.isBlank(this.dynamicTopicTempWhiteList) ? Lists.newArrayList(new String[]{"dev_scrm_qyapi_callback"}) : Arrays.asList(this.dynamicTopicTempWhiteList.split(","));
    }

    @Override // com.kuaike.scrm.common.component.KafkaAdminService
    public boolean checkCurrentTopicHasConsumer(String str, String str2) {
        String str3 = CONSUMER_INSTANCE_GROUP_PREFIX + str;
        HashOperations opsForHash = this.redisTemplate.opsForHash();
        String str4 = (String) opsForHash.get(str3, str2);
        if (StringUtils.isBlank(str4)) {
            consumerGroupSummary(str);
            str4 = (String) opsForHash.get(str3, str2);
        }
        log.info("查看当前 key:{} ; topic:{} 消息是否有消费者实例：{}", new Object[]{str3, str2, str4});
        return !StringUtils.isBlank(str4);
    }

    @Override // com.kuaike.scrm.common.component.KafkaAdminService
    public void consumerGroupSummary(String str) {
        DescribeConsumerGroupsResult describeConsumerGroups = this.adminClient.describeConsumerGroups(Lists.newArrayList(new String[]{str}));
        HashOperations opsForHash = this.redisTemplate.opsForHash();
        String str2 = CONSUMER_INSTANCE_GROUP_PREFIX + str;
        Map map = null;
        try {
            map = (Map) describeConsumerGroups.all().get();
        } catch (InterruptedException | ExecutionException e) {
        }
        if (MapUtils.isEmpty(map)) {
            return;
        }
        Collection<MemberDescription> members = ((ConsumerGroupDescription) map.get(str)).members();
        HashMap newHashMap = Maps.newHashMap();
        for (MemberDescription memberDescription : members) {
            MemberAssignment assignment = memberDescription.assignment();
            if (!Objects.isNull(assignment)) {
                Set set = assignment.topicPartitions();
                if (!CollectionUtils.isEmpty(set)) {
                    Optional findFirst = set.stream().map((v0) -> {
                        return v0.topic();
                    }).findFirst();
                    if (findFirst.isPresent()) {
                        newHashMap.put(findFirst.get(), memberDescription.consumerId());
                    }
                }
            }
        }
        if (MapUtils.isNotEmpty(newHashMap)) {
            log.info("缓存消费者信息：groupId{} topicInfo:{}", str, JSON.toJSONString(newHashMap));
            opsForHash.putAll(str2, newHashMap);
        }
    }

    @Override // com.kuaike.scrm.common.component.KafkaAdminService
    public Set<String> listTopics() {
        TreeSet treeSet = new TreeSet();
        KafkaFuture names = this.adminClient.listTopics().names();
        while (!names.isDone()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException | ExecutionException e) {
            }
        }
        treeSet.addAll((Collection) names.get());
        return treeSet;
    }

    @Override // com.kuaike.scrm.common.component.KafkaAdminService
    public String createAndSave(String str, String str2) {
        if (StringUtils.isBlank(str)) {
            return str2;
        }
        try {
            return createAndSave(str, str2, 3, 1);
        } catch (Exception e) {
            log.error("动态创建topic 异常,errorMsg:errorMsg:{}", e.getMessage());
            return str2;
        }
    }

    @Override // com.kuaike.scrm.common.component.KafkaAdminService
    public boolean isExist(String str) {
        KafkaFuture all = this.adminClient.describeTopics(Collections.singleton(str)).all();
        while (!all.isDone()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException | ExecutionException e) {
                return false;
            }
        }
        return true;
    }

    @Override // com.kuaike.scrm.common.component.KafkaAdminService
    public List<KafkaAdminDynamicTopic> listAvailableDynamicTopic() {
        List<KafkaAdminDynamicTopic> listAvailableDynamicTopic = this.dynamicTopicMapper.listAvailableDynamicTopic(this.dynamicEventLeastDays);
        if (CollectionUtils.isEmpty(listAvailableDynamicTopic)) {
            log.warn("未找到最近几天活跃d的商户");
        }
        return listAvailableDynamicTopic;
    }

    public String createAndSave(String str, String str2, Integer num, Integer num2) {
        String dynamicTopic = getDynamicTopic(str, str2);
        List<String> dynamicTopicWhiteList = getDynamicTopicWhiteList();
        log.info("createAndSave param corpId: {}; topicTemp:{}; partitionNums:{}; replicaNum: {}; topicWhiteList:{};", new Object[]{str, str2, num, num2, JSON.toJSON(dynamicTopicWhiteList)});
        if (CollectionUtils.isEmpty(dynamicTopicWhiteList) || !dynamicTopicWhiteList.contains(str2)) {
            return str2;
        }
        KafkaAdminDynamicTopic queryCreated = queryCreated(str, str2);
        return Objects.nonNull(queryCreated) ? update(queryCreated, str2) : insert(dynamicTopic, str2, num, num2);
    }

    private String getDynamicTopic(String str, String str2) {
        return String.format("%s_%s", str2, str);
    }

    private String update(KafkaAdminDynamicTopic kafkaAdminDynamicTopic, String str) {
        if (Objects.isNull(kafkaAdminDynamicTopic)) {
            log.error("更新数据为空");
            return str;
        }
        kafkaAdminDynamicTopic.getUpdateTime();
        Date date = new Date();
        KafkaAdminDynamicTopic kafkaAdminDynamicTopic2 = new KafkaAdminDynamicTopic();
        kafkaAdminDynamicTopic2.setId(kafkaAdminDynamicTopic.getId());
        kafkaAdminDynamicTopic2.setUpdateTime(date);
        this.dynamicTopicMapper.updateByPrimaryKeySelective(kafkaAdminDynamicTopic2);
        sendDynamicMsg(kafkaAdminDynamicTopic);
        return kafkaAdminDynamicTopic.getTopic();
    }

    private String insert(String str, String str2, Integer num, Integer num2) {
        log.info("topic:{} ; topicTemplate: {} partitionNums:{}; replicaNums:{}", new Object[]{str, str2, num, num2});
        boolean createTopic = createTopic(str, num.intValue(), num2.shortValue());
        KafkaAdminDynamicTopic saveEntity = saveEntity(str2, str, num, num2, createTopic);
        if (!createTopic) {
            return str2;
        }
        sendDynamicMsg(saveEntity);
        return str;
    }

    private void sendDynamicMsg(KafkaAdminDynamicTopic kafkaAdminDynamicTopic) {
        DynamicEvent dynamicEvent = new DynamicEvent();
        dynamicEvent.setTopic(kafkaAdminDynamicTopic.getTopicTemp());
        dynamicEvent.setDynamicTopic(kafkaAdminDynamicTopic.getTopic());
        dynamicEvent.setDynamicTopicId(kafkaAdminDynamicTopic.getId());
        String jSONString = JSON.toJSONString(dynamicEvent);
        log.info("保存数据成功,发送动态创建topic 事件: {}", jSONString);
        this.kafkaTemplate.send(this.createTopicEvent, jSONString);
    }

    public KafkaAdminDynamicTopic queryCreated(String str, String str2) {
        String dynamicTopic = getDynamicTopic(str, str2);
        KafkaAdminDynamicTopic selectByTopic = this.dynamicTopicMapper.selectByTopic(dynamicTopic);
        log.info("当前动态topic 已存在：topic:{}", dynamicTopic);
        return selectByTopic;
    }

    private boolean createTopic(String str, int i, short s) {
        log.info("create topic with name={}, numPartitions={}, replicationFactor={}", new Object[]{str, Integer.valueOf(i), Short.valueOf(s)});
        try {
            KafkaFuture kafkaFuture = (KafkaFuture) this.adminClient.createTopics(Collections.singleton(new NewTopic(str, i, s))).values().get(str);
            while (!kafkaFuture.isDone()) {
                Thread.sleep(100L);
            }
            kafkaFuture.get();
            return true;
        } catch (InterruptedException | ExecutionException e) {
            log.error("创建topic message 异常：", e);
            return false;
        }
    }

    private KafkaAdminDynamicTopic saveEntity(String str, String str2, Integer num, Integer num2, boolean z) {
        KafkaAdminDynamicTopic buildEntity = buildEntity(str, str2, num, num2, z);
        this.dynamicTopicMapper.insertOnDuplicate(Lists.newArrayList(new KafkaAdminDynamicTopic[]{buildEntity}));
        return buildEntity;
    }

    private KafkaAdminDynamicTopic buildEntity(String str, String str2, Integer num, Integer num2, boolean z) {
        KafkaAdminDynamicTopic kafkaAdminDynamicTopic = new KafkaAdminDynamicTopic();
        kafkaAdminDynamicTopic.setTopicTemp(str);
        kafkaAdminDynamicTopic.setTopic(str2);
        kafkaAdminDynamicTopic.setCreateBy(-1L);
        kafkaAdminDynamicTopic.setCreateTime(new Date());
        kafkaAdminDynamicTopic.setUpdateBy(-1L);
        kafkaAdminDynamicTopic.setUpdateTime(new Date());
        kafkaAdminDynamicTopic.setConsumerCount(0);
        kafkaAdminDynamicTopic.setStatus(YnEnum.YES.getValue());
        kafkaAdminDynamicTopic.setReplicaNum(num2);
        kafkaAdminDynamicTopic.setPartitionsNum(num);
        return kafkaAdminDynamicTopic;
    }
}
