/*
 * Decompiled with CFR 0.152.
 */
package org.xtreemfs.foundation.pbrpc.client;

import com.google.protobuf.Message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.xtreemfs.foundation.LifeCycleThread;
import org.xtreemfs.foundation.SSLOptions;
import org.xtreemfs.foundation.buffer.BufferPool;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.logging.Logging;
import org.xtreemfs.foundation.pbrpc.channels.ChannelIO;
import org.xtreemfs.foundation.pbrpc.channels.SSLChannelIO;
import org.xtreemfs.foundation.pbrpc.channels.SSLHandshakeOnlyChannelIO;
import org.xtreemfs.foundation.pbrpc.client.RPCClientConnection;
import org.xtreemfs.foundation.pbrpc.client.RPCClientRequest;
import org.xtreemfs.foundation.pbrpc.client.RPCResponse;
import org.xtreemfs.foundation.pbrpc.generatedinterfaces.RPC;
import org.xtreemfs.foundation.pbrpc.server.RPCNIOSocketServer;
import org.xtreemfs.foundation.pbrpc.server.RPCNIOSocketServerConnection;
import org.xtreemfs.foundation.pbrpc.utils.ReusableBufferInputStream;
import org.xtreemfs.foundation.util.OutputUtils;

/**
 * Non-blocking RPC client that multiplexes all outgoing connections over a
 * single {@link Selector}.
 *
 * <p>Callers enqueue requests through {@code sendRequest}; the thread started
 * from this class (see {@code run()}) performs connect, read and write
 * operations and periodically fails timed-out requests and closes idle
 * connections.
 */
