diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 623c67c..a260010 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -255,6 +255,7 @@ static const char *session_state_name(int s)
        case CEPH_MDS_SESSION_OPEN: return "open";
        case CEPH_MDS_SESSION_HUNG: return "hung";
        case CEPH_MDS_SESSION_CLOSING: return "closing";
+       case CEPH_MDS_SESSION_RESTARTING: return "restarting";
        case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
        default: return "???";
        }
@@ -308,6 +309,15 @@ static bool __have_session(struct ceph_mds_client *mdsc, int mds)
        return mdsc->sessions[mds];
 }
 
+static int __verify_registered_session(struct ceph_mds_client *mdsc,
+                                      struct ceph_mds_session *s)
+{
+       if (s->s_mds >= mdsc->max_sessions ||
+           mdsc->sessions[s->s_mds] != s)
+               return -ENOENT;
+       return 0;
+}
+
 /*
  * create+register a new session for given mds.
  * called under mdsc->mutex.
@@ -343,7 +353,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
-       s->s_iterating_caps = false;
+       s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
@@ -381,10 +391,11 @@ fail_realloc:
 /*
  * called under mdsc->mutex
  */
-static void unregister_session(struct ceph_mds_client *mdsc,
+static void __unregister_session(struct ceph_mds_client *mdsc,
                               struct ceph_mds_session *s)
 {
-       dout("unregister_session mds%d %p\n", s->s_mds, s);
+       dout("__unregister_session mds%d %p\n", s->s_mds, s);
+       BUG_ON(mdsc->sessions[s->s_mds] != s);
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
@@ -448,10 +459,42 @@ static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
                                             u64 tid)
 {
        struct ceph_mds_request *req;
-       req = radix_tree_lookup(&mdsc->request_tree, tid);
-       if (req)
-               ceph_mdsc_get_request(req);
-       return req;
+       struct rb_node *n = mdsc->request_tree.rb_node;
+
+       while (n) {
+               req = rb_entry(n, struct ceph_mds_request, r_node);
+               if (tid < req->r_tid)
+                       n = n->rb_left;
+               else if (tid > req->r_tid)
+                       n = n->rb_right;
+               else {
+                       ceph_mdsc_get_request(req);
+                       return req;
+               }
+       }
+       return NULL;
+}
+
+static void __insert_request(struct ceph_mds_client *mdsc,
+                            struct ceph_mds_request *new)
+{
+       struct rb_node **p = &mdsc->request_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_mds_request *req = NULL;
+
+       while (*p) {
+               parent = *p;
+               req = rb_entry(parent, struct ceph_mds_request, r_node);
+               if (new->r_tid < req->r_tid)
+                       p = &(*p)->rb_left;
+               else if (new->r_tid > req->r_tid)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->r_node, parent, p);
+       rb_insert_color(&new->r_node, &mdsc->request_tree);
 }
 
 /*
@@ -469,7 +512,7 @@ static void __register_request(struct ceph_mds_client *mdsc,
                ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
-       radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);
+       __insert_request(mdsc, req);
 
        if (dir) {
                struct ceph_inode_info *ci = ceph_inode(dir);
@@ -485,7 +528,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
 {
        dout("__unregister_request %p tid %lld\n", req, req->r_tid);
-       radix_tree_delete(&mdsc->request_tree, req->r_tid);
+       rb_erase(&req->r_node, &mdsc->request_tree);
        ceph_mdsc_put_request(req);
 
        if (req->r_unsafe_dir) {
@@ -696,28 +739,61 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
 {
-       struct ceph_cap *cap, *ncap;
-       struct inode *inode;
+       struct list_head *p;
+       struct ceph_cap *cap;
+       struct inode *inode, *last_inode = NULL;
+       struct ceph_cap *old_cap = NULL;
        int ret;
 
        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
-       session->s_iterating_caps = true;
-       list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
+       p = session->s_caps.next;
+       while (p != &session->s_caps) {
+               cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
-               if (!inode)
+               if (!inode) {
+                       p = p->next;
                        continue;
+               }
+               session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);
+
+               if (last_inode) {
+                       iput(last_inode);
+                       last_inode = NULL;
+               }
+               if (old_cap) {
+                       ceph_put_cap(old_cap);
+                       old_cap = NULL;
+               }
+
                ret = cb(inode, cap, arg);
-               iput(inode);
+               last_inode = inode;
+
                spin_lock(&session->s_cap_lock);
+               p = p->next;
+               if (cap->ci == NULL) {
+                       dout("iterate_session_caps  finishing cap %p removal\n",
+                            cap);
+                       BUG_ON(cap->session != session);
+                       list_del_init(&cap->session_caps);
+                       session->s_nr_caps--;
+                       cap->session = NULL;
+                       old_cap = cap;  /* put_cap it w/o locks held */
+               }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
 out:
