Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
[pandora-kernel.git] / fs / ceph / mds_client.c
index dd440bd..f091b13 100644 (file)
@@ -3,6 +3,7 @@
 #include <linux/wait.h>
 #include <linux/slab.h>
 #include <linux/sched.h>
+#include <linux/smp_lock.h>
 
 #include "mds_client.h"
 #include "mon_client.h"
  * are no longer valid.
  */
 
+/*
+ * State threaded through iterate_session_caps() while building an MDS
+ * reconnect message: the pagelist being filled with cap records, and
+ * whether the peer advertised CEPH_FEATURE_FLOCK (selects the v2
+ * record format with file-lock data).
+ */
+struct ceph_reconnect_state {
+       struct ceph_pagelist *pagelist; /* reconnect payload under construction */
+       bool flock;                     /* peer supports v2 (flock) records */
+};
+
 static void __wake_requests(struct ceph_mds_client *mdsc,
                            struct list_head *head);
 
@@ -449,7 +455,7 @@ void ceph_mdsc_release_request(struct kref *kref)
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_request_session(req);
-       ceph_unreserve_caps(&req->r_caps_reservation);
+       ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
 }
 
@@ -512,7 +518,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
 {
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
-               ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+               ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+                                 req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        __insert_request(mdsc, req);
@@ -553,6 +560,13 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
  *
  * Called under mdsc->mutex.
  */
+/*
+ * Walk up the dentry tree until we hit the first ancestor whose inode
+ * is not snapped (or the root).  __choose_mds() uses this to direct
+ * snapped/virtual snapdir requests at an MDS based on a real
+ * (CEPH_NOSNAP) parent inode.
+ *
+ * File-local helper: marked static so it does not leak external
+ * linkage (no prototype exists in any header).
+ */
+static struct dentry *get_nonsnap_parent(struct dentry *dentry)
+{
+       while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
+               dentry = dentry->d_parent;
+       return dentry;
+}
+
 static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
 {
@@ -583,14 +597,29 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
-               if (req->r_dentry->d_inode) {
+               struct inode *dir = req->r_dentry->d_parent->d_inode;
+
+               if (dir->i_sb != mdsc->client->sb) {
+                       /* not this fs! */
+                       inode = req->r_dentry->d_inode;
+               } else if (ceph_snap(dir) != CEPH_NOSNAP) {
+                       /* direct snapped/virtual snapdir requests
+                        * based on parent dir inode */
+                       struct dentry *dn =
+                               get_nonsnap_parent(req->r_dentry->d_parent);
+                       inode = dn->d_inode;
+                       dout("__choose_mds using nonsnap parent %p\n", inode);
+               } else if (req->r_dentry->d_inode) {
+                       /* dentry target */
                        inode = req->r_dentry->d_inode;
                } else {
-                       inode = req->r_dentry->d_parent->d_inode;
+                       /* dir + name */
+                       inode = dir;
                        hash = req->r_dentry->d_name.hash;
                        is_hash = true;
                }
        }
+
        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
@@ -703,6 +732,51 @@ static int __open_session(struct ceph_mds_client *mdsc,
        return 0;
 }
 
+/*
+ * open sessions for any export targets for the given mds
+ *
+ * called under mdsc->mutex
+ */
+static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
+                                         struct ceph_mds_session *session)
+{
+       struct ceph_mds_info *mi;
+       struct ceph_mds_session *ts;
+       int i, mds = session->s_mds;
+       int target;
+
+       if (mds >= mdsc->mdsmap->m_max_mds)
+               return;
+       mi = &mdsc->mdsmap->m_info[mds];
+       dout("open_export_target_sessions for mds%d (%d targets)\n",
+            session->s_mds, mi->num_export_targets);
+
+       for (i = 0; i < mi->num_export_targets; i++) {
+               target = mi->export_targets[i];
+               ts = __ceph_lookup_mds_session(mdsc, target);
+               if (!ts) {
+                       ts = register_session(mdsc, target);
+                       if (IS_ERR(ts))
+                               return;
+               }
+               /*
+                * Open the *target* session if it isn't already open.
+                * (Previously this mistakenly tested and opened the
+                * originating session instead of ts, and the dout
+                * printed the loop index where the target mds number
+                * belongs.)
+                */
+               if (ts->s_state == CEPH_MDS_SESSION_NEW ||
+                   ts->s_state == CEPH_MDS_SESSION_CLOSING)
+                       __open_session(mdsc, ts);
+               else
+                       dout(" mds%d target mds%d %p is %s\n", session->s_mds,
+                            target, ts, session_state_name(ts->s_state));
+               ceph_put_mds_session(ts);
+       }
+}
+
+/*
+ * Locked wrapper around __open_export_target_sessions(): takes
+ * mdsc->mutex (which the __-prefixed helper expects to be held) and
+ * opens sessions to all export targets of @session's mds.
+ */
+void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
+                                          struct ceph_mds_session *session)
+{
+       mutex_lock(&mdsc->mutex);
+       __open_export_target_sessions(mdsc, session);
+       mutex_unlock(&mdsc->mutex);
+}
+
 /*
  * session caps
  */
