pandora: defconfig: update
[pandora-kernel.git] / net / ceph / osd_client.c
1 #include <linux/ceph/ceph_debug.h>
2
3 #include <linux/module.h>
4 #include <linux/err.h>
5 #include <linux/highmem.h>
6 #include <linux/mm.h>
7 #include <linux/pagemap.h>
8 #include <linux/slab.h>
9 #include <linux/uaccess.h>
10 #ifdef CONFIG_BLOCK
11 #include <linux/bio.h>
12 #endif
13
14 #include <linux/ceph/libceph.h>
15 #include <linux/ceph/osd_client.h>
16 #include <linux/ceph/messenger.h>
17 #include <linux/ceph/decode.h>
18 #include <linux/ceph/auth.h>
19 #include <linux/ceph/pagelist.h>
20
21 #define OSD_OP_FRONT_LEN        4096
22 #define OSD_OPREPLY_FRONT_LEN   512
23
24 static const struct ceph_connection_operations osd_con_ops;
25
26 static void send_queued(struct ceph_osd_client *osdc);
27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28 static void __register_request(struct ceph_osd_client *osdc,
29                                struct ceph_osd_request *req);
30 static void __unregister_linger_request(struct ceph_osd_client *osdc,
31                                         struct ceph_osd_request *req);
32 static int __send_request(struct ceph_osd_client *osdc,
33                           struct ceph_osd_request *req);
34
35 static int op_needs_trail(int op)
36 {
37         switch (op) {
38         case CEPH_OSD_OP_GETXATTR:
39         case CEPH_OSD_OP_SETXATTR:
40         case CEPH_OSD_OP_CMPXATTR:
41         case CEPH_OSD_OP_CALL:
42         case CEPH_OSD_OP_NOTIFY:
43                 return 1;
44         default:
45                 return 0;
46         }
47 }
48
49 static int op_has_extent(int op)
50 {
51         return (op == CEPH_OSD_OP_READ ||
52                 op == CEPH_OSD_OP_WRITE);
53 }
54
/*
 * Map a raw file extent onto storage: stamp @snapid into the request
 * header, compute which object (*bno) and which byte range within that
 * object the extent covers, and fill in the extent fields of @op.
 *
 * *plen may be shortened by ceph_calc_file_object_mapping() if the
 * extent crosses an object boundary; callers must honor the trimmed
 * length.
 */
void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			struct ceph_file_layout *layout,
			u64 snapid,
			u64 off, u64 *plen, u64 *bno,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	/* page accounting is based on the (possibly trimmed) file extent */
	req->r_num_pages = calc_pages_for(off, *plen);
	req->r_page_alignment = off & ~PAGE_MASK;
	/* only writes carry a data payload in this direction */
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);

}
EXPORT_SYMBOL(ceph_calc_raw_layout);
89
90 /*
91  * Implement client access to distributed object storage cluster.
92  *
93  * All data objects are stored within a cluster/cloud of OSDs, or
94  * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
95  * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
96  * remote daemons serving up and coordinating consistent and safe
97  * access to storage.
98  *
99  * Cluster membership and the mapping of data objects onto storage devices
100  * are described by the osd map.
101  *
102  * We keep track of pending OSD requests (read, write), resubmit
103  * requests to different OSDs when the cluster topology/data layout
104  * change, or retry the affected requests when the communications
105  * channel with an OSD is reset.
106  */
107
108 /*
109  * calculate the mapping of a file extent onto an object, and fill out the
110  * request accordingly.  shorten extent as necessary if it crosses an
111  * object boundary.
112  *
113  * fill osd op in request message.
114  */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	u64 bno;	/* object number within the file, filled in below */

	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
			     plen, &bno, req, op);

	/* object name is "<inode hex>.<object number, 8+ hex digits>" */
	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);
}
130
131 /*
132  * requests
133  */
/*
 * requests
 */
/*
 * Final kref release for an osd request: drop message refs, revoke any
 * in-flight reply whose data pages we own, release page/bio/trail
 * resources, and free the request itself (to its mempool if it came
 * from one).
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_con_filling_msg) {
		/* a connection may still be filling r_reply's data into our
		 * pages; revoke before the pages go away below */
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
				      req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		/* trail pagelist was kmalloc'd in ceph_osdc_alloc_request */
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);
169
170 static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
171 {
172         int i = 0;
173
174         if (needs_trail)
175                 *needs_trail = 0;
176         while (ops[i].op) {
177                 if (needs_trail && op_needs_trail(ops[i].op))
178                         *needs_trail = 1;
179                 i++;
180         }
181
182         return i;
183 }
184
185 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
186                                                int flags,
187                                                struct ceph_snap_context *snapc,
188                                                struct ceph_osd_req_op *ops,
189                                                bool use_mempool,
190                                                gfp_t gfp_flags,
191                                                struct page **pages,
192                                                struct bio *bio)
193 {
194         struct ceph_osd_request *req;
195         struct ceph_msg *msg;
196         int needs_trail;
197         int num_op = get_num_ops(ops, &needs_trail);
198         size_t msg_size = sizeof(struct ceph_osd_request_head);
199
200         msg_size += num_op*sizeof(struct ceph_osd_op);
201
202         if (use_mempool) {
203                 req = mempool_alloc(osdc->req_mempool, gfp_flags);
204                 memset(req, 0, sizeof(*req));
205         } else {
206                 req = kzalloc(sizeof(*req), gfp_flags);
207         }
208         if (req == NULL)
209                 return NULL;
210
211         req->r_osdc = osdc;
212         req->r_mempool = use_mempool;
213
214         kref_init(&req->r_kref);
215         init_completion(&req->r_completion);
216         init_completion(&req->r_safe_completion);
217         INIT_LIST_HEAD(&req->r_unsafe_item);
218         INIT_LIST_HEAD(&req->r_linger_item);
219         INIT_LIST_HEAD(&req->r_linger_osd);
220         INIT_LIST_HEAD(&req->r_req_lru_item);
221         req->r_flags = flags;
222
223         WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
224
225         /* create reply message */
226         if (use_mempool)
227                 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
228         else
229                 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
230                                    OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
231         if (!msg) {
232                 ceph_osdc_put_request(req);
233                 return NULL;
234         }
235         req->r_reply = msg;
236
237         /* allocate space for the trailing data */
238         if (needs_trail) {
239                 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
240                 if (!req->r_trail) {
241                         ceph_osdc_put_request(req);
242                         return NULL;
243                 }
244                 ceph_pagelist_init(req->r_trail);
245         }
246         /* create request message; allow space for oid */
247         msg_size += MAX_OBJ_NAME_SIZE;
248         if (snapc)
249                 msg_size += sizeof(u64) * snapc->num_snaps;
250         if (use_mempool)
251                 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
252         else
253                 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
254         if (!msg) {
255                 ceph_osdc_put_request(req);
256                 return NULL;
257         }
258
259         msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
260         memset(msg->front.iov_base, 0, msg->front.iov_len);
261
262         req->r_request = msg;
263         req->r_pages = pages;
264 #ifdef CONFIG_BLOCK
265         if (bio) {
266                 req->r_bio = bio;
267                 bio_get(req->r_bio);
268         }
269 #endif
270
271         return req;
272 }
273 EXPORT_SYMBOL(ceph_osdc_alloc_request);
274
/*
 * Encode one op from host representation (@src) into its on-wire form
 * (@dst), little-endian.  Ops with variable-length payload append that
 * payload to the request's trailing pagelist, which must already exist
 * (allocated when op_needs_trail() was true at request creation).
 */
