package com.kuaike.scrm.common.component;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.kuaike.scrm.common.dto.PublishPayloadDto;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

/* loaded from: input_file:com/kuaike/scrm/common/component/AbstractEventBusHandler.class */
public abstract class AbstractEventBusHandler {
    private static final Logger log = LoggerFactory.getLogger(AbstractEventBusHandler.class);

    @Value("${kafka.topic.event_bus}")
    private String eventBusTopic;

    @Value("${eventBus.consumer.enable:true}")
    private boolean enable;

    @Value("${spring.application.name}")
    private String applicationName;

    @Value("${IdGen.machineId}")
    private String machineId;

    @Autowired
    private KafkaProperties kafkaProperties;
    private Consumer<String, String> consumer;
    private Thread thread;

    @PostConstruct
    public void init() {
        if (this.enable) {
            String str = this.applicationName + "#" + this.machineId;
            Map buildConsumerProperties = this.kafkaProperties.buildConsumerProperties();
            buildConsumerProperties.put("auto.offset.reset", "latest");
            buildConsumerProperties.put("group.id", str);
            log.info("event bus consumer properties={}", JSON.toJSONString(buildConsumerProperties));
            this.consumer = new KafkaConsumer(buildConsumerProperties);
            this.consumer.subscribe(Collections.singleton(this.eventBusTopic));
            this.thread = new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(40L);
                } catch (InterruptedException e) {
                    log.error("error", e);
                }
                start();
            });
            this.thread.start();
            log.info("eventBus handler start");
        }
    }

    private void start() {
        while (true) {
            try {
                try {
                    ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(100L));
                    long currentTimeMillis = System.currentTimeMillis();
                    ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(poll.count());
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        newArrayListWithExpectedSize.add((PublishPayloadDto) JSON.parseObject((String) ((ConsumerRecord) it.next()).value(), PublishPayloadDto.class));
                    }
                    if (CollectionUtils.isEmpty(newArrayListWithExpectedSize)) {
                        this.consumer.commitSync();
                    } else {
                        Iterator it2 = newArrayListWithExpectedSize.iterator();
                        while (it2.hasNext()) {
                            handler((PublishPayloadDto) it2.next());
                        }
                        log.info("handle eventBus message {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                        this.consumer.commitSync();
                    }
                } catch (Exception e) {
                    log.error("EventBusHandler error", e);
                    this.consumer.commitSync();
                }
            } catch (Throwable th) {
                this.consumer.commitSync();
                throw th;
            }
        }
    }

    public abstract void handler(PublishPayloadDto publishPayloadDto);
}