@@ -764,7 +838,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                        last_inode = NULL;
                }
                if (old_cap) {
-                       ceph_put_cap(old_cap);
+                       ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }
 
@@ -793,7 +867,7 @@ out:
        if (last_inode)
                iput(last_inode);
        if (old_cap)
-               ceph_put_cap(old_cap);
+               ceph_put_cap(session->s_mdsc, old_cap);
 
        return ret;
 }
@@ -1067,15 +1141,16 @@ static int trim_caps(struct ceph_mds_client *mdsc,
  * Called under s_mutex.
  */
 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-                         struct ceph_mds_session *session,
-                         int extra)
+                         struct ceph_mds_session *session)
 {
-       struct ceph_msg *msg;
+       struct ceph_msg *msg, *partial = NULL;
        struct ceph_mds_cap_release *head;
        int err = -ENOMEM;
+       int extra = mdsc->client->mount_args->cap_release_safety;
+       int num;
 
-       if (extra < 0)
-               extra = mdsc->client->mount_args->cap_release_safety;
+       dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
+            extra);
 
        spin_lock(&session->s_cap_lock);
 
@@ -1084,9 +1159,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                                       struct ceph_msg,
                                 list_head);
                head = msg->front.iov_base;
-               extra += CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
+               num = le32_to_cpu(head->num);
+               if (num) {
+                       dout(" partial %p with (%d/%d)\n", msg, num,
+                            (int)CEPH_CAPS_PER_RELEASE);
+                       extra += CEPH_CAPS_PER_RELEASE - num;
+                       partial = msg;
+               }
        }
-
        while (session->s_num_cap_releases < session->s_nr_caps + extra) {
                spin_unlock(&session->s_cap_lock);
                msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
@@ -1103,19 +1183,14 @@ int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
                session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
        }
 
