/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.services.transport.rsocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.SocketAcceptor;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.auth.Authenticator;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.MessageCodecException;
import io.scalecube.services.exceptions.ServiceException;
import io.scalecube.services.exceptions.ServiceUnavailableException;
import io.scalecube.services.exceptions.UnauthorizedException;
import io.scalecube.services.methods.ServiceMethodInvoker;
import io.scalecube.services.methods.ServiceMethodRegistry;
import io.scalecube.services.transport.api.DataCodec;
import io.scalecube.services.transport.api.HeadersCodec;
import io.scalecube.services.transport.api.ReferenceCountUtil;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import io.scalecube.services.transport.rsocket.ConnectionSetup;
import io.scalecube.services.transport.rsocket.ConnectionSetupCodec;
import java.io.InputStream;
import java.util.Collection;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/**
 * {@link SocketAcceptor} that decodes the connection setup frame, optionally authenticates the
 * connecting peer and then hands out an {@link RSocket} which dispatches incoming requests to the
 * invokers registered in a {@link ServiceMethodRegistry}.
 */
public class RSocketServiceAcceptor implements SocketAcceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RSocketServiceAcceptor.class);

    private final ConnectionSetupCodec connectionSetupCodec;
    private final HeadersCodec headersCodec;
    private final Collection<DataCodec> dataCodecs;
    private final Authenticator<Object> authenticator;
    private final ServiceMethodRegistry methodRegistry;

    /**
     * Creates the acceptor.
     *
     * @param connectionSetupCodec codec used to decode the data of the setup payload
     * @param headersCodec headers codec passed to every {@link ServiceMessageCodec} created here
     * @param dataCodecs data codecs passed to every {@link ServiceMessageCodec} created here
     * @param authenticator authenticator applied to the setup credentials; may be {@code null},
     *     in which case authentication is skipped
     * @param methodRegistry registry used to look up the invoker for a request qualifier
     */
    public RSocketServiceAcceptor(
            ConnectionSetupCodec connectionSetupCodec,
            HeadersCodec headersCodec,
            Collection<DataCodec> dataCodecs,
            Authenticator<Object> authenticator,
            ServiceMethodRegistry methodRegistry) {
        this.connectionSetupCodec = connectionSetupCodec;
        this.headersCodec = headersCodec;
        this.dataCodecs = dataCodecs;
        this.authenticator = authenticator;
        this.methodRegistry = methodRegistry;
    }

    /**
     * Accepts a new connection.
     *
     * <p>The setup data is decoded and, when an authenticator is configured and credentials are
     * present, authenticated. If authentication yields a value, it becomes the auth data of the
     * returned socket; if the chain completes empty (no setup data, no authenticator, no
     * credentials, or an empty authentication result) a socket without auth data is returned.
     *
     * @param setupPayload setup payload of the connection
     * @param rsocket the peer socket (used here for logging only)
     * @return the responder socket, or an error when authentication fails
     * @throws MessageCodecException if the setup data cannot be decoded
     */
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket rsocket) {
        LOGGER.info("[rsocket][accept][{}] setup: {}", rsocket, setupPayload);

        // Decoding runs eagerly, at call time rather than at subscription time.
        ConnectionSetup connectionSetup = decodeConnectionSetup(setupPayload.data());

        return Mono.justOrEmpty(connectionSetup)
            .flatMap(setup -> authenticate(rsocket, setup))
            .flatMap(authData -> Mono.fromCallable(() -> newRSocket(authData)))
            .switchIfEmpty(Mono.fromCallable(() -> newRSocket(null)))
            .cast(RSocket.class);
    }

    /**
     * Decodes the connection setup from the given buffer.
     *
     * @param byteBuf setup data
     * @return the decoded setup, or {@code null} when the buffer has nothing readable
     * @throws MessageCodecException if decoding (or closing the stream) fails for any reason
     */
    private ConnectionSetup decodeConnectionSetup(ByteBuf byteBuf) {
        if (!byteBuf.isReadable()) {
            return null;
        }
        // releaseOnClose = false: closing the stream must not release the buffer,
        // this method does not own it.
        try (ByteBufInputStream stream = new ByteBufInputStream(byteBuf, false)) {
            return this.connectionSetupCodec.decode(stream);
        } catch (Throwable ex) {
            throw new MessageCodecException("Failed to decode connection setup", ex);
        }
    }

    /**
     * Runs the configured authenticator against the credentials of the connection setup.
     *
     * @param rsocket the peer socket (used for logging only)
     * @param connectionSetup decoded connection setup, may be {@code null}
     * @return the authentication result; empty when there is no authenticator, no setup or no
     *     credentials; errors are mapped to {@link UnauthorizedException}
     */
    private Mono<Object> authenticate(RSocket rsocket, ConnectionSetup connectionSetup) {
        boolean skipAuthentication =
            this.authenticator == null
                || connectionSetup == null
                || !connectionSetup.hasCredentials();
        if (skipAuthentication) {
            return Mono.empty();
        }

        return this.authenticator
            .apply(connectionSetup.credentials())
            .doOnSuccess(obj -> LOGGER.debug("[rsocket][authenticate][{}] authenticated", rsocket))
            .doOnError(
                ex ->
                    LOGGER.error(
                        "[rsocket][authenticate][{}][error] cause: {}", rsocket, ex.toString()))
            .onErrorMap(this::toUnauthorizedException);
    }

    /**
     * Creates the responder socket for one connection, with its own message codec instance.
     *
     * @param authData authentication result, or {@code null} for an unauthenticated connection
     * @return new responder socket
     */
    private RSocket newRSocket(Object authData) {
        ServiceMessageCodec messageCodec = new ServiceMessageCodec(this.headersCodec, this.dataCodecs);
        return new RSocketImpl(authData, messageCodec, this.methodRegistry);
    }

    /**
     * Converts an authentication failure into an {@link UnauthorizedException}.
     *
     * @param th the failure raised by the authenticator
     * @return an exception carrying the error code and message of a {@link ServiceException}, or
     *     wrapping {@code th} otherwise
     */
    private UnauthorizedException toUnauthorizedException(Throwable th) {
        if (th instanceof ServiceException) {
            ServiceException serviceException = (ServiceException) th;
            return new UnauthorizedException(
                serviceException.errorCode(), serviceException.getMessage());
        }
        return new UnauthorizedException(th);
    }

    /**
     * Responder socket: decodes each payload to a {@link ServiceMessage}, invokes the matching
     * service method and encodes the responses back into payloads.
     */
    private static class RSocketImpl implements RSocket {
        /** Key under which the authentication result is published in the Reactor context. */
        private static final String AUTH_CONTEXT_KEY = "auth.context";

        private final Object authData;
        private final ServiceMessageCodec messageCodec;
        private final ServiceMethodRegistry methodRegistry;

        private RSocketImpl(
                Object authData,
                ServiceMessageCodec messageCodec,
                ServiceMethodRegistry methodRegistry) {
            this.authData = authData;
            this.messageCodec = messageCodec;
            this.methodRegistry = methodRegistry;
        }

        /**
         * Handles a request-response interaction.
         *
         * @param payload request payload
         * @return the encoded response; fails with {@link BadRequestException} when the request
         *     has no qualifier and with {@link ServiceUnavailableException} when no invoker is
         *     registered for it
         */
        @Override
        public Mono<Payload> requestResponse(Payload payload) {
            return Mono.deferContextual(context -> Mono.just(toMessage(payload)))
                .doOnNext(this::validateRequest)
                .flatMap(
                    request ->
                        resolveInvoker(request)
                            .invokeOne(request)
                            .doOnNext(response -> releaseRequestOnError(request, response)))
                .map(this::toPayload)
                .doOnError(ex -> LOGGER.error("[requestResponse][error] cause: {}", ex.toString()))
                .contextWrite(this::setupContext);
        }

        /**
         * Handles a request-stream interaction.
         *
         * @param payload request payload
         * @return the encoded responses; fails with {@link BadRequestException} when the request
         *     has no qualifier and with {@link ServiceUnavailableException} when no invoker is
         *     registered for it
         */
        @Override
        public Flux<Payload> requestStream(Payload payload) {
            return Mono.deferContextual(context -> Mono.just(toMessage(payload)))
                .doOnNext(this::validateRequest)
                .flatMapMany(
                    request ->
                        resolveInvoker(request)
                            .invokeMany(request)
                            .doOnNext(response -> releaseRequestOnError(request, response)))
                .map(this::toPayload)
                .doOnError(ex -> LOGGER.error("[requestStream][error] cause: {}", ex.toString()))
                .contextWrite(this::setupContext);
        }

        /**
         * Handles a request-channel interaction. The first message of the stream selects the
         * service method; the whole stream (first message included) is then passed to it.
         *
         * @param payloads incoming request payloads
         * @return the encoded responses; fails with {@link BadRequestException} when the first
         *     request has no qualifier and with {@link ServiceUnavailableException} when no
         *     invoker is registered for it
         */
        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return Flux.deferContextual(context -> Flux.from(payloads))
                .map(this::toMessage)
                .switchOnFirst(
                    (first, messages) -> {
                        if (!first.hasValue()) {
                            // Empty or failed stream: nothing to route, pass the signal through.
                            return messages;
                        }
                        ServiceMessage firstRequest = first.get();
                        validateRequest(firstRequest);
                        // The invoker must be validated here as well; otherwise an unknown
                        // qualifier ends in a NullPointerException and the request data is
                        // never released.
                        ServiceMethodInvoker invoker = resolveInvoker(firstRequest);
                        return invoker
                            .invokeBidirectional(messages)
                            .doOnNext(response -> releaseRequestOnError(firstRequest, response));
                    })
                .map(this::toPayload)
                .doOnError(ex -> LOGGER.error("[requestChannel][error] cause: {}", ex.toString()))
                .contextWrite(this::setupContext);
        }

        /**
         * Looks up the invoker for the qualifier of the given request.
         *
         * @param request request whose qualifier selects the service method
         * @return the registered invoker, never {@code null}
         * @throws ServiceUnavailableException if no invoker is registered (the request data is
         *     released before throwing)
         */
        private ServiceMethodInvoker resolveInvoker(ServiceMessage request) {
            ServiceMethodInvoker invoker = this.methodRegistry.getInvoker(request.qualifier());
            validateMethodInvoker(invoker, request);
            return invoker;
        }

        /** Encodes a response message into an RSocket payload. */
        private Payload toPayload(ServiceMessage response) {
            return this.messageCodec.encodeAndTransform(response, ByteBufPayload::create);
        }

        /**
         * Decodes a payload into a service message.
         *
         * <p>The data and metadata slices are retained before being handed to the codec and the
         * payload itself is always released afterwards, whether or not decoding succeeds.
         */
        private ServiceMessage toMessage(Payload payload) {
            try {
                return this.messageCodec.decode(
                    payload.sliceData().retain(), payload.sliceMetadata().retain());
            } finally {
                payload.release();
            }
        }

        /**
         * Publishes the auth data (when present) in the Reactor context.
         *
         * <p>The entry is added with {@link Context#put(Object, Object)} so that entries already
         * present in the context stay visible to the invoked service method.
         */
        private Context setupContext(Context context) {
            if (this.authData == null) {
                return context;
            }
            return context.put(AUTH_CONTEXT_KEY, this.authData);
        }

        /**
         * Rejects requests without a qualifier, releasing their data first.
         *
         * @throws BadRequestException if the qualifier is {@code null}
         */
        private void validateRequest(ServiceMessage message) throws ServiceException {
            if (message.qualifier() == null) {
                releaseRequest(message);
                LOGGER.error("Qualifier is null, invocation failed for {}", message);
                throw new BadRequestException("Qualifier is null");
            }
        }

        /**
         * Rejects requests for which no invoker was found, releasing their data first.
         *
         * @throws ServiceUnavailableException if {@code methodInvoker} is {@code null}
         */
        private void validateMethodInvoker(ServiceMethodInvoker methodInvoker, ServiceMessage message) {
            if (methodInvoker == null) {
                releaseRequest(message);
                LOGGER.error("No service invoker found, invocation failed for {}", message);
                throw new ServiceUnavailableException("No service invoker found");
            }
        }

        /** Releases the data object of the given request. */
        private void releaseRequest(ServiceMessage request) {
            ReferenceCountUtil.safestRelease(request.data());
        }

        /** Releases the request data when the response is an error message. */
        private void releaseRequestOnError(ServiceMessage request, ServiceMessage response) {
            if (response.isError()) {
                releaseRequest(request);
            }
        }
    }
}