static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (dst->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		BUG_ON(!req->r_trail);

		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		/* name then value travel in the trail */
		ceph_pagelist_append(req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		BUG_ON(!req->r_trail);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		/* class name, method name, then input data in the trail */
		ceph_pagelist_append(req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY:
		{
			__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
			__le32 timeout = cpu_to_le32(src->watch.timeout);

			BUG_ON(!req->r_trail);

			ceph_pagelist_append(req->r_trail,
						&prot_ver, sizeof(prot_ver));
			ceph_pagelist_append(req->r_trail,
						&timeout, sizeof(timeout));
		}
		/* fallthrough: NOTIFY also encodes the watch fields below */
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", dst->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}
352
353 /*
354  * build new request AND message
355  *
356  */
/*
 * build new request AND message
 *
 */
/*
 * Serialize the request into its message front: header, per-op array,
 * object name, then the snap id list.  @p walks the front buffer, so
 * the write order below must match the wire layout exactly.  Also sets
 * hdr.data_len/data_off to cover the data payload plus any trail.
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	/* head, then the op array, then variable-length fields */
	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);


	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	/* trail (appended by osd_req_encode_op) counts toward data_len */
	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	/* shrink the front to exactly what was written */
	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);
430
431 /*
432  * build new request AND message, calculate layout, and adjust file
433  * extent as needed.
434  *
435  * if the file was recently truncated, we include information about its
436  * old and new size so that the object can be updated appropriately.  (we
437  * avoid synchronously deleting truncated objects because it's slow.)
438  *
439  * if @do_sync, include a 'startsync' command so that the osd will flush
440  * data quickly.
441  */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply,
					       int page_align)
{
	/* up to 3 slots: main op, optional STARTSYNC, zero terminator */
	struct ceph_osd_req_op ops[3];
	struct ceph_osd_request *req;

	ops[0].op = opcode;
	ops[0].extent.truncate_seq = truncate_seq;
	ops[0].extent.truncate_size = truncate_size;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;
	} else
		ops[1].op = 0;	/* terminate the op list */

	req = ceph_osdc_alloc_request(osdc, flags,
					 snapc, ops,
					 use_mempool,
					 GFP_NOFS, NULL, NULL);
	if (!req)
		return NULL;

	/* calculate max write size */
	calc_layout(osdc, vino, layout, off, plen, req, ops);
	req->r_file_layout = *layout;  /* keep a copy */

	/* in case it differs from natural (file) alignment that
	   calc_layout filled in for us */
	req->r_num_pages = calc_pages_for(page_align, *plen);
	req->r_page_alignment = page_align;

	ceph_osdc_build_request(req, off, plen, ops,
				snapc,
				mtime,
				req->r_oid, req->r_oid_len);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
494
495 /*
496  * We keep osd requests in an rbtree, sorted by ->r_tid.
497  */
498 static void __insert_request(struct ceph_osd_client *osdc,
499                              struct ceph_osd_request *new)
500 {
501         struct rb_node **p = &osdc->requests.rb_node;
502         struct rb_node *parent = NULL;
503         struct ceph_osd_request *req = NULL;
504
505         while (*p) {
506                 parent = *p;
507                 req = rb_entry(parent, struct ceph_osd_request, r_node);
508                 if (new->r_tid < req->r_tid)
509                         p = &(*p)->rb_left;
510                 else if (new->r_tid > req->r_tid)
511                         p = &(*p)->rb_right;
512                 else
513                         BUG();
514         }
515
516         rb_link_node(&new->r_node, parent, p);
517         rb_insert_color(&new->r_node, &osdc->requests);
518 }
519
520 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
521                                                  u64 tid)
522 {
523         struct ceph_osd_request *req;
524         struct rb_node *n = osdc->requests.rb_node;
525
526         while (n) {
527                 req = rb_entry(n, struct ceph_osd_request, r_node);
528                 if (tid < req->r_tid)
529                         n = n->rb_left;
530                 else if (tid > req->r_tid)
531                         n = n->rb_right;
532                 else
533                         return req;
534         }
535         return NULL;
536 }
537
/*
 * Find the request with the smallest r_tid that is >= @tid, or NULL if
 * every request has a smaller tid.
 */
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			/* no smaller candidate to the left: this is it */
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}
559
560 /*
561  * Resubmit requests pending on the given osd.
562  */
/*
 * Resubmit requests pending on the given osd.
 */
/*
 * Requeue everything pending on @osd after its connection was reset.
 * If __reset_osd() decides the messenger will retry on its own
 * (-EAGAIN), nothing needs requeuing.  Caller holds request_mutex.
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req, *nreq;
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	if (err == -EAGAIN)
		return;

	/* normal requests: move back to unsent, marking them as retries */
	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		if (!req->r_linger)
			req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}

	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
				 r_linger_osd) {
		/*
		 * reregister request prior to unregistering linger so
		 * that r_osd is preserved.
		 */
		BUG_ON(!list_empty(&req->r_req_lru_item));
		__register_request(osdc, req);
		list_add(&req->r_req_lru_item, &osdc->req_unsent);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
	}
}
597
/* Locked wrapper around __kick_osd_requests(). */
static void kick_osd_requests(struct ceph_osd_client *osdc,
			      struct ceph_osd *kickosd)
{
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);
}
605
606 /*
607  * If the osd connection drops, we need to resubmit all requests.
608  */
/*
 * If the osd connection drops, we need to resubmit all requests.
 */
/*
 * Messenger callback for a reset connection: requeue this osd's
 * pending requests and push the send queue.  map_sem is taken to keep
 * the osdmap stable while requests are re-targeted.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_osd_requests(osdc, osd);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}
623
624 /*
625  * Track open sessions with osds.
626  */
/*
 * Track open sessions with osds.
 */