-       if (!list_empty(&session->s_cap_releases)) {
-               msg = list_first_entry(&session->s_cap_releases,
-                                      struct ceph_msg,
-                                      list_head);
-               head = msg->front.iov_base;
-               if (head->num) {
-                       dout(" queueing non-full %p (%d)\n", msg,
-                            le32_to_cpu(head->num));
-                       list_move_tail(&msg->list_head,
-                                     &session->s_cap_releases_done);
-                       session->s_num_cap_releases -=
-                               CEPH_CAPS_PER_RELEASE - le32_to_cpu(head->num);
-               }
+       if (partial) {
+               head = partial->front.iov_base;
+               num = le32_to_cpu(head->num);
+               dout(" queueing partial %p with %d/%d\n", partial, num,
+                    (int)CEPH_CAPS_PER_RELEASE);
+               list_move_tail(&partial->list_head,
+                              &session->s_cap_releases_done);
+               session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
        }
        err = 0;
        spin_unlock(&session->s_cap_lock);
@@ -1250,6 +1325,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
                return ERR_PTR(-ENOMEM);
 
        mutex_init(&req->r_fill_mutex);
+       req->r_mdsc = mdsc;
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1580,6 +1656,15 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
        req->r_mds = mds;
        req->r_attempts++;
+       if (req->r_inode) {
+               struct ceph_cap *cap =
+                       ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
+
+               if (cap)
+                       req->r_sent_on_mseq = cap->mseq;
+               else
+                       req->r_sent_on_mseq = -1;
+       }
        dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
             req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
 
@@ -1914,21 +1999,40 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        result = le32_to_cpu(head->result);
 
        /*
-        * Tolerate 2 consecutive ESTALEs from the same mds.
-        * FIXME: we should be looking at the cap migrate_seq.
+        * Handle an ESTALE
+        * if we're not talking to the authority, send to them
+        * if the authority has changed while we weren't looking,
+        * send to new authority
+        * Otherwise we just have to return an ESTALE
         */
        if (result == -ESTALE) {
-               req->r_direct_mode = USE_AUTH_MDS;
-               req->r_num_stale++;
-               if (req->r_num_stale <= 2) {
+               dout("got ESTALE on request %llu", req->r_tid);
+               if (!req->r_inode) {
+                       /* do nothing; not an authority problem */
+               } else if (req->r_direct_mode != USE_AUTH_MDS) {
+                       dout("not using auth, setting for that now");
+                       req->r_direct_mode = USE_AUTH_MDS;
                        __do_request(mdsc, req);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
+               } else  {
+                       struct ceph_inode_info *ci = ceph_inode(req->r_inode);
+                       struct ceph_cap *cap =
+                               ceph_get_cap_for_mds(ci, req->r_mds);;
+
+                       dout("already using auth");
+                       if ((!cap || cap != ci->i_auth_cap) ||
+                           (cap->mseq != req->r_sent_on_mseq)) {
+                               dout("but cap changed, so resending");
+                               __do_request(mdsc, req);
+                               mutex_unlock(&mdsc->mutex);
+                               goto out;
+                       }
                }
-       } else {
-               req->r_num_stale = 0;
+               dout("have to return ESTALE on request %llu", req->r_tid);
        }
 
+
        if (head->safe) {
                req->r_got_safe = true;
                __unregister_request(mdsc, req);
@@ -1985,7 +2089,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        if (err == 0) {
                if (result == 0 && rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
-               ceph_unreserve_caps(&req->r_caps_reservation);
+               ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
        }
        mutex_unlock(&req->r_fill_mutex);
 
@@ -2005,7 +2109,7 @@ out_err:
        }
        mutex_unlock(&mdsc->mutex);
 
-       ceph_add_cap_releases(mdsc, req->r_session, -1);
+       ceph_add_cap_releases(mdsc, req->r_session);
        mutex_unlock(&session->s_mutex);
 
        /* kick calling process */
@@ -2126,7 +2230,7 @@ static void handle_session(struct ceph_mds_session *session,
                        pr_info("mds%d reconnect denied\n", session->s_mds);
                remove_session_caps(session);
                wake = 1; /* for good measure */
-               complete_all(&mdsc->session_close_waiters);
+               wake_up_all(&mdsc->session_close_wq);
                kick_requests(mdsc, mds);
                break;
 
@@ -2193,9 +2297,14 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                          void *arg)
 {
-       struct ceph_mds_cap_reconnect rec;
+       union {
+               struct ceph_mds_cap_reconnect v2;
+               struct ceph_mds_cap_reconnect_v1 v1;
+       } rec;
+       size_t reclen;
        struct ceph_inode_info *ci;
-       struct ceph_pagelist *pagelist = arg;
+       struct ceph_reconnect_state *recon_state = arg;
+       struct ceph_pagelist *pagelist = recon_state->pagelist;
        char *path;
        int pathlen, err;
        u64 pathbase;
@@ -2215,7 +2324,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
                if (IS_ERR(path)) {
                        err = PTR_ERR(path);
-                       BUG_ON(err);
+                       goto out_dput;
                }
        } else {
                path = NULL;
@@ -2223,25 +2332,53 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        }
        err = ceph_pagelist_encode_string(pagelist, path, pathlen);
        if (err)
-               goto out;
+               goto out_free;
 
        spin_lock(&inode->i_lock);
        cap->seq = 0;        /* reset cap seq */
        cap->issue_seq = 0;  /* and issue_seq */
-       rec.cap_id = cpu_to_le64(cap->cap_id);
-       rec.pathbase = cpu_to_le64(pathbase);
-       rec.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
-       rec.issued = cpu_to_le32(cap->issued);
-       rec.size = cpu_to_le64(inode->i_size);
-       ceph_encode_timespec(&rec.mtime, &inode->i_mtime);
-       ceph_encode_timespec(&rec.atime, &inode->i_atime);
-       rec.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+
+       if (recon_state->flock) {
+               rec.v2.cap_id = cpu_to_le64(cap->cap_id);
+               rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+               rec.v2.issued = cpu_to_le32(cap->issued);
+               rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+               rec.v2.pathbase = cpu_to_le64(pathbase);
+               rec.v2.flock_len = 0;
+               reclen = sizeof(rec.v2);
+       } else {
+               rec.v1.cap_id = cpu_to_le64(cap->cap_id);
+               rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+               rec.v1.issued = cpu_to_le32(cap->issued);
+               rec.v1.size = cpu_to_le64(inode->i_size);
+               ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
+               ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
+               rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+               rec.v1.pathbase = cpu_to_le64(pathbase);
+               reclen = sizeof(rec.v1);
+       }
        spin_unlock(&inode->i_lock);
 
-       err = ceph_pagelist_append(pagelist, &rec, sizeof(rec));
+       if (recon_state->flock) {
+               int num_fcntl_locks, num_flock_locks;
 
-out:
+               lock_kernel();
+               ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
+               rec.v2.flock_len = (2*sizeof(u32) +
+                                   (num_fcntl_locks+num_flock_locks) *
+                                   sizeof(struct ceph_filelock));
+
+               err = ceph_pagelist_append(pagelist, &rec, reclen);
+               if (!err)
+                       err = ceph_encode_locks(inode, pagelist,
+                                               num_fcntl_locks,
+                                               num_flock_locks);
+               unlock_kernel();
+       }
+
+out_free:
        kfree(path);
+out_dput:
        dput(dentry);
        return err;
 }
@@ -2267,6 +2404,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        int mds = session->s_mds;
        int err = -ENOMEM;
        struct ceph_pagelist *pagelist;
+       struct ceph_reconnect_state recon_state;
 
        pr_info("mds%d reconnect start\n", mds);
 
@@ -2301,7 +2439,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
        if (err)
                goto fail;
-       err = iterate_session_caps(session, encode_caps_cb, pagelist);
+
+       recon_state.pagelist = pagelist;
+       recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+       err = iterate_session_caps(session, encode_caps_cb, &recon_state);
        if (err < 0)
                goto fail;
 
@@ -2326,6 +2467,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        }
 
        reply->pagelist = pagelist;
+       if (recon_state.flock)
+               reply->hdr.version = cpu_to_le16(2);
        reply->hdr.data_len = cpu_to_le32(pagelist->length);
        reply->nr_pages = calc_pages_for(0, pagelist->length);
        ceph_con_send(&session->s_con, reply);
@@ -2376,9 +2519,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                oldstate = ceph_mdsmap_get_state(oldmap, i);
                newstate = ceph_mdsmap_get_state(newmap, i);
 
-               dout("check_new_map mds%d state %s -> %s (session %s)\n",
+               dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
                     i, ceph_mds_state_name(oldstate),
+                    ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
                     ceph_mds_state_name(newstate),
+                    ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
                     session_state_name(s->s_state));
 
                if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
@@ -2428,6 +2573,21 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                        wake_up_session_caps(s, 1);
                }
        }
