/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.message.interceptor;

import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.Map;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.utils.SerialFlux;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class SerialDeviceMessageSenderInterceptor
implements DeviceMessageSenderInterceptor {
    public static final SerialDeviceMessageSenderInterceptor GLOBAL = new SerialDeviceMessageSenderInterceptor();
    final Map<Object, SerialFlux<DeviceMessage>> pending = Caffeine.newBuilder().expireAfterAccess(Duration.ofHours(1L)).build().asMap();

    protected boolean needSerial(DeviceMessage message) {
        return true;
    }

    protected Object getSerialKey(DeviceMessage message) {
        return Tuples.of((Object)message.getDeviceId(), (Object)((Object)message.getMessageType()));
    }

    @Override
    public Flux<DeviceMessage> doSend(DeviceOperator device, DeviceMessage source, Flux<DeviceMessage> sender) {
        if (!this.needSerial(source)) {
            return sender;
        }
        Object key = this.getSerialKey(source);
        return this.pending.computeIfAbsent(key, ignore -> new SerialFlux()).join(sender);
    }

    @Override
    public int getOrder() {
        return Integer.MIN_VALUE;
    }
}

