/*
 * Decompiled with CFR 0.152.
 */
package com.kuaike.scrm.common.component;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.kuaike.scrm.common.dto.PublishPayloadDto;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

/**
 * Base class for beans that consume the event-bus Kafka topic.
 *
 * <p>When {@code eventBus.consumer.enable} is {@code true} (the default), {@link #init()} creates
 * a dedicated {@link KafkaConsumer} with group id {@code <spring.application.name>#<IdGen.machineId>},
 * subscribes it to the topic configured under {@code kafka.topic.event_bus}, and starts one
 * background thread. After a startup delay that thread polls the topic, decodes every record
 * value as a JSON {@link PublishPayloadDto} and passes it to {@link #handler(PublishPayloadDto)}.
 *
 * <p>Offsets are committed synchronously once per polled batch, after every message of the batch
 * has been offered to the handler. Handler failures are logged and do not prevent the commit.
 *
 * <p>After {@link #init()} returns, the consumer is used exclusively by the polling thread, which
 * also closes it when the bean is destroyed.
 */
public abstract class AbstractEventBusHandler {
    private static final Logger log = LoggerFactory.getLogger(AbstractEventBusHandler.class);

    /**
     * Delay between bean initialisation and the first poll.
     * NOTE(review): the reason for 70 seconds is not visible in this file; value kept as-is.
     */
    private static final long STARTUP_DELAY_SECONDS = 70L;
    /** Upper bound for a single {@code poll()}; also bounds how long a stop request goes unnoticed. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000L);
    /** Pause after an unexpected polling error so a persistent failure cannot spin the thread. */
    private static final long ERROR_BACKOFF_MILLIS = 1000L;
    /** Maximum time the destroy callback waits for the polling thread to finish. */
    private static final long SHUTDOWN_WAIT_MILLIS = 5000L;

    @Value(value="${kafka.topic.event_bus}")
    private String eventBusTopic;
    @Value(value="${eventBus.consumer.enable:true}")
    private boolean enable;
    @Value(value="${spring.application.name}")
    private String applicationName;
    @Value(value="${IdGen.machineId}")
    private String machineId;
    @Autowired
    private KafkaProperties kafkaProperties;
    private Consumer<String, String> consumer;

    /** Released exactly once, when the bean is being destroyed; doubles as an interruptible timer. */
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    /** The polling thread, or {@code null} when the consumer is disabled. */
    private volatile Thread pollThread;

