package com.baijia.storm.sun.service.log;

import com.baijia.storm.sun.api.common.conf.BizConf;
import com.baijia.storm.sun.api.common.dto.request.EarthErrorPrompt;
import com.baijia.storm.sun.api.common.model.PrismRecord;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.regex.Pattern;
import javax.annotation.Resource;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/baijia/storm/sun/service/log/LogStashKafkaStoreImpl.class */
public class LogStashKafkaStoreImpl implements LogStashService {
    private static final Logger log = LoggerFactory.getLogger(LogStashKafkaStoreImpl.class);

    @Resource
    @Qualifier("logCollectorProducer")
    private KafkaProducer<byte[], byte[]> producer;

    @Override // com.baijia.storm.sun.service.log.LogStashService
    public void store(String str, byte[] bArr) {
        if (!isKafkaTopicIsValid(str) || ArrayUtils.isEmpty(bArr)) {
            log.warn("topic or content is invalid. [topic {}, content, {}]", str, bArr);
        } else {
            this.producer.send(new ProducerRecord(str, bArr));
        }
    }

    @Override // com.baijia.storm.sun.service.log.LogStashService
    public void storeEarthErrorReport(List<EarthErrorPrompt> list) {
        for (EarthErrorPrompt earthErrorPrompt : list) {
            earthErrorPrompt.setCreateTime(earthErrorPrompt.getCreateTime() * 1000);
            store(LogStashConstant.KAFKA_CLIENT_ERROR_TOPIC_NAME, serializerEarthErrorPrompt(earthErrorPrompt));
        }
    }

    private byte[] serializerEarthErrorPrompt(EarthErrorPrompt earthErrorPrompt) {
        try {
            return BizConf.gson.toJson(earthErrorPrompt).getBytes(LogStashConstant.KAFKA_CONTENT_CHARSET_NAME);
        } catch (UnsupportedEncodingException e) {
            log.error("error while serialize, {}", ExceptionUtils.getStackTrace(e));
            return null;
        }
    }

    private static boolean isKafkaTopicIsValid(String str) {
        if (StringUtils.isEmpty(str) || str.equals(".") || str.equals("..")) {
            return false;
        }
        return Pattern.matches("[a-zA-Z0-9\\._\\-]{1,249}", str);
    }

    @Override // com.baijia.storm.sun.service.log.LogStashService
    public void storePrismRecords(List<PrismRecord> list) {
        list.forEach(prismRecord -> {
            store(LogStashConstant.KAFKA_TOPIC_PRISM_RECORD_NAME, serializerPrismRecord(prismRecord));
        });
    }

    private static byte[] serializerPrismRecord(PrismRecord prismRecord) {
        try {
            return BizConf.gson.toJson(prismRecord).getBytes(LogStashConstant.KAFKA_CONTENT_CHARSET_NAME);
        } catch (UnsupportedEncodingException e) {
            log.error("error while serialize, {}", ExceptionUtils.getStackTrace(e));
            return null;
        }
    }
}