public class RPCNIOSocketClient
extends LifeCycleThread {
    // Not referenced inside this class; presumably read by collaborating classes.
    public static boolean ENABLE_STATISTICS = false;
    // Not referenced inside this class; presumably the reconnect limit used by RPCClientConnection (TODO confirm).
    public static final int MAX_RECONNECT = 4;
    // The literal 250 (ms) used for the select timeout and the timer check interval
    // presumably originates from this constant (inlined by the compiler).
    public static final int TIMEOUT_GRANULARITY = 250;
    // One connection object per server address; every access synchronizes on the map itself.
    private final Map<InetSocketAddress, RPCClientConnection> connections;
    // Milliseconds after which a queued or in-flight request is failed by checkForTimers().
    private final int requestTimeout;
    // Milliseconds of inactivity after which a connection is removed by checkForTimers().
    private final int connectionTimeout;
    // System.currentTimeMillis() value of the last completed timer check.
    private long lastCheck;
    // Selector all connection channels are registered with.
    private final Selector selector;
    // Set by shutdown(); makes the loop in run() terminate.
    private volatile boolean quit;
    // null selects plain TCP channels; otherwise SSL (or handshake-only SSL) channels are created.
    private final SSLOptions sslOptions;
    // Source of call ids; initialised with a random start value in the constructor.
    private final AtomicInteger transactionId;
    // Connections whose channel still has to be registered with the selector by the selector thread.
    private final ConcurrentLinkedQueue<RPCClientConnection> toBeEstablished;
    // Socket send buffer size in bytes; -1 leaves the socket default untouched.
    private final int sendBufferSize;
    // Socket receive buffer size in bytes; -1 makes establishConnection() use 262144.
    private final int receiveBufferSize;
    // Optional local address outgoing sockets are bound to; null means no explicit bind.
    private final SocketAddress localBindPoint;
    // Reset to false at the start of run(); when true, run() sleeps 25 ms after a select that returned no keys.
    private boolean brokenSelect;

    /**
     * Creates a non-daemon client with an empty thread name, default socket
     * buffer sizes (-1) and no local bind point.
     *
     * @param sslOptions SSL configuration, or {@code null} for plain TCP
     * @param requestTimeout request timeout in milliseconds
     * @param connectionTimeout idle-connection timeout in milliseconds
     * @throws IOException if the selector cannot be opened
     * @throws IllegalArgumentException if {@code requestTimeout >= connectionTimeout - 500}
     */
    public RPCNIOSocketClient(SSLOptions sslOptions, int requestTimeout, int connectionTimeout) throws IOException {
        this(sslOptions, requestTimeout, connectionTimeout, -1, -1, null, "", false);
    }

    /**
     * Creates a non-daemon client with the given thread name, default socket
     * buffer sizes (-1) and no local bind point.
     *
     * @param sslOptions SSL configuration, or {@code null} for plain TCP
     * @param requestTimeout request timeout in milliseconds
     * @param connectionTimeout idle-connection timeout in milliseconds
     * @param threadName name passed to the thread superclass
     * @throws IOException if the selector cannot be opened
     * @throws IllegalArgumentException if {@code requestTimeout >= connectionTimeout - 500}
     */
    public RPCNIOSocketClient(SSLOptions sslOptions, int requestTimeout, int connectionTimeout, String threadName) throws IOException {
        this(sslOptions, requestTimeout, connectionTimeout, -1, -1, null, threadName, false);
    }

    /**
     * Creates a non-daemon client with an empty thread name and explicit socket
     * settings.
     *
     * @param sslOptions SSL configuration, or {@code null} for plain TCP
     * @param requestTimeout request timeout in milliseconds
     * @param connectionTimeout idle-connection timeout in milliseconds
     * @param sendBufferSize socket send buffer size, or -1 to keep the default
     * @param receiveBufferSize socket receive buffer size, or -1 for the built-in default
     * @param localBindPoint local address to bind outgoing sockets to, or {@code null}
     * @throws IOException if the selector cannot be opened
     * @throws IllegalArgumentException if {@code requestTimeout >= connectionTimeout - 500}
     */
    public RPCNIOSocketClient(SSLOptions sslOptions, int requestTimeout, int connectionTimeout, int sendBufferSize, int receiveBufferSize, SocketAddress localBindPoint) throws IOException {
        this(sslOptions, requestTimeout, connectionTimeout, sendBufferSize, receiveBufferSize, localBindPoint, "", false);
    }

    /**
     * Creates a client with the given thread name and daemon flag, default
     * socket buffer sizes (-1) and no local bind point.
     *
     * @param sslOptions SSL configuration, or {@code null} for plain TCP
     * @param requestTimeout request timeout in milliseconds
     * @param connectionTimeout idle-connection timeout in milliseconds
     * @param threadName name passed to the thread superclass
     * @param startAsDaemon whether the client thread is marked as a daemon thread
     * @throws IOException if the selector cannot be opened
     * @throws IllegalArgumentException if {@code requestTimeout >= connectionTimeout - 500}
     */
    public RPCNIOSocketClient(SSLOptions sslOptions, int requestTimeout, int connectionTimeout, String threadName, boolean startAsDaemon) throws IOException {
        this(sslOptions, requestTimeout, connectionTimeout, -1, -1, null, threadName, startAsDaemon);
    }

    /**
     * Creates a non-daemon client with the given thread name and explicit
     * socket settings.
     *
     * @param sslOptions SSL configuration, or {@code null} for plain TCP
     * @param requestTimeout request timeout in milliseconds
     * @param connectionTimeout idle-connection timeout in milliseconds
     * @param sendBufferSize socket send buffer size, or -1 to keep the default
     * @param receiveBufferSize socket receive buffer size, or -1 for the built-in default
     * @param localBindPoint local address to bind outgoing sockets to, or {@code null}
     * @param threadName name passed to the thread superclass
     * @throws IOException if the selector cannot be opened
     * @throws IllegalArgumentException if {@code requestTimeout >= connectionTimeout - 500}
     */
    public RPCNIOSocketClient(SSLOptions sslOptions, int requestTimeout, int connectionTimeout, int sendBufferSize, int receiveBufferSize, SocketAddress localBindPoint, String threadName) throws IOException {
        this(sslOptions, requestTimeout, connectionTimeout, sendBufferSize, receiveBufferSize, localBindPoint, threadName, false);
    }

    /**
     * Creates a client; all other constructors delegate to this one.
     *
     * <p>The timeouts are validated before the selector is opened, so a
     * rejected configuration does not allocate a selector.
     *
     * @param sslOptions SSL configuration, or {@code null} for plain TCP
     * @param requestTimeout request timeout in milliseconds
     * @param connectionTimeout idle-connection timeout in milliseconds
     * @param sendBufferSize socket send buffer size, or -1 to keep the default
     * @param receiveBufferSize socket receive buffer size, or -1 for the built-in default
     * @param localBindPoint local address to bind outgoing sockets to, or {@code null}
     * @param threadName name passed to the thread superclass
     * @param startAsDaemon whether the client thread is marked as a daemon thread
     * @throws IOException if the selector cannot be opened
     * @throws IllegalArgumentException if {@code requestTimeout >= connectionTimeout - 500}
     */
    public RPCNIOSocketClient(SSLOptions sslOptions, int requestTimeout, int connectionTimeout, int sendBufferSize, int receiveBufferSize, SocketAddress localBindPoint, String threadName, boolean startAsDaemon) throws IOException {
        super(threadName);
        this.setDaemon(startAsDaemon);
        // Requests must expire well before their connection is considered idle.
        if (requestTimeout >= connectionTimeout - 500) {
            throw new IllegalArgumentException("request timeout must be smaller than connection timeout less 500ms");
        }
        this.requestTimeout = requestTimeout;
        this.connectionTimeout = connectionTimeout;
        this.sendBufferSize = sendBufferSize;
        this.receiveBufferSize = receiveBufferSize;
        this.localBindPoint = localBindPoint;
        this.connections = new HashMap<InetSocketAddress, RPCClientConnection>();
        this.selector = Selector.open();
        this.sslOptions = sslOptions;
        this.quit = false;
        // Random start value in [1, 1000000] for the call id counter.
        this.transactionId = new AtomicInteger((int)(Math.random() * 1000000.0 + 1.0));
        // Parameterized instead of a raw type so the assignment is type-checked.
        this.toBeEstablished = new ConcurrentLinkedQueue<RPCClientConnection>();
        if (this.localBindPoint != null && Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.net, this, "RPC Client '%s': Using the following address for outgoing connections: %s", threadName, this.localBindPoint);
        }
    }

    /**
     * Wraps the call in an {@link RPCClientRequest} with a freshly drawn call
     * id and queues it for the given server.
     *
     * <p>Any {@link Throwable} raised while creating or queueing the request is
     * reported through {@code response.requestFailed} instead of being thrown.
     *
     * @param server address of the target server
     * @param auth authentication data for the request
     * @param uCred user credentials for the request
     * @param interface_id id of the called interface
     * @param proc_id id of the called procedure
     * @param message request message
     * @param data optional payload buffer
     * @param response response object that is notified about the outcome
     * @param highPriority if {@code true}, the request is put at the head of the send queue
     */
    public void sendRequest(InetSocketAddress server, RPC.Auth auth, RPC.UserCredentials uCred, int interface_id, int proc_id, Message message, ReusableBuffer data, RPCResponse response, boolean highPriority) {
        try {
            final int callId = transactionId.incrementAndGet();
            final RPCClientRequest request =
                    new RPCClientRequest(auth, uCred, callId, interface_id, proc_id, message, data, response);
            internalSendRequest(server, request, highPriority);
        }
        catch (Throwable failure) {
            response.requestFailed(failure.toString());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues a request on the connection for {@code server}, creating the
     * connection object on first use.
     *
     * <p>If the connection is not connected, a connect is initiated. If it is
     * connected and the send queue was empty before this request, write
     * interest is enabled on the channel's key and the selector is woken up.
     *
     * @param server address of the target server
     * @param request request to queue
     * @param highPriority if {@code true}, the request is inserted at the head of the queue
     */
    private void internalSendRequest(InetSocketAddress server, RPCClientRequest request, boolean highPriority) {
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.net, this, "sending request %s no %d", request.toString(), this.transactionId.get());
        }

        // Look up or lazily create the connection object for this server.
        final RPCClientConnection connection;
        synchronized (this.connections) {
            RPCClientConnection known = this.connections.get(server);
            if (known == null) {
                known = new RPCClientConnection(server);
                this.connections.put(server, known);
            }
            connection = known;
        }

        synchronized (connection) {
            final boolean queueWasEmpty = connection.getSendQueue().isEmpty();
            request.queued();
            connection.useConnection();
            if (highPriority) {
                connection.getSendQueue().add(0, request);
            } else {
                connection.getSendQueue().add(request);
            }

            if (!connection.isConnected()) {
                this.establishConnection(server, connection);
                return;
            }
            if (!queueWasEmpty) {
                // Write interest is already set by whoever queued the first request.
                return;
            }

            final SelectionKey key = connection.getChannel().keyFor(this.selector);
            if (key != null) {
                try {
                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                }
                catch (CancelledKeyException ignored) {
                    // The key was cancelled concurrently; nothing to enable.
                }
            }
            this.selector.wakeup();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Selector loop of the client thread.
     *
     * <p>Until {@code quit} is set, each iteration registers newly created
     * channels, waits up to 250 ms for I/O readiness, dispatches ready keys to
     * the connect/read/write handlers and runs the timer check. On exit
     * (regular shutdown, interruption or crash) all queued and in-flight
     * requests are failed, all channels and the selector are closed and
     * {@code notifyStopped()} is called.
     */
    @Override
    public void run() {
        this.brokenSelect = false;
        this.notifyStarted();
        this.lastCheck = System.currentTimeMillis();
        try {
            while (!this.quit) {
                this.registerNewConnections();
                int numKeys;
                try {
                    numKeys = this.selector.select(250L);
                }
                catch (CancelledKeyException ex) {
                    Logging.logMessage(4, Logging.Category.net, this, "Exception while selecting: %s", ex.toString());
                    continue;
                }
                catch (IOException ex) {
                    Logging.logMessage(4, Logging.Category.net, this, "Exception while selecting: %s", ex.toString());
                    continue;
                }
                if (numKeys > 0) {
                    this.processSelectedKeys();
                }
                if (numKeys == 0 && this.brokenSelect) {
                    try {
                        RPCNIOSocketClient.sleep(25L);
                    }
                    catch (InterruptedException ex) {
                        // Interrupted while idling: leave the loop and clean up.
                        break;
                    }
                }
                try {
                    this.checkForTimers();
                }
                catch (ConcurrentModificationException ce) {
                    Logging.logMessage(2, this, OutputUtils.getThreadDump(), new Object[0]);
                }
            }
        }
        catch (Throwable thr) {
            Logging.logMessage(3, Logging.Category.net, this, "PBRPC Client CRASHED!", new Object[0]);
            this.notifyCrashed(thr);
        }
        this.cancelRequestsAndCloseChannels();
        // Release the selector's OS resources; nothing uses it once this thread ends.
        try {
            this.selector.close();
        }
        catch (IOException ex) {
            Logging.logMessage(4, Logging.Category.net, this, "Exception while closing selector: %s", ex.toString());
        }
        this.notifyStopped();
    }

    /**
     * Registers the channel of every connection waiting in
     * {@code toBeEstablished} with the selector.
     *
     * <p>The queue is drained with {@code poll()} only; it is deliberately not
     * cleared afterwards, because a connection added by another thread after
     * the last {@code poll()} would otherwise be dropped without ever being
     * registered.
     */
    private void registerNewConnections() {
        RPCClientConnection con;
        while ((con = this.toBeEstablished.poll()) != null) {
            try {
                con.getChannel().register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_WRITE | SelectionKey.OP_READ, con);
            }
            catch (ClosedChannelException ex) {
                // Registration failed, so there is normally no key for this channel.
                SelectionKey key = con.getChannel().keyFor(this.selector);
                if (key != null) {
                    this.closeConnection(key, ex.toString());
                } else {
                    this.failUnregisteredConnection(con, ex.toString());
                }
            }
        }
    }

    /** Fails all requests of a connection whose channel could not be registered and closes its channel. */
    private void failUnregisteredConnection(RPCClientConnection con, String errorMessage) {
        LinkedList<RPCClientRequest> cancelRq = new LinkedList<RPCClientRequest>();
        synchronized (con) {
            ChannelIO channel = con.getChannel();
            if (channel != null) {
                try {
                    channel.close();
                }
                catch (Exception ex) {
                    // Best effort: the channel is being discarded anyway.
                }
            }
            cancelRq.addAll(con.getRequests().values());
            cancelRq.addAll(con.getSendQueue());
            con.getRequests().clear();
            con.getSendQueue().clear();
            con.setChannel(null);
        }
        // Notify outside the connection lock, as closeConnection does.
        for (RPCClientRequest rq : cancelRq) {
            rq.getResponse().requestFailed("sending RPC failed: " + errorMessage);
            rq.freeBuffers();
        }
    }

    /** Dispatches every selected key to the connect, read and write handlers. */
    private void processSelectedKeys() {
        Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            try {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isConnectable()) {
                    this.connectConnection(key);
                }
                if (key.isReadable()) {
                    this.readConnection(key);
                }
                if (key.isWritable()) {
                    this.writeConnection(key);
                }
            }
            catch (CancelledKeyException ex) {
                // An earlier handler closed the connection; skip the remaining checks for this key.
            }
        }
    }

    /** Fails every queued and in-flight request and closes all channels; used when the thread terminates. */
    private void cancelRequestsAndCloseChannels() {
        synchronized (this.connections) {
            for (RPCClientConnection con : this.connections.values()) {
                synchronized (con) {
                    for (RPCClientRequest rq : con.getSendQueue()) {
                        rq.getResponse().requestFailed("RPC cancelled due to client shutdown");
                        rq.freeBuffers();
                    }
                    for (RPCClientRequest rq : con.getRequests().values()) {
                        rq.getResponse().requestFailed("RPC cancelled due to client shutdown");
                        rq.freeBuffers();
                    }
                    try {
                        if (con.getChannel() != null) {
                            con.getChannel().close();
                        }
                    }
                    catch (Exception ex) {
                        Logging.logMessage(4, Logging.Category.net, this, "Exception while closing connection to %s: %s", con.getEndpointString(), ex.toString());
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts a non-blocking connect to {@code server} and hands the new
     * channel to the selector thread for registration.
     *
     * <p>If the connection may not reconnect yet, or if creating, configuring
     * or connecting the socket fails, every request in the send queue is
     * failed and the queue is cleared. A channel that was opened but could not
     * be handed over is closed so that its socket is not leaked.
     *
     * @param server address to connect to
     * @param con connection object that receives the new channel
     */
    private void establishConnection(InetSocketAddress server, RPCClientConnection con) {
        if (!con.canReconnect()) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "reconnect to server still blocked locally to avoid flooding (server: %s)", con.getEndpointString());
            }
            synchronized (con) {
                for (RPCClientRequest rq : con.getSendQueue()) {
                    rq.getResponse().requestFailed("sending RPC failed: reconnecting to the server '" + con.getEndpointString() + "' was blocked locally to avoid flooding");
                    rq.freeBuffers();
                }
                con.getSendQueue().clear();
            }
            return;
        }

        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.net, this, "connect to %s", server.toString());
        }
        SocketChannel rawChannel = null;
        ChannelIO channel = null;
        // True once the channel is owned by the connection object.
        boolean handedOver = false;
        try {
            rawChannel = SocketChannel.open();
            if (this.sslOptions == null) {
                channel = new ChannelIO(rawChannel);
            } else if (this.sslOptions.isFakeSSLMode()) {
                channel = new SSLHandshakeOnlyChannelIO(rawChannel, this.sslOptions, true);
            } else {
                channel = new SSLChannelIO(rawChannel, this.sslOptions, true);
            }
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(true);
            if (this.localBindPoint != null) {
                channel.socket().bind(this.localBindPoint);
            }
            if (this.sendBufferSize != -1) {
                channel.socket().setSendBufferSize(this.sendBufferSize);
                if (channel.socket().getSendBufferSize() != this.sendBufferSize) {
                    Logging.logMessage(4, Logging.Category.net, this, "could not set socket send buffer size to " + this.sendBufferSize + ", using default size of " + channel.socket().getSendBufferSize(), new Object[0]);
                }
            }
            if (this.receiveBufferSize != -1) {
                channel.socket().setReceiveBufferSize(this.receiveBufferSize);
                if (channel.socket().getReceiveBufferSize() != this.receiveBufferSize) {
                    Logging.logMessage(4, Logging.Category.net, this, "could not set socket receive buffer size to " + this.receiveBufferSize + ", using default size of " + channel.socket().getReceiveBufferSize(), new Object[0]);
                }
            } else {
                channel.socket().setReceiveBufferSize(262144);
            }
            channel.connect(server);
            con.setChannel(channel);
            handedOver = true;
            this.toBeEstablished.add(con);
            this.selector.wakeup();
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "connection created", new Object[0]);
                Logging.logMessage(7, Logging.Category.net, this, "socket send buffer size: %d", channel.socket().getSendBufferSize());
                Logging.logMessage(7, Logging.Category.net, this, "socket receive buffer size: %d", channel.socket().getReceiveBufferSize());
                Logging.logMessage(7, Logging.Category.net, this, "local bind point: %s", channel.socket().getLocalAddress());
            }
        }
        catch (Exception ex) {
            // Constant-first comparison: getMessage() may be null.
            if (ex.getClass() == SocketException.class && "Invalid argument".equals(ex.getMessage())) {
                Logging.logMessage(3, Logging.Category.net, this, "FAILED TO USE THE FOLLOWING ADDRESS FOR OUTGOING REQUESTS: %s. Make sure that the hostname is correctly spelled in the configuration and it resolves to the correct IP.", this.localBindPoint);
            }
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "cannot contact server %s", con.getEndpointString());
            }
            if (!handedOver) {
                this.discardFailedChannel(channel, rawChannel);
            }
            con.connectFailed();
            synchronized (con) {
                for (RPCClientRequest rq : con.getSendQueue()) {
                    rq.getResponse().requestFailed("sending RPC failed: server '" + con.getEndpointString() + "' not reachable (" + ex + ")");
                    rq.freeBuffers();
                }
                con.getSendQueue().clear();
            }
        }
    }

    /** Closes a channel that failed during setup; either argument may be {@code null}. */
    private void discardFailedChannel(ChannelIO channel, SocketChannel rawChannel) {
        if (channel != null) {
            try {
                channel.close();
            }
            catch (Exception ex) {
                // Best effort; the raw socket is closed below in any case.
            }
        }
        if (rawChannel != null) {
            try {
                // Closing an already closed channel has no effect.
                rawChannel.close();
            }
            catch (IOException ex) {
                // Nothing left to release.
            }
        }
    }

    /**
     * Reads as much response data as is available on the connection attached
     * to {@code key}.
     *
     * <p>A response is received in up to four steps tracked by the
     * connection's receive state: the record marker (three ints: header,
     * message and data length), then the header buffer (index 0), the message
     * buffer (index 1) and the data buffer (index 2) of the response buffer
     * array. Message and data buffers are skipped when their length is 0.
     * Once all parts are complete the response is handed to
     * {@code assembleResponse} and the state returns to the record marker.
     *
     * <p>The connection is closed on EOF, on an invalid record marker, and on
     * {@link IOException} or {@link NotYetConnectedException}.
     *
     * @param key selection key whose attachment is the {@link RPCClientConnection}
     */
    private void readConnection(SelectionKey key) {
        RPCClientConnection con = (RPCClientConnection)key.attachment();
        ChannelIO channel = con.getChannel();
        try {
            // Only read application data once the handshake has completed and no shutdown is running.
            if (!channel.isShutdownInProgress() && channel.doHandshake(key)) {
                block14: while (true) {
                    // Pick the buffer that belongs to the part currently being received.
                    ByteBuffer buf = null;
                    switch (con.getReceiveState()) {
                        case RECORD_MARKER: {
                            buf = con.getResponseRecordMarker();
                            break;
                        }
                        case RPC_MESSAGE: {
                            buf = con.getResponseBuffers()[1].getBuffer();
                            break;
                        }
                        case RPC_HEADER: {
                            buf = con.getResponseBuffers()[0].getBuffer();
                            break;
                        }
                        case DATA: {
                            buf = con.getResponseBuffers()[2].getBuffer();
                        }
                    }
                    int numBytesRead = RPCNIOSocketServer.readData(key, channel, buf);
                    if (numBytesRead == -1) {
                        // NOTE(review): guarded by isInfo() but logged with level 7, the level used
                        // under isDebug() guards elsewhere in this file; the message says "client"
                        // although the peer is the server.
                        if (Logging.isInfo()) {
                            Logging.logMessage(7, Logging.Category.net, this, "client closed connection (EOF): %s", channel.socket().getRemoteSocketAddress().toString());
                        }
                        this.closeConnection(key, "server (" + channel.socket().getRemoteSocketAddress().toString() + ") closed connection");
                        return;
                    }
                    // A full buffer means the current part is complete; otherwise wait for more data.
                    if (!buf.hasRemaining()) {
                        switch (con.getReceiveState()) {
                            case RECORD_MARKER: {
                                buf.position(0);
                                int hdrLen = buf.getInt();
                                int msgLen = buf.getInt();
                                int dataLen = buf.getInt();
                                // Reject empty headers, negative lengths and any part of 0x2000000 bytes (32 MiB) or more.
                                if (hdrLen <= 0 || hdrLen >= 0x2000000 || msgLen < 0 || msgLen >= 0x2000000 || dataLen < 0 || dataLen >= 0x2000000) {
                                    Logging.logMessage(3, Logging.Category.net, this, "invalid record marker size (%d/%d/%d) received, closing connection to client %s", hdrLen, msgLen, dataLen, channel.socket().getRemoteSocketAddress().toString());
                                    this.closeConnection(key, "received invalid record marker from server (" + channel.socket().getRemoteSocketAddress().toString() + "), closed connection");
                                    return;
                                }
                                // Index 0 = header, 1 = message, 2 = data; absent parts stay null.
                                ReusableBuffer[] buffers = new ReusableBuffer[]{BufferPool.allocate(hdrLen), msgLen > 0 ? BufferPool.allocate(msgLen) : null, dataLen > 0 ? BufferPool.allocate(dataLen) : null};
                                con.setResponseBuffers(buffers);
                                con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.RPC_HEADER);
                                continue block14;
                            }
                            case RPC_HEADER: {
                                // Continue with the next part that is present; fall out of the switch if none is.
                                if (con.getResponseBuffers()[1] != null) {
                                    con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.RPC_MESSAGE);
                                    continue block14;
                                }
                                if (con.getResponseBuffers()[2] == null) break;
                                con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.DATA);
                                continue block14;
                            }
                            case RPC_MESSAGE: {
                                if (con.getResponseBuffers()[2] == null) break;
                                con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.DATA);
                                continue block14;
                            }
                        }
                        // All parts received: reset for the next response and deliver this one.
                        con.setReceiveState(RPCNIOSocketServerConnection.ReceiveState.RECORD_MARKER);
                        con.getResponseRecordMarker().clear();
                        this.assembleResponse(key, con);
                        continue;
                    }
                    break;
                }
            }
        }
        catch (IOException ex) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, OutputUtils.stackTraceToString(ex), new Object[0]);
            }
            this.closeConnection(key, "server closed connection (" + ex + ")");
        }
        catch (NotYetConnectedException e) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, OutputUtils.stackTraceToString(e), new Object[0]);
            }
            this.closeConnection(key, "server closed connection: " + e);
        }
    }

    /**
     * Parses the header of a completely received response and delivers the
     * response to the request it belongs to.
     *
     * <p>The header buffer is released after parsing. If no request with the
     * header's call id is known, the message and data buffers are released as
     * well and a warning is logged. Otherwise they are passed to
     * {@code responseAvailable}. If the header cannot be parsed, all three
     * buffers are released and detached from the connection before the
     * connection is closed, so they are neither leaked nor reused.
     *
     * @param key selection key of the connection, used to close it on error
     * @param con connection holding the received buffers
     * @throws IOException declared for callers; I/O errors raised in this method are handled by closing the connection
     */
    private void assembleResponse(SelectionKey key, RPCClientConnection con) throws IOException {
        ReusableBuffer[] receiveBuffers = con.getResponseBuffers();
        // Until the header is parsed, this method still owns all three buffers.
        boolean headerParsed = false;
        try {
            receiveBuffers[0].flip();
            if (receiveBuffers[1] != null) {
                receiveBuffers[1].flip();
            }
            if (receiveBuffers[2] != null) {
                receiveBuffers[2].flip();
            }
            ReusableBufferInputStream rbis = new ReusableBufferInputStream(receiveBuffers[0]);
            RPC.RPCHeader header = RPC.RPCHeader.parseFrom(rbis);
            headerParsed = true;
            BufferPool.free(receiveBuffers[0]);
            RPCClientRequest rq = con.getRequest(header.getCallId());
            if (rq == null) {
                BufferPool.free(receiveBuffers[1]);
                BufferPool.free(receiveBuffers[2]);
                con.setResponseBuffers(null);
                Logging.logMessage(4, Logging.Category.net, this, "received response for unknown request callId=%d", header.getCallId());
                return;
            }
            RPCResponse response = rq.getResponse();
            rq.setResponseHeader(header);
            con.setResponseBuffers(null);
            response.responseAvailable(rq, receiveBuffers[1], receiveBuffers[2]);
        }
        catch (IOException ex) {
            if (!headerParsed) {
                // Nothing was released or handed over yet: give all buffers back to the pool.
                BufferPool.free(receiveBuffers[0]);
                BufferPool.free(receiveBuffers[1]);
                BufferPool.free(receiveBuffers[2]);
                con.setResponseBuffers(null);
            }
            this.closeConnection(key, "invalid response received: " + ex);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes queued requests to the connection attached to {@code key} until
     * the send queue is empty or the socket accepts no more data.
     *
     * <p>A request taken from the send queue stays the connection's pending
     * request until all of its buffers are written; it is then added to the
     * connection's request table under its call id. Write interest is removed
     * when the queue is empty and kept when a request is only partially
     * written. The connection is closed on EOF and on I/O or key errors.
     *
     * @param key selection key whose attachment is the {@link RPCClientConnection}
     */
    private void writeConnection(SelectionKey key) {
        final RPCClientConnection connection = (RPCClientConnection)key.attachment();
        final ChannelIO io = connection.getChannel();
        try {
            if (io.isShutdownInProgress()) {
                return;
            }
            if (!io.doHandshake(key)) {
                return;
            }
            for (;;) {
                ByteBuffer[] outgoing = connection.getRequestBuffers();
                RPCClientRequest current = connection.getPendingRequest();
                if (outgoing == null) {
                    // No partially written request: fetch the next one from the queue.
                    assert (current == null);
                    synchronized (connection) {
                        if (connection.getSendQueue().isEmpty()) {
                            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                            return;
                        }
                        current = connection.getSendQueue().remove(0);
                    }
                    assert (current != null);
                    connection.getRequestRecordMarker().clear();
                    outgoing = current.packBuffers(connection.getRequestRecordMarker());
                    connection.setRequestBuffers(outgoing);
                    connection.setPendingRequest(current);
                }
                assert (outgoing != null);

                final long written = io.write(outgoing);
                if (written == -1L) {
                    if (Logging.isInfo()) {
                        Logging.logMessage(6, Logging.Category.net, this, "client closed connection (EOF): %s", io.socket().getRemoteSocketAddress().toString());
                    }
                    this.closeConnection(key, "server unexpectedly closed connection (EOF)");
                    return;
                }
                current.recordBytesWritten(written);

                final ByteBuffer tail = outgoing[outgoing.length - 1];
                if (tail.hasRemaining()) {
                    // Socket buffer is full; resume when the channel is writable again.
                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
                    return;
                }

                synchronized (connection) {
                    connection.addRequest(current.getRequestHeader().getCallId(), current);
                    if (Logging.isDebug()) {
                        Logging.logMessage(7, Logging.Category.net, this, "sent request %d to %s", current.getRequestHeader().getCallId(), connection.getEndpointString());
                    }
                }
                current.checkEnoughBytesSent();
                connection.setRequestBuffers(null);
                connection.setPendingRequest(null);
            }
        }
        catch (CancelledKeyException ex) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, OutputUtils.stackTraceToString(ex), new Object[0]);
            }
            this.closeConnection(key, "server closed connection: " + ex);
        }
        catch (IOException ex) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, OutputUtils.stackTraceToString(ex), new Object[0]);
            }
            this.closeConnection(key, "server closed connection: " + ex);
        }
        catch (NotYetConnectedException e) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, OutputUtils.stackTraceToString(e), new Object[0]);
            }
            this.closeConnection(key, "server closed connection: " + e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Completes a pending connect for the connection attached to {@code key}.
     *
     * <p>After the connect has finished, the key's interest set is replaced:
     * read interest always, write interest only if requests are waiting in the
     * send queue. Connect interest is always dropped, because leaving it set
     * on a connected channel can make the selector return immediately without
     * any selected key. On failure the connection is marked as failed and
     * closed.
     *
     * @param key selection key whose attachment is the {@link RPCClientConnection}
     */
    private void connectConnection(SelectionKey key) {
        RPCClientConnection con = (RPCClientConnection)key.attachment();
        ChannelIO channel = con.getChannel();
        try {
            if (channel.isConnectionPending()) {
                channel.finishConnect();
            }
            synchronized (con) {
                if (con.getSendQueue().isEmpty()) {
                    // Nothing to send yet; write interest is enabled when a request is queued.
                    key.interestOps(SelectionKey.OP_READ);
                } else {
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            }
            con.connected();
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.net, this, "connected from %s to %s", con.getChannel().socket().getLocalSocketAddress().toString(), con.getEndpointString());
            }
        }
        catch (CancelledKeyException ex) {
            con.connectFailed();
            this.closeConnection(key, "server '" + con.getEndpointString() + "' not reachable (" + ex + ")");
        }
        catch (IOException ex) {
            con.connectFailed();
            this.closeConnection(key, "server '" + con.getEndpointString() + "' not reachable (" + ex + ")");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Closes the connection attached to {@code key} and fails all of its
     * requests with {@code errorMessage}.
     *
     * <p>Failed requests are the in-flight requests, the requests in the send
     * queue, and the pending request that was taken from the send queue but
     * not yet completely written. The partially written request buffers are
     * dropped so that a later reconnect does not resume a cancelled request.
     * Response callbacks are invoked after the connection lock is released.
     *
     * @param key selection key of the connection; a {@code null} key is logged and ignored
     * @param errorMessage reason appended to the failure message of each request
     */
    private void closeConnection(SelectionKey key, String errorMessage) {
        if (key == null) {
            // Without a key the connection object cannot be determined.
            Logging.logMessage(4, Logging.Category.net, this, "cannot close connection without selection key (%s)", errorMessage);
            return;
        }
        RPCClientConnection con = (RPCClientConnection)key.attachment();
        ChannelIO channel = con.getChannel();
        LinkedList<RPCClientRequest> cancelRq = new LinkedList<RPCClientRequest>();
        synchronized (con) {
            try {
                key.cancel();
                channel.close();
            }
            catch (Exception ex) {
                // Best effort: the connection is discarded whether or not the close succeeds.
                if (Logging.isDebug()) {
                    Logging.logMessage(7, Logging.Category.net, this, "exception while closing channel: %s", ex.toString());
                }
            }
            cancelRq.addAll(con.getRequests().values());
            cancelRq.addAll(con.getSendQueue());
            // The pending request is in neither collection while it is being written.
            RPCClientRequest pending = con.getPendingRequest();
            if (pending != null && !cancelRq.contains(pending)) {
                cancelRq.add(pending);
            }
            con.getRequests().clear();
            con.getSendQueue().clear();
            con.setRequestBuffers(null);
            con.setPendingRequest(null);
            con.setChannel(null);
        }
        for (RPCClientRequest rq : cancelRq) {
            rq.getResponse().requestFailed("sending RPC failed: " + errorMessage);
            rq.freeBuffers();
        }
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.net, this, "closing connection to %s", con.getEndpointString());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes idle connections and fails timed-out requests; does nothing if
     * the last check ran less than 250 ms ago.
     *
     * <p>A connection is idle if it was last used more than
     * {@code connectionTimeout} ms ago. A request times out
     * {@code requestTimeout} ms after it was queued. Both the in-flight
     * requests and the complete send queue are scanned: high-priority requests
     * are inserted at the head of the queue, so the queue is not ordered by
     * age and the scan must not stop at the first request that is still valid.
     */
    private void checkForTimers() {
        long now = System.currentTimeMillis();
        if (now < this.lastCheck + 250L) {
            return;
        }
        synchronized (this.connections) {
            Iterator<RPCClientConnection> conIter = this.connections.values().iterator();
            while (conIter.hasNext()) {
                RPCClientConnection con = conIter.next();
                if (con.getLastUsed() < now - (long)this.connectionTimeout) {
                    if (Logging.isDebug()) {
                        Logging.logMessage(7, Logging.Category.net, this, "removing idle connection", new Object[0]);
                    }
                    conIter.remove();
                    this.closeIdleConnection(con);
                    continue;
                }
                LinkedList<RPCClientRequest> cancelRq = new LinkedList<RPCClientRequest>();
                synchronized (con) {
                    this.collectTimedOutRequests(con.getRequests().values().iterator(), now, cancelRq);
                    this.collectTimedOutRequests(con.getSendQueue().iterator(), now, cancelRq);
                }
                // Notify outside the connection lock.
                for (RPCClientRequest rq : cancelRq) {
                    rq.getResponse().requestFailed("sending RPC failed: request timed out");
                    rq.freeBuffers();
                }
            }
            this.lastCheck = now;
        }
    }

    /** Moves every request queued more than {@code requestTimeout} ms before {@code now} from the iterated collection to {@code timedOut}. */
    private void collectTimedOutRequests(Iterator<RPCClientRequest> iter, long now, LinkedList<RPCClientRequest> timedOut) {
        while (iter.hasNext()) {
            RPCClientRequest rq = iter.next();
            if (rq.getTimeQueued() + (long)this.requestTimeout < now) {
                timedOut.add(rq);
                iter.remove();
            }
        }
    }

    /** Closes the channel of an idle connection if it has one that is registered with the selector. */
    private void closeIdleConnection(RPCClientConnection con) {
        ChannelIO channel = con.getChannel();
        if (channel == null) {
            // Already closed; nothing to release.
            return;
        }
        SelectionKey key = channel.keyFor(this.selector);
        if (key == null) {
            // Not registered with the selector (yet); there is no key to cancel.
            return;
        }
        try {
            this.closeConnection(key, "connection was closed after being idle");
        }
        catch (Exception ex) {
            // Keep the timer check running for the remaining connections.
            Logging.logMessage(4, Logging.Category.net, this, "exception while closing idle connection: %s", ex.toString());
        }
    }

    /**
     * Requests termination of the client thread: sets the {@code quit} flag
     * checked by the loop in {@code run()} and interrupts the thread.
     */
    @Override
    public void shutdown() {
        this.quit = true;
        this.interrupt();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the transfer counters of the connection to {@code server}.
     *
     * @param server address of the server
     * @return a two-element array {@code {bytesRX, bytesTX}}, or {@code null}
     *         if no connection object exists for the address
     */
    public long[] getTransferStats(InetSocketAddress server) {
        final RPCClientConnection connection;
        synchronized (this.connections) {
            connection = this.connections.get(server);
        }
        return connection == null ? null : new long[]{connection.bytesRX, connection.bytesTX};
    }
}

