a1ee8fa3a8e7a4e8778fa624ef44154f67e2acf9
[pandora-kernel.git] / fs / ceph / mds_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/fs.h>
4 #include <linux/wait.h>
5 #include <linux/slab.h>
6 #include <linux/sched.h>
7 #include <linux/debugfs.h>
8 #include <linux/seq_file.h>
9
10 #include "super.h"
11 #include "mds_client.h"
12
13 #include <linux/ceph/messenger.h>
14 #include <linux/ceph/decode.h>
15 #include <linux/ceph/pagelist.h>
16 #include <linux/ceph/auth.h>
17 #include <linux/ceph/debugfs.h>
18
19 /*
20  * A cluster of MDS (metadata server) daemons is responsible for
21  * managing the file system namespace (the directory hierarchy and
22  * inodes) and for coordinating shared access to storage.  Metadata is
23  * partitioning hierarchically across a number of servers, and that
24  * partition varies over time as the cluster adjusts the distribution
25  * in order to balance load.
26  *
27  * The MDS client is primarily responsible to managing synchronous
28  * metadata requests for operations like open, unlink, and so forth.
29  * If there is a MDS failure, we find out about it when we (possibly
30  * request and) receive a new MDS map, and can resubmit affected
31  * requests.
32  *
33  * For the most part, though, we take advantage of a lossless
34  * communications channel to the MDS, and do not need to worry about
35  * timing out or resubmitting requests.
36  *
37  * We maintain a stateful "session" with each MDS we interact with.
38  * Within each session, we sent periodic heartbeat messages to ensure
39  * any capabilities or leases we have been issues remain valid.  If
40  * the session times out and goes stale, our leases and capabilities
41  * are no longer valid.
42  */
43
44 struct ceph_reconnect_state {
45         struct ceph_pagelist *pagelist;
46         bool flock;
47 };
48
49 static void __wake_requests(struct ceph_mds_client *mdsc,
50                             struct list_head *head);
51
52 static const struct ceph_connection_operations mds_con_ops;
53
54
55 /*
56  * mds reply parsing
57  */
58
59 /*
60  * parse individual inode info
61  */
62 static int parse_reply_info_in(void **p, void *end,
63                                struct ceph_mds_reply_info_in *info,
64                                int features)
65 {
66         int err = -EIO;
67
68         info->in = *p;
69         *p += sizeof(struct ceph_mds_reply_inode) +
70                 sizeof(*info->in->fragtree.splits) *
71                 le32_to_cpu(info->in->fragtree.nsplits);
72
73         ceph_decode_32_safe(p, end, info->symlink_len, bad);
74         ceph_decode_need(p, end, info->symlink_len, bad);
75         info->symlink = *p;
76         *p += info->symlink_len;
77
78         if (features & CEPH_FEATURE_DIRLAYOUTHASH)
79                 ceph_decode_copy_safe(p, end, &info->dir_layout,
80                                       sizeof(info->dir_layout), bad);
81         else
82                 memset(&info->dir_layout, 0, sizeof(info->dir_layout));
83
84         ceph_decode_32_safe(p, end, info->xattr_len, bad);
85         ceph_decode_need(p, end, info->xattr_len, bad);
86         info->xattr_data = *p;
87         *p += info->xattr_len;
88         return 0;
89 bad:
90         return err;
91 }
92
93 /*
94  * parse a normal reply, which may contain a (dir+)dentry and/or a
95  * target inode.
96  */
97 static int parse_reply_info_trace(void **p, void *end,
98                                   struct ceph_mds_reply_info_parsed *info,
99                                   int features)
100 {
101         int err;
102
103         if (info->head->is_dentry) {
104                 err = parse_reply_info_in(p, end, &info->diri, features);
105                 if (err < 0)
106                         goto out_bad;
107
108                 if (unlikely(*p + sizeof(*info->dirfrag) > end))
109                         goto bad;
110                 info->dirfrag = *p;
111                 *p += sizeof(*info->dirfrag) +
112                         sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
113                 if (unlikely(*p > end))
114                         goto bad;
115
116                 ceph_decode_32_safe(p, end, info->dname_len, bad);
117                 ceph_decode_need(p, end, info->dname_len, bad);
118                 info->dname = *p;
119                 *p += info->dname_len;
120                 info->dlease = *p;
121                 *p += sizeof(*info->dlease);
122         }
123
124         if (info->head->is_target) {
125                 err = parse_reply_info_in(p, end, &info->targeti, features);
126                 if (err < 0)
127                         goto out_bad;
128         }
129
130         if (unlikely(*p != end))
131                 goto bad;
132         return 0;
133
134 bad:
135         err = -EIO;
136 out_bad:
137         pr_err("problem parsing mds trace %d\n", err);
138         return err;
139 }
140
141 /*
142  * parse readdir results
143  */
144 static int parse_reply_info_dir(void **p, void *end,
145                                 struct ceph_mds_reply_info_parsed *info,
146                                 int features)
147 {
148         u32 num, i = 0;
149         int err;
150
151         info->dir_dir = *p;
152         if (*p + sizeof(*info->dir_dir) > end)
153                 goto bad;
154         *p += sizeof(*info->dir_dir) +
155                 sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
156         if (*p > end)
157                 goto bad;
158
159         ceph_decode_need(p, end, sizeof(num) + 2, bad);
160         num = ceph_decode_32(p);
161         info->dir_end = ceph_decode_8(p);
162         info->dir_complete = ceph_decode_8(p);
163         if (num == 0)
164                 goto done;
165
166         /* alloc large array */
167         info->dir_nr = num;
168         info->dir_in = kcalloc(num, sizeof(*info->dir_in) +
169                                sizeof(*info->dir_dname) +
170                                sizeof(*info->dir_dname_len) +
171                                sizeof(*info->dir_dlease),
172                                GFP_NOFS);
173         if (info->dir_in == NULL) {
174                 err = -ENOMEM;
175                 goto out_bad;
176         }
177         info->dir_dname = (void *)(info->dir_in + num);
178         info->dir_dname_len = (void *)(info->dir_dname + num);
179         info->dir_dlease = (void *)(info->dir_dname_len + num);
180
181         while (num) {
182                 /* dentry */
183                 ceph_decode_need(p, end, sizeof(u32)*2, bad);
184                 info->dir_dname_len[i] = ceph_decode_32(p);
185                 ceph_decode_need(p, end, info->dir_dname_len[i], bad);
186                 info->dir_dname[i] = *p;
187                 *p += info->dir_dname_len[i];
188                 dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i],
189                      info->dir_dname[i]);
190                 info->dir_dlease[i] = *p;
191                 *p += sizeof(struct ceph_mds_reply_lease);
192
193                 /* inode */
194                 err = parse_reply_info_in(p, end, &info->dir_in[i], features);
195                 if (err < 0)
196                         goto out_bad;
197                 i++;
198                 num--;
199         }
200
201 done:
202         if (*p != end)
203                 goto bad;
204         return 0;
205
206 bad:
207         err = -EIO;
208 out_bad:
209         pr_err("problem parsing dir contents %d\n", err);
210         return err;
211 }
212
213 /*
214  * parse fcntl F_GETLK results
215  */
216 static int parse_reply_info_filelock(void **p, void *end,
217                                      struct ceph_mds_reply_info_parsed *info,
218                                      int features)
219 {
220         if (*p + sizeof(*info->filelock_reply) > end)
221                 goto bad;
222
223         info->filelock_reply = *p;
224         *p += sizeof(*info->filelock_reply);
225
226         if (unlikely(*p != end))
227                 goto bad;
228         return 0;
229
230 bad:
231         return -EIO;
232 }
233
234 /*
235  * parse extra results
236  */
237 static int parse_reply_info_extra(void **p, void *end,
238                                   struct ceph_mds_reply_info_parsed *info,
239                                   int features)
240 {
241         if (info->head->op == CEPH_MDS_OP_GETFILELOCK)
242                 return parse_reply_info_filelock(p, end, info, features);
243         else
244                 return parse_reply_info_dir(p, end, info, features);
245 }
246
247 /*
248  * parse entire mds reply
249  */
250 static int parse_reply_info(struct ceph_msg *msg,
251                             struct ceph_mds_reply_info_parsed *info,
252                             int features)
253 {
254         void *p, *end;
255         u32 len;
256         int err;
257
258         info->head = msg->front.iov_base;
259         p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
260         end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
261
262         /* trace */
263         ceph_decode_32_safe(&p, end, len, bad);
264         if (len > 0) {
265                 err = parse_reply_info_trace(&p, p+len, info, features);
266                 if (err < 0)
267                         goto out_bad;
268         }
269
270         /* extra */
271         ceph_decode_32_safe(&p, end, len, bad);
272         if (len > 0) {
273                 err = parse_reply_info_extra(&p, p+len, info, features);
274                 if (err < 0)
275                         goto out_bad;
276         }
277
278         /* snap blob */
279         ceph_decode_32_safe(&p, end, len, bad);
280         info->snapblob_len = len;
281         info->snapblob = p;
282         p += len;
283
284         if (p != end)
285                 goto bad;
286         return 0;
287
288 bad:
289         err = -EIO;
290 out_bad:
291         pr_err("mds parse_reply err %d\n", err);
292         return err;
293 }
294
295 static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
296 {
297         kfree(info->dir_in);
298 }
299
300
301 /*
302  * sessions
303  */
304 static const char *session_state_name(int s)
305 {
306         switch (s) {
307         case CEPH_MDS_SESSION_NEW: return "new";
308         case CEPH_MDS_SESSION_OPENING: return "opening";
309         case CEPH_MDS_SESSION_OPEN: return "open";
310         case CEPH_MDS_SESSION_HUNG: return "hung";
311         case CEPH_MDS_SESSION_CLOSING: return "closing";
312         case CEPH_MDS_SESSION_RESTARTING: return "restarting";
313         case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
314         default: return "???";
315         }
316 }
317
318 static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
319 {
320         if (atomic_inc_not_zero(&s->s_ref)) {
321                 dout("mdsc get_session %p %d -> %d\n", s,
322                      atomic_read(&s->s_ref)-1, atomic_read(&s->s_ref));
323                 return s;
324         } else {
325                 dout("mdsc get_session %p 0 -- FAIL", s);
326                 return NULL;
327         }
328 }
329
330 void ceph_put_mds_session(struct ceph_mds_session *s)
331 {
332         dout("mdsc put_session %p %d -> %d\n", s,
333              atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
334         if (atomic_dec_and_test(&s->s_ref)) {
335                 if (s->s_authorizer)
336                      s->s_mdsc->fsc->client->monc.auth->ops->destroy_authorizer(
337                              s->s_mdsc->fsc->client->monc.auth,
338                              s->s_authorizer);
339                 kfree(s);
340         }
341 }
342
343 /*
344  * called under mdsc->mutex
345  */
346 struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
347                                                    int mds)
348 {
349         struct ceph_mds_session *session;
350
351         if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
352                 return NULL;
353         session = mdsc->sessions[mds];
354         dout("lookup_mds_session %p %d\n", session,
355              atomic_read(&session->s_ref));
356         get_session(session);
357         return session;
358 }
359
360 static bool __have_session(struct ceph_mds_client *mdsc, int mds)
361 {
362         if (mds >= mdsc->max_sessions)
363                 return false;
364         return mdsc->sessions[mds];
365 }
366
367 static int __verify_registered_session(struct ceph_mds_client *mdsc,
368                                        struct ceph_mds_session *s)
369 {
370         if (s->s_mds >= mdsc->max_sessions ||
371             mdsc->sessions[s->s_mds] != s)
372                 return -ENOENT;
373         return 0;
374 }
375
376 /*
377  * create+register a new session for given mds.
378  * called under mdsc->mutex.
379  */
380 static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
381                                                  int mds)
382 {
383         struct ceph_mds_session *s;
384
385         s = kzalloc(sizeof(*s), GFP_NOFS);
386         if (!s)
387                 return ERR_PTR(-ENOMEM);
388         s->s_mdsc = mdsc;
389         s->s_mds = mds;
390         s->s_state = CEPH_MDS_SESSION_NEW;
391         s->s_ttl = 0;
392         s->s_seq = 0;
393         mutex_init(&s->s_mutex);
394
395         ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
396         s->s_con.private = s;
397         s->s_con.ops = &mds_con_ops;
398         s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
399         s->s_con.peer_name.num = cpu_to_le64(mds);
400
401         spin_lock_init(&s->s_cap_lock);
402         s->s_cap_gen = 0;
403         s->s_cap_ttl = 0;
404         s->s_renew_requested = 0;
405         s->s_renew_seq = 0;
406         INIT_LIST_HEAD(&s->s_caps);
407         s->s_nr_caps = 0;
408         s->s_trim_caps = 0;
409         atomic_set(&s->s_ref, 1);
410         INIT_LIST_HEAD(&s->s_waiting);
411         INIT_LIST_HEAD(&s->s_unsafe);
412         s->s_num_cap_releases = 0;
413         s->s_cap_iterator = NULL;
414         INIT_LIST_HEAD(&s->s_cap_releases);
415         INIT_LIST_HEAD(&s->s_cap_releases_done);
416         INIT_LIST_HEAD(&s->s_cap_flushing);
417         INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
418
419         dout("register_session mds%d\n", mds);
420         if (mds >= mdsc->max_sessions) {
421                 int newmax = 1 << get_count_order(mds+1);
422                 struct ceph_mds_session **sa;
423
424                 dout("register_session realloc to %d\n", newmax);
425                 sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
426                 if (sa == NULL)
427                         goto fail_realloc;
428                 if (mdsc->sessions) {
429                         memcpy(sa, mdsc->sessions,
430                                mdsc->max_sessions * sizeof(void *));
431                         kfree(mdsc->sessions);
432                 }
433                 mdsc->sessions = sa;
434                 mdsc->max_sessions = newmax;
435         }
436         mdsc->sessions[mds] = s;
437         atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
438
439         ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
440
441         return s;
442
443 fail_realloc:
444         kfree(s);
445         return ERR_PTR(-ENOMEM);
446 }
447
448 /*
449  * called under mdsc->mutex
450  */
451 static void __unregister_session(struct ceph_mds_client *mdsc,
452                                struct ceph_mds_session *s)
453 {
454         dout("__unregister_session mds%d %p\n", s->s_mds, s);
455         BUG_ON(mdsc->sessions[s->s_mds] != s);
456         mdsc->sessions[s->s_mds] = NULL;
457         ceph_con_close(&s->s_con);
458         ceph_put_mds_session(s);
459 }
460
461 /*
462  * drop session refs in request.
463  *
464  * should be last request ref, or hold mdsc->mutex
465  */
466 static void put_request_session(struct ceph_mds_request *req)
467 {
468         if (req->r_session) {
469                 ceph_put_mds_session(req->r_session);
470                 req->r_session = NULL;
471         }
472 }
473
474 void ceph_mdsc_release_request(struct kref *kref)
475 {
476         struct ceph_mds_request *req = container_of(kref,
477                                                     struct ceph_mds_request,
478                                                     r_kref);
479         if (req->r_request)
480                 ceph_msg_put(req->r_request);
481         if (req->r_reply) {
482                 ceph_msg_put(req->r_reply);
483                 destroy_reply_info(&req->r_reply_info);
484         }
485         if (req->r_inode) {
486                 ceph_put_cap_refs(ceph_inode(req->r_inode),
487                                   CEPH_CAP_PIN);
488                 iput(req->r_inode);
489         }
490         if (req->r_locked_dir)
491                 ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
492                                   CEPH_CAP_PIN);
493         if (req->r_target_inode)
494                 iput(req->r_target_inode);
495         if (req->r_dentry)
496                 dput(req->r_dentry);
497         if (req->r_old_dentry) {
498                 ceph_put_cap_refs(
499                         ceph_inode(req->r_old_dentry->d_parent->d_inode),
500                         CEPH_CAP_PIN);
501                 dput(req->r_old_dentry);
502         }
503         kfree(req->r_path1);
504         kfree(req->r_path2);
505         put_request_session(req);
506         ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
507         kfree(req);
508 }
509
510 /*
511  * lookup session, bump ref if found.
512  *
513  * called under mdsc->mutex.
514  */
515 static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
516                                              u64 tid)
517 {
518         struct ceph_mds_request *req;
519         struct rb_node *n = mdsc->request_tree.rb_node;
520
521         while (n) {
522                 req = rb_entry(n, struct ceph_mds_request, r_node);
523                 if (tid < req->r_tid)
524                         n = n->rb_left;
525                 else if (tid > req->r_tid)
526                         n = n->rb_right;
527                 else {
528                         ceph_mdsc_get_request(req);
529                         return req;
530                 }
531         }
532         return NULL;
533 }
534
535 static void __insert_request(struct ceph_mds_client *mdsc,
536                              struct ceph_mds_request *new)
537 {
538         struct rb_node **p = &mdsc->request_tree.rb_node;
539         struct rb_node *parent = NULL;
540         struct ceph_mds_request *req = NULL;
541
542         while (*p) {
543                 parent = *p;
544                 req = rb_entry(parent, struct ceph_mds_request, r_node);
545                 if (new->r_tid < req->r_tid)
546                         p = &(*p)->rb_left;
547                 else if (new->r_tid > req->r_tid)
548                         p = &(*p)->rb_right;
549                 else
550                         BUG();
551         }
552
553         rb_link_node(&new->r_node, parent, p);
554         rb_insert_color(&new->r_node, &mdsc->request_tree);
555 }
556
557 /*
558  * Register an in-flight request, and assign a tid.  Link to directory
559  * are modifying (if any).
560  *
561  * Called under mdsc->mutex.
562  */
563 static void __register_request(struct ceph_mds_client *mdsc,
564                                struct ceph_mds_request *req,
565                                struct inode *dir)
566 {
567         req->r_tid = ++mdsc->last_tid;
568         if (req->r_num_caps)
569                 ceph_reserve_caps(mdsc, &req->r_caps_reservation,
570                                   req->r_num_caps);
571         dout("__register_request %p tid %lld\n", req, req->r_tid);
572         ceph_mdsc_get_request(req);
573         __insert_request(mdsc, req);
574
575         req->r_uid = current_fsuid();
576         req->r_gid = current_fsgid();
577
578         if (dir) {
579                 struct ceph_inode_info *ci = ceph_inode(dir);
580
581                 spin_lock(&ci->i_unsafe_lock);
582                 req->r_unsafe_dir = dir;
583                 list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
584                 spin_unlock(&ci->i_unsafe_lock);
585         }
586 }
587
588 static void __unregister_request(struct ceph_mds_client *mdsc,
589                                  struct ceph_mds_request *req)
590 {
591         dout("__unregister_request %p tid %lld\n", req, req->r_tid);
592         rb_erase(&req->r_node, &mdsc->request_tree);
593         RB_CLEAR_NODE(&req->r_node);
594
595         if (req->r_unsafe_dir) {
596                 struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
597
598                 spin_lock(&ci->i_unsafe_lock);
599                 list_del_init(&req->r_unsafe_dir_item);
600                 spin_unlock(&ci->i_unsafe_lock);
601         }
602
603         ceph_mdsc_put_request(req);
604 }
605
606 /*
607  * Choose mds to send request to next.  If there is a hint set in the
608  * request (e.g., due to a prior forward hint from the mds), use that.
609  * Otherwise, consult frag tree and/or caps to identify the
610  * appropriate mds.  If all else fails, choose randomly.
611  *
612  * Called under mdsc->mutex.
613  */
614 struct dentry *get_nonsnap_parent(struct dentry *dentry)
615 {
616         while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
617                 dentry = dentry->d_parent;
618         return dentry;
619 }
620
621 static int __choose_mds(struct ceph_mds_client *mdsc,
622                         struct ceph_mds_request *req)
623 {
624         struct inode *inode;
625         struct ceph_inode_info *ci;
626         struct ceph_cap *cap;
627         int mode = req->r_direct_mode;
628         int mds = -1;
629         u32 hash = req->r_direct_hash;
630         bool is_hash = req->r_direct_is_hash;
631
632         /*
633          * is there a specific mds we should try?  ignore hint if we have
634          * no session and the mds is not up (active or recovering).
635          */
636         if (req->r_resend_mds >= 0 &&
637             (__have_session(mdsc, req->r_resend_mds) ||
638              ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
639                 dout("choose_mds using resend_mds mds%d\n",
640                      req->r_resend_mds);
641                 return req->r_resend_mds;
642         }
643
644         if (mode == USE_RANDOM_MDS)
645                 goto random;
646
647         inode = NULL;
648         if (req->r_inode) {
649                 inode = req->r_inode;
650         } else if (req->r_dentry) {
651                 struct inode *dir = req->r_dentry->d_parent->d_inode;
652
653                 if (dir->i_sb != mdsc->fsc->sb) {
654                         /* not this fs! */
655                         inode = req->r_dentry->d_inode;
656                 } else if (ceph_snap(dir) != CEPH_NOSNAP) {
657                         /* direct snapped/virtual snapdir requests
658                          * based on parent dir inode */
659                         struct dentry *dn =
660                                 get_nonsnap_parent(req->r_dentry->d_parent);
661                         inode = dn->d_inode;
662                         dout("__choose_mds using nonsnap parent %p\n", inode);
663                 } else if (req->r_dentry->d_inode) {
664                         /* dentry target */
665                         inode = req->r_dentry->d_inode;
666                 } else {
667                         /* dir + name */
668                         inode = dir;
669                         hash = ceph_dentry_hash(req->r_dentry);
670                         is_hash = true;
671                 }
672         }
673
674         dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
675              (int)hash, mode);
676         if (!inode)
677                 goto random;
678         ci = ceph_inode(inode);
679
680         if (is_hash && S_ISDIR(inode->i_mode)) {
681                 struct ceph_inode_frag frag;
682                 int found;
683
684                 ceph_choose_frag(ci, hash, &frag, &found);
685                 if (found) {
686                         if (mode == USE_ANY_MDS && frag.ndist > 0) {
687                                 u8 r;
688
689                                 /* choose a random replica */
690                                 get_random_bytes(&r, 1);
691                                 r %= frag.ndist;
692                                 mds = frag.dist[r];
693                                 dout("choose_mds %p %llx.%llx "
694                                      "frag %u mds%d (%d/%d)\n",
695                                      inode, ceph_vinop(inode),
696                                      frag.frag, mds,
697                                      (int)r, frag.ndist);
698                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
699                                     CEPH_MDS_STATE_ACTIVE)
700                                         return mds;
701                         }
702
703                         /* since this file/dir wasn't known to be
704                          * replicated, then we want to look for the
705                          * authoritative mds. */
706                         mode = USE_AUTH_MDS;
707                         if (frag.mds >= 0) {
708                                 /* choose auth mds */
709                                 mds = frag.mds;
710                                 dout("choose_mds %p %llx.%llx "
711                                      "frag %u mds%d (auth)\n",
712                                      inode, ceph_vinop(inode), frag.frag, mds);
713                                 if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
714                                     CEPH_MDS_STATE_ACTIVE)
715                                         return mds;
716                         }
717                 }
718         }
719
720         spin_lock(&inode->i_lock);
721         cap = NULL;
722         if (mode == USE_AUTH_MDS)
723                 cap = ci->i_auth_cap;
724         if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
725                 cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
726         if (!cap) {
727                 spin_unlock(&inode->i_lock);
728                 goto random;
729         }
730         mds = cap->session->s_mds;
731         dout("choose_mds %p %llx.%llx mds%d (%scap %p)\n",
732              inode, ceph_vinop(inode), mds,
733              cap == ci->i_auth_cap ? "auth " : "", cap);
734         spin_unlock(&inode->i_lock);
735         return mds;
736
737 random:
738         mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
739         dout("choose_mds chose random mds%d\n", mds);
740         return mds;
741 }
742
743
744 /*
745  * session messages
746  */
747 static struct ceph_msg *create_session_msg(u32 op, u64 seq)
748 {
749         struct ceph_msg *msg;
750         struct ceph_mds_session_head *h;
751
752         msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS);
753         if (!msg) {
754                 pr_err("create_session_msg ENOMEM creating msg\n");
755                 return NULL;
756         }
757         h = msg->front.iov_base;
758         h->op = cpu_to_le32(op);
759         h->seq = cpu_to_le64(seq);
760         return msg;
761 }
762
763 /*
764  * send session open request.
765  *
766  * called under mdsc->mutex
767  */
768 static int __open_session(struct ceph_mds_client *mdsc,
769                           struct ceph_mds_session *session)
770 {
771         struct ceph_msg *msg;
772         int mstate;
773         int mds = session->s_mds;
774
775         /* wait for mds to go active? */
776         mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
777         dout("open_session to mds%d (%s)\n", mds,
778              ceph_mds_state_name(mstate));
779         session->s_state = CEPH_MDS_SESSION_OPENING;
780         session->s_renew_requested = jiffies;
781
782         /* send connect message */
783         msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
784         if (!msg)
785                 return -ENOMEM;
786         ceph_con_send(&session->s_con, msg);
787         return 0;
788 }
789
790 /*
791  * open sessions for any export targets for the given mds
792  *
793  * called under mdsc->mutex
794  */
795 static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
796                                           struct ceph_mds_session *session)
797 {
798         struct ceph_mds_info *mi;
799         struct ceph_mds_session *ts;
800         int i, mds = session->s_mds;
801         int target;
802
803         if (mds >= mdsc->mdsmap->m_max_mds)
804                 return;
805         mi = &mdsc->mdsmap->m_info[mds];
806         dout("open_export_target_sessions for mds%d (%d targets)\n",
807              session->s_mds, mi->num_export_targets);
808
809         for (i = 0; i < mi->num_export_targets; i++) {
810                 target = mi->export_targets[i];
811                 ts = __ceph_lookup_mds_session(mdsc, target);
812                 if (!ts) {
813                         ts = register_session(mdsc, target);
814                         if (IS_ERR(ts))
815                                 return;
816                 }
817                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
818                     session->s_state == CEPH_MDS_SESSION_CLOSING)
819                         __open_session(mdsc, session);
820                 else
821                         dout(" mds%d target mds%d %p is %s\n", session->s_mds,
822                              i, ts, session_state_name(ts->s_state));
823                 ceph_put_mds_session(ts);
824         }
825 }
826
827 void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc,
828                                            struct ceph_mds_session *session)
829 {
830         mutex_lock(&mdsc->mutex);
831         __open_export_target_sessions(mdsc, session);
832         mutex_unlock(&mdsc->mutex);
833 }
834
835 /*
836  * session caps
837  */
838
839 /*
840  * Free preallocated cap messages assigned to this session
841  */
842 static void cleanup_cap_releases(struct ceph_mds_session *session)
843 {
844         struct ceph_msg *msg;
845
846         spin_lock(&session->s_cap_lock);
847         while (!list_empty(&session->s_cap_releases)) {
848                 msg = list_first_entry(&session->s_cap_releases,
849                                        struct ceph_msg, list_head);
850                 list_del_init(&msg->list_head);
851                 ceph_msg_put(msg);
852         }
853         while (!list_empty(&session->s_cap_releases_done)) {
854                 msg = list_first_entry(&session->s_cap_releases_done,
855                                        struct ceph_msg, list_head);
856                 list_del_init(&msg->list_head);
857                 ceph_msg_put(msg);
858         }
859         spin_unlock(&session->s_cap_lock);
860 }
861
862 /*
863  * Helper to safely iterate over all caps associated with a session, with
864  * special care taken to handle a racing __ceph_remove_cap().
865  *
866  * Caller must hold session s_mutex.
867  */
868 static int iterate_session_caps(struct ceph_mds_session *session,
869                                  int (*cb)(struct inode *, struct ceph_cap *,
870                                             void *), void *arg)
871 {
872         struct list_head *p;
873         struct ceph_cap *cap;
874         struct inode *inode, *last_inode = NULL;
875         struct ceph_cap *old_cap = NULL;
876         int ret;
877
878         dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
879         spin_lock(&session->s_cap_lock);
880         p = session->s_caps.next;
881         while (p != &session->s_caps) {
882                 cap = list_entry(p, struct ceph_cap, session_caps);
883                 inode = igrab(&cap->ci->vfs_inode);
884                 if (!inode) {
885                         p = p->next;
886                         continue;
887                 }
888                 session->s_cap_iterator = cap;
889                 spin_unlock(&session->s_cap_lock);
890
891                 if (last_inode) {
892                         iput(last_inode);
893                         last_inode = NULL;
894                 }
895                 if (old_cap) {
896                         ceph_put_cap(session->s_mdsc, old_cap);
897                         old_cap = NULL;
898                 }
899
900                 ret = cb(inode, cap, arg);
901                 last_inode = inode;
902
903                 spin_lock(&session->s_cap_lock);
904                 p = p->next;
905                 if (cap->ci == NULL) {
906                         dout("iterate_session_caps  finishing cap %p removal\n",
907                              cap);
908                         BUG_ON(cap->session != session);
909                         list_del_init(&cap->session_caps);
910                         session->s_nr_caps--;
911                         cap->session = NULL;
912                         old_cap = cap;  /* put_cap it w/o locks held */
913                 }
914                 if (ret < 0)
915                         goto out;
916         }
917         ret = 0;
918 out:
919         session->s_cap_iterator = NULL;
920         spin_unlock(&session->s_cap_lock);
921
922         if (last_inode)
923                 iput(last_inode);
924         if (old_cap)
925                 ceph_put_cap(session->s_mdsc, old_cap);
926
927         return ret;
928 }
929
930 static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
931                                   void *arg)
932 {
933         struct ceph_inode_info *ci = ceph_inode(inode);
934         int drop = 0;
935
936         dout("removing cap %p, ci is %p, inode is %p\n",
937              cap, ci, &ci->vfs_inode);
938         spin_lock(&inode->i_lock);
939         __ceph_remove_cap(cap);
940         if (!__ceph_is_any_real_caps(ci)) {
941                 struct ceph_mds_client *mdsc =
942                         ceph_sb_to_client(inode->i_sb)->mdsc;
943
944                 spin_lock(&mdsc->cap_dirty_lock);
945                 if (!list_empty(&ci->i_dirty_item)) {
946                         pr_info(" dropping dirty %s state for %p %lld\n",
947                                 ceph_cap_string(ci->i_dirty_caps),
948                                 inode, ceph_ino(inode));
949                         ci->i_dirty_caps = 0;
950                         list_del_init(&ci->i_dirty_item);
951                         drop = 1;
952                 }
953                 if (!list_empty(&ci->i_flushing_item)) {
954                         pr_info(" dropping dirty+flushing %s state for %p %lld\n",
955                                 ceph_cap_string(ci->i_flushing_caps),
956                                 inode, ceph_ino(inode));
957                         ci->i_flushing_caps = 0;
958                         list_del_init(&ci->i_flushing_item);
959                         mdsc->num_cap_flushing--;
960                         drop = 1;
961                 }
962                 if (drop && ci->i_wrbuffer_ref) {
963                         pr_info(" dropping dirty data for %p %lld\n",
964                                 inode, ceph_ino(inode));
965                         ci->i_wrbuffer_ref = 0;
966                         ci->i_wrbuffer_ref_head = 0;
967                         drop++;
968                 }
969                 spin_unlock(&mdsc->cap_dirty_lock);
970         }
971         spin_unlock(&inode->i_lock);
972         while (drop--)
973                 iput(inode);
974         return 0;
975 }
976
977 /*
978  * caller must hold session s_mutex
979  */
980 static void remove_session_caps(struct ceph_mds_session *session)
981 {
982         dout("remove_session_caps on %p\n", session);
983         iterate_session_caps(session, remove_session_caps_cb, NULL);
984         BUG_ON(session->s_nr_caps > 0);
985         BUG_ON(!list_empty(&session->s_cap_flushing));
986         cleanup_cap_releases(session);
987 }
988
989 /*
990  * wake up any threads waiting on this session's caps.  if the cap is
991  * old (didn't get renewed on the client reconnect), remove it now.
992  *
993  * caller must hold s_mutex.
994  */
995 static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
996                               void *arg)
997 {
998         struct ceph_inode_info *ci = ceph_inode(inode);
999
1000         wake_up_all(&ci->i_cap_wq);
1001         if (arg) {
1002                 spin_lock(&inode->i_lock);
1003                 ci->i_wanted_max_size = 0;
1004                 ci->i_requested_max_size = 0;
1005                 spin_unlock(&inode->i_lock);
1006         }
1007         return 0;
1008 }
1009
1010 static void wake_up_session_caps(struct ceph_mds_session *session,
1011                                  int reconnect)
1012 {
1013         dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
1014         iterate_session_caps(session, wake_up_session_cb,
1015                              (void *)(unsigned long)reconnect);
1016 }
1017
1018 /*
1019  * Send periodic message to MDS renewing all currently held caps.  The
1020  * ack will reset the expiration for all caps from this session.
1021  *
1022  * caller holds s_mutex
1023  */
1024 static int send_renew_caps(struct ceph_mds_client *mdsc,
1025                            struct ceph_mds_session *session)
1026 {
1027         struct ceph_msg *msg;
1028         int state;
1029
1030         if (time_after_eq(jiffies, session->s_cap_ttl) &&
1031             time_after_eq(session->s_cap_ttl, session->s_renew_requested))
1032                 pr_info("mds%d caps stale\n", session->s_mds);
1033         session->s_renew_requested = jiffies;
1034
1035         /* do not try to renew caps until a recovering mds has reconnected
1036          * with its clients. */
1037         state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
1038         if (state < CEPH_MDS_STATE_RECONNECT) {
1039                 dout("send_renew_caps ignoring mds%d (%s)\n",
1040                      session->s_mds, ceph_mds_state_name(state));
1041                 return 0;
1042         }
1043
1044         dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
1045                 ceph_mds_state_name(state));
1046         msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
1047                                  ++session->s_renew_seq);
1048         if (!msg)
1049                 return -ENOMEM;
1050         ceph_con_send(&session->s_con, msg);
1051         return 0;
1052 }
1053
1054 /*
1055  * Note new cap ttl, and any transition from stale -> not stale (fresh?).
1056  *
1057  * Called under session->s_mutex
1058  */
1059 static void renewed_caps(struct ceph_mds_client *mdsc,
1060                          struct ceph_mds_session *session, int is_renew)
1061 {
1062         int was_stale;
1063         int wake = 0;
1064
1065         spin_lock(&session->s_cap_lock);
1066         was_stale = is_renew && (session->s_cap_ttl == 0 ||
1067                                  time_after_eq(jiffies, session->s_cap_ttl));
1068
1069         session->s_cap_ttl = session->s_renew_requested +
1070                 mdsc->mdsmap->m_session_timeout*HZ;
1071
1072         if (was_stale) {
1073                 if (time_before(jiffies, session->s_cap_ttl)) {
1074                         pr_info("mds%d caps renewed\n", session->s_mds);
1075                         wake = 1;
1076                 } else {
1077                         pr_info("mds%d caps still stale\n", session->s_mds);
1078                 }
1079         }
1080         dout("renewed_caps mds%d ttl now %lu, was %s, now %s\n",
1081              session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
1082              time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
1083         spin_unlock(&session->s_cap_lock);
1084
1085         if (wake)
1086                 wake_up_session_caps(session, 0);
1087 }
1088
1089 /*
1090  * send a session close request
1091  */
1092 static int request_close_session(struct ceph_mds_client *mdsc,
1093                                  struct ceph_mds_session *session)
1094 {
1095         struct ceph_msg *msg;
1096
1097         dout("request_close_session mds%d state %s seq %lld\n",
1098              session->s_mds, session_state_name(session->s_state),
1099              session->s_seq);
1100         msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE, session->s_seq);
1101         if (!msg)
1102                 return -ENOMEM;
1103         ceph_con_send(&session->s_con, msg);
1104         return 0;
1105 }
1106
1107 /*
1108  * Called with s_mutex held.
1109  */
1110 static int __close_session(struct ceph_mds_client *mdsc,
1111                          struct ceph_mds_session *session)
1112 {
1113         if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
1114                 return 0;
1115         session->s_state = CEPH_MDS_SESSION_CLOSING;
1116         return request_close_session(mdsc, session);
1117 }
1118
1119 /*
1120  * Trim old(er) caps.
1121  *
1122  * Because we can't cache an inode without one or more caps, we do
1123  * this indirectly: if a cap is unused, we prune its aliases, at which
1124  * point the inode will hopefully get dropped to.
1125  *
1126  * Yes, this is a bit sloppy.  Our only real goal here is to respond to
1127  * memory pressure from the MDS, though, so it needn't be perfect.
1128  */
1129 static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
1130 {
1131         struct ceph_mds_session *session = arg;
1132         struct ceph_inode_info *ci = ceph_inode(inode);
1133         int used, oissued, mine;
1134
1135         if (session->s_trim_caps <= 0)
1136                 return -1;
1137
1138         spin_lock(&inode->i_lock);
1139         mine = cap->issued | cap->implemented;
1140         used = __ceph_caps_used(ci);
1141         oissued = __ceph_caps_issued_other(ci, cap);
1142
1143         dout("trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
1144              inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
1145              ceph_cap_string(used));
1146         if (ci->i_dirty_caps)
1147                 goto out;   /* dirty caps */
1148         if ((used & ~oissued) & mine)
1149                 goto out;   /* we need these caps */
1150
1151         session->s_trim_caps--;
1152         if (oissued) {
1153                 /* we aren't the only cap.. just remove us */
1154                 __ceph_remove_cap(cap);
1155         } else {
1156                 /* try to drop referring dentries */
1157                 spin_unlock(&inode->i_lock);
1158                 d_prune_aliases(inode);
1159                 dout("trim_caps_cb %p cap %p  pruned, count now %d\n",
1160                      inode, cap, atomic_read(&inode->i_count));
1161                 return 0;
1162         }
1163
1164 out:
1165         spin_unlock(&inode->i_lock);
1166         return 0;
1167 }
1168
1169 /*
1170  * Trim session cap count down to some max number.
1171  */
1172 static int trim_caps(struct ceph_mds_client *mdsc,
1173                      struct ceph_mds_session *session,
1174                      int max_caps)
1175 {
1176         int trim_caps = session->s_nr_caps - max_caps;
1177
1178         dout("trim_caps mds%d start: %d / %d, trim %d\n",
1179              session->s_mds, session->s_nr_caps, max_caps, trim_caps);
1180         if (trim_caps > 0) {
1181                 session->s_trim_caps = trim_caps;
1182                 iterate_session_caps(session, trim_caps_cb, session);
1183                 dout("trim_caps mds%d done: %d / %d, trimmed %d\n",
1184                      session->s_mds, session->s_nr_caps, max_caps,
1185                         trim_caps - session->s_trim_caps);
1186                 session->s_trim_caps = 0;
1187         }
1188         return 0;
1189 }
1190
1191 /*
1192  * Allocate cap_release messages.  If there is a partially full message
1193  * in the queue, try to allocate enough to cover it's remainder, so that
1194  * we can send it immediately.
1195  *
1196  * Called under s_mutex.
1197  */
1198 int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
1199                           struct ceph_mds_session *session)
1200 {
1201         struct ceph_msg *msg, *partial = NULL;
1202         struct ceph_mds_cap_release *head;
1203         int err = -ENOMEM;
1204         int extra = mdsc->fsc->mount_options->cap_release_safety;
1205         int num;
1206
1207         dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
1208              extra);
1209
1210         spin_lock(&session->s_cap_lock);
1211
1212         if (!list_empty(&session->s_cap_releases)) {
1213                 msg = list_first_entry(&session->s_cap_releases,
1214                                        struct ceph_msg,
1215                                  list_head);
1216                 head = msg->front.iov_base;
1217                 num = le32_to_cpu(head->num);
1218                 if (num) {
1219                         dout(" partial %p with (%d/%d)\n", msg, num,
1220                              (int)CEPH_CAPS_PER_RELEASE);
1221                         extra += CEPH_CAPS_PER_RELEASE - num;
1222                         partial = msg;
1223                 }
1224         }
1225         while (session->s_num_cap_releases < session->s_nr_caps + extra) {
1226                 spin_unlock(&session->s_cap_lock);
1227                 msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
1228                                    GFP_NOFS);
1229                 if (!msg)
1230                         goto out_unlocked;
1231                 dout("add_cap_releases %p msg %p now %d\n", session, msg,
1232                      (int)msg->front.iov_len);
1233                 head = msg->front.iov_base;
1234                 head->num = cpu_to_le32(0);
1235                 msg->front.iov_len = sizeof(*head);
1236                 spin_lock(&session->s_cap_lock);
1237                 list_add(&msg->list_head, &session->s_cap_releases);
1238                 session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
1239         }
1240
1241         if (partial) {
1242                 head = partial->front.iov_base;
1243                 num = le32_to_cpu(head->num);
1244                 dout(" queueing partial %p with %d/%d\n", partial, num,
1245                      (int)CEPH_CAPS_PER_RELEASE);
1246                 list_move_tail(&partial->list_head,
1247                                &session->s_cap_releases_done);
1248                 session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
1249         }
1250         err = 0;
1251         spin_unlock(&session->s_cap_lock);
1252 out_unlocked:
1253         return err;
1254 }
1255
1256 /*
1257  * flush all dirty inode data to disk.
1258  *
1259  * returns true if we've flushed through want_flush_seq
1260  */
1261 static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
1262 {
1263         int mds, ret = 1;
1264
1265         dout("check_cap_flush want %lld\n", want_flush_seq);
1266         mutex_lock(&mdsc->mutex);
1267         for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
1268                 struct ceph_mds_session *session = mdsc->sessions[mds];
1269
1270                 if (!session)
1271                         continue;
1272                 get_session(session);
1273                 mutex_unlock(&mdsc->mutex);
1274
1275                 mutex_lock(&session->s_mutex);
1276                 if (!list_empty(&session->s_cap_flushing)) {
1277                         struct ceph_inode_info *ci =
1278                                 list_entry(session->s_cap_flushing.next,
1279                                            struct ceph_inode_info,
1280                                            i_flushing_item);
1281                         struct inode *inode = &ci->vfs_inode;
1282
1283                         spin_lock(&inode->i_lock);
1284                         if (ci->i_cap_flush_seq <= want_flush_seq) {
1285                                 dout("check_cap_flush still flushing %p "
1286                                      "seq %lld <= %lld to mds%d\n", inode,
1287                                      ci->i_cap_flush_seq, want_flush_seq,
1288                                      session->s_mds);
1289                                 ret = 0;
1290                         }
1291                         spin_unlock(&inode->i_lock);
1292                 }
1293                 mutex_unlock(&session->s_mutex);
1294                 ceph_put_mds_session(session);
1295
1296                 if (!ret)
1297                         return ret;
1298                 mutex_lock(&mdsc->mutex);
1299         }
1300
1301         mutex_unlock(&mdsc->mutex);
1302         dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
1303         return ret;
1304 }
1305
1306 /*
1307  * called under s_mutex
1308  */
1309 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
1310                             struct ceph_mds_session *session)
1311 {
1312         struct ceph_msg *msg;
1313
1314         dout("send_cap_releases mds%d\n", session->s_mds);
1315         spin_lock(&session->s_cap_lock);
1316         while (!list_empty(&session->s_cap_releases_done)) {
1317                 msg = list_first_entry(&session->s_cap_releases_done,
1318                                  struct ceph_msg, list_head);
1319                 list_del_init(&msg->list_head);
1320                 spin_unlock(&session->s_cap_lock);
1321                 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1322                 dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
1323                 ceph_con_send(&session->s_con, msg);
1324                 spin_lock(&session->s_cap_lock);
1325         }
1326         spin_unlock(&session->s_cap_lock);
1327 }
1328
1329 static void discard_cap_releases(struct ceph_mds_client *mdsc,
1330                                  struct ceph_mds_session *session)
1331 {
1332         struct ceph_msg *msg;
1333         struct ceph_mds_cap_release *head;
1334         unsigned num;
1335
1336         dout("discard_cap_releases mds%d\n", session->s_mds);
1337         spin_lock(&session->s_cap_lock);
1338
1339         /* zero out the in-progress message */
1340         msg = list_first_entry(&session->s_cap_releases,
1341                                struct ceph_msg, list_head);
1342         head = msg->front.iov_base;
1343         num = le32_to_cpu(head->num);
1344         dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
1345         head->num = cpu_to_le32(0);
1346         session->s_num_cap_releases += num;
1347
1348         /* requeue completed messages */
1349         while (!list_empty(&session->s_cap_releases_done)) {
1350                 msg = list_first_entry(&session->s_cap_releases_done,
1351                                  struct ceph_msg, list_head);
1352                 list_del_init(&msg->list_head);
1353
1354                 head = msg->front.iov_base;
1355                 num = le32_to_cpu(head->num);
1356                 dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
1357                      num);
1358                 session->s_num_cap_releases += num;
1359                 head->num = cpu_to_le32(0);
1360                 msg->front.iov_len = sizeof(*head);
1361                 list_add(&msg->list_head, &session->s_cap_releases);
1362         }
1363
1364         spin_unlock(&session->s_cap_lock);
1365 }
1366
1367 /*
1368  * requests
1369  */
1370
1371 /*
1372  * Create an mds request.
1373  */
1374 struct ceph_mds_request *
1375 ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
1376 {
1377         struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
1378
1379         if (!req)
1380                 return ERR_PTR(-ENOMEM);
1381
1382         mutex_init(&req->r_fill_mutex);
1383         req->r_mdsc = mdsc;
1384         req->r_started = jiffies;
1385         req->r_resend_mds = -1;
1386         INIT_LIST_HEAD(&req->r_unsafe_dir_item);
1387         req->r_fmode = -1;
1388         kref_init(&req->r_kref);
1389         INIT_LIST_HEAD(&req->r_wait);
1390         init_completion(&req->r_completion);
1391         init_completion(&req->r_safe_completion);
1392         INIT_LIST_HEAD(&req->r_unsafe_item);
1393
1394         req->r_op = op;
1395         req->r_direct_mode = mode;
1396         return req;
1397 }
1398
1399 /*
1400  * return oldest (lowest) request, tid in request tree, 0 if none.
1401  *
1402  * called under mdsc->mutex.
1403  */
1404 static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc)
1405 {
1406         if (RB_EMPTY_ROOT(&mdsc->request_tree))
1407                 return NULL;
1408         return rb_entry(rb_first(&mdsc->request_tree),
1409                         struct ceph_mds_request, r_node);
1410 }
1411
1412 static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
1413 {
1414         struct ceph_mds_request *req = __get_oldest_req(mdsc);
1415
1416         if (req)
1417                 return req->r_tid;
1418         return 0;
1419 }
1420
1421 /*
1422  * Build a dentry's path.  Allocate on heap; caller must kfree.  Based
1423  * on build_path_from_dentry in fs/cifs/dir.c.
1424  *
1425  * If @stop_on_nosnap, generate path relative to the first non-snapped
1426  * inode.
1427  *
1428  * Encode hidden .snap dirs as a double /, i.e.
1429  *   foo/.snap/bar -> foo//bar
1430  */
1431 char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
1432                            int stop_on_nosnap)
1433 {
1434         struct dentry *temp;
1435         char *path;
1436         int len, pos;
1437
1438         if (dentry == NULL)
1439                 return ERR_PTR(-EINVAL);
1440
1441 retry:
1442         len = 0;
1443         for (temp = dentry; !IS_ROOT(temp);) {
1444                 struct inode *inode = temp->d_inode;
1445                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
1446                         len++;  /* slash only */
1447                 else if (stop_on_nosnap && inode &&
1448                          ceph_snap(inode) == CEPH_NOSNAP)
1449                         break;
1450                 else
1451                         len += 1 + temp->d_name.len;
1452                 temp = temp->d_parent;
1453                 if (temp == NULL) {
1454                         pr_err("build_path corrupt dentry %p\n", dentry);
1455                         return ERR_PTR(-EINVAL);
1456                 }
1457         }
1458         if (len)
1459                 len--;  /* no leading '/' */
1460
1461         path = kmalloc(len+1, GFP_NOFS);
1462         if (path == NULL)
1463                 return ERR_PTR(-ENOMEM);
1464         pos = len;
1465         path[pos] = 0;  /* trailing null */
1466         for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
1467                 struct inode *inode = temp->d_inode;
1468
1469                 if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
1470                         dout("build_path path+%d: %p SNAPDIR\n",
1471                              pos, temp);
1472                 } else if (stop_on_nosnap && inode &&
1473                            ceph_snap(inode) == CEPH_NOSNAP) {
1474                         break;
1475                 } else {
1476                         pos -= temp->d_name.len;
1477                         if (pos < 0)
1478                                 break;
1479                         strncpy(path + pos, temp->d_name.name,
1480                                 temp->d_name.len);
1481                 }
1482                 if (pos)
1483                         path[--pos] = '/';
1484                 temp = temp->d_parent;
1485                 if (temp == NULL) {
1486                         pr_err("build_path corrupt dentry\n");
1487                         kfree(path);
1488                         return ERR_PTR(-EINVAL);
1489                 }
1490         }
1491         if (pos != 0) {
1492                 pr_err("build_path did not end path lookup where "
1493                        "expected, namelen is %d, pos is %d\n", len, pos);
1494                 /* presumably this is only possible if racing with a
1495                    rename of one of the parent directories (we can not
1496                    lock the dentries above us to prevent this, but
1497                    retrying should be harmless) */
1498                 kfree(path);
1499                 goto retry;
1500         }
1501
1502         *base = ceph_ino(temp->d_inode);
1503         *plen = len;
1504         dout("build_path on %p %d built %llx '%.*s'\n",
1505              dentry, dentry->d_count, *base, len, path);
1506         return path;
1507 }
1508
1509 static int build_dentry_path(struct dentry *dentry,
1510                              const char **ppath, int *ppathlen, u64 *pino,
1511                              int *pfreepath)
1512 {
1513         char *path;
1514
1515         if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
1516                 *pino = ceph_ino(dentry->d_parent->d_inode);
1517                 *ppath = dentry->d_name.name;
1518                 *ppathlen = dentry->d_name.len;
1519                 return 0;
1520         }
1521         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1522         if (IS_ERR(path))
1523                 return PTR_ERR(path);
1524         *ppath = path;
1525         *pfreepath = 1;
1526         return 0;
1527 }
1528
1529 static int build_inode_path(struct inode *inode,
1530                             const char **ppath, int *ppathlen, u64 *pino,
1531                             int *pfreepath)
1532 {
1533         struct dentry *dentry;
1534         char *path;
1535
1536         if (ceph_snap(inode) == CEPH_NOSNAP) {
1537                 *pino = ceph_ino(inode);
1538                 *ppathlen = 0;
1539                 return 0;
1540         }
1541         dentry = d_find_alias(inode);
1542         path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
1543         dput(dentry);
1544         if (IS_ERR(path))
1545                 return PTR_ERR(path);
1546         *ppath = path;
1547         *pfreepath = 1;
1548         return 0;
1549 }
1550
1551 /*
1552  * request arguments may be specified via an inode *, a dentry *, or
1553  * an explicit ino+path.
1554  */
1555 static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
1556                                   const char *rpath, u64 rino,
1557                                   const char **ppath, int *pathlen,
1558                                   u64 *ino, int *freepath)
1559 {
1560         int r = 0;
1561
1562         if (rinode) {
1563                 r = build_inode_path(rinode, ppath, pathlen, ino, freepath);
1564                 dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
1565                      ceph_snap(rinode));
1566         } else if (rdentry) {
1567                 r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath);
1568                 dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
1569                      *ppath);
1570         } else if (rpath) {
1571                 *ino = rino;
1572                 *ppath = rpath;
1573                 *pathlen = strlen(rpath);
1574                 dout(" path %.*s\n", *pathlen, rpath);
1575         }
1576
1577         return r;
1578 }
1579
1580 /*
1581  * called under mdsc->mutex
1582  */
1583 static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
1584                                                struct ceph_mds_request *req,
1585                                                int mds)
1586 {
1587         struct ceph_msg *msg;
1588         struct ceph_mds_request_head *head;
1589         const char *path1 = NULL;
1590         const char *path2 = NULL;
1591         u64 ino1 = 0, ino2 = 0;
1592         int pathlen1 = 0, pathlen2 = 0;
1593         int freepath1 = 0, freepath2 = 0;
1594         int len;
1595         u16 releases;
1596         void *p, *end;
1597         int ret;
1598
1599         ret = set_request_path_attr(req->r_inode, req->r_dentry,
1600                               req->r_path1, req->r_ino1.ino,
1601                               &path1, &pathlen1, &ino1, &freepath1);
1602         if (ret < 0) {
1603                 msg = ERR_PTR(ret);
1604                 goto out;
1605         }
1606
1607         ret = set_request_path_attr(NULL, req->r_old_dentry,
1608                               req->r_path2, req->r_ino2.ino,
1609                               &path2, &pathlen2, &ino2, &freepath2);
1610         if (ret < 0) {
1611                 msg = ERR_PTR(ret);
1612                 goto out_free1;
1613         }
1614
1615         len = sizeof(*head) +
1616                 pathlen1 + pathlen2 + 2*(1 + sizeof(u32) + sizeof(u64));
1617
1618         /* calculate (max) length for cap releases */
1619         len += sizeof(struct ceph_mds_request_release) *
1620                 (!!req->r_inode_drop + !!req->r_dentry_drop +
1621                  !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
1622         if (req->r_dentry_drop)
1623                 len += req->r_dentry->d_name.len;
1624         if (req->r_old_dentry_drop)
1625                 len += req->r_old_dentry->d_name.len;
1626
1627         msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, GFP_NOFS);
1628         if (!msg) {
1629                 msg = ERR_PTR(-ENOMEM);
1630                 goto out_free2;
1631         }
1632
1633         msg->hdr.tid = cpu_to_le64(req->r_tid);
1634
1635         head = msg->front.iov_base;
1636         p = msg->front.iov_base + sizeof(*head);
1637         end = msg->front.iov_base + msg->front.iov_len;
1638
1639         head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
1640         head->op = cpu_to_le32(req->r_op);
1641         head->caller_uid = cpu_to_le32(req->r_uid);
1642         head->caller_gid = cpu_to_le32(req->r_gid);
1643         head->args = req->r_args;
1644
1645         ceph_encode_filepath(&p, end, ino1, path1);
1646         ceph_encode_filepath(&p, end, ino2, path2);
1647
1648         /* make note of release offset, in case we need to replay */
1649         req->r_request_release_offset = p - msg->front.iov_base;
1650
1651         /* cap releases */
1652         releases = 0;
1653         if (req->r_inode_drop)
1654                 releases += ceph_encode_inode_release(&p,
1655                       req->r_inode ? req->r_inode : req->r_dentry->d_inode,
1656                       mds, req->r_inode_drop, req->r_inode_unless, 0);
1657         if (req->r_dentry_drop)
1658                 releases += ceph_encode_dentry_release(&p, req->r_dentry,
1659                        mds, req->r_dentry_drop, req->r_dentry_unless);
1660         if (req->r_old_dentry_drop)
1661                 releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
1662                        mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
1663         if (req->r_old_inode_drop)
1664                 releases += ceph_encode_inode_release(&p,
1665                       req->r_old_dentry->d_inode,
1666                       mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
1667         head->num_releases = cpu_to_le16(releases);
1668
1669         BUG_ON(p > end);
1670         msg->front.iov_len = p - msg->front.iov_base;
1671         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1672
1673         msg->pages = req->r_pages;
1674         msg->nr_pages = req->r_num_pages;
1675         msg->hdr.data_len = cpu_to_le32(req->r_data_len);
1676         msg->hdr.data_off = cpu_to_le16(0);
1677
1678 out_free2:
1679         if (freepath2)
1680                 kfree((char *)path2);
1681 out_free1:
1682         if (freepath1)
1683                 kfree((char *)path1);
1684 out:
1685         return msg;
1686 }
1687
1688 /*
1689  * called under mdsc->mutex if error, under no mutex if
1690  * success.
1691  */
1692 static void complete_request(struct ceph_mds_client *mdsc,
1693                              struct ceph_mds_request *req)
1694 {
1695         if (req->r_callback)
1696                 req->r_callback(mdsc, req);
1697         else
1698                 complete_all(&req->r_completion);
1699 }
1700
1701 /*
1702  * called under mdsc->mutex
1703  */
1704 static int __prepare_send_request(struct ceph_mds_client *mdsc,
1705                                   struct ceph_mds_request *req,
1706                                   int mds)
1707 {
1708         struct ceph_mds_request_head *rhead;
1709         struct ceph_msg *msg;
1710         int flags = 0;
1711
1712         req->r_attempts++;
1713         if (req->r_inode) {
1714                 struct ceph_cap *cap =
1715                         ceph_get_cap_for_mds(ceph_inode(req->r_inode), mds);
1716
1717                 if (cap)
1718                         req->r_sent_on_mseq = cap->mseq;
1719                 else
1720                         req->r_sent_on_mseq = -1;
1721         }
1722         dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
1723              req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
1724
1725         if (req->r_got_unsafe) {
1726                 /*
1727                  * Replay.  Do not regenerate message (and rebuild
1728                  * paths, etc.); just use the original message.
1729                  * Rebuilding paths will break for renames because
1730                  * d_move mangles the src name.
1731                  */
1732                 msg = req->r_request;
1733                 rhead = msg->front.iov_base;
1734
1735                 flags = le32_to_cpu(rhead->flags);
1736                 flags |= CEPH_MDS_FLAG_REPLAY;
1737                 rhead->flags = cpu_to_le32(flags);
1738
1739                 if (req->r_target_inode)
1740                         rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
1741
1742                 rhead->num_retry = req->r_attempts - 1;
1743
1744                 /* remove cap/dentry releases from message */
1745                 rhead->num_releases = 0;
1746                 msg->hdr.front_len = cpu_to_le32(req->r_request_release_offset);
1747                 msg->front.iov_len = req->r_request_release_offset;
1748                 return 0;
1749         }
1750
1751         if (req->r_request) {
1752                 ceph_msg_put(req->r_request);
1753                 req->r_request = NULL;
1754         }
1755         msg = create_request_message(mdsc, req, mds);
1756         if (IS_ERR(msg)) {
1757                 req->r_err = PTR_ERR(msg);
1758                 complete_request(mdsc, req);
1759                 return PTR_ERR(msg);
1760         }
1761         req->r_request = msg;
1762
1763         rhead = msg->front.iov_base;
1764         rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
1765         if (req->r_got_unsafe)
1766                 flags |= CEPH_MDS_FLAG_REPLAY;
1767         if (req->r_locked_dir)
1768                 flags |= CEPH_MDS_FLAG_WANT_DENTRY;
1769         rhead->flags = cpu_to_le32(flags);
1770         rhead->num_fwd = req->r_num_fwd;
1771         rhead->num_retry = req->r_attempts - 1;
1772         rhead->ino = 0;
1773
1774         dout(" r_locked_dir = %p\n", req->r_locked_dir);
1775         return 0;
1776 }
1777
1778 /*
1779  * send request, or put it on the appropriate wait list.
1780  */
1781 static int __do_request(struct ceph_mds_client *mdsc,
1782                         struct ceph_mds_request *req)
1783 {
1784         struct ceph_mds_session *session = NULL;
1785         int mds = -1;
1786         int err = -EAGAIN;
1787
1788         if (req->r_err || req->r_got_result)
1789                 goto out;
1790
1791         if (req->r_timeout &&
1792             time_after_eq(jiffies, req->r_started + req->r_timeout)) {
1793                 dout("do_request timed out\n");
1794                 err = -EIO;
1795                 goto finish;
1796         }
1797
1798         put_request_session(req);
1799
1800         mds = __choose_mds(mdsc, req);
1801         if (mds < 0 ||
1802             ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
1803                 dout("do_request no mds or not active, waiting for map\n");
1804                 list_add(&req->r_wait, &mdsc->waiting_for_map);
1805                 goto out;
1806         }
1807
1808         /* get, open session */
1809         session = __ceph_lookup_mds_session(mdsc, mds);
1810         if (!session) {
1811                 session = register_session(mdsc, mds);
1812                 if (IS_ERR(session)) {
1813                         err = PTR_ERR(session);
1814                         goto finish;
1815                 }
1816         }
1817         req->r_session = get_session(session);
1818
1819         dout("do_request mds%d session %p state %s\n", mds, session,
1820              session_state_name(session->s_state));
1821         if (session->s_state != CEPH_MDS_SESSION_OPEN &&
1822             session->s_state != CEPH_MDS_SESSION_HUNG) {
1823                 if (session->s_state == CEPH_MDS_SESSION_NEW ||
1824                     session->s_state == CEPH_MDS_SESSION_CLOSING)
1825                         __open_session(mdsc, session);
1826                 list_add(&req->r_wait, &session->s_waiting);
1827                 goto out_session;
1828         }
1829
1830         /* send request */
1831         req->r_resend_mds = -1;   /* forget any previous mds hint */
1832
1833         if (req->r_request_started == 0)   /* note request start time */
1834                 req->r_request_started = jiffies;
1835
1836         err = __prepare_send_request(mdsc, req, mds);
1837         if (!err) {
1838                 ceph_msg_get(req->r_request);
1839                 ceph_con_send(&session->s_con, req->r_request);
1840         }
1841
1842 out_session:
1843         ceph_put_mds_session(session);
1844 out:
1845         return err;
1846
1847 finish:
1848         req->r_err = err;
1849         complete_request(mdsc, req);
1850         goto out;
1851 }
1852
1853 /*
1854  * called under mdsc->mutex
1855  */
1856 static void __wake_requests(struct ceph_mds_client *mdsc,
1857                             struct list_head *head)
1858 {
1859         struct ceph_mds_request *req, *nreq;
1860
1861         list_for_each_entry_safe(req, nreq, head, r_wait) {
1862                 list_del_init(&req->r_wait);
1863                 __do_request(mdsc, req);
1864         }
1865 }
1866
1867 /*
1868  * Wake up threads with requests pending for @mds, so that they can
1869  * resubmit their requests to a possibly different mds.
1870  */
1871 static void kick_requests(struct ceph_mds_client *mdsc, int mds)
1872 {
1873         struct ceph_mds_request *req;
1874         struct rb_node *p;
1875
1876         dout("kick_requests mds%d\n", mds);
1877         for (p = rb_first(&mdsc->request_tree); p; p = rb_next(p)) {
1878                 req = rb_entry(p, struct ceph_mds_request, r_node);
1879                 if (req->r_got_unsafe)
1880                         continue;
1881                 if (req->r_session &&
1882                     req->r_session->s_mds == mds) {
1883                         dout(" kicking tid %llu\n", req->r_tid);
1884                         __do_request(mdsc, req);
1885                 }
1886         }
1887 }
1888
1889 void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
1890                               struct ceph_mds_request *req)
1891 {
1892         dout("submit_request on %p\n", req);
1893         mutex_lock(&mdsc->mutex);
1894         __register_request(mdsc, req, NULL);
1895         __do_request(mdsc, req);
1896         mutex_unlock(&mdsc->mutex);
1897 }
1898
1899 /*
1900  * Synchrously perform an mds request.  Take care of all of the
1901  * session setup, forwarding, retry details.
1902  */
1903 int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
1904                          struct inode *dir,
1905                          struct ceph_mds_request *req)
1906 {
1907         int err;
1908
1909         dout("do_request on %p\n", req);
1910
1911         /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
1912         if (req->r_inode)
1913                 ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
1914         if (req->r_locked_dir)
1915                 ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
1916         if (req->r_old_dentry)
1917                 ceph_get_cap_refs(
1918                         ceph_inode(req->r_old_dentry->d_parent->d_inode),
1919                         CEPH_CAP_PIN);
1920
1921         /* issue */
1922         mutex_lock(&mdsc->mutex);
1923         __register_request(mdsc, req, dir);
1924         __do_request(mdsc, req);
1925
1926         if (req->r_err) {
1927                 err = req->r_err;
1928                 __unregister_request(mdsc, req);
1929                 dout("do_request early error %d\n", err);
1930                 goto out;
1931         }
1932
1933         /* wait */
1934         mutex_unlock(&mdsc->mutex);
1935         dout("do_request waiting\n");
1936         if (req->r_timeout) {
1937                 err = (long)wait_for_completion_killable_timeout(
1938                         &req->r_completion, req->r_timeout);
1939                 if (err == 0)
1940                         err = -EIO;
1941         } else {
1942                 err = wait_for_completion_killable(&req->r_completion);
1943         }
1944         dout("do_request waited, got %d\n", err);
1945         mutex_lock(&mdsc->mutex);
1946
1947         /* only abort if we didn't race with a real reply */
1948         if (req->r_got_result) {
1949                 err = le32_to_cpu(req->r_reply_info.head->result);
1950         } else if (err < 0) {
1951                 dout("aborted request %lld with %d\n", req->r_tid, err);
1952
1953                 /*
1954                  * ensure we aren't running concurrently with
1955                  * ceph_fill_trace or ceph_readdir_prepopulate, which
1956                  * rely on locks (dir mutex) held by our caller.
1957                  */
1958                 mutex_lock(&req->r_fill_mutex);
1959                 req->r_err = err;
1960                 req->r_aborted = true;
1961                 mutex_unlock(&req->r_fill_mutex);
1962
1963                 if (req->r_locked_dir &&
1964                     (req->r_op & CEPH_MDS_OP_WRITE))
1965                         ceph_invalidate_dir_request(req);
1966         } else {
1967                 err = req->r_err;
1968         }
1969
1970 out:
1971         mutex_unlock(&mdsc->mutex);
1972         dout("do_request %p done, result %d\n", req, err);
1973         return err;
1974 }
1975
1976 /*
1977  * Invalidate dir I_COMPLETE, dentry lease state on an aborted MDS
1978  * namespace request.
1979  */
1980 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
1981 {
1982         struct inode *inode = req->r_locked_dir;
1983         struct ceph_inode_info *ci = ceph_inode(inode);
1984
1985         dout("invalidate_dir_request %p (I_COMPLETE, lease(s))\n", inode);
1986         spin_lock(&inode->i_lock);
1987         ci->i_ceph_flags &= ~CEPH_I_COMPLETE;
1988         ci->i_release_count++;
1989         spin_unlock(&inode->i_lock);
1990
1991         if (req->r_dentry)
1992                 ceph_invalidate_dentry_lease(req->r_dentry);
1993         if (req->r_old_dentry)
1994                 ceph_invalidate_dentry_lease(req->r_old_dentry);
1995 }
1996
1997 /*
1998  * Handle mds reply.
1999  *
2000  * We take the session mutex and parse and process the reply immediately.
2001  * This preserves the logical ordering of replies, capabilities, etc., sent
2002  * by the MDS as they are applied to our local cache.
2003  */
2004 static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
2005 {
2006         struct ceph_mds_client *mdsc = session->s_mdsc;
2007         struct ceph_mds_request *req;
2008         struct ceph_mds_reply_head *head = msg->front.iov_base;
2009         struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
2010         u64 tid;
2011         int err, result;
2012         int mds = session->s_mds;
2013
2014         if (msg->front.iov_len < sizeof(*head)) {
2015                 pr_err("mdsc_handle_reply got corrupt (short) reply\n");
2016                 ceph_msg_dump(msg);
2017                 return;
2018         }
2019
2020         /* get request, session */
2021         tid = le64_to_cpu(msg->hdr.tid);
2022         mutex_lock(&mdsc->mutex);
2023         req = __lookup_request(mdsc, tid);
2024         if (!req) {
2025                 dout("handle_reply on unknown tid %llu\n", tid);
2026                 mutex_unlock(&mdsc->mutex);
2027                 return;
2028         }
2029         dout("handle_reply %p\n", req);
2030
2031         /* correct session? */
2032         if (req->r_session != session) {
2033                 pr_err("mdsc_handle_reply got %llu on session mds%d"
2034                        " not mds%d\n", tid, session->s_mds,
2035                        req->r_session ? req->r_session->s_mds : -1);
2036                 mutex_unlock(&mdsc->mutex);
2037                 goto out;
2038         }
2039
2040         /* dup? */
2041         if ((req->r_got_unsafe && !head->safe) ||
2042             (req->r_got_safe && head->safe)) {
2043                 pr_warning("got a dup %s reply on %llu from mds%d\n",
2044                            head->safe ? "safe" : "unsafe", tid, mds);
2045                 mutex_unlock(&mdsc->mutex);
2046                 goto out;
2047         }
2048         if (req->r_got_safe && !head->safe) {
2049                 pr_warning("got unsafe after safe on %llu from mds%d\n",
2050                            tid, mds);
2051                 mutex_unlock(&mdsc->mutex);
2052                 goto out;
2053         }
2054
2055         result = le32_to_cpu(head->result);
2056
2057         /*
2058          * Handle an ESTALE
2059          * if we're not talking to the authority, send to them
2060          * if the authority has changed while we weren't looking,
2061          * send to new authority
2062          * Otherwise we just have to return an ESTALE
2063          */
2064         if (result == -ESTALE) {
2065                 dout("got ESTALE on request %llu", req->r_tid);
2066                 if (!req->r_inode) {
2067                         /* do nothing; not an authority problem */
2068                 } else if (req->r_direct_mode != USE_AUTH_MDS) {
2069                         dout("not using auth, setting for that now");
2070                         req->r_direct_mode = USE_AUTH_MDS;
2071                         __do_request(mdsc, req);
2072                         mutex_unlock(&mdsc->mutex);
2073                         goto out;
2074                 } else  {
2075                         struct ceph_inode_info *ci = ceph_inode(req->r_inode);
2076                         struct ceph_cap *cap = NULL;
2077
2078                         if (req->r_session)
2079                                 cap = ceph_get_cap_for_mds(ci,
2080                                                    req->r_session->s_mds);
2081
2082                         dout("already using auth");
2083                         if ((!cap || cap != ci->i_auth_cap) ||
2084                             (cap->mseq != req->r_sent_on_mseq)) {
2085                                 dout("but cap changed, so resending");
2086                                 __do_request(mdsc, req);
2087                                 mutex_unlock(&mdsc->mutex);
2088                                 goto out;
2089                         }
2090                 }
2091                 dout("have to return ESTALE on request %llu", req->r_tid);
2092         }
2093
2094
2095         if (head->safe) {
2096                 req->r_got_safe = true;
2097                 __unregister_request(mdsc, req);
2098                 complete_all(&req->r_safe_completion);
2099
2100                 if (req->r_got_unsafe) {
2101                         /*
2102                          * We already handled the unsafe response, now do the
2103                          * cleanup.  No need to examine the response; the MDS
2104                          * doesn't include any result info in the safe
2105                          * response.  And even if it did, there is nothing
2106                          * useful we could do with a revised return value.
2107                          */
2108                         dout("got safe reply %llu, mds%d\n", tid, mds);
2109                         list_del_init(&req->r_unsafe_item);
2110
2111                         /* last unsafe request during umount? */
2112                         if (mdsc->stopping && !__get_oldest_req(mdsc))
2113                                 complete_all(&mdsc->safe_umount_waiters);
2114                         mutex_unlock(&mdsc->mutex);
2115                         goto out;
2116                 }
2117         } else {
2118                 req->r_got_unsafe = true;
2119                 list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
2120         }
2121
2122         dout("handle_reply tid %lld result %d\n", tid, result);
2123         rinfo = &req->r_reply_info;
2124         err = parse_reply_info(msg, rinfo, session->s_con.peer_features);
2125         mutex_unlock(&mdsc->mutex);
2126
2127         mutex_lock(&session->s_mutex);
2128         if (err < 0) {
2129                 pr_err("mdsc_handle_reply got corrupt reply mds%d(tid:%lld)\n", mds, tid);
2130                 ceph_msg_dump(msg);
2131                 goto out_err;
2132         }
2133
2134         /* snap trace */
2135         if (rinfo->snapblob_len) {
2136                 down_write(&mdsc->snap_rwsem);
2137                 ceph_update_snap_trace(mdsc, rinfo->snapblob,
2138                                rinfo->snapblob + rinfo->snapblob_len,
2139                                le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
2140                 downgrade_write(&mdsc->snap_rwsem);
2141         } else {
2142                 down_read(&mdsc->snap_rwsem);
2143         }
2144
2145         /* insert trace into our cache */
2146         mutex_lock(&req->r_fill_mutex);
2147         err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
2148         if (err == 0) {
2149                 if (result == 0 && req->r_op != CEPH_MDS_OP_GETFILELOCK &&
2150                     rinfo->dir_nr)
2151                         ceph_readdir_prepopulate(req, req->r_session);
2152                 ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
2153         }
2154         mutex_unlock(&req->r_fill_mutex);
2155
2156         up_read(&mdsc->snap_rwsem);
2157 out_err:
2158         mutex_lock(&mdsc->mutex);
2159         if (!req->r_aborted) {
2160                 if (err) {
2161                         req->r_err = err;
2162                 } else {
2163                         req->r_reply = msg;
2164                         ceph_msg_get(msg);
2165                         req->r_got_result = true;
2166                 }
2167         } else {
2168                 dout("reply arrived after request %lld was aborted\n", tid);
2169         }
2170         mutex_unlock(&mdsc->mutex);
2171
2172         ceph_add_cap_releases(mdsc, req->r_session);
2173         mutex_unlock(&session->s_mutex);
2174
2175         /* kick calling process */
2176         complete_request(mdsc, req);
2177 out:
2178         ceph_mdsc_put_request(req);
2179         return;
2180 }
2181
2182
2183
2184 /*
2185  * handle mds notification that our request has been forwarded.
2186  */
2187 static void handle_forward(struct ceph_mds_client *mdsc,
2188                            struct ceph_mds_session *session,
2189                            struct ceph_msg *msg)
2190 {
2191         struct ceph_mds_request *req;
2192         u64 tid = le64_to_cpu(msg->hdr.tid);
2193         u32 next_mds;
2194         u32 fwd_seq;
2195         int err = -EINVAL;
2196         void *p = msg->front.iov_base;
2197         void *end = p + msg->front.iov_len;
2198
2199         ceph_decode_need(&p, end, 2*sizeof(u32), bad);
2200         next_mds = ceph_decode_32(&p);
2201         fwd_seq = ceph_decode_32(&p);
2202
2203         mutex_lock(&mdsc->mutex);
2204         req = __lookup_request(mdsc, tid);
2205         if (!req) {
2206                 dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
2207                 goto out;  /* dup reply? */
2208         }
2209
2210         if (req->r_aborted) {
2211                 dout("forward tid %llu aborted, unregistering\n", tid);
2212                 __unregister_request(mdsc, req);
2213         } else if (fwd_seq <= req->r_num_fwd) {
2214                 dout("forward tid %llu to mds%d - old seq %d <= %d\n",
2215                      tid, next_mds, req->r_num_fwd, fwd_seq);
2216         } else {
2217                 /* resend. forward race not possible; mds would drop */
2218                 dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
2219                 BUG_ON(req->r_err);
2220                 BUG_ON(req->r_got_result);
2221                 req->r_num_fwd = fwd_seq;
2222                 req->r_resend_mds = next_mds;
2223                 put_request_session(req);
2224                 __do_request(mdsc, req);
2225         }
2226         ceph_mdsc_put_request(req);
2227 out:
2228         mutex_unlock(&mdsc->mutex);
2229         return;
2230
2231 bad:
2232         pr_err("mdsc_handle_forward decode error err=%d\n", err);
2233 }
2234
2235 /*
2236  * handle a mds session control message
2237  */
2238 static void handle_session(struct ceph_mds_session *session,
2239                            struct ceph_msg *msg)
2240 {
2241         struct ceph_mds_client *mdsc = session->s_mdsc;
2242         u32 op;
2243         u64 seq;
2244         int mds = session->s_mds;
2245         struct ceph_mds_session_head *h = msg->front.iov_base;
2246         int wake = 0;
2247
2248         /* decode */
2249         if (msg->front.iov_len != sizeof(*h))
2250                 goto bad;
2251         op = le32_to_cpu(h->op);
2252         seq = le64_to_cpu(h->seq);
2253
2254         mutex_lock(&mdsc->mutex);
2255         if (op == CEPH_SESSION_CLOSE)
2256                 __unregister_session(mdsc, session);
2257         /* FIXME: this ttl calculation is generous */
2258         session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
2259         mutex_unlock(&mdsc->mutex);
2260
2261         mutex_lock(&session->s_mutex);
2262
2263         dout("handle_session mds%d %s %p state %s seq %llu\n",
2264              mds, ceph_session_op_name(op), session,
2265              session_state_name(session->s_state), seq);
2266
2267         if (session->s_state == CEPH_MDS_SESSION_HUNG) {
2268                 session->s_state = CEPH_MDS_SESSION_OPEN;
2269                 pr_info("mds%d came back\n", session->s_mds);
2270         }
2271
2272         switch (op) {
2273         case CEPH_SESSION_OPEN:
2274                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2275                         pr_info("mds%d reconnect success\n", session->s_mds);
2276                 session->s_state = CEPH_MDS_SESSION_OPEN;
2277                 renewed_caps(mdsc, session, 0);
2278                 wake = 1;
2279                 if (mdsc->stopping)
2280                         __close_session(mdsc, session);
2281                 break;
2282
2283         case CEPH_SESSION_RENEWCAPS:
2284                 if (session->s_renew_seq == seq)
2285                         renewed_caps(mdsc, session, 1);
2286                 break;
2287
2288         case CEPH_SESSION_CLOSE:
2289                 if (session->s_state == CEPH_MDS_SESSION_RECONNECTING)
2290                         pr_info("mds%d reconnect denied\n", session->s_mds);
2291                 remove_session_caps(session);
2292                 wake = 1; /* for good measure */
2293                 wake_up_all(&mdsc->session_close_wq);
2294                 kick_requests(mdsc, mds);
2295                 break;
2296
2297         case CEPH_SESSION_STALE:
2298                 pr_info("mds%d caps went stale, renewing\n",
2299                         session->s_mds);
2300                 spin_lock(&session->s_cap_lock);
2301                 session->s_cap_gen++;
2302                 session->s_cap_ttl = 0;
2303                 spin_unlock(&session->s_cap_lock);
2304                 send_renew_caps(mdsc, session);
2305                 break;
2306
2307         case CEPH_SESSION_RECALL_STATE:
2308                 trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
2309                 break;
2310
2311         default:
2312                 pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
2313                 WARN_ON(1);
2314         }
2315
2316         mutex_unlock(&session->s_mutex);
2317         if (wake) {
2318                 mutex_lock(&mdsc->mutex);
2319                 __wake_requests(mdsc, &session->s_waiting);
2320                 mutex_unlock(&mdsc->mutex);
2321         }
2322         return;
2323
2324 bad:
2325         pr_err("mdsc_handle_session corrupt message mds%d len %d\n", mds,
2326                (int)msg->front.iov_len);
2327         ceph_msg_dump(msg);
2328         return;
2329 }
2330
2331
2332 /*
2333  * called under session->mutex.
2334  */
2335 static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
2336                                    struct ceph_mds_session *session)
2337 {
2338         struct ceph_mds_request *req, *nreq;
2339         int err;
2340
2341         dout("replay_unsafe_requests mds%d\n", session->s_mds);
2342
2343         mutex_lock(&mdsc->mutex);
2344         list_for_each_entry_safe(req, nreq, &session->s_unsafe, r_unsafe_item) {
2345                 err = __prepare_send_request(mdsc, req, session->s_mds);
2346                 if (!err) {
2347                         ceph_msg_get(req->r_request);
2348                         ceph_con_send(&session->s_con, req->r_request);
2349                 }
2350         }
2351         mutex_unlock(&mdsc->mutex);
2352 }
2353
2354 /*
2355  * Encode information about a cap for a reconnect with the MDS.
2356  */
2357 static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
2358                           void *arg)
2359 {
2360         union {
2361                 struct ceph_mds_cap_reconnect v2;
2362                 struct ceph_mds_cap_reconnect_v1 v1;
2363         } rec;
2364         size_t reclen;
2365         struct ceph_inode_info *ci;
2366         struct ceph_reconnect_state *recon_state = arg;
2367         struct ceph_pagelist *pagelist = recon_state->pagelist;
2368         char *path;
2369         int pathlen, err;
2370         u64 pathbase;
2371         struct dentry *dentry;
2372
2373         ci = cap->ci;
2374
2375         dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
2376              inode, ceph_vinop(inode), cap, cap->cap_id,
2377              ceph_cap_string(cap->issued));
2378         err = ceph_pagelist_encode_64(pagelist, ceph_ino(inode));
2379         if (err)
2380                 return err;
2381
2382         dentry = d_find_alias(inode);
2383         if (dentry) {
2384                 path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
2385                 if (IS_ERR(path)) {
2386                         err = PTR_ERR(path);
2387                         goto out_dput;
2388                 }
2389         } else {
2390                 path = NULL;
2391                 pathlen = 0;
2392         }
2393         err = ceph_pagelist_encode_string(pagelist, path, pathlen);
2394         if (err)
2395                 goto out_free;
2396
2397         spin_lock(&inode->i_lock);
2398         cap->seq = 0;        /* reset cap seq */
2399         cap->issue_seq = 0;  /* and issue_seq */
2400
2401         if (recon_state->flock) {
2402                 rec.v2.cap_id = cpu_to_le64(cap->cap_id);
2403                 rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2404                 rec.v2.issued = cpu_to_le32(cap->issued);
2405                 rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2406                 rec.v2.pathbase = cpu_to_le64(pathbase);
2407                 rec.v2.flock_len = 0;
2408                 reclen = sizeof(rec.v2);
2409         } else {
2410                 rec.v1.cap_id = cpu_to_le64(cap->cap_id);
2411                 rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
2412                 rec.v1.issued = cpu_to_le32(cap->issued);
2413                 rec.v1.size = cpu_to_le64(inode->i_size);
2414                 ceph_encode_timespec(&rec.v1.mtime, &inode->i_mtime);
2415                 ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
2416                 rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
2417                 rec.v1.pathbase = cpu_to_le64(pathbase);
2418                 reclen = sizeof(rec.v1);
2419         }
2420         spin_unlock(&inode->i_lock);
2421
2422         if (recon_state->flock) {
2423                 int num_fcntl_locks, num_flock_locks;
2424                 struct ceph_pagelist_cursor trunc_point;
2425
2426                 ceph_pagelist_set_cursor(pagelist, &trunc_point);
2427                 do {
2428                         lock_flocks();
2429                         ceph_count_locks(inode, &num_fcntl_locks,
2430                                          &num_flock_locks);
2431                         rec.v2.flock_len = (2*sizeof(u32) +
2432                                             (num_fcntl_locks+num_flock_locks) *
2433                                             sizeof(struct ceph_filelock));
2434                         unlock_flocks();
2435
2436                         /* pre-alloc pagelist */
2437                         ceph_pagelist_truncate(pagelist, &trunc_point);
2438                         err = ceph_pagelist_append(pagelist, &rec, reclen);
2439                         if (!err)
2440                                 err = ceph_pagelist_reserve(pagelist,
2441                                                             rec.v2.flock_len);
2442
2443                         /* encode locks */
2444                         if (!err) {
2445                                 lock_flocks();
2446                                 err = ceph_encode_locks(inode,
2447                                                         pagelist,
2448                                                         num_fcntl_locks,
2449                                                         num_flock_locks);
2450                                 unlock_flocks();
2451                         }
2452                 } while (err == -ENOSPC);
2453         } else {
2454                 err = ceph_pagelist_append(pagelist, &rec, reclen);
2455         }
2456
2457 out_free:
2458         kfree(path);
2459 out_dput:
2460         dput(dentry);
2461         return err;
2462 }
2463
2464
2465 /*
2466  * If an MDS fails and recovers, clients need to reconnect in order to
2467  * reestablish shared state.  This includes all caps issued through
2468  * this session _and_ the snap_realm hierarchy.  Because it's not
2469  * clear which snap realms the mds cares about, we send everything we
2470  * know about.. that ensures we'll then get any new info the
2471  * recovering MDS might have.
2472  *
2473  * This is a relatively heavyweight operation, but it's rare.
2474  *
2475  * called with mdsc->mutex held.
2476  */
2477 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
2478                                struct ceph_mds_session *session)
2479 {
2480         struct ceph_msg *reply;
2481         struct rb_node *p;
2482         int mds = session->s_mds;
2483         int err = -ENOMEM;
2484         struct ceph_pagelist *pagelist;
2485         struct ceph_reconnect_state recon_state;
2486
2487         pr_info("mds%d reconnect start\n", mds);
2488
2489         pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
2490         if (!pagelist)
2491                 goto fail_nopagelist;
2492         ceph_pagelist_init(pagelist);
2493
2494         reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, 0, GFP_NOFS);
2495         if (!reply)
2496                 goto fail_nomsg;
2497
2498         mutex_lock(&session->s_mutex);
2499         session->s_state = CEPH_MDS_SESSION_RECONNECTING;
2500         session->s_seq = 0;
2501
2502         ceph_con_open(&session->s_con,
2503                       ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
2504
2505         /* replay unsafe requests */
2506         replay_unsafe_requests(mdsc, session);
2507
2508         down_read(&mdsc->snap_rwsem);
2509
2510         dout("session %p state %s\n", session,
2511              session_state_name(session->s_state));
2512
2513         /* drop old cap expires; we're about to reestablish that state */
2514         discard_cap_releases(mdsc, session);
2515
2516         /* traverse this session's caps */
2517         err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
2518         if (err)
2519                 goto fail;
2520
2521         recon_state.pagelist = pagelist;
2522         recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
2523         err = iterate_session_caps(session, encode_caps_cb, &recon_state);
2524         if (err < 0)
2525                 goto fail;
2526
2527         /*
2528          * snaprealms.  we provide mds with the ino, seq (version), and
2529          * parent for all of our realms.  If the mds has any newer info,
2530          * it will tell us.
2531          */
2532         for (p = rb_first(&mdsc->snap_realms); p; p = rb_next(p)) {
2533                 struct ceph_snap_realm *realm =
2534                         rb_entry(p, struct ceph_snap_realm, node);
2535                 struct ceph_mds_snaprealm_reconnect sr_rec;
2536
2537                 dout(" adding snap realm %llx seq %lld parent %llx\n",
2538                      realm->ino, realm->seq, realm->parent_ino);
2539                 sr_rec.ino = cpu_to_le64(realm->ino);
2540                 sr_rec.seq = cpu_to_le64(realm->seq);
2541                 sr_rec.parent = cpu_to_le64(realm->parent_ino);
2542                 err = ceph_pagelist_append(pagelist, &sr_rec, sizeof(sr_rec));
2543                 if (err)
2544                         goto fail;
2545         }
2546
2547         reply->pagelist = pagelist;
2548         if (recon_state.flock)
2549                 reply->hdr.version = cpu_to_le16(2);
2550         reply->hdr.data_len = cpu_to_le32(pagelist->length);
2551         reply->nr_pages = calc_pages_for(0, pagelist->length);
2552         ceph_con_send(&session->s_con, reply);
2553
2554         mutex_unlock(&session->s_mutex);
2555
2556         mutex_lock(&mdsc->mutex);
2557         __wake_requests(mdsc, &session->s_waiting);
2558         mutex_unlock(&mdsc->mutex);
2559
2560         up_read(&mdsc->snap_rwsem);
2561         return;
2562
2563 fail:
2564         ceph_msg_put(reply);
2565         up_read(&mdsc->snap_rwsem);
2566         mutex_unlock(&session->s_mutex);
2567 fail_nomsg:
2568         ceph_pagelist_release(pagelist);
2569         kfree(pagelist);
2570 fail_nopagelist:
2571         pr_err("error %d preparing reconnect for mds%d\n", err, mds);
2572         return;
2573 }
2574
2575
2576 /*
2577  * compare old and new mdsmaps, kicking requests
2578  * and closing out old connections as necessary
2579  *
2580  * called under mdsc->mutex.
2581  */
2582 static void check_new_map(struct ceph_mds_client *mdsc,
2583                           struct ceph_mdsmap *newmap,
2584                           struct ceph_mdsmap *oldmap)
2585 {
2586         int i;
2587         int oldstate, newstate;
2588         struct ceph_mds_session *s;
2589
2590         dout("check_new_map new %u old %u\n",
2591              newmap->m_epoch, oldmap->m_epoch);
2592
2593         for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
2594                 if (mdsc->sessions[i] == NULL)
2595                         continue;
2596                 s = mdsc->sessions[i];
2597                 oldstate = ceph_mdsmap_get_state(oldmap, i);
2598                 newstate = ceph_mdsmap_get_state(newmap, i);
2599
2600                 dout("check_new_map mds%d state %s%s -> %s%s (session %s)\n",
2601                      i, ceph_mds_state_name(oldstate),
2602                      ceph_mdsmap_is_laggy(oldmap, i) ? " (laggy)" : "",
2603                      ceph_mds_state_name(newstate),
2604                      ceph_mdsmap_is_laggy(newmap, i) ? " (laggy)" : "",
2605                      session_state_name(s->s_state));
2606
2607                 if (memcmp(ceph_mdsmap_get_addr(oldmap, i),
2608                            ceph_mdsmap_get_addr(newmap, i),
2609                            sizeof(struct ceph_entity_addr))) {
2610                         if (s->s_state == CEPH_MDS_SESSION_OPENING) {
2611                                 /* the session never opened, just close it
2612                                  * out now */
2613                                 __wake_requests(mdsc, &s->s_waiting);
2614                                 __unregister_session(mdsc, s);
2615                         } else {
2616                                 /* just close it */
2617                                 mutex_unlock(&mdsc->mutex);
2618                                 mutex_lock(&s->s_mutex);
2619                                 mutex_lock(&mdsc->mutex);
2620                                 ceph_con_close(&s->s_con);
2621                                 mutex_unlock(&s->s_mutex);
2622                                 s->s_state = CEPH_MDS_SESSION_RESTARTING;
2623                         }
2624
2625                         /* kick any requests waiting on the recovering mds */
2626                         kick_requests(mdsc, i);
2627                 } else if (oldstate == newstate) {
2628                         continue;  /* nothing new with this mds */
2629                 }
2630
2631                 /*
2632                  * send reconnect?
2633                  */
2634                 if (s->s_state == CEPH_MDS_SESSION_RESTARTING &&
2635                     newstate >= CEPH_MDS_STATE_RECONNECT) {
2636                         mutex_unlock(&mdsc->mutex);
2637                         send_mds_reconnect(mdsc, s);
2638                         mutex_lock(&mdsc->mutex);
2639                 }
2640
2641                 /*
2642                  * kick request on any mds that has gone active.
2643                  */
2644                 if (oldstate < CEPH_MDS_STATE_ACTIVE &&
2645                     newstate >= CEPH_MDS_STATE_ACTIVE) {
2646                         if (oldstate != CEPH_MDS_STATE_CREATING &&
2647                             oldstate != CEPH_MDS_STATE_STARTING)
2648                                 pr_info("mds%d recovery completed\n", s->s_mds);
2649                         kick_requests(mdsc, i);
2650                         ceph_kick_flushing_caps(mdsc, s);
2651                         wake_up_session_caps(s, 1);
2652                 }
2653         }
2654
2655         for (i = 0; i < newmap->m_max_mds && i < mdsc->max_sessions; i++) {
2656                 s = mdsc->sessions[i];
2657                 if (!s)
2658                         continue;
2659                 if (!ceph_mdsmap_is_laggy(newmap, i))
2660                         continue;
2661                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2662                     s->s_state == CEPH_MDS_SESSION_HUNG ||
2663                     s->s_state == CEPH_MDS_SESSION_CLOSING) {
2664                         dout(" connecting to export targets of laggy mds%d\n",
2665                              i);
2666                         __open_export_target_sessions(mdsc, s);
2667                 }
2668         }
2669 }
2670
2671
2672
2673 /*
2674  * leases
2675  */
2676
2677 /*
2678  * caller must hold session s_mutex, dentry->d_lock
2679  */
2680 void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
2681 {
2682         struct ceph_dentry_info *di = ceph_dentry(dentry);
2683
2684         ceph_put_mds_session(di->lease_session);
2685         di->lease_session = NULL;
2686 }
2687
2688 static void handle_lease(struct ceph_mds_client *mdsc,
2689                          struct ceph_mds_session *session,
2690                          struct ceph_msg *msg)
2691 {
2692         struct super_block *sb = mdsc->fsc->sb;
2693         struct inode *inode;
2694         struct ceph_inode_info *ci;
2695         struct dentry *parent, *dentry;
2696         struct ceph_dentry_info *di;
2697         int mds = session->s_mds;
2698         struct ceph_mds_lease *h = msg->front.iov_base;
2699         u32 seq;
2700         struct ceph_vino vino;
2701         int mask;
2702         struct qstr dname;
2703         int release = 0;
2704
2705         dout("handle_lease from mds%d\n", mds);
2706
2707         /* decode */
2708         if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
2709                 goto bad;
2710         vino.ino = le64_to_cpu(h->ino);
2711         vino.snap = CEPH_NOSNAP;
2712         mask = le16_to_cpu(h->mask);
2713         seq = le32_to_cpu(h->seq);
2714         dname.name = (void *)h + sizeof(*h) + sizeof(u32);
2715         dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
2716         if (dname.len != get_unaligned_le32(h+1))
2717                 goto bad;
2718
2719         mutex_lock(&session->s_mutex);
2720         session->s_seq++;
2721
2722         /* lookup inode */
2723         inode = ceph_find_inode(sb, vino);
2724         dout("handle_lease %s, mask %d, ino %llx %p %.*s\n",
2725              ceph_lease_op_name(h->action), mask, vino.ino, inode,
2726              dname.len, dname.name);
2727         if (inode == NULL) {
2728                 dout("handle_lease no inode %llx\n", vino.ino);
2729                 goto release;
2730         }
2731         ci = ceph_inode(inode);
2732
2733         /* dentry */
2734         parent = d_find_alias(inode);
2735         if (!parent) {
2736                 dout("no parent dentry on inode %p\n", inode);
2737                 WARN_ON(1);
2738                 goto release;  /* hrm... */
2739         }
2740         dname.hash = full_name_hash(dname.name, dname.len);
2741         dentry = d_lookup(parent, &dname);
2742         dput(parent);
2743         if (!dentry)
2744                 goto release;
2745
2746         spin_lock(&dentry->d_lock);
2747         di = ceph_dentry(dentry);
2748         switch (h->action) {
2749         case CEPH_MDS_LEASE_REVOKE:
2750                 if (di && di->lease_session == session) {
2751                         if (ceph_seq_cmp(di->lease_seq, seq) > 0)
2752                                 h->seq = cpu_to_le32(di->lease_seq);
2753                         __ceph_mdsc_drop_dentry_lease(dentry);
2754                 }
2755                 release = 1;
2756                 break;
2757
2758         case CEPH_MDS_LEASE_RENEW:
2759                 if (di && di->lease_session == session &&
2760                     di->lease_gen == session->s_cap_gen &&
2761                     di->lease_renew_from &&
2762                     di->lease_renew_after == 0) {
2763                         unsigned long duration =
2764                                 le32_to_cpu(h->duration_ms) * HZ / 1000;
2765
2766                         di->lease_seq = seq;
2767                         dentry->d_time = di->lease_renew_from + duration;
2768                         di->lease_renew_after = di->lease_renew_from +
2769                                 (duration >> 1);
2770                         di->lease_renew_from = 0;
2771                 }
2772                 break;
2773         }
2774         spin_unlock(&dentry->d_lock);
2775         dput(dentry);
2776
2777         if (!release)
2778                 goto out;
2779
2780 release:
2781         /* let's just reuse the same message */
2782         h->action = CEPH_MDS_LEASE_REVOKE_ACK;
2783         ceph_msg_get(msg);
2784         ceph_con_send(&session->s_con, msg);
2785
2786 out:
2787         iput(inode);
2788         mutex_unlock(&session->s_mutex);
2789         return;
2790
2791 bad:
2792         pr_err("corrupt lease message\n");
2793         ceph_msg_dump(msg);
2794 }
2795
2796 void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
2797                               struct inode *inode,
2798                               struct dentry *dentry, char action,
2799                               u32 seq)
2800 {
2801         struct ceph_msg *msg;
2802         struct ceph_mds_lease *lease;
2803         int len = sizeof(*lease) + sizeof(u32);
2804         int dnamelen = 0;
2805
2806         dout("lease_send_msg inode %p dentry %p %s to mds%d\n",
2807              inode, dentry, ceph_lease_op_name(action), session->s_mds);
2808         dnamelen = dentry->d_name.len;
2809         len += dnamelen;
2810
2811         msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, GFP_NOFS);
2812         if (!msg)
2813                 return;
2814         lease = msg->front.iov_base;
2815         lease->action = action;
2816         lease->mask = cpu_to_le16(1);
2817         lease->ino = cpu_to_le64(ceph_vino(inode).ino);
2818         lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
2819         lease->seq = cpu_to_le32(seq);
2820         put_unaligned_le32(dnamelen, lease + 1);
2821         memcpy((void *)(lease + 1) + 4, dentry->d_name.name, dnamelen);
2822
2823         /*
2824          * if this is a preemptive lease RELEASE, no need to
2825          * flush request stream, since the actual request will
2826          * soon follow.
2827          */
2828         msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
2829
2830         ceph_con_send(&session->s_con, msg);
2831 }
2832
2833 /*
2834  * Preemptively release a lease we expect to invalidate anyway.
2835  * Pass @inode always, @dentry is optional.
2836  */
2837 void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
2838                              struct dentry *dentry, int mask)
2839 {
2840         struct ceph_dentry_info *di;
2841         struct ceph_mds_session *session;
2842         u32 seq;
2843
2844         BUG_ON(inode == NULL);
2845         BUG_ON(dentry == NULL);
2846         BUG_ON(mask == 0);
2847
2848         /* is dentry lease valid? */
2849         spin_lock(&dentry->d_lock);
2850         di = ceph_dentry(dentry);
2851         if (!di || !di->lease_session ||
2852             di->lease_session->s_mds < 0 ||
2853             di->lease_gen != di->lease_session->s_cap_gen ||
2854             !time_before(jiffies, dentry->d_time)) {
2855                 dout("lease_release inode %p dentry %p -- "
2856                      "no lease on %d\n",
2857                      inode, dentry, mask);
2858                 spin_unlock(&dentry->d_lock);
2859                 return;
2860         }
2861
2862         /* we do have a lease on this dentry; note mds and seq */
2863         session = ceph_get_mds_session(di->lease_session);
2864         seq = di->lease_seq;
2865         __ceph_mdsc_drop_dentry_lease(dentry);
2866         spin_unlock(&dentry->d_lock);
2867
2868         dout("lease_release inode %p dentry %p mask %d to mds%d\n",
2869              inode, dentry, mask, session->s_mds);
2870         ceph_mdsc_lease_send_msg(session, inode, dentry,
2871                                  CEPH_MDS_LEASE_RELEASE, seq);
2872         ceph_put_mds_session(session);
2873 }
2874
2875 /*
2876  * drop all leases (and dentry refs) in preparation for umount
2877  */
2878 static void drop_leases(struct ceph_mds_client *mdsc)
2879 {
2880         int i;
2881
2882         dout("drop_leases\n");
2883         mutex_lock(&mdsc->mutex);
2884         for (i = 0; i < mdsc->max_sessions; i++) {
2885                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2886                 if (!s)
2887                         continue;
2888                 mutex_unlock(&mdsc->mutex);
2889                 mutex_lock(&s->s_mutex);
2890                 mutex_unlock(&s->s_mutex);
2891                 ceph_put_mds_session(s);
2892                 mutex_lock(&mdsc->mutex);
2893         }
2894         mutex_unlock(&mdsc->mutex);
2895 }
2896
2897
2898
2899 /*
2900  * delayed work -- periodically trim expired leases, renew caps with mds
2901  */
2902 static void schedule_delayed(struct ceph_mds_client *mdsc)
2903 {
2904         int delay = 5;
2905         unsigned hz = round_jiffies_relative(HZ * delay);
2906         schedule_delayed_work(&mdsc->delayed_work, hz);
2907 }
2908
2909 static void delayed_work(struct work_struct *work)
2910 {
2911         int i;
2912         struct ceph_mds_client *mdsc =
2913                 container_of(work, struct ceph_mds_client, delayed_work.work);
2914         int renew_interval;
2915         int renew_caps;
2916
2917         dout("mdsc delayed_work\n");
2918         ceph_check_delayed_caps(mdsc);
2919
2920         mutex_lock(&mdsc->mutex);
2921         renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
2922         renew_caps = time_after_eq(jiffies, HZ*renew_interval +
2923                                    mdsc->last_renew_caps);
2924         if (renew_caps)
2925                 mdsc->last_renew_caps = jiffies;
2926
2927         for (i = 0; i < mdsc->max_sessions; i++) {
2928                 struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
2929                 if (s == NULL)
2930                         continue;
2931                 if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
2932                         dout("resending session close request for mds%d\n",
2933                              s->s_mds);
2934                         request_close_session(mdsc, s);
2935                         ceph_put_mds_session(s);
2936                         continue;
2937                 }
2938                 if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
2939                         if (s->s_state == CEPH_MDS_SESSION_OPEN) {
2940                                 s->s_state = CEPH_MDS_SESSION_HUNG;
2941                                 pr_info("mds%d hung\n", s->s_mds);
2942                         }
2943                 }
2944                 if (s->s_state < CEPH_MDS_SESSION_OPEN) {
2945                         /* this mds is failed or recovering, just wait */
2946                         ceph_put_mds_session(s);
2947                         continue;
2948                 }
2949                 mutex_unlock(&mdsc->mutex);
2950
2951                 mutex_lock(&s->s_mutex);
2952                 if (renew_caps)
2953                         send_renew_caps(mdsc, s);
2954                 else
2955                         ceph_con_keepalive(&s->s_con);
2956                 ceph_add_cap_releases(mdsc, s);
2957                 if (s->s_state == CEPH_MDS_SESSION_OPEN ||
2958                     s->s_state == CEPH_MDS_SESSION_HUNG)
2959                         ceph_send_cap_releases(mdsc, s);
2960                 mutex_unlock(&s->s_mutex);
2961                 ceph_put_mds_session(s);
2962
2963                 mutex_lock(&mdsc->mutex);
2964         }
2965         mutex_unlock(&mdsc->mutex);
2966
2967         schedule_delayed(mdsc);
2968 }
2969
2970 int ceph_mdsc_init(struct ceph_fs_client *fsc)
2971
2972 {
2973         struct ceph_mds_client *mdsc;
2974
2975         mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
2976         if (!mdsc)
2977                 return -ENOMEM;
2978         mdsc->fsc = fsc;
2979         fsc->mdsc = mdsc;
2980         mutex_init(&mdsc->mutex);
2981         mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
2982         if (mdsc->mdsmap == NULL)
2983                 return -ENOMEM;
2984
2985         init_completion(&mdsc->safe_umount_waiters);
2986         init_waitqueue_head(&mdsc->session_close_wq);
2987         INIT_LIST_HEAD(&mdsc->waiting_for_map);
2988         mdsc->sessions = NULL;
2989         mdsc->max_sessions = 0;
2990         mdsc->stopping = 0;
2991         init_rwsem(&mdsc->snap_rwsem);
2992         mdsc->snap_realms = RB_ROOT;
2993         INIT_LIST_HEAD(&mdsc->snap_empty);
2994         spin_lock_init(&mdsc->snap_empty_lock);
2995         mdsc->last_tid = 0;
2996         mdsc->request_tree = RB_ROOT;
2997         INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
2998         mdsc->last_renew_caps = jiffies;
2999         INIT_LIST_HEAD(&mdsc->cap_delay_list);
3000         spin_lock_init(&mdsc->cap_delay_lock);
3001         INIT_LIST_HEAD(&mdsc->snap_flush_list);
3002         spin_lock_init(&mdsc->snap_flush_lock);
3003         mdsc->cap_flush_seq = 0;
3004         INIT_LIST_HEAD(&mdsc->cap_dirty);
3005         mdsc->num_cap_flushing = 0;
3006         spin_lock_init(&mdsc->cap_dirty_lock);
3007         init_waitqueue_head(&mdsc->cap_flushing_wq);
3008         spin_lock_init(&mdsc->dentry_lru_lock);
3009         INIT_LIST_HEAD(&mdsc->dentry_lru);
3010
3011         ceph_caps_init(mdsc);
3012         ceph_adjust_min_caps(mdsc, fsc->min_caps);
3013
3014         return 0;
3015 }
3016
3017 /*
3018  * Wait for safe replies on open mds requests.  If we time out, drop
3019  * all requests from the tree to avoid dangling dentry refs.
3020  */
3021 static void wait_requests(struct ceph_mds_client *mdsc)
3022 {
3023         struct ceph_mds_request *req;
3024         struct ceph_fs_client *fsc = mdsc->fsc;
3025
3026         mutex_lock(&mdsc->mutex);
3027         if (__get_oldest_req(mdsc)) {
3028                 mutex_unlock(&mdsc->mutex);
3029
3030                 dout("wait_requests waiting for requests\n");
3031                 wait_for_completion_timeout(&mdsc->safe_umount_waiters,
3032                                     fsc->client->options->mount_timeout * HZ);
3033
3034                 /* tear down remaining requests */
3035                 mutex_lock(&mdsc->mutex);
3036                 while ((req = __get_oldest_req(mdsc))) {
3037                         dout("wait_requests timed out on tid %llu\n",
3038                              req->r_tid);
3039                         __unregister_request(mdsc, req);
3040                 }
3041         }
3042         mutex_unlock(&mdsc->mutex);
3043         dout("wait_requests done\n");
3044 }
3045
3046 /*
3047  * called before mount is ro, and before dentries are torn down.
3048  * (hmm, does this still race with new lookups?)
3049  */
3050 void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
3051 {
3052         dout("pre_umount\n");
3053         mdsc->stopping = 1;
3054
3055         drop_leases(mdsc);
3056         ceph_flush_dirty_caps(mdsc);
3057         wait_requests(mdsc);
3058
3059         /*
3060          * wait for reply handlers to drop their request refs and
3061          * their inode/dcache refs
3062          */
3063         ceph_msgr_flush();
3064 }
3065
3066 /*
3067  * wait for all write mds requests to flush.
3068  */
3069 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
3070 {
3071         struct ceph_mds_request *req = NULL, *nextreq;
3072         struct rb_node *n;
3073
3074         mutex_lock(&mdsc->mutex);
3075         dout("wait_unsafe_requests want %lld\n", want_tid);
3076 restart:
3077         req = __get_oldest_req(mdsc);
3078         while (req && req->r_tid <= want_tid) {
3079                 /* find next request */
3080                 n = rb_next(&req->r_node);
3081                 if (n)
3082                         nextreq = rb_entry(n, struct ceph_mds_request, r_node);
3083                 else
3084                         nextreq = NULL;
3085                 if ((req->r_op & CEPH_MDS_OP_WRITE)) {
3086                         /* write op */
3087                         ceph_mdsc_get_request(req);
3088                         if (nextreq)
3089                                 ceph_mdsc_get_request(nextreq);
3090                         mutex_unlock(&mdsc->mutex);
3091                         dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
3092                              req->r_tid, want_tid);
3093                         wait_for_completion(&req->r_safe_completion);
3094                         mutex_lock(&mdsc->mutex);
3095                         ceph_mdsc_put_request(req);
3096                         if (!nextreq)
3097                                 break;  /* next dne before, so we're done! */
3098                         if (RB_EMPTY_NODE(&nextreq->r_node)) {
3099                                 /* next request was removed from tree */
3100                                 ceph_mdsc_put_request(nextreq);
3101                                 goto restart;
3102                         }
3103                         ceph_mdsc_put_request(nextreq);  /* won't go away */
3104                 }
3105                 req = nextreq;
3106         }
3107         mutex_unlock(&mdsc->mutex);
3108         dout("wait_unsafe_requests done\n");
3109 }
3110
3111 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
3112 {
3113         u64 want_tid, want_flush;
3114
3115         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3116                 return;
3117
3118         dout("sync\n");
3119         mutex_lock(&mdsc->mutex);
3120         want_tid = mdsc->last_tid;
3121         want_flush = mdsc->cap_flush_seq;
3122         mutex_unlock(&mdsc->mutex);
3123         dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
3124
3125         ceph_flush_dirty_caps(mdsc);
3126
3127         wait_unsafe_requests(mdsc, want_tid);
3128         wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
3129 }
3130
3131 /*
3132  * true if all sessions are closed, or we force unmount
3133  */
3134 bool done_closing_sessions(struct ceph_mds_client *mdsc)
3135 {
3136         int i, n = 0;
3137
3138         if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
3139                 return true;
3140
3141         mutex_lock(&mdsc->mutex);
3142         for (i = 0; i < mdsc->max_sessions; i++)
3143                 if (mdsc->sessions[i])
3144                         n++;
3145         mutex_unlock(&mdsc->mutex);
3146         return n == 0;
3147 }
3148
3149 /*
3150  * called after sb is ro.
3151  */
3152 void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
3153 {
3154         struct ceph_mds_session *session;
3155         int i;
3156         struct ceph_fs_client *fsc = mdsc->fsc;
3157         unsigned long timeout = fsc->client->options->mount_timeout * HZ;
3158
3159         dout("close_sessions\n");
3160
3161         /* close sessions */
3162         mutex_lock(&mdsc->mutex);
3163         for (i = 0; i < mdsc->max_sessions; i++) {
3164                 session = __ceph_lookup_mds_session(mdsc, i);
3165                 if (!session)
3166                         continue;
3167                 mutex_unlock(&mdsc->mutex);
3168                 mutex_lock(&session->s_mutex);
3169                 __close_session(mdsc, session);
3170                 mutex_unlock(&session->s_mutex);
3171                 ceph_put_mds_session(session);
3172                 mutex_lock(&mdsc->mutex);
3173         }
3174         mutex_unlock(&mdsc->mutex);
3175
3176         dout("waiting for sessions to close\n");
3177         wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
3178                            timeout);
3179
3180         /* tear down remaining sessions */
3181         mutex_lock(&mdsc->mutex);
3182         for (i = 0; i < mdsc->max_sessions; i++) {
3183                 if (mdsc->sessions[i]) {
3184                         session = get_session(mdsc->sessions[i]);
3185                         __unregister_session(mdsc, session);
3186                         mutex_unlock(&mdsc->mutex);
3187                         mutex_lock(&session->s_mutex);
3188                         remove_session_caps(session);
3189                         mutex_unlock(&session->s_mutex);
3190                         ceph_put_mds_session(session);
3191                         mutex_lock(&mdsc->mutex);
3192                 }
3193         }
3194         WARN_ON(!list_empty(&mdsc->cap_delay_list));
3195         mutex_unlock(&mdsc->mutex);
3196
3197         ceph_cleanup_empty_realms(mdsc);
3198
3199         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3200
3201         dout("stopped\n");
3202 }
3203
3204 static void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
3205 {
3206         dout("stop\n");
3207         cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
3208         if (mdsc->mdsmap)
3209                 ceph_mdsmap_destroy(mdsc->mdsmap);
3210         kfree(mdsc->sessions);
3211         ceph_caps_finalize(mdsc);
3212 }
3213
3214 void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
3215 {
3216         struct ceph_mds_client *mdsc = fsc->mdsc;
3217
3218         ceph_mdsc_stop(mdsc);
3219         fsc->mdsc = NULL;
3220         kfree(mdsc);
3221 }
3222
3223
3224 /*
3225  * handle mds map update.
3226  */
3227 void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
3228 {
3229         u32 epoch;
3230         u32 maplen;
3231         void *p = msg->front.iov_base;
3232         void *end = p + msg->front.iov_len;
3233         struct ceph_mdsmap *newmap, *oldmap;
3234         struct ceph_fsid fsid;
3235         int err = -EINVAL;
3236
3237         ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
3238         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3239         if (ceph_check_fsid(mdsc->fsc->client, &fsid) < 0)
3240                 return;
3241         epoch = ceph_decode_32(&p);
3242         maplen = ceph_decode_32(&p);
3243         dout("handle_map epoch %u len %d\n", epoch, (int)maplen);
3244
3245         /* do we need it? */
3246         ceph_monc_got_mdsmap(&mdsc->fsc->client->monc, epoch);
3247         mutex_lock(&mdsc->mutex);
3248         if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
3249                 dout("handle_map epoch %u <= our %u\n",
3250                      epoch, mdsc->mdsmap->m_epoch);
3251                 mutex_unlock(&mdsc->mutex);
3252                 return;
3253         }
3254
3255         newmap = ceph_mdsmap_decode(&p, end);
3256         if (IS_ERR(newmap)) {
3257                 err = PTR_ERR(newmap);
3258                 goto bad_unlock;
3259         }
3260
3261         /* swap into place */
3262         if (mdsc->mdsmap) {
3263                 oldmap = mdsc->mdsmap;
3264                 mdsc->mdsmap = newmap;
3265                 check_new_map(mdsc, newmap, oldmap);
3266                 ceph_mdsmap_destroy(oldmap);
3267         } else {
3268                 mdsc->mdsmap = newmap;  /* first mds map */
3269         }
3270         mdsc->fsc->sb->s_maxbytes = mdsc->mdsmap->m_max_file_size;
3271
3272         __wake_requests(mdsc, &mdsc->waiting_for_map);
3273
3274         mutex_unlock(&mdsc->mutex);
3275         schedule_delayed(mdsc);
3276         return;
3277
3278 bad_unlock:
3279         mutex_unlock(&mdsc->mutex);
3280 bad:
3281         pr_err("error decoding mdsmap %d\n", err);
3282         return;
3283 }
3284
3285 static struct ceph_connection *con_get(struct ceph_connection *con)
3286 {
3287         struct ceph_mds_session *s = con->private;
3288
3289         if (get_session(s)) {
3290                 dout("mdsc con_get %p ok (%d)\n", s, atomic_read(&s->s_ref));
3291                 return con;
3292         }
3293         dout("mdsc con_get %p FAIL\n", s);
3294         return NULL;
3295 }
3296
3297 static void con_put(struct ceph_connection *con)
3298 {
3299         struct ceph_mds_session *s = con->private;
3300
3301         ceph_put_mds_session(s);
3302         dout("mdsc con_put %p (%d)\n", s, atomic_read(&s->s_ref));
3303 }
3304
3305 /*
3306  * if the client is unresponsive for long enough, the mds will kill
3307  * the session entirely.
3308  */
3309 static void peer_reset(struct ceph_connection *con)
3310 {
3311         struct ceph_mds_session *s = con->private;
3312         struct ceph_mds_client *mdsc = s->s_mdsc;
3313
3314         pr_warning("mds%d closed our session\n", s->s_mds);
3315         send_mds_reconnect(mdsc, s);
3316 }
3317
3318 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
3319 {
3320         struct ceph_mds_session *s = con->private;
3321         struct ceph_mds_client *mdsc = s->s_mdsc;
3322         int type = le16_to_cpu(msg->hdr.type);
3323
3324         mutex_lock(&mdsc->mutex);
3325         if (__verify_registered_session(mdsc, s) < 0) {
3326                 mutex_unlock(&mdsc->mutex);
3327                 goto out;
3328         }
3329         mutex_unlock(&mdsc->mutex);
3330
3331         switch (type) {
3332         case CEPH_MSG_MDS_MAP:
3333                 ceph_mdsc_handle_map(mdsc, msg);
3334                 break;
3335         case CEPH_MSG_CLIENT_SESSION:
3336                 handle_session(s, msg);
3337                 break;
3338         case CEPH_MSG_CLIENT_REPLY:
3339                 handle_reply(s, msg);
3340                 break;
3341         case CEPH_MSG_CLIENT_REQUEST_FORWARD:
3342                 handle_forward(mdsc, s, msg);
3343                 break;
3344         case CEPH_MSG_CLIENT_CAPS:
3345                 ceph_handle_caps(s, msg);
3346                 break;
3347         case CEPH_MSG_CLIENT_SNAP:
3348                 ceph_handle_snap(mdsc, s, msg);
3349                 break;
3350         case CEPH_MSG_CLIENT_LEASE:
3351                 handle_lease(mdsc, s, msg);
3352                 break;
3353
3354         default:
3355                 pr_err("received unknown message type %d %s\n", type,
3356                        ceph_msg_type_name(type));
3357         }
3358 out:
3359         ceph_msg_put(msg);
3360 }
3361
3362 /*
3363  * authentication
3364  */
3365 static int get_authorizer(struct ceph_connection *con,
3366                           void **buf, int *len, int *proto,
3367                           void **reply_buf, int *reply_len, int force_new)
3368 {
3369         struct ceph_mds_session *s = con->private;
3370         struct ceph_mds_client *mdsc = s->s_mdsc;
3371         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3372         int ret = 0;
3373
3374         if (force_new && s->s_authorizer) {
3375                 ac->ops->destroy_authorizer(ac, s->s_authorizer);
3376                 s->s_authorizer = NULL;
3377         }
3378         if (s->s_authorizer == NULL) {
3379                 if (ac->ops->create_authorizer) {
3380                         ret = ac->ops->create_authorizer(
3381                                 ac, CEPH_ENTITY_TYPE_MDS,
3382                                 &s->s_authorizer,
3383                                 &s->s_authorizer_buf,
3384                                 &s->s_authorizer_buf_len,
3385                                 &s->s_authorizer_reply_buf,
3386                                 &s->s_authorizer_reply_buf_len);
3387                         if (ret)
3388                                 return ret;
3389                 }
3390         }
3391
3392         *proto = ac->protocol;
3393         *buf = s->s_authorizer_buf;
3394         *len = s->s_authorizer_buf_len;
3395         *reply_buf = s->s_authorizer_reply_buf;
3396         *reply_len = s->s_authorizer_reply_buf_len;
3397         return 0;
3398 }
3399
3400
3401 static int verify_authorizer_reply(struct ceph_connection *con, int len)
3402 {
3403         struct ceph_mds_session *s = con->private;
3404         struct ceph_mds_client *mdsc = s->s_mdsc;
3405         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3406
3407         return ac->ops->verify_authorizer_reply(ac, s->s_authorizer, len);
3408 }
3409
3410 static int invalidate_authorizer(struct ceph_connection *con)
3411 {
3412         struct ceph_mds_session *s = con->private;
3413         struct ceph_mds_client *mdsc = s->s_mdsc;
3414         struct ceph_auth_client *ac = mdsc->fsc->client->monc.auth;
3415
3416         if (ac->ops->invalidate_authorizer)
3417                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_MDS);
3418
3419         return ceph_monc_validate_auth(&mdsc->fsc->client->monc);
3420 }
3421
3422 static const struct ceph_connection_operations mds_con_ops = {
3423         .get = con_get,
3424         .put = con_put,
3425         .dispatch = dispatch,
3426         .get_authorizer = get_authorizer,
3427         .verify_authorizer_reply = verify_authorizer_reply,
3428         .invalidate_authorizer = invalidate_authorizer,
3429         .peer_reset = peer_reset,
3430 };
3431
3432 /* eof */