/*
 * Decompiled with CFR 0.152.
 */
package org.xtreemfs.osd.rwre;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.xtreemfs.common.uuids.ServiceUUID;
import org.xtreemfs.common.uuids.UnknownUUIDException;
import org.xtreemfs.common.xloc.XLocations;
import org.xtreemfs.foundation.LifeCycleListener;
import org.xtreemfs.foundation.SSLOptions;
import org.xtreemfs.foundation.buffer.ASCIIString;
import org.xtreemfs.foundation.buffer.BufferPool;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.flease.Flease;
import org.xtreemfs.foundation.flease.FleaseConfig;
import org.xtreemfs.foundation.flease.FleaseMessageSenderInterface;
import org.xtreemfs.foundation.flease.FleaseStage;
import org.xtreemfs.foundation.flease.FleaseStatusListener;
import org.xtreemfs.foundation.flease.FleaseViewChangeListenerInterface;
import org.xtreemfs.foundation.flease.MasterEpochHandlerInterface;
import org.xtreemfs.foundation.flease.comm.FleaseMessage;
import org.xtreemfs.foundation.flease.proposer.FleaseException;
import org.xtreemfs.foundation.flease.proposer.FleaseListener;
import org.xtreemfs.foundation.logging.Logging;
import org.xtreemfs.foundation.pbrpc.client.PBRPCException;
import org.xtreemfs.foundation.pbrpc.client.RPCAuthentication;
import org.xtreemfs.foundation.pbrpc.client.RPCNIOSocketClient;
import org.xtreemfs.foundation.pbrpc.client.RPCResponse;
import org.xtreemfs.foundation.pbrpc.client.RPCResponseAvailableListener;
import org.xtreemfs.foundation.pbrpc.generatedinterfaces.RPC;
import org.xtreemfs.foundation.pbrpc.utils.ErrorUtils;
import org.xtreemfs.osd.InternalObjectData;
import org.xtreemfs.osd.OSDRequest;
import org.xtreemfs.osd.OSDRequestDispatcher;
import org.xtreemfs.osd.operations.EventRWRStatus;
import org.xtreemfs.osd.operations.OSDOperation;
import org.xtreemfs.osd.rwre.FleaseMasterEpochThread;
import org.xtreemfs.osd.rwre.RedirectToMasterException;
import org.xtreemfs.osd.rwre.ReplicaUpdatePolicy;
import org.xtreemfs.osd.rwre.ReplicatedFileState;
import org.xtreemfs.osd.rwre.RetryException;
import org.xtreemfs.osd.stages.PreprocStage;
import org.xtreemfs.osd.stages.Stage;
import org.xtreemfs.osd.stages.StorageStage;
import org.xtreemfs.osd.storage.CowPolicy;
import org.xtreemfs.pbrpc.generatedinterfaces.GlobalTypes;
import org.xtreemfs.pbrpc.generatedinterfaces.OSD;
import org.xtreemfs.pbrpc.generatedinterfaces.OSDServiceClient;

