/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.common.component;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.kuaike.scrm.common.component.KafkaAdminService;
import com.kuaike.scrm.common.component.LockServiceSupport;
import com.kuaike.scrm.common.dto.DynamicEvent;
import com.kuaike.scrm.common.enums.YnEnum;
import com.kuaike.scrm.dal.kafkaadmin.entity.KafkaAdminDynamicTopic;
import com.kuaike.scrm.dal.kafkaadmin.mapper.KafkaAdminDynamicTopicMapper;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Kafka administration helper backed by an {@link AdminClient}.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>caching, in a Redis hash keyed by consumer group, which consumer instance is
 *       assigned to which topic;</li>
 *   <li>listing topics and checking whether a topic exists;</li>
 *   <li>creating per-corp "dynamic" topics named {@code <template>_<corpId>}, persisting them
 *       through {@link KafkaAdminDynamicTopicMapper}, and publishing a {@link DynamicEvent}
 *       to the topic configured by {@code kafka.create.topic.event}.</li>
 * </ul>
 */
@Service
public class KafkaAdminServiceImpl
implements KafkaAdminService {
    private static final Logger log = LoggerFactory.getLogger(KafkaAdminServiceImpl.class);
    /** Comma-separated list of template topics for which dynamic topics may be created. */
    @Value(value="${dynamic.topic.temp.white.list:}")
    private String dynamicTopicTempWhiteList;
    /** Topic that receives a {@link DynamicEvent} whenever a dynamic topic is created or touched. */
    @Value(value="${kafka.create.topic.event}")
    private String createTopicEvent;
    /** Passed to the mapper when listing available dynamic topics. */
    @Value(value="${kafka.dynamic.event.least.days:1}")
    private Integer dynamicEventLeastDays;
    @Value(value="${check.available.instance.groupId:}")
    private String groupId;
    private static final String CALLBACK_LOCK_PREFIX = "CALLBACK_";
    /** Prefix of the Redis hash key that stores topic -> consumerId for one consumer group. */
    private static final String CONSUMER_INSTANCE_GROUP_PREFIX = "consumer_instance_group_prefix_";
    private static final Long DAYS_CONVERT_MILLSECOND = 86400000L;
    @Autowired
    private AdminClient adminClient;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    @Resource
    private KafkaAdminDynamicTopicMapper dynamicTopicMapper;
    @Resource
    private LockServiceSupport lockService;

    /**
     * Returns the template topics that are allowed to have dynamic topics.
     *
     * <p>Falls back to a single built-in entry when the property is blank. Entries are trimmed
     * and empty entries are dropped, so {@code "a, b,"} yields {@code [a, b]}.
     */
    private List<String> getDynamicTopicWhiteList() {
        if (StringUtils.isBlank(this.dynamicTopicTempWhiteList)) {
            return Lists.newArrayList("dev_scrm_qyapi_callback");
        }
        List<String> whiteList = Lists.newArrayList();
        for (String entry : this.dynamicTopicTempWhiteList.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                whiteList.add(trimmed);
            }
        }
        return whiteList;
    }

    /**
     * Checks whether the cached consumer-group summary contains a consumer for {@code topic}.
     *
     * <p>On a cache miss the summary is refreshed once via {@link #consumerGroupSummary(String)}
     * and the lookup is repeated.
     *
     * @param groupId consumer group id
     * @param topic   topic to look up
     * @return {@code true} if a non-blank consumer id is cached for the topic
     */
    @Override
    public boolean checkCurrentTopicHasConsumer(String groupId, String topic) {
        String redisKey = CONSUMER_INSTANCE_GROUP_PREFIX + groupId;
        HashOperations<String, String, String> operations = this.redisTemplate.opsForHash();
        String clientId = operations.get(redisKey, topic);
        if (StringUtils.isBlank(clientId)) {
            this.consumerGroupSummary(groupId);
            clientId = operations.get(redisKey, topic);
        }
        log.info("\u67e5\u770b\u5f53\u524d key:{} ; topic:{} \u6d88\u606f\u662f\u5426\u6709\u6d88\u8d39\u8005\u5b9e\u4f8b\uff1a{}", redisKey, topic, clientId);
        return StringUtils.isNotBlank(clientId);
    }

    /**
     * Describes the consumer group and caches, in the Redis hash for that group, the consumer id
     * assigned to each topic.
     *
     * <p>Every topic that appears in a member's assignment is recorded (if several members share
     * a topic, the last one iterated wins). Nothing is written when the group cannot be
     * described or has no assignments.
     *
     * @param groupId consumer group id
     */
    @Override
    public void consumerGroupSummary(String groupId) {
        DescribeConsumerGroupsResult futureResult = this.adminClient.describeConsumerGroups(Collections.singletonList(groupId));
        Map<String, ConsumerGroupDescription> consumerGroupResult;
        try {
            consumerGroupResult = futureResult.all().get();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            log.warn("describe consumer group interrupted, groupId:{}", groupId, e);
            return;
        }
        catch (ExecutionException e) {
            log.warn("describe consumer group failed, groupId:{}", groupId, e);
            return;
        }
        if (MapUtils.isEmpty(consumerGroupResult)) {
            return;
        }
        ConsumerGroupDescription consumerGroupDesc = consumerGroupResult.get(groupId);
        if (consumerGroupDesc == null || CollectionUtils.isEmpty(consumerGroupDesc.members())) {
            return;
        }
        Map<String, String> topicToConsumer = Maps.newHashMap();
        for (MemberDescription member : consumerGroupDesc.members()) {
            MemberAssignment assignment = member.assignment();
            if (assignment == null || CollectionUtils.isEmpty(assignment.topicPartitions())) {
                continue;
            }
            // A member may hold partitions of several topics; record each of them.
            for (TopicPartition topicPartition : assignment.topicPartitions()) {
                topicToConsumer.put(topicPartition.topic(), member.consumerId());
            }
        }
        if (MapUtils.isNotEmpty(topicToConsumer)) {
            log.info("\u7f13\u5b58\u6d88\u8d39\u8005\u4fe1\u606f\uff1agroupId{} topicInfo:{}", groupId, JSON.toJSONString(topicToConsumer));
            HashOperations<String, String, String> operations = this.redisTemplate.opsForHash();
            operations.putAll(CONSUMER_INSTANCE_GROUP_PREFIX + groupId, topicToConsumer);
        }
    }

    /**
     * Lists the topic names reported by the admin client.
     *
     * @return sorted topic names; empty if the request fails or the thread is interrupted
     */
    @Override
    public Set<String> listTopics() {
        Set<String> names = new TreeSet<>();
        ListTopicsResult list = this.adminClient.listTopics();
        try {
            // get() blocks until the future completes; no polling needed.
            names.addAll(list.names().get());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("list topics interrupted", e);
        }
        catch (ExecutionException e) {
            log.warn("list topics failed", e);
        }
        return names;
    }

    /**
     * Creates (or refreshes) the dynamic topic for a corp using 3 partitions and replication
     * factor 1.
     *
     * <p>Best-effort: any failure is logged and the template topic is returned instead.
     *
     * @param corpId corp identifier; when blank the template topic is returned unchanged
     * @param topic  template topic
     * @return the dynamic topic name, or {@code topic} when no dynamic topic is used
     */
    @Override
    public String createAndSave(String corpId, String topic) {
        if (StringUtils.isBlank(corpId)) {
            return topic;
        }
        try {
            return this.createAndSave(corpId, topic, 3, 1);
        }
        catch (Exception e) {
            // Deliberate fallback to the template topic; keep the stack trace for diagnosis.
            log.error("\u52a8\u6001\u521b\u5efatopic \u5f02\u5e38, corpId:{}, topic:{}", corpId, topic, e);
            return topic;
        }
    }

    /**
     * Checks whether the topic can be described by the admin client.
     *
     * @param topic topic name
     * @return {@code true} if describing the topic succeeds; {@code false} on any failure or
     *         interruption
     */
    @Override
    public boolean isExist(String topic) {
        DescribeTopicsResult result = this.adminClient.describeTopics(Collections.singleton(topic));
        try {
            result.all().get();
            return true;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("describe topic interrupted, topic:{}", topic, e);
            return false;
        }
        catch (ExecutionException e) {
            log.debug("describe topic failed, treating topic as absent, topic:{}", topic, e);
            return false;
        }
    }

    /**
     * Returns the dynamic topics the mapper reports as available for the configured number of
     * days; logs a warning when there are none.
     */
    @Override
    public List<KafkaAdminDynamicTopic> listAvailableDynamicTopic() {
        List<KafkaAdminDynamicTopic> dynamicTopics = this.dynamicTopicMapper.listAvailableDynamicTopic(this.dynamicEventLeastDays);
        if (CollectionUtils.isEmpty(dynamicTopics)) {
            log.warn("\u672a\u627e\u5230\u6700\u8fd1\u51e0\u5929\u6d3b\u8dc3d\u7684\u5546\u6237");
        }
        return dynamicTopics;
    }

    /**
     * Creates the dynamic topic for {@code corpId} if the template is white-listed, or refreshes
     * the stored record when one already exists.
     *
     * @param corpId        corp identifier used as topic suffix
     * @param topicTemp     template topic
     * @param partitionNums number of partitions for a newly created topic
     * @param replicaNum    replication factor for a newly created topic
     * @return the dynamic topic name, or {@code topicTemp} when the template is not white-listed
     *         or the topic could not be created
     */
    public String createAndSave(String corpId, String topicTemp, Integer partitionNums, Integer replicaNum) {
        List<String> topicWhiteList = this.getDynamicTopicWhiteList();
        log.info("createAndSave param corpId: {}; topicTemp:{}; partitionNums:{}; replicaNum: {}; topicWhiteList:{};", corpId, topicTemp, partitionNums, replicaNum, JSON.toJSON(topicWhiteList));
        if (CollectionUtils.isEmpty(topicWhiteList) || !topicWhiteList.contains(topicTemp)) {
            return topicTemp;
        }
        KafkaAdminDynamicTopic dynamicTopic = this.queryCreated(corpId, topicTemp);
        if (dynamicTopic != null) {
            return this.update(dynamicTopic, topicTemp);
        }
        return this.insert(this.getDynamicTopic(corpId, topicTemp), topicTemp, partitionNums, replicaNum);
    }

    /** Builds the dynamic topic name {@code <topicTemplate>_<corpId>}. */
    private String getDynamicTopic(String corpId, String topicTemplate) {
        return String.format("%s_%s", topicTemplate, corpId);
    }

    /**
     * Bumps the record's update time, re-publishes the dynamic-topic event and returns the
     * stored topic name. Returns {@code templateTopic} when no record is given.
     */
    private String update(KafkaAdminDynamicTopic dynamicTopic, String templateTopic) {
        if (dynamicTopic == null) {
            log.error("\u66f4\u65b0\u6570\u636e\u4e3a\u7a7a");
            return templateTopic;
        }
        // Selective update: only id and updateTime are set, so only updateTime changes.
        KafkaAdminDynamicTopic update = new KafkaAdminDynamicTopic();
        update.setId(dynamicTopic.getId());
        update.setUpdateTime(new Date());
        this.dynamicTopicMapper.updateByPrimaryKeySelective(update);
        this.sendDynamicMsg(dynamicTopic);
        return dynamicTopic.getTopic();
    }

    /**
     * Creates the Kafka topic, persists a record for it, and on success publishes the
     * dynamic-topic event.
     *
     * @return {@code topic} on success, {@code topicTemp} when topic creation failed
     */
    private String insert(String topic, String topicTemp, Integer partitionNums, Integer replicaNum) {
        log.info("topic:{} ; topicTemplate: {} partitionNums:{}; replicaNums:{}", topic, topicTemp, partitionNums, replicaNum);
        boolean created = this.createTopic(topic, partitionNums, replicaNum.shortValue());
        // The record is persisted even when creation failed (see note on buildEntity).
        KafkaAdminDynamicTopic dynamicTopic = this.saveEntity(topicTemp, topic, partitionNums, replicaNum, created);
        if (!created) {
            return topicTemp;
        }
        this.sendDynamicMsg(dynamicTopic);
        return topic;
    }

    /** Publishes a JSON-serialized {@link DynamicEvent} for the given record (fire-and-forget). */
    private void sendDynamicMsg(KafkaAdminDynamicTopic dynamicTopic) {
        DynamicEvent value = new DynamicEvent();
        value.setTopic(dynamicTopic.getTopicTemp());
        value.setDynamicTopic(dynamicTopic.getTopic());
        value.setDynamicTopicId(dynamicTopic.getId());
        String msgText = JSON.toJSONString(value);
        log.info("\u4fdd\u5b58\u6570\u636e\u6210\u529f,\u53d1\u9001\u52a8\u6001\u521b\u5efatopic \u4e8b\u4ef6: {}", msgText);
        this.kafkaTemplate.send(this.createTopicEvent, msgText);
    }

    /**
     * Looks up the stored record for the dynamic topic of the given corp and template.
     *
     * @return the stored record, or {@code null} if the mapper finds none
     */
    public KafkaAdminDynamicTopic queryCreated(String coprId, String topicTemplate) {
        String topic = this.getDynamicTopic(coprId, topicTemplate);
        KafkaAdminDynamicTopic dynamicTopic = this.dynamicTopicMapper.selectByTopic(topic);
        if (dynamicTopic != null) {
            // Only claim the topic exists when a record was actually found.
            log.info("\u5f53\u524d\u52a8\u6001topic \u5df2\u5b58\u5728\uff1atopic:{}", topic);
        }
        return dynamicTopic;
    }

    /**
     * Creates a topic and waits for the broker's answer.
     *
     * @return {@code true} if creation completed without error, {@code false} otherwise
     */
    private boolean createTopic(String topic, int numPartitions, short replicationFactor) {
        log.info("create topic with name={}, numPartitions={}, replicationFactor={}", topic, numPartitions, replicationFactor);
        NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
        CreateTopicsResult createTopicsResult = this.adminClient.createTopics(Collections.singleton(newTopic));
        try {
            createTopicsResult.values().get(topic).get();
            return true;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("\u521b\u5efatopic message \u5f02\u5e38\uff1a", e);
            return false;
        }
        catch (ExecutionException e) {
            log.error("\u521b\u5efatopic message \u5f02\u5e38\uff1a", e);
            return false;
        }
    }

    /** Builds the record and writes it through the mapper's insert-on-duplicate statement. */
    private KafkaAdminDynamicTopic saveEntity(String topicTemp, String topic, Integer partitionNums, Integer replicaNum, boolean status) {
        KafkaAdminDynamicTopic dynamicTopic = this.buildEntity(topicTemp, topic, partitionNums, replicaNum, status);
        this.dynamicTopicMapper.insertOnDuplicate(Lists.newArrayList(dynamicTopic));
        return dynamicTopic;
    }

    /**
     * Populates a new record with system defaults (creator/updater {@code -1}, consumer count 0).
     *
     * <p>NOTE(review): {@code status} is not used; the record is always stored with
     * {@code YnEnum.YES}, even when topic creation failed. Confirm whether a failed creation
     * should be stored with a different status.
     */
    private KafkaAdminDynamicTopic buildEntity(String topicTemp, String topic, Integer partitionNums, Integer replicaNum, boolean status) {
        Date now = new Date();
        KafkaAdminDynamicTopic dynamicTopic = new KafkaAdminDynamicTopic();
        dynamicTopic.setTopicTemp(topicTemp);
        dynamicTopic.setTopic(topic);
        dynamicTopic.setCreateBy(Long.valueOf(-1L));
        dynamicTopic.setCreateTime(now);
        dynamicTopic.setUpdateBy(Long.valueOf(-1L));
        dynamicTopic.setUpdateTime(now);
        dynamicTopic.setConsumerCount(Integer.valueOf(0));
        dynamicTopic.setStatus(YnEnum.YES.getValue());
        dynamicTopic.setReplicaNum(replicaNum);
        dynamicTopic.setPartitionsNum(partitionNums);
        return dynamicTopic;
    }
}

