/*
 * Decompiled with CFR 0.152.
 */
package org.xtreemfs.foundation.pbrpc.server;

import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.xtreemfs.foundation.LifeCycleThread;
import org.xtreemfs.foundation.SSLOptions;
import org.xtreemfs.foundation.buffer.BufferPool;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.logging.Logging;
import org.xtreemfs.foundation.pbrpc.channels.ChannelIO;
import org.xtreemfs.foundation.pbrpc.channels.SSLChannelIO;
import org.xtreemfs.foundation.pbrpc.channels.SSLHandshakeOnlyChannelIO;
import org.xtreemfs.foundation.pbrpc.server.RPCNIOSocketServerConnection;
import org.xtreemfs.foundation.pbrpc.server.RPCServerInterface;
import org.xtreemfs.foundation.pbrpc.server.RPCServerRequest;
import org.xtreemfs.foundation.pbrpc.server.RPCServerRequestListener;
import org.xtreemfs.foundation.pbrpc.server.RPCServerResponse;
import org.xtreemfs.foundation.util.OutputUtils;

public class RPCNIOSocketServer
extends LifeCycleThread
implements RPCServerInterface {
    /** Fragment limit constant; it is not referenced anywhere else in this class. */
    public static final int MAX_FRAGMENTS = 1;
    /**
     * 0x2000000 bytes (32 MiB). The record-marker validation in
     * {@code readConnection} rejects any header/message/data length that is
     * greater than or equal to this value.
     */
    public static final int MAX_FRAGMENT_SIZE = 0x2000000;
    /** Listening channel; opened here and configured/bound by the constructor. */
    private final ServerSocketChannel socket = ServerSocketChannel.open();
    /** Selector that multiplexes the listening channel and all client channels. */
    private final Selector selector;
    /** Set by {@link #shutdown()} to make the selector loop in {@link #run()} terminate. */
    private volatile boolean quit;
    /** Listener that receives every fully read request; replaceable at runtime. */
    private volatile RPCServerRequestListener receiver;
    /** SSL configuration, or {@code null} for plain (unencrypted) channels. */
    private final SSLOptions sslOptions;
    /** Number of currently accepted client connections. */
    private final AtomicInteger numConnections;
    /**
     * Requests received whose response has not been completely written yet.
     * Only the selector thread modifies this counter (in {@code readConnection}
     * and {@code writeConnection}); it is volatile so that callers of
     * {@link #getPendingRequests()} on other threads see a current, untorn value.
     */
    private volatile long pendingRequests;
    /** Port the server was asked to bind to; used for log messages. */
    private final int bindPort;
    /** All open client connections; accessed by the selector thread only. */
    private final List<RPCNIOSocketServerConnection> connections;
    /** A client with more open requests than this is no longer read from. */
    private final int maxClientQLength;
    /**
     * Half of {@link #maxClientQLength} (never negative). Reading from a
     * throttled client resumes once its open requests drop below this value.
     */
    private final int clientQThreshold;
    /** Queue length used by the constructors that do not take one explicitly. */
    public static final int DEFAULT_MAX_CLIENT_Q_LENGTH = 100;

    /**
     * Creates a server without an explicit receive buffer size.
     * Delegates with {@code receiveBufferSize = -1}, which makes the full
     * constructor apply its built-in receive buffer size.
     *
     * @param bindPort   port to listen on
     * @param bindAddr   local address to bind to, or {@code null} for the wildcard address
     * @param rl         listener that receives incoming requests
     * @param sslOptions SSL configuration, or {@code null} for plain connections
     * @throws IOException if the server socket cannot be opened, configured or bound
     */
    public RPCNIOSocketServer(int bindPort, InetAddress bindAddr, RPCServerRequestListener rl, SSLOptions sslOptions) throws IOException {
        this(bindPort, bindAddr, rl, sslOptions, -1);
    }

    /**
     * Creates a server with the default per-client queue length.
     * Delegates with a queue length of 100, the same value as
     * {@code DEFAULT_MAX_CLIENT_Q_LENGTH}.
     *
     * @param bindPort          port to listen on
     * @param bindAddr          local address to bind to, or {@code null} for the wildcard address
     * @param rl                listener that receives incoming requests
     * @param sslOptions        SSL configuration, or {@code null} for plain connections
     * @param receiveBufferSize socket receive buffer size in bytes, or {@code -1} for the built-in default
     * @throws IOException if the server socket cannot be opened, configured or bound
     */
    public RPCNIOSocketServer(int bindPort, InetAddress bindAddr, RPCServerRequestListener rl, SSLOptions sslOptions, int receiveBufferSize) throws IOException {
        this(bindPort, bindAddr, rl, sslOptions, receiveBufferSize, 100);
    }

    /**
     * Creates the server: configures and binds the non-blocking listening
     * socket, opens the selector and registers the socket for accept events.
     * The selector loop itself only starts when the thread is started.
     *
     * <p>If any step fails, the listening socket and the selector (if already
     * opened) are closed before the exception is propagated, so a failed
     * construction does not leave the port or file descriptors allocated.
     *
     * @param bindPort          port to listen on
     * @param bindAddr          local address to bind to, or {@code null} for the wildcard address
     * @param rl                listener that receives incoming requests
     * @param sslOptions        SSL configuration, or {@code null} for plain connections
     * @param receiveBufferSize socket receive buffer size in bytes, or {@code -1}
     *                          to use 262144 bytes
     * @param maxClientQLength  maximum number of open requests per client before
     *                          the server stops reading from that client
     * @throws BindException if the address/port cannot be bound; the message
     *                       includes the port and the original exception is the cause
     * @throws IOException   if the socket or selector cannot be opened or configured
     */
    public RPCNIOSocketServer(int bindPort, InetAddress bindAddr, RPCServerRequestListener rl, SSLOptions sslOptions, int receiveBufferSize, int maxClientQLength) throws IOException {
        super("PBRPCSrv@" + bindPort);
        this.bindPort = bindPort;
        this.receiver = rl;
        this.sslOptions = sslOptions;
        this.numConnections = new AtomicInteger(0);
        this.connections = new LinkedList<RPCNIOSocketServerConnection>();
        this.maxClientQLength = maxClientQLength;
        // Half of the maximum, clamped at zero for negative inputs.
        this.clientQThreshold = Math.max(maxClientQLength / 2, 0);

        Selector openedSelector = null;
        boolean initialized = false;
        try {
            this.socket.configureBlocking(false);
            if (receiveBufferSize != -1) {
                this.socket.socket().setReceiveBufferSize(receiveBufferSize);
                try {
                    // The requested size is only a hint; report when it was not applied.
                    if (this.socket.socket().getReceiveBufferSize() != receiveBufferSize) {
                        Logging.logMessage(4, Logging.Category.net, this, "could not set socket receive buffer size to " + receiveBufferSize + ", using default size of " + this.socket.socket().getReceiveBufferSize(), new Object[0]);
                    }
                }
                catch (SocketException exc) {
                    Logging.logMessage(4, this, "could not check whether receive buffer size was successfully set to %d bytes", receiveBufferSize);
                }
            } else {
                this.socket.socket().setReceiveBufferSize(262144);
            }
            this.socket.socket().setReuseAddress(true);
            try {
                this.socket.socket().bind(bindAddr == null ? new InetSocketAddress(bindPort) : new InetSocketAddress(bindAddr, bindPort));
            }
            catch (BindException e) {
                // Add the port to the message but keep the original exception as cause.
                BindException withPort = new BindException(e.getMessage() + ". Port number: " + bindPort);
                withPort.initCause(e);
                throw withPort;
            }
            openedSelector = Selector.open();
            this.socket.register(openedSelector, SelectionKey.OP_ACCEPT);
            initialized = true;
        }
        finally {
            if (!initialized) {
                // Construction failed: release whatever was acquired so far.
                if (openedSelector != null) {
                    try {
                        openedSelector.close();
                    }
                    catch (IOException ignored) {
                        // best effort; the original failure is what gets reported
                    }
                }
                try {
                    this.socket.close();
                }
                catch (IOException ignored) {
                    // best effort; the original failure is what gets reported
                }
            }
        }
        this.selector = openedSelector;

        if (maxClientQLength <= 1) {
            Logging.logMessage(4, this, "max client queue length is 1, pipelining is disabled.", new Object[0]);
        }
    }

    /**
     * Requests termination of the server thread: raises the quit flag that the
     * selector loop in {@link #run()} checks, then interrupts the thread.
     */
    @Override
    public void shutdown() {
        // The flag must be visible before the interrupt is delivered.
        quit = true;
        interrupt();
    }

    /**
     * Queues a response for delivery on the connection the request arrived on.
     *
     * <p>The request's buffers are released first. If the connection is still
     * open, the response is appended to its pending-response queue; when the
     * queue was empty, write interest is enabled on the connection's key and
     * the selector is woken up so the selector thread starts sending. If the
     * connection is already closed, the response cannot be delivered and its
     * buffers are released here instead.
     *
     * <p>A queued response must NOT be freed here: {@code writeConnection}
     * packs its buffers later and frees them once they are fully written.
     *
     * @param request  the request being answered; supplies the connection
     * @param response the response to send; must not be {@code null}
     */
    @Override
    public void sendResponse(RPCServerRequest request, RPCServerResponse response) {
        assert (response != null);
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.net, this, "response sent", new Object[0]);
        }
        final RPCNIOSocketServerConnection connection = (RPCNIOSocketServerConnection)request.getConnection();
        try {
            request.freeBuffers();
        }
        catch (AssertionError ex) {
            // Freeing is best effort; report the failed assertion and carry on.
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "Caught an AssertionError while trying to free buffers:", new Object[0]);
                Logging.logError(6, this, ex);
            }
        }
        assert (connection.getServer() == this);

        if (connection.isConnectionClosed()) {
            // Nobody will ever send this response; release its buffers now.
            response.freeBuffers();
            return;
        }

        synchronized (connection) {
            final boolean wasEmpty = connection.getPendingResponses().isEmpty();
            connection.addPendingResponse(response);
            if (wasEmpty) {
                // Queue went from empty to non-empty: ask the selector thread to write.
                final SelectionKey key = connection.getChannel().keyFor(this.selector);
                if (key != null) {
                    try {
                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                    }
                    catch (CancelledKeyException ignored) {
                        // The key was cancelled, i.e. the connection is being closed
                        // by the selector thread; there is nothing left to schedule.
                    }
                }
                this.selector.wakeup();
            }
        }
    }

    /**
     * Selector loop of the server thread. Until {@link #shutdown()} sets the
     * quit flag it waits for ready keys and dispatches accept, read and write
     * events. On regular termination all client channels, the selector and the
     * listening socket are closed and the stop is signalled; any throwable that
     * escapes is reported as a crash, after the same resources were released on
     * a best-effort basis so that the bound port is not leaked.
     */
    @Override
    public void run() {
        this.notifyStarted();
        if (Logging.isInfo()) {
            String sslMode = "";
            if (this.sslOptions != null) {
                sslMode = this.sslOptions.isFakeSSLMode() ? "GRID SSL mode enabled (SSL handshake only)" : "SSL enabled (" + this.sslOptions.getSSLProtocol() + ")";
            }
            Logging.logMessage(6, Logging.Category.net, this, "PBRPC Srv %d ready %s", this.bindPort, sslMode);
        }
        try {
            while (!this.quit) {
                int numKeys = 0;
                try {
                    numKeys = this.selector.select();
                }
                catch (CancelledKeyException ignored) {
                    // A key was cancelled concurrently; treat as "nothing ready" and retry.
                }
                catch (IOException ex) {
                    Logging.logMessage(4, Logging.Category.net, this, "Exception while selecting: %s", ex.toString());
                    continue;
                }
                if (numKeys <= 0) {
                    continue;
                }
                final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    final SelectionKey key = iter.next();
                    // Selected keys must be removed explicitly or they stay in the set.
                    iter.remove();
                    try {
                        if (key.isAcceptable()) {
                            this.acceptConnection(key);
                        }
                        if (key.isReadable()) {
                            this.readConnection(key);
                        }
                        if (key.isWritable()) {
                            this.writeConnection(key);
                        }
                    }
                    catch (CancelledKeyException ignored) {
                        // The connection was closed while handling an earlier event on this key.
                    }
                }
            }

            // Regular shutdown: close every client channel, then the server resources.
            for (RPCNIOSocketServerConnection con : this.connections) {
                try {
                    con.getChannel().close();
                }
                catch (Exception ex) {
                    Logging.logError(4, this, ex);
                }
            }
            this.selector.close();
            this.socket.close();
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "PBRPC Server %d shutdown complete", this.bindPort);
            }
            this.notifyStopped();
        }
        catch (Throwable thr) {
            Logging.logMessage(3, Logging.Category.net, this, "PBRPC Server %d CRASHED!", this.bindPort);
            try {
                this.releaseResourcesAfterCrash();
            }
            finally {
                // The crash must be reported even if the cleanup itself fails.
                this.notifyCrashed(thr);
            }
        }
    }

    /**
     * Best-effort cleanup after a crash of the selector loop: closes all client
     * channels, the selector and the listening socket, ignoring close failures.
     */
    private void releaseResourcesAfterCrash() {
        for (RPCNIOSocketServerConnection con : this.connections) {
            try {
                con.getChannel().close();
            }
            catch (Exception ignored) {
                // nothing more can be done for this channel
            }
        }
        try {
            this.selector.close();
        }
        catch (Exception ignored) {
            // nothing more can be done for the selector
        }
        try {
            this.socket.close();
        }
        catch (Exception ignored) {
            // nothing more can be done for the listening socket
        }
    }

    /**
     * Handles a readable key: reads as much as is available from the client
     * and turns every completely received record into a request.
     *
     * <p>A record consists of a record marker holding three lengths (header,
     * message, data) followed by the header and the optional message and data
     * parts. The connection's receive state tracks which part is being filled.
     * The method returns as soon as the current buffer could not be filled
     * completely (more data has to arrive first), and keeps looping while
     * complete records are available.
     *
     * <p>If the client has more open requests than {@code maxClientQLength},
     * read interest is removed from the key; {@code writeConnection} re-enables
     * it once enough responses were sent. The connection is closed on EOF, on
     * an invalid record marker or header, and on any I/O failure.
     *
     * @param key selection key whose attachment is the connection to read from
     */
    private void readConnection(SelectionKey key) {
        final RPCNIOSocketServerConnection con = (RPCNIOSocketServerConnection)key.attachment();
        final ChannelIO channel = con.getChannel();
        try {
            if (channel.isShutdownInProgress() || !channel.doHandshake(key)) {
                return;
            }
            while (true) {
                if (con.getOpenRequests().get() > this.maxClientQLength) {
                    // Back-pressure: stop reading from this client for now.
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                    Logging.logMessage(4, Logging.Category.net, this, "client sent too many requests... not accepting new requests from %s, q=%d", String.valueOf(con.getChannel().socket().getRemoteSocketAddress()), con.getOpenRequests().get());
                    return;
                }

                // Pick the buffer that belongs to the part currently being received.
                ByteBuffer buf = null;
                switch (con.getReceiveState()) {
                    case RECORD_MARKER: {
                        buf = con.getReceiveRecordMarker();
                        break;
                    }
                    case RPC_MESSAGE: {
                        buf = con.getReceiveBuffers()[1].getBuffer();
                        break;
                    }
                    case RPC_HEADER: {
                        buf = con.getReceiveBuffers()[0].getBuffer();
                        break;
                    }
                    case DATA: {
                        buf = con.getReceiveBuffers()[2].getBuffer();
                    }
                }

                final int numBytesRead = RPCNIOSocketServer.readData(key, channel, buf);
                if (numBytesRead == -1) {
                    if (Logging.isInfo()) {
                        Logging.logMessage(7, Logging.Category.net, this, "client closed connection (EOF): %s", String.valueOf(channel.socket().getRemoteSocketAddress()));
                    }
                    this.closeConnection(key);
                    return;
                }
                if (buf.hasRemaining()) {
                    // Part not complete yet; wait for the next read event.
                    return;
                }

                // The current part is complete: advance the state machine.
                switch (con.getReceiveState()) {
                    case RECORD_MARKER: {
                        buf.position(0);
                        final int hdrLen = buf.getInt();
                        final int msgLen = buf.getInt();
                        final int dataLen = buf.getInt();
                        // 0x2000000 is the value of MAX_FRAGMENT_SIZE.
                        if (hdrLen <= 0 || hdrLen >= 0x2000000 || msgLen < 0 || msgLen >= 0x2000000 || dataLen < 0 || dataLen >= 0x2000000) {
                            Logging.logMessage(3, Logging.Category.net, this, "invalid record marker size (%d/%d/%d) received, closing connection to client %s", hdrLen, msgLen, dataLen, String.valueOf(channel.socket().getRemoteSocketAddress()));
                            this.closeConnection(key);
                            return;
                        }
                        // Index 0 = header (always present), 1 = message, 2 = data (null when empty).
                        final ReusableBuffer[] buffers = new ReusableBuffer[]{BufferPool.allocate(hdrLen), msgLen > 0 ? BufferPool.allocate(msgLen) : null, dataLen > 0 ? BufferPool.allocate(dataLen) : null};
                        con.setReceiveBuffers(buffers);
                        con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.RPC_HEADER);
                        continue;
                    }
                    case RPC_HEADER: {
                        if (con.getReceiveBuffers()[1] != null) {
                            con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.RPC_MESSAGE);
                            continue;
                        }
                        if (con.getReceiveBuffers()[2] != null) {
                            con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.DATA);
                            continue;
                        }
                        break;
                    }
                    case RPC_MESSAGE: {
                        if (con.getReceiveBuffers()[2] != null) {
                            con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.DATA);
                            continue;
                        }
                        break;
                    }
                    default: {
                        // DATA: nothing follows, the record is complete.
                        break;
                    }
                }

                // Record complete: reset the receive state and hand the buffers over.
                con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.RECORD_MARKER);
                con.getReceiveRecordMarker().clear();
                final ReusableBuffer[] receiveBuffers = con.getReceiveBuffers();
                receiveBuffers[0].flip();
                if (receiveBuffers[1] != null) {
                    receiveBuffers[1].flip();
                }
                if (receiveBuffers[2] != null) {
                    receiveBuffers[2].flip();
                }
                // Detach the buffers from the connection; the request takes them from here.
                con.setReceiveBuffers(null);

                final RPCServerRequest rq;
                try {
                    rq = new RPCServerRequest(con, receiveBuffers[0], receiveBuffers[1], receiveBuffers[2]);
                }
                catch (IOException ex) {
                    // Pass dynamic text as an argument, never as the format pattern.
                    Logging.logMessage(3, Logging.Category.net, this, "invalid PBRPC header received: %s", String.valueOf(ex));
                    if (Logging.isDebug()) {
                        Logging.logError(7, this, ex);
                    }
                    this.closeConnection(key);
                    BufferPool.free(receiveBuffers[1]);
                    BufferPool.free(receiveBuffers[2]);
                    return;
                }
                if (Logging.isDebug()) {
                    Logging.logMessage(7, Logging.Category.net, this, "%s", rq.toString());
                }
                con.getOpenRequests().incrementAndGet();
                if (Logging.isDebug()) {
                    Logging.logMessage(7, Logging.Category.net, this, "request received", new Object[0]);
                }
                ++this.pendingRequests;
                if (!this.receiveRequest(key, rq, con)) {
                    // The listener rejected the request as malformed.
                    this.closeConnection(key);
                    return;
                }
            }
        }
        catch (CancelledKeyException ex) {
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "client closed connection (CancelledKeyException): %s", String.valueOf(channel.socket().getRemoteSocketAddress()));
            }
            this.closeConnection(key);
        }
        catch (ClosedByInterruptException ex) {
            // The channel is already closed here, so the peer address may be
            // unavailable; String.valueOf keeps the logging null-safe.
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "client closed connection (EOF): %s", String.valueOf(channel.socket().getRemoteSocketAddress()));
            }
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "connection to %s closed by remote peer", String.valueOf(con.getChannel().socket().getRemoteSocketAddress()));
            }
            this.closeConnection(key);
        }
        catch (IOException ex) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "%s", OutputUtils.stackTraceToString(ex));
            }
            this.closeConnection(key);
        }
    }

    /**
     * Handles a writable key: sends queued responses to the client until the
     * queue is empty or the channel cannot take more data.
     *
     * <p>When no response is in progress, the head of the pending-response
     * queue is packed into send buffers. Once a response is written
     * completely it is removed from the queue and freed, and the connection's
     * open-request counter is decremented. If reading from the client was
     * suspended and the counter dropped below {@code clientQThreshold}, read
     * interest is enabled again. With an empty queue, write interest is
     * removed from the key. The connection is closed on EOF and on any I/O
     * failure.
     *
     * @param key selection key whose attachment is the connection to write to
     */
    private void writeConnection(SelectionKey key) {
        final RPCNIOSocketServerConnection con = (RPCNIOSocketServerConnection)key.attachment();
        final ChannelIO channel = con.getChannel();
        try {
            if (channel.isShutdownInProgress() || !channel.doHandshake(key)) {
                return;
            }
            while (true) {
                ByteBuffer[] response = con.getSendBuffers();
                if (response == null) {
                    // No response in progress: take the next one from the queue.
                    // sendResponse() appends under the same monitor.
                    synchronized (con) {
                        final RPCServerResponse next = con.getPendingResponses().peek();
                        if (next == null) {
                            // Nothing left to send; stop listening for write readiness.
                            con.setSendBuffers(null);
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            return;
                        }
                        response = next.packBuffers(con.getSendFragHdr());
                        con.setSendBuffers(response);
                        con.setExpectedRecordSize(next.getRpcMessageSize());
                    }
                }
                assert (response != null);

                final long numBytesWritten = channel.write(response);
                if (numBytesWritten == -1L) {
                    if (Logging.isInfo()) {
                        Logging.logMessage(6, Logging.Category.net, this, "client closed connection (EOF): %s", String.valueOf(channel.socket().getRemoteSocketAddress()));
                    }
                    this.closeConnection(key);
                    return;
                }
                con.recordBytesSent(numBytesWritten);

                if (response[response.length - 1].hasRemaining()) {
                    // Channel is full; continue with this response on the next write event.
                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                    return;
                }

                // The response was written completely: clean up and account for it.
                con.checkEnoughBytesSent();
                --this.pendingRequests;
                final RPCServerResponse sent = con.getPendingResponses().poll();
                if (Logging.isDebug()) {
                    Logging.logMessage(7, Logging.Category.net, this, "sent response for %s", sent.toString());
                }
                sent.freeBuffers();
                con.setSendBuffers(null);
                con.getSendFragHdr().clear();

                final int numRq = con.getOpenRequests().decrementAndGet();
                // Resume reading from a throttled client once its backlog is small enough.
                if ((key.interestOps() & SelectionKey.OP_READ) == 0 && numRq < this.clientQThreshold) {
                    key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                    Logging.logMessage(4, Logging.Category.net, this, "client allowed to send data again: %s, q=%d", String.valueOf(con.getChannel().socket().getRemoteSocketAddress()), numRq);
                }
            }
        }
        catch (CancelledKeyException ex) {
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "client closed connection (CancelledKeyException): %s", String.valueOf(channel.socket().getRemoteSocketAddress()));
            }
            this.closeConnection(key);
        }
        catch (ClosedByInterruptException ex) {
            // The channel is already closed here, so the peer address may be
            // unavailable; String.valueOf keeps the logging null-safe.
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "client closed connection (EOF): %s", String.valueOf(channel.socket().getRemoteSocketAddress()));
            }
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "connection to %s closed by remote peer", String.valueOf(con.getChannel().socket().getRemoteSocketAddress()));
            }
            this.closeConnection(key);
        }
        catch (IOException ex) {
            if (Logging.isDebug()) {
                // Pass the stack trace as an argument, never as the format pattern.
                Logging.logMessage(7, Logging.Category.net, this, "%s", OutputUtils.stackTraceToString(ex));
            }
            this.closeConnection(key);
        }
    }

    /**
     * Reads from the given channel into the buffer.
     *
     * @param key     selection key of the channel; not used by this method
     * @param channel channel to read from
     * @param buf     destination buffer
     * @return the value returned by {@code channel.read(buf)}
     * @throws IOException if the read fails
     */
    public static int readData(SelectionKey key, ChannelIO channel, ByteBuffer buf) throws IOException {
        final int bytesRead = channel.read(buf);
        return bytesRead;
    }

    /**
     * Writes the buffer's content to the given channel.
     *
     * @param key     selection key of the channel; not used by this method
     * @param channel channel to write to
     * @param buf     source buffer
     * @return the value returned by {@code channel.write(buf)}
     * @throws IOException if the write fails
     */
    public static int writeData(SelectionKey key, ChannelIO channel, ByteBuffer buf) throws IOException {
        final int bytesWritten = channel.write(buf);
        return bytesWritten;
    }

    /**
     * Closes a client connection: removes it from the connection list, marks
     * it closed, cancels its key and closes the channel. Afterwards, whether or
     * not closing succeeded, the connection counter is decremented and the
     * connection's buffers are freed.
     *
     * @param key selection key whose attachment is the connection to close
     */
    private void closeConnection(SelectionKey key) {
        final RPCNIOSocketServerConnection con = (RPCNIOSocketServerConnection)key.attachment();
        final ChannelIO channel = con.getChannel();
        // Capture the peer address before closing: a closed socket may no longer
        // report it, and dereferencing a null address here would propagate an
        // exception out of the selector loop.
        final Object remoteAddress = channel.socket().getRemoteSocketAddress();
        try {
            this.connections.remove(con);
            con.setConnectionClosed(true);
            key.cancel();
            channel.close();
        }
        catch (Exception ex) {
            // Closing is best effort, but leave a trace for diagnosis.
            if (Logging.isDebug()) {
                Logging.logError(7, this, ex);
            }
        }
        finally {
            this.numConnections.decrementAndGet();
            con.freeBuffers();
        }
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.net, this, "closing connection to %s", String.valueOf(remoteAddress));
        }
    }

    /**
     * Accepts a pending client connection, wraps it in the channel
     * implementation selected by the SSL options, registers it with the
     * selector for read events and adds it to the list of connections.
     * If any step fails, the accepted socket is closed again and the failure
     * is only logged.
     *
     * @param key selection key of the listening socket; not used by this method
     */
    private void acceptConnection(SelectionKey key) {
        SocketChannel client = null;
        ChannelIO channelIO = null;
        try {
            client = this.socket.accept();
            if (client == null) {
                // Non-blocking accept: no connection is actually pending.
                return;
            }
            if (this.sslOptions == null) {
                channelIO = new ChannelIO(client);
            } else if (this.sslOptions.isFakeSSLMode()) {
                channelIO = new SSLHandshakeOnlyChannelIO(client, this.sslOptions, false);
            } else {
                channelIO = new SSLChannelIO(client, this.sslOptions, false);
            }
            final RPCNIOSocketServerConnection con = new RPCNIOSocketServerConnection(this, channelIO);
            client.configureBlocking(false);
            client.register(this.selector, SelectionKey.OP_READ, con);
            client.socket().setTcpNoDelay(true);
            this.numConnections.incrementAndGet();
            this.connections.add(con);
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "connect from client at %s", String.valueOf(client.socket().getRemoteSocketAddress()));
            }
        }
        catch (ClosedChannelException ex) {
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "client closed connection during accept", new Object[0]);
            }
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "cannot establish connection: %s", ex.toString());
            }
            RPCNIOSocketServer.abortAccept(channelIO, client);
        }
        catch (IOException ex) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "cannot establish connection: %s", ex.toString());
            }
            RPCNIOSocketServer.abortAccept(channelIO, client);
        }
    }

    /**
     * Closes a half-established connection, ignoring close failures. Uses the
     * channel wrapper when it exists; otherwise closes the raw socket channel
     * so that it does not leak when creating the wrapper failed.
     */
    private static void abortAccept(ChannelIO channelIO, SocketChannel client) {
        try {
            if (channelIO != null) {
                channelIO.close();
            } else if (client != null) {
                client.close();
            }
        }
        catch (IOException ignored) {
            // best effort; the connection is being discarded anyway
        }
    }

    /**
     * Passes a completely received request to the registered listener.
     *
     * @param key     selection key of the connection; not used by this method
     * @param request the request to deliver
     * @param con     the connection the request arrived on; not used by this method
     * @return {@code true} if the listener accepted the request; {@code false}
     *         if an {@link IllegalArgumentException} was thrown while accessing
     *         the header or delivering the request
     */
    private boolean receiveRequest(SelectionKey key, RPCServerRequest request, RPCNIOSocketServerConnection con) {
        try {
            // Return value unused; the call is kept from the original code, where a
            // failure here is reported as an invalid header.
            request.getHeader();
            this.receiver.receiveRecord(request);
            return true;
        }
        catch (IllegalArgumentException ex) {
            // Pass the exception text as an argument, never as the format pattern:
            // other call sites show that logMessage interprets format specifiers.
            Logging.logMessage(3, Logging.Category.net, this, "invalid PBRPC header received: %s", String.valueOf(ex));
            if (Logging.isDebug()) {
                Logging.logError(7, this, ex);
            }
            return false;
        }
    }

    /**
     * Returns the current value of the connection counter.
     *
     * @return number of connections accepted and not yet closed
     */
    public int getNumConnections() {
        final int current = numConnections.get();
        return current;
    }

    /**
     * Returns the pending-request counter, which is incremented for every
     * received request and decremented for every completely sent response.
     *
     * @return number of requests currently awaiting a response
     */
    public long getPendingRequests() {
        final long current = pendingRequests;
        return current;
    }

    /**
     * Replaces the listener that receives incoming requests.
     *
     * @param rl the new request listener
     */
    public void updateRequestDispatcher(RPCServerRequestListener rl) {
        receiver = rl;
    }
}

