ceph: handle ESTALE properly; on receipt send to authority if it wasn't
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 22 Jun 2010 22:58:01 +0000 (15:58 -0700)
committerSage Weil <sage@newdream.net>
Mon, 2 Aug 2010 03:11:41 +0000 (20:11 -0700)
Signed-off-by: Greg Farnum <gregf@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/mds_client.c
fs/ceph/mds_client.h

index a546e0d..34d215f 100644 (file)
@@ -1628,6 +1628,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
        req->r_mds = mds;
        req->r_attempts++;
+       if (req->r_inode) {
+               struct ceph_cap *cap =
+                       ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+               if (cap)
+                       req->r_sent_on_mseq = cap->mseq;
+               else
+                       req->r_sent_on_mseq = -1;
+       }
        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
@@ -1962,21 +1971,39 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        result = le32_to_cpu(head->result);
 
        /*
-        * Tolerate 2 consecutive ESTALEs from the same mds.
-        * FIXME: we should be looking at the cap migrate_seq.
+        * Handle an ESTALE
+        * if we're not talking to the authority, send to them
+        * if the authority has changed while we weren't looking,
+        * send to new authority
+        * Otherwise we just have to return an ESTALE
         */
        if (result == -ESTALE) {
-               req->r_direct_mode = USE_AUTH_MDS;
-               req->r_num_stale++;
-               if (req->r_num_stale <= 2) {
+               dout("got ESTALE on request %llu", req->r_tid);
+               if (!req->r_inode) ; //do nothing; not an authority problem
+               else if (req->r_direct_mode != USE_AUTH_MDS) {
+                       dout("not using auth, setting for that now");
+                       req->r_direct_mode = USE_AUTH_MDS;
                        __do_request(mdsc, req);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
+               } else  {
+                       struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+                       struct ceph_cap *cap =
+                               ceph_get_cap_for_mds(ci, req->r_mds);;
+
+                       dout("already using auth");
+                       if ((!cap || cap != ci->i_auth_cap) ||
+                           (cap->mseq != req->r_sent_on_mseq)) {
+                               dout("but cap changed, so resending");
+                               __do_request(mdsc, req);
+                               mutex_unlock(&mdsc->mutex);
+                               goto out;
+                       }
                }
-       } else {
-               req->r_num_stale = 0;
+               dout("have to return ESTALE on request %llu", req->r_tid);
        }
 
+
        if (head->safe) {
                req->r_got_safe = true;
                __unregister_request(mdsc, req);
index c86be30..ab7e89f 100644 (file)
@@ -208,8 +208,8 @@ struct ceph_mds_request {
 
        int               r_attempts;   /* resend attempts */
        int               r_num_fwd;    /* number of forward attempts */
-       int               r_num_stale;
        int               r_resend_mds; /* mds to resend to next, if any*/
+       u32               r_sent_on_mseq; /* cap mseq request was sent at*/
 
        struct kref       r_kref;
        struct list_head  r_wait;