/*
 * Allocate and initialize a ceph_osd session object with one reference
 * held.  Returns NULL on allocation failure.  The connection is
 * initialized but not yet opened.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}
650
651 static struct ceph_osd *get_osd(struct ceph_osd *osd)
652 {
653         if (atomic_inc_not_zero(&osd->o_ref)) {
654                 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
655                      atomic_read(&osd->o_ref));
656                 return osd;
657         } else {
658                 dout("get_osd %p FAIL\n", osd);
659                 return NULL;
660         }
661 }
662
/*
 * Drop a reference on @osd; on the last put, destroy its authorizer
 * and free it.  (The dout ref counts are read racily — debug only.)
 */
static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (osd->o_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
		kfree(osd);
	}
}
675
676 /*
677  * remove an osd from our map
678  */
/*
 * remove an osd from our map
 */
/*
 * Unlink @osd from the osd rbtree and lru.  Caller must have already
 * drained its request list and hold request_mutex; does not drop the
 * caller's reference.
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
	BUG_ON(!list_empty(&osd->o_requests));
	list_del_init(&osd->o_osd_lru);
	rb_erase(&osd->o_node, &osdc->osds);
	RB_CLEAR_NODE(&osd->o_node);
}
687
688 static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
689 {
690         dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
691
692         if (!RB_EMPTY_NODE(&osd->o_node)) {
693                 ceph_con_close(&osd->o_con);
694                 __remove_osd(osdc, osd);
695                 put_osd(osd);
696         }
697 }
698
699 static void remove_all_osds(struct ceph_osd_client *osdc)
700 {
701         dout("__remove_old_osds %p\n", osdc);
702         mutex_lock(&osdc->request_mutex);
703         while (!RB_EMPTY_ROOT(&osdc->osds)) {
704                 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
705                                                 struct ceph_osd, o_node);
706                 remove_osd(osdc, osd);
707         }
708         mutex_unlock(&osdc->request_mutex);
709 }
710
/*
 * Park an idle osd (no pending requests) on the lru with an expiry
 * stamp; remove_old_osds() reaps it once the ttl passes.
 */
static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}
719
720 static void __remove_osd_from_lru(struct ceph_osd *osd)
721 {
722         dout("__remove_osd_from_lru %p\n", osd);
723         if (!list_empty(&osd->o_osd_lru))
724                 list_del_init(&osd->o_osd_lru);
725 }
726
727 static void remove_old_osds(struct ceph_osd_client *osdc)
728 {
729         struct ceph_osd *osd, *nosd;
730
731         dout("__remove_old_osds %p\n", osdc);
732         mutex_lock(&osdc->request_mutex);
733         list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
734                 if (time_before(jiffies, osd->lru_ttl))
735                         break;
736                 remove_osd(osdc, osd);
737         }
738         mutex_unlock(&osdc->request_mutex);
739 }
740
741 /*
742  * reset osd connect
743  */
/*
 * reset osd connect
 */
/*
 * Decide how to recover @osd's session:
 *  - nothing pending: drop the session entirely;
 *  - address unchanged and connection never opened: let the messenger
 *    keep retrying, return -EAGAIN so the caller skips requeuing;
 *  - otherwise: close and reopen at the current osdmap address and
 *    bump the incarnation.
 * Caller holds request_mutex.
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benfit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}
770
771 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
772 {
773         struct rb_node **p = &osdc->osds.rb_node;
774         struct rb_node *parent = NULL;
775         struct ceph_osd *osd = NULL;
776
777         dout("__insert_osd %p osd%d\n", new, new->o_osd);
778         while (*p) {
779                 parent = *p;
780                 osd = rb_entry(parent, struct ceph_osd, o_node);
781                 if (new->o_osd < osd->o_osd)
782                         p = &(*p)->rb_left;
783                 else if (new->o_osd > osd->o_osd)
784                         p = &(*p)->rb_right;
785                 else
786                         BUG();
787         }
788
789         rb_link_node(&new->o_node, parent, p);
790         rb_insert_color(&new->o_node, &osdc->osds);
791 }
792
793 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
794 {
795         struct ceph_osd *osd;
796         struct rb_node *n = osdc->osds.rb_node;
797
798         while (n) {
799                 osd = rb_entry(n, struct ceph_osd, o_node);
800                 if (o < osd->o_osd)
801                         n = n->rb_left;
802                 else if (o > osd->o_osd)
803                         n = n->rb_right;
804                 else
805                         return osd;
806         }
807         return NULL;
808 }
809
/* Arm the keepalive/timeout work for the pending-request watchdog. */
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}
815
/* Disarm the watchdog (non-sync cancel: may still be running). */
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}
820
821 /*
822  * Register request, assign tid.  If this is the first request, set up
823  * the timeout event.
824  */
/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	/* the tree holds a reference until __unregister_request */
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}
839
/* Locked wrapper around __register_request(). */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	__register_request(osdc, req);
	mutex_unlock(&osdc->request_mutex);
}
847
/*
 * Unregister @req: remove it from the tid tree, detach it from its osd
 * (moving a now-idle osd to the LRU), and drop the reference taken by
 * __register_request().
 *
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
                                 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		/* keep r_osd while the request is still on a linger list */
		if (list_empty(&req->r_linger_item))
			req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);	/* drop registration ref */

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}
880
/*
 * Cancel a previously queued request message
 *
 * Revokes the message from the osd connection (if it was sent) and
 * clears r_sent so a later __send_request() will resend it.
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
}
891
/*
 * Put @req on the global and per-osd linger lists so it is resent
 * across osd/map changes (e.g. to re-establish a watch).
 *
 * NOTE(review): dereferences req->r_osd without a NULL check; this
 * assumes callers only register lingers once an osd is assigned
 * (e.g. from handle_reply()) -- confirm against all callers.
 * Caller must hold osdc->request_mutex.
 */
