rbd: don't allocate mon_addrs buffer in rbd_add()
[pandora-kernel.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/* Short and long forms of the driver name. */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* Size limits for the fixed-length name buffers and the option string. */
#define RBD_MAX_MD_NAME_LEN     (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN   64
#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

/* Snapshot name that selects the image head rather than a snapshot. */
#define RBD_SNAP_HEAD_NAME      "-"

/*
 * Device names have the form "rbd#": the RBD_DRV_NAME prefix followed
 * by a unique integer id.  MAX_INT_FORMAT_WIDTH is used in ensuring
 * that DEV_NAME_LEN is large enough for every possible device name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

/* Value given to rbd_options.notify_timeout when the user sets none. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
66
67 /*
68  * block device image metadata (in-memory version)
69  */
70 struct rbd_image_header {
71         u64 image_size;
72         char block_name[32];
73         __u8 obj_order;
74         __u8 crypt_type;
75         __u8 comp_type;
76         struct rw_semaphore snap_rwsem;
77         struct ceph_snap_context *snapc;
78         size_t snap_names_len;
79         u64 snap_seq;
80         u32 total_snaps;
81
82         char *snap_names;
83         u64 *snap_sizes;
84
85         u64 obj_version;
86 };
87
/*
 * rbd-specific options parsed by parse_rbd_opts_token().
 */
struct rbd_options {
        int     notify_timeout;         /* from "notify_timeout=%d" */
};
91
92 /*
93  * an instance of the client.  multiple devices may share an rbd client.
94  */
95 struct rbd_client {
96         struct ceph_client      *client;
97         struct rbd_options      *rbd_opts;
98         struct kref             kref;
99         struct list_head        node;
100 };
101
102 /*
103  * a request completion status
104  */
105 struct rbd_req_status {
106         int done;
107         int rc;
108         u64 bytes;
109 };
110
111 /*
112  * a collection of requests
113  */
114 struct rbd_req_coll {
115         int                     total;
116         int                     num_done;
117         struct kref             kref;
118         struct rbd_req_status   status[0];
119 };
120
121 /*
122  * a single io request
123  */
124 struct rbd_request {
125         struct request          *rq;            /* blk layer request */
126         struct bio              *bio;           /* cloned bio */
127         struct page             **pages;        /* list of used pages */
128         u64                     len;
129         int                     coll_index;
130         struct rbd_req_coll     *coll;
131 };
132
133 struct rbd_snap {
134         struct  device          dev;
135         const char              *name;
136         size_t                  size;
137         struct list_head        node;
138         u64                     id;
139 };
140
141 /*
142  * a single device
143  */
144 struct rbd_device {
145         int                     id;             /* blkdev unique id */
146
147         int                     major;          /* blkdev assigned major */
148         struct gendisk          *disk;          /* blkdev's gendisk and rq */
149         struct request_queue    *q;
150
151         struct rbd_client       *rbd_client;
152
153         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
154
155         spinlock_t              lock;           /* queue lock */
156
157         struct rbd_image_header header;
158         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
159         int                     obj_len;
160         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
161         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
162         int                     poolid;
163
164         struct ceph_osd_event   *watch_event;
165         struct ceph_osd_request *watch_request;
166
167         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
168         u32 cur_snap;   /* index+1 of current snapshot within snap context
169                            0 - for the head */
170         int read_only;
171
172         struct list_head        node;
173
174         /* list of snapshots */
175         struct list_head        snaps;
176
177         /* sysfs related */
178         struct device           dev;
179 };
180
/* Serializes open/close/setup/teardown. */
static DEFINE_MUTEX(ctl_mutex);

/* All rbd devices, with the spinlock declared for that list. */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All rbd clients; rbd_client_list_lock is held while the list is used. */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);
188
189 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
190 static void rbd_dev_release(struct device *dev);
191 static ssize_t rbd_snap_add(struct device *dev,
192                             struct device_attribute *attr,
193                             const char *buf,
194                             size_t count);
195 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
196                                   struct rbd_snap *snap);
197
198 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
199                        size_t count);
200 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
201                           size_t count);
202
203 static struct bus_attribute rbd_bus_attrs[] = {
204         __ATTR(add, S_IWUSR, NULL, rbd_add),
205         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
206         __ATTR_NULL
207 };
208
209 static struct bus_type rbd_bus_type = {
210         .name           = "rbd",
211         .bus_attrs      = rbd_bus_attrs,
212 };
213
214 static void rbd_root_dev_release(struct device *dev)
215 {
216 }
217
218 static struct device rbd_root_dev = {
219         .init_name =    "rbd",
220         .release =      rbd_root_dev_release,
221 };
222
223
224 static struct rbd_device *dev_to_rbd(struct device *dev)
225 {
226         return container_of(dev, struct rbd_device, dev);
227 }
228
229 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
230 {
231         return get_device(&rbd_dev->dev);
232 }
233
234 static void rbd_put_dev(struct rbd_device *rbd_dev)
235 {
236         put_device(&rbd_dev->dev);
237 }
238
239 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
240
241 static int rbd_open(struct block_device *bdev, fmode_t mode)
242 {
243         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
244
245         rbd_get_dev(rbd_dev);
246
247         set_device_ro(bdev, rbd_dev->read_only);
248
249         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250                 return -EROFS;
251
252         return 0;
253 }
254
255 static int rbd_release(struct gendisk *disk, fmode_t mode)
256 {
257         struct rbd_device *rbd_dev = disk->private_data;
258
259         rbd_put_dev(rbd_dev);
260
261         return 0;
262 }
263
264 static const struct block_device_operations rbd_bd_ops = {
265         .owner                  = THIS_MODULE,
266         .open                   = rbd_open,
267         .release                = rbd_release,
268 };
269
270 /*
271  * Initialize an rbd client instance.
272  * We own *opt.
273  */
274 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
275                                             struct rbd_options *rbd_opts)
276 {
277         struct rbd_client *rbdc;
278         int ret = -ENOMEM;
279
280         dout("rbd_client_create\n");
281         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
282         if (!rbdc)
283                 goto out_opt;
284
285         kref_init(&rbdc->kref);
286         INIT_LIST_HEAD(&rbdc->node);
287
288         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
289
290         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
291         if (IS_ERR(rbdc->client))
292                 goto out_mutex;
293         opt = NULL; /* Now rbdc->client is responsible for opt */
294
295         ret = ceph_open_session(rbdc->client);
296         if (ret < 0)
297                 goto out_err;
298
299         rbdc->rbd_opts = rbd_opts;
300
301         spin_lock(&rbd_client_list_lock);
302         list_add_tail(&rbdc->node, &rbd_client_list);
303         spin_unlock(&rbd_client_list_lock);
304
305         mutex_unlock(&ctl_mutex);
306
307         dout("rbd_client_create created %p\n", rbdc);
308         return rbdc;
309
310 out_err:
311         ceph_destroy_client(rbdc->client);
312 out_mutex:
313         mutex_unlock(&ctl_mutex);
314         kfree(rbdc);
315 out_opt:
316         if (opt)
317                 ceph_destroy_options(opt);
318         return ERR_PTR(ret);
319 }
320
321 /*
322  * Find a ceph client with specific addr and configuration.
323  */
324 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
325 {
326         struct rbd_client *client_node;
327
328         if (opt->flags & CEPH_OPT_NOSHARE)
329                 return NULL;
330
331         list_for_each_entry(client_node, &rbd_client_list, node)
332                 if (ceph_compare_options(opt, client_node->client) == 0)
333                         return client_node;
334         return NULL;
335 }
336
337 /*
338  * mount options
339  */
340 enum {
341         Opt_notify_timeout,
342         Opt_last_int,
343         /* int args above */
344         Opt_last_string,
345         /* string args above */
346 };
347
348 static match_table_t rbdopt_tokens = {
349         {Opt_notify_timeout, "notify_timeout=%d"},
350         /* int args above */
351         /* string args above */
352         {-1, NULL}
353 };
354
355 static int parse_rbd_opts_token(char *c, void *private)
356 {
357         struct rbd_options *rbdopt = private;
358         substring_t argstr[MAX_OPT_ARGS];
359         int token, intval, ret;
360
361         token = match_token(c, rbdopt_tokens, argstr);
362         if (token < 0)
363                 return -EINVAL;
364
365         if (token < Opt_last_int) {
366                 ret = match_int(&argstr[0], &intval);
367                 if (ret < 0) {
368                         pr_err("bad mount option arg (not int) "
369                                "at '%s'\n", c);
370                         return ret;
371                 }
372                 dout("got int token %d val %d\n", token, intval);
373         } else if (token > Opt_last_int && token < Opt_last_string) {
374                 dout("got string token %d val %s\n", token,
375                      argstr[0].from);
376         } else {
377                 dout("got token %d\n", token);
378         }
379
380         switch (token) {
381         case Opt_notify_timeout:
382                 rbdopt->notify_timeout = intval;
383                 break;
384         default:
385                 BUG_ON(token);
386         }
387         return 0;
388 }
389
390 /*
391  * Get a ceph client with specific addr and configuration, if one does
392  * not exist create it.
393  */
394 static struct rbd_client *rbd_get_client(const char *mon_addr,
395                                          size_t mon_addr_len,
396                                          char *options)
397 {
398         struct rbd_client *rbdc;
399         struct ceph_options *opt;
400         struct rbd_options *rbd_opts;
401
402         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
403         if (!rbd_opts)
404                 return ERR_PTR(-ENOMEM);
405
406         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
407
408         opt = ceph_parse_options(options, mon_addr,
409                                 mon_addr + mon_addr_len,
410                                 parse_rbd_opts_token, rbd_opts);
411         if (IS_ERR(opt)) {
412                 kfree(rbd_opts);
413                 return ERR_CAST(opt);
414         }
415
416         spin_lock(&rbd_client_list_lock);
417         rbdc = __rbd_client_find(opt);
418         if (rbdc) {
419                 /* using an existing client */
420                 kref_get(&rbdc->kref);
421                 spin_unlock(&rbd_client_list_lock);
422
423                 ceph_destroy_options(opt);
424                 kfree(rbd_opts);
425
426                 return rbdc;
427         }
428         spin_unlock(&rbd_client_list_lock);
429
430         rbdc = rbd_client_create(opt, rbd_opts);
431
432         if (IS_ERR(rbdc))
433                 kfree(rbd_opts);
434
435         return rbdc;
436 }
437
438 /*
439  * Destroy ceph client
440  *
441  * Caller must hold rbd_client_list_lock.
442  */
443 static void rbd_client_release(struct kref *kref)
444 {
445         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
446
447         dout("rbd_release_client %p\n", rbdc);
448         list_del(&rbdc->node);
449
450         ceph_destroy_client(rbdc->client);
451         kfree(rbdc->rbd_opts);
452         kfree(rbdc);
453 }
454
455 /*
456  * Drop reference to ceph client node. If it's not referenced anymore, release
457  * it.
458  */
459 static void rbd_put_client(struct rbd_device *rbd_dev)
460 {
461         spin_lock(&rbd_client_list_lock);
462         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
463         spin_unlock(&rbd_client_list_lock);
464         rbd_dev->rbd_client = NULL;
465 }
466
467 /*
468  * Destroy requests collection
469  */
470 static void rbd_coll_release(struct kref *kref)
471 {
472         struct rbd_req_coll *coll =
473                 container_of(kref, struct rbd_req_coll, kref);
474
475         dout("rbd_coll_release %p\n", coll);
476         kfree(coll);
477 }
478
479 /*
480  * Create a new header structure, translate header format from the on-disk
481  * header.
482  */
483 static int rbd_header_from_disk(struct rbd_image_header *header,
484                                  struct rbd_image_header_ondisk *ondisk,
485                                  int allocated_snaps,
486                                  gfp_t gfp_flags)
487 {
488         int i;
489         u32 snap_count = le32_to_cpu(ondisk->snap_count);
490         int ret = -ENOMEM;
491
492         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
493                 return -ENXIO;
494
495         init_rwsem(&header->snap_rwsem);
496         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
497         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
498                                 snap_count * sizeof (*ondisk),
499                                 gfp_flags);
500         if (!header->snapc)
501                 return -ENOMEM;
502         if (snap_count) {
503                 header->snap_names = kmalloc(header->snap_names_len,
504                                              GFP_KERNEL);
505                 if (!header->snap_names)
506                         goto err_snapc;
507                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
508                                              GFP_KERNEL);
509                 if (!header->snap_sizes)
510                         goto err_names;
511         } else {
512                 header->snap_names = NULL;
513                 header->snap_sizes = NULL;
514         }
515         memcpy(header->block_name, ondisk->block_name,
516                sizeof(ondisk->block_name));
517
518         header->image_size = le64_to_cpu(ondisk->image_size);
519         header->obj_order = ondisk->options.order;
520         header->crypt_type = ondisk->options.crypt_type;
521         header->comp_type = ondisk->options.comp_type;
522
523         atomic_set(&header->snapc->nref, 1);
524         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
525         header->snapc->num_snaps = snap_count;
526         header->total_snaps = snap_count;
527
528         if (snap_count && allocated_snaps == snap_count) {
529                 for (i = 0; i < snap_count; i++) {
530                         header->snapc->snaps[i] =
531                                 le64_to_cpu(ondisk->snaps[i].id);
532                         header->snap_sizes[i] =
533                                 le64_to_cpu(ondisk->snaps[i].image_size);
534                 }
535
536                 /* copy snapshot names */
537                 memcpy(header->snap_names, &ondisk->snaps[i],
538                         header->snap_names_len);
539         }
540
541         return 0;
542
543 err_names:
544         kfree(header->snap_names);
545 err_snapc:
546         kfree(header->snapc);
547         return ret;
548 }
549
550 static int snap_index(struct rbd_image_header *header, int snap_num)
551 {
552         return header->total_snaps - snap_num;
553 }
554
555 static u64 cur_snap_id(struct rbd_device *rbd_dev)
556 {
557         struct rbd_image_header *header = &rbd_dev->header;
558
559         if (!rbd_dev->cur_snap)
560                 return 0;
561
562         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
563 }
564
565 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
566                         u64 *seq, u64 *size)
567 {
568         int i;
569         char *p = header->snap_names;
570
571         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
572                 if (strcmp(snap_name, p) == 0)
573                         break;
574         }
575         if (i == header->total_snaps)
576                 return -ENOENT;
577         if (seq)
578                 *seq = header->snapc->snaps[i];
579
580         if (size)
581                 *size = header->snap_sizes[i];
582
583         return i;
584 }
585
586 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
587 {
588         struct rbd_image_header *header = &dev->header;
589         struct ceph_snap_context *snapc = header->snapc;
590         int ret = -ENOENT;
591
592         BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
593
594         down_write(&header->snap_rwsem);
595
596         if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
597                     sizeof (RBD_SNAP_HEAD_NAME))) {
598                 if (header->total_snaps)
599                         snapc->seq = header->snap_seq;
600                 else
601                         snapc->seq = 0;
602                 dev->cur_snap = 0;
603                 dev->read_only = 0;
604                 if (size)
605                         *size = header->image_size;
606         } else {
607                 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
608                 if (ret < 0)
609                         goto done;
610
611                 dev->cur_snap = header->total_snaps - ret;
612                 dev->read_only = 1;
613         }
614
615         ret = 0;
616 done:
617         up_write(&header->snap_rwsem);
618         return ret;
619 }
620
621 static void rbd_header_free(struct rbd_image_header *header)
622 {
623         kfree(header->snapc);
624         kfree(header->snap_names);
625         kfree(header->snap_sizes);
626 }
627
628 /*
629  * get the actual striped segment name, offset and length
630  */
631 static u64 rbd_get_segment(struct rbd_image_header *header,
632                            const char *block_name,
633                            u64 ofs, u64 len,
634                            char *seg_name, u64 *segofs)
635 {
636         u64 seg = ofs >> header->obj_order;
637
638         if (seg_name)
639                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
640                          "%s.%012llx", block_name, seg);
641
642         ofs = ofs & ((1 << header->obj_order) - 1);
643         len = min_t(u64, len, (1 << header->obj_order) - ofs);
644
645         if (segofs)
646                 *segofs = ofs;
647
648         return len;
649 }
650
651 static int rbd_get_num_segments(struct rbd_image_header *header,
652                                 u64 ofs, u64 len)
653 {
654         u64 start_seg = ofs >> header->obj_order;
655         u64 end_seg = (ofs + len - 1) >> header->obj_order;
656         return end_seg - start_seg + 1;
657 }
658
659 /*
660  * returns the size of an object in the image
661  */
662 static u64 rbd_obj_bytes(struct rbd_image_header *header)
663 {
664         return 1 << header->obj_order;
665 }
666
667 /*
668  * bio helpers
669  */
670
671 static void bio_chain_put(struct bio *chain)
672 {
673         struct bio *tmp;
674
675         while (chain) {
676                 tmp = chain;
677                 chain = chain->bi_next;
678                 bio_put(tmp);
679         }
680 }
681
682 /*
683  * zeros a bio chain, starting at specific offset
684  */
685 static void zero_bio_chain(struct bio *chain, int start_ofs)
686 {
687         struct bio_vec *bv;
688         unsigned long flags;
689         void *buf;
690         int i;
691         int pos = 0;
692
693         while (chain) {
694                 bio_for_each_segment(bv, chain, i) {
695                         if (pos + bv->bv_len > start_ofs) {
696                                 int remainder = max(start_ofs - pos, 0);
697                                 buf = bvec_kmap_irq(bv, &flags);
698                                 memset(buf + remainder, 0,
699                                        bv->bv_len - remainder);
700                                 bvec_kunmap_irq(buf, &flags);
701                         }
702                         pos += bv->bv_len;
703                 }
704
705                 chain = chain->bi_next;
706         }
707 }
708
709 /*
710  * bio_chain_clone - clone a chain of bios up to a certain length.
711  * might return a bio_pair that will need to be released.
712  */
713 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
714                                    struct bio_pair **bp,
715                                    int len, gfp_t gfpmask)
716 {
717         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
718         int total = 0;
719
720         if (*bp) {
721                 bio_pair_release(*bp);
722                 *bp = NULL;
723         }
724
725         while (old_chain && (total < len)) {
726                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
727                 if (!tmp)
728                         goto err_out;
729
730                 if (total + old_chain->bi_size > len) {
731                         struct bio_pair *bp;
732
733                         /*
734                          * this split can only happen with a single paged bio,
735                          * split_bio will BUG_ON if this is not the case
736                          */
737                         dout("bio_chain_clone split! total=%d remaining=%d"
738                              "bi_size=%d\n",
739                              (int)total, (int)len-total,
740                              (int)old_chain->bi_size);
741
742                         /* split the bio. We'll release it either in the next
743                            call, or it will have to be released outside */
744                         bp = bio_split(old_chain, (len - total) / 512ULL);
745                         if (!bp)
746                                 goto err_out;
747
748                         __bio_clone(tmp, &bp->bio1);
749
750                         *next = &bp->bio2;
751                 } else {
752                         __bio_clone(tmp, old_chain);
753                         *next = old_chain->bi_next;
754                 }
755
756                 tmp->bi_bdev = NULL;
757                 gfpmask &= ~__GFP_WAIT;
758                 tmp->bi_next = NULL;
759
760                 if (!new_chain) {
761                         new_chain = tail = tmp;
762                 } else {
763                         tail->bi_next = tmp;
764                         tail = tmp;
765                 }
766                 old_chain = old_chain->bi_next;
767
768                 total += tmp->bi_size;
769         }
770
771         BUG_ON(total < len);
772
773         if (tail)
774                 tail->bi_next = NULL;
775
776         *old = old_chain;
777
778         return new_chain;
779
780 err_out:
781         dout("bio_chain_clone with err\n");
782         bio_chain_put(new_chain);
783         return NULL;
784 }
785
786 /*
787  * helpers for osd request op vectors.
788  */
789 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
790                             int num_ops,
791                             int opcode,
792                             u32 payload_len)
793 {
794         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
795                        GFP_NOIO);
796         if (!*ops)
797                 return -ENOMEM;
798         (*ops)[0].op = opcode;
799         /*
800          * op extent offset and length will be set later on
801          * in calc_raw_layout()
802          */
803         (*ops)[0].payload_len = payload_len;
804         return 0;
805 }
806
/*
 * Free an op vector allocated by rbd_create_rw_ops().
 */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
