/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.auth.CredentialsSupplier;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.ClientTransport;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ServerTransport;
import io.scalecube.services.transport.api.ServiceTransport;
import io.scalecube.services.transport.rsocket.ConnectionSetupCodec;
import io.scalecube.services.transport.rsocket.DelegatedLoopResources;
import io.scalecube.services.transport.rsocket.RSocketClientTransport;
import io.scalecube.services.transport.rsocket.RSocketClientTransportFactory;
import io.scalecube.services.transport.rsocket.RSocketServerTransport;
import io.scalecube.services.transport.rsocket.RSocketServerTransportFactory;
import java.util.Collection;
import java.util.StringJoiner;
import java.util.concurrent.ThreadFactory;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.channel.AbortedException;
import reactor.netty.resources.LoopResources;

public class RSocketServiceTransport
implements ServiceTransport {
    public static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceTransport.class);
    private int numOfWorkers = Runtime.getRuntime().availableProcessors();
    private HeadersCodec headersCodec = HeadersCodec.DEFAULT_INSTANCE;
    private Collection<DataCodec> dataCodecs = DataCodec.getAllInstances();
    private ConnectionSetupCodec connectionSetupCodec = ConnectionSetupCodec.DEFAULT_INSTANCE;
    private CredentialsSupplier credentialsSupplier;
    private Authenticator<Object> authenticator;
    private Function<LoopResources, RSocketServerTransportFactory> serverTransportFactory = RSocketServerTransportFactory.websocket();
    private Function<LoopResources, RSocketClientTransportFactory> clientTransportFactory = RSocketClientTransportFactory.websocket();
    private EventLoopGroup eventLoopGroup;
    private LoopResources clientLoopResources;
    private LoopResources serverLoopResources;

    public RSocketServiceTransport() {
    }

    private RSocketServiceTransport(RSocketServiceTransport other) {
        this.numOfWorkers = other.numOfWorkers;
        this.headersCodec = other.headersCodec;
        this.dataCodecs = other.dataCodecs;
        this.connectionSetupCodec = other.connectionSetupCodec;
        this.credentialsSupplier = other.credentialsSupplier;
        this.authenticator = other.authenticator;
        this.eventLoopGroup = other.eventLoopGroup;
        this.clientLoopResources = other.clientLoopResources;
        this.serverLoopResources = other.serverLoopResources;
        this.serverTransportFactory = other.serverTransportFactory;
        this.clientTransportFactory = other.clientTransportFactory;
    }

    public RSocketServiceTransport numOfWorkers(int numOfWorkers) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.numOfWorkers = numOfWorkers;
        return rst;
    }

    public RSocketServiceTransport headersCodec(HeadersCodec headersCodec) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.headersCodec = headersCodec;
        return rst;
    }

    public RSocketServiceTransport dataCodecs(Collection<DataCodec> dataCodecs) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.dataCodecs = dataCodecs;
        return rst;
    }

    public RSocketServiceTransport connectionSetupCodec(ConnectionSetupCodec connectionSetupCodec) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.connectionSetupCodec = connectionSetupCodec;
        return rst;
    }

    public RSocketServiceTransport credentialsSupplier(CredentialsSupplier credentialsSupplier) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.credentialsSupplier = credentialsSupplier;
        return rst;
    }

    public <R> RSocketServiceTransport authenticator(Authenticator<? extends R> authenticator) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.authenticator = authenticator;
        return rst;
    }

    public RSocketServiceTransport serverTransportFactory(Function<LoopResources, RSocketServerTransportFactory> serverTransportFactory) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.serverTransportFactory = serverTransportFactory;
        return rst;
    }

    public RSocketServiceTransport clientTransportFactory(Function<LoopResources, RSocketClientTransportFactory> clientTransportFactory) {
        RSocketServiceTransport rst = new RSocketServiceTransport(this);
        rst.clientTransportFactory = clientTransportFactory;
        return rst;
    }

    public ClientTransport clientTransport() {
        return new RSocketClientTransport(this.credentialsSupplier, this.connectionSetupCodec, this.headersCodec, this.dataCodecs, this.clientTransportFactory.apply(this.clientLoopResources));
    }

    public ServerTransport serverTransport(ServiceMethodRegistry methodRegistry) {
        return new RSocketServerTransport(this.authenticator, methodRegistry, this.connectionSetupCodec, this.headersCodec, this.dataCodecs, this.serverTransportFactory.apply(this.serverLoopResources));
    }

    public Mono<RSocketServiceTransport> start() {
        return Mono.fromRunnable(this::start0).thenReturn((Object)this);
    }

    public Mono<Void> stop() {
        return Flux.concatDelayError((Publisher[])new Publisher[]{Mono.defer(() -> this.serverLoopResources.disposeLater()), Mono.defer(this::shutdownEventLoopGroup)}).then();
    }

    private void start0() {
        this.eventLoopGroup = this.newEventLoopGroup();
        this.clientLoopResources = DelegatedLoopResources.newClientLoopResources(this.eventLoopGroup);
        this.serverLoopResources = DelegatedLoopResources.newServerLoopResources(this.eventLoopGroup);
    }

    private EventLoopGroup newEventLoopGroup() {
        DefaultThreadFactory threadFactory = new DefaultThreadFactory("rsocket-worker", true);
        EpollEventLoopGroup eventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(this.numOfWorkers, (ThreadFactory)threadFactory) : new NioEventLoopGroup(this.numOfWorkers, (ThreadFactory)threadFactory);
        return LoopResources.colocate((EventLoopGroup)eventLoopGroup);
    }

    private Mono<Void> shutdownEventLoopGroup() {
        return Mono.defer(() -> FutureMono.from((Future)this.eventLoopGroup.shutdownGracefully()));
    }

    public String toString() {
        return new StringJoiner(", ", RSocketServiceTransport.class.getSimpleName() + "[", "]").add("numOfWorkers=" + this.numOfWorkers).add("headersCodec=" + this.headersCodec).add("dataCodecs=" + this.dataCodecs).add("connectionSetupCodec=" + this.connectionSetupCodec).add("serverTransportFactory=" + this.serverTransportFactory).add("clientTransportFactory=" + this.clientTransportFactory).toString();
    }

    static {
        Hooks.onErrorDropped(t -> {
            if ((AbortedException.isConnectionReset((Throwable)t) || ConnectionClosedException.isConnectionClosed((Throwable)t)) && LOGGER.isDebugEnabled()) {
                LOGGER.debug("Connection aborted: {}", (Object)t.toString());
            }
        });
    }
}

