/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.plugin.internal.device;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.hswebframework.web.exception.ValidationException;
import org.jetlinks.core.cache.ReactiveCacheContainer;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.monitor.logger.Logger;
import org.jetlinks.core.monitor.tracer.Tracer;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.plugin.core.PluginContext;
import org.jetlinks.plugin.internal.device.DeviceGatewayPlugin;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.netty.Connection;
import reactor.netty.tcp.TcpClient;

public abstract class TcpDeviceClientPlugin
extends DeviceGatewayPlugin {
    /** Device configuration key {@code "host"}: the TCP server host to connect to. */
    public static final ConfigKey<String> HOST = ConfigKey.of("host", "\u4e3b\u673a\u5730\u5740", String.class);

    /** Device configuration key {@code "port"}: the TCP server port to connect to. */
    public static final ConfigKey<Integer> PORT = ConfigKey.of("port", "\u7aef\u53e3", Integer.class);

    /** Clients created by {@link #createClient(String)}, keyed by device id. */
    protected final ReactiveCacheContainer<String, TcpDeviceClient> clientCache = ReactiveCacheContainer.create();

    /**
     * When {@code true}, clients never schedule a reconnect (see {@code TcpDeviceClient#tryReconnect()}).
     * NOTE(review): the name is presumably a typo of "short connection" - confirm before renaming.
     */
    private boolean sortConnection;

    /** Reconnect attempts allowed before a client is disposed and evicted; values {@code <= 0} disable the limit. */
    private int maxRetryTimes = 32;

    /**
     * Creates the plugin; both arguments are handed unchanged to the
     * {@link DeviceGatewayPlugin} constructor.
     *
     * @param id      plugin id
     * @param context plugin context
     */
    public TcpDeviceClientPlugin(String id, PluginContext context) {
        super(id, context);
    }

    /**
     * Describes the per-device configuration this plugin reads: the {@link #HOST} entry
     * (string) and the {@link #PORT} entry (int).
     *
     * @param deviceId id of the device; not used, every device gets the same metadata
     * @return a Mono emitting the configuration metadata
     */
    @Override
    public Mono<ConfigMetadata> getDeviceConfigMetadata(String deviceId) {
        // Typed as ConfigMetadata (instead of being cast to Object) so that Mono.just
        // yields the declared Mono<ConfigMetadata>.
        ConfigMetadata metadata = new DefaultConfigMetadata("TCP\u8fde\u63a5\u914d\u7f6e", "")
            .add(HOST.getKey(), HOST.getName(), StringType.GLOBAL)
            .add(PORT.getKey(), PORT.getName(), IntType.GLOBAL);
        return Mono.just(metadata);
    }

    /**
     * Makes sure a client exists for a newly registered device.
     *
     * @param device the registered device
     * @return a Mono that completes once the client has been obtained or created
     */
    @Override
    public Mono<Void> doOnDeviceRegister(DeviceOperator device) {
        String deviceId = device.getDeviceId();
        Mono<TcpDeviceClient> client = getOrCreateClient(deviceId);
        return client.then();
    }

    /**
     * Handles a message addressed to a device.
     * <p>
     * A {@link DisconnectDeviceMessage} removes the device's client from the cache and is
     * answered with a successful reply. Any other message is passed to the client's
     * {@code downstream} method and produces no reply.
     *
     * @param message the message to execute
     * @return the disconnect reply, or an empty publisher once the message has been sent downstream
     */
    @Override
    public Publisher<? extends DeviceMessage> execute(DeviceMessage message) {
        if (message instanceof DisconnectDeviceMessage) {
            // NOTE(review): this relies on the cache disposing the removed client - confirm
            // against ReactiveCacheContainer#remove.
            this.clientCache.remove(message.getDeviceId());
            return Mono.just(((DisconnectDeviceMessage) message).newReply().success());
        }
        return this.getOrCreateClient(message.getDeviceId())
            .flatMap(client -> client.downstream(message))
            .then(Mono.empty());
    }

    /**
     * Returns the cached client of a device, creating it through
     * {@link #createClient(String)} when the cache has no entry for that id.
     *
     * @param deviceId id of the device, used as the cache key
     * @return a Mono emitting the client
     */
    protected Mono<TcpDeviceClient> getOrCreateClient(String deviceId) {
        // The key is passed as a String, matching the cache's declared key type.
        return this.clientCache.computeIfAbsent(deviceId, this::createClient);
    }

    /**
     * Creates the client for a device. Used by {@link #getOrCreateClient(String)} as the
     * loader for cache misses.
     *
     * @param var1 the cache key, i.e. the device id passed to {@link #getOrCreateClient(String)}
     * @return a Mono emitting the new client
     */
    protected abstract Mono<TcpDeviceClient> createClient(String var1);

    /**
     * Reports the state of a device based on its cached client.
     *
     * @param device the device to inspect
     * @return a Mono emitting {@code 1} when a non-disposed client is cached for the device,
     *         {@code -1} otherwise (presumably the platform's online / offline codes - confirm)
     */
    @Override
    public Mono<Byte> getDeviceState(DeviceOperator device) {
        TcpDeviceClient client = this.clientCache.getNow(device.getDeviceId());
        boolean usable = client != null && !client.isDisposed();
        // Explicit byte casts: a bare int literal would be boxed to Integer, not Byte.
        return Mono.just(usable ? (byte) 1 : (byte) -1);
    }

    /**
     * Clears the client cache and then delegates to the parent shutdown logic.
     * The cache is cleared when this method is called, not when the returned Mono is subscribed.
     *
     * @return the Mono produced by the parent's {@code doShutdown()}
     */
    @Override
    protected Mono<Void> doShutdown() {
        this.clientCache.clear();
        return super.doShutdown();
    }

    /**
     * @return the live client cache (not a copy), keyed by device id
     */
    public ReactiveCacheContainer<String, TcpDeviceClient> getClientCache() {
        return clientCache;
    }

    /**
     * @return {@code true} when clients must not reconnect automatically
     */
    public boolean isSortConnection() {
        return sortConnection;
    }

    /**
     * @return the maximum number of reconnect attempts; values {@code <= 0} mean no limit
     */
    public int getMaxRetryTimes() {
        return maxRetryTimes;
    }

    /**
     * @param sortConnection {@code true} to stop clients from reconnecting automatically
     */
    public void setSortConnection(boolean sortConnection) {
        final boolean requested = sortConnection;
        this.sortConnection = requested;
    }

    /**
     * @param maxRetryTimes maximum number of reconnect attempts; values {@code <= 0} remove the limit
     */
    public void setMaxRetryTimes(int maxRetryTimes) {
        final int requested = maxRetryTimes;
        this.maxRetryTimes = requested;
    }

    public static abstract class TcpDeviceClient
    implements Disposable {
        /** Atomic access to {@link #connecting}, so that only one connection attempt is in flight. */
        static final AtomicReferenceFieldUpdater<TcpDeviceClient, Sinks.One> CONNECTING = AtomicReferenceFieldUpdater.newUpdater(TcpDeviceClient.class, Sinks.One.class, "connecting");

        /** Id of the device this client talks to. */
        public final String deviceId;

        /** Owning plugin; provides the registry, monitor, cache and retry settings. */
        protected final TcpDeviceClientPlugin parent;

        /** Most recently established connection, or {@code null} before the first successful connect. */
        private volatile Connection connected;

        /** Sink of the connection attempt currently in flight, or {@code null} when there is none. */
        private volatile Sinks.One<Connection> connecting;

        /** Subscription of the most recent connection attempt. */
        private volatile Disposable connectionDisposable;

        /**
         * Reconnect attempts since the last successful connect. Volatile because it is reset
         * in {@code connected(...)} and updated in {@code tryReconnect()}, which are invoked
         * from connection callbacks; the increment itself is not atomic.
         */
        private volatile int retryTimes;

        /**
         * Set once by {@link #dispose()}. Volatile because {@link #isDisposed()} is read
         * from callbacks and from the plugin, which may run on other threads than the disposer.
         */
        private volatile boolean disposed;

        /** Error of the last failed connection attempt; only written in this class. */
        private Throwable lastError;

        /**
         * @return the logger of the owning plugin's monitor
         */
        protected Logger logger() {
            return parent.context().monitor().logger();
        }

        /**
         * @return the tracer of the owning plugin's monitor
         */
        protected Tracer tracer() {
            return parent.context().monitor().tracer();
        }

        /**
         * Returns the device connection, starting a connection attempt when necessary.
         * <ul>
         *   <li>If a connection exists and is not disposed, it is returned immediately.</li>
         *   <li>If an attempt is already in flight, the caller joins it.</li>
         *   <li>Otherwise a new attempt is started; its outcome (value, error or empty) is
         *       published to every caller that joined it.</li>
         * </ul>
         * A failed or empty attempt schedules a retry through {@link #tryReconnect()}.
         *
         * @return a Mono emitting the connection, an error when the attempt fails, or empty
         *         when no connection could be resolved (for example host/port not configured)
         */
        protected final Mono<Connection> connect() {
            // Snapshot the volatile field so the null check and the use see the same object.
            Connection active = this.connected;
            if (active != null && !active.isDisposed()) {
                return Mono.just(active);
            }
            Sinks.One<Connection> attempt = Sinks.one();
            // Join the attempt in flight or install ours. The field is read once per
            // iteration: re-reading it after a failed compareAndSet could observe null
            // (the other attempt just finished) and fail with a NullPointerException.
            for (;;) {
                Sinks.One<Connection> inFlight = this.connecting;
                if (inFlight != null) {
                    return inFlight.asMono();
                }
                if (CONNECTING.compareAndSet(this, null, attempt)) {
                    break;
                }
            }
            Disposable previous = this.connectionDisposable;
            if (previous != null) {
                previous.dispose();
            }
            this.connectionDisposable = this
                .connectNow()
                .as(this.tracer().traceMono("connect"))
                .subscribe(
                    connection -> {
                        this.logger().debug("\u8fde\u63a5\u8bbe\u5907[{}]\u6210\u529f", new Object[]{this.deviceId});
                        attempt.emitValue(connection, Reactors.emitFailureHandler());
                        CONNECTING.compareAndSet(this, attempt, null);
                    },
                    err -> {
                        this.logger().warn("\u8fde\u63a5\u8bbe\u5907[{}]\u5931\u8d25", new Object[]{this.deviceId, err});
                        this.lastError = err;
                        attempt.emitError(err, Reactors.emitFailureHandler());
                        CONNECTING.compareAndSet(this, attempt, null);
                        this.tryReconnect();
                    },
                    () -> {
                        // For a Mono, Reactor runs the completion callback right after the
                        // value callback as well. By then the value callback has cleared the
                        // in-flight marker, so this is not an empty result and must neither
                        // be reported as a failure nor trigger a reconnect.
                        if (this.connecting != attempt) {
                            return;
                        }
                        attempt.emitEmpty(Reactors.emitFailureHandler());
                        if (CONNECTING.compareAndSet(this, attempt, null)) {
                            this.logger().warn("\u8fde\u63a5\u8bbe\u5907[{}]\u5931\u8d25,\u672a\u6b63\u786e\u914d\u7f6ehost,port?", new Object[]{this.deviceId});
                        }
                        this.tryReconnect();
                    });
            return attempt.asMono();
        }

        /**
         * Looks the device up in the registry, reads its own {@code HOST} and {@code PORT}
         * configuration and opens a connection to that address.
         * <p>
         * The result is empty when the device is not found or when the configuration
         * lookup does not yield exactly two values.
         *
         * @return a Mono emitting the new connection
         */
        private Mono<Connection> connectNow() {
            return this.parent.registry
                .getDevice(this.deviceId)
                .flatMap(device -> device
                    .getSelfConfigs(new ConfigKey[]{HOST, PORT})
                    // Both entries must be present, otherwise the attempt completes empty.
                    .filter(values -> values.size() == 2)
                    .flatMap(values -> {
                        String host = (String) values
                            .getValue(HOST)
                            .orElseThrow(() -> new ValidationException("host cant not be empty"));
                        Integer port = (Integer) values
                            .getValue(PORT)
                            .orElseThrow(() -> new ValidationException("port cant not be null"));
                        return this.connection(host, port);
                    }));
        }

        /**
         * Callback for a newly established connection (registered through
         * {@code doOnConnected} in {@link #connection(String, int)}).
         * <p>
         * Resets the retry counter and disposes the previously stored connection. If this
         * client has been disposed in the meantime the new connection is closed as well;
         * otherwise it becomes the current connection and is handed to
         * {@link #initConnection(Connection)}.
         *
         * @param connected the connection that has just been established
         */
        private synchronized void connected(Connection connected) {
            this.retryTimes = 0;
            Connection previous = this.connected;
            if (previous != null) {
                previous.dispose();
            }
            if (!isDisposed()) {
                this.connected = connected;
                initConnection(connected);
            } else {
                // The client was disposed while connecting: do not keep the socket open.
                connected.dispose();
            }
        }

        /**
         * Hook invoked with each newly established connection right after it has been
         * stored as the current one.
         *
         * @param var1 the connection that has just been established
         */
        protected abstract void initConnection(Connection var1);

        /**
         * Schedules another connection attempt after a delay that grows with the number
         * of consecutive attempts.
         * <p>
         * Nothing happens when this client is disposed or the plugin disables reconnects.
         * When the plugin's retry limit is positive and has been exceeded, the client is
         * disposed and removed from the plugin's cache instead.
         */
        protected void tryReconnect() {
            if (this.isDisposed() || this.parent.isSortConnection()) {
                return;
            }
            int maxRetryTimes = this.parent.getMaxRetryTimes();
            // Count every attempt, including when no limit is configured; otherwise the
            // delay below would stay at 0 ms and unlimited retries would become a hot loop.
            int attempt = ++this.retryTimes;
            if (maxRetryTimes > 0 && attempt > maxRetryTimes) {
                this.logger().warn("\u8bbe\u5907[{}]\u91cd\u8bd5\u6b21\u6570\u8d85\u8fc7\u6700\u5927\u6b21\u6570[{}]", new Object[]{this.deviceId, maxRetryTimes});
                this.dispose();
                this.parent.clientCache.remove(this.deviceId);
                return;
            }
            // Linear back-off of 100 ms per attempt, capped so that it stays bounded when
            // the number of retries is unlimited or very large.
            final long maxDelayMillis = 10_000L;
            long delayMillis = Math.min((long) attempt * 100L, maxDelayMillis);
            Mono.delay(Duration.ofMillis(delayMillis))
                // The client may have been disposed while the timer was running.
                .then(Mono.defer(() -> this.isDisposed() ? Mono.<Connection>empty() : this.connect()))
                // connect() already logs failures and schedules the next retry; without a
                // handler here each failure would also hit Reactor's dropped-error hook.
                .onErrorResume(err -> Mono.empty())
                .subscribe();
        }

        /**
         * Sends a message towards the device. Called by the plugin's {@code execute}
         * method for every message that is not a disconnect request.
         *
         * @param var1 the message to send
         * @return a Mono that completes when the message has been handled
         */
        public abstract Mono<Void> downstream(DeviceMessage var1);

        /**
         * Hands a message coming from the device to the owning plugin.
         * <p>
         * A message without a device id is first bound to this client's device.
         *
         * @param message the message reported by the device; may be modified as described above
         * @return the Mono returned by the plugin's {@code handleMessage}
         */
        public final Mono<Void> upstream(DeviceMessage message) {
            if (null == message.getDeviceId()) {
                String thingType = DeviceThingType.device.getId();
                message.thingId(thingType, this.deviceId);
            }
            Object[] logArgs = new Object[]{message};
            this.parent.context().monitor().logger().debug("\u8bbe\u5907\u4e0a\u62a5\u6d88\u606f:{}", logArgs);
            return this.parent.handleMessage(message);
        }

        /**
         * Opens a TCP connection to the given address.
         * <p>
         * The client is first customised by {@link #initClient(TcpClient)}; the
         * {@code connected} and {@link #handleDisconnected(Connection)} callbacks are then
         * registered on it before connecting.
         *
         * @param host server host
         * @param port server port
         * @return a Mono emitting the established connection
         */
        protected Mono<Connection> connection(String host, int port) {
            this.logger().debug("start connect device {} tcp server {}:{}", new Object[]{this.deviceId, host, port});
            TcpClient target = TcpClient.create().host(host).port(port);
            TcpClient customised = this.initClient(target);
            return customised
                .doOnConnected(this::connected)
                .doOnDisconnected(this::handleDisconnected)
                .connect()
                .cast(Connection.class);
        }

        /**
         * Callback for a closed connection (registered through {@code doOnDisconnected}
         * in {@link #connection(String, int)}); it asks for a reconnect.
         *
         * @param connection the connection that was closed; not used here
         */
        protected void handleDisconnected(Connection connection) {
            tryReconnect();
        }

        /**
         * Hook to customise the TCP client before the connection callbacks are registered
         * and the connection is opened.
         *
         * @param var1 a client already configured with the target host and port
         * @return the client to connect with
         */
        protected abstract TcpClient initClient(TcpClient var1);

        /**
         * @return {@code true} once {@link #dispose()} has been called
         */
        @Override
        public boolean isDisposed() {
            return this.disposed;
        }

        /**
         * Marks this client as disposed, closes the current connection, cancels the
         * connection attempt in flight and fails its waiting callers with a
         * {@link DeviceOperationException} carrying {@link ErrorCode#CONNECTION_LOST}.
         */
        @Override
        public void dispose() {
            this.disposed = true;
            // Each shared field is read once into a local: a concurrent completion of the
            // connection attempt may null "connecting" between a null check and the use.
            Connection active = this.connected;
            if (active != null) {
                active.dispose();
            }
            Disposable attempt = this.connectionDisposable;
            if (attempt != null) {
                attempt.dispose();
            }
            Sinks.One<Connection> waiting = this.connecting;
            if (waiting != null) {
                waiting.tryEmitError(new DeviceOperationException(ErrorCode.CONNECTION_LOST));
            }
        }

        /**
         * Creates a client for one device. No connection is opened here.
         *
         * @param deviceId id of the device this client serves
         * @param parent   owning plugin
         */
        public TcpDeviceClient(String deviceId, TcpDeviceClientPlugin parent) {
            this.parent = parent;
            this.deviceId = deviceId;
        }
    }
}