811
812 static void rbd_coll_end_req_index(struct request *rq,
813                                    struct rbd_req_coll *coll,
814                                    int index,
815                                    int ret, u64 len)
816 {
817         struct request_queue *q;
818         int min, max, i;
819
820         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
821              coll, index, ret, len);
822
823         if (!rq)
824                 return;
825
826         if (!coll) {
827                 blk_end_request(rq, ret, len);
828                 return;
829         }
830
831         q = rq->q;
832
833         spin_lock_irq(q->queue_lock);
834         coll->status[index].done = 1;
835         coll->status[index].rc = ret;
836         coll->status[index].bytes = len;
837         max = min = coll->num_done;
838         while (max < coll->total && coll->status[max].done)
839                 max++;
840
841         for (i = min; i<max; i++) {
842                 __blk_end_request(rq, coll->status[i].rc,
843                                   coll->status[i].bytes);
844                 coll->num_done++;
845                 kref_put(&coll->kref, rbd_coll_release);
846         }
847         spin_unlock_irq(q->queue_lock);
848 }
849
850 static void rbd_coll_end_req(struct rbd_request *req,
851                              int ret, u64 len)
852 {
853         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
854 }
855
856 /*
857  * Send ceph osd request
858  */
859 static int rbd_do_request(struct request *rq,
860                           struct rbd_device *dev,
861                           struct ceph_snap_context *snapc,
862                           u64 snapid,
863                           const char *obj, u64 ofs, u64 len,
864                           struct bio *bio,
865                           struct page **pages,
866                           int num_pages,
867                           int flags,
868                           struct ceph_osd_req_op *ops,
869                           int num_reply,
870                           struct rbd_req_coll *coll,
871                           int coll_index,
872                           void (*rbd_cb)(struct ceph_osd_request *req,
873                                          struct ceph_msg *msg),
874                           struct ceph_osd_request **linger_req,
875                           u64 *ver)
876 {
877         struct ceph_osd_request *req;
878         struct ceph_file_layout *layout;
879         int ret;
880         u64 bno;
881         struct timespec mtime = CURRENT_TIME;
882         struct rbd_request *req_data;
883         struct ceph_osd_request_head *reqhead;
884         struct rbd_image_header *header = &dev->header;
885         struct ceph_osd_client *osdc;
886
887         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
888         if (!req_data) {
889                 if (coll)
890                         rbd_coll_end_req_index(rq, coll, coll_index,
891                                                -ENOMEM, len);
892                 return -ENOMEM;
893         }
894
895         if (coll) {
896                 req_data->coll = coll;
897                 req_data->coll_index = coll_index;
898         }
899
900         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
901
902         down_read(&header->snap_rwsem);
903
904         osdc = &dev->rbd_client->client->osdc;
905         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
906                                         false, GFP_NOIO, pages, bio);
907         if (!req) {
908                 up_read(&header->snap_rwsem);
909                 ret = -ENOMEM;
910                 goto done_pages;
911         }
912
913         req->r_callback = rbd_cb;
914
915         req_data->rq = rq;
916         req_data->bio = bio;
917         req_data->pages = pages;
918         req_data->len = len;
919
920         req->r_priv = req_data;
921
922         reqhead = req->r_request->front.iov_base;
923         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
924
925         strncpy(req->r_oid, obj, sizeof(req->r_oid));
926         req->r_oid_len = strlen(req->r_oid);
927
928         layout = &req->r_file_layout;
929         memset(layout, 0, sizeof(*layout));
930         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931         layout->fl_stripe_count = cpu_to_le32(1);
932         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
933         layout->fl_pg_preferred = cpu_to_le32(-1);
934         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
935         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
936                                 req, ops);
937
938         ceph_osdc_build_request(req, ofs, &len,
939                                 ops,
940                                 snapc,
941                                 &mtime,
942                                 req->r_oid, req->r_oid_len);
943         up_read(&header->snap_rwsem);
944
945         if (linger_req) {
946                 ceph_osdc_set_request_linger(osdc, req);
947                 *linger_req = req;
948         }
949
950         ret = ceph_osdc_start_request(osdc, req, false);
951         if (ret < 0)
952                 goto done_err;
953
954         if (!rbd_cb) {
955                 ret = ceph_osdc_wait_request(osdc, req);
956                 if (ver)
957                         *ver = le64_to_cpu(req->r_reassert_version.version);
958                 dout("reassert_ver=%lld\n",
959                      le64_to_cpu(req->r_reassert_version.version));
960                 ceph_osdc_put_request(req);
961         }
962         return ret;
963
964 done_err:
965         bio_chain_put(req_data->bio);
966         ceph_osdc_put_request(req);
967 done_pages:
968         rbd_coll_end_req(req_data, ret, len);
969         kfree(req_data);
970         return ret;
971 }
972
973 /*
974  * Ceph osd op callback
975  */
976 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
977 {
978         struct rbd_request *req_data = req->r_priv;
979         struct ceph_osd_reply_head *replyhead;
980         struct ceph_osd_op *op;
981         __s32 rc;
982         u64 bytes;
983         int read_op;
984
985         /* parse reply */
986         replyhead = msg->front.iov_base;
987         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
988         op = (void *)(replyhead + 1);
989         rc = le32_to_cpu(replyhead->result);
990         bytes = le64_to_cpu(op->extent.length);
991         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
992
993         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
994
995         if (rc == -ENOENT && read_op) {
996                 zero_bio_chain(req_data->bio, 0);
997                 rc = 0;
998         } else if (rc == 0 && read_op && bytes < req_data->len) {
999                 zero_bio_chain(req_data->bio, bytes);
1000                 bytes = req_data->len;
1001         }
1002
1003         rbd_coll_end_req(req_data, rc, bytes);
1004
1005         if (req_data->bio)
1006                 bio_chain_put(req_data->bio);
1007
1008         ceph_osdc_put_request(req);
1009         kfree(req_data);
1010 }
1011
/*
 * Minimal completion callback: the reply is ignored and the only work
 * done is dropping a reference on the finished osd request.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1016
1017 /*
1018  * Do a synchronous ceph osd operation
1019  */
1020 static int rbd_req_sync_op(struct rbd_device *dev,
1021                            struct ceph_snap_context *snapc,
1022                            u64 snapid,
1023                            int opcode,
1024                            int flags,
1025                            struct ceph_osd_req_op *orig_ops,
1026                            int num_reply,
1027                            const char *obj,
1028                            u64 ofs, u64 len,
1029                            char *buf,
1030                            struct ceph_osd_request **linger_req,
1031                            u64 *ver)
1032 {
1033         int ret;
1034         struct page **pages;
1035         int num_pages;
1036         struct ceph_osd_req_op *ops = orig_ops;
1037         u32 payload_len;
1038
1039         num_pages = calc_pages_for(ofs , len);
1040         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1041         if (IS_ERR(pages))
1042                 return PTR_ERR(pages);
1043
1044         if (!orig_ops) {
1045                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1046                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1047                 if (ret < 0)
1048                         goto done;
1049
1050                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1051                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1052                         if (ret < 0)
1053                                 goto done_ops;
1054                 }
1055         }
1056
1057         ret = rbd_do_request(NULL, dev, snapc, snapid,
1058                           obj, ofs, len, NULL,
1059                           pages, num_pages,
1060                           flags,
1061                           ops,
1062                           2,
1063                           NULL, 0,
1064                           NULL,
1065                           linger_req, ver);
1066         if (ret < 0)
1067                 goto done_ops;
1068
1069         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1070                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1071
1072 done_ops:
1073         if (!orig_ops)
1074                 rbd_destroy_ops(ops);
1075 done:
1076         ceph_release_page_vector(pages, num_pages);
1077         return ret;
1078 }
1079
1080 /*
1081  * Do an asynchronous ceph osd operation
1082  */
1083 static int rbd_do_op(struct request *rq,
1084                      struct rbd_device *rbd_dev ,
1085                      struct ceph_snap_context *snapc,
1086                      u64 snapid,
1087                      int opcode, int flags, int num_reply,
1088                      u64 ofs, u64 len,
1089                      struct bio *bio,
1090                      struct rbd_req_coll *coll,
1091                      int coll_index)
1092 {
1093         char *seg_name;
1094         u64 seg_ofs;
1095         u64 seg_len;
1096         int ret;
1097         struct ceph_osd_req_op *ops;
1098         u32 payload_len;
1099
1100         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1101         if (!seg_name)
1102                 return -ENOMEM;
1103
1104         seg_len = rbd_get_segment(&rbd_dev->header,
1105                                   rbd_dev->header.block_name,
1106                                   ofs, len,
1107                                   seg_name, &seg_ofs);
1108
1109         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1110
1111         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1112         if (ret < 0)
1113                 goto done;
1114
1115         /* we've taken care of segment sizes earlier when we
1116            cloned the bios. We should never have a segment
1117            truncated at this point */
1118         BUG_ON(seg_len < len);
1119
1120         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1121                              seg_name, seg_ofs, seg_len,
1122                              bio,
1123                              NULL, 0,
1124                              flags,
1125                              ops,
1126                              num_reply,
1127                              coll, coll_index,
1128                              rbd_req_cb, 0, NULL);
1129
1130         rbd_destroy_ops(ops);
1131 done:
1132         kfree(seg_name);
1133         return ret;
1134 }
1135
1136 /*
1137  * Request async osd write
1138  */
1139 static int rbd_req_write(struct request *rq,
1140                          struct rbd_device *rbd_dev,
1141                          struct ceph_snap_context *snapc,
1142                          u64 ofs, u64 len,
1143                          struct bio *bio,
1144                          struct rbd_req_coll *coll,
1145                          int coll_index)
1146 {
1147         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1148                          CEPH_OSD_OP_WRITE,
1149                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1150                          2,
1151                          ofs, len, bio, coll, coll_index);
1152 }
1153
1154 /*
1155  * Request async osd read
1156  */
1157 static int rbd_req_read(struct request *rq,
1158                          struct rbd_device *rbd_dev,
1159                          u64 snapid,
1160                          u64 ofs, u64 len,
1161                          struct bio *bio,
1162                          struct rbd_req_coll *coll,
1163                          int coll_index)
1164 {
1165         return rbd_do_op(rq, rbd_dev, NULL,
1166                          (snapid ? snapid : CEPH_NOSNAP),
1167                          CEPH_OSD_OP_READ,
1168                          CEPH_OSD_FLAG_READ,
1169                          2,
1170                          ofs, len, bio, coll, coll_index);
1171 }
1172
1173 /*
1174  * Request sync osd read
1175  */
1176 static int rbd_req_sync_read(struct rbd_device *dev,
1177                           struct ceph_snap_context *snapc,
1178                           u64 snapid,
1179                           const char *obj,
1180                           u64 ofs, u64 len,
1181                           char *buf,
1182                           u64 *ver)
1183 {
1184         return rbd_req_sync_op(dev, NULL,
1185                                (snapid ? snapid : CEPH_NOSNAP),
1186                                CEPH_OSD_OP_READ,
1187                                CEPH_OSD_FLAG_READ,
1188                                NULL,
1189                                1, obj, ofs, len, buf, NULL, ver);
1190 }
1191
1192 /*
1193  * Request sync osd watch
1194  */
1195 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1196                                    u64 ver,
1197                                    u64 notify_id,
1198                                    const char *obj)
1199 {
1200         struct ceph_osd_req_op *ops;
1201         struct page **pages = NULL;
1202         int ret;
1203
1204         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1205         if (ret < 0)
1206                 return ret;
1207
1208         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1209         ops[0].watch.cookie = notify_id;
1210         ops[0].watch.flag = 0;
1211
1212         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1213                           obj, 0, 0, NULL,
1214                           pages, 0,
1215                           CEPH_OSD_FLAG_READ,
1216                           ops,
1217                           1,
1218                           NULL, 0,
1219                           rbd_simple_req_cb, 0, NULL);
1220
1221         rbd_destroy_ops(ops);
1222         return ret;
1223 }
1224
1225 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1226 {
1227         struct rbd_device *dev = (struct rbd_device *)data;
1228         int rc;
1229
1230         if (!dev)
1231                 return;
1232
1233         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1234                 notify_id, (int)opcode);
1235         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1236         rc = __rbd_update_snaps(dev);
1237         mutex_unlock(&ctl_mutex);
1238         if (rc)
1239                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1240                            " update snaps: %d\n", dev->major, rc);
1241
1242         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1243 }
1244
1245 /*
1246  * Request sync osd watch
1247  */
1248 static int rbd_req_sync_watch(struct rbd_device *dev,
1249                               const char *obj,
1250                               u64 ver)
1251 {
1252         struct ceph_osd_req_op *ops;
1253         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1254
1255         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1256         if (ret < 0)
1257                 return ret;
1258
1259         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1260                                      (void *)dev, &dev->watch_event);
1261         if (ret < 0)
1262                 goto fail;
1263
1264         ops[0].watch.ver = cpu_to_le64(ver);
1265         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1266         ops[0].watch.flag = 1;
1267
1268         ret = rbd_req_sync_op(dev, NULL,
1269                               CEPH_NOSNAP,
1270                               0,
1271                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1272                               ops,
1273                               1, obj, 0, 0, NULL,
1274                               &dev->watch_request, NULL);
1275
1276         if (ret < 0)
1277                 goto fail_event;
1278
1279         rbd_destroy_ops(ops);
1280         return 0;
1281
1282 fail_event:
1283         ceph_osdc_cancel_event(dev->watch_event);
1284         dev->watch_event = NULL;
1285 fail:
1286         rbd_destroy_ops(ops);
1287         return ret;
1288 }
1289
1290 /*
1291  * Request sync osd unwatch
1292  */
1293 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1294                                 const char *obj)
1295 {
1296         struct ceph_osd_req_op *ops;
1297
1298         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1299         if (ret < 0)
1300                 return ret;
1301
1302         ops[0].watch.ver = 0;
1303         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1304         ops[0].watch.flag = 0;
1305
1306         ret = rbd_req_sync_op(dev, NULL,
1307                               CEPH_NOSNAP,
1308                               0,
1309                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1310                               ops,
1311                               1, obj, 0, 0, NULL, NULL, NULL);
1312
1313         rbd_destroy_ops(ops);
1314         ceph_osdc_cancel_event(dev->watch_event);
1315         dev->watch_event = NULL;
1316         return ret;
1317 }
1318
/* Wrapper holding the rbd device a notify request was issued for. */
struct rbd_notify_info {
	struct rbd_device *dev;	/* device the notification concerns */
};
1322
1323 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1324 {
1325         struct rbd_device *dev = (struct rbd_device *)data;
1326         if (!dev)
1327                 return;
1328
1329         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1330                 notify_id, (int)opcode);
1331 }
1332
1333 /*
1334  * Request sync osd notify
1335  */
1336 static int rbd_req_sync_notify(struct rbd_device *dev,
1337                           const char *obj)
1338 {
1339         struct ceph_osd_req_op *ops;
1340         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1341         struct ceph_osd_event *event;
1342         struct rbd_notify_info info;
1343         int payload_len = sizeof(u32) + sizeof(u32);
1344         int ret;
1345
1346         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1347         if (ret < 0)
1348                 return ret;
1349
1350         info.dev = dev;
1351
1352         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1353                                      (void *)&info, &event);
1354         if (ret < 0)
1355                 goto fail;
1356
1357         ops[0].watch.ver = 1;
1358         ops[0].watch.flag = 1;
1359         ops[0].watch.cookie = event->cookie;
1360         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1361         ops[0].watch.timeout = 12;
1362
1363         ret = rbd_req_sync_op(dev, NULL,
1364                                CEPH_NOSNAP,
1365                                0,
1366                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1367                                ops,
1368                                1, obj, 0, 0, NULL, NULL, NULL);
1369         if (ret < 0)
1370                 goto fail_event;
1371
1372         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1373         dout("ceph_osdc_wait_event returned %d\n", ret);
1374         rbd_destroy_ops(ops);
1375         return 0;
1376
1377 fail_event:
1378         ceph_osdc_cancel_event(event);
1379 fail:
1380         rbd_destroy_ops(ops);
1381         return ret;
1382 }
1383
1384 /*
1385  * Request sync osd read
1386  */
1387 static int rbd_req_sync_exec(struct rbd_device *dev,
1388                              const char *obj,
1389                              const char *cls,
1390                              const char *method,
1391                              const char *data,
1392                              int len,
1393                              u64 *ver)
1394 {
1395         struct ceph_osd_req_op *ops;
1396         int cls_len = strlen(cls);
1397         int method_len = strlen(method);
1398         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1399                                     cls_len + method_len + len);
1400         if (ret < 0)
1401                 return ret;
1402
1403         ops[0].cls.class_name = cls;
1404         ops[0].cls.class_len = (__u8)cls_len;
1405         ops[0].cls.method_name = method;
1406         ops[0].cls.method_len = (__u8)method_len;
1407         ops[0].cls.argc = 0;
1408         ops[0].cls.indata = data;
1409         ops[0].cls.indata_len = len;
1410
1411         ret = rbd_req_sync_op(dev, NULL,
1412                                CEPH_NOSNAP,
1413                                0,
1414                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1415                                ops,
1416                                1, obj, 0, 0, NULL, NULL, ver);
1417
1418         rbd_destroy_ops(ops);
1419
1420         dout("cls_exec returned %d\n", ret);
1421         return ret;
1422 }
1423
1424 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1425 {
1426         struct rbd_req_coll *coll =
1427                         kzalloc(sizeof(struct rbd_req_coll) +
1428                                 sizeof(struct rbd_req_status) * num_reqs,
1429                                 GFP_ATOMIC);
1430
1431         if (!coll)
1432                 return NULL;
1433         coll->total = num_reqs;
1434         kref_init(&coll->kref);
1435         return coll;
1436 }
1437
1438 /*
1439  * block device queue callback
1440  */
1441 static void rbd_rq_fn(struct request_queue *q)
1442 {
1443         struct rbd_device *rbd_dev = q->queuedata;
1444         struct request *rq;
1445         struct bio_pair *bp = NULL;
1446
1447         rq = blk_fetch_request(q);
1448
1449         while (1) {
1450                 struct bio *bio;
1451                 struct bio *rq_bio, *next_bio = NULL;
1452                 bool do_write;
1453                 int size, op_size = 0;
1454                 u64 ofs;
1455                 int num_segs, cur_seg = 0;
1456                 struct rbd_req_coll *coll;
1457
1458                 /* peek at request from block layer */
1459                 if (!rq)
1460                         break;
1461
1462                 dout("fetched request\n");
1463
1464                 /* filter out block requests we don't understand */
1465                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1466                         __blk_end_request_all(rq, 0);
1467                         goto next;
1468                 }
1469
1470                 /* deduce our operation (read, write) */
1471                 do_write = (rq_data_dir(rq) == WRITE);
1472
1473                 size = blk_rq_bytes(rq);
1474                 ofs = blk_rq_pos(rq) * 512ULL;
1475                 rq_bio = rq->bio;
1476                 if (do_write && rbd_dev->read_only) {
1477                         __blk_end_request_all(rq, -EROFS);
1478                         goto next;
1479                 }
1480
1481                 spin_unlock_irq(q->queue_lock);
1482
1483                 dout("%s 0x%x bytes at 0x%llx\n",
1484                      do_write ? "write" : "read",
1485                      size, blk_rq_pos(rq) * 512ULL);
1486
1487                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1488                 coll = rbd_alloc_coll(num_segs);
1489                 if (!coll) {
1490                         spin_lock_irq(q->queue_lock);
1491                         __blk_end_request_all(rq, -ENOMEM);
1492                         goto next;
1493                 }
1494
1495                 do {
1496                         /* a bio clone to be passed down to OSD req */
1497                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1498                         op_size = rbd_get_segment(&rbd_dev->header,
1499                                                   rbd_dev->header.block_name,
1500                                                   ofs, size,
1501                                                   NULL, NULL);
1502                         kref_get(&coll->kref);
1503                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1504                                               op_size, GFP_ATOMIC);
1505                         if (!bio) {
1506                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1507                                                        -ENOMEM, op_size);
1508                                 goto next_seg;
1509                         }
1510
1511
1512                         /* init OSD command: write or read */
1513                         if (do_write)
1514                                 rbd_req_write(rq, rbd_dev,
1515                                               rbd_dev->header.snapc,
1516                                               ofs,
1517                                               op_size, bio,
1518                                               coll, cur_seg);
1519                         else
1520                                 rbd_req_read(rq, rbd_dev,
1521                                              cur_snap_id(rbd_dev),
1522                                              ofs,
1523                                              op_size, bio,
1524                                              coll, cur_seg);
1525
1526 next_seg:
1527                         size -= op_size;
1528                         ofs += op_size;
1529
1530                         cur_seg++;
1531                         rq_bio = next_bio;
1532                 } while (size > 0);
1533                 kref_put(&coll->kref, rbd_coll_release);
1534
1535                 if (bp)
1536                         bio_pair_release(bp);
1537                 spin_lock_irq(q->queue_lock);
1538 next:
1539                 rq = blk_fetch_request(q);
1540         }
1541 }
1542
1543 /*
1544  * a queue callback. Makes sure that we don't create a bio that spans across
1545  * multiple osd objects. One exception would be with a single page bios,
1546  * which we handle later at bio_chain_clone
1547  */
1548 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1549                           struct bio_vec *bvec)
1550 {
1551         struct rbd_device *rbd_dev = q->queuedata;
1552         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1553         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1554         unsigned int bio_sectors = bmd->bi_size >> 9;
1555         int max;
1556
1557         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1558                                  + bio_sectors)) << 9;
1559         if (max < 0)
1560                 max = 0; /* bio_add cannot handle a negative return */
1561         if (max <= bvec->bv_len && bio_sectors == 0)
1562                 return bvec->bv_len;
1563         return max;
1564 }
1565
1566 static void rbd_free_disk(struct rbd_device *rbd_dev)
1567 {
1568         struct gendisk *disk = rbd_dev->disk;
1569
1570         if (!disk)
1571                 return;
1572
1573         rbd_header_free(&rbd_dev->header);
1574
1575         if (disk->flags & GENHD_FL_UP)
1576                 del_gendisk(disk);
1577         if (disk->queue)
1578                 blk_cleanup_queue(disk->queue);
1579         put_disk(disk);
1580 }
1581
1582 /*
1583  * reload the ondisk the header 
1584  */
1585 static int rbd_read_header(struct rbd_device *rbd_dev,
1586                            struct rbd_image_header *header)
1587 {
1588         ssize_t rc;
1589         struct rbd_image_header_ondisk *dh;
1590         int snap_count = 0;
1591         u64 snap_names_len = 0;
1592         u64 ver;
1593
1594         while (1) {
1595                 int len = sizeof(*dh) +
1596                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1597                           snap_names_len;
1598
1599                 rc = -ENOMEM;
1600                 dh = kmalloc(len, GFP_KERNEL);
1601                 if (!dh)
1602                         return -ENOMEM;
1603
1604                 rc = rbd_req_sync_read(rbd_dev,
1605                                        NULL, CEPH_NOSNAP,
1606                                        rbd_dev->obj_md_name,
1607                                        0, len,
1608                                        (char *)dh, &ver);
1609                 if (rc < 0)
1610                         goto out_dh;
1611
1612                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1613                 if (rc < 0) {
1614                         if (rc == -ENXIO) {
1615                                 pr_warning("unrecognized header format"
1616                                            " for image %s", rbd_dev->obj);
1617                         }
1618                         goto out_dh;
1619                 }
1620
1621                 if (snap_count != header->total_snaps) {
1622                         snap_count = header->total_snaps;
1623                         snap_names_len = header->snap_names_len;
1624                         rbd_header_free(header);
1625                         kfree(dh);
1626                         continue;
1627                 }
1628                 break;
1629         }
1630         header->obj_version = ver;
1631
1632 out_dh:
1633         kfree(dh);
1634         return rc;
1635 }
1636
1637 /*
1638  * create a snapshot
1639  */
1640 static int rbd_header_add_snap(struct rbd_device *dev,
1641                                const char *snap_name,
1642                                gfp_t gfp_flags)
1643 {
1644         int name_len = strlen(snap_name);
1645         u64 new_snapid;
1646         int ret;
1647         void *data, *p, *e;
1648         u64 ver;
1649         struct ceph_mon_client *monc;
1650
1651         /* we should create a snapshot only if we're pointing at the head */
1652         if (dev->cur_snap)
1653                 return -EINVAL;
1654
1655         monc = &dev->rbd_client->client->monc;
1656         ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1657         dout("created snapid=%lld\n", new_snapid);
1658         if (ret < 0)
1659                 return ret;
1660
1661         data = kmalloc(name_len + 16, gfp_flags);
1662         if (!data)
1663                 return -ENOMEM;
1664
1665         p = data;
1666         e = data + name_len + 16;
1667
1668         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1669         ceph_encode_64_safe(&p, e, new_snapid, bad);
1670
1671         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1672                                 data, p - data, &ver);
1673
1674         kfree(data);
1675
1676         if (ret < 0)
1677                 return ret;
1678
1679         dev->header.snapc->seq =  new_snapid;
1680
1681         return 0;
1682 bad:
1683         return -ERANGE;
1684 }
1685
1686 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1687 {
1688         struct rbd_snap *snap;
1689
1690         while (!list_empty(&rbd_dev->snaps)) {
1691                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1692                 __rbd_remove_snap_dev(rbd_dev, snap);
1693         }
1694 }
1695
1696 /*
1697  * only read the first part of the ondisk header, without the snaps info
1698  */
1699 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1700 {
1701         int ret;
1702         struct rbd_image_header h;
1703         u64 snap_seq;
1704         int follow_seq = 0;
1705
1706         ret = rbd_read_header(rbd_dev, &h);
1707         if (ret < 0)
1708                 return ret;
1709
1710         /* resized? */
1711         set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1712
1713         down_write(&rbd_dev->header.snap_rwsem);
1714
1715         snap_seq = rbd_dev->header.snapc->seq;
1716         if (rbd_dev->header.total_snaps &&
1717             rbd_dev->header.snapc->snaps[0] == snap_seq)
1718                 /* pointing at the head, will need to follow that
1719                    if head moves */
1720                 follow_seq = 1;
1721
1722         kfree(rbd_dev->header.snapc);
1723         kfree(rbd_dev->header.snap_names);
1724         kfree(rbd_dev->header.snap_sizes);
1725
1726         rbd_dev->header.total_snaps = h.total_snaps;
1727         rbd_dev->header.snapc = h.snapc;
1728         rbd_dev->header.snap_names = h.snap_names;
1729         rbd_dev->header.snap_names_len = h.snap_names_len;
1730         rbd_dev->header.snap_sizes = h.snap_sizes;
1731         if (follow_seq)
1732                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1733         else
1734                 rbd_dev->header.snapc->seq = snap_seq;
1735
1736         ret = __rbd_init_snaps_header(rbd_dev);
1737
1738         up_write(&rbd_dev->header.snap_rwsem);
1739
1740         return ret;
1741 }
1742
1743 static int rbd_init_disk(struct rbd_device *rbd_dev)
1744 {
1745         struct gendisk *disk;
1746         struct request_queue *q;
1747         int rc;
1748         u64 total_size = 0;
1749
1750         /* contact OSD, request size info about the object being mapped */
1751         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1752         if (rc)
1753                 return rc;
1754
1755         /* no need to lock here, as rbd_dev is not registered yet */
1756         rc = __rbd_init_snaps_header(rbd_dev);
1757         if (rc)
1758                 return rc;
1759
1760         rc = rbd_header_set_snap(rbd_dev, &total_size);
1761         if (rc)
1762                 return rc;
1763
1764         /* create gendisk info */
1765         rc = -ENOMEM;
1766         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1767         if (!disk)
1768                 goto out;
1769
1770         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1771                  rbd_dev->id);
1772         disk->major = rbd_dev->major;
1773         disk->first_minor = 0;
1774         disk->fops = &rbd_bd_ops;
1775         disk->private_data = rbd_dev;
1776
1777         /* init rq */
1778         rc = -ENOMEM;
1779         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1780         if (!q)
1781                 goto out_disk;
1782
1783         /* set io sizes to object size */
1784         blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1785         blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1786         blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1787         blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1788
1789         blk_queue_merge_bvec(q, rbd_merge_bvec);
1790         disk->queue = q;
1791
1792         q->queuedata = rbd_dev;
1793
1794         rbd_dev->disk = disk;
1795         rbd_dev->q = q;
1796
1797         /* finally, announce the disk to the world */
1798         set_capacity(disk, total_size / 512ULL);
1799         add_disk(disk);
1800
1801         pr_info("%s: added with size 0x%llx\n",
1802                 disk->disk_name, (unsigned long long)total_size);
1803         return 0;
1804
1805 out_disk:
1806         put_disk(disk);
1807 out:
1808         return rc;
1809 }
1810
1811 /*
1812   sysfs
1813 */
1814
1815 static ssize_t rbd_size_show(struct device *dev,
1816                              struct device_attribute *attr, char *buf)
1817 {
1818         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1819
1820         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1821 }
1822
1823 static ssize_t rbd_major_show(struct device *dev,
1824                               struct device_attribute *attr, char *buf)
1825 {
1826         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1827
1828         return sprintf(buf, "%d\n", rbd_dev->major);
1829 }
1830
1831 static ssize_t rbd_client_id_show(struct device *dev,
1832                                   struct device_attribute *attr, char *buf)
1833 {
1834         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1835
1836         return sprintf(buf, "client%lld\n",
1837                         ceph_client_id(rbd_dev->rbd_client->client));
1838 }
1839
1840 static ssize_t rbd_pool_show(struct device *dev,
1841                              struct device_attribute *attr, char *buf)
1842 {
1843         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1844
1845         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1846 }
1847
1848 static ssize_t rbd_name_show(struct device *dev,
1849                              struct device_attribute *attr, char *buf)
1850 {
1851         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1852
1853         return sprintf(buf, "%s\n", rbd_dev->obj);
1854 }
1855
1856 static ssize_t rbd_snap_show(struct device *dev,
1857                              struct device_attribute *attr,
1858                              char *buf)
1859 {
1860         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1861
1862         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1863 }
1864
1865 static ssize_t rbd_image_refresh(struct device *dev,
1866                                  struct device_attribute *attr,
1867                                  const char *buf,
1868                                  size_t size)
1869 {
1870         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1871         int rc;
1872         int ret = size;
1873
1874         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1875
1876         rc = __rbd_update_snaps(rbd_dev);
1877         if (rc < 0)
1878                 ret = rc;
1879
1880         mutex_unlock(&ctl_mutex);
1881         return ret;
1882 }
1883
/*
 * Per-device sysfs attributes.  The S_IRUGO attributes are read-only and
 * supply only a show handler; "refresh" and "create_snap" are S_IWUSR
 * write-only and supply only a store handler.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

/* NULL-terminated list of the attributes declared above. */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

