ceph: skip mds sync on forced unmount
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 60a9a4a..0d451a8 100644
@@ -665,10 +665,10 @@ static struct ceph_msg *create_session_msg(u32 op, u64 seq)
        struct ceph_msg *msg;
        struct ceph_mds_session_head *h;
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
-       if (IS_ERR(msg)) {
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h));
+       if (!msg) {
                pr_err("create_session_msg ENOMEM creating msg\n");
-               return ERR_PTR(PTR_ERR(msg));
+               return NULL;
        }
        h = msg->front.iov_base;
        h->op = cpu_to_le32(op);
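This and the later ceph_msg_new() hunks track a rework of the message allocator (done outside this file), which now takes just a type and a front length and returns NULL on failure instead of an ERR_PTR. A minimal sketch of the caller-side pattern the whole file converges on, assuming that reworked signature:

    struct ceph_msg *msg;

    msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h));
    if (!msg)
            return -ENOMEM;        /* map NULL to -ENOMEM; no PTR_ERR() */
    ceph_con_send(&session->s_con, msg);
    return 0;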
@@ -687,7 +687,6 @@ static int __open_session(struct ceph_mds_client *mdsc,
        struct ceph_msg *msg;
        int mstate;
        int mds = session->s_mds;
-       int err = 0;
 
        /* wait for mds to go active? */
        mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
@@ -698,13 +697,9 @@ static int __open_session(struct ceph_mds_client *mdsc,
 
        /* send connect message */
        msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
-       if (IS_ERR(msg)) {
-               err = PTR_ERR(msg);
-               goto out;
-       }
+       if (!msg)
+               return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
-
-out:
        return 0;
 }
 
@@ -736,9 +731,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
 }
 
 /*
- * Helper to safely iterate over all caps associated with a session.
+ * Helper to safely iterate over all caps associated with a session, with
+ * special care taken to handle a racing __ceph_remove_cap().
  *
- * caller must hold session s_mutex
+ * Caller must hold session s_mutex.
  */
 static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
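For context, a simplified sketch of how such an iterator can tolerate a racing __ceph_remove_cap(): pin the current list node (upstream keeps it in session->s_cap_iterator so the remover leaves it linked) and hold an inode reference across the lock drop. This is illustrative only; details such as the real helper's deferred iput() and cap freeing are omitted.

    static int iterate_session_caps_sketch(struct ceph_mds_session *session,
                                           int (*cb)(struct inode *,
                                                     struct ceph_cap *, void *),
                                           void *arg)
    {
            struct list_head *p;
            int ret = 0;

            spin_lock(&session->s_cap_lock);
            p = session->s_caps.next;
            while (p != &session->s_caps) {
                    struct ceph_cap *cap =
                            list_entry(p, struct ceph_cap, session_caps);
                    struct inode *inode = igrab(&cap->ci->vfs_inode);

                    if (!inode) {                  /* inode being torn down */
                            p = p->next;
                            continue;
                    }
                    session->s_cap_iterator = cap; /* keep this node linked */
                    spin_unlock(&session->s_cap_lock);

                    ret = cb(inode, cap, arg);     /* may sleep */
                    iput(inode);

                    spin_lock(&session->s_cap_lock);
                    session->s_cap_iterator = NULL;
                    p = p->next;                   /* safe: node stayed linked */
                    if (ret < 0)
                            break;
            }
            spin_unlock(&session->s_cap_lock);
            return ret;
    }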
@@ -882,8 +878,8 @@ static int send_renew_caps(struct ceph_mds_client *mdsc,
                ceph_mds_state_name(state));
        msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
                                 ++session->s_renew_seq);
-       if (IS_ERR(msg))
-               return PTR_ERR(msg);
+       if (!msg)
+               return -ENOMEM;
        ceph_con_send(&session->s_con, msg);
        return 0;
 }
@@ -930,17 +926,15 @@ static int request_close_session(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_session *session)
 {
        struct ceph_msg *msg;
-       int err = 0;
 
        dout("request_close_session mds%d state %s seq %lld\n",
             session->s_mds, session_state_name(session->s_state),
             session->s_seq);
        msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
-       if (IS_ERR(msg))
-               err = PTR_ERR(msg);
-       else
-               ceph_con_send(&session->s_con, msg);
-       return err;
+       if (!msg)
+               return -ENOMEM;
+       ceph_con_send(&session->s_con, msg);
+       return 0;
 }
 
 /*
@@ -1057,8 +1051,7 @@ static int add_cap_releases(struct ceph_mds_client *mdsc,
 
        while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                spin_unlock(&session->s_cap_lock);
-               msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-                                  0, 0, NULL);
+               msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE);
                if (!msg)
                        goto out_unlocked;
                dout("add_cap_releases %p msg %p now %d\n", session, msg,
@@ -1180,6 +1173,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
        if (!req)
                return ERR_PTR(-ENOMEM);
 
+       mutex_init(&req->r_fill_mutex);
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1266,7 +1260,7 @@ retry:
                struct inode *inode = temp->d_inode;
 
                if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
-                       dout("build_path_dentry path+%d: %p SNAPDIR\n",
+                       dout("build_path path+%d: %p SNAPDIR\n",
                             pos, temp);
                } else if (stop_on_nosnap && inode &&
                           ceph_snap(inode) == CEPH_NOSNAP) {
@@ -1277,20 +1271,18 @@ retry:
                                break;
                        strncpy(path + pos, temp->d_name.name,
                                temp->d_name.len);
-                       dout("build_path_dentry path+%d: %p '%.*s'\n",
-                            pos, temp, temp->d_name.len, path + pos);
                }
                if (pos)
                        path[--pos] = '/';
                temp = temp->d_parent;
                if (temp == NULL) {
-                       pr_err("build_path_dentry corrupt dentry\n");
+                       pr_err("build_path corrupt dentry\n");
                        kfree(path);
                        return ERR_PTR(-EINVAL);
                }
        }
        if (pos != 0) {
-               pr_err("build_path_dentry did not end path lookup where "
+               pr_err("build_path did not end path lookup where "
                       "expected, namelen is %d, pos is %d\n", len, pos);
                /* presumably this is only possible if racing with a
                   rename of one of the parent directories (we can not
@@ -1302,7 +1294,7 @@ retry:
 
        *base = ceph_ino(temp->d_inode);
        *plen = len;
-       dout("build_path_dentry on %p %d built %llx '%.*s'\n",
+       dout("build_path on %p %d built %llx '%.*s'\n",
             dentry, atomic_read(&dentry->d_count), *base, len, path);
        return path;
 }
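As the renamed douts show, build_path fills the buffer right to left: len is precomputed from the d_name lengths, pos starts at len, each component is copied to path + pos with a '/' prefixed, and the walk follows d_parent toward the root. Ending with pos != 0 means a concurrent rename changed a component length, hence the retry. A stripped-down sketch of that backwards fill (hypothetical helper name, no snapdir/nosnap handling):

    static char *build_path_sketch(struct dentry *dentry, int len)
    {
            struct dentry *temp = dentry;
            char *path = kmalloc(len + 1, GFP_NOFS);
            int pos = len;

            if (!path)
                    return ERR_PTR(-ENOMEM);
            path[len] = '\0';                  /* terminate, then fill backwards */
            while (pos > 0) {
                    pos -= temp->d_name.len;
                    if (pos < 0) {             /* raced with a rename */
                            kfree(path);
                            return ERR_PTR(-EAGAIN);  /* caller would retry */
                    }
                    memcpy(path + pos, temp->d_name.name, temp->d_name.len);
                    if (pos)
                            path[--pos] = '/'; /* separator before component */
                    temp = temp->d_parent;
            }
            return path;
    }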
@@ -1425,9 +1417,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        if (req->r_old_dentry_drop)
                len += req->r_old_dentry->d_name.len;
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
-       if (IS_ERR(msg))
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len);
+       if (!msg) {
+               msg = ERR_PTR(-ENOMEM);
                goto out_free2;
+       }
 
        msg->hdr.tid = cpu_to_le64(req->r_tid);
 
@@ -1516,9 +1510,9 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
        }
        msg = create_request_message(mdsc, req, mds);
        if (IS_ERR(msg)) {
-               req->r_reply = ERR_PTR(PTR_ERR(msg));
+               req->r_err = PTR_ERR(msg);
                complete_request(mdsc, req);
-               return -PTR_ERR(msg);
+               return PTR_ERR(msg);
        }
        req->r_request = msg;
 
@@ -1551,7 +1545,7 @@ static int __do_request(struct ceph_mds_client *mdsc,
        int mds = -1;
        int err = -EAGAIN;
 
-       if (req->r_reply)
+       if (req->r_err || req->r_got_result)
                goto out;
 
        if (req->r_timeout &&
@@ -1608,7 +1602,7 @@ out:
        return err;
 
 finish:
-       req->r_reply = ERR_PTR(err);
+       req->r_err = err;
        complete_request(mdsc, req);
        goto out;
 }
@@ -1688,59 +1682,66 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
        __register_request(mdsc, req, dir);
        __do_request(mdsc, req);
 
-       /* wait */
-       if (!req->r_reply) {
-               mutex_unlock(&mdsc->mutex);
-               if (req->r_timeout) {
-                       err = (long)wait_for_completion_interruptible_timeout(
-                               &req->r_completion, req->r_timeout);
-                       if (err == 0)
-                               req->r_reply = ERR_PTR(-EIO);
-                       else if (err < 0)
-                               req->r_reply = ERR_PTR(err);
-               } else {
-                        err = wait_for_completion_interruptible(
-                                &req->r_completion);
-                        if (err)
-                                req->r_reply = ERR_PTR(err);
-               }
-               mutex_lock(&mdsc->mutex);
+       if (req->r_err) {
+               err = req->r_err;
+               __unregister_request(mdsc, req);
+               dout("do_request early error %d\n", err);
+               goto out;
        }
 
-       if (IS_ERR(req->r_reply)) {
-               err = PTR_ERR(req->r_reply);
-               req->r_reply = NULL;
+       /* wait */
+       mutex_unlock(&mdsc->mutex);
+       dout("do_request waiting\n");
+       if (req->r_timeout) {
+               err = (long)wait_for_completion_interruptible_timeout(
+                       &req->r_completion, req->r_timeout);
+               if (err == 0)
+                       err = -EIO;
+       } else {
+               err = wait_for_completion_interruptible(&req->r_completion);
+       }
+       dout("do_request waited, got %d\n", err);
+       mutex_lock(&mdsc->mutex);
 
-               if (err == -ERESTARTSYS) {
-                       /* aborted */
-                       req->r_aborted = true;
+       /* only abort if we didn't race with a real reply */
+       if (req->r_got_result) {
+               err = le32_to_cpu(req->r_reply_info.head->result);
+       } else if (err < 0) {
+               dout("aborted request %lld with %d\n", req->r_tid, err);
 
-                       if (req->r_locked_dir &&
-                           (req->r_op & CEPH_MDS_OP_WRITE)) {
-                               struct ceph_inode_info *ci =
-                                       ceph_inode(req->r_locked_dir);
+               /*
+                * ensure we aren't running concurrently with
+                * ceph_fill_trace or ceph_readdir_prepopulate, which
+                * rely on locks (dir mutex) held by our caller.
+                */
+               mutex_lock(&req->r_fill_mutex);
+               req->r_err = err;
+               req->r_aborted = true;
+               mutex_unlock(&req->r_fill_mutex);
 
-                               dout("aborted, clearing I_COMPLETE on %p\n", 
-                                    req->r_locked_dir);
-                               spin_lock(&req->r_locked_dir->i_lock);
-                               ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
-                               ci->i_release_count++;
-                               spin_unlock(&req->r_locked_dir->i_lock);
-                       }
-               } else {
-                       /* clean up this request */
-                       __unregister_request(mdsc, req);
-                       if (!list_empty(&req->r_unsafe_item))
-                               list_del_init(&req->r_unsafe_item);
-                       complete(&req->r_safe_completion);
+               if (req->r_locked_dir &&
+                   (req->r_op & CEPH_MDS_OP_WRITE)) {
+                       struct ceph_inode_info *ci =
+                               ceph_inode(req->r_locked_dir);
+
+                       dout("aborted, clearing I_COMPLETE on %p, leases\n",
+                            req->r_locked_dir);
+                       spin_lock(&req->r_locked_dir->i_lock);
+                       ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
+                       ci->i_release_count++;
+                       spin_unlock(&req->r_locked_dir->i_lock);
+
+                       if (req->r_dentry)
+                               ceph_invalidate_dentry_lease(req->r_dentry);
+                       if (req->r_old_dentry)
+                               ceph_invalidate_dentry_lease(req->r_old_dentry);
                }
-       } else if (req->r_err) {
-               err = req->r_err;
        } else {
-               err = le32_to_cpu(req->r_reply_info.head->result);
+               err = req->r_err;
        }
-       mutex_unlock(&mdsc->mutex);
 
+out:
+       mutex_unlock(&mdsc->mutex);
        dout("do_request %p done, result %d\n", req, err);
        return err;
 }
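The rewritten wait path drops the old trick of stashing an ERR_PTR in req->r_reply in favor of explicit r_err/r_got_result state, and the new r_fill_mutex closes the race between an interrupted waiter and a reply that is concurrently filling the caller's locked directory. Distilled to its two sides (illustrative; the hunks above and below carry the real code):

    /* waiter, after an interrupted or timed-out wait (mdsc->mutex held): */
    if (req->r_got_result) {                   /* real reply won the race */
            err = le32_to_cpu(req->r_reply_info.head->result);
    } else if (err < 0) {
            mutex_lock(&req->r_fill_mutex);    /* exclude ceph_fill_trace() */
            req->r_err = err;
            req->r_aborted = true;
            mutex_unlock(&req->r_fill_mutex);
    }

    /* reply handler, in handle_reply(): */
    mutex_lock(&req->r_fill_mutex);
    err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
    mutex_unlock(&req->r_fill_mutex);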
@@ -1837,11 +1838,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                        mutex_unlock(&mdsc->mutex);
                        goto out;
                }
-       }
-
-       BUG_ON(req->r_reply);
-
-       if (!head->safe) {
+       } else {
                req->r_got_unsafe = true;
                list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
        }
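Background (hedged): the MDS acknowledges most mutations twice, first with an early "unsafe" reply once the op is applied in memory, then with a "safe" reply once it is journaled. Until the safe reply arrives the request sits on the session's s_unsafe list so it can be replayed across an MDS failover, which is why the restructured branch above reads roughly as:

    if (head->safe) {
            req->r_got_safe = true;            /* fully committed at the MDS */
            __unregister_request(mdsc, req);
            complete(&req->r_safe_completion); /* wake umount/sync waiters */
    } else {
            req->r_got_unsafe = true;
            list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
    }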
@@ -1870,21 +1867,30 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        }
 
        /* insert trace into our cache */
+       mutex_lock(&req->r_fill_mutex);
        err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
        if (err == 0) {
                if (result == 0 && rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
                ceph_unreserve_caps(&req->r_caps_reservation);
        }
+       mutex_unlock(&req->r_fill_mutex);
 
        up_read(&mdsc->snap_rwsem);
 out_err:
-       if (err) {
-               req->r_err = err;
+       mutex_lock(&mdsc->mutex);
+       if (!req->r_aborted) {
+               if (err) {
+                       req->r_err = err;
+               } else {
+                       req->r_reply = msg;
+                       ceph_msg_get(msg);
+                       req->r_got_result = true;
+               }
        } else {
-               req->r_reply = msg;
-               ceph_msg_get(msg);
+               dout("reply arrived after request %lld was aborted\n", tid);
        }
+       mutex_unlock(&mdsc->mutex);
 
        add_cap_releases(mdsc, req->r_session, -1);
        mutex_unlock(&session->s_mutex);
@@ -2136,7 +2142,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
        struct ceph_mds_session *session = NULL;
        struct ceph_msg *reply;
        struct rb_node *p;
-       int err;
+       int err = -ENOMEM;
        struct ceph_pagelist *pagelist;
 
        pr_info("reconnect to recovering mds%d\n", mds);
@@ -2146,11 +2152,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                goto fail_nopagelist;
        ceph_pagelist_init(pagelist);
 
-       reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, 0, 0, NULL);
-       if (IS_ERR(reply)) {
-               err = PTR_ERR(reply);
+       err = -ENOMEM;
+       reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0);
+       if (!reply)
                goto fail_nomsg;
-       }
 
        /* find session */
        session = __ceph_lookup_mds_session(mdsc, mds);
@@ -2185,7 +2190,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                goto fail;
        err = iterate_session_caps(session, encode_caps_cb, pagelist);
        if (err < 0)
-               goto out;
+               goto fail;
 
        /*
         * snaprealms.  we provide mds with the ino, seq (version), and
@@ -2213,28 +2218,31 @@ send:
        reply->nr_pages = calc_pages_for(0, pagelist->length);
        ceph_con_send(&session->s_con, reply);
 
-       if (session) {
-               session->s_state = CEPH_MDS_SESSION_OPEN;
-               __wake_requests(mdsc, &session->s_waiting);
-       }
+       session->s_state = CEPH_MDS_SESSION_OPEN;
+       mutex_unlock(&session->s_mutex);
+
+       mutex_lock(&mdsc->mutex);
+       __wake_requests(mdsc, &session->s_waiting);
+       mutex_unlock(&mdsc->mutex);
+
+       ceph_put_mds_session(session);
 
-out:
        up_read(&mdsc->snap_rwsem);
-       if (session) {
-               mutex_unlock(&session->s_mutex);
-               ceph_put_mds_session(session);
-       }
        mutex_lock(&mdsc->mutex);
        return;
 
 fail:
        ceph_msg_put(reply);
+       up_read(&mdsc->snap_rwsem);
+       mutex_unlock(&session->s_mutex);
+       ceph_put_mds_session(session);
 fail_nomsg:
        ceph_pagelist_release(pagelist);
        kfree(pagelist);
 fail_nopagelist:
-       pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
-       goto out;
+       pr_err("error %d preparing reconnect for mds%d\n", err, mds);
+       mutex_lock(&mdsc->mutex);
+       return;
 }
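The reworked failure paths follow the usual kernel staged-unwind idiom: each label releases exactly the state acquired before the failing step, in reverse order, and fail itself is only reachable once the session is held, so the old session-may-be-NULL checks disappear. Schematically (a generic sketch, not new code):

    if (!pagelist)
            goto fail_nopagelist;      /* nothing acquired yet */
    if (!reply)
            goto fail_nomsg;           /* pagelist exists, no msg */
    /* errors past this point hold msg + session + snap_rwsem */
    fail:
            ceph_msg_put(reply);
            up_read(&mdsc->snap_rwsem);
            mutex_unlock(&session->s_mutex);
            ceph_put_mds_session(session);
    fail_nomsg:
            ceph_pagelist_release(pagelist);
            kfree(pagelist);
    fail_nopagelist:
            pr_err("error %d preparing reconnect for mds%d\n", err, mds);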
 
 
@@ -2453,8 +2461,8 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
        dnamelen = dentry->d_name.len;
        len += dnamelen;
 
-       msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
-       if (IS_ERR(msg))
+       msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len);
+       if (!msg)
                return;
        lease = msg->front.iov_base;
        lease->action = action;
@@ -2616,6 +2624,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        mdsc->client = client;
        mutex_init(&mdsc->mutex);
        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
+       if (mdsc->mdsmap == NULL)
+               return -ENOMEM;
+
        init_completion(&mdsc->safe_umount_waiters);
        init_completion(&mdsc->session_close_waiters);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
@@ -2641,6 +2652,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        init_waitqueue_head(&mdsc->cap_flushing_wq);
        spin_lock_init(&mdsc->dentry_lru_lock);
        INIT_LIST_HEAD(&mdsc->dentry_lru);
+
        return 0;
 }
 
@@ -2736,6 +2748,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
        u64 want_tid, want_flush;
 
+       if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
+               return;
+
        dout("sync\n");
        mutex_lock(&mdsc->mutex);
        want_tid = mdsc->last_tid;
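And the change the subject line names: ceph_mdsc_sync() now returns immediately once the mount has been forcibly shut down, since the sessions and caps it would wait on are being discarded and the want_tid/want_flush waits could hang the unmount. For orientation, a hedged sketch of the sync_fs path that funnels into it (handler shape per the ceph client of this era; treat the details as illustrative):

    static int ceph_sync_fs(struct super_block *sb, int wait)
    {
            struct ceph_client *client = ceph_sb_to_client(sb);

            dout("sync_fs %d\n", wait);
            ceph_osdc_sync(&client->osdc);   /* flush outstanding osd writes */
            ceph_mdsc_sync(&client->mdsc);   /* no-op after a forced unmount */
            return 0;
    }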