static void __register_linger_request(struct ceph_osd_client *osdc,
                                    struct ceph_osd_request *req)
{
	dout("__register_linger_request %p\n", req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
}
899
/*
 * Remove @req from the linger lists; move its osd to the LRU if it has
 * no remaining work.  Mirrors __unregister_request()'s osd handling.
 * Caller must hold osdc->request_mutex.
 */
static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
{
	dout("__unregister_linger_request %p\n", req);
	if (req->r_osd) {
		list_del_init(&req->r_linger_item);
		list_del_init(&req->r_linger_osd);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		/* keep r_osd while the request is still in flight */
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
}
917
/*
 * Unregister a lingering request and drop the extra reference taken by
 * ceph_osdc_set_request_linger().  Safe to call if the request never
 * lingered (r_linger clear): it is then a no-op.
 */
void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
                                         struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	if (req->r_linger) {
		__unregister_linger_request(osdc, req);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
929
/*
 * Mark @req as lingering: it will be kept registered (and resent on map
 * changes) until ceph_osdc_unregister_linger_request() is called.
 * Idempotent; takes an extra ref on first call only.
 */
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
                                  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
		/*
		 * caller is now responsible for calling
		 * unregister_linger_request
		 */
		ceph_osdc_get_request(req);
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);
944
/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
                         struct ceph_osd_request *req, int force_resend)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	/* object -> pg mapping, written into the request header */
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
	                              &req->r_file_layout, osdc->osdmap);
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	/* pg -> acting osd set; o becomes the primary (-1 if none up) */
	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	/* same primary, same incarnation, same acting set?  nothing to do */
	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		/* detach from the old osd; revoke any queued message */
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		/* first request for this osd: create it and open its con */
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		/* no usable osd; park the request until a better map */
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}
1031
/*
 * Queue req->r_request on its osd's connection.  Refreshes the osdmap
 * epoch, flags, and reassert version in the request header so a resend
 * carries current state, and moves the request to the in-flight lru.
 * Always returns 0.
 *
 * caller should hold map_sem (for read) and request_mutex
 */
static int __send_request(struct ceph_osd_client *osdc,
                          struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;	/* basis for timeout/keepalive checks */
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}
1056
/*
 * Send any requests in the queue (req_unsent).
 *
 * __send_request() moves each request onto req_lru, hence the _safe
 * iterator.  Its return value is always 0 and is ignored.
 */
static void send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("send_queued\n");
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
		__send_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);
}
1071
/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests has been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct list_head slow_osds;
	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure.. we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		/* req_lru is ordered oldest-first by r_stamp */
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
		                 r_req_lru_item);

		/* hasn't been long enough since we sent it? */
		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		/* hasn't been long enough since it was acked? */
		if (req->r_request->ack_stamp == 0 ||
		    time_before(jiffies, req->r_request->ack_stamp + timeout))
			break;

		/*
		 * kicking an osd restamps its requests; seeing the same
		 * request with the same stamp again means no progress
		 */
		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
		           req->r_tid, osd->o_osd);
		__kick_osd_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;	/* lru is oldest-first: the rest are newer */

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
		                 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);
	send_queued(osdc);	/* resend anything the kick re-queued */
	up_read(&osdc->map_sem);
}
1158
/*
 * Periodic work: close connections to osds that have been idle too
 * long (remove_old_osds()), then re-arm.  Runs roughly every quarter
 * of osd_idle_ttl (the >> 2 applies to osd_idle_ttl * HZ).
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
		             osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
	                      round_jiffies_relative(delay));
}
1175
/*
 * Run the "safe" (on-disk commit) callback, if any, then wake anyone
 * waiting on r_safe_completion (e.g. fsync).  Callback runs first so
 * its effects are visible to the waiters.
 */
static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}
1182
/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 *
 * A request may see two replies: an "ack" (in memory) and a later
 * "ondisk" commit; the request is unregistered once it is safe (read,
 * or write with ONDISK set).
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
                         struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	/* front must be exactly header + object name + per-op array */
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		/* already completed/unregistered, or never ours */
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);	/* hold across callback/completion */
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		ceph_con_put(con);
	}

	if (!req->r_got_reply) {
		/* first reply for this request: record the result */
		unsigned bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		/* success on a read: report bytes transferred */
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		/* second ack with no commit: nothing new to report */
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK)
		complete_request(req);

done:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}
1279
1280 static void reset_changed_osds(struct ceph_osd_client *osdc)
1281 {
1282         struct rb_node *p, *n;
1283
1284         dout("%s %p\n", __func__, osdc);
1285         for (p = rb_first(&osdc->osds); p; p = n) {
1286                 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
1287
1288                 n = rb_next(p);
1289                 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
1290                     memcmp(&osd->o_con.peer_addr,
1291                            ceph_osd_addr(osdc->osdmap,
1292                                          osd->o_osd),
1293                            sizeof(struct ceph_entity_addr)) != 0)
1294                         __reset_osd(osdc, osd);
1295         }
1296 }
1297
/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;

	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
	mutex_lock(&osdc->request_mutex);
	/* pass 1: re-map every registered (in-flight) request */
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		err = __map_request(osdc, req, force_resend);
		if (err < 0)
			continue;  /* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;  /* request a newer map */
		} else if (err > 0) {
			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			if (!req->r_linger)
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
		}
	}

	/* pass 2: re-register lingering requests whose mapping changed
	 * (re-registering assigns a new tid and queues a resend) */
	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
	                         r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req, force_resend);
		if (err == 0)
			continue;  /* no change and no osd was specified */
		if (err < 0)
			continue;  /* hrm! */
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		__unregister_linger_request(osdc, req);
		__register_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}