public class RWReplicationStage
extends Stage
implements FleaseMessageSenderInterface {
    // Operation codes used to tag requests queued on this stage (the event*
    // methods of this class pass the matching literal to enqueueOperation).
    // NOTE(review): the numbering has gaps (4, 8, 9, 12, 19, 20); presumably
    // retired codes - confirm before reusing a value.
    public static final int STAGEOP_REPLICATED_WRITE = 1;
    public static final int STAGEOP_CLOSE = 2;
    public static final int STAGEOP_PROCESS_FLEASE_MSG = 3;
    public static final int STAGEOP_PREPAREOP = 5;
    public static final int STAGEOP_TRUNCATE = 6;
    public static final int STAGEOP_GETSTATUS = 7;
    public static final int STAGEOP_INTERNAL_AUTHSTATE = 10;
    public static final int STAGEOP_INTERNAL_OBJFETCHED = 11;
    public static final int STAGEOP_LEASE_STATE_CHANGED = 13;
    public static final int STAGEOP_INTERNAL_STATEAVAIL = 14;
    public static final int STAGEOP_INTERNAL_DELETE_COMPLETE = 15;
    public static final int STAGEOP_FORCE_RESET = 16;
    public static final int STAGEOP_INTERNAL_MAXOBJ_AVAIL = 17;
    public static final int STAGEOP_INTERNAL_BACKUP_AUTHSTATE = 18;
    public static final int STAGEOP_SETVIEW = 21;
    public static final int STAGEOP_INVALIDATEVIEW = 22;
    public static final int STAGEOP_FETCHINVALIDATED = 23;
    // RPC client backing osdClient; created, started and shut down by this stage.
    private final RPCNIOSocketClient client;
    // OSD service client used to fetch objects from other replicas.
    private final OSDServiceClient osdClient;
    // Replication state of the files known to this stage, keyed by file id.
    private final Map<String, ReplicatedFileState> files;
    // Maps a Flease cell id to the id of the file it belongs to.
    private final Map<ASCIIString, String> cellToFileId;
    // Dispatcher that owns this stage; gives access to config and the other stages.
    private final OSDRequestDispatcher master;
    // Flease stage used to open/close lease cells and receive lease events.
    private final FleaseStage fstage;
    // Separate RPC client backing fleaseOsdClient.
    private final RPCNIOSocketClient fleaseClient;
    // OSD service client built on fleaseClient (its use is outside this part of the file).
    private final OSDServiceClient fleaseOsdClient;
    // UUID of the local OSD, compared against lease holders to detect primary role.
    private final ASCIIString localID;
    // Number of object fetches that have been started and not yet completed.
    private int numObjsInFlight;
    // NOTE(review): fetchObjects() compares against the literal 10, presumably this
    // constant inlined by the compiler - confirm.
    private static final int MAX_OBJS_IN_FLIGHT = 10;
    private static final int MAX_PENDING_PER_FILE = 10;
    private static final int MAX_EXTERNAL_REQUESTS_IN_Q = 250;
    // Files in RESET that may still have objects to fetch; served round-robin by fetchObjects().
    private final Queue<ReplicatedFileState> filesInReset;
    // Thread handed to the Flease stage as its master-epoch handler.
    private final FleaseMasterEpochThread masterEpochThread;
    // Counter initialised to 0 in the constructor; its users are outside this part of the file.
    private final AtomicInteger externalRequestsInQueue;

    /**
     * Builds the replication stage: its bookkeeping collections, the two RPC
     * clients (one for replica traffic, one for Flease), the master-epoch
     * thread and the Flease stage that reports lease and view changes back to
     * this stage.
     *
     * @param master the dispatcher owning this stage; also registered as the
     *        life-cycle listener of the Flease stage
     * @param sslOpts SSL options passed to both RPC clients
     * @param maxRequestsQueueLength queue length passed to the superclass and
     *        to the master-epoch thread
     * @throws IOException declared by the constructor; presumably raised while
     *         creating the RPC clients
     */
    public RWReplicationStage(OSDRequestDispatcher master, SSLOptions sslOpts, int maxRequestsQueueLength) throws IOException {
        super("RWReplSt", maxRequestsQueueLength);
        this.master = master;

        // RPC clients and the service clients layered on top of them.
        this.client = new RPCNIOSocketClient(sslOpts, 15000, 300000, "RWReplicationStage");
        this.fleaseClient = new RPCNIOSocketClient(sslOpts, 15000, 300000, "RWReplicationStage (flease)");
        this.osdClient = new OSDServiceClient(this.client, null);
        this.fleaseOsdClient = new OSDServiceClient(this.fleaseClient, null);

        // Per-file bookkeeping.
        this.files = new HashMap<String, ReplicatedFileState>();
        this.cellToFileId = new HashMap<ASCIIString, String>();
        this.filesInReset = new LinkedList<ReplicatedFileState>();
        this.numObjsInFlight = 0;
        this.externalRequestsInQueue = new AtomicInteger(0);

        this.localID = new ASCIIString(master.getConfig().getUUID().toString());
        this.masterEpochThread = new FleaseMasterEpochThread(master.getStorageStage().getStorageLayout(), maxRequestsQueueLength);

        FleaseConfig leaseConfig = new FleaseConfig(master.getConfig().getFleaseLeaseToMS(), master.getConfig().getFleaseDmaxMS(), master.getConfig().getFleaseMsgToMS(), null, this.localID.toString(), master.getConfig().getFleaseRetries());
        String leaseDirectory = master.getConfig().getObjDir() + "/";

        // View changes are forwarded straight to the preprocessing stage.
        FleaseViewChangeListenerInterface viewListener = new FleaseViewChangeListenerInterface(){

            public void viewIdChangeEvent(ASCIIString cellId, int viewId) {
                RWReplicationStage.this.eventViewIdChanged(cellId, viewId);
            }
        };
        // Lease updates and lease failures are both queued as lease-state events.
        FleaseStatusListener leaseListener = new FleaseStatusListener(){

            public void statusChanged(ASCIIString cellId, Flease lease) {
                RWReplicationStage.this.eventLeaseStateChanged(cellId, lease, null);
            }

            public void leaseFailed(ASCIIString cellID, FleaseException error) {
                RWReplicationStage.this.eventLeaseStateChanged(cellID, null, error);
            }
        };

        this.fstage = new FleaseStage(leaseConfig, leaseDirectory, (FleaseMessageSenderInterface)this, false, viewListener, leaseListener, (MasterEpochHandlerInterface)this.masterEpochThread);
        this.fstage.setLifeCycleListener((LifeCycleListener)master);
    }

    /**
     * Starts the helper components owned by this stage (master-epoch thread,
     * both RPC clients, Flease stage) and then the stage itself.
     */
    @Override
    public void start() {
        this.masterEpochThread.start();
        this.client.start();
        this.fleaseClient.start();
        this.fstage.start();
        // The stage's own thread is started last, after all helpers.
        super.start();
    }

    /**
     * Requests shutdown of both RPC clients, the Flease stage and the
     * master-epoch thread, then of the stage itself.
     */
    @Override
    public void shutdown() {
        this.client.shutdown();
        this.fleaseClient.shutdown();
        this.fstage.shutdown();
        this.masterEpochThread.shutdown();
        super.shutdown();
    }

    /**
     * Waits for the startup of every helper component, in the same order in
     * which {@code start()} launches them, and finally for the stage itself.
     *
     * @throws Exception propagated from any of the awaited components
     */
    @Override
    public void waitForStartup() throws Exception {
        this.masterEpochThread.waitForStartup();
        this.client.waitForStartup();
        this.fleaseClient.waitForStartup();
        this.fstage.waitForStartup();
        super.waitForStartup();
    }

    /**
     * Waits for the shutdown of every helper component, in the same order in
     * which {@code shutdown()} stops them, and finally for the stage itself.
     *
     * @throws Exception propagated from any of the awaited components
     */
    @Override
    public void waitForShutdown() throws Exception {
        this.client.waitForShutdown();
        this.fleaseClient.waitForShutdown();
        this.fstage.waitForShutdown();
        this.masterEpochThread.waitForShutdown();
        super.waitForShutdown();
    }

    /**
     * Queues operation 14 (the value of STAGEOP_INTERNAL_STATEAVAIL) carrying
     * the local replica status of a file, or the error that prevented reading it.
     *
     * @param fileId id of the file the status belongs to
     * @param localState local replica status
     * @param error error to report instead of a status
     */
    public void eventReplicaStateAvailable(String fileId, OSD.ReplicaStatus localState, RPC.RPCHeader.ErrorResponse error) {
        // Layout: [0] file id, [1] local status, [2] error.
        Object[] payload = new Object[]{fileId, localState, error};
        this.enqueueOperation(14, payload, null, null);
    }

    /**
     * Queues operation 16 (the value of STAGEOP_FORCE_RESET) for a file.
     *
     * @param credentials credentials of the file
     * @param xloc replica locations of the file
     */
    public void eventForceReset(GlobalTypes.FileCredentials credentials, XLocations xloc) {
        // Layout: [0] credentials, [1] locations.
        Object[] payload = new Object[]{credentials, xloc};
        this.enqueueOperation(16, payload, null, null);
    }

    /**
     * Queues operation 15 (the value of STAGEOP_INTERNAL_DELETE_COMPLETE)
     * reporting the outcome of an object deletion.
     *
     * @param fileId id of the file whose objects were deleted
     * @param error error reported by the deletion, if any
     */
    public void eventDeleteObjectsComplete(String fileId, RPC.RPCHeader.ErrorResponse error) {
        // Layout: [0] file id, [1] error.
        Object[] payload = new Object[]{fileId, error};
        this.enqueueOperation(15, payload, null, null);
    }

    /**
     * Queues operation 11 (the value of STAGEOP_INTERNAL_OBJFETCHED) reporting
     * the result of fetching one object from another replica.
     *
     * @param fileId id of the file the object belongs to
     * @param object the object/version record that was requested
     * @param data fetched object data, or {@code null} when the fetch failed
     * @param error error describing a failed fetch, or {@code null} on success
     */
    void eventObjectFetched(String fileId, OSD.ObjectVersionMapping object, InternalObjectData data, RPC.RPCHeader.ErrorResponse error) {
        // Layout: [0] file id, [1] object record, [2] data, [3] error.
        Object[] payload = new Object[]{fileId, object, data, error};
        this.enqueueOperation(11, payload, null, null);
    }

    /**
     * Queues operation 10 (the value of STAGEOP_INTERNAL_AUTHSTATE) carrying
     * the authoritative state computed for a file, or the error that
     * prevented computing it.
     *
     * @param fileId id of the file
     * @param authState authoritative replica state
     * @param localState local replica status the authoritative state is compared with
     * @param error error to report instead of the states
     */
    void eventSetAuthState(String fileId, OSD.AuthoritativeReplicaState authState, OSD.ReplicaStatus localState, RPC.RPCHeader.ErrorResponse error) {
        // Layout: [0] file id, [1] authoritative state, [2] local status, [3] error.
        Object[] payload = new Object[]{fileId, authState, localState, error};
        this.enqueueOperation(10, payload, null, null);
    }

    /**
     * Queues operation 13 (the value of STAGEOP_LEASE_STATE_CHANGED) for a
     * lease update or lease failure of a cell.
     *
     * @param cellId id of the Flease cell
     * @param lease the new lease, or {@code null} when reporting a failure
     * @param error the lease failure, or {@code null} when reporting an update
     */
    void eventLeaseStateChanged(ASCIIString cellId, Flease lease, FleaseException error) {
        // Layout: [0] cell id, [1] lease, [2] error.
        Object[] payload = new Object[]{cellId, lease, error};
        this.enqueueOperation(13, payload, null, null);
    }

    /**
     * Queues operation 17 (the value of STAGEOP_INTERNAL_MAXOBJ_AVAIL).
     * Only the file id, the maximum object version and the error are
     * forwarded; {@code fileSize} and {@code truncateEpoch} are accepted but
     * not placed in the queued arguments.
     *
     * @param fileId id of the file
     * @param maxObjVer maximum object version
     * @param fileSize not forwarded
     * @param truncateEpoch not forwarded
     * @param error error to report, if any
     */
    void eventMaxObjAvail(String fileId, long maxObjVer, long fileSize, long truncateEpoch, RPC.RPCHeader.ErrorResponse error) {
        // Layout: [0] file id, [1] max object version (boxed), [2] error.
        Object[] payload = new Object[]{fileId, maxObjVer, error};
        this.enqueueOperation(17, payload, null, null);
    }

    /**
     * Queues operation 18 (the value of STAGEOP_INTERNAL_BACKUP_AUTHSTATE)
     * asking for a backup replica to be reset against an authoritative state.
     *
     * @param fileId id of the file
     * @param authState authoritative replica state
     * @param localState local replica status
     * @param credentials credentials of the file
     * @param xloc replica locations of the file
     */
    public void eventBackupReplicaReset(String fileId, OSD.AuthoritativeReplicaState authState, OSD.ReplicaStatus localState, GlobalTypes.FileCredentials credentials, XLocations xloc) {
        // Layout: [0] file id, [1] authoritative state, [2] local status,
        // [3] credentials, [4] locations.
        Object[] payload = new Object[]{fileId, authState, localState, credentials, xloc};
        this.enqueueOperation(18, payload, null, null);
    }

    /**
     * Forwards a view-id change of a Flease cell to the preprocessing stage.
     * Unlike the other event methods, nothing is queued on this stage.
     *
     * @param cellId id of the Flease cell whose view changed
     * @param viewId the new view id
     */
    void eventViewIdChanged(ASCIIString cellId, int viewId) {
        PreprocStage preprocessing = this.master.getPreprocStage();
        preprocessing.updateXLocSetFromFlease(cellId, viewId);
    }

    /**
     * Compares the local replica status with the authoritative state and
     * either starts the work needed to bring the replica up to date or, when
     * nothing differs, opens the file right away.
     *
     * <p>When work is needed, the missing objects are stored on the file
     * state, the file is added to the reset queue and the outdated local
     * objects are deleted through the storage stage; the deletion result is
     * reported back via {@code eventDeleteObjectsComplete}.
     *
     * @param localState status of the local replica
     * @param authState authoritative state to converge to
     * @param state replication state of the file
     * @param fileId id of the file
     */
    private void executeSetAuthState(OSD.ReplicaStatus localState, OSD.AuthoritativeReplicaState authState, ReplicatedFileState state, final String fileId) {
        // Computed once and used in the decision below (previously this
        // value was calculated, ignored and then recalculated inline).
        boolean truncateEpochOutdated = localState.getTruncateEpoch() < authState.getTruncateEpoch();

        // Deletion candidates: local objects whose version does not exceed the
        // authoritative maximum. Newer local objects are never deleted.
        HashMap<Long, Long> objectsToBeDeleted = new HashMap<Long, Long>();
        for (OSD.ObjectVersion localObject : localState.getObjectVersionsList()) {
            if (localObject.getObjectVersion() <= authState.getMaxObjVersion()) {
                objectsToBeDeleted.put(localObject.getObjectNumber(), localObject.getObjectVersion());
            }
        }
        // Objects that match the authoritative version exactly are kept.
        for (OSD.ObjectVersionMapping authObject : authState.getObjectVersionsList()) {
            Long localVersion = objectsToBeDeleted.get(authObject.getObjectNumber());
            if (localVersion != null && localVersion.longValue() == authObject.getObjectVersion()) {
                objectsToBeDeleted.remove(authObject.getObjectNumber());
            }
        }

        // Fetch candidates: every authoritative object, minus those the local
        // replica already holds in the same or a newer version.
        HashMap<Long, OSD.ObjectVersionMapping> missingObjects = new HashMap<Long, OSD.ObjectVersionMapping>();
        for (OSD.ObjectVersionMapping authObject : authState.getObjectVersionsList()) {
            missingObjects.put(authObject.getObjectNumber(), authObject);
        }
        for (OSD.ObjectVersion localObject : localState.getObjectVersionsList()) {
            OSD.ObjectVersionMapping authObject = missingObjects.get(localObject.getObjectNumber());
            if (authObject != null && localObject.getObjectVersion() >= authObject.getObjectVersion()) {
                missingObjects.remove(localObject.getObjectNumber());
            }
        }

        boolean updateRequired = !missingObjects.isEmpty() || !objectsToBeDeleted.isEmpty() || truncateEpochOutdated;
        if (!updateRequired) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica RESET finished (replica is up-to-date): %s", this.localID, state.getFileId());
            }
            this.doOpen(state);
            return;
        }

        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica RESET required updates for: %s", this.localID, state.getFileId());
        }
        state.setObjectsToFetch(new LinkedList<OSD.ObjectVersionMapping>(missingObjects.values()));
        this.filesInReset.add(state);
        this.master.getStorageStage().deleteObjects(fileId, state.getsPolicy(), authState.getTruncateEpoch(), objectsToBeDeleted, new StorageStage.DeleteObjectsCallback(){

            @Override
            public void deleteObjectsComplete(RPC.RPCHeader.ErrorResponse error) {
                RWReplicationStage.this.eventDeleteObjectsComplete(fileId, error);
            }
        });
    }

    /**
     * Handles a queued lease-state event (arguments: cell id, lease, error).
     *
     * <p>Events for cells that map to no file, or to an invalidated file, are
     * ignored after logging. A lease error fails the replica. Otherwise the
     * new lease is stored on the file state and the replica is moved to
     * primary or backup if its role changed; a former primary whose lease has
     * no holder and a zero timeout is failed instead.
     *
     * @param method the queued request carrying the event arguments
     */
    private void processLeaseStateChanged(Stage.StageRequest method) {
        try {
            Object[] args = method.getArgs();
            ASCIIString cellId = (ASCIIString)args[0];
            Flease lease = (Flease)args[1];
            FleaseException error = (FleaseException)args[2];

            if (error != null) {
                Logging.logMessage(4, Logging.Category.replication, this, "(R:%s) lease error in cell %s: %s", this.localID, cellId, error);
            } else if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) lease change event: %s, %s", this.localID, cellId, lease);
            }

            String fileId = this.cellToFileId.get(cellId);
            if (fileId == null) {
                // No file is registered for this cell.
                return;
            }
            ReplicatedFileState state = this.files.get(fileId);
            assert (state != null);
            if (state.isInvalidated()) {
                return;
            }
            if (error != null) {
                this.failed(state, ErrorUtils.getInternalServerError((Throwable)error), "processLeaseStateChanged (error != null)");
                return;
            }

            boolean localIsPrimary = lease.getLeaseHolder() != null && lease.getLeaseHolder().equals(this.localID);
            // Remember the state before the lease is applied; the role
            // transitions below are decided relative to it.
            ReplicatedFileState.ReplicaState oldState = state.getState();
            boolean wasPrimary = oldState == ReplicatedFileState.ReplicaState.PRIMARY;
            state.setLocalIsPrimary(localIsPrimary);
            state.setLease(lease);

            if (wasPrimary && lease.getLeaseHolder() == null && lease.getLeaseTimeout_ms() == 0L) {
                // The fourth argument (error, always null here) has no placeholder in the format string.
                Logging.logMessage(3, Logging.Category.replication, this, "(R:%s) was primary, lease error in cell %s, restarting replication: %s", this.localID, cellId, lease, error);
                this.failed(state, ErrorUtils.getInternalServerError(new IOException(fileId + ": lease timed out, renew failed")), "processLeaseStateChanged");
                return;
            }

            ReplicatedFileState.ReplicaState current = state.getState();
            boolean roleCanChange = current == ReplicatedFileState.ReplicaState.BACKUP
                    || current == ReplicatedFileState.ReplicaState.PRIMARY
                    || current == ReplicatedFileState.ReplicaState.WAITING_FOR_LEASE;
            if (!roleCanChange) {
                return;
            }
            if (localIsPrimary) {
                if (!wasPrimary) {
                    state.setMasterEpoch(lease.getMasterEpochNumber());
                    this.doPrimary(state);
                }
            } else if (oldState != ReplicatedFileState.ReplicaState.BACKUP) {
                state.setMasterEpoch(-1L);
                this.doBackup(state);
            }
        }
        catch (Exception ex) {
            Logging.logMessage(3, this, "Exception was thrown and caught while processing the change of the lease state. This is an error in the code. Please report it! Caught exception: ", new Object[0]);
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Handles a queued backup-reset request (arguments: file id,
     * authoritative state, local status, credentials, locations).
     *
     * <p>The request is dropped for invalidated files, parked as a pending
     * request while the file is INITIALIZING, OPEN or WAITING_FOR_LEASE,
     * executed when the file is BACKUP, and ignored in every other state.
     *
     * @param method the queued request carrying the arguments
     */
    private void processBackupAuthoritativeState(Stage.StageRequest method) {
        try {
            Object[] args = method.getArgs();
            String fileId = (String)args[0];
            OSD.AuthoritativeReplicaState authState = (OSD.AuthoritativeReplicaState)args[1];
            OSD.ReplicaStatus localState = (OSD.ReplicaStatus)args[2];
            GlobalTypes.FileCredentials credentials = (GlobalTypes.FileCredentials)args[3];
            XLocations loc = (XLocations)args[4];

            ReplicatedFileState fileState = this.getState(credentials, loc, true, false);
            if (fileState.isInvalidated()) {
                Logging.logMessage(6, Logging.Category.replication, this, "(R:%s) auth state ignored, file is invalidated %s", this.localID, fileId);
                return;
            }
            switch (fileState.getState()) {
                case INITIALIZING:
                case OPEN:
                case WAITING_FOR_LEASE: {
                    // Role not settled yet: keep the request until it is.
                    Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) enqueued backup reset for file %s", this.localID, fileId);
                    fileState.addPendingRequest(method);
                    return;
                }
                case BACKUP: {
                    Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) backup reset triggered by AUTHSTATE request for file %s", this.localID, fileId);
                    fileState.setState(ReplicatedFileState.ReplicaState.RESET);
                    this.executeSetAuthState(localState, authState, fileState, fileId);
                    return;
                }
                default: {
                    Logging.logMessage(4, Logging.Category.replication, this, "(R:%s) auth state ignored, already in reset for file %s", this.localID, fileId);
                }
            }
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Handles a queued authoritative-state event (arguments: file id,
     * authoritative state, local status, error). Unknown files are logged and
     * skipped; an error fails the replica; otherwise the authoritative state
     * is applied.
     *
     * @param method the queued request carrying the arguments
     */
    private void processSetAuthoritativeState(Stage.StageRequest method) {
        try {
            Object[] args = method.getArgs();
            String fileId = (String)args[0];
            OSD.AuthoritativeReplicaState authState = (OSD.AuthoritativeReplicaState)args[1];
            OSD.ReplicaStatus localState = (OSD.ReplicaStatus)args[2];
            RPC.RPCHeader.ErrorResponse error = (RPC.RPCHeader.ErrorResponse)args[3];

            ReplicatedFileState fileState = this.files.get(fileId);
            if (fileState == null) {
                Logging.logMessage(4, Logging.Category.replication, this, "(R:%s) set AUTH for unknown file: %s", this.localID, fileId);
                return;
            }
            if (error == null) {
                this.executeSetAuthState(localState, authState, fileState, fileId);
                return;
            }
            this.failed(fileState, error, "processSetAuthoritativeState");
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Handles a queued delete-complete event (arguments: file id, error).
     * Events for unknown files are ignored. A deletion error fails the
     * replica; otherwise fetching of missing objects is (re)started.
     *
     * @param method the queued request carrying the arguments
     */
    private void processDeleteObjectsComplete(Stage.StageRequest method) {
        try {
            Object[] args = method.getArgs();
            String fileId = (String)args[0];
            RPC.RPCHeader.ErrorResponse error = (RPC.RPCHeader.ErrorResponse)args[1];

            ReplicatedFileState fileState = this.files.get(fileId);
            if (fileState == null) {
                return;
            }
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) deleted all objects requested by RESET for %s with %s", this.localID, fileState.getFileId(), ErrorUtils.formatError(error));
            }
            if (error == null) {
                // NOTE(review): when the file has nothing to fetch, no code visible
                // here moves it out of RESET afterwards - confirm this is handled elsewhere.
                this.fetchObjects();
            } else {
                this.failed(fileState, error, "processDeleteObjectsComplete");
            }
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Starts object fetches for the files in the reset queue, one object per
     * file per round, until ten fetches are in flight (the value of
     * MAX_OBJS_IN_FLIGHT) or the queue is empty. A file is put back at the end
     * of the queue while it still has objects to fetch.
     *
     * <p>Files that are no longer registered in {@code files} are dropped from
     * the queue without reserving a slot. {@code fetchObject} returns without
     * issuing a request for such files, so reserving a slot for them would
     * increase {@code numObjsInFlight} with no completion event to release it.
     */
    private void fetchObjects() {
        while (this.numObjsInFlight < 10) {
            ReplicatedFileState file = this.filesInReset.poll();
            if (file == null) {
                break;
            }
            if (this.files.get(file.getFileId()) == null) {
                // Stale queue entry: nothing can be fetched for this file any more.
                continue;
            }
            if (!file.getObjectsToFetch().isEmpty()) {
                OSD.ObjectVersionMapping next = file.getObjectsToFetch().remove(0);
                file.setNumObjectsPending(file.getNumObjectsPending() + 1);
                ++this.numObjsInFlight;
                this.fetchObject(file.getFileId(), next);
            }
            if (!file.getObjectsToFetch().isEmpty()) {
                // More work left for this file; let the other files have their turn first.
                this.filesInReset.add(file);
            }
        }
    }

    /**
     * Requests one object version from the first OSD listed in the record and
     * reports the outcome through {@code eventObjectFetched}. Does nothing
     * when the file is not registered in {@code files}.
     *
     * @param fileId id of the file the object belongs to
     * @param record object number, version and source OSDs of the object
     */
    private void fetchObject(final String fileId, final OSD.ObjectVersionMapping record) {
        ReplicatedFileState fileState = this.files.get(fileId);
        if (fileState == null) {
            return;
        }
        try {
            ServiceUUID sourceOsd = new ServiceUUID(record.getOsdUuidsList().get(0));
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) file %s, fetch object %d (version %d) from %s", this.localID, fileId, record.getObjectNumber(), record.getObjectVersion(), sourceOsd);
            }
            RPCResponse<OSD.ObjectData> pendingFetch = this.osdClient.xtreemfs_rwr_fetch(sourceOsd.getAddress(), RPCAuthentication.authNone, RPCAuthentication.userService, fileState.getCredentials(), fileId, record.getObjectNumber(), record.getObjectVersion());
            RPCResponseAvailableListener onFetched = new RPCResponseAvailableListener(){

                public void responseAvailable(RPCResponse rsp) {
                    try {
                        OSD.ObjectData metadata = (OSD.ObjectData)rsp.get();
                        InternalObjectData fetched = new InternalObjectData(metadata, rsp.getData());
                        RWReplicationStage.this.eventObjectFetched(fileId, record, fetched, null);
                    }
                    catch (PBRPCException rpcFailure) {
                        // Keep the error type and errno reported by the remote side.
                        RWReplicationStage.this.eventObjectFetched(fileId, record, null, ErrorUtils.getErrorResponse(rpcFailure.getErrorType(), rpcFailure.getPOSIXErrno(), rpcFailure.toString(), rpcFailure));
                    }
                    catch (Exception failure) {
                        RWReplicationStage.this.eventObjectFetched(fileId, record, null, ErrorUtils.getErrorResponse(RPC.ErrorType.IO_ERROR, RPC.POSIXErrno.POSIX_ERROR_NONE, failure.toString(), failure));
                    }
                    finally {
                        // The response buffers are released on every path.
                        rsp.freeBuffers();
                    }
                }
            };
            pendingFetch.registerListener(onFetched);
        }
        catch (IOException ioFailure) {
            this.eventObjectFetched(fileId, record, null, ErrorUtils.getErrorResponse(RPC.ErrorType.ERRNO, RPC.POSIXErrno.POSIX_ERROR_EIO, ioFailure.toString(), ioFailure));
        }
    }

    /**
     * Handles a queued object-fetched event (arguments: file id, object
     * record, data, error). Events for unknown files are ignored.
     *
     * <p>On failure (an error, or a response without data) the in-flight slot
     * is released, further fetches are started and the replica is failed.
     * On success the object is written locally, the counters are updated and
     * further fetches are started; the file is opened once its RESET is
     * complete.
     *
     * <p>RESET is treated as complete only when, after further fetches have
     * been started, the file has no fetch pending and no object left in its
     * fetch list. Checking the count captured before {@code fetchObjects()}
     * would report completion while objects are still queued (for example
     * when all slots are taken by other files) or freshly requested.
     *
     * @param method the queued request carrying the arguments
     */
    private void processObjectFetched(Stage.StageRequest method) {
        try {
            String fileId = (String)method.getArgs()[0];
            OSD.ObjectVersionMapping record = (OSD.ObjectVersionMapping)method.getArgs()[1];
            InternalObjectData data = (InternalObjectData)method.getArgs()[2];
            RPC.RPCHeader.ErrorResponse error = (RPC.RPCHeader.ErrorResponse)method.getArgs()[3];
            ReplicatedFileState state = this.files.get(fileId);
            if (state == null) {
                return;
            }
            if (error != null) {
                --this.numObjsInFlight;
                this.fetchObjects();
                this.failed(state, error, "processObjectFetched");
                return;
            }
            if (data.getData() == null) {
                --this.numObjsInFlight;
                this.fetchObjects();
                RPC.RPCHeader.ErrorResponse generatedError = RPC.RPCHeader.ErrorResponse.newBuilder().setErrorType(RPC.ErrorType.INTERNAL_SERVER_ERROR).setErrorMessage("Fetching a missing object failed because no data was returned. The object was probably deleted meanwhile.").build();
                this.failed(state, generatedError, "processObjectFetched");
                return;
            }

            // Read the size before the buffer is handed to the storage stage.
            int bytes = data.getData().remaining();
            this.master.getStorageStage().writeObjectWithoutGMax(fileId, record.getObjectNumber(), state.getsPolicy(), 0, data.getData(), CowPolicy.PolicyNoCow, null, false, record.getObjectVersion(), null, new StorageStage.WriteObjectCallback(){

                @Override
                public void writeComplete(GlobalTypes.OSDWriteResponse result, RPC.RPCHeader.ErrorResponse error) {
                    // A failed local write is only logged.
                    if (error != null) {
                        Logging.logMessage(3, Logging.Category.replication, this, "cannot write object locally: %s", ErrorUtils.formatError(error));
                    }
                }
            });
            this.master.getPreprocStage().pingFile(fileId);
            this.master.objectReplicated();
            this.master.replicatedDataReceived(bytes);

            --this.numObjsInFlight;
            int numPendingFile = state.getNumObjectsPending() - 1;
            state.setNumObjectsPending(numPendingFile);
            state.getPolicy().objectFetched(record.getObjectVersion());
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) fetched object for replica, file %s, remaining %d", this.localID, fileId, numPendingFile);
            }
            this.fetchObjects();

            // fetchObjects() may have started another fetch for this file, so
            // the pending count is re-read instead of using numPendingFile.
            boolean nothingPending = state.getNumObjectsPending() == 0;
            boolean nothingQueued = state.getObjectsToFetch() == null || state.getObjectsToFetch().isEmpty();
            if (nothingPending && nothingQueued) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) RESET complete for file %s", this.localID, fileId);
                this.doOpen(state);
            }
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Moves a file into the RESET state and starts an internal
     * {@code EventRWRStatus} operation for it. Does nothing (apart from a
     * debug message) when the file is already in RESET.
     *
     * @param file the file to reset
     * @param updateObjVer object version that triggered the reset; only logged
     */
    private void doReset(ReplicatedFileState file, long updateObjVer) {
        final ReplicatedFileState.ReplicaState resetState = ReplicatedFileState.ReplicaState.RESET;
        if (file.getState() == resetState) {
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "file %s is already in RESET", file.getFileId());
            }
            return;
        }
        // The transition message must be logged before the state is replaced.
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica state changed for %s from %s to %s", new Object[]{this.localID, file.getFileId(), file.getState(), resetState});
        }
        file.setState(resetState);
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica RESET started: %s (update objVer=%d)", this.localID, file.getFileId(), updateObjVer);
        }
        OSDOperation statusEvent = this.master.getInternalEvent(EventRWRStatus.class);
        Object[] eventArgs = new Object[]{file.getFileId(), file.getsPolicy()};
        statusEvent.startInternalEvent(eventArgs);
    }

    /**
     * Handles a queued replica-state event (arguments: file id, local status,
     * error). Events for unknown files are ignored. An error fails the
     * replica; otherwise the file's update policy is asked to execute the
     * reset and its result is reported through {@code eventSetAuthState}.
     *
     * @param method the queued request carrying the arguments
     */
    private void processReplicaStateAvailExecReset(Stage.StageRequest method) {
        try {
            Object[] args = method.getArgs();
            String fileId = (String)args[0];
            final OSD.ReplicaStatus localStatus = (OSD.ReplicaStatus)args[1];
            RPC.RPCHeader.ErrorResponse error = (RPC.RPCHeader.ErrorResponse)args[2];

            final ReplicatedFileState fileState = this.files.get(fileId);
            if (fileState == null) {
                return;
            }
            if (error != null) {
                Logging.logMessage(3, Logging.Category.replication, this, "local state for %s failed: %s", fileState.getFileId(), error);
                this.failed(fileState, error, "processReplicaStateAvailExecReset");
                return;
            }
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) local state for %s available.", this.localID, fileState.getFileId());
            }
            ReplicaUpdatePolicy.ExecuteResetCallback onReset = new ReplicaUpdatePolicy.ExecuteResetCallback(){

                @Override
                public void finished(OSD.AuthoritativeReplicaState authState) {
                    RWReplicationStage.this.eventSetAuthState(fileState.getFileId(), authState, localStatus, null);
                }

                @Override
                public void failed(RPC.RPCHeader.ErrorResponse error) {
                    RWReplicationStage.this.eventSetAuthState(fileState.getFileId(), null, null, error);
                }
            };
            fileState.getPolicy().executeReset(fileState.getCredentials(), localStatus, onReset);
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Handles a queued force-reset request (arguments: credentials,
     * locations) by setting the force-reset flag on the file's state if it is
     * not set already.
     *
     * @param method the queued request carrying the arguments
     */
    private void processForceReset(Stage.StageRequest method) {
        try {
            Object[] args = method.getArgs();
            GlobalTypes.FileCredentials credentials = (GlobalTypes.FileCredentials)args[0];
            XLocations loc = (XLocations)args[1];
            ReplicatedFileState fileState = this.getState(credentials, loc, true, false);
            boolean alreadyFlagged = fileState.isForceReset();
            if (!alreadyFlagged) {
                fileState.setForceReset(true);
            }
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Decides how a file proceeds towards a role:
     * invalidated files are handled by {@code doInvalidated}; files whose
     * policy needs no lease become primary immediately; files with an open
     * cell take the role recorded on their state; all others open a Flease
     * cell and wait in WAITING_FOR_LEASE.
     *
     * @param file the file to process
     */
    private void doWaitingForLease(ReplicatedFileState file) {
        if (file.isInvalidated()) {
            this.doInvalidated(file);
            return;
        }
        if (!file.getPolicy().requiresLease()) {
            this.doPrimary(file);
            return;
        }
        if (file.isCellOpen()) {
            // The cell is already open, so the current role is known.
            if (file.isLocalIsPrimary()) {
                this.doPrimary(file);
            } else {
                this.doBackup(file);
            }
            return;
        }

        file.setCellOpen(true);
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica state changed for %s from %s to %s", new Object[]{this.localID, file.getFileId(), file.getState(), ReplicatedFileState.ReplicaState.WAITING_FOR_LEASE});
        }
        try {
            file.setState(ReplicatedFileState.ReplicaState.WAITING_FOR_LEASE);
            ArrayList<InetSocketAddress> remoteAddresses = new ArrayList<InetSocketAddress>();
            for (ServiceUUID remoteOsd : file.getPolicy().getRemoteOSDUUIDs()) {
                remoteAddresses.add(remoteOsd.getAddress());
            }
            // The version of the file's locations is passed as the cell's view id.
            int viewId = file.getLocations().getVersion();
            this.fstage.openCell(file.getPolicy().getCellId(), remoteAddresses, true, viewId);
        }
        catch (UnknownUUIDException unknownOsd) {
            this.failed(file, ErrorUtils.getErrorResponse(RPC.ErrorType.ERRNO, RPC.POSIXErrno.POSIX_ERROR_EIO, unknownOsd.toString(), unknownOsd), "doWaitingForLease");
        }
    }

    /**
     * Moves a file into the OPEN state and, if requests are waiting for it,
     * immediately continues towards acquiring a role.
     *
     * @param file the file to open
     */
    private void doOpen(ReplicatedFileState file) {
        final ReplicatedFileState.ReplicaState openState = ReplicatedFileState.ReplicaState.OPEN;
        if (Logging.isDebug()) {
            Logging.logMessage(7, this, "(R:%s) replica state changed for %s from %s to %s", new Object[]{this.localID, file.getFileId(), file.getState(), openState});
        }
        file.setState(openState);
        if (!file.hasPendingRequests()) {
            return;
        }
        this.doWaitingForLease(file);
    }

    /**
     * Makes the local replica primary. If the policy's {@code onPrimary}
     * returns true and no primary reset has been started yet, a reset is
     * started first; otherwise the file enters PRIMARY and its pending
     * requests are re-queued with priority.
     *
     * @param file the file whose local replica becomes primary
     */
    private void doPrimary(ReplicatedFileState file) {
        assert (file.isLocalIsPrimary());
        try {
            // onPrimary is evaluated first; the reset flag is only consulted
            // when it returns true.
            boolean startReset = file.getPolicy().onPrimary((int)file.getMasterEpoch()) && !file.isPrimaryReset();
            if (startReset) {
                file.setPrimaryReset(true);
                this.doReset(file, -1L);
                return;
            }
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica state changed for %s from %s to %s", new Object[]{this.localID, file.getFileId(), file.getState(), ReplicatedFileState.ReplicaState.PRIMARY});
            }
            file.setPrimaryReset(false);
            file.setState(ReplicatedFileState.ReplicaState.PRIMARY);
            for (; file.hasPendingRequests(); ) {
                this.enqueuePrioritized(file.removePendingRequest());
            }
        }
        catch (IOException ioFailure) {
            this.failed(file, ErrorUtils.getErrorResponse(RPC.ErrorType.ERRNO, RPC.POSIXErrno.POSIX_ERROR_EIO, ioFailure.toString(), ioFailure), "doPrimary");
        }
    }

    /**
     * Makes the local replica a backup: clears the primary-reset flag, enters
     * BACKUP and re-queues the file's pending requests with priority.
     *
     * @param file the file whose local replica becomes a backup
     */
    private void doBackup(ReplicatedFileState file) {
        assert (!file.isLocalIsPrimary());
        final ReplicatedFileState.ReplicaState backupState = ReplicatedFileState.ReplicaState.BACKUP;
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica state changed for %s from %s to %s", new Object[]{this.localID, file.getFileId(), file.getState(), backupState});
        }
        file.setPrimaryReset(false);
        file.setState(backupState);
        for (; file.hasPendingRequests(); ) {
            this.enqueuePrioritized(file.removePendingRequest());
        }
    }

    /**
     * Completes an invalidation for the given file.
     *
     * Only when the invalidated-reset flag is set is the replica switched to
     * {@code INVALIDATED} (and the flag cleared). In every case the primary-reset
     * flag is cleared and all pending requests are re-queued.
     *
     * @param file state of the replicated file; must be marked invalidated
     */
    private void doInvalidated(ReplicatedFileState file) {
        assert (file.isInvalidated());
        boolean resetWasRequested = file.isInvalidatedReset();
        if (resetWasRequested) {
            final ReplicatedFileState.ReplicaState target = ReplicatedFileState.ReplicaState.INVALIDATED;
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) replica state changed for %s from %s to %s", new Object[]{this.localID, file.getFileId(), file.getState(), target});
            }
            file.setInvalidatedReset(false);
            file.setState(target);
        }
        file.setPrimaryReset(false);
        // Drain the per-file backlog back into the stage queue.
        while (file.hasPendingRequests()) {
            Stage.StageRequest pending = file.removePendingRequest();
            this.enqueuePrioritized(pending);
        }
    }

    /**
     * Common failure path for a replicated file: logs the error, puts the replica
     * back into {@code OPEN}, marks the cell closed, closes the flease cell and
     * answers every pending request with the given error.
     *
     * @param file       state of the replicated file that failed
     * @param ex         error response handed to all pending requests
     * @param methodName name of the method reporting the failure (used in the log line only)
     */
    private void failed(ReplicatedFileState file, RPC.RPCHeader.ErrorResponse ex, String methodName) {
        Logging.logMessage(4, Logging.Category.replication, this, "(R:%s) replica for file %s failed (in method: %s): %s", this.localID, file.getFileId(), methodName, ErrorUtils.formatError(ex));

        // Reset the local bookkeeping before touching the flease stage.
        file.setPrimaryReset(false);
        file.setState(ReplicatedFileState.ReplicaState.OPEN);
        file.setCellOpen(false);

        ASCIIString cellId = file.getPolicy().getCellId();
        this.fstage.closeCell(cellId, false);

        file.clearPendingRequests(ex);
    }

    /**
     * Inserts a request into the stage queue, making room if necessary.
     *
     * Whenever {@code offer} is refused, one request is polled from the queue,
     * answered with an internal server error and dropped; then the insert is retried.
     *
     * @param rq request that must end up in the queue
     */
    private void enqueuePrioritized(Stage.StageRequest rq) {
        while (true) {
            if (this.q.offer(rq)) {
                return;
            }
            // Queue refused the request: sacrifice one queued request and retry.
            Stage.StageRequest victim = (Stage.StageRequest)this.q.poll();
            victim.sendInternalServerError(new IllegalStateException("internal queue overflow, cannot enqueue operation for processing."));
            Logging.logMessage(7, this, "Dropping request from rwre queue due to overload", new Object[0]);
        }
    }

    /**
     * Enqueues an externally triggered operation, subject to an admission limit.
     *
     * While fewer than 250 external requests are counted as queued, the counter is
     * incremented and the operation is enqueued. Otherwise the request is rejected
     * with an internal server error and {@code createdViewBuffer}, if present, is freed.
     *
     * @param stageOp           stage operation code
     * @param arguments         operation arguments
     * @param request           the originating OSD request
     * @param createdViewBuffer optional buffer that is freed here when the request is dropped; may be null
     * @param callback          callback passed through to the stage request
     */
    protected void enqueueExternalOperation(int stageOp, Object[] arguments, OSDRequest request, ReusableBuffer createdViewBuffer, Object callback) {
        if (this.externalRequestsInQueue.get() < 250) {
            this.externalRequestsInQueue.incrementAndGet();
            this.enqueueOperation(stageOp, arguments, request, createdViewBuffer, callback);
            return;
        }

        // Overloaded: reject the request instead of queueing it.
        Logging.logMessage(4, this, "RW replication stage is overloaded, request %d for %s dropped", request.getRequestId(), request.getFileId());
        request.sendInternalServerError(new IllegalStateException("RW replication stage is overloaded, request dropped"));
        if (createdViewBuffer == null) {
            return;
        }
        assert (createdViewBuffer.getRefCount() >= 2);
        BufferPool.free(createdViewBuffer);
    }

    /**
     * Queues a prepare-operation request (stage op 5, handled by {@code processPrepareOp}).
     *
     * @param credentials file credentials of the caller
     * @param xloc        replica locations of the file
     * @param objNo       object number the operation refers to
     * @param objVersion  object version the operation refers to
     * @param op          kind of operation being prepared
     * @param callback    notified with the outcome
     * @param request     the originating OSD request
     */
    public void prepareOperation(GlobalTypes.FileCredentials credentials, XLocations xloc, long objNo, long objVersion, Operation op, RWReplicationCallback callback, OSDRequest request) {
        Object[] stageArgs = new Object[]{credentials, xloc, objNo, objVersion, op};
        this.enqueueExternalOperation(5, stageArgs, request, null, callback);
    }

    /**
     * Queues a replicated write (stage op 1, handled by {@code processReplicatedWrite}).
     *
     * @param credentials       file credentials of the caller
     * @param xloc              replica locations of the file
     * @param objNo             number of the object being written
     * @param objVersion        version of the object being written
     * @param data              object data to replicate
     * @param createdViewBuffer buffer passed along with the stage request; may be null
     * @param callback          notified with the outcome
     * @param request           the originating OSD request
     */
    public void replicatedWrite(GlobalTypes.FileCredentials credentials, XLocations xloc, long objNo, long objVersion, InternalObjectData data, ReusableBuffer createdViewBuffer, RWReplicationCallback callback, OSDRequest request) {
        Object[] stageArgs = new Object[]{credentials, xloc, objNo, objVersion, data};
        this.enqueueExternalOperation(1, stageArgs, request, createdViewBuffer, callback);
    }

    /**
     * Queues a replicated truncate (stage op 6, handled by {@code processReplicatedTruncate}).
     *
     * @param credentials      file credentials of the caller
     * @param xloc             replica locations of the file
     * @param newFileSize      file size after the truncate
     * @param newObjectVersion object version assigned to the truncate
     * @param callback         notified with the outcome
     * @param request          the originating OSD request
     */
    public void replicateTruncate(GlobalTypes.FileCredentials credentials, XLocations xloc, long newFileSize, long newObjectVersion, RWReplicationCallback callback, OSDRequest request) {
        Object[] stageArgs = new Object[]{credentials, xloc, newFileSize, newObjectVersion};
        this.enqueueExternalOperation(6, stageArgs, request, null, callback);
    }

    /**
     * Queues a file-closed notification (stage op 2, handled by {@code processFileClosed}).
     *
     * @param fileId id of the file that was closed
     */
    public void fileClosed(String fileId) {
        Object[] stageArgs = new Object[]{fileId};
        this.enqueueOperation(2, stageArgs, null, null);
    }

    /**
     * Parses an incoming flease message and forwards it to the flease stage.
     *
     * The buffer is always returned to the pool once parsing has been attempted,
     * including when the {@link FleaseMessage} constructor throws, so that a
     * malformed message cannot leak a pooled buffer. Any exception is logged and
     * swallowed.
     *
     * @param message serialized flease message; released by this method
     * @param sender  address the message was received from
     */
    public void receiveFleaseMessage(ReusableBuffer message, InetSocketAddress sender) {
        try {
            FleaseMessage msg;
            try {
                msg = new FleaseMessage(message);
            }
            finally {
                // Free exactly once, whether or not parsing succeeded.
                if (message != null) {
                    BufferPool.free(message);
                }
            }
            msg.setSender(sender);
            this.fstage.receiveMessage(msg);
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Queues a status query (stage op 7, handled by {@code processGetStatus}).
     *
     * @param callback receives the collected status map
     */
    public void getStatus(StatusCallback callback) {
        Object[] noArgs = new Object[0];
        this.enqueueOperation(7, noArgs, null, callback);
    }

    /**
     * Serializes a flease message and sends it to the recipient OSD through the
     * flease OSD client. The response's buffers are freed once it becomes
     * available; an {@link IOException} raised while issuing the call is logged.
     *
     * @param message   flease message to send
     * @param recipient address of the receiving OSD
     */
    public void sendMessage(FleaseMessage message, InetSocketAddress recipient) {
        ReusableBuffer payload = BufferPool.allocate(message.getSize());
        message.serialize(payload);
        payload.flip();
        // NOTE(review): payload is not freed here if the call below throws — confirm who owns it in that case.
        try {
            RPCResponse response = this.fleaseOsdClient.xtreemfs_rwr_flease_msg(recipient, RPCAuthentication.authNone, RPCAuthentication.userService, this.master.getHostName(), this.master.getConfig().getPort(), payload);
            RPCResponseAvailableListener releaseOnCompletion = new RPCResponseAvailableListener(){

                public void responseAvailable(RPCResponse completed) {
                    completed.freeBuffers();
                }
            };
            response.registerListener(releaseOnCompletion);
        }
        catch (IOException ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Dispatches a dequeued stage request to its handler, keyed by stage op code.
     *
     * Ops 1, 5 and 6 are the ones submitted through {@code enqueueExternalOperation};
     * for those the external-request counter is decremented before the handler runs.
     *
     * @param method the stage request to process
     * @throws IllegalArgumentException if the op code is not known
     */
    @Override
    protected void processMethod(Stage.StageRequest method) {
        switch (method.getStageMethod()) {
            case 1:
                this.externalRequestsInQueue.decrementAndGet();
                this.processReplicatedWrite(method);
                break;
            case 2:
                this.processFileClosed(method);
                break;
            case 3:
                this.processFleaseMessage(method);
                break;
            case 5:
                this.externalRequestsInQueue.decrementAndGet();
                this.processPrepareOp(method);
                break;
            case 6:
                this.externalRequestsInQueue.decrementAndGet();
                this.processReplicatedTruncate(method);
                break;
            case 7:
                this.processGetStatus(method);
                break;
            case 10:
                this.processSetAuthoritativeState(method);
                break;
            case 11:
                this.processObjectFetched(method);
                break;
            case 13:
                this.processLeaseStateChanged(method);
                break;
            case 14:
                this.processReplicaStateAvailExecReset(method);
                break;
            case 15:
                this.processDeleteObjectsComplete(method);
                break;
            case 16:
                this.processForceReset(method);
                break;
            case 17:
                this.processMaxObjAvail(method);
                break;
            case 18:
                this.processBackupAuthoritativeState(method);
                break;
            case 21:
                this.processSetFleaseView(method);
                break;
            case 22:
                this.processInvalidateReplica(method);
                break;
            case 23:
                this.processFetchInvalidated(method);
                break;
            default:
                throw new IllegalArgumentException("no such stageop");
        }
    }

    /**
     * Stage handler for a queued flease message: parses the buffer in argument 0,
     * sets the sender from argument 1 and forwards the message to the flease stage.
     *
     * The buffer is always returned to the pool once parsing has been attempted,
     * including when the {@link FleaseMessage} constructor throws, so that a
     * malformed message cannot leak a pooled buffer. Any exception is logged and
     * swallowed.
     *
     * @param method stage request carrying {@code [ReusableBuffer, InetSocketAddress]}
     */
    private void processFleaseMessage(Stage.StageRequest method) {
        try {
            ReusableBuffer data = (ReusableBuffer)method.getArgs()[0];
            InetSocketAddress sender = (InetSocketAddress)method.getArgs()[1];
            FleaseMessage msg;
            try {
                msg = new FleaseMessage(data);
            }
            finally {
                // Free exactly once, whether or not parsing succeeded.
                if (data != null) {
                    BufferPool.free(data);
                }
            }
            msg.setSender(sender);
            this.fstage.receiveMessage(msg);
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Stage handler for a file-closed notification: closes the state of the file
     * named in argument 0, passing {@code false} for {@code returnLease}.
     * Exceptions are logged and swallowed.
     *
     * @param method stage request carrying {@code [String fileId]}
     */
    private void processFileClosed(Stage.StageRequest method) {
        try {
            Object[] stageArgs = method.getArgs();
            this.closeFileState((String)stageArgs[0], false);
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Removes the state kept for a file and releases what belongs to it: the policy
     * is told to close the file, the flease cell is closed when the policy requires
     * a lease, and the cell-to-file mapping is dropped. Does nothing if no state is
     * registered for the file.
     *
     * @param fileId      id of the file whose state is discarded
     * @param returnLease forwarded to {@code closeCell} as its second argument
     */
    private void closeFileState(String fileId, boolean returnLease) {
        ReplicatedFileState removed = this.files.remove(fileId);
        if (removed == null) {
            return;
        }
        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.replication, this, "closing file %s", fileId);
        }
        removed.getPolicy().closeFile();
        boolean leaseBased = removed.getPolicy().requiresLease();
        if (leaseBased) {
            this.fstage.closeCell(removed.getPolicy().getCellId(), returnLease);
        }
        this.cellToFileId.remove(removed.getPolicy().getCellId());
    }

    /**
     * Returns the state for the file named in the credentials, creating it on first use.
     *
     * An existing state is returned untouched; {@code forceReset} and
     * {@code invalidated} are applied only to a newly created state. For a new
     * state the cell-to-file mapping is registered and the storage stage is asked
     * for the maximum object number; its answer is forwarded to
     * {@code eventMaxObjAvail}.
     *
     * @param credentials file credentials; supply the file id and are stored on a new state
     * @param loc         replica locations used to build a new state
     * @param forceReset  force-reset flag for a newly created state
     * @param invalidated invalidated flag for a newly created state
     * @return the existing or newly created state
     * @throws IOException declared by the original signature
     */
    private ReplicatedFileState getState(GlobalTypes.FileCredentials credentials, XLocations loc, boolean forceReset, boolean invalidated) throws IOException {
        final String fileId = credentials.getXcap().getFileId();
        ReplicatedFileState existing = this.files.get(fileId);
        if (existing != null) {
            return existing;
        }

        if (Logging.isDebug()) {
            Logging.logMessage(7, Logging.Category.replication, this, "open file: " + fileId, new Object[0]);
        }
        ReplicatedFileState created = new ReplicatedFileState(fileId, loc, this.master.getConfig().getUUID(), this.fstage, this.osdClient);
        this.files.put(fileId, created);
        created.setCredentials(credentials);
        created.setForceReset(forceReset);
        created.setInvalidated(invalidated);
        this.cellToFileId.put(created.getPolicy().getCellId(), fileId);
        assert (created.getState() == ReplicatedFileState.ReplicaState.INITIALIZING);

        // Ask the storage stage for the max object number; the reply arrives as an event on this stage.
        StorageStage.InternalGetMaxObjectNoCallback onMaxObjectNo = new StorageStage.InternalGetMaxObjectNoCallback(){

            @Override
            public void maxObjectNoCompleted(long maxObjNo, long fileSize, long truncateEpoch, RPC.RPCHeader.ErrorResponse error) {
                RWReplicationStage.this.eventMaxObjAvail(fileId, maxObjNo, fileSize, truncateEpoch, error);
            }
        };
        this.master.getStorageStage().internalGetMaxObjectNo(fileId, loc.getLocalReplica().getStripingPolicy(), onMaxObjectNo);
        return created;
    }

    /**
     * Stage handler for the "max object available" event.
     *
     * Stores the reported version on the file's policy and moves the file to OPEN
     * via {@code doOpen}, provided the file is known and still {@code INITIALIZING};
     * otherwise the event is logged and ignored. Exceptions are logged and swallowed.
     *
     * @param method stage request carrying {@code [String fileId, Long maxObjVersion, ErrorResponse error]}
     */
    private void processMaxObjAvail(Stage.StageRequest method) {
        try {
            String fileId = (String)method.getArgs()[0];
            Long maxObjVersion = (Long)method.getArgs()[1];
            // NOTE(review): the error argument is extracted but never inspected — confirm whether a failed lookup should abort the open.
            RPC.RPCHeader.ErrorResponse error = (RPC.RPCHeader.ErrorResponse)method.getArgs()[2];
            if (Logging.isDebug()) {
                Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) max obj avail for file: " + fileId + " max=" + maxObjVersion, this.localID);
            }

            ReplicatedFileState fileState = this.files.get(fileId);
            if (fileState == null) {
                Logging.logMessage(3, Logging.Category.replication, this, "received maxObjAvail event for unknow file: %s", fileId);
                return;
            }
            boolean stillInitializing = fileState.getState() == ReplicatedFileState.ReplicaState.INITIALIZING;
            if (!stillInitializing) {
                Logging.logMessage(3, Logging.Category.replication, this, "ReplicaState is %s instead of INITIALIZING, maxObjectVersion=%d", fileState.getState().name(), maxObjVersion);
                return;
            }

            fileState.getPolicy().setLocalObjectVersion(maxObjVersion);
            this.doOpen(fileState);
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
        }
    }

    /**
     * Stage handler for a replicated write.
     *
     * Looks up the state of the file named in the credentials. If the file is not
     * open, the object data buffer is freed and the callback fails. Otherwise the
     * credentials are refreshed on the state and the write is delegated to the
     * file's replica update policy, whose outcome is relayed to the callback.
     * Unexpected exceptions are logged through {@code Logging} and reported to the
     * callback as an internal server error.
     *
     * @param method stage request carrying
     *               {@code [FileCredentials, XLocations, Long objNo, Long objVersion, InternalObjectData]}
     */
    private void processReplicatedWrite(Stage.StageRequest method) {
        final RWReplicationCallback callback = (RWReplicationCallback)method.getCallback();
        try {
            GlobalTypes.FileCredentials credentials = (GlobalTypes.FileCredentials)method.getArgs()[0];
            Long objNo = (Long)method.getArgs()[2];
            final Long objVersion = (Long)method.getArgs()[3];
            InternalObjectData objData = (InternalObjectData)method.getArgs()[4];
            String fileId = credentials.getXcap().getFileId();
            ReplicatedFileState state = this.files.get(fileId);
            if (state == null) {
                // Nobody will consume the data, so release its buffer here.
                BufferPool.free(objData.getData());
                callback.failed(ErrorUtils.getErrorResponse(RPC.ErrorType.INTERNAL_SERVER_ERROR, RPC.POSIXErrno.POSIX_ERROR_EIO, "file is not open!"));
                return;
            }
            state.setCredentials(credentials);
            state.getPolicy().executeWrite(credentials, objNo, objVersion, objData, new ReplicaUpdatePolicy.ClientOperationCallback(){

                @Override
                public void finished() {
                    callback.success(objVersion);
                }

                @Override
                public void failed(RPC.RPCHeader.ErrorResponse error) {
                    callback.failed(error);
                }
            });
        }
        catch (Exception ex) {
            // Use the stage's logging facility rather than stderr, like the other handlers.
            Logging.logError(3, this, ex);
            callback.failed(ErrorUtils.getInternalServerError(ex));
        }
    }

    /**
     * Stage handler for a replicated truncate.
     *
     * Looks up the state of the file named in the credentials. If the file is not
     * open, the callback fails. Otherwise the credentials are refreshed on the
     * state and the truncate is delegated to the file's replica update policy,
     * whose outcome is relayed to the callback. Unexpected exceptions are logged
     * through {@code Logging} and reported to the callback as an internal server error.
     *
     * @param method stage request carrying
     *               {@code [FileCredentials, XLocations, Long newFileSize, Long newObjVersion]}
     */
    private void processReplicatedTruncate(Stage.StageRequest method) {
        final RWReplicationCallback callback = (RWReplicationCallback)method.getCallback();
        try {
            GlobalTypes.FileCredentials credentials = (GlobalTypes.FileCredentials)method.getArgs()[0];
            Long newFileSize = (Long)method.getArgs()[2];
            final Long newObjVersion = (Long)method.getArgs()[3];
            String fileId = credentials.getXcap().getFileId();
            ReplicatedFileState state = this.files.get(fileId);
            if (state == null) {
                callback.failed(ErrorUtils.getErrorResponse(RPC.ErrorType.INTERNAL_SERVER_ERROR, RPC.POSIXErrno.POSIX_ERROR_EIO, "file is not open!"));
                return;
            }
            state.setCredentials(credentials);
            state.getPolicy().executeTruncate(credentials, newFileSize, newObjVersion, new ReplicaUpdatePolicy.ClientOperationCallback(){

                @Override
                public void finished() {
                    callback.success(newObjVersion);
                }

                @Override
                public void failed(RPC.RPCHeader.ErrorResponse error) {
                    callback.failed(error);
                }
            });
        }
        catch (Exception ex) {
            // Use the stage's logging facility rather than stderr, like the other handlers.
            Logging.logError(3, this, ex);
            callback.failed(ErrorUtils.getInternalServerError(ex));
        }
    }

    /**
     * Stage handler for a prepare-operation request.
     *
     * Obtains (or creates) the file state and rejects the request if the file is
     * invalidated. Two paths follow:
     * <ul>
     * <li>Internal updates ({@code INTERNAL_UPDATE}, {@code INTERNAL_TRUNCATE}): while
     *     the replica is INITIALIZING, OPEN, WAITING_FOR_LEASE or RESET the request is
     *     parked on the file (at most 10 may already be parked), and an OPEN replica
     *     is handed to {@code doWaitingForLease}. Otherwise the policy decides whether
     *     the update is accepted and whether a reset is needed first.</li>
     * <li>Client operations: parked in the same states as above (OPEN also triggers
     *     {@code doWaitingForLease}); otherwise the policy's {@code onClientOperation}
     *     yields the new version, a redirect, or a retry failure.</li>
     * </ul>
     * On a retry failure the file is failed via {@code failed(...)}. The request
     * itself is not in the pending queue at that point, so its callback is notified
     * explicitly; whether to notify is decided from the replica state as it was
     * before {@code failed(...)} reset it to OPEN.
     *
     * @param method stage request carrying
     *               {@code [FileCredentials, XLocations, objNo, Long objVersion, Operation]}
     */
    private void processPrepareOp(Stage.StageRequest method) {
        RWReplicationCallback callback = (RWReplicationCallback)method.getCallback();
        try {
            GlobalTypes.FileCredentials credentials = (GlobalTypes.FileCredentials)method.getArgs()[0];
            XLocations loc = (XLocations)method.getArgs()[1];
            Long objVersion = (Long)method.getArgs()[3];
            Operation op = (Operation)((Object)method.getArgs()[4]);
            String fileId = credentials.getXcap().getFileId();
            ReplicatedFileState state = this.getState(credentials, loc, false, false);
            if (state.isInvalidated()) {
                callback.failed(ErrorUtils.getErrorResponse(RPC.ErrorType.INTERNAL_SERVER_ERROR, RPC.POSIXErrno.POSIX_ERROR_NONE, "file has been invalidated"));
                return;
            }
            if (op == Operation.INTERNAL_UPDATE || op == Operation.INTERNAL_TRUNCATE) {
                switch (state.getState()) {
                    case INITIALIZING: 
                    case OPEN: 
                    case WAITING_FOR_LEASE: 
                    case RESET: {
                        if (Logging.isDebug()) {
                            Logging.logMessage(7, Logging.Category.replication, this, "enqeue update for %s (state is %s)", new Object[]{fileId, state.getState()});
                        }
                        if (state.sizeOfPendingRequests() > 10) {
                            if (Logging.isDebug()) {
                                Logging.logMessage(7, this, "rejecting request: too many requests (is: %d, max %d) in queue for file %s", state.sizeOfPendingRequests(), 10, fileId);
                            }
                            callback.failed(ErrorUtils.getErrorResponse(RPC.ErrorType.INTERNAL_SERVER_ERROR, RPC.POSIXErrno.POSIX_ERROR_NONE, "too many requests in queue for file"));
                            return;
                        }
                        state.addPendingRequest(method);
                        if (state.getState() == ReplicatedFileState.ReplicaState.OPEN) {
                            this.doWaitingForLease(state);
                        }
                        return;
                    }
                }
                if (!state.getPolicy().acceptRemoteUpdate(objVersion)) {
                    Logging.logMessage(4, Logging.Category.replication, this, "received outdated object version %d for file %s", objVersion, fileId);
                    callback.failed(ErrorUtils.getErrorResponse(RPC.ErrorType.IO_ERROR, RPC.POSIXErrno.POSIX_ERROR_EIO, "outdated object version for update rejected"));
                    return;
                }
                boolean needsReset = state.getPolicy().onRemoteUpdate(objVersion, state.getState());
                if (Logging.isDebug()) {
                    Logging.logMessage(7, Logging.Category.replication, this, "%s needs reset: %s", fileId, needsReset);
                }
                if (needsReset) {
                    // The request is answered once the reset has completed.
                    state.addPendingRequest(method);
                    this.doReset(state, objVersion);
                } else {
                    callback.success(0L);
                }
                return;
            }
            state.setCredentials(credentials);
            switch (state.getState()) {
                case INITIALIZING: 
                case WAITING_FOR_LEASE: 
                case RESET: {
                    if (state.sizeOfPendingRequests() > 10) {
                        if (Logging.isDebug()) {
                            Logging.logMessage(7, this, "rejecting request: too many requests (is: %d, max %d) in queue for file %s", state.sizeOfPendingRequests(), 10, fileId);
                        }
                        callback.failed(ErrorUtils.getErrorResponse(RPC.ErrorType.INTERNAL_SERVER_ERROR, RPC.POSIXErrno.POSIX_ERROR_NONE, "too many requests in queue for file"));
                    } else {
                        state.addPendingRequest(method);
                    }
                    return;
                }
                case OPEN: {
                    if (state.sizeOfPendingRequests() > 10) {
                        if (Logging.isDebug()) {
                            Logging.logMessage(7, this, "rejecting request: too many requests (is: %d, max %d) in queue for file %s", state.sizeOfPendingRequests(), 10, fileId);
                        }
                        callback.failed(ErrorUtils.getErrorResponse(RPC.ErrorType.INTERNAL_SERVER_ERROR, RPC.POSIXErrno.POSIX_ERROR_NONE, "too many requests in queue for file"));
                        return;
                    }
                    state.addPendingRequest(method);
                    this.doWaitingForLease(state);
                    return;
                }
            }
            try {
                long newVersion = state.getPolicy().onClientOperation(op, objVersion, state.getState(), state.getLease());
                callback.success(newVersion);
            }
            catch (RedirectToMasterException ex) {
                callback.redirect(ex.getMasterUUID());
            }
            catch (RetryException ex) {
                RPC.RPCHeader.ErrorResponse err = ErrorUtils.getInternalServerError(ex);
                // failed() resets the replica to OPEN, so the state must be sampled first;
                // checking afterwards could never match BACKUP or PRIMARY.
                boolean notifyCaller = state.getState() == ReplicatedFileState.ReplicaState.BACKUP || state.getState() == ReplicatedFileState.ReplicaState.PRIMARY;
                this.failed(state, err, "processPrepareOp");
                if (notifyCaller) {
                    // This request is not in the pending queue, so failed() did not answer it.
                    callback.failed(err);
                }
            }
        }
        catch (Exception ex) {
            // Use the stage's logging facility rather than stderr, like the other handlers.
            Logging.logError(3, this, ex);
            callback.failed(ErrorUtils.getInternalServerError(ex));
        }
    }

    /**
     * Stage handler for a status query: builds one key/value map per open file
     * (policy class, peers, number of pending requests, cell id and role) and
     * passes the result to the callback. If anything goes wrong the exception is
     * logged through {@code Logging} and the callback receives {@code null}.
     *
     * @param method stage request whose callback is a {@link StatusCallback}
     */
    private void processGetStatus(Stage.StageRequest method) {
        StatusCallback callback = (StatusCallback)method.getCallback();
        try {
            Map<String, Map<String, String>> status = new HashMap<String, Map<String, String>>();
            // The result is not used; the call is retained because dropping it could change behavior — TODO confirm it can go.
            this.fstage.getLocalState();
            for (Map.Entry<String, ReplicatedFileState> entry : this.files.entrySet()) {
                ReplicatedFileState fState = entry.getValue();
                Map<String, String> fStatus = new HashMap<String, String>();
                ASCIIString cellId = fState.getPolicy().getCellId();
                fStatus.put("policy", fState.getPolicy().getClass().getSimpleName());
                fStatus.put("peers (OSDs)", fState.getPolicy().getRemoteOSDUUIDs().toString());
                fStatus.put("pending requests", String.valueOf(fState.sizeOfPendingRequests()));
                fStatus.put("cellId", cellId.toString());

                // Role derived from the lease: unknown without a lease, otherwise primary/backup/outdated.
                String role = "unknown";
                if (fState.getLease() != null && !fState.getLease().isEmptyLease()) {
                    if (!fState.getLease().isValid()) {
                        role = "outdated lease: " + fState.getLease().getLeaseHolder();
                    } else if (fState.isLocalIsPrimary()) {
                        role = "primary";
                    } else {
                        role = "backup ( primary is " + fState.getLease().getLeaseHolder() + ")";
                    }
                }
                fStatus.put("role", role);
                status.put(entry.getKey(), fStatus);
            }
            callback.statusComplete(status);
        }
        catch (Exception ex) {
            // Use the stage's logging facility rather than stderr, like the other handlers.
            Logging.logError(3, this, ex);
            callback.statusComplete(null);
        }
    }

    /**
     * Returns the current lease holder of a file as a string.
     *
     * @param fileId id of the file to look up
     * @return the lease holder's string form, or {@code null} when the file has no
     *         state here or its lease is missing, empty or no longer valid
     */
    public String getPrimary(String fileId) {
        ReplicatedFileState fState = this.files.get(fileId);
        if (fState == null || fState.getLease() == null) {
            return null;
        }
        if (fState.getLease().isEmptyLease() || !fState.getLease().isValid()) {
            return null;
        }
        return "" + fState.getLease().getLeaseHolder();
    }

    /**
     * Queues a flease view update (stage op 21, handled by {@code processSetFleaseView}).
     *
     * @param fileId       id of the file the view belongs to
     * @param cellId       flease cell whose view id is to be set
     * @param versionState xlocset version state the view id is derived from
     */
    public void setFleaseView(String fileId, ASCIIString cellId, OSD.XLocSetVersionState versionState) {
        Object[] stageArgs = new Object[]{fileId, cellId, versionState};
        this.enqueueOperation(21, stageArgs, null, null);
    }

    /**
     * Stage handler for a flease view update.
     *
     * The view id is -1 when the version state is invalidated, otherwise the
     * version itself. If a state exists for the file and its locations carry an
     * older version, that state is closed (returning the lease) before the view id
     * is set on the flease stage. The result of setting the view id is ignored.
     *
     * @param method stage request carrying {@code [String fileId, ASCIIString cellId, XLocSetVersionState]}
     */
    private void processSetFleaseView(Stage.StageRequest method) {
        Object[] stageArgs = method.getArgs();
        String fileId = (String)stageArgs[0];
        ASCIIString cellId = (ASCIIString)stageArgs[1];
        OSD.XLocSetVersionState versionState = (OSD.XLocSetVersionState)stageArgs[2];

        int viewId;
        if (versionState.getInvalidated()) {
            viewId = -1;
        } else {
            viewId = versionState.getVersion();
        }

        ReplicatedFileState known = this.files.get(fileId);
        boolean stateIsOutdated = known != null && known.getLocations().getVersion() < versionState.getVersion();
        if (stateIsOutdated) {
            this.closeFileState(fileId, true);
        }

        // Neither outcome of the proposal needs any follow-up here.
        FleaseListener ignoreOutcome = new FleaseListener(){

            public void proposalResult(ASCIIString cellId, ASCIIString leaseHolder, long leaseTimeout_ms, long masterEpochNumber) {
            }

            public void proposalFailed(ASCIIString cellId, Throwable cause) {
            }
        };
        this.fstage.setViewId(cellId, viewId, ignoreOutcome);
    }

    /**
     * Queues a replica invalidation (stage op 22, handled by {@code processInvalidateReplica}).
     *
     * @param fileId    id of the file whose replica is invalidated
     * @param fileCreds file credentials used to obtain the file state
     * @param xLoc      replica locations of the file
     * @param callback  notified when the invalidation has completed or failed
     */
    public void invalidateReplica(String fileId, GlobalTypes.FileCredentials fileCreds, XLocations xLoc, PreprocStage.InvalidateXLocSetCallback callback) {
        Object[] stageArgs = new Object[]{fileId, fileCreds, xLoc};
        this.enqueueOperation(22, stageArgs, null, callback);
    }

    /**
     * Stage handler for a replica invalidation.
     *
     * Obtains (or creates) the file state and marks it invalidated, records the
     * lease role the replica had (PRIMARY/BACKUP if its cell was open, IDLE
     * otherwise), closes the flease cell, drops the cell-to-file mapping and fails
     * any pending requests. Finally the cell's view id is set to -1; the callback
     * is completed from the listener with the recorded lease role.
     *
     * @param method stage request carrying {@code [String fileId, FileCredentials, XLocations]}
     */
    private void processInvalidateReplica(Stage.StageRequest method) {
        Object[] stageArgs = method.getArgs();
        String fileId = (String)stageArgs[0];
        GlobalTypes.FileCredentials fileCreds = (GlobalTypes.FileCredentials)stageArgs[1];
        XLocations xLoc = (XLocations)stageArgs[2];
        final PreprocStage.InvalidateXLocSetCallback callback = (PreprocStage.InvalidateXLocSetCallback)method.getCallback();

        ReplicatedFileState state;
        try {
            state = this.getState(fileCreds, xLoc, true, true);
            state.setInvalidated(true);
            assert (state.isInvalidated());
        }
        catch (IOException ex) {
            Logging.logError(3, this, ex);
            callback.invalidateComplete(GlobalTypes.LeaseState.NONE, ErrorUtils.getInternalServerError(ex));
            return;
        }

        // Capture the role before the cell is closed below.
        final GlobalTypes.LeaseState leaseState;
        if (!state.isCellOpen()) {
            leaseState = GlobalTypes.LeaseState.IDLE;
        } else if (state.isLocalIsPrimary()) {
            leaseState = GlobalTypes.LeaseState.PRIMARY;
        } else {
            leaseState = GlobalTypes.LeaseState.BACKUP;
        }

        this.fstage.closeCell(state.getPolicy().getCellId(), true);
        this.cellToFileId.remove(state.getPolicy().getCellId());
        if (state.hasPendingRequests()) {
            RPC.RPCHeader.ErrorResponse invalidatedError = ErrorUtils.getErrorResponse(RPC.ErrorType.ERRNO, RPC.POSIXErrno.POSIX_ERROR_EIO, "File got invalidated!");
            state.clearPendingRequests(invalidatedError);
        }

        FleaseListener reportToCaller = new FleaseListener(){

            public void proposalResult(ASCIIString cellId, ASCIIString leaseHolder, long leaseTimeout_ms, long masterEpochNumber) {
                callback.invalidateComplete(leaseState, null);
            }

            public void proposalFailed(ASCIIString cellId, Throwable cause) {
                callback.invalidateComplete(leaseState, ErrorUtils.getInternalServerError(cause));
            }
        };
        this.fstage.setViewId(state.getPolicy().getCellId(), -1, reportToCaller);
    }

    /**
     * Queues a fetch for an invalidated replica (stage op 23, handled by
     * {@code processFetchInvalidated}).
     *
     * @param fileId      id of the file to fetch
     * @param authState   authoritative replica state to apply
     * @param localState  status of the local replica
     * @param credentials file credentials used to obtain the file state
     * @param xloc        replica locations of the file
     * @param callback    notified with the outcome
     * @param request     the originating OSD request
     */
    public void fetchInvalidated(String fileId, OSD.AuthoritativeReplicaState authState, OSD.ReplicaStatus localState, GlobalTypes.FileCredentials credentials, XLocations xloc, RWReplicationCallback callback, OSDRequest request) {
        Object[] stageArgs = new Object[]{fileId, authState, localState, credentials, xloc};
        this.enqueueOperation(23, stageArgs, request, null, callback);
    }

    /**
     * Stage handler for fetching an invalidated replica.
     *
     * Obtains (or creates) the file state, marks it invalidated and then acts on
     * the replica state: in RESET or INITIALIZING the request is parked; in OPEN,
     * WAITING_FOR_LEASE, BACKUP or PRIMARY the invalidated-reset flag is set, the
     * request is parked and the authoritative state is applied via
     * {@code executeSetAuthState}; in INVALIDATED the file state is closed and the
     * callback succeeds with 0. Exceptions are logged and reported to the callback.
     *
     * @param method stage request carrying
     *               {@code [String fileId, AuthoritativeReplicaState, ReplicaStatus, FileCredentials, XLocations]}
     */
    private void processFetchInvalidated(Stage.StageRequest method) {
        RWReplicationCallback callback = (RWReplicationCallback)method.getCallback();
        try {
            Object[] stageArgs = method.getArgs();
            String fileId = (String)stageArgs[0];
            OSD.AuthoritativeReplicaState authState = (OSD.AuthoritativeReplicaState)stageArgs[1];
            OSD.ReplicaStatus localState = (OSD.ReplicaStatus)stageArgs[2];
            GlobalTypes.FileCredentials credentials = (GlobalTypes.FileCredentials)stageArgs[3];
            XLocations loc = (XLocations)stageArgs[4];

            ReplicatedFileState state = this.getState(credentials, loc, true, true);
            state.setInvalidated(true);
            assert (state.isInvalidated());
            if (state.hasPendingRequests()) {
                Logging.logMessage(3, Logging.Category.replication, this, "(R:%s) pending requests were queued while the replica for %s has been invalidated.", this.localID, fileId);
            }

            switch (state.getState()) {
                case RESET: 
                case INITIALIZING: {
                    // Another transition is in flight; park the request on the file.
                    Logging.logMessage(7, Logging.Category.replication, this, "(R:%s) enqueued fetch invalidated reset for file %s", this.localID, fileId);
                    state.addPendingRequest(method);
                    break;
                }
                case OPEN: 
                case WAITING_FOR_LEASE: 
                case BACKUP: 
                case PRIMARY: {
                    state.setInvalidatedReset(true);
                    state.addPendingRequest(method);
                    this.executeSetAuthState(localState, authState, state, fileId);
                    break;
                }
                case INVALIDATED: {
                    this.closeFileState(fileId, true);
                    callback.success(0L);
                    break;
                }
            }
        }
        catch (Exception ex) {
            Logging.logError(3, this, ex);
            callback.failed(ErrorUtils.getInternalServerError(ex));
        }
    }

    /**
     * Receives the result of a status query.
     */
    public interface StatusCallback {
        /**
         * Called once the status has been collected.
         *
         * @param status per-file status, keyed by file id, each value a map of
         *               status keys to values; {@code null} if collecting the status failed
         */
        void statusComplete(Map<String, Map<String, String>> status);
    }

    public static interface RWReplicationCallback {
        public void success(long var1);

        public void redirect(String var1);

        public void failed(RPC.RPCHeader.ErrorResponse var1);
    }

    /**
     * Kinds of operation that can be prepared on a replicated file. The two
     * {@code INTERNAL_*} constants are the ones {@code processPrepareOp} treats
     * as remote updates.
     */
    public enum Operation {
        READ,
        WRITE,
        TRUNCATE,
        INTERNAL_UPDATE,
        INTERNAL_TRUNCATE
    }
}

