package io.github.resilience4j.ratelimiter.monitoring.endpoint;

import io.github.resilience4j.ratelimiter.event.RateLimiterEvent;
import io.github.resilience4j.ratelimiter.monitoring.model.RateLimiterEventDTO;
import java.io.IOException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/github/resilience4j/ratelimiter/monitoring/endpoint/RateLimiterEventsEmitter.class */
/**
 * Bridges a Reactor {@link Flux} of {@link RateLimiterEventDTO}s to a Spring
 * {@link SseEmitter}.
 *
 * <p>Each element published by the flux is sent through the emitter as JSON.
 * A flux error completes the emitter with that error, and normal flux
 * completion completes the emitter. When the emitter itself completes or
 * times out, the subscription to the flux is disposed.
 */
public class RateLimiterEventsEmitter {
    private static final Logger LOG = LoggerFactory.getLogger(RateLimiterEventsEmitter.class);

    private final SseEmitter sseEmitter = new SseEmitter();
    private final Disposable disposable;

    /**
     * Subscribes to the given flux and wires its signals to the internal emitter.
     *
     * @param flux the stream of event DTOs to forward to the emitter
     */
    public RateLimiterEventsEmitter(Flux<RateLimiterEventDTO> flux) {
        // Register the cleanup callbacks before subscribing so the emitter's
        // completion/timeout handlers are in place for the whole subscription.
        this.sseEmitter.onCompletion(this::unsubscribe);
        this.sseEmitter.onTimeout(this::unsubscribe);
        // Method references are passed directly so the consumers are properly
        // typed (no raw Consumer locals).
        this.disposable = flux.subscribe(
            this::notify,
            this.sseEmitter::completeWithError,
            this.sseEmitter::complete);
    }

    /**
     * Sends a single event through the emitter as JSON.
     *
     * <p>An {@link IOException} raised by the send is logged and not rethrown,
     * so a failed send does not propagate as an error into the flux.
     *
     * @param rateLimiterEventDTO the event to send
     */
    private void notify(RateLimiterEventDTO rateLimiterEventDTO) {
        try {
            this.sseEmitter.send(rateLimiterEventDTO, MediaType.APPLICATION_JSON);
        } catch (IOException e) {
            LOG.warn("Failed to send ratelimiter event", e);
        }
    }

    /** Disposes the subscription to the source flux. */
    private void unsubscribe() {
        this.disposable.dispose();
    }

    /**
     * Creates an {@link SseEmitter} fed by the given rate limiter event stream.
     *
     * <p>Every event is mapped with
     * {@link RateLimiterEventDTO#createRateLimiterEventDTO} before being sent.
     *
     * @param flux the stream of rate limiter events
     * @return the emitter that the mapped events are sent through
     */
    public static SseEmitter createSseEmitter(Flux<RateLimiterEvent> flux) {
        return new RateLimiterEventsEmitter(flux.map(RateLimiterEventDTO::createRateLimiterEventDTO)).sseEmitter;
    }
}
