ceph: clean up mds reply, error handling
authorSage Weil <sage@newdream.net>
Thu, 13 May 2010 18:19:06 +0000 (11:19 -0700)
committerSage Weil <sage@newdream.net>
Mon, 17 May 2010 17:25:44 +0000 (10:25 -0700)
We would occasionally BUG out in the reply handler because r_reply was
nonzero, due to a race with ceph_mdsc_do_request temporarily setting
r_reply to an ERR_PTR value.  This is unnecessary, messy, and also wrong
in the EIO case.

Clean up by consistently using r_err for errors and r_reply for messages.
Also fix the abort logic to trigger consistently for all errors that return
to the caller early (e.g., EIO from timeout case).  If an abort races with
a reply, use the result from the reply.

Also fix locking for r_err, r_reply update in the reply handler.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/mds_client.c
fs/ceph/mds_client.h

index 24561a5..b3b19f0 100644 (file)
@@ -1517,7 +1517,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
        }
        msg = create_request_message(mdsc, req, mds);
        if (IS_ERR(msg)) {
-               req->r_reply = ERR_PTR(PTR_ERR(msg));
+               req->r_err = PTR_ERR(msg);
                complete_request(mdsc, req);
                return -PTR_ERR(msg);
        }
@@ -1552,7 +1552,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
        int mds = -1;
        int err = -EAGAIN;
 
-       if (req->r_reply)
+       if (req->r_err || req->r_got_result)
                goto out;
 
        if (req->r_timeout &&
@@ -1609,7 +1609,7 @@ out:
        return err;
 
 finish:
-       req->r_reply = ERR_PTR(err);
+       req->r_err = err;
        complete_request(mdsc, req);
        goto out;
 }
@@ -1689,59 +1689,53 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
        __register_request(mdsc, req, dir);
        __do_request(mdsc, req);
 
-       /* wait */
-       if (!req->r_reply) {
-               mutex_unlock(&mdsc->mutex);
-               if (req->r_timeout) {
-                       err = (long)wait_for_completion_interruptible_timeout(
-                               &req->r_completion, req->r_timeout);
-                       if (err == 0)
-                               req->r_reply = ERR_PTR(-EIO);
-                       else if (err < 0)
-                               req->r_reply = ERR_PTR(err);
-               } else {
-                        err = wait_for_completion_interruptible(
-                                &req->r_completion);
-                        if (err)
-                                req->r_reply = ERR_PTR(err);
-               }
-               mutex_lock(&mdsc->mutex);
+       if (req->r_err) {
+               err = req->r_err;
+               __unregister_request(mdsc, req);
+               dout("do_request early error %d\n", err);
+               goto out;
        }
 
-       if (IS_ERR(req->r_reply)) {
-               err = PTR_ERR(req->r_reply);
-               req->r_reply = NULL;
-
-               if (err == -ERESTARTSYS) {
-                       /* aborted */
-                       req->r_aborted = true;
+       /* wait */
+       mutex_unlock(&mdsc->mutex);
+       dout("do_request waiting\n");
+       if (req->r_timeout) {
+               err = (long)wait_for_completion_interruptible_timeout(
+                       &req->r_completion, req->r_timeout);
+               if (err == 0)
+                       err = -EIO;
+       } else {
+               err = wait_for_completion_interruptible(&req->r_completion);
+       }
+       dout("do_request waited, got %d\n", err);
+       mutex_lock(&mdsc->mutex);
 
-                       if (req->r_locked_dir &&
-                           (req->r_op & CEPH_MDS_OP_WRITE)) {
-                               struct ceph_inode_info *ci =
-                                       ceph_inode(req->r_locked_dir);
+       /* only abort if we didn't race with a real reply */
+       if (req->r_got_result) {
+               err = le32_to_cpu(req->r_reply_info.head->result);
+       } else if (err < 0) {
+               dout("aborted request %lld with %d\n", req->r_tid, err);
+               req->r_err = err;
+               req->r_aborted = true;
 
-                               dout("aborted, clearing I_COMPLETE on %p\n", 
-                                    req->r_locked_dir);
-                               spin_lock(&req->r_locked_dir->i_lock);
-                               ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-                               ci->i_release_count++;
-                               spin_unlock(&req->r_locked_dir->i_lock);
-                       }
-               } else {
-                       /* clean up this request */
-                       __unregister_request(mdsc, req);
-                       if (!list_empty(&req->r_unsafe_item))
-                               list_del_init(&req->r_unsafe_item);
-                       complete(&req->r_safe_completion);
+               if (req->r_locked_dir &&
+                   (req->r_op & CEPH_MDS_OP_WRITE)) {
+                       struct ceph_inode_info *ci =
+                               ceph_inode(req->r_locked_dir);
+
+                       dout("aborted, clearing I_COMPLETE on %p\n",
+                            req->r_locked_dir);
+                       spin_lock(&req->r_locked_dir->i_lock);
+                       ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+                       ci->i_release_count++;
+                       spin_unlock(&req->r_locked_dir->i_lock);
                }
-       } else if (req->r_err) {
-               err = req->r_err;
        } else {
-               err = le32_to_cpu(req->r_reply_info.head->result);
+               err = req->r_err;
        }
-       mutex_unlock(&mdsc->mutex);
 
+out:
+       mutex_unlock(&mdsc->mutex);
        dout("do_request %p done, result %d\n", req, err);
        return err;
 }
@@ -1838,11 +1832,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                        mutex_unlock(&mdsc->mutex);
                        goto out;
                }
-       }
-
-       BUG_ON(req->r_reply);
-
-       if (!head->safe) {
+       } else {
                req->r_got_unsafe = true;
                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
        }
@@ -1880,12 +1870,19 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
        up_read(&mdsc->snap_rwsem);
 out_err:
-       if (err) {
-               req->r_err = err;
+       mutex_lock(&mdsc->mutex);
+       if (!req->r_aborted) {
+               if (err) {
+                       req->r_err = err;
+               } else {
+                       req->r_reply = msg;
+                       ceph_msg_get(msg);
+                       req->r_got_result = true;
+               }
        } else {
-               req->r_reply = msg;
-               ceph_msg_get(msg);
+               dout("reply arrived after request %lld was aborted\n", tid);
        }
+       mutex_unlock(&mdsc->mutex);
 
        add_cap_releases(mdsc, req->r_session, -1);
        mutex_unlock(&session->s_mutex);
index 961cc6f..0b1dd10 100644 (file)
@@ -213,7 +213,7 @@ struct ceph_mds_request {
        struct completion r_safe_completion;
        ceph_mds_request_callback_t r_callback;
        struct list_head  r_unsafe_item;  /* per-session unsafe list item */
-       bool              r_got_unsafe, r_got_safe;
+       bool              r_got_unsafe, r_got_safe, r_got_result;
 
        bool              r_did_prepopulate;
        u32               r_readdir_offset;