1356
1357
1358 /*
1359  * Process updated osd map.
1360  *
1361  * The message contains any number of incremental and full maps, normally
1362  * indicating some sort of topology change in the cluster.  Kick requests
1363  * off to different OSDs as needed.
1364  */
1365 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1366 {
1367         void *p, *end, *next;
1368         u32 nr_maps, maplen;
1369         u32 epoch;
1370         struct ceph_osdmap *newmap = NULL, *oldmap;
1371         int err;
1372         struct ceph_fsid fsid;
1373
1374         dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1375         p = msg->front.iov_base;
1376         end = p + msg->front.iov_len;
1377
1378         /* verify fsid */
1379         ceph_decode_need(&p, end, sizeof(fsid), bad);
1380         ceph_decode_copy(&p, &fsid, sizeof(fsid));
1381         if (ceph_check_fsid(osdc->client, &fsid) < 0)
1382                 return;
1383
1384         down_write(&osdc->map_sem);
1385
1386         /* incremental maps */
1387         ceph_decode_32_safe(&p, end, nr_maps, bad);
1388         dout(" %d inc maps\n", nr_maps);
1389         while (nr_maps > 0) {
1390                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1391                 epoch = ceph_decode_32(&p);
1392                 maplen = ceph_decode_32(&p);
1393                 ceph_decode_need(&p, end, maplen, bad);
1394                 next = p + maplen;
1395                 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1396                         dout("applying incremental map %u len %d\n",
1397                              epoch, maplen);
1398                         newmap = osdmap_apply_incremental(&p, next,
1399                                                           osdc->osdmap,
1400                                                           osdc->client->msgr);
1401                         if (IS_ERR(newmap)) {
1402                                 err = PTR_ERR(newmap);
1403                                 goto bad;
1404                         }
1405                         BUG_ON(!newmap);
1406                         if (newmap != osdc->osdmap) {
1407                                 ceph_osdmap_destroy(osdc->osdmap);
1408                                 osdc->osdmap = newmap;
1409                         }
1410                         kick_requests(osdc, 0);
1411                         reset_changed_osds(osdc);
1412                 } else {
1413                         dout("ignoring incremental map %u len %d\n",
1414                              epoch, maplen);
1415                 }
1416                 p = next;
1417                 nr_maps--;
1418         }
1419         if (newmap)
1420                 goto done;
1421
1422         /* full maps */
1423         ceph_decode_32_safe(&p, end, nr_maps, bad);
1424         dout(" %d full maps\n", nr_maps);
1425         while (nr_maps) {
1426                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
1427                 epoch = ceph_decode_32(&p);
1428                 maplen = ceph_decode_32(&p);
1429                 ceph_decode_need(&p, end, maplen, bad);
1430                 if (nr_maps > 1) {
1431                         dout("skipping non-latest full map %u len %d\n",
1432                              epoch, maplen);
1433                 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1434                         dout("skipping full map %u len %d, "
1435                              "older than our %u\n", epoch, maplen,
1436                              osdc->osdmap->epoch);
1437                 } else {
1438                         int skipped_map = 0;
1439
1440                         dout("taking full map %u len %d\n", epoch, maplen);
1441                         newmap = osdmap_decode(&p, p+maplen);
1442                         if (IS_ERR(newmap)) {
1443                                 err = PTR_ERR(newmap);
1444                                 goto bad;
1445                         }
1446                         BUG_ON(!newmap);
1447                         oldmap = osdc->osdmap;
1448                         osdc->osdmap = newmap;
1449                         if (oldmap) {
1450                                 if (oldmap->epoch + 1 < newmap->epoch)
1451                                         skipped_map = 1;
1452                                 ceph_osdmap_destroy(oldmap);
1453                         }
1454                         kick_requests(osdc, skipped_map);
1455                 }
1456                 p += maplen;
1457                 nr_maps--;
1458         }
1459
1460 done:
1461         downgrade_write(&osdc->map_sem);
1462         ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1463
1464         /*
1465          * subscribe to subsequent osdmap updates if full to ensure
1466          * we find out when we are no longer full and stop returning
1467          * ENOSPC.
1468          */
1469         if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
1470                 ceph_monc_request_next_osdmap(&osdc->client->monc);
1471
1472         send_queued(osdc);
1473         up_read(&osdc->map_sem);
1474         wake_up_all(&osdc->client->auth_wq);
1475         return;
1476
1477 bad:
1478         pr_err("osdc handle_map corrupt msg\n");
1479         ceph_msg_dump(msg);
1480         up_write(&osdc->map_sem);
1481         return;
1482 }
1483
1484 /*
1485  * watch/notify callback event infrastructure
1486  *
1487  * These callbacks are used both for watch and notify operations.
1488  */
/* kref release: free the event once the last reference is dropped. */
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}
1497
/* Take an additional reference on @event. */
static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}
1502
/* Drop a reference on @event; frees it when the count hits zero. */
void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);
1508
1509 static void __insert_event(struct ceph_osd_client *osdc,
1510                              struct ceph_osd_event *new)
1511 {
1512         struct rb_node **p = &osdc->event_tree.rb_node;
1513         struct rb_node *parent = NULL;
1514         struct ceph_osd_event *event = NULL;
1515
1516         while (*p) {
1517                 parent = *p;
1518                 event = rb_entry(parent, struct ceph_osd_event, node);
1519                 if (new->cookie < event->cookie)
1520                         p = &(*p)->rb_left;
1521                 else if (new->cookie > event->cookie)
1522                         p = &(*p)->rb_right;
1523                 else
1524                         BUG();
1525         }
1526
1527         rb_link_node(&new->node, parent, p);
1528         rb_insert_color(&new->node, &osdc->event_tree);
1529 }
1530
1531 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
1532                                                 u64 cookie)
1533 {
1534         struct rb_node **p = &osdc->event_tree.rb_node;
1535         struct rb_node *parent = NULL;
1536         struct ceph_osd_event *event = NULL;
1537
1538         while (*p) {
1539                 parent = *p;
1540                 event = rb_entry(parent, struct ceph_osd_event, node);
1541                 if (cookie < event->cookie)
1542                         p = &(*p)->rb_left;
1543                 else if (cookie > event->cookie)
1544                         p = &(*p)->rb_right;
1545                 else
1546                         return event;
1547         }
1548         return NULL;
1549 }
1550
/*
 * Unlink @event from the event tree (if linked) and drop the tree's
 * reference.  Safe to call twice: rb_erase() leaves the node empty, so
 * a second call is a no-op.  Caller must hold osdc->event_lock.
 */
static void __remove_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	if (!RB_EMPTY_NODE(&event->node)) {
		dout("__remove_event removed %p\n", event);
		rb_erase(&event->node, &osdc->event_tree);
		ceph_osdc_put_event(event);
	} else {
		dout("__remove_event didn't remove %p\n", event);
	}
}
1563
/*
 * Allocate and register a watch/notify event.
 *
 * The event starts with two references: one held by the event tree
 * (dropped by __remove_event()) and one for the caller (dropped via
 * ceph_osdc_cancel_event()/ceph_osdc_put_event()).
 *
 * @event_cb:  invoked as (ver, notify_id, opcode, data) from
 *             do_event_work() when a notification arrives
 * @one_shot:  if set, the event is unregistered after first delivery
 * @pevent:    on success, set to the new event
 *
 * Returns 0 or -ENOMEM.
 */
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
                           void (*event_cb)(u64, u64, u8, void *),
                           int one_shot, void *data,
                           struct ceph_osd_event **pevent)
{
	struct ceph_osd_event *event;

	/* GFP_NOIO: don't recurse into I/O while allocating */
	event = kmalloc(sizeof(*event), GFP_NOIO);
	if (!event)
		return -ENOMEM;

	dout("create_event %p\n", event);
	event->cb = event_cb;
	event->one_shot = one_shot;
	event->data = data;
	event->osdc = osdc;
	INIT_LIST_HEAD(&event->osd_node);
	kref_init(&event->kref);   /* one ref for us */
	kref_get(&event->kref);    /* one ref for the caller */
	init_completion(&event->completion);

	spin_lock(&osdc->event_lock);
	event->cookie = ++osdc->event_count;	/* unique tree key */
	__insert_event(osdc, event);
	spin_unlock(&osdc->event_lock);

	*pevent = event;
	return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);
1594
/*
 * Unregister @event and drop the caller's reference.  The tree's
 * reference is dropped by __remove_event(); any in-flight notification
 * work still holds its own reference.
 */
