/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.kafka.consumer;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.kuaike.kafka.consumer.BatchConsumerRecordHandler;
import com.kuaike.kafka.consumer.ConsumerRecordHandler;
import com.kuaike.kafka.exception.KafkaClientException;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManualCommitConsumerClient<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ManualCommitConsumerClient.class);
    KafkaConsumer<K, V> consumer;
    Properties props;

    public ManualCommitConsumerClient(String brokers, String group) {
        this.props = new Properties();
        if (brokers == null || brokers.trim().length() == 0) {
            throw new KafkaClientException("bootstrap.servers must not be empty");
        }
        this.props.put("bootstrap.servers", brokers);
        if (group == null || group.trim().isEmpty()) {
            throw new KafkaClientException("group.id must not be empty");
        }
        this.props.put("group.id", group);
        this.props.put("auto.offset.reset", "earliest");
        this.props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.props.put("enable.auto.commit", "false");
        this.consumer = new KafkaConsumer(this.props);
    }

    public ManualCommitConsumerClient(Properties props) {
        if (props.getProperty("bootstrap.servers") == null || props.getProperty("bootstrap.servers").trim().length() == 0) {
            throw new KafkaClientException("bootstrap.servers must not be empty");
        }
        if (props.getProperty("group.id") == null || props.getProperty("group.id").trim().length() == 0) {
            throw new KafkaClientException("group.id must not be empty");
        }
        this.props = props;
        this.props.put("enable.auto.commit", "false");
        this.consumer = new KafkaConsumer(props);
    }

    public void assign(Collection<TopicPartition> partitions) {
        this.consumer.assign(partitions);
    }

    public void consume(String topic, ConsumerRecordHandler handler) {
        if (topic == null || topic.trim().length() == 0) {
            throw new KafkaClientException("topic must not be empty");
        }
        this.consumer.subscribe(Collections.singletonList(topic), (ConsumerRebalanceListener)new RebalanceHandler());
        if (this.props.getProperty("group.id") == null || this.props.getProperty("group.id").trim().isEmpty()) {
            throw new KafkaClientException("group.id must not be empty");
        }
        if (handler == null) {
            throw new KafkaClientException("must provide a  ConsumerRecord handler ");
        }
        try {
            while (true) {
                ConsumerRecords records = this.consumer.poll(1000L);
                Stopwatch stopwatch = Stopwatch.createStarted();
                log.debug("Polled Record with size: {} ", (Object)records.count());
                if (handler instanceof BatchConsumerRecordHandler) {
                    BatchConsumerRecordHandler batchConsumerRecordHandler = (BatchConsumerRecordHandler)handler;
                    batchConsumerRecordHandler.batchHandle(Lists.newArrayList((Iterable)records));
                } else {
                    for (ConsumerRecord record : records) {
                        handler.handle(record);
                    }
                }
                log.debug("Async commit, Using {}s", (Object)stopwatch.elapsed(TimeUnit.SECONDS));
                this.consumer.commitAsync();
            }
        }
        catch (Throwable throwable) {
            log.debug("sync commite ");
            throw throwable;
        }
    }

    public void close() {
        this.consumer.close();
    }

    public void shutdown() {
        this.consumer.wakeup();
    }

    private class RebalanceHandler
    implements ConsumerRebalanceListener {
        private RebalanceHandler() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            log.info("Lost partitions in rebalance. Committing current offsets");
            ManualCommitConsumerClient.this.consumer.commitSync();
        }
    }
}