/* Unnamed attribute group wrapping rbd_attrs. */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type. */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
1913
/*
 * Release callback installed in rbd_device_type.  Intentionally empty:
 * it frees nothing.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
1917
/*
 * Device type assigned to each rbd device in rbd_bus_add_dev(); it
 * supplies the attribute groups declared above.
 */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1923
1924
1925 /*
1926   sysfs - snapshots
1927 */
1928
1929 static ssize_t rbd_snap_size_show(struct device *dev,
1930                                   struct device_attribute *attr,
1931                                   char *buf)
1932 {
1933         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1934
1935         return sprintf(buf, "%lld\n", (long long)snap->size);
1936 }
1937
1938 static ssize_t rbd_snap_id_show(struct device *dev,
1939                                 struct device_attribute *attr,
1940                                 char *buf)
1941 {
1942         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1943
1944         return sprintf(buf, "%lld\n", (long long)snap->id);
1945 }
1946
/* Read-only sysfs attributes exposed by each snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above. */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

/* Unnamed attribute group wrapping rbd_snap_attrs. */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
1959
1960 static void rbd_snap_dev_release(struct device *dev)
1961 {
1962         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1963         kfree(snap->name);
1964         kfree(snap);
1965 }
1966
/* NULL-terminated group list referenced by rbd_snap_device_type. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/*
 * Device type assigned to snapshot devices in rbd_register_snap_dev();
 * supplies the snapshot attributes and the release callback that frees
 * the rbd_snap.
 */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
