/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import io.rsocket.resume.RSocketSession;
import io.rsocket.resume.ResumableDuplexConnection;
import io.rsocket.resume.ResumableFramesStore;
import io.rsocket.resume.ResumeStateHolder;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;

/**
 * Server-side session that re-attaches new {@link DuplexConnection}s to a single
 * {@link ResumableDuplexConnection}.
 *
 * <p>The session also acts as the subscriber of a {@code Mono.delay(resumeSessionDuration)} timer:
 * the timer is started whenever the active connection is reported closed and, when it fires, the
 * resumable connection is disposed. A resume attempt has to cancel that timer first; if the timer
 * has already fired (or the session was disposed) the attempt is rejected.
 *
 * <p>Resume attempts are serialized through {@link #connectionsQueue} guarded by the {@link #wip}
 * counter. The counter is initialised to {@code 1} in the constructor, incremented by
 * {@link #resumeWith(ByteBuf, DuplexConnection)} and decremented by {@link #tryTimeoutSession()}.
 *
 * <p>{@link #setKeepAliveSupport(KeepAliveSupport)} must be called before the active connection is
 * closed or a resume succeeds, because {@link #keepAliveSupport} is dereferenced unconditionally in
 * both paths.
 */
public class ServerRSocketSession
        implements RSocketSession, ResumeStateHolder, CoreSubscriber<Long> {
    private static final Logger logger = LoggerFactory.getLogger(ServerRSocketSession.class);

    final ResumableDuplexConnection resumableConnection;
    final Duration resumeSessionDuration;
    final ResumableFramesStore resumableFramesStore;
    final String session;
    final ByteBufAllocator allocator;
    final boolean cleanupStoreOnKeepAlive;

    /** Pending resume attempts; each entry invokes {@link #doResume} for one incoming connection. */
    final Queue<Runnable> connectionsQueue;

    /** Work-in-progress counter coordinating {@link #resumeWith} and {@link #tryTimeoutSession}. */
    volatile int wip;
    static final AtomicIntegerFieldUpdater<ServerRSocketSession> WIP =
            AtomicIntegerFieldUpdater.newUpdater(ServerRSocketSession.class, "wip");

    /** Subscription of the currently running session-timeout timer, if any. */
    volatile Subscription s;
    static final AtomicReferenceFieldUpdater<ServerRSocketSession, Subscription> S =
            AtomicReferenceFieldUpdater.newUpdater(ServerRSocketSession.class, Subscription.class, "s");

    KeepAliveSupport keepAliveSupport;

    /**
     * Creates the session and hooks it to the lifecycle of the given resumable connection.
     *
     * @param session session token; decoded as UTF-8 and used in log messages
     * @param resumableDuplexConnection connection that incoming connections are attached to
     * @param initialDuplexConnection first connection; only its allocator is retained
     * @param resumableFramesStore store queried for positions and asked to release frames
     * @param resumeSessionDuration delay after which a session without a connection is disposed
     * @param cleanupStoreOnKeepAlive whether {@link #onImpliedPosition(long)} releases frames
     */
    public ServerRSocketSession(
            ByteBuf session,
            ResumableDuplexConnection resumableDuplexConnection,
            DuplexConnection initialDuplexConnection,
            ResumableFramesStore resumableFramesStore,
            Duration resumeSessionDuration,
            boolean cleanupStoreOnKeepAlive) {
        this.session = session.toString(CharsetUtil.UTF_8);
        this.allocator = initialDuplexConnection.alloc();
        this.resumeSessionDuration = resumeSessionDuration;
        this.resumableFramesStore = resumableFramesStore;
        this.cleanupStoreOnKeepAlive = cleanupStoreOnKeepAlive;
        this.resumableConnection = resumableDuplexConnection;
        this.connectionsQueue = Queues.<Runnable>unboundedMultiproducer().get();

        // Start at 1 so that resumeWith(..) only enqueues until tryTimeoutSession() has run.
        WIP.lazySet(this, 1);

        resumableDuplexConnection.onClose().doFinally(__ -> this.dispose()).subscribe();
        resumableDuplexConnection.onActiveConnectionClosed().subscribe(__ -> this.tryTimeoutSession());
    }

    /**
     * Invoked when the active connection is reported closed: stops keep-alive, starts the session
     * timeout timer and, if a resume attempt is already queued, runs it.
     */
    void tryTimeoutSession() {
        this.keepAliveSupport.stop();

        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Connection is lost. Trying to timeout the active session", this.session);
        }

        // The timer is subscribed before the counter is decremented, so a resume attempt
        // executed below (or later) sees the timer subscription in `s`.
        Mono.delay(this.resumeSessionDuration).subscribe(this);

        if (WIP.decrementAndGet(this) == 0) {
            // No resume attempt was enqueued while the connection was active.
            return;
        }

        this.runNextQueuedResume();
    }

    /**
     * Registers a resume attempt for {@code nextDuplexConnection}. The attempt is always enqueued;
     * it is executed immediately only when the counter was {@code 0} before this call, otherwise it
     * stays queued until {@link #tryTimeoutSession()} picks it up.
     *
     * @param resumeFrame RESUME frame from which the client positions are read
     * @param nextDuplexConnection connection the client wants to resume on
     */
    public void resumeWith(ByteBuf resumeFrame, DuplexConnection nextDuplexConnection) {
        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. New DuplexConnection received.", this.session);
        }

        // Positions are extracted eagerly, before the task is queued.
        long remotePos = ResumeFrameCodec.firstAvailableClientPos(resumeFrame);
        long remoteImpliedPos = ResumeFrameCodec.lastReceivedServerPos(resumeFrame);

        this.connectionsQueue.offer(() -> this.doResume(remotePos, remoteImpliedPos, nextDuplexConnection));

        if (WIP.getAndIncrement(this) != 0) {
            return;
        }

        this.runNextQueuedResume();
    }

    /**
     * Performs a single resume attempt.
     *
     * <p>The attempt is rejected with a {@link RejectedResumeException} when the timeout timer can
     * no longer be cancelled, when releasing frames or sending RESUME_OK throws, when the
     * resumable connection refuses the new connection, or when the local and remote positions do
     * not satisfy {@code remotePos <= impliedPosition && position <= remoteImpliedPos}. In the
     * throwing and mismatching cases the whole session is disposed as well.
     *
     * @param remotePos first available client position from the RESUME frame
     * @param remoteImpliedPos last received server position from the RESUME frame
     * @param nextDuplexConnection connection to attach on success
     */
    void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplexConnection) {
        if (!this.tryCancelSessionTimeout()) {
            this.rejectExpired(nextDuplexConnection);
            return;
        }

        long impliedPosition = this.resumableFramesStore.frameImpliedPosition();
        long position = this.resumableFramesStore.framePosition();

        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Resume FRAME received. ServerResumeState[impliedPosition[{}], position[{}]]. ClientResumeState[remoteImpliedPosition[{}], remotePosition[{}]]", this.session, impliedPosition, position, remoteImpliedPos, remotePos);
        }

        boolean positionsCompatible = remotePos <= impliedPosition && position <= remoteImpliedPos;
        if (!positionsCompatible) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[server]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection", this.session, remoteImpliedPos, position, remotePos, impliedPosition);
            }
            this.dispose();
            reject(nextDuplexConnection, new RejectedResumeException(String.format("resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]", remotePos, remoteImpliedPos, position, impliedPosition)));
            return;
        }

        try {
            // Nothing to release when the client has received exactly up to our stored position.
            if (position != remoteImpliedPos) {
                this.resumableFramesStore.releaseFrames(remoteImpliedPos);
            }
            nextDuplexConnection.sendFrame(0, ResumeOkFrameCodec.encode(this.allocator, impliedPosition));
            if (logger.isDebugEnabled()) {
                logger.debug("Side[server]|Session[{}]. ResumeOKFrame[impliedPosition[{}]] has been sent", this.session, impliedPosition);
            }
        } catch (Throwable t) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[server]|Session[{}]. Exception occurred while releasing frames in the frameStore", this.session, t);
            }
            this.dispose();
            reject(nextDuplexConnection, new RejectedResumeException(t.getMessage(), t));
            return;
        }

        this.keepAliveSupport.start();

        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Session has been resumed successfully", this.session);
        }

        if (!this.resumableConnection.connect(nextDuplexConnection)) {
            this.rejectExpired(nextDuplexConnection);
        }
    }

    /**
     * Atomically clears the timeout subscription and cancels it.
     *
     * @return {@code false} if the subscription slot already holds
     *     {@link Operators#cancelledSubscription()} (timer fired or session disposed),
     *     {@code true} if the timer was cancelled by this call
     */
    boolean tryCancelSessionTimeout() {
        for (;;) {
            Subscription subscription = this.s;

            if (subscription == Operators.cancelledSubscription()) {
                return false;
            }

            if (S.compareAndSet(this, subscription, null)) {
                // NOTE(review): `subscription` is assumed non-null here, i.e. a timer was started
                // by tryTimeoutSession() before every doResume() - confirm against callers.
                subscription.cancel();
                return true;
            }
        }
    }

    /** Returns the implied position currently reported by the frames store. */
    @Override
    public long impliedPosition() {
        return this.resumableFramesStore.frameImpliedPosition();
    }

    /**
     * Releases stored frames up to {@code remoteImpliedPos} when {@code cleanupStoreOnKeepAlive} is
     * enabled. A failure while releasing closes the resumable connection with a
     * {@link ConnectionErrorException} carrying the original cause.
     *
     * @param remoteImpliedPos position reported by the remote side
     */
    @Override
    public void onImpliedPosition(long remoteImpliedPos) {
        if (!this.cleanupStoreOnKeepAlive) {
            return;
        }
        try {
            this.resumableFramesStore.releaseFrames(remoteImpliedPos);
        } catch (Throwable e) {
            this.resumableConnection.sendErrorAndClose(new ConnectionErrorException(e.getMessage(), e));
        }
    }

    /** Stores the timer subscription (once) and requests the single tick without a bound. */
    @Override
    public void onSubscribe(Subscription s) {
        if (Operators.setOnce(S, this, s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    /**
     * Timer tick: marks the session as terminated and disposes the resumable connection, unless the
     * subscription slot had already been terminated.
     */
    @Override
    public void onNext(Long aLong) {
        if (!Operators.terminate(S, this)) {
            return;
        }
        this.resumableConnection.dispose();
    }

    /** Intentionally empty: the timeout is handled entirely in {@link #onNext(Long)}. */
    @Override
    public void onComplete() {
    }

    /**
     * Intentionally empty.
     * NOTE(review): a timer failure is ignored here, so the session would not be timed out in that
     * case - confirm this is the desired behaviour.
     */
    @Override
    public void onError(Throwable t) {
    }

    @Override
    public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
        this.keepAliveSupport = keepAliveSupport;
    }

    /** Terminates the timeout subscription slot and disposes the resumable connection. */
    public void dispose() {
        Operators.terminate(S, this);
        this.resumableConnection.dispose();
    }

    /** Reports the disposal state of the underlying resumable connection. */
    public boolean isDisposed() {
        return this.resumableConnection.isDisposed();
    }

    /** Runs the next queued resume attempt, if there is one. */
    private void runNextQueuedResume() {
        Runnable doResumeRunnable = this.connectionsQueue.poll();
        if (doResumeRunnable != null) {
            doResumeRunnable.run();
        }
    }

    /** Logs and rejects a connection whose session can no longer be resumed. */
    private void rejectExpired(DuplexConnection nextDuplexConnection) {
        if (logger.isDebugEnabled()) {
            logger.debug("Side[server]|Session[{}]. Session has already been expired. Terminating received connection", this.session);
        }
        reject(nextDuplexConnection, new RejectedResumeException("resume_internal_error: Session Expired"));
    }

    /** Sends the rejection to the connection, closes it and subscribes to its inbound frames. */
    private static void reject(DuplexConnection nextDuplexConnection, RejectedResumeException rejectedResumeException) {
        nextDuplexConnection.sendErrorAndClose(rejectedResumeException);
        // NOTE(review): presumably subscribes so that inbound frames are consumed - confirm.
        nextDuplexConnection.receive().subscribe();
    }
}

