package com.kuaike.kafka.consumer;

import com.kuaike.kafka.exception.KafkaClientException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kuaike/kafka/consumer/ConsumerClient.class */
/**
 * Thin convenience wrapper around a {@link KafkaConsumer}.
 *
 * <p>The client validates the mandatory connection settings, creates the underlying consumer and
 * offers a blocking {@link #consume(String, ConsumerRecordHandler)} loop that can be stopped from
 * another thread through {@link #shutdown()}.
 *
 * <p>Note: the {@link #ConsumerClient(String, String)} constructor always configures
 * {@code StringDeserializer} for keys and values, regardless of the type arguments {@code K} and
 * {@code V} chosen by the caller.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class ConsumerClient<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerClient.class);

    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    private static final String GROUP_ID = "group.id";

    /** Underlying Kafka consumer; created in the constructors and closed when consumption ends. */
    KafkaConsumer<K, V> consumer;

    /** Configuration the consumer was created with. */
    Properties props;

    /**
     * Set by {@link #shutdown()} before waking the consumer up, so the consume loop can tell a
     * requested stop apart from a genuine failure. Volatile because it is written by the thread
     * calling {@code shutdown()} and read by the thread running the consume loop.
     */
    private volatile boolean shutdownRequested;

    /**
     * Creates a client that reads from the earliest available offset and deserializes keys and
     * values as strings.
     *
     * @param str  value for {@code bootstrap.servers}; must not be null or blank
     * @param str2 value for {@code group.id}; must not be null or blank
     * @throws KafkaClientException if either argument is null or blank
     */
    public ConsumerClient(String str, String str2) {
        requireNonBlank(str, BOOTSTRAP_SERVERS);
        requireNonBlank(str2, GROUP_ID);
        this.props = new Properties();
        this.props.put(BOOTSTRAP_SERVERS, str);
        this.props.put(GROUP_ID, str2);
        this.props.put("auto.offset.reset", "earliest");
        this.props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(this.props);
    }

    /**
     * Creates a client from caller-supplied configuration. The given instance is kept as-is (no
     * copy is made).
     *
     * @param properties consumer configuration; must contain non-blank {@code bootstrap.servers}
     *                   and {@code group.id} entries
     * @throws KafkaClientException if {@code properties} is null or one of the mandatory entries is
     *                              missing or blank
     */
    public ConsumerClient(Properties properties) {
        if (properties == null) {
            throw new KafkaClientException("properties must not be null");
        }
        requireNonBlank(properties.getProperty(BOOTSTRAP_SERVERS), BOOTSTRAP_SERVERS);
        requireNonBlank(properties.getProperty(GROUP_ID), GROUP_ID);
        this.props = properties;
        this.consumer = new KafkaConsumer<>(properties);
    }

    /**
     * Manually assigns the given partitions to the underlying consumer.
     *
     * @param collection partitions to assign
     */
    public void assign(Collection<TopicPartition> collection) {
        this.consumer.assign(collection);
    }

    /**
     * Subscribes to a single topic and hands every polled record to the handler until the loop is
     * interrupted by an exception (including the one triggered by {@link #shutdown()}).
     *
     * <p>Both arguments are validated before the consumer is touched, so an invalid call fails
     * fast with a {@link KafkaClientException} and leaves the consumer open. Once the loop has
     * started, the consumer is always closed when this method exits; exceptions raised while
     * polling or handling are logged and not rethrown.
     *
     * @param str                   topic to subscribe to; must not be null or blank
     * @param consumerRecordHandler callback invoked once per record; must not be null
     * @throws KafkaClientException if the topic is null or blank, or the handler is null
     */
    public void consume(String str, ConsumerRecordHandler consumerRecordHandler) {
        requireNonBlank(str, "topic");
        if (consumerRecordHandler == null) {
            throw new KafkaClientException("must provide a  ConsumerRecord handler ");
        }
        try {
            this.consumer.subscribe(Collections.singletonList(str));
            while (true) {
                // The loop variable is deliberately raw: the handler's parameter type is declared
                // outside this file, and the raw type is accepted by any handle(ConsumerRecord...)
                // signature exactly as the previous explicit cast was.
                for (ConsumerRecord record : this.consumer.poll(100L)) {
                    consumerRecordHandler.handle(record);
                }
            }
        } catch (Exception e) {
            if (this.shutdownRequested) {
                // wakeup() aborts the poll with an exception; that is the expected way out.
                log.info("Consumer loop stopped after shutdown request: {}", e.toString());
            } else {
                log.error("Unexpected error", e);
            }
        } finally {
            this.consumer.close();
        }
    }

    /** Closes the underlying consumer. */
    public void close() {
        this.consumer.close();
    }

    /**
     * Requests the consume loop to stop by waking up the underlying consumer. Intended to be called
     * from a thread other than the one running {@link #consume(String, ConsumerRecordHandler)}.
     */
    public void shutdown() {
        // Publish the flag first so the consume loop sees it when the wakeup exception arrives.
        this.shutdownRequested = true;
        this.consumer.wakeup();
    }

    /** Throws a {@link KafkaClientException} naming {@code name} when {@code value} is null or blank. */
    private static void requireNonBlank(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new KafkaClientException(name + " must not be empty");
        }
    }
}