-       session->s_iterating_caps = false;
+       session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);
+
+       if (last_inode)
+               iput(last_inode);
+       if (old_cap)
+               ceph_put_cap(old_cap);
+
        return ret;
 }
 
@@ -909,7 +985,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
-               __ceph_remove_cap(cap, NULL);
+               __ceph_remove_cap(cap);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&inode->i_lock);
@@ -1115,17 +1191,25 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 }
 
 /*
- * return oldest (lowest) tid in request tree, 0 if none.
+ * return oldest (lowest) request, tid in request tree, 0 if none.
  *
  * called under mdsc->mutex.
  */
+static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
+{
+       if (RB_EMPTY_ROOT(&mdsc->request_tree))
+               return NULL;
+       return rb_entry(rb_first(&mdsc->request_tree),
+                       struct ceph_mds_request, r_node);
+}
+
 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
 {
-       struct ceph_mds_request *first;
-       if (radix_tree_gang_lookup(&mdsc->request_tree,
-                                  (void **)&first, 0, 1) <= 0)
-               return 0;
-       return first->r_tid;
+       struct ceph_mds_request *req = __get_oldest_req(mdsc);
+
+       if (req)
+               return req->r_tid;
+       return 0;
 }
 
 /*
@@ -1325,7 +1409,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        }
 
        len = sizeof(*head) +
-               pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
+               pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
 
        /* calculate (max) length for cap releases */
        len += sizeof(struct ceph_mds_request_release) *
