/*
 * Decompiled with CFR 0.152.
 */
package com.firefly.net.tcp.aio;

import com.firefly.net.BufferSizePredictor;
import com.firefly.net.ByteBufferArrayOutputEntry;
import com.firefly.net.ByteBufferOutputEntry;
import com.firefly.net.Config;
import com.firefly.net.EventManager;
import com.firefly.net.OutputEntry;
import com.firefly.net.Session;
import com.firefly.net.buffer.AdaptiveBufferSizePredictor;
import com.firefly.net.buffer.FileRegion;
import com.firefly.utils.concurrent.Callback;
import com.firefly.utils.concurrent.CountingCallback;
import com.firefly.utils.io.BufferReaderHandler;
import com.firefly.utils.io.BufferUtils;
import com.firefly.utils.log.Log;
import com.firefly.utils.log.LogFactory;
import com.firefly.utils.time.Millisecond100Clock;
import com.firefly.utils.time.SafeSimpleDateFormat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.InterruptedByTimeoutException;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AsynchronousTcpSession
implements Session {
    private static Log log = LogFactory.getInstance().getLog("firefly-system");
    private final int sessionId;
    private final long openTime;
    private long closeTime;
    private long lastReadTime;
    private long lastWrittenTime;
    private long readBytes = 0L;
    private long writtenBytes = 0L;
    private volatile int state;
    private final AsynchronousSocketChannel socketChannel;
    private volatile InetSocketAddress localAddress;
    private volatile InetSocketAddress remoteAddress;
    private final Config config;
    private final EventManager eventManager;
    private volatile Object attachment;
    private final Lock outputLock = new ReentrantLock();
    private boolean isWriting = false;
    private final Queue<OutputEntry<?>> outputBuffer = new LinkedList();
    private final BufferSizePredictor bufferSizePredictor = new AdaptiveBufferSizePredictor();

    public AsynchronousTcpSession(int sessionId, Config config, EventManager eventManager, AsynchronousSocketChannel socketChannel) {
        this.sessionId = sessionId;
        this.openTime = Millisecond100Clock.currentTimeMillis();
        this.config = config;
        this.eventManager = eventManager;
        this.socketChannel = socketChannel;
        this.state = 1;
    }

    void _read() {
        if (!this.isOpen()) {
            return;
        }
        int bufferSize = BufferUtils.normalizeBufferSize((int)this.bufferSizePredictor.nextBufferSize());
        final ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        if (log.isDebugEnabled()) {
            log.debug("the session {} buffer size is {}", new Object[]{this.getSessionId(), bufferSize});
        }
        this.socketChannel.read(buf, this.config.getTimeout(), TimeUnit.MILLISECONDS, this, new CompletionHandler<Integer, AsynchronousTcpSession>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void completed(Integer currentReadBytes, AsynchronousTcpSession session) {
                session.lastReadTime = Millisecond100Clock.currentTimeMillis();
                if (currentReadBytes < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("the session {} input is closed, {}", new Object[]{session.getSessionId(), currentReadBytes});
                    }
                    session.closeNow();
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("the session {} read {} bytes", new Object[]{session.getSessionId(), currentReadBytes});
                }
                session.bufferSizePredictor.previousReceivedBufferSize(currentReadBytes);
                AsynchronousTcpSession asynchronousTcpSession = session;
                asynchronousTcpSession.readBytes = asynchronousTcpSession.readBytes + (long)currentReadBytes.intValue();
                buf.flip();
                try {
                    AsynchronousTcpSession.this.config.getDecoder().decode(buf, session);
                }
                catch (Throwable t) {
                    AsynchronousTcpSession.this.eventManager.executeExceptionTask(session, t);
                }
                finally {
                    AsynchronousTcpSession.this._read();
                }
            }

            @Override
            public void failed(Throwable t, AsynchronousTcpSession session) {
                if (t instanceof InterruptedByTimeoutException) {
                    if (log.isDebugEnabled()) {
                        log.debug("the session {} reading data is timeout.", new Object[]{AsynchronousTcpSession.this.getSessionId()});
                    }
                } else {
                    log.warn("the session {} read data is failed", t, new Object[]{session.getSessionId()});
                }
                session.closeNow();
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writingFailedCallback(Callback callback, Throwable t) {
        if (t instanceof InterruptedByTimeoutException) {
            if (log.isDebugEnabled()) {
                log.debug("the session {} writing data is timeout.", new Object[]{this.getSessionId()});
            }
        } else {
            log.warn("the session {} writes data is failed", t, new Object[]{this.getSessionId()});
        }
        this.outputLock.lock();
        try {
            int bufferSize = this.outputBuffer.size();
            log.warn("the session {} has {} buffer data can not ouput", new Object[]{this.getSessionId(), bufferSize});
            this.outputBuffer.clear();
            this.isWriting = false;
            this.shutdownSocketChannel();
        }
        finally {
            this.outputLock.unlock();
        }
        callback.failed(t);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void writingCompletedCallback(Callback callback, long currentWritenBytes) {
        this.lastWrittenTime = Millisecond100Clock.currentTimeMillis();
        if (currentWritenBytes < 0L) {
            if (log.isDebugEnabled()) {
                log.debug("the session {} output is closed, {}", new Object[]{this.getSessionId(), currentWritenBytes});
            }
            this.shutdownSocketChannel();
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("the session {} writes {} bytes", new Object[]{this.getSessionId(), currentWritenBytes});
        }
        this.writtenBytes += currentWritenBytes;
        callback.succeeded();
        this.outputLock.lock();
        try {
            OutputEntry<?> obj = this.outputBuffer.poll();
            if (obj != null) {
                this._write(obj);
            } else {
                this.isWriting = false;
            }
        }
        finally {
            this.outputLock.unlock();
        }
    }

    void _write(final OutputEntry<?> entry) {
        if (!this.isOpen()) {
            return;
        }
        switch (entry.getOutputEntryType()) {
            case BYTE_BUFFER: {
                ByteBufferOutputEntry byteBufferOutputEntry = (ByteBufferOutputEntry)entry;
                if (log.isDebugEnabled()) {
                    log.debug("the session {} will write buffer {}", new Object[]{this.getSessionId(), ((ByteBuffer)byteBufferOutputEntry.getData()).remaining()});
                }
                this.socketChannel.write((ByteBuffer)byteBufferOutputEntry.getData(), this.config.getTimeout(), TimeUnit.MILLISECONDS, this, new CompletionHandler<Integer, AsynchronousTcpSession>(){

                    @Override
                    public void completed(Integer currentWritenBytes, AsynchronousTcpSession session) {
                        AsynchronousTcpSession.this.writingCompletedCallback(entry.getCallback(), currentWritenBytes.intValue());
                    }

                    @Override
                    public void failed(Throwable t, AsynchronousTcpSession session) {
                        AsynchronousTcpSession.this.writingFailedCallback(entry.getCallback(), t);
                    }
                });
                break;
            }
            case BYTE_BUFFER_ARRAY: {
                ByteBufferArrayOutputEntry byteBuffersEntry = (ByteBufferArrayOutputEntry)entry;
                this.socketChannel.write((ByteBuffer[])byteBuffersEntry.getData(), 0, ((ByteBuffer[])byteBuffersEntry.getData()).length, this.config.getTimeout(), TimeUnit.MILLISECONDS, this, new CompletionHandler<Long, AsynchronousTcpSession>(){

                    @Override
                    public void completed(Long currentWritenBytes, AsynchronousTcpSession session) {
                        AsynchronousTcpSession.this.writingCompletedCallback(entry.getCallback(), currentWritenBytes);
                    }

                    @Override
                    public void failed(Throwable t, AsynchronousTcpSession session) {
                        AsynchronousTcpSession.this.writingFailedCallback(entry.getCallback(), t);
                    }
                });
                break;
            }
            case DISCONNECTION: {
                log.debug("the session {} will close", new Object[]{this.getSessionId()});
                this.shutdownSocketChannel();
            }
        }
    }

    @Override
    public void attachObject(Object attachment) {
        this.attachment = attachment;
    }

    @Override
    public Object getAttachment() {
        return this.attachment;
    }

    @Override
    public void fireReceiveMessage(Object message) {
        this.eventManager.executeReceiveTask(this, message);
    }

    @Override
    public void encode(Object message) {
        try {
            this.config.getEncoder().encode(message, this);
        }
        catch (Throwable t) {
            this.eventManager.executeExceptionTask(this, t);
        }
    }

    @Override
    public void write(OutputEntry<?> entry) {
        if (!this.isOpen()) {
            return;
        }
        if (entry == null) {
            return;
        }
        this.outputLock.lock();
        try {
            if (!this.isWriting) {
                this.isWriting = true;
                this._write(entry);
            } else {
                this.outputBuffer.offer(entry);
            }
        }
        finally {
            this.outputLock.unlock();
        }
    }

    @Override
    public void write(ByteBuffer byteBuffer, Callback callback) {
        this.write(new ByteBufferOutputEntry(callback, byteBuffer));
    }

    @Override
    public void write(ByteBuffer[] buffers, Callback callback) {
        this.write(new ByteBufferArrayOutputEntry(callback, buffers));
    }

    @Override
    public void write(Collection<ByteBuffer> buffers, Callback callback) {
        this.write(new ByteBufferArrayOutputEntry(callback, buffers.toArray(BufferUtils.EMPTY_BYTE_BUFFER_ARRAY)));
    }

    @Override
    public void write(FileRegion file, Callback callback) {
        try (FileRegion fileRegion = file;){
            fileRegion.transferTo(callback, new FileBufferReaderHandler(fileRegion.getLength()));
        }
        catch (Throwable t) {
            log.error("transfer file error", t, new Object[0]);
        }
    }

    @Override
    public void close() {
        this.write(DISCONNECTION_FLAG);
    }

    @Override
    public void closeNow() {
        if (!this.isOpen()) {
            return;
        }
        this.closeTime = Millisecond100Clock.currentTimeMillis();
        try {
            this.socketChannel.close();
        }
        catch (AsynchronousCloseException e) {
            if (log.isDebugEnabled()) {
                log.debug("the session {} asynchronously closed", new Object[]{this.sessionId});
            }
        }
        catch (IOException e) {
            log.error("the session {} close error", (Throwable)e, new Object[]{this.sessionId});
        }
        this.state = 0;
        this.eventManager.executeCloseTask(this);
    }

    @Override
    public void shutdownOutput() {
        try {
            this.socketChannel.shutdownOutput();
        }
        catch (ClosedChannelException e) {
            log.debug("the session {} is closed", (Throwable)e, new Object[]{this.sessionId});
        }
        catch (IOException e) {
            log.error("the session {} shutdown output error", (Throwable)e, new Object[]{this.sessionId});
        }
    }

    @Override
    public void shutdownInput() {
        try {
            this.socketChannel.shutdownInput();
        }
        catch (ClosedChannelException e) {
            log.debug("the session {} is closed", (Throwable)e, new Object[]{this.sessionId});
        }
        catch (IOException e) {
            log.error("the session {} shutdown input error", (Throwable)e, new Object[]{this.sessionId});
        }
    }

    private void shutdownSocketChannel() {
        this.shutdownOutput();
        this.shutdownInput();
    }

    @Override
    public int getSessionId() {
        return this.sessionId;
    }

    @Override
    public long getOpenTime() {
        return this.openTime;
    }

    @Override
    public long getCloseTime() {
        return this.closeTime;
    }

    @Override
    public long getDuration() {
        if (this.closeTime > 0L) {
            return this.closeTime - this.openTime;
        }
        return Millisecond100Clock.currentTimeMillis() - this.openTime;
    }

    @Override
    public long getLastReadTime() {
        return this.lastReadTime;
    }

    @Override
    public long getLastWrittenTime() {
        return this.lastWrittenTime;
    }

    @Override
    public long getLastActiveTime() {
        return Math.max(this.lastReadTime, this.lastWrittenTime);
    }

    @Override
    public long getReadBytes() {
        return this.readBytes;
    }

    @Override
    public long getWrittenBytes() {
        return this.writtenBytes;
    }

    @Override
    public int getState() {
        return this.state;
    }

    @Override
    public boolean isOpen() {
        return this.state > 0;
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        if (this.localAddress != null) {
            return this.localAddress;
        }
        try {
            this.localAddress = (InetSocketAddress)this.socketChannel.getLocalAddress();
            return this.localAddress;
        }
        catch (IOException e) {
            log.error("the session {} gets local address error", (Throwable)e, new Object[]{this.sessionId});
            return null;
        }
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        if (this.remoteAddress != null) {
            return this.remoteAddress;
        }
        try {
            this.remoteAddress = (InetSocketAddress)this.socketChannel.getRemoteAddress();
            return this.remoteAddress;
        }
        catch (Throwable t) {
            log.error("the session {} gets remote address error", t, new Object[]{this.sessionId});
            return null;
        }
    }

    public String toString() {
        return "[sessionId=" + this.sessionId + ", openTime=" + SafeSimpleDateFormat.defaultDateFormat.format(new Date(this.openTime)) + ", closeTime=" + SafeSimpleDateFormat.defaultDateFormat.format(new Date(this.closeTime)) + ", duration=" + this.getDuration() + ", readBytes=" + this.readBytes + ", writtenBytes=" + this.writtenBytes + "]";
    }

    @Override
    public long getIdleTimeout() {
        return this.config.getTimeout();
    }

    private class FileBufferReaderHandler
    implements BufferReaderHandler {
        private final long len;

        public FileBufferReaderHandler(long len) {
            this.len = len;
        }

        public void readBuffer(ByteBuffer buf, CountingCallback countingCallback, long count) {
            log.debug("write file,  count: {} , lenth: {}", new Object[]{count, this.len});
            AsynchronousTcpSession.this.write(buf, (Callback)countingCallback);
        }
    }
}