1976
/*
 * Unlink @snap from the list it is on and unregister its device.
 * @rbd_dev is not used by this function.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
                                  struct rbd_snap *snap)
{
        /* unlink first: once unregistered, the snap must not be touched */
        list_del(&snap->node);
        device_unregister(&snap->dev);
}
1983
1984 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1985                                   struct rbd_snap *snap,
1986                                   struct device *parent)
1987 {
1988         struct device *dev = &snap->dev;
1989         int ret;
1990
1991         dev->type = &rbd_snap_device_type;
1992         dev->parent = parent;
1993         dev->release = rbd_snap_dev_release;
1994         dev_set_name(dev, "snap_%s", snap->name);
1995         ret = device_register(dev);
1996
1997         return ret;
1998 }
1999
2000 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2001                               int i, const char *name,
2002                               struct rbd_snap **snapp)
2003 {
2004         int ret;
2005         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2006         if (!snap)
2007                 return -ENOMEM;
2008         snap->name = kstrdup(name, GFP_KERNEL);
2009         snap->size = rbd_dev->header.snap_sizes[i];
2010         snap->id = rbd_dev->header.snapc->snaps[i];
2011         if (device_is_registered(&rbd_dev->dev)) {
2012                 ret = rbd_register_snap_dev(rbd_dev, snap,
2013                                              &rbd_dev->dev);
2014                 if (ret < 0)
2015                         goto err;
2016         }
2017         *snapp = snap;
2018         return 0;
2019 err:
2020         kfree(snap->name);
2021         kfree(snap);
2022         return ret;
2023 }
2024
/*
 * Step backward through a list of NUL-terminated strings packed back to
 * back.  @name points at the start of an entry (or just past the final
 * terminator); @start is the first byte of the list.
 *
 * Returns the start of the entry preceding @name, or NULL if @name is
 * less than two bytes past @start (no room for a preceding entry).
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
        const char *cur;

        if (name < start + 2)
                return NULL;

        /* name[-1] is the previous terminator; scan back from name[-2] */
        for (cur = name - 2; *cur != '\0'; cur--) {
                if (cur == start)
                        return start;
        }

        /* cur rests on the terminator of the entry before the one found */
        return cur + 1;
}
2041
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * doing so (e.g., a snapshot was removed and a new one was created
 * with the same name).
 *
 * Returns 0 on success, -EINVAL if the name list runs out before the
 * snapshot count does, or the error returned by __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
        const char *name, *first_name;
        int i = rbd_dev->header.total_snaps;
        struct rbd_snap *snap, *old_snap = NULL;
        int ret;
        struct list_head *p, *n;

        /* start with "name" just past the end of the packed name list */
        first_name = rbd_dev->header.snap_names;
        name = first_name + rbd_dev->header.snap_names_len;

        /* walk the existing list from its tail toward its head */
        list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
                u64 cur_id;

                old_snap = list_entry(p, struct rbd_snap, node);

                /* cur_id is only read below when i != 0 */
                if (i)
                        cur_id = rbd_dev->header.snapc->snaps[i - 1];

                if (!i || old_snap->id < cur_id) {
                        /* old_snap->id was skipped, thus was removed */
                        __rbd_remove_snap_dev(rbd_dev, old_snap);
                        continue;
                }
                if (old_snap->id == cur_id) {
                        /* we have this snapshot already */
                        i--;
                        name = rbd_prev_snap_name(name, first_name);
                        continue;
                }
                /*
                 * NOTE(review): this loop reads snaps[i] while the rest of
                 * the function uses snaps[i - 1], and it passes "name" to
                 * __rbd_add_snap_dev() before stepping it back, unlike the
                 * loop after the list walk -- possible off-by-one; confirm.
                 */
                for (; i > 0;
                     i--, name = rbd_prev_snap_name(name, first_name)) {
                        if (!name) {
                                WARN_ON(1);
                                return -EINVAL;
                        }
                        cur_id = rbd_dev->header.snapc->snaps[i];
                        /* snapshot removal? handle it above */
                        if (cur_id >= old_snap->id)
                                break;
                        /* a new snapshot */
                        ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                        if (ret < 0)
                                return ret;

                        /* note that we add it backward so using n and not p */
                        list_add(&snap->node, n);
                        p = &snap->node;
                }
        }
        /* we're done going over the old snap list, just add what's left */
        for (; i > 0; i--) {
                name = rbd_prev_snap_name(name, first_name);
                if (!name) {
                        WARN_ON(1);
                        return -EINVAL;
                }
                ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                if (ret < 0)
                        return ret;
                list_add(&snap->node, &rbd_dev->snaps);
        }

        return 0;
}
2115
2116 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2117 {
2118         int ret;
2119         struct device *dev;
2120         struct rbd_snap *snap;
2121
2122         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2123         dev = &rbd_dev->dev;
2124
2125         dev->bus = &rbd_bus_type;
2126         dev->type = &rbd_device_type;
2127         dev->parent = &rbd_root_dev;
2128         dev->release = rbd_dev_release;
2129         dev_set_name(dev, "%d", rbd_dev->id);
2130         ret = device_register(dev);
2131         if (ret < 0)
2132                 goto out;
2133
2134         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2135                 ret = rbd_register_snap_dev(rbd_dev, snap,
2136                                              &rbd_dev->dev);
2137                 if (ret < 0)
2138                         break;
2139         }
2140 out:
2141         mutex_unlock(&ctl_mutex);
2142         return ret;
2143 }
2144
/*
 * Unregister @rbd_dev's embedded device.  rbd_bus_add_dev() installs
 * rbd_dev_release() as its release callback, so @rbd_dev may be freed
 * as a result and must not be used afterwards.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
        device_unregister(&rbd_dev->dev);
}
2149
2150 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2151 {
2152         int ret, rc;
2153
2154         do {
2155                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2156                                          rbd_dev->header.obj_version);
2157                 if (ret == -ERANGE) {
2158                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2159                         rc = __rbd_update_snaps(rbd_dev);
2160                         mutex_unlock(&ctl_mutex);
2161                         if (rc < 0)
2162                                 return rc;
2163                 }
2164         } while (ret == -ERANGE);
2165
2166         return ret;
2167 }
2168
2169 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2170
2171 /*
2172  * Get a unique rbd identifier for the given new rbd_dev, and add
2173  * the rbd_dev to the global list.  The minimum rbd id is 1.
2174  */
2175 static void rbd_id_get(struct rbd_device *rbd_dev)
2176 {
2177         rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2178
2179         spin_lock(&rbd_dev_list_lock);
2180         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2181         spin_unlock(&rbd_dev_list_lock);
2182 }
2183
2184 /*
2185  * Remove an rbd_dev from the global list, and record that its
2186  * identifier is no longer in use.
2187  */
2188 static void rbd_id_put(struct rbd_device *rbd_dev)
2189 {
2190         struct list_head *tmp;
2191         int rbd_id = rbd_dev->id;
2192         int max_id;
2193
2194         BUG_ON(rbd_id < 1);
2195
2196         spin_lock(&rbd_dev_list_lock);
2197         list_del_init(&rbd_dev->node);
2198
2199         /*
2200          * If the id being "put" is not the current maximum, there
2201          * is nothing special we need to do.
2202          */
2203         if (rbd_id != atomic64_read(&rbd_id_max)) {
2204                 spin_unlock(&rbd_dev_list_lock);
2205                 return;
2206         }
2207
2208         /*
2209          * We need to update the current maximum id.  Search the
2210          * list to find out what it is.  We're more likely to find
2211          * the maximum at the end, so search the list backward.
2212          */
2213         max_id = 0;
2214         list_for_each_prev(tmp, &rbd_dev_list) {
2215                 struct rbd_device *rbd_dev;
2216
2217                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2218                 if (rbd_id > max_id)
2219                         max_id = rbd_id;
2220         }
2221         spin_unlock(&rbd_dev_list_lock);
2222
2223         /*
2224          * The max id could have been updated by rbd_id_get(), in
2225          * which case it now accurately reflects the new maximum.
2226          * Be careful not to overwrite the maximum value in that
2227          * case.
2228          */
2229         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2230 }
2231
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none).  Returns the length of the token found there, i.e. the number
 * of consecutive non-white-space characters; 0 means no token.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * The characters for which isspace() is nonzero in the "C" and
         * "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *token_start = *buf + strspn(*buf, whitespace);

        *buf = token_start;

        return strcspn(token_start, whitespace);
}
2249
/*
 * Finds the next token in *buf and, if the supplied token buffer is
 * large enough to hold it plus a terminator, copies it there.  A copied
 * token is always terminated with '\0'.
 *
 * Returns the length of the token found (not counting the '\0').  The
 * result is 0 if there is no token, and >= token_size if the token did
 * not fit (in which case nothing is written to the token buffer).
 *
 * *buf is updated to point just beyond the end of the found token,
 * whether or not the token was copied.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t len = next_token(buf);
        const char *src = *buf;

        *buf = src + len;
        if (len >= token_size)
                return len;

        memcpy(token, src, len);
        token[len] = '\0';

        return len;
}
2278
2279 /*
2280  * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2281  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2282  * on the list of monitor addresses and other options provided via
2283  * /sys/bus/rbd/add.
2284  */
2285 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2286                               const char *buf,
2287                               const char **mon_addrs,
2288                               size_t *mon_addrs_size,
2289                               char *options,
2290                               size_t options_size)
2291 {
2292         size_t  len;
2293
2294         /* The first four tokens are required */
2295
2296         len = next_token(&buf);
2297         if (!len)
2298                 return -EINVAL;
2299         *mon_addrs_size = len + 1;
2300         *mon_addrs = buf;
2301
2302         buf += len;
2303
2304         len = copy_token(&buf, options, options_size);
2305         if (!len || len >= options_size)
2306                 return -EINVAL;
2307
2308         len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2309         if (!len || len >= sizeof (rbd_dev->pool_name))
2310                 return -EINVAL;
2311
2312         len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2313         if (!len || len >= sizeof (rbd_dev->obj))
2314                 return -EINVAL;
2315
2316         /* We have the object length in hand, save it. */
2317
2318         rbd_dev->obj_len = len;
2319
2320         BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2321                                 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2322         sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2323
2324         /*
2325          * The snapshot name is optional, but it's an error if it's
2326          * too long.  If no snapshot is supplied, fill in the default.
2327          */
2328         len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2329         if (!len)
2330                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2331                         sizeof (RBD_SNAP_HEAD_NAME));
2332         else if (len >= sizeof (rbd_dev->snap_name))
2333                 return -EINVAL;
2334
2335         return 0;
2336 }
2337
2338 static ssize_t rbd_add(struct bus_type *bus,
2339                        const char *buf,
2340                        size_t count)
2341 {
2342         struct rbd_device *rbd_dev;
2343         const char *mon_addrs = NULL;
2344         size_t mon_addrs_size = 0;
2345         char *options = NULL;
2346         struct ceph_osd_client *osdc;
2347         int rc = -ENOMEM;
2348
2349         if (!try_module_get(THIS_MODULE))
2350                 return -ENODEV;
2351
2352         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2353         if (!rbd_dev)
2354                 goto err_nomem;
2355         options = kmalloc(count, GFP_KERNEL);
2356         if (!options)
2357                 goto err_nomem;
2358
2359         /* static rbd_device initialization */
2360         spin_lock_init(&rbd_dev->lock);
2361         INIT_LIST_HEAD(&rbd_dev->node);
2362         INIT_LIST_HEAD(&rbd_dev->snaps);
2363
2364         init_rwsem(&rbd_dev->header.snap_rwsem);
2365
2366         /* generate unique id: find highest unique id, add one */
2367         rbd_id_get(rbd_dev);
2368
2369         /* Fill in the device name, now that we have its id. */
2370         BUILD_BUG_ON(DEV_NAME_LEN
2371                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2372         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2373
2374         /* parse add command */
2375         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2376                                 options, count);
2377         if (rc)
2378                 goto err_put_id;
2379
2380         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2381                                                 options);
2382         if (IS_ERR(rbd_dev->rbd_client)) {
2383                 rc = PTR_ERR(rbd_dev->rbd_client);
2384                 goto err_put_id;
2385         }
2386
2387         /* pick the pool */
2388         osdc = &rbd_dev->rbd_client->client->osdc;
2389         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2390         if (rc < 0)
2391                 goto err_out_client;
2392         rbd_dev->poolid = rc;
2393
2394         /* register our block device */
2395         rc = register_blkdev(0, rbd_dev->name);
2396         if (rc < 0)
2397                 goto err_out_client;
2398         rbd_dev->major = rc;
2399
2400         rc = rbd_bus_add_dev(rbd_dev);
2401         if (rc)
2402                 goto err_out_blkdev;
2403
2404         /* set up and announce blkdev mapping */
2405         rc = rbd_init_disk(rbd_dev);
2406         if (rc)
2407                 goto err_out_bus;
2408
2409         rc = rbd_init_watch_dev(rbd_dev);
2410         if (rc)
2411                 goto err_out_bus;
2412
2413         return count;
2414
2415 err_out_bus:
2416         rbd_id_put(rbd_dev);
2417
2418         /* this will also clean up rest of rbd_dev stuff */
2419
2420         rbd_bus_del_dev(rbd_dev);
2421         kfree(options);
2422         return rc;
2423
2424 err_out_blkdev:
2425         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2426 err_out_client:
2427         rbd_put_client(rbd_dev);
2428 err_put_id:
2429         rbd_id_put(rbd_dev);
2430 err_nomem:
2431         kfree(options);
2432         kfree(rbd_dev);
2433
2434         dout("Error adding device %s\n", buf);
2435         module_put(THIS_MODULE);
2436
2437         return (ssize_t) rc;
2438 }
2439
2440 static struct rbd_device *__rbd_get_dev(unsigned long id)
2441 {
2442         struct list_head *tmp;
2443         struct rbd_device *rbd_dev;
2444
2445         spin_lock(&rbd_dev_list_lock);
2446         list_for_each(tmp, &rbd_dev_list) {
2447                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2448                 if (rbd_dev->id == id) {
2449                         spin_unlock(&rbd_dev_list_lock);
2450                         return rbd_dev;
2451                 }
2452         }
2453         spin_unlock(&rbd_dev_list_lock);
2454         return NULL;
2455 }
2456
2457 static void rbd_dev_release(struct device *dev)
2458 {
2459         struct rbd_device *rbd_dev =
2460                         container_of(dev, struct rbd_device, dev);
2461
2462         if (rbd_dev->watch_request) {
2463                 struct ceph_client *client = rbd_dev->rbd_client->client;
2464
2465                 ceph_osdc_unregister_linger_request(&client->osdc,
2466                                                     rbd_dev->watch_request);
2467         }
2468         if (rbd_dev->watch_event)
2469                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2470
2471         rbd_put_client(rbd_dev);
2472
2473         /* clean up and free blkdev */
2474         rbd_free_disk(rbd_dev);
2475         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2476         kfree(rbd_dev);
2477
2478         /* release module ref */
2479         module_put(THIS_MODULE);
2480 }
2481
2482 static ssize_t rbd_remove(struct bus_type *bus,
2483                           const char *buf,
2484                           size_t count)
2485 {
2486         struct rbd_device *rbd_dev = NULL;
2487         int target_id, rc;
2488         unsigned long ul;
2489         int ret = count;
2490
2491         rc = strict_strtoul(buf, 10, &ul);
2492         if (rc)
2493                 return rc;
2494
2495         /* convert to int; abort if we lost anything in the conversion */
2496         target_id = (int) ul;
2497         if (target_id != ul)
2498                 return -EINVAL;
2499
2500         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2501
2502         rbd_dev = __rbd_get_dev(target_id);
2503         if (!rbd_dev) {
2504                 ret = -ENOENT;
2505                 goto done;
2506         }
2507
2508         rbd_id_put(rbd_dev);
2509
2510         __rbd_remove_all_snaps(rbd_dev);
2511         rbd_bus_del_dev(rbd_dev);
2512
2513 done:
2514         mutex_unlock(&ctl_mutex);
2515         return ret;
2516 }
2517
2518 static ssize_t rbd_snap_add(struct device *dev,
2519                             struct device_attribute *attr,
2520                             const char *buf,
2521                             size_t count)
2522 {
2523         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2524         int ret;
2525         char *name = kmalloc(count + 1, GFP_KERNEL);
2526         if (!name)
2527                 return -ENOMEM;
2528
2529         snprintf(name, count, "%s", buf);
2530
2531         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2532
2533         ret = rbd_header_add_snap(rbd_dev,
2534                                   name, GFP_KERNEL);
2535         if (ret < 0)
2536                 goto err_unlock;
2537
2538         ret = __rbd_update_snaps(rbd_dev);
2539         if (ret < 0)
2540                 goto err_unlock;
2541
2542         /* shouldn't hold ctl_mutex when notifying.. notify might
2543            trigger a watch callback that would need to get that mutex */
2544         mutex_unlock(&ctl_mutex);
2545
2546         /* make a best effort, don't error if failed */
2547         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2548
2549         ret = count;
2550         kfree(name);
2551         return ret;
2552
2553 err_unlock:
2554         mutex_unlock(&ctl_mutex);
2555         kfree(name);
2556         return ret;
2557 }
2558
2559 /*
2560  * create control files in sysfs
2561  * /sys/bus/rbd/...
2562  */
2563 static int rbd_sysfs_init(void)
2564 {
2565         int ret;
2566
2567         ret = bus_register(&rbd_bus_type);
2568         if (ret < 0)
2569                 return ret;
2570
2571         ret = device_register(&rbd_root_dev);
2572
2573         return ret;
2574 }
2575
/*
 * Undo rbd_sysfs_init(): unregister the root device and the bus type,
 * in the reverse of the order they were registered.
 */
static void rbd_sysfs_cleanup(void)
{
        device_unregister(&rbd_root_dev);
        bus_unregister(&rbd_bus_type);
}
2581
2582 int __init rbd_init(void)
2583 {
2584         int rc;
2585
2586         rc = rbd_sysfs_init();
2587         if (rc)
2588                 return rc;
2589         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2590         return 0;
2591 }
2592
/* Module exit point: tear down the sysfs control interface. */
void __exit rbd_exit(void)
{
        rbd_sysfs_cleanup();
}
2597
2598 module_init(rbd_init);
2599 module_exit(rbd_exit);
2600
2601 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2602 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2603 MODULE_DESCRIPTION("rados block device");
2604
2605 /* following authorship retained from original osdblk.c */
2606 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2607
2608 MODULE_LICENSE("GPL");