1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
46
47 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
48
49 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN   64
51 #define RBD_MAX_SNAP_NAME_LEN   32
52 #define RBD_MAX_OPT_LEN         1024
53
54 #define RBD_SNAP_HEAD_NAME      "-"
55
56 #define DEV_NAME_LEN            32
57
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
60 /*
61  * block device image metadata (in-memory version)
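 *
 * Note (added for clarity): snapc->snaps[], snap_sizes[] and the
 * NUL-separated strings in snap_names describe the same snapshots,
 * one entry per snapshot in matching order (see rbd_header_from_disk()
 * and snap_by_name() below).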
62  */
63 struct rbd_image_header {
64         u64 image_size;
65         char block_name[32];
66         __u8 obj_order;
67         __u8 crypt_type;
68         __u8 comp_type;
69         struct rw_semaphore snap_rwsem;
70         struct ceph_snap_context *snapc;
71         size_t snap_names_len;
72         u64 snap_seq;
73         u32 total_snaps;
74
75         char *snap_names;
76         u64 *snap_sizes;
77
78         u64 obj_version;
79 };
80
81 struct rbd_options {
82         int     notify_timeout;
83 };
84
85 /*
86  * an instance of the client.  multiple devices may share a client.
87  */
88 struct rbd_client {
89         struct ceph_client      *client;
90         struct rbd_options      *rbd_opts;
91         struct kref             kref;
92         struct list_head        node;
93 };
94
95 /*
96  * a single io request
97  */
98 struct rbd_request {
99         struct request          *rq;            /* blk layer request */
100         struct bio              *bio;           /* cloned bio */
101         struct page             **pages;        /* list of used pages */
102         u64                     len;
103 };
104
105 struct rbd_snap {
106         struct  device          dev;
107         const char              *name;
108         size_t                  size;
109         struct list_head        node;
110         u64                     id;
111 };
112
113 /*
114  * a single device
115  */
116 struct rbd_device {
117         int                     id;             /* blkdev unique id */
118
119         int                     major;          /* blkdev assigned major */
120         struct gendisk          *disk;          /* blkdev's gendisk and rq */
121         struct request_queue    *q;
122
123         struct ceph_client      *client;
124         struct rbd_client       *rbd_client;
125
126         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
127
128         spinlock_t              lock;           /* queue lock */
129
130         struct rbd_image_header header;
131         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
132         int                     obj_len;
133         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
134         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
135         int                     poolid;
136
137         struct ceph_osd_event   *watch_event;
138         struct ceph_osd_request *watch_request;
139
140         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
141         u32 cur_snap;   /* index+1 of current snapshot within snap context
142                            0 - for the head */
143         int read_only;
144
145         struct list_head        node;
146
147         /* list of snapshots */
148         struct list_head        snaps;
149
150         /* sysfs related */
151         struct device           dev;
152 };
153
154 static struct bus_type rbd_bus_type = {
155         .name           = "rbd",
156 };
157
158 static spinlock_t node_lock;      /* protects client get/put */
159
160 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
161 static LIST_HEAD(rbd_dev_list);    /* devices */
162 static LIST_HEAD(rbd_client_list);      /* clients */
163
164 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
165 static void rbd_dev_release(struct device *dev);
166 static ssize_t rbd_snap_rollback(struct device *dev,
167                                  struct device_attribute *attr,
168                                  const char *buf,
169                                  size_t size);
170 static ssize_t rbd_snap_add(struct device *dev,
171                             struct device_attribute *attr,
172                             const char *buf,
173                             size_t count);
174 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
175                                   struct rbd_snap *snap);
176
177
178 static struct rbd_device *dev_to_rbd(struct device *dev)
179 {
180         return container_of(dev, struct rbd_device, dev);
181 }
182
183 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
184 {
185         return get_device(&rbd_dev->dev);
186 }
187
188 static void rbd_put_dev(struct rbd_device *rbd_dev)
189 {
190         put_device(&rbd_dev->dev);
191 }
192
193 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
194
195 static int rbd_open(struct block_device *bdev, fmode_t mode)
196 {
197         struct gendisk *disk = bdev->bd_disk;
198         struct rbd_device *rbd_dev = disk->private_data;
199
200         rbd_get_dev(rbd_dev);
201
202         set_device_ro(bdev, rbd_dev->read_only);
203
204         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
205                 return -EROFS;
206
207         return 0;
208 }
209
210 static int rbd_release(struct gendisk *disk, fmode_t mode)
211 {
212         struct rbd_device *rbd_dev = disk->private_data;
213
214         rbd_put_dev(rbd_dev);
215
216         return 0;
217 }
218
219 static const struct block_device_operations rbd_bd_ops = {
220         .owner                  = THIS_MODULE,
221         .open                   = rbd_open,
222         .release                = rbd_release,
223 };
224
225 /*
226  * Initialize an rbd client instance.
227  * We own *opt.
228  */
229 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
230                                             struct rbd_options *rbd_opts)
231 {
232         struct rbd_client *rbdc;
233         int ret = -ENOMEM;
234
235         dout("rbd_client_create\n");
236         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
237         if (!rbdc)
238                 goto out_opt;
239
240         kref_init(&rbdc->kref);
241         INIT_LIST_HEAD(&rbdc->node);
242
243         rbdc->client = ceph_create_client(opt, rbdc);
244         if (IS_ERR(rbdc->client))
245                 goto out_rbdc;
246         opt = NULL; /* Now rbdc->client is responsible for opt */
247
248         ret = ceph_open_session(rbdc->client);
249         if (ret < 0)
250                 goto out_err;
251
252         rbdc->rbd_opts = rbd_opts;
253
254         spin_lock(&node_lock);
255         list_add_tail(&rbdc->node, &rbd_client_list);
256         spin_unlock(&node_lock);
257
258         dout("rbd_client_create created %p\n", rbdc);
259         return rbdc;
260
261 out_err:
262         ceph_destroy_client(rbdc->client);
263 out_rbdc:
264         kfree(rbdc);
265 out_opt:
266         if (opt)
267                 ceph_destroy_options(opt);
268         return ERR_PTR(ret);
269 }
270
271 /*
272  * Find a ceph client with specific addr and configuration.
273  */
274 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
275 {
276         struct rbd_client *client_node;
277
278         if (opt->flags & CEPH_OPT_NOSHARE)
279                 return NULL;
280
281         list_for_each_entry(client_node, &rbd_client_list, node)
282                 if (ceph_compare_options(opt, client_node->client) == 0)
283                         return client_node;
284         return NULL;
285 }
286
287 /*
288  * mount options
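 *
 * e.g. an options string such as "notify_timeout=30" (illustrative
 * value; notify_timeout is currently the only recognized token, see
 * rbdopt_tokens below).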
289  */
290 enum {
291         Opt_notify_timeout,
292         Opt_last_int,
293         /* int args above */
294         Opt_last_string,
295         /* string args above */
296 };
297
298 static match_table_t rbdopt_tokens = {
299         {Opt_notify_timeout, "notify_timeout=%d"},
300         /* int args above */
301         /* string args above */
302         {-1, NULL}
303 };
304
305 static int parse_rbd_opts_token(char *c, void *private)
306 {
307         struct rbd_options *rbdopt = private;
308         substring_t argstr[MAX_OPT_ARGS];
309         int token, intval, ret;
310
311         token = match_token((char *)c, rbdopt_tokens, argstr);
312         if (token < 0)
313                 return -EINVAL;
314
315         if (token < Opt_last_int) {
316                 ret = match_int(&argstr[0], &intval);
317                 if (ret < 0) {
318                         pr_err("bad mount option arg (not int) "
319                                "at '%s'\n", c);
320                         return ret;
321                 }
322                 dout("got int token %d val %d\n", token, intval);
323         } else if (token > Opt_last_int && token < Opt_last_string) {
324                 dout("got string token %d val %s\n", token,
325                      argstr[0].from);
326         } else {
327                 dout("got token %d\n", token);
328         }
329
330         switch (token) {
331         case Opt_notify_timeout:
332                 rbdopt->notify_timeout = intval;
333                 break;
334         default:
335                 BUG_ON(token);
336         }
337         return 0;
338 }
339
340 /*
341  * Get a ceph client with specific addr and configuration, if one does
342  * not exist create it.
343  */
344 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
345                           char *options)
346 {
347         struct rbd_client *rbdc;
348         struct ceph_options *opt;
349         int ret;
350         struct rbd_options *rbd_opts;
351
352         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
353         if (!rbd_opts)
354                 return -ENOMEM;
355
356         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
357
358         ret = ceph_parse_options(&opt, options, mon_addr,
359                                  mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
360         if (ret < 0)
361                 goto done_err;
362
363         spin_lock(&node_lock);
364         rbdc = __rbd_client_find(opt);
365         if (rbdc) {
366                 ceph_destroy_options(opt);
367
368                 /* using an existing client */
369                 kref_get(&rbdc->kref);
370                 rbd_dev->rbd_client = rbdc;
371                 rbd_dev->client = rbdc->client;
372                 spin_unlock(&node_lock);
373                 return 0;
374         }
375         spin_unlock(&node_lock);
376
377         rbdc = rbd_client_create(opt, rbd_opts);
378         if (IS_ERR(rbdc)) {
379                 ret = PTR_ERR(rbdc);
380                 goto done_err;
381         }
382
383         rbd_dev->rbd_client = rbdc;
384         rbd_dev->client = rbdc->client;
385         return 0;
386 done_err:
387         kfree(rbd_opts);
388         return ret;
389 }
390
391 /*
392  * Destroy ceph client
393  */
394 static void rbd_client_release(struct kref *kref)
395 {
396         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
397
398         dout("rbd_client_release %p\n", rbdc);
399         spin_lock(&node_lock);
400         list_del(&rbdc->node);
401         spin_unlock(&node_lock);
402
403         ceph_destroy_client(rbdc->client);
404         kfree(rbdc->rbd_opts);
405         kfree(rbdc);
406 }
407
408 /*
409  * Drop reference to ceph client node. If it's not referenced anymore, release
410  * it.
411  */
412 static void rbd_put_client(struct rbd_device *rbd_dev)
413 {
414         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
415         rbd_dev->rbd_client = NULL;
416         rbd_dev->client = NULL;
417 }
418
419
420 /*
421  * Create a new header structure, translate header format from the on-disk
422  * header.
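 *
 * The on-disk object consumed here is laid out roughly as:
 *   struct rbd_image_header_ondisk              (fixed-size header)
 *   struct rbd_image_snap_ondisk snaps[snap_count]
 *   char snap_names[snap_names_len]             (NUL-separated names)
 * (sketch inferred from the parsing below; the authoritative layout is
 * in rbd_types.h)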
423  */
424 static int rbd_header_from_disk(struct rbd_image_header *header,
425                                  struct rbd_image_header_ondisk *ondisk,
426                                  int allocated_snaps,
427                                  gfp_t gfp_flags)
428 {
429         int i;
430         u32 snap_count = le32_to_cpu(ondisk->snap_count);
431         int ret = -ENOMEM;
432
433         init_rwsem(&header->snap_rwsem);
434         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
435         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
436                                 snap_count *
437                                  sizeof(struct rbd_image_snap_ondisk),
438                                 gfp_flags);
439         if (!header->snapc)
440                 return -ENOMEM;
441         if (snap_count) {
442                 header->snap_names = kmalloc(header->snap_names_len,
443                                              GFP_KERNEL);
444                 if (!header->snap_names)
445                         goto err_snapc;
446                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
447                                              GFP_KERNEL);
448                 if (!header->snap_sizes)
449                         goto err_names;
450         } else {
451                 header->snap_names = NULL;
452                 header->snap_sizes = NULL;
453         }
454         memcpy(header->block_name, ondisk->block_name,
455                sizeof(ondisk->block_name));
456
457         header->image_size = le64_to_cpu(ondisk->image_size);
458         header->obj_order = ondisk->options.order;
459         header->crypt_type = ondisk->options.crypt_type;
460         header->comp_type = ondisk->options.comp_type;
461
462         atomic_set(&header->snapc->nref, 1);
463         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
464         header->snapc->num_snaps = snap_count;
465         header->total_snaps = snap_count;
466
467         if (snap_count &&
468             allocated_snaps == snap_count) {
469                 for (i = 0; i < snap_count; i++) {
470                         header->snapc->snaps[i] =
471                                 le64_to_cpu(ondisk->snaps[i].id);
472                         header->snap_sizes[i] =
473                                 le64_to_cpu(ondisk->snaps[i].image_size);
474                 }
475
476                 /* copy snapshot names */
477                 memcpy(header->snap_names, &ondisk->snaps[i],
478                         header->snap_names_len);
479         }
480
481         return 0;
482
483 err_names:
484         kfree(header->snap_names);
485 err_snapc:
486         kfree(header->snapc);
487         return ret;
488 }
489
490 static int snap_index(struct rbd_image_header *header, int snap_num)
491 {
492         return header->total_snaps - snap_num;
493 }
494
495 static u64 cur_snap_id(struct rbd_device *rbd_dev)
496 {
497         struct rbd_image_header *header = &rbd_dev->header;
498
499         if (!rbd_dev->cur_snap)
500                 return 0;
501
502         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
503 }
504
505 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
506                         u64 *seq, u64 *size)
507 {
508         int i;
509         char *p = header->snap_names;
510
511         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
512                 if (strcmp(snap_name, p) == 0)
513                         break;
514         }
515         if (i == header->total_snaps)
516                 return -ENOENT;
517         if (seq)
518                 *seq = header->snapc->snaps[i];
519
520         if (size)
521                 *size = header->snap_sizes[i];
522
523         return i;
524 }
525
526 static int rbd_header_set_snap(struct rbd_device *dev,
527                                const char *snap_name,
528                                u64 *size)
529 {
530         struct rbd_image_header *header = &dev->header;
531         struct ceph_snap_context *snapc = header->snapc;
532         int ret = -ENOENT;
533
534         down_write(&header->snap_rwsem);
535
536         if (!snap_name ||
537             !*snap_name ||
538             strcmp(snap_name, "-") == 0 ||
539             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
540                 if (header->total_snaps)
541                         snapc->seq = header->snap_seq;
542                 else
543                         snapc->seq = 0;
544                 dev->cur_snap = 0;
545                 dev->read_only = 0;
546                 if (size)
547                         *size = header->image_size;
548         } else {
549                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
550                 if (ret < 0)
551                         goto done;
552
553                 dev->cur_snap = header->total_snaps - ret;
554                 dev->read_only = 1;
555         }
556
557         ret = 0;
558 done:
559         up_write(&header->snap_rwsem);
560         return ret;
561 }
562
563 static void rbd_header_free(struct rbd_image_header *header)
564 {
565         kfree(header->snapc);
566         kfree(header->snap_names);
567         kfree(header->snap_sizes);
568 }
569
570 /*
571  * get the actual striped segment name, offset and length
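 *
 * Worked example (illustrative, assuming obj_order = 22, i.e. 4 MB objects):
 *   ofs = 0x500000, len = 0x300000
 *     seg      = 0x500000 >> 22 = 1
 *     seg_name = "<block_name>.000000000001"
 *     *segofs  = 0x500000 & 0x3fffff = 0x100000
 *     returns    min(0x300000, 0x400000 - 0x100000) = 0x300000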
572  */
573 static u64 rbd_get_segment(struct rbd_image_header *header,
574                            const char *block_name,
575                            u64 ofs, u64 len,
576                            char *seg_name, u64 *segofs)
577 {
578         u64 seg = ofs >> header->obj_order;
579
580         if (seg_name)
581                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
582                          "%s.%012llx", block_name, seg);
583
584         ofs = ofs & ((1 << header->obj_order) - 1);
585         len = min_t(u64, len, (1 << header->obj_order) - ofs);
586
587         if (segofs)
588                 *segofs = ofs;
589
590         return len;
591 }
592
593 /*
594  * bio helpers
595  */
596
597 static void bio_chain_put(struct bio *chain)
598 {
599         struct bio *tmp;
600
601         while (chain) {
602                 tmp = chain;
603                 chain = chain->bi_next;
604                 bio_put(tmp);
605         }
606 }
607
608 /*
609  * zeros a bio chain, starting at specific offset
610  */
611 static void zero_bio_chain(struct bio *chain, int start_ofs)
612 {
613         struct bio_vec *bv;
614         unsigned long flags;
615         void *buf;
616         int i;
617         int pos = 0;
618
619         while (chain) {
620                 bio_for_each_segment(bv, chain, i) {
621                         if (pos + bv->bv_len > start_ofs) {
622                                 int remainder = max(start_ofs - pos, 0);
623                                 buf = bvec_kmap_irq(bv, &flags);
624                                 memset(buf + remainder, 0,
625                                        bv->bv_len - remainder);
626                                 bvec_kunmap_irq(buf, &flags);
627                         }
628                         pos += bv->bv_len;
629                 }
630
631                 chain = chain->bi_next;
632         }
633 }
634
635 /*
636  * bio_chain_clone - clone a chain of bios up to a certain length.
637  * might return a bio_pair that will need to be released.
638  */
639 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
640                                    struct bio_pair **bp,
641                                    int len, gfp_t gfpmask)
642 {
643         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
644         int total = 0;
645
646         if (*bp) {
647                 bio_pair_release(*bp);
648                 *bp = NULL;
649         }
650
651         while (old_chain && (total < len)) {
652                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
653                 if (!tmp)
654                         goto err_out;
655
656                 if (total + old_chain->bi_size > len) {
657                         struct bio_pair *bp;
658
659                         /*
660                          * this split can only happen with a single paged bio,
661                          * split_bio will BUG_ON if this is not the case
662                          */
663                         dout("bio_chain_clone split! total=%d remaining=%d"
664                              " bi_size=%d\n",
665                              (int)total, (int)len-total,
666                              (int)old_chain->bi_size);
667
668                         /* split the bio. We'll release it either in the next
669                            call, or it will have to be released outside */
670                         bp = bio_split(old_chain, (len - total) / 512ULL);
671                         if (!bp)
672                                 goto err_out;
673
674                         __bio_clone(tmp, &bp->bio1);
675
676                         *next = &bp->bio2;
677                 } else {
678                         __bio_clone(tmp, old_chain);
679                         *next = old_chain->bi_next;
680                 }
681
682                 tmp->bi_bdev = NULL;
683                 gfpmask &= ~__GFP_WAIT;
684                 tmp->bi_next = NULL;
685
686                 if (!new_chain) {
687                         new_chain = tail = tmp;
688                 } else {
689                         tail->bi_next = tmp;
690                         tail = tmp;
691                 }
692                 old_chain = old_chain->bi_next;
693
694                 total += tmp->bi_size;
695         }
696
697         BUG_ON(total < len);
698
699         if (tail)
700                 tail->bi_next = NULL;
701
702         *old = old_chain;
703
704         return new_chain;
705
706 err_out:
707         dout("bio_chain_clone with err\n");
708         bio_chain_put(new_chain);
709         return NULL;
710 }
711
712 /*
713  * helpers for osd request op vectors.
714  */
715 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
716                             int num_ops,
717                             int opcode,
718                             u32 payload_len)
719 {
720         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
721                        GFP_NOIO);
722         if (!*ops)
723                 return -ENOMEM;
724         (*ops)[0].op = opcode;
725         /*
726          * op extent offset and length will be set later on
727          * in calc_raw_layout()
728          */
729         (*ops)[0].payload_len = payload_len;
730         return 0;
731 }
732
733 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
734 {
735         kfree(ops);
736 }
737
738 /*
739  * Send ceph osd request
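 *
 * If rbd_cb is NULL the request is treated as synchronous: we wait for
 * it to complete and drop it here; otherwise the callback (rbd_req_cb
 * or rbd_simple_req_cb) owns and releases the request.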
740  */
741 static int rbd_do_request(struct request *rq,
742                           struct rbd_device *dev,
743                           struct ceph_snap_context *snapc,
744                           u64 snapid,
745                           const char *obj, u64 ofs, u64 len,
746                           struct bio *bio,
747                           struct page **pages,
748                           int num_pages,
749                           int flags,
750                           struct ceph_osd_req_op *ops,
751                           int num_reply,
752                           void (*rbd_cb)(struct ceph_osd_request *req,
753                                          struct ceph_msg *msg),
754                           struct ceph_osd_request **linger_req,
755                           u64 *ver)
756 {
757         struct ceph_osd_request *req;
758         struct ceph_file_layout *layout;
759         int ret;
760         u64 bno;
761         struct timespec mtime = CURRENT_TIME;
762         struct rbd_request *req_data;
763         struct ceph_osd_request_head *reqhead;
764         struct rbd_image_header *header = &dev->header;
765
766         ret = -ENOMEM;
767         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
768         if (!req_data)
769                 goto done;
770
771         dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
772
773         down_read(&header->snap_rwsem);
774
775         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
776                                       snapc,
777                                       ops,
778                                       false,
779                                       GFP_NOIO, pages, bio);
780         if (IS_ERR(req)) {
781                 up_read(&header->snap_rwsem);
782                 ret = PTR_ERR(req);
783                 goto done_pages;
784         }
785
786         req->r_callback = rbd_cb;
787
788         req_data->rq = rq;
789         req_data->bio = bio;
790         req_data->pages = pages;
791         req_data->len = len;
792
793         req->r_priv = req_data;
794
795         reqhead = req->r_request->front.iov_base;
796         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
797
798         strncpy(req->r_oid, obj, sizeof(req->r_oid));
799         req->r_oid_len = strlen(req->r_oid);
800
801         layout = &req->r_file_layout;
802         memset(layout, 0, sizeof(*layout));
803         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
804         layout->fl_stripe_count = cpu_to_le32(1);
805         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
806         layout->fl_pg_preferred = cpu_to_le32(-1);
807         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
808         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
809                              ofs, &len, &bno, req, ops);
810
811         ceph_osdc_build_request(req, ofs, &len,
812                                 ops,
813                                 snapc,
814                                 &mtime,
815                                 req->r_oid, req->r_oid_len);
816         up_read(&header->snap_rwsem);
817
818         if (linger_req) {
819                 ceph_osdc_set_request_linger(&dev->client->osdc, req);
820                 *linger_req = req;
821         }
822
823         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
824         if (ret < 0)
825                 goto done_err;
826
827         if (!rbd_cb) {
828                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
829                 if (ver)
830                         *ver = le64_to_cpu(req->r_reassert_version.version);
831                 dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
832                 ceph_osdc_put_request(req);
833         }
834         return ret;
835
836 done_err:
837         bio_chain_put(req_data->bio);
838         ceph_osdc_put_request(req);
839 done_pages:
840         kfree(req_data);
841 done:
842         if (rq)
843                 blk_end_request(rq, ret, len);
844         return ret;
845 }
846
847 /*
848  * Ceph osd op callback
849  */
850 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
851 {
852         struct rbd_request *req_data = req->r_priv;
853         struct ceph_osd_reply_head *replyhead;
854         struct ceph_osd_op *op;
855         __s32 rc;
856         u64 bytes;
857         int read_op;
858
859         /* parse reply */
860         replyhead = msg->front.iov_base;
861         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
862         op = (void *)(replyhead + 1);
863         rc = le32_to_cpu(replyhead->result);
864         bytes = le64_to_cpu(op->extent.length);
865         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
866
867         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
868
869         if (rc == -ENOENT && read_op) {
870                 zero_bio_chain(req_data->bio, 0);
871                 rc = 0;
872         } else if (rc == 0 && read_op && bytes < req_data->len) {
873                 zero_bio_chain(req_data->bio, bytes);
874                 bytes = req_data->len;
875         }
876
877         blk_end_request(req_data->rq, rc, bytes);
878
879         if (req_data->bio)
880                 bio_chain_put(req_data->bio);
881
882         ceph_osdc_put_request(req);
883         kfree(req_data);
884 }
885
886 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
887 {
888         ceph_osdc_put_request(req);
889 }
890
891 /*
892  * Do a synchronous ceph osd operation
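 *
 * If orig_ops is NULL a single read/write op is built here and the
 * supplied buffer is copied to/from a temporary page vector; otherwise
 * the caller-provided op vector is sent as-is.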
893  */
894 static int rbd_req_sync_op(struct rbd_device *dev,
895                            struct ceph_snap_context *snapc,
896                            u64 snapid,
897                            int opcode,
898                            int flags,
899                            struct ceph_osd_req_op *orig_ops,
900                            int num_reply,
901                            const char *obj,
902                            u64 ofs, u64 len,
903                            char *buf,
904                            struct ceph_osd_request **linger_req,
905                            u64 *ver)
906 {
907         int ret;
908         struct page **pages;
909         int num_pages;
910         struct ceph_osd_req_op *ops = orig_ops;
911         u32 payload_len;
912
913         num_pages = calc_pages_for(ofs, len);
914         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
915         if (IS_ERR(pages))
916                 return PTR_ERR(pages);
917
918         if (!orig_ops) {
919                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
920                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
921                 if (ret < 0)
922                         goto done;
923
924                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
925                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
926                         if (ret < 0)
927                                 goto done_ops;
928                 }
929         }
930
931         ret = rbd_do_request(NULL, dev, snapc, snapid,
932                           obj, ofs, len, NULL,
933                           pages, num_pages,
934                           flags,
935                           ops,
936                           2,
937                           NULL,
938                           linger_req, ver);
939         if (ret < 0)
940                 goto done_ops;
941
942         if ((flags & CEPH_OSD_FLAG_READ) && buf)
943                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
944
945 done_ops:
946         if (!orig_ops)
947                 rbd_destroy_ops(ops);
948 done:
949         ceph_release_page_vector(pages, num_pages);
950         return ret;
951 }
952
953 /*
954  * Do an asynchronous ceph osd operation
955  */
956 static int rbd_do_op(struct request *rq,
957                      struct rbd_device *rbd_dev,
958                      struct ceph_snap_context *snapc,
959                      u64 snapid,
960                      int opcode, int flags, int num_reply,
961                      u64 ofs, u64 len,
962                      struct bio *bio)
963 {
964         char *seg_name;
965         u64 seg_ofs;
966         u64 seg_len;
967         int ret;
968         struct ceph_osd_req_op *ops;
969         u32 payload_len;
970
971         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
972         if (!seg_name)
973                 return -ENOMEM;
974
975         seg_len = rbd_get_segment(&rbd_dev->header,
976                                   rbd_dev->header.block_name,
977                                   ofs, len,
978                                   seg_name, &seg_ofs);
979
980         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
981
982         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
983         if (ret < 0)
984                 goto done;
985
986         /* we've taken care of segment sizes earlier when we
987            cloned the bios. We should never have a segment
988            truncated at this point */
989         BUG_ON(seg_len < len);
990
991         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
992                              seg_name, seg_ofs, seg_len,
993                              bio,
994                              NULL, 0,
995                              flags,
996                              ops,
997                              num_reply,
998                              rbd_req_cb, 0, NULL);
999 done:
1000         kfree(seg_name);
1001         return ret;
1002 }
1003
1004 /*
1005  * Request async osd write
1006  */
1007 static int rbd_req_write(struct request *rq,
1008                          struct rbd_device *rbd_dev,
1009                          struct ceph_snap_context *snapc,
1010                          u64 ofs, u64 len,
1011                          struct bio *bio)
1012 {
1013         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1014                          CEPH_OSD_OP_WRITE,
1015                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1016                          2,
1017                          ofs, len, bio);
1018 }
1019
1020 /*
1021  * Request async osd read
1022  */
1023 static int rbd_req_read(struct request *rq,
1024                          struct rbd_device *rbd_dev,
1025                          u64 snapid,
1026                          u64 ofs, u64 len,
1027                          struct bio *bio)
1028 {
1029         return rbd_do_op(rq, rbd_dev, NULL,
1030                          (snapid ? snapid : CEPH_NOSNAP),
1031                          CEPH_OSD_OP_READ,
1032                          CEPH_OSD_FLAG_READ,
1033                          2,
1034                          ofs, len, bio);
1035 }
1036
1037 /*
1038  * Request sync osd read
1039  */
1040 static int rbd_req_sync_read(struct rbd_device *dev,
1041                           struct ceph_snap_context *snapc,
1042                           u64 snapid,
1043                           const char *obj,
1044                           u64 ofs, u64 len,
1045                           char *buf,
1046                           u64 *ver)
1047 {
1048         return rbd_req_sync_op(dev, NULL,
1049                                (snapid ? snapid : CEPH_NOSNAP),
1050                                CEPH_OSD_OP_READ,
1051                                CEPH_OSD_FLAG_READ,
1052                                NULL,
1053                                1, obj, ofs, len, buf, NULL, ver);
1054 }
1055
1056 /*
1057  * Request sync osd notify-ack (acknowledge a watch notification)
1058  */
1059 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1060                                    u64 ver,
1061                                    u64 notify_id,
1062                                    const char *obj)
1063 {
1064         struct ceph_osd_req_op *ops;
1065         struct page **pages = NULL;
1066         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1067         if (ret < 0)
1068                 return ret;
1069
1070         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1071         ops[0].watch.cookie = notify_id;
1072         ops[0].watch.flag = 0;
1073
1074         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1075                           obj, 0, 0, NULL,
1076                           pages, 0,
1077                           CEPH_OSD_FLAG_READ,
1078                           ops,
1079                           1,
1080                           rbd_simple_req_cb, 0, NULL);
1081
1082         rbd_destroy_ops(ops);
1083         return ret;
1084 }
1085
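/*
 * Watch callback for the image header object: another client updated
 * the header (e.g. added a snapshot), so refresh our snapshot state and
 * acknowledge the notification.
 */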
1086 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1087 {
1088         struct rbd_device *dev = (struct rbd_device *)data;
1089         if (!dev)
1090                 return;
1091
1092         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1093                 notify_id, (int)opcode);
1094         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1095         __rbd_update_snaps(dev);
1096         mutex_unlock(&ctl_mutex);
1097
1098         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1099 }
1100
1101 /*
1102  * Request sync osd watch
1103  */
1104 static int rbd_req_sync_watch(struct rbd_device *dev,
1105                               const char *obj,
1106                               u64 ver)
1107 {
1108         struct ceph_osd_req_op *ops;
1109         struct ceph_osd_client *osdc = &dev->client->osdc;
1110
1111         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1112         if (ret < 0)
1113                 return ret;
1114
1115         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1116                                      (void *)dev, &dev->watch_event);
1117         if (ret < 0)
1118                 goto fail;
1119
1120         ops[0].watch.ver = cpu_to_le64(ver);
1121         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1122         ops[0].watch.flag = 1;
1123
1124         ret = rbd_req_sync_op(dev, NULL,
1125                               CEPH_NOSNAP,
1126                               0,
1127                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1128                               ops,
1129                               1, obj, 0, 0, NULL,
1130                               &dev->watch_request, NULL);
1131
1132         if (ret < 0)
1133                 goto fail_event;
1134
1135         rbd_destroy_ops(ops);
1136         return 0;
1137
1138 fail_event:
1139         ceph_osdc_cancel_event(dev->watch_event);
1140         dev->watch_event = NULL;
1141 fail:
1142         rbd_destroy_ops(ops);
1143         return ret;
1144 }
1145
1146 struct rbd_notify_info {
1147         struct rbd_device *dev;
1148 };
1149
1150 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1151 {
1152         struct rbd_device *dev = (struct rbd_device *)data;
1153         if (!dev)
1154                 return;
1155
1156         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1157                 notify_id, (int)opcode);
1158 }
1159
1160 /*
1161  * Request sync osd notify
1162  */
1163 static int rbd_req_sync_notify(struct rbd_device *dev,
1164                           const char *obj)
1165 {
1166         struct ceph_osd_req_op *ops;
1167         struct ceph_osd_client *osdc = &dev->client->osdc;
1168         struct ceph_osd_event *event;
1169         struct rbd_notify_info info;
1170         int payload_len = sizeof(u32) + sizeof(u32);
1171         int ret;
1172
1173         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1174         if (ret < 0)
1175                 return ret;
1176
1177         info.dev = dev;
1178
1179         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1180                                      (void *)&info, &event);
1181         if (ret < 0)
1182                 goto fail;
1183
1184         ops[0].watch.ver = 1;
1185         ops[0].watch.flag = 1;
1186         ops[0].watch.cookie = event->cookie;
1187         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1188         ops[0].watch.timeout = 12;
1189
1190         ret = rbd_req_sync_op(dev, NULL,
1191                                CEPH_NOSNAP,
1192                                0,
1193                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1194                                ops,
1195                                1, obj, 0, 0, NULL, NULL, NULL);
1196         if (ret < 0)
1197                 goto fail_event;
1198
1199         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1200         dout("ceph_osdc_wait_event returned %d\n", ret);
1201         rbd_destroy_ops(ops);
1202         return 0;
1203
1204 fail_event:
1205         ceph_osdc_cancel_event(event);
1206 fail:
1207         rbd_destroy_ops(ops);
1208         return ret;
1209 }
1210
1211 /*
1212  * Request sync osd rollback
1213  */
1214 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
1215                                      u64 snapid,
1216                                      const char *obj)
1217 {
1218         struct ceph_osd_req_op *ops;
1219         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
1220         if (ret < 0)
1221                 return ret;
1222
1223         ops[0].snap.snapid = snapid;
1224
1225         ret = rbd_req_sync_op(dev, NULL,
1226                                CEPH_NOSNAP,
1227                                0,
1228                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1229                                ops,
1230                                1, obj, 0, 0, NULL, NULL, NULL);
1231
1232         rbd_destroy_ops(ops);
1233
1234         return ret;
1235 }
1236
1237 /*
1238  * Request sync osd class-method call (exec)
1239  */
1240 static int rbd_req_sync_exec(struct rbd_device *dev,
1241                              const char *obj,
1242                              const char *cls,
1243                              const char *method,
1244                              const char *data,
1245                              int len,
1246                              u64 *ver)
1247 {
1248         struct ceph_osd_req_op *ops;
1249         int cls_len = strlen(cls);
1250         int method_len = strlen(method);
1251         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1252                                     cls_len + method_len + len);
1253         if (ret < 0)
1254                 return ret;
1255
1256         ops[0].cls.class_name = cls;
1257         ops[0].cls.class_len = (__u8)cls_len;
1258         ops[0].cls.method_name = method;
1259         ops[0].cls.method_len = (__u8)method_len;
1260         ops[0].cls.argc = 0;
1261         ops[0].cls.indata = data;
1262         ops[0].cls.indata_len = len;
1263
1264         ret = rbd_req_sync_op(dev, NULL,
1265                                CEPH_NOSNAP,
1266                                0,
1267                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1268                                ops,
1269                                1, obj, 0, 0, NULL, NULL, ver);
1270
1271         rbd_destroy_ops(ops);
1272
1273         dout("cls_exec returned %d\n", ret);
1274         return ret;
1275 }
1276
1277 /*
1278  * block device queue callback
1279  */
1280 static void rbd_rq_fn(struct request_queue *q)
1281 {
1282         struct rbd_device *rbd_dev = q->queuedata;
1283         struct request *rq;
1284         struct bio_pair *bp = NULL;
1285
1286         rq = blk_fetch_request(q);
1287
1288         while (1) {
1289                 struct bio *bio;
1290                 struct bio *rq_bio, *next_bio = NULL;
1291                 bool do_write;
1292                 int size, op_size = 0;
1293                 u64 ofs;
1294
1295                 /* peek at request from block layer */
1296                 if (!rq)
1297                         break;
1298
1299                 dout("fetched request\n");
1300
1301                 /* filter out block requests we don't understand */
1302                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1303                         __blk_end_request_all(rq, 0);
1304                         goto next;
1305                 }
1306
1307                 /* deduce our operation (read, write) */
1308                 do_write = (rq_data_dir(rq) == WRITE);
1309
1310                 size = blk_rq_bytes(rq);
1311                 ofs = blk_rq_pos(rq) * 512ULL;
1312                 rq_bio = rq->bio;
1313                 if (do_write && rbd_dev->read_only) {
1314                         __blk_end_request_all(rq, -EROFS);
1315                         goto next;
1316                 }
1317
1318                 spin_unlock_irq(q->queue_lock);
1319
1320                 dout("%s 0x%x bytes at 0x%llx\n",
1321                      do_write ? "write" : "read",
1322                      size, blk_rq_pos(rq) * 512ULL);
1323
1324                 do {
1325                         /* a bio clone to be passed down to OSD req */
1326                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1327                         op_size = rbd_get_segment(&rbd_dev->header,
1328                                                   rbd_dev->header.block_name,
1329                                                   ofs, size,
1330                                                   NULL, NULL);
1331                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1332                                               op_size, GFP_ATOMIC);
1333                         if (!bio) {
1334                                 spin_lock_irq(q->queue_lock);
1335                                 __blk_end_request_all(rq, -ENOMEM);
1336                                 goto next;
1337                         }
1338
1339                         /* init OSD command: write or read */
1340                         if (do_write)
1341                                 rbd_req_write(rq, rbd_dev,
1342                                               rbd_dev->header.snapc,
1343                                               ofs,
1344                                               op_size, bio);
1345                         else
1346                                 rbd_req_read(rq, rbd_dev,
1347                                              cur_snap_id(rbd_dev),
1348                                              ofs,
1349                                              op_size, bio);
1350
1351                         size -= op_size;
1352                         ofs += op_size;
1353
1354                         rq_bio = next_bio;
1355                 } while (size > 0);
1356
1357                 if (bp)
1358                         bio_pair_release(bp);
1359
1360                 spin_lock_irq(q->queue_lock);
1361 next:
1362                 rq = blk_fetch_request(q);
1363         }
1364 }
1365
1366 /*
1367  * a queue callback. Makes sure that we don't create a bio that spans across
1368  * multiple osd objects. One exception would be with a single page bios,
1369  * which we handle later at bio_chain_clone
1370  */
1371 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1372                           struct bio_vec *bvec)
1373 {
1374         struct rbd_device *rbd_dev = q->queuedata;
1375         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1376         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1377         unsigned int bio_sectors = bmd->bi_size >> 9;
1378         int max;
1379
1380         max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1381                                  + bio_sectors)) << 9;
1382         if (max < 0)
1383                 max = 0; /* bio_add cannot handle a negative return */
1384         if (max <= bvec->bv_len && bio_sectors == 0)
1385                 return bvec->bv_len;
1386         return max;
1387 }
1388
1389 static void rbd_free_disk(struct rbd_device *rbd_dev)
1390 {
1391         struct gendisk *disk = rbd_dev->disk;
1392
1393         if (!disk)
1394                 return;
1395
1396         rbd_header_free(&rbd_dev->header);
1397
1398         if (disk->flags & GENHD_FL_UP)
1399                 del_gendisk(disk);
1400         if (disk->queue)
1401                 blk_cleanup_queue(disk->queue);
1402         put_disk(disk);
1403 }
1404
1405 /*
1406  * reload the on-disk header
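 *
 * The snapshot count is not known in advance, so the read is retried
 * with a larger buffer whenever the number of snapshots reported by the
 * header differs from the size we guessed.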
1407  */
1408 static int rbd_read_header(struct rbd_device *rbd_dev,
1409                            struct rbd_image_header *header)
1410 {
1411         ssize_t rc;
1412         struct rbd_image_header_ondisk *dh;
1413         int snap_count = 0;
1414         u64 snap_names_len = 0;
1415         u64 ver;
1416
1417         while (1) {
1418                 int len = sizeof(*dh) +
1419                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1420                           snap_names_len;
1421
1422                 rc = -ENOMEM;
1423                 dh = kmalloc(len, GFP_KERNEL);
1424                 if (!dh)
1425                         return -ENOMEM;
1426
1427                 rc = rbd_req_sync_read(rbd_dev,
1428                                        NULL, CEPH_NOSNAP,
1429                                        rbd_dev->obj_md_name,
1430                                        0, len,
1431                                        (char *)dh, &ver);
1432                 if (rc < 0)
1433                         goto out_dh;
1434
1435                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1436                 if (rc < 0)
1437                         goto out_dh;
1438
1439                 if (snap_count != header->total_snaps) {
1440                         snap_count = header->total_snaps;
1441                         snap_names_len = header->snap_names_len;
1442                         rbd_header_free(header);
1443                         kfree(dh);
1444                         continue;
1445                 }
1446                 break;
1447         }
1448         header->obj_version = ver;
1449
1450 out_dh:
1451         kfree(dh);
1452         return rc;
1453 }
1454
1455 /*
1456  * create a snapshot
1457  */
1458 static int rbd_header_add_snap(struct rbd_device *dev,
1459                                const char *snap_name,
1460                                gfp_t gfp_flags)
1461 {
1462         int name_len = strlen(snap_name);
1463         u64 new_snapid;
1464         int ret;
1465         void *data, *data_start, *data_end;
1466         u64 ver;
1467
1468         /* we should create a snapshot only if we're pointing at the head */
1469         if (dev->cur_snap)
1470                 return -EINVAL;
1471
1472         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1473                                       &new_snapid);
1474         dout("created snapid=%lld\n", new_snapid);
1475         if (ret < 0)
1476                 return ret;
1477
1478         data = kmalloc(name_len + 16, gfp_flags);
1479         if (!data)
1480                 return -ENOMEM;
1481
1482         data_start = data;
1483         data_end = data + name_len + 16;
1484
1485         ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1486         ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1487
1488         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1489                                 data_start, data - data_start, &ver);
1490
1491         kfree(data_start);
1492
1493         if (ret < 0)
1494                 return ret;
1495
1496         dev->header.snapc->seq = new_snapid;
1497
1498         return 0;
1499 bad:
1500         return -ERANGE;
1501 }
1502
1503 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1504 {
1505         struct rbd_snap *snap;
1506
1507         while (!list_empty(&rbd_dev->snaps)) {
1508                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1509                 __rbd_remove_snap_dev(rbd_dev, snap);
1510         }
1511 }
1512
1513 /*
1514  * Re-read the header and refresh the in-memory snapshot state.
1515  */
1516 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1517 {
1518         int ret;
1519         struct rbd_image_header h;
1520         u64 snap_seq;
1521         int follow_seq = 0;
1522
1523         ret = rbd_read_header(rbd_dev, &h);
1524         if (ret < 0)
1525                 return ret;
1526
1527         down_write(&rbd_dev->header.snap_rwsem);
1528
1529         snap_seq = rbd_dev->header.snapc->seq;
1530         if (rbd_dev->header.total_snaps &&
1531             rbd_dev->header.snapc->snaps[0] == snap_seq)
1532                 /* pointing at the head, will need to follow that
1533                    if head moves */
1534                 follow_seq = 1;
1535
1536         kfree(rbd_dev->header.snapc);
1537         kfree(rbd_dev->header.snap_names);
1538         kfree(rbd_dev->header.snap_sizes);
1539
1540         rbd_dev->header.total_snaps = h.total_snaps;
1541         rbd_dev->header.snapc = h.snapc;
1542         rbd_dev->header.snap_names = h.snap_names;
1543         rbd_dev->header.snap_names_len = h.snap_names_len;
1544         rbd_dev->header.snap_sizes = h.snap_sizes;
1545         if (follow_seq)
1546                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1547         else
1548                 rbd_dev->header.snapc->seq = snap_seq;
1549
1550         ret = __rbd_init_snaps_header(rbd_dev);
1551
1552         up_write(&rbd_dev->header.snap_rwsem);
1553
1554         return ret;
1555 }
1556
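/*
 * Set up the gendisk and request queue for the mapped image, size it
 * from the header (or the selected snapshot) and register the block
 * device.
 */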
1557 static int rbd_init_disk(struct rbd_device *rbd_dev)
1558 {
1559         struct gendisk *disk;
1560         struct request_queue *q;
1561         int rc;
1562         u64 total_size = 0;
1563
1564         /* contact OSD, request size info about the object being mapped */
1565         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1566         if (rc)
1567                 return rc;
1568
1569         /* no need to lock here, as rbd_dev is not registered yet */
1570         rc = __rbd_init_snaps_header(rbd_dev);
1571         if (rc)
1572                 return rc;
1573
1574         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1575         if (rc)
1576                 return rc;
1577
1578         /* create gendisk info */
1579         rc = -ENOMEM;
1580         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1581         if (!disk)
1582                 goto out;
1583
1584         sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1585         disk->major = rbd_dev->major;
1586         disk->first_minor = 0;
1587         disk->fops = &rbd_bd_ops;
1588         disk->private_data = rbd_dev;
1589
1590         /* init rq */
1591         rc = -ENOMEM;
1592         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1593         if (!q)
1594                 goto out_disk;
1595         blk_queue_merge_bvec(q, rbd_merge_bvec);
1596         disk->queue = q;
1597
1598         q->queuedata = rbd_dev;
1599
1600         rbd_dev->disk = disk;
1601         rbd_dev->q = q;
1602
1603         /* finally, announce the disk to the world */
1604         set_capacity(disk, total_size / 512ULL);
1605         add_disk(disk);
1606
1607         pr_info("%s: added with size 0x%llx\n",
1608                 disk->disk_name, (unsigned long long)total_size);
1609         return 0;
1610
1611 out_disk:
1612         put_disk(disk);
1613 out:
1614         return rc;
1615 }
1616
1617 /*
1618   sysfs
1619 */
1620
1621 static ssize_t rbd_size_show(struct device *dev,
1622                              struct device_attribute *attr, char *buf)
1623 {
1624         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1625
1626         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1627 }
1628
1629 static ssize_t rbd_major_show(struct device *dev,
1630                               struct device_attribute *attr, char *buf)
1631 {
1632         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1633
1634         return sprintf(buf, "%d\n", rbd_dev->major);
1635 }
1636
1637 static ssize_t rbd_client_id_show(struct device *dev,
1638                                   struct device_attribute *attr, char *buf)
1639 {
1640         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1641
1642         return sprintf(buf, "client%lld\n", ceph_client_id(rbd_dev->client));
1643 }
1644
1645 static ssize_t rbd_pool_show(struct device *dev,
1646                              struct device_attribute *attr, char *buf)
1647 {
1648         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1649
1650         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1651 }
1652
1653 static ssize_t rbd_name_show(struct device *dev,
1654                              struct device_attribute *attr, char *buf)
1655 {
1656         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1657
1658         return sprintf(buf, "%s\n", rbd_dev->obj);
1659 }
1660
1661 static ssize_t rbd_snap_show(struct device *dev,
1662                              struct device_attribute *attr,
1663                              char *buf)
1664 {
1665         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1666
1667         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1668 }
1669
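/*
 * Writing anything to the "refresh" attribute re-reads the image header
 * and resynchronizes the snapshot list; the value written is ignored.
 */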
1670 static ssize_t rbd_image_refresh(struct device *dev,
1671                                  struct device_attribute *attr,
1672                                  const char *buf,
1673                                  size_t size)
1674 {
1675         struct rbd_device *rbd_dev = dev_to_rbd(dev);
1676         int rc;
1677         int ret = size;
1678
1679         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1680
1681         rc = __rbd_update_snaps(rbd_dev);
1682         if (rc < 0)
1683                 ret = rc;
1684
1685         mutex_unlock(&ctl_mutex);
1686         return ret;
1687 }
1688
1689 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1690 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1691 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1692 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1693 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1694 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1695 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1696 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1697 static DEVICE_ATTR(rollback_snap, S_IWUSR, NULL, rbd_snap_rollback);
1698
1699 static struct attribute *rbd_attrs[] = {
1700         &dev_attr_size.attr,
1701         &dev_attr_major.attr,
1702         &dev_attr_client_id.attr,
1703         &dev_attr_pool.attr,
1704         &dev_attr_name.attr,
1705         &dev_attr_current_snap.attr,
1706         &dev_attr_refresh.attr,
1707         &dev_attr_create_snap.attr,
1708         &dev_attr_rollback_snap.attr,
1709         NULL
1710 };
1711
1712 static struct attribute_group rbd_attr_group = {
1713         .attrs = rbd_attrs,
1714 };
1715
1716 static const struct attribute_group *rbd_attr_groups[] = {
1717         &rbd_attr_group,
1718         NULL
1719 };
1720
1721 static void rbd_sysfs_dev_release(struct device *dev)
1722 {
1723 }
1724
1725 static struct device_type rbd_device_type = {
1726         .name           = "rbd",
1727         .groups         = rbd_attr_groups,
1728         .release        = rbd_sysfs_dev_release,
1729 };
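/*
 * With the attribute group above, a mapped device with id <id> should be
 * visible through the rbd bus roughly as follows (illustrative layout):
 *
 *   /sys/bus/rbd/devices/<id>/size            image size in bytes
 *   /sys/bus/rbd/devices/<id>/major           assigned block device major
 *   /sys/bus/rbd/devices/<id>/client_id       ceph client id
 *   /sys/bus/rbd/devices/<id>/pool            pool name
 *   /sys/bus/rbd/devices/<id>/name            rbd image name
 *   /sys/bus/rbd/devices/<id>/current_snap    mapped snapshot ("-" for head)
 *   /sys/bus/rbd/devices/<id>/refresh         write-only, re-read header
 *   /sys/bus/rbd/devices/<id>/create_snap     write-only, snapshot name
 *   /sys/bus/rbd/devices/<id>/rollback_snap   write-only, snapshot name
 */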
1730
1731
1732 /*
1733   sysfs - snapshots
1734 */
1735
1736 static ssize_t rbd_snap_size_show(struct device *dev,
1737                                   struct device_attribute *attr,
1738                                   char *buf)
1739 {
1740         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1741
1742         return sprintf(buf, "%lld\n", (long long)snap->size);
1743 }
1744
1745 static ssize_t rbd_snap_id_show(struct device *dev,
1746                                 struct device_attribute *attr,
1747                                 char *buf)
1748 {
1749         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1750
1751         return sprintf(buf, "%lld\n", (long long)snap->id);
1752 }
1753
1754 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1755 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1756
1757 static struct attribute *rbd_snap_attrs[] = {
1758         &dev_attr_snap_size.attr,
1759         &dev_attr_snap_id.attr,
1760         NULL,
1761 };
1762
1763 static struct attribute_group rbd_snap_attr_group = {
1764         .attrs = rbd_snap_attrs,
1765 };
1766
1767 static void rbd_snap_dev_release(struct device *dev)
1768 {
1769         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1770         kfree(snap->name);
1771         kfree(snap);
1772 }
1773
1774 static const struct attribute_group *rbd_snap_attr_groups[] = {
1775         &rbd_snap_attr_group,
1776         NULL
1777 };
1778
1779 static struct device_type rbd_snap_device_type = {
1780         .groups         = rbd_snap_attr_groups,
1781         .release        = rbd_snap_dev_release,
1782 };
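/*
 * Each snapshot is registered as a child device of its rbd device and
 * named "snap_<name>" (see rbd_register_snap_dev() below).  For example,
 * a snapshot "foo" of device 0 would typically show up as:
 *
 *   /sys/bus/rbd/devices/0/snap_foo/snap_size
 *   /sys/bus/rbd/devices/0/snap_foo/snap_id
 *
 * ("foo" and the device id are illustrative.)
 */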
1783
1784 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1785                                   struct rbd_snap *snap)
1786 {
1787         list_del(&snap->node);
1788         device_unregister(&snap->dev);
1789 }
1790
1791 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1792                                   struct rbd_snap *snap,
1793                                   struct device *parent)
1794 {
1795         struct device *dev = &snap->dev;
1796         int ret;
1797
1798         dev->type = &rbd_snap_device_type;
1799         dev->parent = parent;
1800         dev->release = rbd_snap_dev_release;
1801         dev_set_name(dev, "snap_%s", snap->name);
1802         ret = device_register(dev);
1803
1804         return ret;
1805 }
1806
1807 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1808                               int i, const char *name,
1809                               struct rbd_snap **snapp)
1810 {
1811         int ret;
1812         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1813         if (!snap)
1814                 return -ENOMEM;
1815         snap->name = kstrdup(name, GFP_KERNEL);
1816         snap->size = rbd_dev->header.snap_sizes[i];
1817         snap->id = rbd_dev->header.snapc->snaps[i];
1818         if (device_is_registered(&rbd_dev->dev)) {
1819                 ret = rbd_register_snap_dev(rbd_dev, snap,
1820                                              &rbd_dev->dev);
1821                 if (ret < 0)
1822                         goto err;
1823         }
1824         *snapp = snap;
1825         return 0;
1826 err:
1827         kfree(snap->name);
1828         kfree(snap);
1829         return ret;
1830 }
1831
1832 /*
1833  * search for the previous snapshot name in a NUL-delimited string list
1834  */
1835 static const char *rbd_prev_snap_name(const char *name, const char *start)
1836 {
1837         if (name < start + 2)
1838                 return NULL;
1839
1840         name -= 2;
1841         while (*name) {
1842                 if (name == start)
1843                         return start;
1844                 name--;
1845         }
1846         return name + 1;
1847 }
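/*
 * Illustration of the walk above, assuming the buffer at 'start' holds the
 * NUL-delimited names "one\0two\0three\0": with 'name' pointing at "three"
 * we step backwards over "two" until we hit the NUL that terminates "one",
 * and return a pointer to "two"; with 'name' pointing at "one" (i.e. at
 * 'start') there is no previous name, so NULL is returned.
 */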
1848
1849 /*
1850  * compare the old list of snapshots that we have with what's in the
1851  * header, and update it accordingly. Note that the header holds the
1852  * snapshots in reverse order (newest to oldest), while we walk from
1853  * oldest to newest, so that we don't register a duplicate snap name
1854  * along the way (e.g., when a snapshot was removed and a new one was
1855  * then created with the same name).
1856  */
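/*
 * A small worked example of the merge below: if the registered list holds
 * snapshot ids {2, 5} and the new header lists {8, 5} (newest first), then
 * id 2 is gone and its device is unregistered, id 5 is kept as-is, and id 8
 * is added once the old list has been walked.
 */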
1857 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
1858 {
1859         const char *name, *first_name;
1860         int i = rbd_dev->header.total_snaps;
1861         struct rbd_snap *snap, *old_snap = NULL;
1862         int ret;
1863         struct list_head *p, *n;
1864
1865         first_name = rbd_dev->header.snap_names;
1866         name = first_name + rbd_dev->header.snap_names_len;
1867
1868         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
1869                 u64 cur_id;
1870
1871                 old_snap = list_entry(p, struct rbd_snap, node);
1872
1873                 if (i)
1874                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
1875
1876                 if (!i || old_snap->id < cur_id) {
1877                         /* old_snap->id was skipped, thus was removed */
1878                         __rbd_remove_snap_dev(rbd_dev, old_snap);
1879                         continue;
1880                 }
1881                 if (old_snap->id == cur_id) {
1882                         /* we have this snapshot already */
1883                         i--;
1884                         name = rbd_prev_snap_name(name, first_name);
1885                         continue;
1886                 }
1887                 for (; i > 0;
1888                      i--, name = rbd_prev_snap_name(name, first_name)) {
1889                         if (!name) {
1890                                 WARN_ON(1);
1891                                 return -EINVAL;
1892                         }
1893                         cur_id = rbd_dev->header.snapc->snaps[i];
1894                         /* snapshot removal? handle it above */
1895                         if (cur_id >= old_snap->id)
1896                                 break;
1897                         /* a new snapshot */
1898                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1899                         if (ret < 0)
1900                                 return ret;
1901
1902                         /* note that we add it backward, so we use n and not p */
1903                         list_add(&snap->node, n);
1904                         p = &snap->node;
1905                 }
1906         }
1907         /* we're done going over the old snap list, just add what's left */
1908         for (; i > 0; i--) {
1909                 name = rbd_prev_snap_name(name, first_name);
1910                 if (!name) {
1911                         WARN_ON(1);
1912                         return -EINVAL;
1913                 }
1914                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
1915                 if (ret < 0)
1916                         return ret;
1917                 list_add(&snap->node, &rbd_dev->snaps);
1918         }
1919
1920         return 0;
1921 }
1922
1923
1924 static void rbd_root_dev_release(struct device *dev)
1925 {
1926 }
1927
1928 static struct device rbd_root_dev = {
1929         .init_name =    "rbd",
1930         .release =      rbd_root_dev_release,
1931 };
1932
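/*
 * Register the rbd device, and any snapshots discovered so far, on the rbd
 * bus so that their sysfs attributes become visible.
 */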
1933 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
1934 {
1935         int ret = -ENOMEM;
1936         struct device *dev;
1937         struct rbd_snap *snap;
1938
1939         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1940         dev = &rbd_dev->dev;
1941
1942         dev->bus = &rbd_bus_type;
1943         dev->type = &rbd_device_type;
1944         dev->parent = &rbd_root_dev;
1945         dev->release = rbd_dev_release;
1946         dev_set_name(dev, "%d", rbd_dev->id);
1947         ret = device_register(dev);
1948         if (ret < 0)
1949                 goto done_free;
1950
1951         list_for_each_entry(snap, &rbd_dev->snaps, node) {
1952                 ret = rbd_register_snap_dev(rbd_dev, snap,
1953                                              &rbd_dev->dev);
1954                 if (ret < 0)
1955                         break;
1956         }
1957
1958         mutex_unlock(&ctl_mutex);
1959         return 0;
1960 done_free:
1961         mutex_unlock(&ctl_mutex);
1962         return ret;
1963 }
1964
1965 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
1966 {
1967         device_unregister(&rbd_dev->dev);
1968 }
1969
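/*
 * Register a watch on the image's header object so the osd can notify us
 * when the header changes (e.g. a snapshot is created elsewhere).  An
 * -ERANGE reply appears to indicate that our cached header version is
 * stale, in which case we refresh the snapshot state and retry.
 */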
1970 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
1971 {
1972         int ret, rc;
1973
1974         do {
1975                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
1976                                          rbd_dev->header.obj_version);
1977                 if (ret == -ERANGE) {
1978                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1979                         rc = __rbd_update_snaps(rbd_dev);
1980                         mutex_unlock(&ctl_mutex);
1981                         if (rc < 0)
1982                                 return rc;
1983                 }
1984         } while (ret == -ERANGE);
1985
1986         return ret;
1987 }
1988
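/*
 * Handle a write to /sys/bus/rbd/add.  Per the sscanf() below, the input
 * is "<mon addrs> <options> <pool> <image> [<snap>]"; for example, with an
 * illustrative monitor address and option string:
 *
 *   $ echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * would map image "myimage" from pool "rbd" as block device rbd<id>.
 */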
1989 static ssize_t rbd_add(struct bus_type *bus,
1990                        const char *buf,
1991                        size_t count)
1992 {
1993         struct ceph_osd_client *osdc;
1994         struct rbd_device *rbd_dev;
1995         ssize_t rc = -ENOMEM;
1996         int irc, new_id = 0;
1997         struct list_head *tmp;
1998         char *mon_dev_name;
1999         char *options;
2000
2001         if (!try_module_get(THIS_MODULE))
2002                 return -ENODEV;
2003
2004         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN + 1, GFP_KERNEL); /* +1 for sscanf's NUL */
2005         if (!mon_dev_name)
2006                 goto err_out_mod;
2007
2008         options = kmalloc(RBD_MAX_OPT_LEN + 1, GFP_KERNEL); /* +1 for sscanf's NUL */
2009         if (!options)
2010                 goto err_mon_dev;
2011
2012         /* new rbd_device object */
2013         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2014         if (!rbd_dev)
2015                 goto err_out_opt;
2016
2017         /* static rbd_device initialization */
2018         spin_lock_init(&rbd_dev->lock);
2019         INIT_LIST_HEAD(&rbd_dev->node);
2020         INIT_LIST_HEAD(&rbd_dev->snaps);
2021
2022         /* generate unique id: find highest unique id, add one */
2023         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2024
2025         list_for_each(tmp, &rbd_dev_list) {
2026                 struct rbd_device *rbd_dev_tmp; /* don't shadow the new device */
2027 
2028                 rbd_dev_tmp = list_entry(tmp, struct rbd_device, node);
2029                 if (rbd_dev_tmp->id >= new_id)
2030                         new_id = rbd_dev_tmp->id + 1;
2031         }
2032
2033         rbd_dev->id = new_id;
2034
2035         /* add to global list */
2036         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2037
2038         /* parse add command */
2039         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2040                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
2041                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2042                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2043                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2044                    mon_dev_name, options, rbd_dev->pool_name,
2045                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
2046                 rc = -EINVAL;
2047                 goto err_out_slot;
2048         }
2049
2050         if (rbd_dev->snap_name[0] == 0)
2051                 rbd_dev->snap_name[0] = '-';
2052
2053         rbd_dev->obj_len = strlen(rbd_dev->obj);
2054         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2055                  rbd_dev->obj, RBD_SUFFIX);
2056
2057         /* initialize rest of new object */
2058         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2059         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2060         if (rc < 0)
2061                 goto err_out_slot;
2062
2063         mutex_unlock(&ctl_mutex);
2064
2065         /* pick the pool */
2066         osdc = &rbd_dev->client->osdc;
2067         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2068         if (rc < 0)
2069                 goto err_out_client;
2070         rbd_dev->poolid = rc;
2071
2072         /* register our block device */
2073         irc = register_blkdev(0, rbd_dev->name);
2074         if (irc < 0) {
2075                 rc = irc;
2076                 goto err_out_client;
2077         }
2078         rbd_dev->major = irc;
2079
2080         rc = rbd_bus_add_dev(rbd_dev);
2081         if (rc)
2082                 goto err_out_blkdev;
2083
2084         /* set up and announce blkdev mapping */
2085         rc = rbd_init_disk(rbd_dev);
2086         if (rc)
2087                 goto err_out_bus;
2088
2089         rc = rbd_init_watch_dev(rbd_dev);
2090         if (rc)
2091                 goto err_out_bus;
2092
        kfree(options);
        kfree(mon_dev_name);
        return count;
2094
2095 err_out_bus:
2096         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2097         list_del_init(&rbd_dev->node);
2098         mutex_unlock(&ctl_mutex);
2099
2100         /* this will also clean up the rest of rbd_dev, via rbd_dev_release() */
2101
2102         rbd_bus_del_dev(rbd_dev);
2103         kfree(options);
2104         kfree(mon_dev_name);
2105         return rc;
2106
2107 err_out_blkdev:
2108         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2109 err_out_client:
2110         rbd_put_client(rbd_dev);
2111         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2112 err_out_slot:
2113         list_del_init(&rbd_dev->node);
2114         mutex_unlock(&ctl_mutex);
2115
2116         kfree(rbd_dev);
2117 err_out_opt:
2118         kfree(options);
2119 err_mon_dev:
2120         kfree(mon_dev_name);
2121 err_out_mod:
2122         dout("Error adding device %s\n", buf);
2123         module_put(THIS_MODULE);
2124         return rc;
2125 }
2126
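/*
 * Look up a device by id in the global device list; called with ctl_mutex
 * held (see rbd_remove()).
 */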
2127 static struct rbd_device *__rbd_get_dev(unsigned long id)
2128 {
2129         struct list_head *tmp;
2130         struct rbd_device *rbd_dev;
2131
2132         list_for_each(tmp, &rbd_dev_list) {
2133                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2134                 if (rbd_dev->id == id)
2135                         return rbd_dev;
2136         }
2137         return NULL;
2138 }
2139
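/*
 * Final teardown, run by the driver core once the last reference to the
 * device is dropped (after rbd_bus_del_dev()): tear down the header watch,
 * drop the client, free the disk, release the block major and free the
 * rbd_device itself.
 */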
2140 static void rbd_dev_release(struct device *dev)
2141 {
2142         struct rbd_device *rbd_dev =
2143                         container_of(dev, struct rbd_device, dev);
2144
2145         if (rbd_dev->watch_request)
2146                 ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
2147                                                     rbd_dev->watch_request);
2148         if (rbd_dev->watch_event)
2149                 ceph_osdc_cancel_event(rbd_dev->watch_event);
2150
2151         rbd_put_client(rbd_dev);
2152
2153         /* clean up and free blkdev */
2154         rbd_free_disk(rbd_dev);
2155         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2156         kfree(rbd_dev);
2157
2158         /* release module ref */
2159         module_put(THIS_MODULE);
2160 }
2161
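/*
 * Handle a write to /sys/bus/rbd/remove.  The input is the numeric device
 * id assigned by rbd_add(); for example, for device id 0:
 *
 *   $ echo 0 > /sys/bus/rbd/remove
 */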
2162 static ssize_t rbd_remove(struct bus_type *bus,
2163                           const char *buf,
2164                           size_t count)
2165 {
2166         struct rbd_device *rbd_dev = NULL;
2167         int target_id, rc;
2168         unsigned long ul;
2169         int ret = count;
2170
2171         rc = strict_strtoul(buf, 10, &ul);
2172         if (rc)
2173                 return rc;
2174
2175         /* convert to int; abort if we lost anything in the conversion */
2176         target_id = (int) ul;
2177         if (target_id != ul)
2178                 return -EINVAL;
2179
2180         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2181
2182         rbd_dev = __rbd_get_dev(target_id);
2183         if (!rbd_dev) {
2184                 ret = -ENOENT;
2185                 goto done;
2186         }
2187
2188         list_del_init(&rbd_dev->node);
2189
2190         __rbd_remove_all_snaps(rbd_dev);
2191         rbd_bus_del_dev(rbd_dev);
2192
2193 done:
2194         mutex_unlock(&ctl_mutex);
2195         return ret;
2196 }
2197
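/*
 * Handle a write to the per-device "create_snap" attribute: create a
 * snapshot with the given name and then notify other clients.  For
 * example (illustrative name, device id 0):
 *
 *   $ echo mysnap > /sys/bus/rbd/devices/0/create_snap
 */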
2198 static ssize_t rbd_snap_add(struct device *dev,
2199                             struct device_attribute *attr,
2200                             const char *buf,
2201                             size_t count)
2202 {
2203         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2204         int ret;
2205         char *name = kmalloc(count + 1, GFP_KERNEL);
2206         if (!name)
2207                 return -ENOMEM;
2208
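        /*
         * Copy at most count - 1 bytes: this drops the trailing newline of a
         * typical "echo name > create_snap" write (but would also drop the
         * last character of a name written without one).
         */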
2209         snprintf(name, count, "%s", buf);
2210
2211         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2212
2213         ret = rbd_header_add_snap(rbd_dev,
2214                                   name, GFP_KERNEL);
2215         if (ret < 0)
2216                 goto err_unlock;
2217
2218         ret = __rbd_update_snaps(rbd_dev);
2219         if (ret < 0)
2220                 goto err_unlock;
2221
2222         /* We shouldn't hold ctl_mutex when notifying: the notify might
2223            trigger a watch callback that would need to take that mutex. */
2224         mutex_unlock(&ctl_mutex);
2225
2226         /* make a best effort, don't error if failed */
2227         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2228
2229         ret = count;
2230         kfree(name);
2231         return ret;
2232
2233 err_unlock:
2234         mutex_unlock(&ctl_mutex);
2235         kfree(name);
2236         return ret;
2237 }
2238
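/*
 * Handle a write to the per-device "rollback_snap" attribute: roll every
 * object segment of the image back to the named snapshot.  For example
 * (illustrative name, device id 0):
 *
 *   $ echo mysnap > /sys/bus/rbd/devices/0/rollback_snap
 */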
2239 static ssize_t rbd_snap_rollback(struct device *dev,
2240                                  struct device_attribute *attr,
2241                                  const char *buf,
2242                                  size_t count)
2243 {
2244         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2245         int ret;
2246         u64 snapid;
2247         u64 cur_ofs;
2248         char *seg_name = NULL;
2249         char *snap_name = kmalloc(count + 1, GFP_KERNEL);
2250         ret = -ENOMEM;
2251         if (!snap_name)
2252                 return ret;
2253
2254         /* parse the snapshot rollback command (a snapshot name) */
2255         snprintf(snap_name, count, "%s", buf);
2256         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
2257         if (!seg_name)
2258                 goto done;
2259
2260         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2261
2262         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
2263         if (ret < 0)
2264                 goto done_unlock;
2265
2266         dout("snapid=%llu\n", (unsigned long long)snapid);
2267
2268         cur_ofs = 0;
2269         while (cur_ofs < rbd_dev->header.image_size) {
2270                 cur_ofs += rbd_get_segment(&rbd_dev->header,
2271                                            rbd_dev->obj,
2272                                            cur_ofs, (u64)-1,
2273                                            seg_name, NULL);
2274                 dout("seg_name=%s\n", seg_name);
2275
2276                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
2277                 if (ret < 0)
2278                         pr_warning("could not roll back obj %s err=%d\n",
2279                                    seg_name, ret);
2280         }
2281
2282         ret = __rbd_update_snaps(rbd_dev);
2283         if (ret < 0)
2284                 goto done_unlock;
2285
2286         ret = count;
2287
2288 done_unlock:
2289         mutex_unlock(&ctl_mutex);
2290 done:
2291         kfree(seg_name);
2292         kfree(snap_name);
2293
2294         return ret;
2295 }
2296
2297 static struct bus_attribute rbd_bus_attrs[] = {
2298         __ATTR(add, S_IWUSR, NULL, rbd_add),
2299         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
2300         __ATTR_NULL
2301 };
2302
2303 /*
2304  * create control files in sysfs
2305  * /sys/bus/rbd/...
2306  */
2307 static int rbd_sysfs_init(void)
2308 {
2309         int ret;
2310
2311         rbd_bus_type.bus_attrs = rbd_bus_attrs;
2312
2313         ret = bus_register(&rbd_bus_type);
2314         if (ret < 0)
2315                 return ret;
2316
        ret = device_register(&rbd_root_dev);
        if (ret < 0)
                bus_unregister(&rbd_bus_type);

        return ret;
2320 }
2321
2322 static void rbd_sysfs_cleanup(void)
2323 {
2324         device_unregister(&rbd_root_dev);
2325         bus_unregister(&rbd_bus_type);
2326 }
2327
2328 int __init rbd_init(void)
2329 {
2330         int rc;
2331
2332         rc = rbd_sysfs_init();
2333         if (rc)
2334                 return rc;
2335         spin_lock_init(&node_lock);
2336         pr_info("loaded " DRV_NAME_LONG "\n");
2337         return 0;
2338 }
2339
2340 void __exit rbd_exit(void)
2341 {
2342         rbd_sysfs_cleanup();
2343 }
2344
2345 module_init(rbd_init);
2346 module_exit(rbd_exit);
2347
2348 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2349 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2350 MODULE_DESCRIPTION("rados block device");
2351
2352 /* following authorship retained from original osdblk.c */
2353 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2354
2355 MODULE_LICENSE("GPL");