/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.common.utils;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public final class KafkaClientUtils {
    private static final Logger log = LoggerFactory.getLogger(KafkaClientUtils.class);
    private KafkaTemplate<String, String> kafkaTemplate;
    private ProducerListener<String, String> producerListener;
    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Autowired
    public KafkaClientUtils(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
        this.kafkaTemplate.setProducerListener((ProducerListener)new ProducerListener<String, String>(){

            public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
                if (KafkaClientUtils.this.producerListener != null) {
                    KafkaClientUtils.this.producerListener.onSuccess(topic, partition, (Object)key, (Object)value, recordMetadata);
                }
            }

            public void onError(String topic, Integer partition, String key, String value, Exception exception) {
                if (KafkaClientUtils.this.producerListener != null) {
                    KafkaClientUtils.this.producerListener.onError(topic, partition, (Object)key, (Object)value, exception);
                }
                log.error("Kafka send message on error topic: {}, partition: {}, key: {}, exception: {}", new Object[]{topic, partition, key, exception});
            }

            public boolean isInterestedInSuccess() {
                return true;
            }
        });
    }

    public void setProducerListener(ProducerListener<String, String> producerListener) {
        if (producerListener == null) {
            throw new IllegalArgumentException("ProducerListener must not be null");
        }
        this.producerListener = producerListener;
    }

    public ListenableFuture<SendResult<String, String>> sendMessage(String topic, String key, String data) {
        try {
            return this.kafkaTemplate.send(topic, (Object)key, (Object)data);
        }
        catch (Exception e) {
            log.error("sendMessage exception, e:{}, topic:{}  ", (Object)e, (Object)topic);
            throw e;
        }
    }

    public ListenableFuture<SendResult<String, String>> sendMessage(String topic, String data) {
        try {
            return this.kafkaTemplate.send(topic, (Object)data);
        }
        catch (Exception e) {
            log.error("sendMessage exception, e:{}, topic:{}  ", (Object)e, (Object)topic);
            throw e;
        }
    }

    public Consumer<String, String> buildConsumer() {
        return this.consumerFactory.createConsumer();
    }

    public Consumer<String, String> buildConsumer(String groupId, String clientIdSuffix) {
        return this.consumerFactory.createConsumer(groupId, clientIdSuffix);
    }

    public long getLag(String topic) {
        Consumer<String, String> consumer = this.buildConsumer();
        List partitionInfos = consumer.partitionsFor(topic);
        List topicPartitions = partitionInfos.stream().map(p -> new TopicPartition(p.topic(), p.partition())).collect(Collectors.toList());
        HashMap consumerCommitted = Maps.newHashMap();
        for (TopicPartition topicPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicPartition);
            long committedOffset = committed.offset();
            consumerCommitted.put(topicPartition, committedOffset);
        }
        Map topicPartitionLongMap = consumer.endOffsets(topicPartitions);
        Long lag = 0L;
        for (TopicPartition partition : topicPartitionLongMap.keySet()) {
            Long committedOffset = consumerCommitted.getOrDefault(partition, 0L);
            Long endOffset = topicPartitionLongMap.getOrDefault(partition, 0L);
            if (committedOffset == 0L || endOffset == 0L) continue;
            lag = lag + endOffset - committedOffset;
        }
        return lag;
    }
}

