package org.springframework.kafka.listener;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-2.2.0.RELEASE.jar:org/springframework/kafka/listener/DeadLetterPublishingRecoverer.class */
/**
 * Recoverer that re-publishes a failed {@link ConsumerRecord} to another topic/partition
 * (a "dead-letter" destination) using a {@link KafkaTemplate}.
 *
 * <p>The destination is chosen by a resolver function. The default resolver targets the
 * original topic name suffixed with {@code ".DLT"} and the same partition number as the
 * failed record. The published record carries the original headers plus additional headers
 * describing the original record and the exception.
 *
 * <p>If the template reported itself as transactional at construction time, the publication
 * is performed via {@link KafkaTemplate#executeInTransaction}; otherwise the template is
 * used directly.
 */
public class DeadLetterPublishingRecoverer implements BiConsumer<ConsumerRecord<?, ?>, Exception> {
    private static final Log logger = LogFactory.getLog(DeadLetterPublishingRecoverer.class);
    private final KafkaTemplate<Object, Object> template;
    private final boolean transactional;
    private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;

    /**
     * Create an instance that publishes to {@code <originalTopic>.DLT}, on the same
     * partition number as the failed record.
     *
     * @param kafkaTemplate the template used to publish; must not be null.
     */
    public DeadLetterPublishingRecoverer(KafkaTemplate<Object, Object> kafkaTemplate) {
        this(kafkaTemplate, (consumerRecord, exc) ->
                new TopicPartition(consumerRecord.topic() + ".DLT", consumerRecord.partition()));
    }

    /**
     * Create an instance with a custom destination resolver.
     *
     * @param kafkaTemplate the template used to publish; must not be null.
     * @param biFunction    resolves the destination topic/partition from the failed record
     *                      and the exception; must not be null. A negative partition in the
     *                      result causes the record to be created with a null partition.
     * @throws IllegalArgumentException if either argument is null.
     */
    public DeadLetterPublishingRecoverer(KafkaTemplate<Object, Object> kafkaTemplate, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> biFunction) {
        Assert.notNull(kafkaTemplate, "The template cannot be null");
        Assert.notNull(biFunction, "The destinationResolver cannot be null");
        this.template = kafkaTemplate;
        // Captured once; later changes to the template are not observed.
        this.transactional = kafkaTemplate.isTransactional();
        this.destinationResolver = biFunction;
    }

    /**
     * Publish the failed record to the resolved dead-letter destination.
     *
     * @param consumerRecord the record whose processing failed.
     * @param exc            the exception that caused the failure.
     */
    @Override // java.util.function.BiConsumer
    public void accept(ConsumerRecord<?, ?> consumerRecord, Exception exc) {
        TopicPartition destination = this.destinationResolver.apply(consumerRecord, exc);
        // Copy the original headers so the consumer record's own headers are not modified.
        RecordHeaders outboundHeaders = new RecordHeaders(consumerRecord.headers().toArray());
        enhanceHeaders(outboundHeaders, consumerRecord, exc);
        ProducerRecord<Object, Object> outboundRecord = createProducerRecord(consumerRecord, destination, outboundHeaders);
        if (this.transactional) {
            this.template.executeInTransaction(kafkaOperations -> {
                publish(outboundRecord, kafkaOperations);
                return null;
            });
        } else {
            publish(outboundRecord, this.template);
        }
    }

    /**
     * Build the record to publish. Subclasses may override to customize the outgoing record.
     *
     * @param consumerRecord the failed record, supplying key and value.
     * @param topicPartition the resolved destination; a negative partition yields a null
     *                       partition in the produced record.
     * @param recordHeaders  the headers to attach (original headers plus dead-letter headers).
     * @return the record to send.
     */
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> consumerRecord, TopicPartition topicPartition, RecordHeaders recordHeaders) {
        Integer partition = topicPartition.partition() < 0 ? null : Integer.valueOf(topicPartition.partition());
        return new ProducerRecord<>(topicPartition.topic(), partition, consumerRecord.key(), consumerRecord.value(), recordHeaders);
    }

    /**
     * Send the record and log the outcome. Failures, whether thrown by {@code send} or
     * reported through the callback, are logged at error level and not rethrown.
     */
    private void publish(ProducerRecord<Object, Object> producerRecord, KafkaOperations<Object, Object> kafkaOperations) {
        try {
            kafkaOperations.send(producerRecord).addCallback(sendResult -> {
                if (logger.isDebugEnabled()) {
                    logger.debug("Successful dead-letter publication: " + sendResult);
                }
            }, th -> {
                logger.error("Dead-letter publication failed for: " + producerRecord, th);
            });
        } catch (Exception e) {
            logger.error("Dead-letter publication failed for: " + producerRecord, e);
        }
    }

    /**
     * Add headers describing the original record (topic, partition, offset, timestamp,
     * timestamp type) and the exception (class name, message, stack trace).
     *
     * <p>Strings are encoded as UTF-8; the partition is written as a 4-byte int and the
     * offset and timestamp as 8-byte longs via {@link ByteBuffer}.
     */
    private void enhanceHeaders(RecordHeaders recordHeaders, ConsumerRecord<?, ?> consumerRecord, Exception exc) {
        recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC, consumerRecord.topic().getBytes(StandardCharsets.UTF_8)));
        recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION, ByteBuffer.allocate(4).putInt(consumerRecord.partition()).array()));
        recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET, ByteBuffer.allocate(8).putLong(consumerRecord.offset()).array()));
        recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP, ByteBuffer.allocate(8).putLong(consumerRecord.timestamp()).array()));
        recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE, consumerRecord.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
        recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_FQCN, exc.getClass().getName().getBytes(StandardCharsets.UTF_8)));
        // Throwable.getMessage() may return null (e.g. new NullPointerException()); skip the
        // header in that case rather than failing the whole recovery with an NPE.
        String exceptionMessage = exc.getMessage();
        if (exceptionMessage != null) {
            recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE, exceptionMessage.getBytes(StandardCharsets.UTF_8)));
        }
        recordHeaders.add(new RecordHeader(KafkaHeaders.DLT_EXCEPTION_STACKTRACE, getStackTraceAsString(exc).getBytes(StandardCharsets.UTF_8)));
    }

    /** Render the throwable's stack trace (as produced by {@code printStackTrace}) to a string. */
    private String getStackTraceAsString(Throwable th) {
        StringWriter stackTrace = new StringWriter();
        th.printStackTrace(new PrintWriter(stackTrace, true));
        return stackTrace.toString();
    }
}