+
+       for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
+               s = mdsc->sessions[i];
+               if (!s)
+                       continue;
+               if (!ceph_mdsmap_is_laggy(newmap, i))
+                       continue;
+               if (s->s_state == CEPH_MDS_SESSION_OPEN ||
+                   s->s_state == CEPH_MDS_SESSION_HUNG ||
+                   s->s_state == CEPH_MDS_SESSION_CLOSING) {
+                       dout(" connecting to export targets of laggy mds%d\n",
+                            i);
+                       __open_export_target_sessions(mdsc, s);
+               }
+       }
 }
 
 
@@ -2715,7 +2875,7 @@ static void delayed_work(struct work_struct *work)
                        send_renew_caps(mdsc, s);
                else
                        ceph_con_keepalive(&s->s_con);
-               ceph_add_cap_releases(mdsc, s, -1);
+               ceph_add_cap_releases(mdsc, s);
                if (s->s_state == CEPH_MDS_SESSION_OPEN ||
                    s->s_state == CEPH_MDS_SESSION_HUNG)
                        ceph_send_cap_releases(mdsc, s);
@@ -2739,7 +2899,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
                return -ENOMEM;
 
        init_completion(&mdsc->safe_umount_waiters);
-       init_completion(&mdsc->session_close_waiters);
+       init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
        mdsc->sessions = NULL;
        mdsc->max_sessions = 0;