@@ -1540,26 +1624,19 @@ static void __wake_requests(struct ceph_mds_client *mdsc,
  */
 static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
 {
-       struct ceph_mds_request *reqs[10];
-       u64 nexttid = 0;
-       int i, got;
+       struct ceph_mds_request *req;
+       struct rb_node *p;
 
        dout("kick_requests mds%d\n", mds);
-       while (nexttid <= mdsc->last_tid) {
-               got = radix_tree_gang_lookup(&mdsc->request_tree,
-                                            (void **)&reqs, nexttid, 10);
-               if (got == 0)
-                       break;
-               nexttid = reqs[got-1]->r_tid + 1;
-               for (i = 0; i < got; i++) {
-                       if (reqs[i]->r_got_unsafe)
-                               continue;
-                       if (reqs[i]->r_session &&
-                           reqs[i]->r_session->s_mds == mds) {
-                               dout(" kicking tid %llu\n", reqs[i]->r_tid);
-                               put_request_session(reqs[i]);
-                               __do_request(mdsc, reqs[i]);
-                       }
+       for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
+               req = rb_entry(p, struct ceph_mds_request, r_node);
+               if (req->r_got_unsafe)
+                       continue;
+               if (req->r_session &&
+                   req->r_session->s_mds == mds) {
+                       dout(" kicking tid %llu\n", req->r_tid);
+                       put_request_session(req);
+                       __do_request(mdsc, req);
                }
        }
 }
@@ -1673,10 +1750,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
        u64 tid;
        int err, result;
-       int mds;
+       int mds = session->s_mds;
 
-       if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
-               return;
        if (msg->front.iov_len < sizeof(*head)) {
                pr_err("mdsc_handle_reply got corrupt (short) reply\n");
                ceph_msg_dump(msg);
@@ -1693,7 +1768,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                return;
        }
        dout("handle_reply %p\n", req);
-       mds = le64_to_cpu(msg->hdr.src.name.num);
 
        /* correct session? */
        if (!req->r_session && req->r_session != session) {
@@ -1748,7 +1822,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                        list_del_init(&req->r_unsafe_item);
 
                        /* last unsafe request during umount? */
-                       if (mdsc->stopping && !__get_oldest_tid(mdsc))
+                       if (mdsc->stopping && !__get_oldest_req(mdsc))
                                complete(&mdsc->safe_umount_waiters);
                        mutex_unlock(&mdsc->mutex);
                        goto out;
@@ -1817,38 +1891,29 @@ out:
 /*
  * handle mds notification that our request has been forwarded.
  */
-static void handle_forward(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_forward(struct ceph_mds_client *mdsc,
+                          struct ceph_mds_session *session,
+                          struct ceph_msg *msg)
 {
        struct ceph_mds_request *req;
-       u64 tid;
+       u64 tid = le64_to_cpu(msg->hdr.tid);
        u32 next_mds;
        u32 fwd_seq;
-       u8 must_resend;
        int err = -EINVAL;
        void *p = msg->front.iov_base;
        void *end = p + msg->front.iov_len;
-       int from_mds, state;
 
-       if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
-               goto bad;
-       from_mds = le64_to_cpu(msg->hdr.src.name.num);
-
-       ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
-       tid = ceph_decode_64(&p);
+       ceph_decode_need(&p, end, 2*sizeof(u32), bad);
        next_mds = ceph_decode_32(&p);
        fwd_seq = ceph_decode_32(&p);
-       must_resend = ceph_decode_8(&p);
-
-       WARN_ON(must_resend);  /* shouldn't happen. */
 
        mutex_lock(&mdsc->mutex);
        req = __lookup_request(mdsc, tid);
        if (!req) {
-               dout("forward %llu dne\n", tid);
+               dout("forward %llu to mds%d - req dne\n", tid, next_mds);
                goto out;  /* dup reply? */
        }
 
-       state = mdsc->sessions[next_mds]->s_state;
        if (fwd_seq <= req->r_num_fwd) {
                dout("forward %llu to mds%d - old seq %d <= %d\n",
                     tid, next_mds, req->r_num_fwd, fwd_seq);
@@ -1878,14 +1943,10 @@ static void handle_session(struct ceph_mds_session *session,
        struct ceph_mds_client *mdsc = session->s_mdsc;
        u32 op;
        u64 seq;
-       int mds;
+       int mds = session->s_mds;
        struct ceph_mds_session_head *h = msg->front.iov_base;
        int wake = 0;
 
-       if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
-               return;
-       mds = le64_to_cpu(msg->hdr.src.name.num);
-
        /* decode */
        if (msg->front.iov_len != sizeof(*h))
                goto bad;
@@ -1893,6 +1954,8 @@ static void handle_session(struct ceph_mds_session *session,
        seq = le64_to_cpu(h->seq);
 
        mutex_lock(&mdsc->mutex);
+       if (op == CEPH_SESSION_CLOSE)
+               __unregister_session(mdsc, session);
        /* FIXME: this ttl calculation is generous */
        session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
        mutex_unlock(&mdsc->mutex);
@@ -1923,7 +1986,6 @@ static void handle_session(struct ceph_mds_session *session,
                break;
 
        case CEPH_SESSION_CLOSE:
-               unregister_session(mdsc, session);
                remove_session_caps(session);
                wake = 1; /* for good measure */
                complete(&mdsc->session_close_waiters);
@@ -2063,9 +2125,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
 {
        struct ceph_mds_session *session = NULL;
        struct ceph_msg *reply;
+       struct rb_node *p;
        int err;
-       int got;
-       u64 next_snap_ino = 0;
        struct ceph_pagelist *pagelist;
 
        pr_info("reconnect to recovering mds%d\n", mds);
@@ -2121,14 +2182,10 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
         * parent for all of our realms.  If the mds has any newer info,
         * it will tell us.
         */
-       next_snap_ino = 0;
-       while (1) {
-               struct ceph_snap_realm *realm;
+       for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
+               struct ceph_snap_realm *realm =
+                       rb_entry(p, struct ceph_snap_realm, node);
                struct ceph_mds_snaprealm_reconnect sr_rec;
-               got = radix_tree_gang_lookup(&mdsc->snap_realms,
-                                            (void **)&realm, next_snap_ino, 1);
-               if (!got)
-                       break;
 
                dout(" adding snap realm %llx seq %lld parent %llx\n",
                     realm->ino, realm->seq, realm->parent_ino);
@@ -2138,7 +2195,6 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
                if (err)
                        goto fail;
-               next_snap_ino = realm->ino + 1;
        }
 
 send:
@@ -2208,7 +2264,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                                /* the session never opened, just close it
                                 * out now */
                                __wake_requests(mdsc, &s->s_waiting);
-                               unregister_session(mdsc, s);
+                               __unregister_session(mdsc, s);
                        } else {
                                /* just close it */
                                mutex_unlock(&mdsc->mutex);
@@ -2268,24 +2324,22 @@ void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
        di->lease_session = NULL;
 }
 
-static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+static void handle_lease(struct ceph_mds_client *mdsc,
+                        struct ceph_mds_session *session,
+                        struct ceph_msg *msg)
 {
        struct super_block *sb = mdsc->client->sb;
        struct inode *inode;
-       struct ceph_mds_session *session;
        struct ceph_inode_info *ci;
        struct dentry *parent, *dentry;
        struct ceph_dentry_info *di;
-       int mds;
+       int mds = session->s_mds;
        struct ceph_mds_lease *h = msg->front.iov_base;
        struct ceph_vino vino;
        int mask;
        struct qstr dname;
        int release = 0;
 
-       if (msg->hdr.src.name.type != CEPH_ENTITY_TYPE_MDS)
-               return;
-       mds = le64_to_cpu(msg->hdr.src.name.num);
        dout("handle_lease from mds%d\n", mds);
 
        /* decode */
@@ -2299,15 +2353,6 @@ static void handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
        if (dname.len != get_unaligned_le32(h+1))
                goto bad;
 
-       /* find session */
-       mutex_lock(&mdsc->mutex);
-       session = __ceph_lookup_mds_session(mdsc, mds);
-       mutex_unlock(&mdsc->mutex);
-       if (!session) {
-               pr_err("handle_lease got lease but no session mds%d\n", mds);
-               return;
-       }
-
        mutex_lock(&session->s_mutex);
        session->s_seq++;
 
@@ -2376,7 +2421,6 @@ release:
 out:
        iput(inode);
        mutex_unlock(&session->s_mutex);
-       ceph_put_mds_session(session);
        return;
 
 bad:
@@ -2569,11 +2613,11 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        mdsc->max_sessions = 0;
        mdsc->stopping = 0;
        init_rwsem(&mdsc->snap_rwsem);
-       INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
+       mdsc->snap_realms = RB_ROOT;
        INIT_LIST_HEAD(&mdsc->snap_empty);
        spin_lock_init(&mdsc->snap_empty_lock);
        mdsc->last_tid = 0;
-       INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
+       mdsc->request_tree = RB_ROOT;
        INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
        mdsc->last_renew_caps = jiffies;
        INIT_LIST_HEAD(&mdsc->cap_delay_list);
@@ -2600,20 +2644,19 @@ static void wait_requests(struct ceph_mds_client *mdsc)
        struct ceph_client *client = mdsc->client;
 
        mutex_lock(&mdsc->mutex);
-       if (__get_oldest_tid(mdsc)) {
+       if (__get_oldest_req(mdsc)) {
                mutex_unlock(&mdsc->mutex);
+
                dout("wait_requests waiting for requests\n");
                wait_for_completion_timeout(&mdsc->safe_umount_waiters,
                                    client->mount_args->mount_timeout * HZ);
-               mutex_lock(&mdsc->mutex);
 
                /* tear down remaining requests */
-               while (radix_tree_gang_lookup(&mdsc->request_tree,
-                                             (void **)&req, 0, 1)) {
+               mutex_lock(&mdsc->mutex);
+               while ((req = __get_oldest_req(mdsc))) {
                        dout("wait_requests timed out on tid %llu\n",
                             req->r_tid);
-                       radix_tree_delete(&mdsc->request_tree, req->r_tid);
-                       ceph_mdsc_put_request(req);
+                       __unregister_request(mdsc, req);
                }
        }
        mutex_unlock(&mdsc->mutex);
@@ -2639,31 +2682,29 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
  */
 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
 {
-       struct ceph_mds_request *req;
-       u64 next_tid = 0;
-       int got;
+       struct ceph_mds_request *req = NULL;
+       struct rb_node *n;
 
        mutex_lock(&mdsc->mutex);
        dout("wait_unsafe_requests want %lld\n", want_tid);
-       while (1) {
-               got = radix_tree_gang_lookup(&mdsc->request_tree, (void **)&req,
-                                            next_tid, 1);
-               if (!got)
-                       break;
-               if (req->r_tid > want_tid)
+       req = __get_oldest_req(mdsc);
+       while (req && req->r_tid <= want_tid) {
+               if ((req->r_op & CEPH_MDS_OP_WRITE)) {
+                       /* write op */
+                       ceph_mdsc_get_request(req);
+                       mutex_unlock(&mdsc->mutex);
+                       dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
+                            req->r_tid, want_tid);
+                       wait_for_completion(&req->r_safe_completion);
+                       mutex_lock(&mdsc->mutex);
+                       n = rb_next(&req->r_node);
+                       ceph_mdsc_put_request(req);
+               } else {
+                       n = rb_next(&req->r_node);
+               }
+               if (!n)
                        break;
-
-               next_tid = req->r_tid + 1;
-               if ((req->r_op & CEPH_MDS_OP_WRITE) == 0)
-                       continue;  /* not a write op */
-
-               ceph_mdsc_get_request(req);
-               mutex_unlock(&mdsc->mutex);
-               dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
-                    req->r_tid, want_tid);
-               wait_for_completion(&req->r_safe_completion);
-               mutex_lock(&mdsc->mutex);
-               ceph_mdsc_put_request(req);
+               req = rb_entry(n, struct ceph_mds_request, r_node);
        }
        mutex_unlock(&mdsc->mutex);
        dout("wait_unsafe_requests done\n");
@@ -2736,7 +2777,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        for (i = 0; i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i]) {
                        session = get_session(mdsc->sessions[i]);
-                       unregister_session(mdsc, session);
+                       __unregister_session(mdsc, session);
                        mutex_unlock(&mdsc->mutex);
                        mutex_lock(&session->s_mutex);
                        remove_session_caps(session);
@@ -2833,8 +2874,7 @@ static struct ceph_connection *con_get(struct ceph_connection *con)
        struct ceph_mds_session *s = con->private;
 
        if (get_session(s)) {
-               dout("mdsc con_get %p %d -> %d\n", s,
-                    atomic_read(&s->s_ref) - 1, atomic_read(&s->s_ref));
+               dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
                return con;
        }
        dout("mdsc con_get %p FAIL\n", s);
@@ -2845,9 +2885,8 @@ static void con_put(struct ceph_connection *con)
 {
        struct ceph_mds_session *s = con->private;
 
-       dout("mdsc con_put %p %d -> %d\n", s, atomic_read(&s->s_ref),
-            atomic_read(&s->s_ref) - 1);
        ceph_put_mds_session(s);
+       dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
 }
 
 /*
@@ -2868,6 +2907,13 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        struct ceph_mds_client *mdsc = s->s_mdsc;
        int type = le16_to_cpu(msg->hdr.type);
 
+       mutex_lock(&mdsc->mutex);
+       if (__verify_registered_session(mdsc, s) < 0) {
+               mutex_unlock(&mdsc->mutex);
+               goto out;
+       }
+       mutex_unlock(&mdsc->mutex);
+
        switch (type) {
        case CEPH_MSG_MDS_MAP:
                ceph_mdsc_handle_map(mdsc, msg);
@@ -2879,22 +2925,23 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                handle_reply(s, msg);
                break;
        case CEPH_MSG_CLIENT_REQUEST_FORWARD:
-               handle_forward(mdsc, msg);
+               handle_forward(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_CAPS:
                ceph_handle_caps(s, msg);
                break;
        case CEPH_MSG_CLIENT_SNAP:
-               ceph_handle_snap(mdsc, msg);
+               ceph_handle_snap(mdsc, s, msg);
                break;
        case CEPH_MSG_CLIENT_LEASE:
-               handle_lease(mdsc, msg);
+               handle_lease(mdsc, s, msg);
                break;
 
        default:
                pr_err("received unknown message type %d %s\n", type,
                       ceph_msg_type_name(type));
        }
+out:
        ceph_msg_put(msg);
 }
 
@@ -2946,15 +2993,26 @@ static int verify_authorizer_reply(struct ceph_connection *con, int len)
        return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
 }
 
+static int invalidate_authorizer(struct ceph_connection *con)
+{
+       struct ceph_mds_session *s = con->private;
+       struct ceph_mds_client *mdsc = s->s_mdsc;
+       struct ceph_auth_client *ac = mdsc->client->monc.auth;
+
+       if (ac->ops->invalidate_authorizer)
+               ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
+
+       return ceph_monc_validate_auth(&mdsc->client->monc);
+}
+
 const static struct ceph_connection_operations mds_con_ops = {
        .get = con_get,
        .put = con_put,
        .dispatch = dispatch,
        .get_authorizer = get_authorizer,
        .verify_authorizer_reply = verify_authorizer_reply,
+       .invalidate_authorizer = invalidate_authorizer,
        .peer_reset = peer_reset,
-       .alloc_msg = ceph_alloc_msg,
-       .alloc_middle = ceph_alloc_middle,
 };