void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);
1606
1607
1608 static void do_event_work(struct work_struct *work)
1609 {
1610         struct ceph_osd_event_work *event_work =
1611                 container_of(work, struct ceph_osd_event_work, work);
1612         struct ceph_osd_event *event = event_work->event;
1613         u64 ver = event_work->ver;
1614         u64 notify_id = event_work->notify_id;
1615         u8 opcode = event_work->opcode;
1616
1617         dout("do_event_work completing %p\n", event);
1618         event->cb(ver, notify_id, opcode, event->data);
1619         complete(&event->completion);
1620         dout("do_event_work completed %p\n", event);
1621         ceph_osdc_put_event(event);
1622         kfree(event_work);
1623 }
1624
1625
1626 /*
1627  * Process osd watch notifications
1628  */
1629 void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1630 {
1631         void *p, *end;
1632         u8 proto_ver;
1633         u64 cookie, ver, notify_id;
1634         u8 opcode;
1635         struct ceph_osd_event *event;
1636         struct ceph_osd_event_work *event_work;
1637
1638         p = msg->front.iov_base;
1639         end = p + msg->front.iov_len;
1640
1641         ceph_decode_8_safe(&p, end, proto_ver, bad);
1642         ceph_decode_8_safe(&p, end, opcode, bad);
1643         ceph_decode_64_safe(&p, end, cookie, bad);
1644         ceph_decode_64_safe(&p, end, ver, bad);
1645         ceph_decode_64_safe(&p, end, notify_id, bad);
1646
1647         spin_lock(&osdc->event_lock);
1648         event = __find_event(osdc, cookie);
1649         if (event) {
1650                 get_event(event);
1651                 if (event->one_shot)
1652                         __remove_event(event);
1653         }
1654         spin_unlock(&osdc->event_lock);
1655         dout("handle_watch_notify cookie %lld ver %lld event %p\n",
1656              cookie, ver, event);
1657         if (event) {
1658                 event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
1659                 if (!event_work) {
1660                         dout("ERROR: could not allocate event_work\n");
1661                         goto done_err;
1662                 }
1663                 INIT_WORK(&event_work->work, do_event_work);
1664                 event_work->event = event;
1665                 event_work->ver = ver;
1666                 event_work->notify_id = notify_id;
1667                 event_work->opcode = opcode;
1668                 if (!queue_work(osdc->notify_wq, &event_work->work)) {
1669                         dout("WARNING: failed to queue notify event work\n");
1670                         goto done_err;
1671                 }
1672         }
1673
1674         return;
1675
1676 done_err:
1677         complete(&event->completion);
1678         ceph_osdc_put_event(event);
1679         return;
1680
1681 bad:
1682         pr_err("osdc handle_watch_notify corrupt msg\n");
1683         return;
1684 }
1685
1686 int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
1687 {
1688         int err;
1689
1690         dout("wait_event %p\n", event);
1691         err = wait_for_completion_interruptible_timeout(&event->completion,
1692                                                         timeout * HZ);
1693         ceph_osdc_put_event(event);
1694         if (err > 0)
1695                 err = 0;
1696         dout("wait_event %p returns %d\n", event, err);
1697         return err;
1698 }
1699 EXPORT_SYMBOL(ceph_osdc_wait_event);
1700
/*
 * Register request, send initial attempt.
 *
 * Attaches the request's data payload (pages/bio/trail) to the outgoing
 * message, registers the request with the osdc, maps it to an osd and
 * sends it.  With @nofail, map/send failures are swallowed (rc = 0) and
 * the request is left registered so a later osdmap kick can retry it;
 * otherwise the request is unregistered and the error returned.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
                            struct ceph_osd_request *req,
                            bool nofail)
{
        int rc = 0;

        /* hand the data payload descriptors over to the outgoing message */
        req->r_request->pages = req->r_pages;
        req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
        req->r_request->bio = req->r_bio;
#endif
        req->r_request->trail = req->r_trail;

        register_request(osdc, req);

        /* map_sem (read) protects the osdmap; request_mutex the request */
        down_read(&osdc->map_sem);
        mutex_lock(&osdc->request_mutex);
        /*
         * a racing kick_requests() may have sent the message for us
         * while we dropped request_mutex above, so only send now if
         * the request still hasn't been touched yet.
         */
        if (req->r_sent == 0) {
                rc = __map_request(osdc, req, 0);
                if (rc < 0) {
                        if (nofail) {
                                /* leave registered; a later kick retries */
                                dout("osdc_start_request failed map, "
                                     " will retry %lld\n", req->r_tid);
                                rc = 0;
                        } else {
                                __unregister_request(osdc, req);
                        }
                        goto out_unlock;
                }
                if (req->r_osd == NULL) {
                        /* no usable osd yet; ask for a newer osdmap */
                        dout("send_request %p no up osds in pg\n", req);
                        ceph_monc_request_next_osdmap(&osdc->client->monc);
                } else {
                        rc = __send_request(osdc, req);
                        if (rc) {
                                if (nofail) {
                                        dout("osdc_start_request failed send, "
                                             " will retry %lld\n", req->r_tid);
                                        rc = 0;
                                } else {
                                        __unregister_request(osdc, req);
                                }
                        }
                }
        }

out_unlock:
        mutex_unlock(&osdc->request_mutex);
        up_read(&osdc->map_sem);
        return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);
1761
1762 /*
1763  * wait for a request to complete
1764  */
1765 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1766                            struct ceph_osd_request *req)
1767 {
1768         int rc;
1769
1770         rc = wait_for_completion_interruptible(&req->r_completion);
1771         if (rc < 0) {
1772                 mutex_lock(&osdc->request_mutex);
1773                 __cancel_request(req);
1774                 __unregister_request(osdc, req);
1775                 mutex_unlock(&osdc->request_mutex);
1776                 complete_request(req);
1777                 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
1778                 return rc;
1779         }
1780
1781         dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1782         return req->r_result;
1783 }
1784 EXPORT_SYMBOL(ceph_osdc_wait_request);
1785
/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
        struct ceph_osd_request *req;
        u64 last_tid, next_tid = 0;

        mutex_lock(&osdc->request_mutex);
        /* only wait for tids issued before we started; newer requests
         * would let a steady stream of writes starve us forever */
        last_tid = osdc->last_tid;
        while (1) {
                req = __lookup_request_ge(osdc, next_tid);
                if (!req)
                        break;
                if (req->r_tid > last_tid)
                        break;

                next_tid = req->r_tid + 1;
                /* only writes need flushing to disk */
                if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
                        continue;

                /* hold a ref across the unlocked wait so the request
                 * can't be freed out from under us */
                ceph_osdc_get_request(req);
                mutex_unlock(&osdc->request_mutex);
                dout("sync waiting on tid %llu (last is %llu)\n",
                     req->r_tid, last_tid);
                wait_for_completion(&req->r_safe_completion);
                mutex_lock(&osdc->request_mutex);
                ceph_osdc_put_request(req);
        }
        mutex_unlock(&osdc->request_mutex);
        dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);