@@ -2764,6 +2924,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        spin_lock_init(&mdsc->dentry_lru_lock);
        INIT_LIST_HEAD(&mdsc->dentry_lru);
 
+       ceph_caps_init(mdsc);
+       ceph_adjust_min_caps(mdsc, client->min_caps);
+
        return 0;
 }
 
@@ -2881,6 +3044,23 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
        wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
 }
 
+/*
+ * true if all sessions are closed, or we force unmount
+ *
+ * Predicate for the wait_event_timeout() in ceph_mdsc_close_sessions();
+ * file-local, so marked static to avoid leaking external linkage.
+ */
+static bool done_closing_sessions(struct ceph_mds_client *mdsc)
+{
+       int i, n = 0;
+
+       /* a forced unmount never waits for session teardown */
+       if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
+               return true;
+
+       /* count the sessions still registered, under mdsc->mutex */
+       mutex_lock(&mdsc->mutex);
+       for (i = 0; i < mdsc->max_sessions; i++)
+               if (mdsc->sessions[i])
+                       n++;
+       mutex_unlock(&mdsc->mutex);
+       return n == 0;
+}
 
 /*
  * called after sb is ro.
@@ -2889,45 +3069,32 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 {
        struct ceph_mds_session *session;
        int i;
-       int n;
        struct ceph_client *client = mdsc->client;
-       unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
+       unsigned long timeout = client->mount_args->mount_timeout * HZ;
 
        dout("close_sessions\n");
 
-       mutex_lock(&mdsc->mutex);
-
        /* close sessions */
-       started = jiffies;
-       while (time_before(jiffies, started + timeout)) {
-               dout("closing sessions\n");
-               n = 0;
-               for (i = 0; i < mdsc->max_sessions; i++) {
-                       session = __ceph_lookup_mds_session(mdsc, i);
-                       if (!session)
-                               continue;
-                       mutex_unlock(&mdsc->mutex);
-                       mutex_lock(&session->s_mutex);
-                       __close_session(mdsc, session);
-                       mutex_unlock(&session->s_mutex);
-                       ceph_put_mds_session(session);
-                       mutex_lock(&mdsc->mutex);
-                       n++;
-               }
-               if (n == 0)
-                       break;
-
-               if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
-                       break;
-
-               dout("waiting for sessions to close\n");
+       mutex_lock(&mdsc->mutex);
+       for (i = 0; i < mdsc->max_sessions; i++) {
+               session = __ceph_lookup_mds_session(mdsc, i);
+               if (!session)
+                       continue;
                mutex_unlock(&mdsc->mutex);
-               wait_for_completion_timeout(&mdsc->session_close_waiters,
-                                           timeout);
+               mutex_lock(&session->s_mutex);
+               __close_session(mdsc, session);
+               mutex_unlock(&session->s_mutex);
+               ceph_put_mds_session(session);
                mutex_lock(&mdsc->mutex);
        }
+       mutex_unlock(&mdsc->mutex);
+
+       dout("waiting for sessions to close\n");
+       wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
+                          timeout);
 
        /* tear down remaining sessions */
+       mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i]) {
                        session = get_session(mdsc->sessions[i]);
@@ -2940,9 +3107,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
                        mutex_lock(&mdsc->mutex);
                }
        }
-
        WARN_ON(!list_empty(&mdsc->cap_delay_list));
-
        mutex_unlock(&mdsc->mutex);
 
        ceph_cleanup_empty_realms(mdsc);
@@ -2959,6 +3124,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
        if (mdsc->mdsmap)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
+       ceph_caps_finalize(mdsc);
 }