/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.kafka.consumer;

import com.kuaike.kafka.consumer.ConsumerRecordHandler;
import com.kuaike.kafka.exception.KafkaClientException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerClient<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerClient.class);
    KafkaConsumer<K, V> consumer;
    Properties props;

    public ConsumerClient(String brokers, String group) {
        this.props = new Properties();
        if (brokers == null || brokers.trim().length() == 0) {
            throw new KafkaClientException("bootstrap.servers must not be empty");
        }
        this.props.put("bootstrap.servers", brokers);
        if (group == null || group.trim().isEmpty()) {
            throw new KafkaClientException("group.id must not be empty");
        }
        this.props.put("group.id", group);
        this.props.put("auto.offset.reset", "earliest");
        this.props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer(this.props);
    }

    public ConsumerClient(Properties props) {
        if (props.getProperty("bootstrap.servers") == null || props.getProperty("bootstrap.servers").trim().length() == 0) {
            throw new KafkaClientException("bootstrap.servers must not be empty");
        }
        if (props.getProperty("group.id") == null || props.getProperty("group.id").trim().length() == 0) {
            throw new KafkaClientException("group.id must not be empty");
        }
        this.props = props;
        this.consumer = new KafkaConsumer(props);
    }

    public void assign(Collection<TopicPartition> partitions) {
        this.consumer.assign(partitions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void consume(String topic, ConsumerRecordHandler handler) {
        if (topic == null || topic.trim().length() == 0) {
            throw new KafkaClientException("topic must not be empty");
        }
        try {
            try {
                this.consumer.subscribe(Collections.singletonList(topic));
                if (handler == null) {
                    throw new KafkaClientException("must provide a  ConsumerRecord handler ");
                }
                block4: while (true) {
                    ConsumerRecords records = this.consumer.poll(100L);
                    Iterator iterator = records.iterator();
                    while (true) {
                        if (!iterator.hasNext()) continue block4;
                        ConsumerRecord record = (ConsumerRecord)iterator.next();
                        handler.handle(record);
                    }
                    break;
                }
            }
            catch (Exception e) {
                log.error("Unexpected error", (Throwable)e);
                this.consumer.close();
            }
        }
        catch (Throwable throwable) {
            this.consumer.close();
            throw throwable;
        }
    }

    public void close() {
        this.consumer.close();
    }

    public void shutdown() {
        this.consumer.wakeup();
    }
}