1819
1820 /*
1821  * init, shutdown
1822  */
1823 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1824 {
1825         int err;
1826
1827         dout("init\n");
1828         osdc->client = client;
1829         osdc->osdmap = NULL;
1830         init_rwsem(&osdc->map_sem);
1831         init_completion(&osdc->map_waiters);
1832         osdc->last_requested_map = 0;
1833         mutex_init(&osdc->request_mutex);
1834         osdc->last_tid = 0;
1835         osdc->osds = RB_ROOT;
1836         INIT_LIST_HEAD(&osdc->osd_lru);
1837         osdc->requests = RB_ROOT;
1838         INIT_LIST_HEAD(&osdc->req_lru);
1839         INIT_LIST_HEAD(&osdc->req_unsent);
1840         INIT_LIST_HEAD(&osdc->req_notarget);
1841         INIT_LIST_HEAD(&osdc->req_linger);
1842         osdc->num_requests = 0;
1843         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
1844         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1845         spin_lock_init(&osdc->event_lock);
1846         osdc->event_tree = RB_ROOT;
1847         osdc->event_count = 0;
1848
1849         schedule_delayed_work(&osdc->osds_timeout_work,
1850            round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
1851
1852         err = -ENOMEM;
1853         osdc->req_mempool = mempool_create_kmalloc_pool(10,
1854                                         sizeof(struct ceph_osd_request));
1855         if (!osdc->req_mempool)
1856                 goto out;
1857
1858         err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1859                                 "osd_op");
1860         if (err < 0)
1861                 goto out_mempool;
1862         err = ceph_msgpool_init(&osdc->msgpool_op_reply,
1863                                 OSD_OPREPLY_FRONT_LEN, 10, true,
1864                                 "osd_op_reply");
1865         if (err < 0)
1866                 goto out_msgpool;
1867
1868         osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
1869         if (IS_ERR(osdc->notify_wq)) {
1870                 err = PTR_ERR(osdc->notify_wq);
1871                 osdc->notify_wq = NULL;
1872                 goto out_msgpool;
1873         }
1874         return 0;
1875
1876 out_msgpool:
1877         ceph_msgpool_destroy(&osdc->msgpool_op);
1878 out_mempool:
1879         mempool_destroy(osdc->req_mempool);
1880 out:
1881         return err;
1882 }
1883 EXPORT_SYMBOL(ceph_osdc_init);
1884
/* Tear down an osd client: drain pending notify work, stop the timeout
 * timers, and free the osdmap, osd table, mempool and msgpools. */
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
        /* drain queued notify callbacks before destroying the queue */
        flush_workqueue(osdc->notify_wq);
        destroy_workqueue(osdc->notify_wq);
        cancel_delayed_work_sync(&osdc->timeout_work);
        cancel_delayed_work_sync(&osdc->osds_timeout_work);
        if (osdc->osdmap) {
                ceph_osdmap_destroy(osdc->osdmap);
                osdc->osdmap = NULL;
        }
        remove_all_osds(osdc);
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);
1901
1902 /*
1903  * Read some contiguous pages.  If we cross a stripe boundary, shorten
1904  * *plen.  Return number of bytes read, or error.
1905  */
1906 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1907                         struct ceph_vino vino, struct ceph_file_layout *layout,
1908                         u64 off, u64 *plen,
1909                         u32 truncate_seq, u64 truncate_size,
1910                         struct page **pages, int num_pages, int page_align)
1911 {
1912         struct ceph_osd_request *req;
1913         int rc = 0;
1914
1915         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1916              vino.snap, off, *plen);
1917         req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1918                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1919                                     NULL, 0, truncate_seq, truncate_size, NULL,
1920                                     false, 1, page_align);
1921         if (!req)
1922                 return -ENOMEM;
1923
1924         /* it may be a short read due to an object boundary */
1925         req->r_pages = pages;
1926
1927         dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
1928              off, *plen, req->r_num_pages, page_align);
1929
1930         rc = ceph_osdc_start_request(osdc, req, false);
1931         if (!rc)
1932                 rc = ceph_osdc_wait_request(osdc, req);
1933
1934         ceph_osdc_put_request(req);
1935         dout("readpages result %d\n", rc);
1936         return rc;
1937 }
1938 EXPORT_SYMBOL(ceph_osdc_readpages);
1939
1940 /*
1941  * do a synchronous write on N pages
1942  */
1943 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1944                          struct ceph_file_layout *layout,
1945                          struct ceph_snap_context *snapc,
1946                          u64 off, u64 len,
1947                          u32 truncate_seq, u64 truncate_size,
1948                          struct timespec *mtime,
1949                          struct page **pages, int num_pages,
1950                          int flags, int do_sync, bool nofail)
1951 {
1952         struct ceph_osd_request *req;
1953         int rc = 0;
1954         int page_align = off & ~PAGE_MASK;
1955
1956         BUG_ON(vino.snap != CEPH_NOSNAP);
1957         req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1958                                     CEPH_OSD_OP_WRITE,
1959                                     flags | CEPH_OSD_FLAG_ONDISK |
1960                                             CEPH_OSD_FLAG_WRITE,
1961                                     snapc, do_sync,
1962                                     truncate_seq, truncate_size, mtime,
1963                                     nofail, 1, page_align);
1964         if (!req)
1965                 return -ENOMEM;
1966
1967         /* it may be a short write due to an object boundary */
1968         req->r_pages = pages;
1969         dout("writepages %llu~%llu (%d pages)\n", off, len,
1970              req->r_num_pages);
1971
1972         rc = ceph_osdc_start_request(osdc, req, nofail);
1973         if (!rc)
1974                 rc = ceph_osdc_wait_request(osdc, req);
1975
1976         ceph_osdc_put_request(req);
1977         if (rc == 0)
1978                 rc = len;
1979         dout("writepages result %d\n", rc);
1980         return rc;
1981 }
1982 EXPORT_SYMBOL(ceph_osdc_writepages);
1983
1984 /*
1985  * handle incoming message
1986  */
1987 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1988 {
1989         struct ceph_osd *osd = con->private;
1990         struct ceph_osd_client *osdc;
1991         int type = le16_to_cpu(msg->hdr.type);
1992
1993         if (!osd)
1994                 goto out;
1995         osdc = osd->o_osdc;
1996
1997         switch (type) {
1998         case CEPH_MSG_OSD_MAP:
1999                 ceph_osdc_handle_map(osdc, msg);
2000                 break;
2001         case CEPH_MSG_OSD_OPREPLY:
2002                 handle_reply(osdc, msg, con);
2003                 break;
2004         case CEPH_MSG_WATCH_NOTIFY:
2005                 handle_watch_notify(osdc, msg);
2006                 break;
2007
2008         default:
2009                 pr_err("received unknown message type %d %s\n", type,
2010                        ceph_msg_type_name(type));
2011         }
2012 out:
2013         ceph_msg_put(msg);
2014 }
2015
/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
                                  int *skip)
{
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
        struct ceph_msg *m;
        struct ceph_osd_request *req;
        int front = le32_to_cpu(hdr->front_len);
        int data_len = le32_to_cpu(hdr->data_len);
        u64 tid;

        /* find the request this reply belongs to; unknown tids are skipped */
        tid = le64_to_cpu(hdr->tid);
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (!req) {
                *skip = 1;
                m = NULL;
                pr_info("get_reply unknown tid %llu from osd%d\n", tid,
                        osd->o_osd);
                goto out;
        }

        /* if an older connection was mid-fill into r_reply, revoke it */
        if (req->r_con_filling_msg) {
                dout("get_reply revoking msg %p from old con %p\n",
                     req->r_reply, req->r_con_filling_msg);
                ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
                ceph_con_put(req->r_con_filling_msg);
                req->r_con_filling_msg = NULL;
        }

        /* the preallocated reply may be too small; replace it if needed */
        if (front > req->r_reply->front.iov_len) {
                pr_warning("get_reply front %d > preallocated %d\n",
                           front, (int)req->r_reply->front.iov_len);
                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
                if (!m)
                        goto out;
                ceph_msg_put(req->r_reply);
                req->r_reply = m;
        }
        m = ceph_msg_get(req->r_reply);

        if (data_len > 0) {
                /* point the message at the request's data pages; skip the
                 * reply if it claims more data than we have pages for */
                int want = calc_pages_for(req->r_page_alignment, data_len);

                if (unlikely(req->r_num_pages < want)) {
                        pr_warning("tid %lld reply has %d bytes %d pages, we"
                                   " had only %d pages ready\n", tid, data_len,
                                   want, req->r_num_pages);
                        *skip = 1;
                        ceph_msg_put(m);
                        m = NULL;
                        goto out;
                }
                m->pages = req->r_pages;
                m->nr_pages = req->r_num_pages;
                m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
                m->bio = req->r_bio;
#endif
        }
        *skip = 0;
        /* remember which connection is filling this reply */
        req->r_con_filling_msg = ceph_con_get(con);
        dout("get_reply tid %lld %p\n", tid, m);

out:
        mutex_unlock(&osdc->request_mutex);
        return m;

}
2090
2091 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
2092                                   struct ceph_msg_header *hdr,
2093                                   int *skip)
2094 {
2095         struct ceph_osd *osd = con->private;
2096         int type = le16_to_cpu(hdr->type);
2097         int front = le32_to_cpu(hdr->front_len);
2098
2099         switch (type) {
2100         case CEPH_MSG_OSD_MAP:
2101         case CEPH_MSG_WATCH_NOTIFY:
2102                 return ceph_msg_new(type, front, GFP_NOFS, false);
2103         case CEPH_MSG_OSD_OPREPLY:
2104                 return get_reply(con, hdr, skip);
2105         default:
2106                 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
2107                         osd->o_osd);
2108                 *skip = 1;
2109                 return NULL;
2110         }
2111 }
2112
2113 /*
2114  * Wrappers to refcount containing ceph_osd struct
2115  */
2116 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
2117 {
2118         struct ceph_osd *osd = con->private;
2119         if (get_osd(osd))
2120                 return con;
2121         return NULL;
2122 }
2123
2124 static void put_osd_con(struct ceph_connection *con)
2125 {
2126         struct ceph_osd *osd = con->private;
2127         put_osd(osd);
2128 }
2129
2130 /*
2131  * authentication
2132  */
2133 static int get_authorizer(struct ceph_connection *con,
2134                           void **buf, int *len, int *proto,
2135                           void **reply_buf, int *reply_len, int force_new)
2136 {
2137         struct ceph_osd *o = con->private;
2138         struct ceph_osd_client *osdc = o->o_osdc;
2139         struct ceph_auth_client *ac = osdc->client->monc.auth;
2140         int ret = 0;
2141
2142         if (force_new && o->o_authorizer) {
2143                 ac->ops->destroy_authorizer(ac, o->o_authorizer);
2144                 o->o_authorizer = NULL;
2145         }
2146         if (o->o_authorizer == NULL) {
2147                 ret = ac->ops->create_authorizer(
2148                         ac, CEPH_ENTITY_TYPE_OSD,
2149                         &o->o_authorizer,
2150                         &o->o_authorizer_buf,
2151                         &o->o_authorizer_buf_len,
2152                         &o->o_authorizer_reply_buf,
2153                         &o->o_authorizer_reply_buf_len);
2154                 if (ret)
2155                         return ret;
2156         }
2157
2158         *proto = ac->protocol;
2159         *buf = o->o_authorizer_buf;
2160         *len = o->o_authorizer_buf_len;
2161         *reply_buf = o->o_authorizer_reply_buf;
2162         *reply_len = o->o_authorizer_reply_buf_len;
2163         return 0;
2164 }
2165
2166
2167 static int verify_authorizer_reply(struct ceph_connection *con, int len)
2168 {
2169         struct ceph_osd *o = con->private;
2170         struct ceph_osd_client *osdc = o->o_osdc;
2171         struct ceph_auth_client *ac = osdc->client->monc.auth;
2172
2173         return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
2174 }
2175
2176 static int invalidate_authorizer(struct ceph_connection *con)
2177 {
2178         struct ceph_osd *o = con->private;
2179         struct ceph_osd_client *osdc = o->o_osdc;
2180         struct ceph_auth_client *ac = osdc->client->monc.auth;
2181
2182         if (ac->ops->invalidate_authorizer)
2183                 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
2184
2185         return ceph_monc_validate_auth(&osdc->client->monc);
2186 }
2187
/* messenger callback table for osd connections */
static const struct ceph_connection_operations osd_con_ops = {
        .get = get_osd_con,
        .put = put_osd_con,
        .dispatch = dispatch,
        .get_authorizer = get_authorizer,
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .alloc_msg = alloc_msg,
        .fault = osd_reset,
};