/*
 * Decompiled with CFR 0.152.
 */
package org.flowable.eventregistry.spring.kafka;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.eventregistry.api.OutboundEvent;
import org.flowable.eventregistry.api.OutboundEventChannelAdapter;
import org.flowable.eventregistry.spring.kafka.KafkaPartitionProvider;
import org.springframework.kafka.core.KafkaOperations;

/**
 * {@link OutboundEventChannelAdapter} that publishes the body of an {@link OutboundEvent} as a
 * Kafka record through a {@link KafkaOperations} instance.
 *
 * <p>Every record is sent to the configured {@link #topic} with the configured {@link #key}.
 * Event headers are copied onto the record as UTF-8 encoded strings.
 */
public class KafkaOperationsOutboundEventChannelAdapter
        implements OutboundEventChannelAdapter<String> {

    /** Used to send the producer record. */
    protected KafkaOperations<Object, Object> kafkaOperations;

    /** Optional; when {@code null} the record is created with a {@code null} partition. */
    protected KafkaPartitionProvider partitionProvider;

    /** Topic every record is addressed to. */
    protected String topic;

    /** Record key used for every record. */
    protected String key;

    /**
     * Creates the adapter.
     *
     * @param kafkaOperations the operations used to send records
     * @param partitionProvider determines the partition for an event, may be {@code null}
     * @param topic the topic records are sent to
     * @param key the key set on every record
     */
    public KafkaOperationsOutboundEventChannelAdapter(KafkaOperations<Object, Object> kafkaOperations, KafkaPartitionProvider partitionProvider, String topic, String key) {
        this.kafkaOperations = kafkaOperations;
        this.partitionProvider = partitionProvider;
        this.topic = topic;
        this.key = key;
    }

    /**
     * Sends the event body as a record and waits for the send result.
     *
     * <p>Headers with a {@code null} value are skipped; every other header value is converted
     * with {@code toString()} and encoded as UTF-8. A {@code null} header map is treated as
     * "no headers".
     *
     * @param event the event whose body and headers are sent
     * @throws FlowableException if the waiting thread is interrupted (the interrupt flag is
     *         restored first), or if the send fails with a cause that is not a
     *         {@link RuntimeException}
     * @throws RuntimeException the original cause, when the send fails with a runtime exception
     */
    public void sendEvent(OutboundEvent<String> event) {
        try {
            String rawEvent = event.getBody();
            Map<String, Object> headerMap = event.getHeaders();

            Integer partition = this.partitionProvider == null ? null : this.partitionProvider.determinePartition(event);
            ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(this.topic, partition, this.key, rawEvent);

            // Headers are attached to the record directly, which keeps everything fully typed
            // without an intermediate raw/unchecked header collection.
            if (headerMap != null) {
                for (Map.Entry<String, Object> header : headerMap.entrySet()) {
                    Object headerValue = header.getValue();
                    if (headerValue == null) {
                        continue;
                    }
                    producerRecord.headers().add(new RecordHeader(header.getKey(), headerValue.toString().getBytes(StandardCharsets.UTF_8)));
                }
            }

            // get() waits for the send result so that failures surface to the caller.
            this.kafkaOperations.send(producerRecord).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new FlowableException("Sending the event was interrupted", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                // Propagate runtime failures unchanged instead of wrapping them.
                throw (RuntimeException) cause;
            }
            throw new FlowableException("failed to send event", cause);
        }
    }

    /**
     * Not supported by this adapter; events must be sent through
     * {@link #sendEvent(OutboundEvent)}.
     *
     * @param rawEvent ignored
     * @param headerMap ignored
     * @throws UnsupportedOperationException always
     */
    public void sendEvent(String rawEvent, Map<String, Object> headerMap) {
        throw new UnsupportedOperationException("Outbound processor should never call this");
    }
}

