drivers/block/rbd.c (pandora-kernel.git)
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
46
47 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
48
49 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN   64
51 #define RBD_MAX_SNAP_NAME_LEN   32
52 #define RBD_MAX_OPT_LEN         1024
53
54 #define RBD_SNAP_HEAD_NAME      "-"
55
56 #define DEV_NAME_LEN            32
57
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
60 /*
61  * block device image metadata (in-memory version)
62  */
63 struct rbd_image_header {
64         u64 image_size;
65         char block_name[32];
66         __u8 obj_order;
67         __u8 crypt_type;
68         __u8 comp_type;
69         struct rw_semaphore snap_rwsem;
70         struct ceph_snap_context *snapc;
71         size_t snap_names_len;
72         u64 snap_seq;
73         u32 total_snaps;
74
75         char *snap_names;
76         u64 *snap_sizes;
77
78         u64 obj_version;
79 };
80
81 struct rbd_options {
82         int     notify_timeout;
83 };
84
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
88 struct rbd_client {
89         struct ceph_client      *client;
90         struct rbd_options      *rbd_opts;
91         struct kref             kref;
92         struct list_head        node;
93 };
94
95 struct rbd_req_coll;
96
97 /*
98  * a single io request
99  */
100 struct rbd_request {
101         struct request          *rq;            /* blk layer request */
102         struct bio              *bio;           /* cloned bio */
103         struct page             **pages;        /* list of used pages */
104         u64                     len;
105         int                     coll_index;
106         struct rbd_req_coll     *coll;
107 };
108
109 struct rbd_req_status {
110         int done;
111         int rc;
112         u64 bytes;
113 };
114
115 /*
116  * a collection of requests
117  */
118 struct rbd_req_coll {
119         int                     total;
120         int                     num_done;
121         struct kref             kref;
122         struct rbd_req_status   status[0];
123 };
124
125 struct rbd_snap {
126         struct  device          dev;
127         const char              *name;
128         size_t                  size;
129         struct list_head        node;
130         u64                     id;
131 };
132
133 /*
134  * a single device
135  */
136 struct rbd_device {
137         int                     id;             /* blkdev unique id */
138
139         int                     major;          /* blkdev assigned major */
140         struct gendisk          *disk;          /* blkdev's gendisk and rq */
141         struct request_queue    *q;
142
143         struct ceph_client      *client;
144         struct rbd_client       *rbd_client;
145
146         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
147
148         spinlock_t              lock;           /* queue lock */
149
150         struct rbd_image_header header;
151         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152         int                     obj_len;
153         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
154         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
155         int                     poolid;
156
157         struct ceph_osd_event   *watch_event;
158         struct ceph_osd_request *watch_request;
159
160         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
161         u32 cur_snap;   /* index+1 of current snapshot within snap context
162                            0 - for the head */
163         int read_only;
164
165         struct list_head        node;
166
167         /* list of snapshots */
168         struct list_head        snaps;
169
170         /* sysfs related */
171         struct device           dev;
172 };
173
174 static struct bus_type rbd_bus_type = {
175         .name           = "rbd",
176 };
177
178 static DEFINE_SPINLOCK(node_lock);      /* protects client get/put */
179
180 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
181 static LIST_HEAD(rbd_dev_list);    /* devices */
182 static LIST_HEAD(rbd_client_list);      /* clients */
183
184 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
185 static void rbd_dev_release(struct device *dev);
186 static ssize_t rbd_snap_add(struct device *dev,
187                             struct device_attribute *attr,
188                             const char *buf,
189                             size_t count);
190 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
191                                   struct rbd_snap *snap);
192
193
194 static struct rbd_device *dev_to_rbd(struct device *dev)
195 {
196         return container_of(dev, struct rbd_device, dev);
197 }
198
199 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
200 {
201         return get_device(&rbd_dev->dev);
202 }
203
204 static void rbd_put_dev(struct rbd_device *rbd_dev)
205 {
206         put_device(&rbd_dev->dev);
207 }
208
209 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
210
211 static int rbd_open(struct block_device *bdev, fmode_t mode)
212 {
213         struct gendisk *disk = bdev->bd_disk;
214         struct rbd_device *rbd_dev = disk->private_data;
215
216         rbd_get_dev(rbd_dev);
217
218         set_device_ro(bdev, rbd_dev->read_only);
219
220         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
221                 return -EROFS;
222
223         return 0;
224 }
225
226 static int rbd_release(struct gendisk *disk, fmode_t mode)
227 {
228         struct rbd_device *rbd_dev = disk->private_data;
229
230         rbd_put_dev(rbd_dev);
231
232         return 0;
233 }
234
235 static const struct block_device_operations rbd_bd_ops = {
236         .owner                  = THIS_MODULE,
237         .open                   = rbd_open,
238         .release                = rbd_release,
239 };
240
241 /*
242  * Initialize an rbd client instance.
243  * We own *opt.
244  */
245 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
246                                             struct rbd_options *rbd_opts)
247 {
248         struct rbd_client *rbdc;
249         int ret = -ENOMEM;
250
251         dout("rbd_client_create\n");
252         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
253         if (!rbdc)
254                 goto out_opt;
255
256         kref_init(&rbdc->kref);
257         INIT_LIST_HEAD(&rbdc->node);
258
259         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
260         if (IS_ERR(rbdc->client))
261                 goto out_rbdc;
262         opt = NULL; /* Now rbdc->client is responsible for opt */
263
264         ret = ceph_open_session(rbdc->client);
265         if (ret < 0)
266                 goto out_err;
267
268         rbdc->rbd_opts = rbd_opts;
269
270         spin_lock(&node_lock);
271         list_add_tail(&rbdc->node, &rbd_client_list);
272         spin_unlock(&node_lock);
273
274         dout("rbd_client_create created %p\n", rbdc);
275         return rbdc;
276
277 out_err:
278         ceph_destroy_client(rbdc->client);
279 out_rbdc:
280         kfree(rbdc);
281 out_opt:
282         if (opt)
283                 ceph_destroy_options(opt);
284         return ERR_PTR(ret);
285 }
286
287 /*
288  * Find a ceph client with specific addr and configuration.
289  */
290 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
291 {
292         struct rbd_client *client_node;
293
294         if (opt->flags & CEPH_OPT_NOSHARE)
295                 return NULL;
296
297         list_for_each_entry(client_node, &rbd_client_list, node)
298                 if (ceph_compare_options(opt, client_node->client) == 0)
299                         return client_node;
300         return NULL;
301 }
302
303 /*
304  * mount options
305  */
306 enum {
307         Opt_notify_timeout,
308         Opt_last_int,
309         /* int args above */
310         Opt_last_string,
311         /* string args above */
312 };
313
314 static match_table_t rbdopt_tokens = {
315         {Opt_notify_timeout, "notify_timeout=%d"},
316         /* int args above */
317         /* string args above */
318         {-1, NULL}
319 };
320
321 static int parse_rbd_opts_token(char *c, void *private)
322 {
323         struct rbd_options *rbdopt = private;
324         substring_t argstr[MAX_OPT_ARGS];
325         int token, intval, ret;
326
327         token = match_token((char *)c, rbdopt_tokens, argstr);
328         if (token < 0)
329                 return -EINVAL;
330
331         if (token < Opt_last_int) {
332                 ret = match_int(&argstr[0], &intval);
333                 if (ret < 0) {
334                         pr_err("bad mount option arg (not int) "
335                                "at '%s'\n", c);
336                         return ret;
337                 }
338                 dout("got int token %d val %d\n", token, intval);
339         } else if (token > Opt_last_int && token < Opt_last_string) {
340                 dout("got string token %d val %s\n", token,
341                      argstr[0].from);
342         } else {
343                 dout("got token %d\n", token);
344         }
345
346         switch (token) {
347         case Opt_notify_timeout:
348                 rbdopt->notify_timeout = intval;
349                 break;
350         default:
351                 BUG_ON(token);
352         }
353         return 0;
354 }
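/*
 * Illustrative example: an option string such as "notify_timeout=30"
 * handed to ceph_parse_options() is forwarded here one token at a time;
 * "notify_timeout=30" matches Opt_notify_timeout, match_int() extracts
 * 30, and rbd_opts->notify_timeout is set accordingly.  A token that
 * matches nothing in rbdopt_tokens makes this callback return -EINVAL.
 */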
355
356 /*
357  * Get a ceph client with specific addr and configuration; create one
358  * if it does not already exist.
359  */
360 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
361                           char *options)
362 {
363         struct rbd_client *rbdc;
364         struct ceph_options *opt;
365         int ret;
366         struct rbd_options *rbd_opts;
367
368         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
369         if (!rbd_opts)
370                 return -ENOMEM;
371
372         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
373
374         ret = ceph_parse_options(&opt, options, mon_addr,
375                                  mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
376         if (ret < 0)
377                 goto done_err;
378
379         spin_lock(&node_lock);
380         rbdc = __rbd_client_find(opt);
381         if (rbdc) {
382                 ceph_destroy_options(opt);
383
384                 /* using an existing client */
385                 kref_get(&rbdc->kref);
386                 rbd_dev->rbd_client = rbdc;
387                 rbd_dev->client = rbdc->client;
388                 spin_unlock(&node_lock);
389                 return 0;
390         }
391         spin_unlock(&node_lock);
392
393         rbdc = rbd_client_create(opt, rbd_opts);
394         if (IS_ERR(rbdc)) {
395                 ret = PTR_ERR(rbdc);
396                 goto done_err;
397         }
398
399         rbd_dev->rbd_client = rbdc;
400         rbd_dev->client = rbdc->client;
401         return 0;
402 done_err:
403         kfree(rbd_opts);
404         return ret;
405 }
406
407 /*
408  * Destroy ceph client
409  */
410 static void rbd_client_release(struct kref *kref)
411 {
412         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
413
414         dout("rbd_client_release %p\n", rbdc);
415         spin_lock(&node_lock);
416         list_del(&rbdc->node);
417         spin_unlock(&node_lock);
418
419         ceph_destroy_client(rbdc->client);
420         kfree(rbdc->rbd_opts);
421         kfree(rbdc);
422 }
423
424 /*
425  * Drop reference to ceph client node. If it's not referenced anymore, release
426  * it.
427  */
428 static void rbd_put_client(struct rbd_device *rbd_dev)
429 {
430         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
431         rbd_dev->rbd_client = NULL;
432         rbd_dev->client = NULL;
433 }
434
435 /*
436  * Destroy requests collection
437  */
438 static void rbd_coll_release(struct kref *kref)
439 {
440         struct rbd_req_coll *coll =
441                 container_of(kref, struct rbd_req_coll, kref);
442
443         dout("rbd_coll_release %p\n", coll);
444         kfree(coll);
445 }
446
447 /*
448  * Create a new header structure, translate header format from the on-disk
449  * header.
450  */
451 static int rbd_header_from_disk(struct rbd_image_header *header,
452                                  struct rbd_image_header_ondisk *ondisk,
453                                  int allocated_snaps,
454                                  gfp_t gfp_flags)
455 {
456         int i;
457         u32 snap_count = le32_to_cpu(ondisk->snap_count);
458         int ret = -ENOMEM;
459
460         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT))) {
461                 return -ENXIO;
462         }
463
464         init_rwsem(&header->snap_rwsem);
465         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
466         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
467                                 snap_count *
468                                  sizeof(struct rbd_image_snap_ondisk),
469                                 gfp_flags);
470         if (!header->snapc)
471                 return -ENOMEM;
472         if (snap_count) {
473                 header->snap_names = kmalloc(header->snap_names_len,
474                                              GFP_KERNEL);
475                 if (!header->snap_names)
476                         goto err_snapc;
477                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478                                              GFP_KERNEL);
479                 if (!header->snap_sizes)
480                         goto err_names;
481         } else {
482                 header->snap_names = NULL;
483                 header->snap_sizes = NULL;
484         }
485         memcpy(header->block_name, ondisk->block_name,
486                sizeof(ondisk->block_name));
487
488         header->image_size = le64_to_cpu(ondisk->image_size);
489         header->obj_order = ondisk->options.order;
490         header->crypt_type = ondisk->options.crypt_type;
491         header->comp_type = ondisk->options.comp_type;
492
493         atomic_set(&header->snapc->nref, 1);
494         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495         header->snapc->num_snaps = snap_count;
496         header->total_snaps = snap_count;
497
498         if (snap_count &&
499             allocated_snaps == snap_count) {
500                 for (i = 0; i < snap_count; i++) {
501                         header->snapc->snaps[i] =
502                                 le64_to_cpu(ondisk->snaps[i].id);
503                         header->snap_sizes[i] =
504                                 le64_to_cpu(ondisk->snaps[i].image_size);
505                 }
506
507                 /* copy snapshot names */
508                 memcpy(header->snap_names, &ondisk->snaps[i],
509                         header->snap_names_len);
510         }
511
512         return 0;
513
514 err_names:
515         kfree(header->snap_names);
516 err_snapc:
517         kfree(header->snapc);
518         return ret;
519 }
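/*
 * Layout implied by the parsing above: the on-disk header starts with
 * the fixed rbd_image_header_ondisk fields (beginning with the
 * RBD_HEADER_TEXT magic), followed by snap_count rbd_image_snap_ondisk
 * entries, followed by the snapshot names packed as consecutive
 * NUL-terminated strings (snap_names_len bytes in total).
 */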
520
521 static int snap_index(struct rbd_image_header *header, int snap_num)
522 {
523         return header->total_snaps - snap_num;
524 }
525
526 static u64 cur_snap_id(struct rbd_device *rbd_dev)
527 {
528         struct rbd_image_header *header = &rbd_dev->header;
529
530         if (!rbd_dev->cur_snap)
531                 return 0;
532
533         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
534 }
535
536 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
537                         u64 *seq, u64 *size)
538 {
539         int i;
540         char *p = header->snap_names;
541
542         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
543                 if (strcmp(snap_name, p) == 0)
544                         break;
545         }
546         if (i == header->total_snaps)
547                 return -ENOENT;
548         if (seq)
549                 *seq = header->snapc->snaps[i];
550
551         if (size)
552                 *size = header->snap_sizes[i];
553
554         return i;
555 }
556
557 static int rbd_header_set_snap(struct rbd_device *dev,
558                                const char *snap_name,
559                                u64 *size)
560 {
561         struct rbd_image_header *header = &dev->header;
562         struct ceph_snap_context *snapc = header->snapc;
563         int ret = -ENOENT;
564
565         down_write(&header->snap_rwsem);
566
567         if (!snap_name ||
568             !*snap_name ||
569             strcmp(snap_name, "-") == 0 ||
570             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
571                 if (header->total_snaps)
572                         snapc->seq = header->snap_seq;
573                 else
574                         snapc->seq = 0;
575                 dev->cur_snap = 0;
576                 dev->read_only = 0;
577                 if (size)
578                         *size = header->image_size;
579         } else {
580                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
581                 if (ret < 0)
582                         goto done;
583
584                 dev->cur_snap = header->total_snaps - ret;
585                 dev->read_only = 1;
586         }
587
588         ret = 0;
589 done:
590         up_write(&header->snap_rwsem);
591         return ret;
592 }
593
594 static void rbd_header_free(struct rbd_image_header *header)
595 {
596         kfree(header->snapc);
597         kfree(header->snap_names);
598         kfree(header->snap_sizes);
599 }
600
601 /*
602  * get the actual striped segment name, offset and length
603  */
604 static u64 rbd_get_segment(struct rbd_image_header *header,
605                            const char *block_name,
606                            u64 ofs, u64 len,
607                            char *seg_name, u64 *segofs)
608 {
609         u64 seg = ofs >> header->obj_order;
610
611         if (seg_name)
612                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
613                          "%s.%012llx", block_name, seg);
614
615         ofs = ofs & ((1 << header->obj_order) - 1);
616         len = min_t(u64, len, (1 << header->obj_order) - ofs);
617
618         if (segofs)
619                 *segofs = ofs;
620
621         return len;
622 }
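/*
 * Illustrative example (assuming obj_order == 22, i.e. 4 MiB objects):
 * for ofs == 0x500000 and len == 0x300000, seg == ofs >> 22 == 1, so
 * seg_name becomes "<block_name>.000000000001", *segofs == 0x100000
 * (the offset within that object), and the returned length is
 * min(0x300000, 0x400000 - 0x100000) == 0x300000.
 */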
623
624 static int rbd_get_num_segments(struct rbd_image_header *header,
625                                 u64 ofs, u64 len)
626 {
627         u64 start_seg = ofs >> header->obj_order;
628         u64 end_seg = (ofs + len - 1) >> header->obj_order;
629         return end_seg - start_seg + 1;
630 }
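/*
 * E.g. (again assuming 4 MiB objects): ofs == 0x3ff000 with
 * len == 0x2000 touches segments 0 and 1, so this returns 2.
 */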
631
632 /*
633  * returns the size of an object in the image
634  */
635 static u64 rbd_obj_bytes(struct rbd_image_header *header)
636 {
637         return 1 << header->obj_order;
638 }
639
640 /*
641  * bio helpers
642  */
643
644 static void bio_chain_put(struct bio *chain)
645 {
646         struct bio *tmp;
647
648         while (chain) {
649                 tmp = chain;
650                 chain = chain->bi_next;
651                 bio_put(tmp);
652         }
653 }
654
655 /*
656  * zeros a bio chain, starting at a specific offset
657  */
658 static void zero_bio_chain(struct bio *chain, int start_ofs)
659 {
660         struct bio_vec *bv;
661         unsigned long flags;
662         void *buf;
663         int i;
664         int pos = 0;
665
666         while (chain) {
667                 bio_for_each_segment(bv, chain, i) {
668                         if (pos + bv->bv_len > start_ofs) {
669                                 int remainder = max(start_ofs - pos, 0);
670                                 buf = bvec_kmap_irq(bv, &flags);
671                                 memset(buf + remainder, 0,
672                                        bv->bv_len - remainder);
673                                 bvec_kunmap_irq(buf, &flags);
674                         }
675                         pos += bv->bv_len;
676                 }
677
678                 chain = chain->bi_next;
679         }
680 }
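/*
 * Note: zero_bio_chain() is used by rbd_req_cb() below to zero-fill the
 * unread tail of a short read, and the whole buffer when a read hits a
 * nonexistent object (-ENOENT), so the block layer always sees fully
 * populated data.
 */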
681
682 /*
683  * bio_chain_clone - clone a chain of bios up to a certain length.
684  * might return a bio_pair that will need to be released.
685  */
686 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
687                                    struct bio_pair **bp,
688                                    int len, gfp_t gfpmask)
689 {
690         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
691         int total = 0;
692
693         if (*bp) {
694                 bio_pair_release(*bp);
695                 *bp = NULL;
696         }
697
698         while (old_chain && (total < len)) {
699                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
700                 if (!tmp)
701                         goto err_out;
702
703                 if (total + old_chain->bi_size > len) {
704                         struct bio_pair *bp;
705
706                         /*
707                          * this split can only happen with a single-page bio;
708                          * bio_split() will BUG_ON if this is not the case
709                          */
710                         dout("bio_chain_clone split! total=%d remaining=%d "
711                              "bi_size=%d\n",
712                              (int)total, (int)len-total,
713                              (int)old_chain->bi_size);
714
715                         /* split the bio. We'll release it either in the next
716                            call, or it will have to be released outside */
717                         bp = bio_split(old_chain, (len - total) / 512ULL);
718                         if (!bp)
719                                 goto err_out;
720
721                         __bio_clone(tmp, &bp->bio1);
722
723                         *next = &bp->bio2;
724                 } else {
725                         __bio_clone(tmp, old_chain);
726                         *next = old_chain->bi_next;
727                 }
728
729                 tmp->bi_bdev = NULL;
730                 gfpmask &= ~__GFP_WAIT;
731                 tmp->bi_next = NULL;
732
733                 if (!new_chain) {
734                         new_chain = tail = tmp;
735                 } else {
736                         tail->bi_next = tmp;
737                         tail = tmp;
738                 }
739                 old_chain = old_chain->bi_next;
740
741                 total += tmp->bi_size;
742         }
743
744         BUG_ON(total < len);
745
746         if (tail)
747                 tail->bi_next = NULL;
748
749         *old = old_chain;
750
751         return new_chain;
752
753 err_out:
754         dout("bio_chain_clone with err\n");
755         bio_chain_put(new_chain);
756         return NULL;
757 }
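/*
 * On success the returned chain covers the requested len bytes; *old is
 * advanced past the bios that were consumed, and *next points at the
 * bio to continue from on the next call (the second half of a split,
 * or the next bio of the original chain).
 */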
758
759 /*
760  * helpers for osd request op vectors.
761  */
762 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
763                             int num_ops,
764                             int opcode,
765                             u32 payload_len)
766 {
767         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
768                        GFP_NOIO);
769         if (!*ops)
770                 return -ENOMEM;
771         (*ops)[0].op = opcode;
772         /*
773          * op extent offset and length will be set later on
774          * in calc_raw_layout()
775          */
776         (*ops)[0].payload_len = payload_len;
777         return 0;
778 }
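/*
 * Note: the op vector is allocated with one extra, zero-filled element
 * (num_ops + 1); the zeroed terminating op is presumably how the osd
 * client code finds the end of the array.
 */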
779
780 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
781 {
782         kfree(ops);
783 }
784
785 static void rbd_coll_end_req_index(struct request *rq,
786                                    struct rbd_req_coll *coll,
787                                    int index,
788                                    int ret, u64 len)
789 {
790         struct request_queue *q;
791         int min, max, i;
792
793         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
794              coll, index, ret, len);
795
796         if (!rq)
797                 return;
798
799         if (!coll) {
800                 blk_end_request(rq, ret, len);
801                 return;
802         }
803
804         q = rq->q;
805
806         spin_lock_irq(q->queue_lock);
807         coll->status[index].done = 1;
808         coll->status[index].rc = ret;
809         coll->status[index].bytes = len;
810         max = min = coll->num_done;
811         while (max < coll->total && coll->status[max].done)
812                 max++;
813
814         for (i = min; i < max; i++) {
815                 __blk_end_request(rq, coll->status[i].rc,
816                                   coll->status[i].bytes);
817                 coll->num_done++;
818                 kref_put(&coll->kref, rbd_coll_release);
819         }
820         spin_unlock_irq(q->queue_lock);
821 }
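/*
 * Segments are reported back to the block layer strictly in order: a
 * completed segment is only passed to __blk_end_request() once every
 * earlier segment in the collection has completed as well.
 */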
822
823 static void rbd_coll_end_req(struct rbd_request *req,
824                              int ret, u64 len)
825 {
826         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
827 }
828
829 /*
830  * Send ceph osd request
831  */
832 static int rbd_do_request(struct request *rq,
833                           struct rbd_device *dev,
834                           struct ceph_snap_context *snapc,
835                           u64 snapid,
836                           const char *obj, u64 ofs, u64 len,
837                           struct bio *bio,
838                           struct page **pages,
839                           int num_pages,
840                           int flags,
841                           struct ceph_osd_req_op *ops,
842                           int num_reply,
843                           struct rbd_req_coll *coll,
844                           int coll_index,
845                           void (*rbd_cb)(struct ceph_osd_request *req,
846                                          struct ceph_msg *msg),
847                           struct ceph_osd_request **linger_req,
848                           u64 *ver)
849 {
850         struct ceph_osd_request *req;
851         struct ceph_file_layout *layout;
852         int ret;
853         u64 bno;
854         struct timespec mtime = CURRENT_TIME;
855         struct rbd_request *req_data;
856         struct ceph_osd_request_head *reqhead;
857         struct rbd_image_header *header = &dev->header;
858
859         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
860         if (!req_data) {
861                 if (coll)
862                         rbd_coll_end_req_index(rq, coll, coll_index,
863                                                -ENOMEM, len);
864                 return -ENOMEM;
865         }
866
867         if (coll) {
868                 req_data->coll = coll;
869                 req_data->coll_index = coll_index;
870         }
871
872         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, ofs, len);
873
874         down_read(&header->snap_rwsem);
875
876         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
877                                       snapc,
878                                       ops,
879                                       false,
880                                       GFP_NOIO, pages, bio);
881         if (!req) {
882                 up_read(&header->snap_rwsem);
883                 ret = -ENOMEM;
884                 goto done_pages;
885         }
886
887         req->r_callback = rbd_cb;
888
889         req_data->rq = rq;
890         req_data->bio = bio;
891         req_data->pages = pages;
892         req_data->len = len;
893
894         req->r_priv = req_data;
895
896         reqhead = req->r_request->front.iov_base;
897         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
898
899         strncpy(req->r_oid, obj, sizeof(req->r_oid));
900         req->r_oid_len = strlen(req->r_oid);
901
902         layout = &req->r_file_layout;
903         memset(layout, 0, sizeof(*layout));
904         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
905         layout->fl_stripe_count = cpu_to_le32(1);
906         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
907         layout->fl_pg_preferred = cpu_to_le32(-1);
908         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
909         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
910                              ofs, &len, &bno, req, ops);
911
912         ceph_osdc_build_request(req, ofs, &len,
913                                 ops,
914                                 snapc,
915                                 &mtime,
916                                 req->r_oid, req->r_oid_len);
917         up_read(&header->snap_rwsem);
918
919         if (linger_req) {
920                 ceph_osdc_set_request_linger(&dev->client->osdc, req);
921                 *linger_req = req;
922         }
923
924         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
925         if (ret < 0)
926                 goto done_err;
927
928         if (!rbd_cb) {
929                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
930                 if (ver)
931                         *ver = le64_to_cpu(req->r_reassert_version.version);
932                 dout("reassert_ver=%lld\n",
933                      le64_to_cpu(req->r_reassert_version.version));
934                 ceph_osdc_put_request(req);
935         }
936         return ret;
937
938 done_err:
939         bio_chain_put(req_data->bio);
940         ceph_osdc_put_request(req);
941 done_pages:
942         rbd_coll_end_req(req_data, ret, len);
943         kfree(req_data);
944         return ret;
945 }
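/*
 * Callers that pass a NULL rbd_cb get synchronous behaviour: the
 * request is waited for, its reassert version is reported through *ver,
 * and the reference is dropped here.  With a callback, completion
 * handling and the final ceph_osdc_put_request() are left to the
 * callback (e.g. rbd_req_cb() below).
 */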
946
947 /*
948  * Ceph osd op callback
949  */
950 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
951 {
952         struct rbd_request *req_data = req->r_priv;
953         struct ceph_osd_reply_head *replyhead;
954         struct ceph_osd_op *op;
955         __s32 rc;
956         u64 bytes;
957         int read_op;
958
959         /* parse reply */
960         replyhead = msg->front.iov_base;
961         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
962         op = (void *)(replyhead + 1);
963         rc = le32_to_cpu(replyhead->result);
964         bytes = le64_to_cpu(op->extent.length);
965         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
966
967         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
968
969         if (rc == -ENOENT && read_op) {
970                 zero_bio_chain(req_data->bio, 0);
971                 rc = 0;
972         } else if (rc == 0 && read_op && bytes < req_data->len) {
973                 zero_bio_chain(req_data->bio, bytes);
974                 bytes = req_data->len;
975         }
976
977         rbd_coll_end_req(req_data, rc, bytes);
978
979         if (req_data->bio)
980                 bio_chain_put(req_data->bio);
981
982         ceph_osdc_put_request(req);
983         kfree(req_data);
984 }
985
986 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
987 {
988         ceph_osdc_put_request(req);
989 }
990
991 /*
992  * Do a synchronous ceph osd operation
993  */
994 static int rbd_req_sync_op(struct rbd_device *dev,
995                            struct ceph_snap_context *snapc,
996                            u64 snapid,
997                            int opcode,
998                            int flags,
999                            struct ceph_osd_req_op *orig_ops,
1000                            int num_reply,
1001                            const char *obj,
1002                            u64 ofs, u64 len,
1003                            char *buf,
1004                            struct ceph_osd_request **linger_req,
1005                            u64 *ver)
1006 {
1007         int ret;
1008         struct page **pages;
1009         int num_pages;
1010         struct ceph_osd_req_op *ops = orig_ops;
1011         u32 payload_len;
1012
1013         num_pages = calc_pages_for(ofs, len);
1014         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1015         if (IS_ERR(pages))
1016                 return PTR_ERR(pages);
1017
1018         if (!orig_ops) {
1019                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1020                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1021                 if (ret < 0)
1022                         goto done;
1023
1024                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1025                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1026                         if (ret < 0)
1027                                 goto done_ops;
1028                 }
1029         }
1030
1031         ret = rbd_do_request(NULL, dev, snapc, snapid,
1032                           obj, ofs, len, NULL,
1033                           pages, num_pages,
1034                           flags,
1035                           ops,
1036                           2,
1037                           NULL, 0,
1038                           NULL,
1039                           linger_req, ver);
1040         if (ret < 0)
1041                 goto done_ops;
1042
1043         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1044                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1045
1046 done_ops:
1047         if (!orig_ops)
1048                 rbd_destroy_ops(ops);
1049 done:
1050         ceph_release_page_vector(pages, num_pages);
1051         return ret;
1052 }
1053
1054 /*
1055  * Do an asynchronous ceph osd operation
1056  */
1057 static int rbd_do_op(struct request *rq,
1058                      struct rbd_device *rbd_dev,
1059                      struct ceph_snap_context *snapc,
1060                      u64 snapid,
1061                      int opcode, int flags, int num_reply,
1062                      u64 ofs, u64 len,
1063                      struct bio *bio,
1064                      struct rbd_req_coll *coll,
1065                      int coll_index)
1066 {
1067         char *seg_name;
1068         u64 seg_ofs;
1069         u64 seg_len;
1070         int ret;
1071         struct ceph_osd_req_op *ops;
1072         u32 payload_len;
1073
1074         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1075         if (!seg_name)
1076                 return -ENOMEM;
1077
1078         seg_len = rbd_get_segment(&rbd_dev->header,
1079                                   rbd_dev->header.block_name,
1080                                   ofs, len,
1081                                   seg_name, &seg_ofs);
1082
1083         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1084
1085         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1086         if (ret < 0)
1087                 goto done;
1088
1089         /* we've taken care of segment sizes earlier when we
1090            cloned the bios. We should never have a segment
1091            truncated at this point */
1092         BUG_ON(seg_len < len);
1093
1094         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1095                              seg_name, seg_ofs, seg_len,
1096                              bio,
1097                              NULL, 0,
1098                              flags,
1099                              ops,
1100                              num_reply,
1101                              coll, coll_index,
1102                              rbd_req_cb, 0, NULL);
1103
1104         rbd_destroy_ops(ops);
1105 done:
1106         kfree(seg_name);
1107         return ret;
1108 }
1109
1110 /*
1111  * Request async osd write
1112  */
1113 static int rbd_req_write(struct request *rq,
1114                          struct rbd_device *rbd_dev,
1115                          struct ceph_snap_context *snapc,
1116                          u64 ofs, u64 len,
1117                          struct bio *bio,
1118                          struct rbd_req_coll *coll,
1119                          int coll_index)
1120 {
1121         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1122                          CEPH_OSD_OP_WRITE,
1123                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1124                          2,
1125                          ofs, len, bio, coll, coll_index);
1126 }
1127
1128 /*
1129  * Request async osd read
1130  */
1131 static int rbd_req_read(struct request *rq,
1132                          struct rbd_device *rbd_dev,
1133                          u64 snapid,
1134                          u64 ofs, u64 len,
1135                          struct bio *bio,
1136                          struct rbd_req_coll *coll,
1137                          int coll_index)
1138 {
1139         return rbd_do_op(rq, rbd_dev, NULL,
1140                          (snapid ? snapid : CEPH_NOSNAP),
1141                          CEPH_OSD_OP_READ,
1142                          CEPH_OSD_FLAG_READ,
1143                          2,
1144                          ofs, len, bio, coll, coll_index);
1145 }
1146
1147 /*
1148  * Request sync osd read
1149  */
1150 static int rbd_req_sync_read(struct rbd_device *dev,
1151                           struct ceph_snap_context *snapc,
1152                           u64 snapid,
1153                           const char *obj,
1154                           u64 ofs, u64 len,
1155                           char *buf,
1156                           u64 *ver)
1157 {
1158         return rbd_req_sync_op(dev, NULL,
1159                                (snapid ? snapid : CEPH_NOSNAP),
1160                                CEPH_OSD_OP_READ,
1161                                CEPH_OSD_FLAG_READ,
1162                                NULL,
1163                                1, obj, ofs, len, buf, NULL, ver);
1164 }
1165
1166 /*
1167  * Request sync osd notify ack
1168  */
1169 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1170                                    u64 ver,
1171                                    u64 notify_id,
1172                                    const char *obj)
1173 {
1174         struct ceph_osd_req_op *ops;
1175         struct page **pages = NULL;
1176         int ret;
1177
1178         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1179         if (ret < 0)
1180                 return ret;
1181
1182         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1183         ops[0].watch.cookie = notify_id;
1184         ops[0].watch.flag = 0;
1185
1186         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1187                           obj, 0, 0, NULL,
1188                           pages, 0,
1189                           CEPH_OSD_FLAG_READ,
1190                           ops,
1191                           1,
1192                           NULL, 0,
1193                           rbd_simple_req_cb, 0, NULL);
1194
1195         rbd_destroy_ops(ops);
1196         return ret;
1197 }
1198
1199 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1200 {
1201         struct rbd_device *dev = (struct rbd_device *)data;
1202         int rc;
1203
1204         if (!dev)
1205                 return;
1206
1207         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1208                 notify_id, (int)opcode);
1209         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1210         rc = __rbd_update_snaps(dev);
1211         mutex_unlock(&ctl_mutex);
1212         if (rc)
1213                 pr_warning(DRV_NAME "%d got notification but failed to update"
1214                            " snaps: %d\n", dev->major, rc);
1215
1216         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1217 }
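/*
 * Header-change flow: rbd_req_sync_watch() below registers a watch on
 * the image's header object; when another party updates that object
 * (for example by adding a snapshot), this callback refreshes the
 * in-memory snapshot state and then acknowledges the notification.
 */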
1218
1219 /*
1220  * Request sync osd watch
1221  */
1222 static int rbd_req_sync_watch(struct rbd_device *dev,
1223                               const char *obj,
1224                               u64 ver)
1225 {
1226         struct ceph_osd_req_op *ops;
1227         struct ceph_osd_client *osdc = &dev->client->osdc;
1228
1229         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1230         if (ret < 0)
1231                 return ret;
1232
1233         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1234                                      (void *)dev, &dev->watch_event);
1235         if (ret < 0)
1236                 goto fail;
1237
1238         ops[0].watch.ver = cpu_to_le64(ver);
1239         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1240         ops[0].watch.flag = 1;
1241
1242         ret = rbd_req_sync_op(dev, NULL,
1243                               CEPH_NOSNAP,
1244                               0,
1245                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1246                               ops,
1247                               1, obj, 0, 0, NULL,
1248                               &dev->watch_request, NULL);
1249
1250         if (ret < 0)
1251                 goto fail_event;
1252
1253         rbd_destroy_ops(ops);
1254         return 0;
1255
1256 fail_event:
1257         ceph_osdc_cancel_event(dev->watch_event);
1258         dev->watch_event = NULL;
1259 fail:
1260         rbd_destroy_ops(ops);
1261         return ret;
1262 }
1263
1264 /*
1265  * Request sync osd unwatch
1266  */
1267 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1268                                 const char *obj)
1269 {
1270         struct ceph_osd_req_op *ops;
1271
1272         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1273         if (ret < 0)
1274                 return ret;
1275
1276         ops[0].watch.ver = 0;
1277         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1278         ops[0].watch.flag = 0;
1279
1280         ret = rbd_req_sync_op(dev, NULL,
1281                               CEPH_NOSNAP,
1282                               0,
1283                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1284                               ops,
1285                               1, obj, 0, 0, NULL, NULL, NULL);
1286
1287         rbd_destroy_ops(ops);
1288         ceph_osdc_cancel_event(dev->watch_event);
1289         dev->watch_event = NULL;
1290         return ret;
1291 }
1292
1293 struct rbd_notify_info {
1294         struct rbd_device *dev;
1295 };
1296
1297 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1298 {
1299         struct rbd_device *dev = (struct rbd_device *)data;
1300         if (!dev)
1301                 return;
1302
1303         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1304                 notify_id, (int)opcode);
1305 }
1306
1307 /*
1308  * Request sync osd notify
1309  */
1310 static int rbd_req_sync_notify(struct rbd_device *dev,
1311                           const char *obj)
1312 {
1313         struct ceph_osd_req_op *ops;
1314         struct ceph_osd_client *osdc = &dev->client->osdc;
1315         struct ceph_osd_event *event;
1316         struct rbd_notify_info info;
1317         int payload_len = sizeof(u32) + sizeof(u32);
1318         int ret;
1319
1320         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1321         if (ret < 0)
1322                 return ret;
1323
1324         info.dev = dev;
1325
1326         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1327                                      (void *)&info, &event);
1328         if (ret < 0)
1329                 goto fail;
1330
1331         ops[0].watch.ver = 1;
1332         ops[0].watch.flag = 1;
1333         ops[0].watch.cookie = event->cookie;
1334         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1335         ops[0].watch.timeout = 12;
1336
1337         ret = rbd_req_sync_op(dev, NULL,
1338                                CEPH_NOSNAP,
1339                                0,
1340                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1341                                ops,
1342                                1, obj, 0, 0, NULL, NULL, NULL);
1343         if (ret < 0)
1344                 goto fail_event;
1345
1346         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1347         dout("ceph_osdc_wait_event returned %d\n", ret);
1348         rbd_destroy_ops(ops);
1349         return 0;
1350
1351 fail_event:
1352         ceph_osdc_cancel_event(event);
1353 fail:
1354         rbd_destroy_ops(ops);
1355         return ret;
1356 }
1357
1358 /*
1359  * Request sync osd class method call
1360  */
1361 static int rbd_req_sync_exec(struct rbd_device *dev,
1362                              const char *obj,
1363                              const char *cls,
1364                              const char *method,
1365                              const char *data,
1366                              int len,
1367                              u64 *ver)
1368 {
1369         struct ceph_osd_req_op *ops;
1370         int cls_len = strlen(cls);
1371         int method_len = strlen(method);
1372         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1373                                     cls_len + method_len + len);
1374         if (ret < 0)
1375                 return ret;
1376
1377         ops[0].cls.class_name = cls;
1378         ops[0].cls.class_len = (__u8)cls_len;
1379         ops[0].cls.method_name = method;
1380         ops[0].cls.method_len = (__u8)method_len;
1381         ops[0].cls.argc = 0;
1382         ops[0].cls.indata = data;
1383         ops[0].cls.indata_len = len;
1384
1385         ret = rbd_req_sync_op(dev, NULL,
1386                                CEPH_NOSNAP,
1387                                0,
1388                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1389                                ops,
1390                                1, obj, 0, 0, NULL, NULL, ver);
1391
1392         rbd_destroy_ops(ops);
1393
1394         dout("cls_exec returned %d\n", ret);
1395         return ret;
1396 }
1397
1398 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1399 {
1400         struct rbd_req_coll *coll =
1401                         kzalloc(sizeof(struct rbd_req_coll) +
1402                                 sizeof(struct rbd_req_status) * num_reqs,
1403                                 GFP_ATOMIC);
1404
1405         if (!coll)
1406                 return NULL;
1407         coll->total = num_reqs;
1408         kref_init(&coll->kref);
1409         return coll;
1410 }
1411
1412 /*
1413  * block device queue callback
1414  */
1415 static void rbd_rq_fn(struct request_queue *q)
1416 {
1417         struct rbd_device *rbd_dev = q->queuedata;
1418         struct request *rq;
1419         struct bio_pair *bp = NULL;
1420
1421         rq = blk_fetch_request(q);
1422
1423         while (1) {
1424                 struct bio *bio;
1425                 struct bio *rq_bio, *next_bio = NULL;
1426                 bool do_write;
1427                 int size, op_size = 0;
1428                 u64 ofs;
1429                 int num_segs, cur_seg = 0;
1430                 struct rbd_req_coll *coll;
1431
1432                 /* peek at request from block layer */
1433                 if (!rq)
1434                         break;
1435
1436                 dout("fetched request\n");
1437
1438                 /* filter out block requests we don't understand */
1439                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1440                         __blk_end_request_all(rq, 0);
1441                         goto next;
1442                 }
1443
1444                 /* deduce our operation (read, write) */
1445                 do_write = (rq_data_dir(rq) == WRITE);
1446
1447                 size = blk_rq_bytes(rq);
1448                 ofs = blk_rq_pos(rq) * 512ULL;
1449                 rq_bio = rq->bio;
1450                 if (do_write && rbd_dev->read_only) {
1451                         __blk_end_request_all(rq, -EROFS);
1452                         goto next;
1453                 }
1454
1455                 spin_unlock_irq(q->queue_lock);
1456
1457                 dout("%s 0x%x bytes at 0x%llx\n",
1458                      do_write ? "write" : "read",
1459                      size, blk_rq_pos(rq) * 512ULL);
1460
1461                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1462                 coll = rbd_alloc_coll(num_segs);
1463                 if (!coll) {
1464                         spin_lock_irq(q->queue_lock);
1465                         __blk_end_request_all(rq, -ENOMEM);
1466                         goto next;
1467                 }
1468
1469                 do {
1470                         /* a bio clone to be passed down to OSD req */
1471                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1472                         op_size = rbd_get_segment(&rbd_dev->header,
1473                                                   rbd_dev->header.block_name,
1474                                                   ofs, size,
1475                                                   NULL, NULL);
1476                         kref_get(&coll->kref);
1477                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1478                                               op_size, GFP_ATOMIC);
1479                         if (!bio) {
1480                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1481                                                        -ENOMEM, op_size);
1482                                 goto next_seg;
1483                         }
1484
1485
1486                         /* init OSD command: write or read */
1487                         if (do_write)
1488                                 rbd_req_write(rq, rbd_dev,
1489                                               rbd_dev->header.snapc,
1490                                               ofs,
1491                                               op_size, bio,
1492                                               coll, cur_seg);
1493                         else
1494                                 rbd_req_read(rq, rbd_dev,
1495                                              cur_snap_id(rbd_dev),
1496                                              ofs,
1497                                              op_size, bio,
1498                                              coll, cur_seg);
1499
1500 next_seg:
1501                         size -= op_size;
1502                         ofs += op_size;
1503
1504                         cur_seg++;
1505                         rq_bio = next_bio;
1506                 } while (size > 0);
1507                 kref_put(&coll->kref, rbd_coll_release);
1508
1509                 if (bp)
1510                         bio_pair_release(bp);
1511                 spin_lock_irq(q->queue_lock);
1512 next:
1513                 rq = blk_fetch_request(q);
1514         }
1515 }
1516
1517 /*
1518  * a queue callback. Makes sure that we don't create a bio that spans across
1519  * multiple osd objects. One exception would be single-page bios,
1520  * which we handle later in bio_chain_clone()
1521  */
1522 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1523                           struct bio_vec *bvec)
1524 {
1525         struct rbd_device *rbd_dev = q->queuedata;
1526         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1527         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1528         unsigned int bio_sectors = bmd->bi_size >> 9;
1529         int max;
1530
1531         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1532                                  + bio_sectors)) << 9;
1533         if (max < 0)
1534                 max = 0; /* bio_add cannot handle a negative return */
1535         if (max <= bvec->bv_len && bio_sectors == 0)
1536                 return bvec->bv_len;
1537         return max;
1538 }
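/*
 * Illustrative example, assuming obj_order == 22 (so chunk_sectors ==
 * 8192) and a partition starting at sector 0: a bio at sector 8000 that
 * already holds 100 sectors may grow by at most (8192 - 8100) << 9
 * bytes, i.e. 92 more sectors, so it never crosses the 4 MiB object
 * boundary.
 */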
1539
1540 static void rbd_free_disk(struct rbd_device *rbd_dev)
1541 {
1542         struct gendisk *disk = rbd_dev->disk;
1543
1544         if (!disk)
1545                 return;
1546
1547         rbd_header_free(&rbd_dev->header);
1548
1549         if (disk->flags & GENHD_FL_UP)
1550                 del_gendisk(disk);
1551         if (disk->queue)
1552                 blk_cleanup_queue(disk->queue);
1553         put_disk(disk);
1554 }
1555
1556 /*
1557  * reload the on-disk header
1558  */
1559 static int rbd_read_header(struct rbd_device *rbd_dev,
1560                            struct rbd_image_header *header)
1561 {
1562         ssize_t rc;
1563         struct rbd_image_header_ondisk *dh;
1564         int snap_count = 0;
1565         u64 snap_names_len = 0;
1566         u64 ver;
1567
1568         while (1) {
1569                 int len = sizeof(*dh) +
1570                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1571                           snap_names_len;
1572
1573                 rc = -ENOMEM;
1574                 dh = kmalloc(len, GFP_KERNEL);
1575                 if (!dh)
1576                         return -ENOMEM;
1577
1578                 rc = rbd_req_sync_read(rbd_dev,
1579                                        NULL, CEPH_NOSNAP,
1580                                        rbd_dev->obj_md_name,
1581                                        0, len,
1582                                        (char *)dh, &ver);
1583                 if (rc < 0)
1584                         goto out_dh;
1585
1586                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1587                 if (rc < 0) {
1588                         if (rc == -ENXIO) {
1589                                 pr_warning("unrecognized header format"
1590                                            " for image %s\n", rbd_dev->obj);
1591                         }
1592                         goto out_dh;
1593                 }
1594
1595                 if (snap_count != header->total_snaps) {
1596                         snap_count = header->total_snaps;
1597                         snap_names_len = header->snap_names_len;
1598                         rbd_header_free(header);
1599                         kfree(dh);
1600                         continue;
1601                 }
1602                 break;
1603         }
1604         header->obj_version = ver;
1605
1606 out_dh:
1607         kfree(dh);
1608         return rc;
1609 }
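/*
 * Note: the loop above sizes its read buffer from the snapshot count of
 * the previous pass and retries whenever the on-disk snapshot count has
 * changed in the meantime, so the header it returns is self-consistent.
 */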
1610
1611 /*
1612  * create a snapshot
1613  */
1614 static int rbd_header_add_snap(struct rbd_device *dev,
1615                                const char *snap_name,
1616                                gfp_t gfp_flags)
1617 {
1618         int name_len = strlen(snap_name);
1619         u64 new_snapid;
1620         int ret;
1621         void *data, *p, *e;
1622         u64 ver;
1623
1624         /* we should create a snapshot only if we're pointing at the head */
1625         if (dev->cur_snap)
1626                 return -EINVAL;
1627
1628         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1629                                       &new_snapid);
1630         dout("created snapid=%lld\n", new_snapid);
1631         if (ret < 0)
1632                 return ret;
1633
1634         data = kmalloc(name_len + 16, gfp_flags);
1635         if (!data)
1636                 return -ENOMEM;
1637
1638         p = data;
1639         e = data + name_len + 16;
1640
1641         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1642         ceph_encode_64_safe(&p, e, new_snapid, bad);
1643
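        /* invoke the "snap_add" method of the "rbd" object class on the
         * image's header object, passing the encoded (name, snapid) pair */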
1644         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1645                                 data, p - data, &ver);
1646
1647         kfree(data);
1648
1649         if (ret < 0)
1650                 return ret;
1651
1652         dev->header.snapc->seq = new_snapid;
1653
1654         return 0;
1655 bad:
1656         return -ERANGE;
1657 }
1658
1659 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1660 {
1661         struct rbd_snap *snap;
1662
1663         while (!list_empty(&rbd_dev->snaps)) {
1664                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1665                 __rbd_remove_snap_dev(rbd_dev, snap);
1666         }
1667 }
1668
1669 /*
1670  * re-read the on-disk header and bring the in-memory snapshot state up to date
1671  */
1672 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1673 {
1674         int ret;
1675         struct rbd_image_header h;
1676         u64 snap_seq;
1677         int follow_seq = 0;
1678
1679         ret = rbd_read_header(rbd_dev, &h);
1680         if (ret < 0)
1681                 return ret;
1682
1683         /* resized? */
1684         set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1685
1686         down_write(&rbd_dev->header.snap_rwsem);
1687
1688         snap_seq = rbd_dev->header.snapc->seq;
1689         if (rbd_dev->header.total_snaps &&
1690             rbd_dev->header.snapc->snaps[0] == snap_seq)
1691                 /* we're pointing at the head; we'll need to follow it
1692                    if the head moves */
1693                 follow_seq = 1;
1694
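        /* drop the old snapshot context, names and sizes and install the
         * freshly read ones in their place */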
1695         kfree(rbd_dev->header.snapc);
1696         kfree(rbd_dev->header.snap_names);
1697         kfree(rbd_dev->header.snap_sizes);
1698
1699         rbd_dev->header.total_snaps = h.total_snaps;
1700         rbd_dev->header.snapc = h.snapc;
1701         rbd_dev->header.snap_names = h.snap_names;
1702         rbd_dev->header.snap_names_len = h.snap_names_len;
1703         rbd_dev->header.snap_sizes = h.snap_sizes;
1704         if (follow_seq)
1705                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1706         else
1707                 rbd_dev->header.snapc->seq = snap_seq;
1708
1709         ret = __rbd_init_snaps_header(rbd_dev);
1710
1711         up_write(&rbd_dev->header.snap_rwsem);
1712
1713         return ret;
1714 }
1715
1716 static int rbd_init_disk(struct rbd_device *rbd_dev)
1717 {
1718         struct gendisk *disk;
1719         struct request_queue *q;
1720         int rc;
1721         u64 total_size = 0;
1722
1723         /* contact OSD, request size info about the object being mapped */
1724         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1725         if (rc)
1726                 return rc;
1727
1728         /* no need to lock here, as rbd_dev is not registered yet */
1729         rc = __rbd_init_snaps_header(rbd_dev);
1730         if (rc)
1731                 return rc;
1732
1733         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1734         if (rc)
1735                 return rc;
1736
1737         /* create gendisk info */
1738         rc = -ENOMEM;
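        /* reserve the full range of minors for this device so that
         * partitions can be created on it */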
1739         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1740         if (!disk)
1741                 goto out;
1742
1743         snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1744                  rbd_dev->id);
1745         disk->major = rbd_dev->major;
1746         disk->first_minor = 0;
1747         disk->fops = &rbd_bd_ops;
1748         disk->private_data = rbd_dev;
1749
1750         /* init rq */
1751         rc = -ENOMEM;
1752         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1753         if (!q)
1754                 goto out_disk;
1755
1756         /* set io sizes to object size */
1757         blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1758         blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1759         blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1760         blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1761
1762         blk_queue_merge_bvec(q, rbd_merge_bvec);
1763         disk->queue = q;
1764
1765         q->queuedata = rbd_dev;
1766
1767         rbd_dev->disk = disk;
1768         rbd_dev->q = q;
1769
1770         /* finally, announce the disk to the world */
1771         set_capacity(disk, total_size / 512ULL);
1772         add_disk(disk);
1773
1774         pr_info("%s: added with size 0x%llx\n",
1775                 disk->disk_name, (unsigned long long)total_size);
1776         return 0;
1777
1778 out_disk:
1779         put_disk(disk);
1780 out:
1781         return rc;
1782 }
1783
1784 /*
1785   sysfs
1786 */
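/*
 * Each mapped device is expected to show up under /sys/bus/rbd/devices/<id>/
 * with the read-only attributes size, major, client_id, pool, name and
 * current_snap defined below, plus the write-only refresh and create_snap
 * hooks.
 */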
1787
1788 static ssize_t rbd_size_show(struct device *dev,
1789                              struct device_attribute *attr, char *buf)
1790 {
1791         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1792
1793         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1794 }
1795
1796 static ssize_t rbd_major_show(struct device *dev,
1797                               struct device_attribute *attr, char *buf)
1798 {
1799         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1800
1801         return sprintf(buf, "%d\n", rbd_dev->major);
1802 }
1803
1804 static ssize_t rbd_client_id_show(struct device *dev,
1805                                   struct device_attribute *attr, char *buf)
1806 {
1807         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1808
1809         return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1810 }
1811
1812 static ssize_t rbd_pool_show(struct device *dev,
1813                              struct device_attribute *attr, char *buf)
1814 {
1815         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1816
1817         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1818 }
1819
1820 static ssize_t rbd_name_show(struct device *dev,
1821                              struct device_attribute *attr, char *buf)
1822 {
1823         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1824
1825         return sprintf(buf, "%s\n", rbd_dev->obj);
1826 }
1827
1828 static ssize_t rbd_snap_show(struct device *dev,
1829                              struct device_attribute *attr,
1830                              char *buf)
1831 {
1832         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1833
1834         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1835 }
1836
1837 static ssize_t rbd_image_refresh(struct device *dev,
1838                                  struct device_attribute *attr,
1839                                  const char *buf,
1840                                  size_t size)
1841 {
1842         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1843         int rc;
1844         int ret = size;
1845
1846         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1847
1848         rc = __rbd_update_snaps(rbd_dev);
1849         if (rc < 0)
1850                 ret = rc;
1851
1852         mutex_unlock(&ctl_mutex);
1853         return ret;
1854 }
1855
1856 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1857 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1858 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1859 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1860 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1861 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1862 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1863 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1864
1865 static struct attribute *rbd_attrs[] = {
1866         &dev_attr_size.attr,
1867         &dev_attr_major.attr,
1868         &dev_attr_client_id.attr,
1869         &dev_attr_pool.attr,
1870         &dev_attr_name.attr,
1871         &dev_attr_current_snap.attr,
1872         &dev_attr_refresh.attr,
1873         &dev_attr_create_snap.attr,
1874         NULL
1875 };
1876
1877 static struct attribute_group rbd_attr_group = {
1878         .attrs = rbd_attrs,
1879 };
1880
1881 static const struct attribute_group *rbd_attr_groups[] = {
1882         &rbd_attr_group,
1883         NULL
1884 };
1885
1886 static void rbd_sysfs_dev_release(struct device *dev)
1887 {
1888 }
1889
1890 static struct device_type rbd_device_type = {
1891         .name           = "rbd",
1892         .groups         = rbd_attr_groups,
1893         .release        = rbd_sysfs_dev_release,
1894 };
1895
1896
1897 /*
1898   sysfs - snapshots
1899 */
1900
1901 static ssize_t rbd_snap_size_show(struct device *dev,
1902                                   struct device_attribute *attr,
1903                                   char *buf)
1904 {
1905         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1906
1907         return sprintf(buf, "%lld\n", (long long)snap->size);
1908 }
1909
1910 static ssize_t rbd_snap_id_show(struct device *dev,
1911                                 struct device_attribute *attr,
1912                                 char *buf)
1913 {
1914         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1915
1916         return sprintf(buf, "%lld\n", (long long)snap->id);
1917 }
1918
1919 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1920 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1921
1922 static struct attribute *rbd_snap_attrs[] = {
1923         &dev_attr_snap_size.attr,
1924         &dev_attr_snap_id.attr,
1925         NULL,
1926 };
1927
1928 static struct attribute_group rbd_snap_attr_group = {
1929         .attrs = rbd_snap_attrs,
1930 };
1931
1932 static void rbd_snap_dev_release(struct device *dev)
1933 {
1934         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1935         kfree(snap->name);
1936         kfree(snap);
1937 }
1938
1939 static const struct attribute_group *rbd_snap_attr_groups[] = {
1940         &rbd_snap_attr_group,
1941         NULL
1942 };
1943
1944 static struct device_type rbd_snap_device_type = {
1945         .groups         = rbd_snap_attr_groups,
1946         .release        = rbd_snap_dev_release,
1947 };
1948
1949 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1950                                   struct rbd_snap *snap)
1951 {
1952         list_del(&snap->node);
1953         device_unregister(&snap->dev);
1954 }
1955
1956 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1957                                   struct rbd_snap *snap,
1958                                   struct device *parent)
1959 {
1960         struct device *dev = &snap->dev;
1961         int ret;
1962
1963         dev->type = &rbd_snap_device_type;
1964         dev->parent = parent;
1965         dev->release = rbd_snap_dev_release;
1966         dev_set_name(dev, "snap_%s", snap->name);
1967         ret = device_register(dev);
1968
1969         return ret;
1970 }
1971
1972 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1973                               int i, const char *name,
1974                               struct rbd_snap **snapp)
1975 {
1976         int ret;
1977         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1978         if (!snap)
1979                 return -ENOMEM;
1980         snap->name = kstrdup(name, GFP_KERNEL);
1981         snap->size = rbd_dev->header.snap_sizes[i];
1982         snap->id = rbd_dev->header.snapc->snaps[i];
1983         if (device_is_registered(&rbd_dev->dev)) {
1984                 ret = rbd_register_snap_dev(rbd_dev, snap,
1985                                              &rbd_dev->dev);
1986                 if (ret < 0)
1987                         goto err;
1988         }
1989         *snapp = snap;
1990         return 0;
1991 err:
1992         kfree(snap->name);
1993         kfree(snap);
1994         return ret;
1995 }
1996
1997 /*
1998  * search for the previous snap name in a NUL-delimited string list
1999  */
2000 static const char *rbd_prev_snap_name(const char *name, const char *start)
2001 {
2002         if (name < start + 2)
2003                 return NULL;
2004
2005         name -= 2;
2006         while (*name) {
2007                 if (name == start)
2008                         return start;
2009                 name--;
2010         }
2011         return name + 1;
2012 }
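/*
 * Example (hypothetical names): with the packed buffer
 * "newest\0middle\0oldest\0", passing a pointer to "oldest" returns a
 * pointer to "middle", passing a pointer to the first name ("newest")
 * returns NULL, and passing one past the end of the buffer returns the
 * last name ("oldest").
 */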
2013
2014 /*
2015  * compare the old list of snapshots that we have against what's in the
2016  * header and update it accordingly. Note that the header holds the
2017  * snapshots in reverse order (from newest to oldest), so we need to go
2018  * from oldest to newest to avoid ending up with a duplicate snap name
2019  * along the way (e.g., when a snapshot was removed and a new one was
2020  * recreated with the same name).
2021  */
2022 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2023 {
2024         const char *name, *first_name;
2025         int i = rbd_dev->header.total_snaps;
2026         struct rbd_snap *snap, *old_snap = NULL;
2027         int ret;
2028         struct list_head *p, *n;
2029
2030         first_name = rbd_dev->header.snap_names;
2031         name = first_name + rbd_dev->header.snap_names_len;
2032
2033         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2034                 u64 cur_id;
2035
2036                 old_snap = list_entry(p, struct rbd_snap, node);
2037
2038                 if (i)
2039                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2040
2041                 if (!i || old_snap->id < cur_id) {
2042                         /* old_snap->id was skipped, thus was removed */
2043                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2044                         continue;
2045                 }
2046                 if (old_snap->id == cur_id) {
2047                         /* we have this snapshot already */
2048                         i--;
2049                         name = rbd_prev_snap_name(name, first_name);
2050                         continue;
2051                 }
2052                 for (; i > 0;
2053                      i--, name = rbd_prev_snap_name(name, first_name)) {
2054                         if (!name) {
2055                                 WARN_ON(1);
2056                                 return -EINVAL;
2057                         }
2058                         cur_id = rbd_dev->header.snapc->snaps[i];
2059                         /* snapshot removal? handle it above */
2060                         if (cur_id >= old_snap->id)
2061                                 break;
2062                         /* a new snapshot */
2063                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2064                         if (ret < 0)
2065                                 return ret;
2066
2067                         /* note that we add it backwards, so we use n and not p */
2068                         list_add(&snap->node, n);
2069                         p = &snap->node;
2070                 }
2071         }
2072         /* we're done going over the old snap list, just add what's left */
2073         for (; i > 0; i--) {
2074                 name = rbd_prev_snap_name(name, first_name);
2075                 if (!name) {
2076                         WARN_ON(1);
2077                         return -EINVAL;
2078                 }
2079                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2080                 if (ret < 0)
2081                         return ret;
2082                 list_add(&snap->node, &rbd_dev->snaps);
2083         }
2084
2085         return 0;
2086 }
2087
2088
2089 static void rbd_root_dev_release(struct device *dev)
2090 {
2091 }
2092
2093 static struct device rbd_root_dev = {
2094         .init_name =    "rbd",
2095         .release =      rbd_root_dev_release,
2096 };
2097
2098 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2099 {
2100         int ret = -ENOMEM;
2101         struct device *dev;
2102         struct rbd_snap *snap;
2103
2104         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2105         dev = &rbd_dev->dev;
2106
2107         dev->bus = &rbd_bus_type;
2108         dev->type = &rbd_device_type;
2109         dev->parent = &rbd_root_dev;
2110         dev->release = rbd_dev_release;
2111         dev_set_name(dev, "%d", rbd_dev->id);
2112         ret = device_register(dev);
2113         if (ret < 0)
2114                 goto done_free;
2115
2116         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2117                 ret = rbd_register_snap_dev(rbd_dev, snap,
2118                                              &rbd_dev->dev);
2119                 if (ret < 0)
2120                         break;
2121         }
2122
2123         mutex_unlock(&ctl_mutex);
2124         return 0;
2125 done_free:
2126         mutex_unlock(&ctl_mutex);
2127         return ret;
2128 }
2129
2130 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2131 {
2132         device_unregister(&rbd_dev->dev);
2133 }
2134
2135 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2136 {
2137         int ret, rc;
2138
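        /*
         * Register a watch on the image's header object.  If the call
         * fails with -ERANGE, refresh the in-memory header and snapshot
         * state and try again.
         */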
2139         do {
2140                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2141                                          rbd_dev->header.obj_version);
2142                 if (ret == -ERANGE) {
2143                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2144                         rc = __rbd_update_snaps(rbd_dev);
2145                         mutex_unlock(&ctl_mutex);
2146                         if (rc < 0)
2147                                 return rc;
2148                 }
2149         } while (ret == -ERANGE);
2150
2151         return ret;
2152 }
2153
2154 static ssize_t rbd_add(struct bus_type *bus,
2155                        const char *buf,
2156                        size_t count)
2157 {
2158         struct ceph_osd_client *osdc;
2159         struct rbd_device *rbd_dev;
2160         ssize_t rc = -ENOMEM;
2161         int irc, new_id = 0;
2162         struct list_head *tmp;
2163         char *mon_dev_name;
2164         char *options;
2165
2166         if (!try_module_get(THIS_MODULE))
2167                 return -ENODEV;
2168
2169         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2170         if (!mon_dev_name)
2171                 goto err_out_mod;
2172
2173         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2174         if (!options)
2175                 goto err_mon_dev;
2176
2177         /* new rbd_device object */
2178         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2179         if (!rbd_dev)
2180                 goto err_out_opt;
2181
2182         /* static rbd_device initialization */
2183         spin_lock_init(&rbd_dev->lock);
2184         INIT_LIST_HEAD(&rbd_dev->node);
2185         INIT_LIST_HEAD(&rbd_dev->snaps);
2186
2187         /* generate a unique id: find the highest existing id and add one */
2188         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2189
2190         list_for_each(tmp, &rbd_dev_list) {
2191                 struct rbd_device *rbd_dev;
2192
2193                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2194                 if (rbd_dev->id >= new_id)
2195                         new_id = rbd_dev->id + 1;
2196         }
2197
2198         rbd_dev->id = new_id;
2199
2200         /* add to global list */
2201         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2202
2203         /* parse add command */
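        /*
         * The written string has the form
         *   <mon addrs> <options> <pool name> <image name> [<snap name>]
         * e.g. (values are illustrative only):
         *   1.2.3.4:6789 name=admin mypool myimage mysnap
         * If no snap name is given, it defaults to "-".
         */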
2204         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2205                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
2206                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2207                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2208                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2209                    mon_dev_name, options, rbd_dev->pool_name,
2210                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
2211                 rc = -EINVAL;
2212                 goto err_out_slot;
2213         }
2214
2215         if (rbd_dev->snap_name[0] == 0)
2216                 rbd_dev->snap_name[0] = '-';
2217
2218         rbd_dev->obj_len = strlen(rbd_dev->obj);
2219         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2220                  rbd_dev->obj, RBD_SUFFIX);
2221
2222         /* initialize rest of new object */
2223         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2224         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2225         if (rc < 0)
2226                 goto err_out_slot;
2227
2228         mutex_unlock(&ctl_mutex);
2229
2230         /* pick the pool */
2231         osdc = &rbd_dev->client->osdc;
2232         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2233         if (rc < 0)
2234                 goto err_out_client;
2235         rbd_dev->poolid = rc;
2236
2237         /* register our block device */
2238         irc = register_blkdev(0, rbd_dev->name);
2239         if (irc < 0) {
2240                 rc = irc;
2241                 goto err_out_client;
2242         }
2243         rbd_dev->major = irc;
2244
2245         rc = rbd_bus_add_dev(rbd_dev);
2246         if (rc)
2247                 goto err_out_blkdev;
2248
2249         /* set up and announce blkdev mapping */
2250         rc = rbd_init_disk(rbd_dev);
2251         if (rc)
2252                 goto err_out_bus;
2253
2254         rc = rbd_init_watch_dev(rbd_dev);
2255         if (rc)
2256                 goto err_out_bus;
2257
2258         return count;
2259
2260 err_out_bus:
2261         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2262         list_del_init(&rbd_dev->node);
2263         mutex_unlock(&ctl_mutex);
2264
2265         /* this will also clean up the rest of the rbd_dev state */
2266
2267         rbd_bus_del_dev(rbd_dev);
2268         kfree(options);
2269         kfree(mon_dev_name);
2270         return rc;
2271
2272 err_out_blkdev:
2273         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2274 err_out_client:
2275         rbd_put_client(rbd_dev);
2276         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2277 err_out_slot:
2278         list_del_init(&rbd_dev->node);
2279         mutex_unlock(&ctl_mutex);
2280
2281         kfree(rbd_dev);
2282 err_out_opt:
2283         kfree(options);
2284 err_mon_dev:
2285         kfree(mon_dev_name);
2286 err_out_mod:
2287         dout("Error adding device %s\n", buf);
2288         module_put(THIS_MODULE);
2289         return rc;
2290 }
2291
2292 static struct rbd_device *__rbd_get_dev(unsigned long id)
2293 {
2294         struct list_head *tmp;
2295         struct rbd_device *rbd_dev;
2296
2297         list_for_each(tmp, &rbd_dev_list) {
2298                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2299                 if (rbd_dev->id == id)
2300                         return rbd_dev;
2301         }
2302         return NULL;
2303 }
2304
2305 static void rbd_dev_release(struct device *dev)
2306 {
2307         struct rbd_device *rbd_dev =
2308                         container_of(dev, struct rbd_device, dev);
2309
2310         if (rbd_dev->watch_request)
2311                 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2312                                                     rbd_dev->watch_request);
2313         if (rbd_dev->watch_event)
2314                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2315
2316         rbd_put_client(rbd_dev);
2317
2318         /* clean up and free blkdev */
2319         rbd_free_disk(rbd_dev);
2320         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2321         kfree(rbd_dev);
2322
2323         /* release module ref */
2324         module_put(THIS_MODULE);
2325 }
2326
2327 static ssize_t rbd_remove(struct bus_type *bus,
2328                           const char *buf,
2329                           size_t count)
2330 {
2331         struct rbd_device *rbd_dev = NULL;
2332         int target_id, rc;
2333         unsigned long ul;
2334         int ret = count;
2335
2336         rc = strict_strtoul(buf, 10, &ul);
2337         if (rc)
2338                 return rc;
2339
2340         /* convert to int; abort if we lost anything in the conversion */
2341         target_id = (int) ul;
2342         if (target_id != ul)
2343                 return -EINVAL;
2344
2345         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2346
2347         rbd_dev = __rbd_get_dev(target_id);
2348         if (!rbd_dev) {
2349                 ret = -ENOENT;
2350                 goto done;
2351         }
2352
2353         list_del_init(&rbd_dev->node);
2354
2355         __rbd_remove_all_snaps(rbd_dev);
2356         rbd_bus_del_dev(rbd_dev);
2357
2358 done:
2359         mutex_unlock(&ctl_mutex);
2360         return ret;
2361 }
2362
2363 static ssize_t rbd_snap_add(struct device *dev,
2364                             struct device_attribute *attr,
2365                             const char *buf,
2366                             size_t count)
2367 {
2368         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2369         int ret;
2370         char *name = kmalloc(count + 1, GFP_KERNEL);
2371         if (!name)
2372                 return -ENOMEM;
2373
2374         snprintf(name, count, "%s", buf);
2375
2376         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2377
2378         ret = rbd_header_add_snap(rbd_dev,
2379                                   name, GFP_KERNEL);
2380         if (ret < 0)
2381                 goto err_unlock;
2382
2383         ret = __rbd_update_snaps(rbd_dev);
2384         if (ret < 0)
2385                 goto err_unlock;
2386
2387         /* we shouldn't hold ctl_mutex when notifying; a notify might
2388            trigger a watch callback that would need to take that mutex */
2389         mutex_unlock(&ctl_mutex);
2390
2391         /* make a best effort, don't error if failed */
2392         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2393
2394         ret = count;
2395         kfree(name);
2396         return ret;
2397
2398 err_unlock:
2399         mutex_unlock(&ctl_mutex);
2400         kfree(name);
2401         return ret;
2402 }
2403
2404 static struct bus_attribute rbd_bus_attrs[] = {
2405         __ATTR(add, S_IWUSR, NULL, rbd_add),
2406         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2407         __ATTR_NULL
2408 };
2409
2410 /*
2411  * create control files in sysfs
2412  * /sys/bus/rbd/...
2413  */
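/*
 * Typical usage from userspace (argument values are illustrative only):
 *
 *   # echo "<mon addrs> <options> <pool> <image> [<snap>]" > /sys/bus/rbd/add
 *   # echo <dev id> > /sys/bus/rbd/remove
 */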
2414 static int rbd_sysfs_init(void)
2415 {
2416         int ret;
2417
2418         rbd_bus_type.bus_attrs = rbd_bus_attrs;
2419
2420         ret = bus_register(&rbd_bus_type);
2421         if (ret < 0)
2422                 return ret;
2423
2424         ret = device_register(&rbd_root_dev);
2425
2426         return ret;
2427 }
2428
2429 static void rbd_sysfs_cleanup(void)
2430 {
2431         device_unregister(&rbd_root_dev);
2432         bus_unregister(&rbd_bus_type);
2433 }
2434
2435 int __init rbd_init(void)
2436 {
2437         int rc;
2438
2439         rc = rbd_sysfs_init();
2440         if (rc)
2441                 return rc;
2442         spin_lock_init(&node_lock);
2443         pr_info("loaded " DRV_NAME_LONG "\n");
2444         return 0;
2445 }
2446
2447 void __exit rbd_exit(void)
2448 {
2449         rbd_sysfs_cleanup();
2450 }
2451
2452 module_init(rbd_init);
2453 module_exit(rbd_exit);
2454
2455 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2456 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2457 MODULE_DESCRIPTION("rados block device");
2458
2459 /* following authorship retained from original osdblk.c */
2460 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2461
2462 MODULE_LICENSE("GPL");