/*
 * Decompiled with CFR 0.152.
 */
package org.xtreemfs.foundation.pbrpc.server;

import com.google.protobuf.Message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.xtreemfs.foundation.LifeCycleThread;
import org.xtreemfs.foundation.buffer.BufferPool;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.logging.Logging;
import org.xtreemfs.foundation.pbrpc.generatedinterfaces.RPC;
import org.xtreemfs.foundation.pbrpc.server.RPCServerInterface;
import org.xtreemfs.foundation.pbrpc.server.RPCServerRequest;
import org.xtreemfs.foundation.pbrpc.server.RPCServerRequestListener;
import org.xtreemfs.foundation.pbrpc.server.RPCServerResponse;
import org.xtreemfs.foundation.pbrpc.server.UDPMessage;
import org.xtreemfs.foundation.pbrpc.utils.PBRPCDatagramPacket;
import org.xtreemfs.foundation.pbrpc.utils.RecordMarker;
import org.xtreemfs.foundation.pbrpc.utils.ReusableBufferInputStream;

public class RPCUDPSocketServer
extends LifeCycleThread
implements RPCServerInterface {
    public final int port;
    private final DatagramChannel channel;
    private final Selector selector;
    private volatile boolean quit;
    private final LinkedBlockingQueue<UDPMessage> q;
    private final RPCServerRequestListener receiver;
    public static final int MAX_UDP_SIZE = 2048;
    private final AtomicInteger callIdCounter;

    public RPCUDPSocketServer(int port, RPCServerRequestListener receiver) throws IOException {
        super("UDPComStage");
        this.port = port;
        this.q = new LinkedBlockingQueue();
        this.receiver = receiver;
        this.callIdCounter = new AtomicInteger(1);
        this.selector = Selector.open();
        this.channel = DatagramChannel.open();
        this.channel.socket().setReuseAddress(true);
        this.channel.socket().bind(new InetSocketAddress(port));
        this.channel.configureBlocking(false);
        this.channel.register(this.selector, 1);
        if (Logging.isInfo()) {
            Logging.logMessage(6, Logging.Category.net, this, "UDP socket on port %d ready", port);
        }
    }

    @Override
    public void sendResponse(RPCServerRequest request, RPCServerResponse response) {
        UDPMessage msg = (UDPMessage)request.getConnection();
        UDPMessage responseMsg = new UDPMessage(response.getBuffers()[0], msg.getAddress(), this);
        request.freeBuffers();
        this.send(responseMsg);
    }

    public void sendRequest(RPC.RPCHeader header, Message message, InetSocketAddress receiver) throws IOException {
        PBRPCDatagramPacket dpack = new PBRPCDatagramPacket(header, message);
        header = header.toBuilder().setCallId(this.callIdCounter.getAndIncrement()).build();
        UDPMessage msg = new UDPMessage(dpack.assembleDatagramPacket(), receiver, this);
        this.send(msg);
    }

    private void send(UDPMessage rq) {
        this.q.add(rq);
        if (this.q.size() == 1) {
            this.selector.wakeup();
        }
    }

    @Override
    public void shutdown() {
        this.quit = true;
        this.interrupt();
    }

    @Override
    public void run() {
        try {
            this.notifyStarted();
            boolean isRdOnly = true;
            while (!this.quit) {
                if (this.q.size() == 0) {
                    if (!isRdOnly) {
                        this.channel.keyFor(this.selector).interestOps(1);
                        isRdOnly = true;
                    }
                } else if (isRdOnly) {
                    this.channel.keyFor(this.selector).interestOps(5);
                    isRdOnly = false;
                }
                int numKeys = this.selector.select();
                if (this.q.size() == 0) {
                    if (!isRdOnly) {
                        this.channel.keyFor(this.selector).interestOps(1);
                        isRdOnly = true;
                    }
                } else if (isRdOnly) {
                    this.channel.keyFor(this.selector).interestOps(5);
                    isRdOnly = false;
                }
                if (numKeys == 0) continue;
                if (this.q.size() > 10000) {
                    Logging.logMessage(4, Logging.Category.net, this, "QS!!!!! %d", this.q.size());
                    Logging.logMessage(4, Logging.Category.net, this, "is readOnly: " + isRdOnly, new Object[0]);
                }
                Set<SelectionKey> keys = this.selector.selectedKeys();
                Iterator<SelectionKey> iter = keys.iterator();
                block8: while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        InetSocketAddress sender = null;
                        ReusableBuffer data = BufferPool.allocate(2048);
                        sender = (InetSocketAddress)this.channel.receive(data.getBuffer());
                        if (sender == null || !data.hasRemaining()) {
                            BufferPool.free(data);
                            Logging.logMessage(4, Logging.Category.net, this, "read key for empty read/empty packet", new Object[0]);
                            continue;
                        }
                        if (Logging.isDebug()) {
                            Logging.logMessage(7, Logging.Category.net, this, "read data from %s", sender.toString());
                        }
                        try {
                            data.flip();
                            if (Logging.isDebug()) {
                                Logging.logMessage(7, Logging.Category.net, this, "data: %s", data.toString());
                            }
                            RecordMarker rm = new RecordMarker(data.getBuffer());
                            if (Logging.isDebug()) {
                                Logging.logMessage(7, Logging.Category.net, this, "rm: %d/%d data: %d", rm.getRpcHeaderLength(), rm.getMessageLength(), data.limit());
                            }
                            ReusableBufferInputStream rbis = new ReusableBufferInputStream(data);
                            int origLimit = data.limit();
                            assert (origLimit == 12 + rm.getRpcHeaderLength() + rm.getMessageLength());
                            data.limit(12 + rm.getRpcHeaderLength());
                            RPC.RPCHeader header = ((RPC.RPCHeader.Builder)RPC.RPCHeader.newBuilder().mergeFrom(rbis)).build();
                            data.range(12 + rm.getRpcHeaderLength(), rm.getMessageLength());
                            UDPMessage msg = new UDPMessage(null, sender, this);
                            RPCServerRequest rq = new RPCServerRequest(msg, header, data);
                            this.receiver.receiveRecord(rq);
                        }
                        catch (Throwable ex) {
                            ex.printStackTrace();
                            Logging.logMessage(4, Logging.Category.net, this, "received invalid UPD message: " + ex, new Object[0]);
                            BufferPool.free(data);
                        }
                        continue;
                    }
                    if (key.isWritable()) {
                        UDPMessage r = this.q.poll();
                        while (r != null) {
                            if (Logging.isDebug()) {
                                Logging.logMessage(7, Logging.Category.net, this, "sent packet to %s", r.getAddress().toString());
                            }
                            int sent = this.channel.send(r.getBuffer().getBuffer(), r.getAddress());
                            BufferPool.free(r.getBuffer());
                            if (sent == 0) {
                                if (Logging.isDebug()) {
                                    Logging.logMessage(7, Logging.Category.net, this, "cannot send anymore", new Object[0]);
                                }
                                this.q.put(r);
                                continue block8;
                            }
                            r = this.q.poll();
                        }
                        continue;
                    }
                    throw new RuntimeException("strange key state: " + key);
                }
            }
            this.selector.close();
            this.channel.close();
        }
        catch (CancelledKeyException ex) {
        }
        catch (ClosedByInterruptException ex) {
        }
        catch (IOException ex) {
            Logging.logError(3, this, ex);
        }
        catch (Throwable th) {
            this.notifyCrashed(th);
            return;
        }
        this.notifyStopped();
    }
}