    /**
     * Creates and subscribes the Kafka consumer and starts the polling thread.
     * Does nothing when the consumer is disabled by configuration.
     */
    @PostConstruct
    public void init() {
        if (!this.enable) {
            return;
        }
        String groupId = this.applicationName + "#" + this.machineId;
        Map<String, Object> properties = this.kafkaProperties.buildConsumerProperties();
        properties.put("auto.offset.reset", "latest");
        properties.put("group.id", groupId);
        // NOTE(review): this dumps every consumer property; if the Kafka configuration carries
        // credentials (e.g. SSL/SASL settings) they end up in the log. Consider masking.
        log.info("event bus consumer properties={}", JSON.toJSONString(properties));
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Collections.singleton(this.eventBusTopic));
        Thread thread = new Thread(this::runPollThread, "EventBusThread-" + groupId);
        this.pollThread = thread;
        thread.start();
        log.info("eventBus handler start");
    }

    /**
     * Signals the polling thread to stop and waits a bounded time for it to close the consumer.
     * Private on purpose so it cannot clash with methods declared by subclasses.
     */
    @PreDestroy
    private void shutdown() {
        this.stopSignal.countDown();
        Thread thread = this.pollThread;
        if (thread == null || thread == Thread.currentThread()) {
            return;
        }
        try {
            thread.join(SHUTDOWN_WAIT_MILLIS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (thread.isAlive()) {
            log.warn("eventBus handler did not stop within {} ms", SHUTDOWN_WAIT_MILLIS);
        }
    }

    /** Body of the polling thread: startup delay, poll loop, then consumer clean-up. */
    private void runPollThread() {
        try {
            // await() returns true as soon as shutdown is requested, so the delay is abortable.
            if (this.stopSignal.await(STARTUP_DELAY_SECONDS, TimeUnit.SECONDS)) {
                return;
            }
            this.start();
        }
        catch (InterruptedException e) {
            // Interrupted during the startup delay: do not enter the poll loop at all.
            log.error("error", e);
            Thread.currentThread().interrupt();
        }
        finally {
            this.closeConsumer();
        }
    }

    /**
     * Polls the topic until a stop is requested or the thread is interrupted. Each non-empty batch
     * is decoded, dispatched message by message, and then committed once.
     */
    private void start() {
        while (this.shouldKeepRunning()) {
            try {
                ConsumerRecords<String, String> records = this.consumer.poll(POLL_TIMEOUT);
                if (records.isEmpty()) {
                    continue;
                }
                long startTime = System.currentTimeMillis();
                for (PublishPayloadDto payload : this.decode(records)) {
                    this.consume(payload);
                }
                // The no-arg commitSync() covers everything returned by the last poll(), so one
                // call per batch is sufficient; a failure here can no longer drop pending messages.
                this.consumer.commitSync();
                log.info("handle eventBus message {} ms", System.currentTimeMillis() - startTime);
            }
            catch (Exception e) {
                log.error("EventBusHandler error", e);
                this.pauseAfterError();
            }
        }
    }

    /** Returns {@code true} while no stop was requested and the current thread is not interrupted. */
    private boolean shouldKeepRunning() {
        return this.stopSignal.getCount() > 0L && !Thread.currentThread().isInterrupted();
    }

    /** Waits for the error back-off period, returning early when a stop is requested. */
    private void pauseAfterError() {
        try {
            this.stopSignal.await(ERROR_BACKOFF_MILLIS, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Restore the flag; the poll loop condition will then end the loop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Decodes the JSON value of every record in the batch. Records that cannot be parsed, or that
     * decode to {@code null}, are logged and skipped so one bad record cannot discard the batch.
     */
    private List<PublishPayloadDto> decode(ConsumerRecords<String, String> records) {
        List<PublishPayloadDto> payloads = Lists.newArrayListWithExpectedSize(records.count());
        for (ConsumerRecord<String, String> consumerRecord : records) {
            try {
                PublishPayloadDto payload = JSON.parseObject(consumerRecord.value(), PublishPayloadDto.class);
                if (payload == null) {
                    log.warn("EventBusHandler skip empty record, topic={}, partition={}, offset={}",
                            consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                    continue;
                }
                payloads.add(payload);
            }
            catch (Exception e) {
                log.error("EventBusHandler skip undecodable record, topic={}, partition={}, offset={}",
                        consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), e);
            }
        }
        return payloads;
    }

    /** Hands one message to the subclass; an exception from the handler is logged, not rethrown. */
    private void consume(PublishPayloadDto publishPayloadDto) {
        try {
            this.handler(publishPayloadDto);
        }
        catch (Exception e) {
            log.error("EventBusHandler error", e);
        }
    }

    /** Closes the consumer on the polling thread, tolerating a pending interrupt. */
    private void closeConsumer() {
        // KafkaConsumer.close() is documented to fail on an interrupted thread, so clear the
        // flag for the duration of the call and restore it afterwards.
        boolean interrupted = Thread.interrupted();
        try {
            this.consumer.close();
        }
        catch (Exception e) {
            log.warn("eventBus consumer close failed", e);
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Processes one event-bus message. Called on the polling thread, one message at a time.
     * An {@link Exception} thrown from here is logged and consumption continues with the next
     * message; the batch offsets are still committed.
     *
     * @param var1 the decoded message, never {@code null}
     */
    public abstract void handler(PublishPayloadDto var1);
}

