83b15dd24853ca39949e21e9264afc0ec92b632d
[pandora-kernel.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    Instructions for use
25    --------------------
26
27    1) Map a Linux block device to an existing rbd image.
28
29       Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
30
31       $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
32
33       The snapshot name can be "-" or omitted to map the image read/write.
34
35    2) List all active blkdev<->object mappings.
36
37       In this example, we have performed step #1 twice, creating two blkdevs,
38       mapped to two separate rados objects in the rados rbd pool
39
40       $ cat /sys/class/rbd/list
41       #id     major   client_name     pool    name    snap    KB
42       0       254     client4143      rbd     foo     -      1024000
43
44       The columns, in order, are:
45       - blkdev unique id
46       - blkdev assigned major
47       - rados client id
48       - rados pool name
49       - rados block device name
50       - mapped snapshot ("-" if none)
51       - device size in KB
52
53
54    3) Create a snapshot.
55
56       Usage: <blkdev id> <snapname>
57
58       $ echo "0 mysnap" > /sys/class/rbd/snap_create
59
60
61    4) Listing a snapshot.
62
63       $ cat /sys/class/rbd/snaps_list
64       #id     snap    KB
65       0       -       1024000 (*)
66       0       foo     1024000
67
68       The columns, in order, are:
69       - blkdev unique id
70       - snapshot name, '-' means none (active read/write version)
71       - size of device at time of snapshot
72       - the (*) indicates this is the active version
73
74    5) Rollback to snapshot.
75
76       Usage: <blkdev id> <snapname>
77
78       $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
79
80
81    6) Mapping an image using snapshot.
82
83       A snapshot mapping is read-only. This is being done by passing
84       snap=<snapname> to the options when adding a device.
85
86       $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
87
88
89    7) Remove an active blkdev<->rbd image mapping.
90
91       In this example, we remove the mapping with blkdev unique id 1.
92
93       $ echo 1 > /sys/class/rbd/remove
94
95
96    NOTE:  The actual creation and deletion of rados objects is outside the scope
97    of this driver.
98
99  */
100
101 #include <linux/ceph/libceph.h>
102 #include <linux/ceph/osd_client.h>
103 #include <linux/ceph/mon_client.h>
104 #include <linux/ceph/decode.h>
105
106 #include <linux/kernel.h>
107 #include <linux/device.h>
108 #include <linux/module.h>
109 #include <linux/fs.h>
110 #include <linux/blkdev.h>
111
112 #include "rbd_types.h"
113
114 #define DRV_NAME "rbd"
115 #define DRV_NAME_LONG "rbd (rados block device)"
116
117 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
118
119 #define RBD_MAX_MD_NAME_LEN     (96 + sizeof(RBD_SUFFIX))
120 #define RBD_MAX_POOL_NAME_LEN   64
121 #define RBD_MAX_SNAP_NAME_LEN   32
122 #define RBD_MAX_OPT_LEN         1024
123
124 #define RBD_SNAP_HEAD_NAME      "-"
125
126 #define DEV_NAME_LEN            32
127
128 /*
129  * block device image metadata (in-memory version)
130  */
131 struct rbd_image_header {
132         u64 image_size;
133         char block_name[32];
134         __u8 obj_order;
135         __u8 crypt_type;
136         __u8 comp_type;
137         struct rw_semaphore snap_rwsem;
138         struct ceph_snap_context *snapc;
139         size_t snap_names_len;
140         u64 snap_seq;
141         u32 total_snaps;
142
143         char *snap_names;
144         u64 *snap_sizes;
145 };
146
147 /*
148  * an instance of the client.  multiple devices may share a client.
149  */
150 struct rbd_client {
151         struct ceph_client      *client;
152         struct kref             kref;
153         struct list_head        node;
154 };
155
156 /*
157  * a single io request
158  */
159 struct rbd_request {
160         struct request          *rq;            /* blk layer request */
161         struct bio              *bio;           /* cloned bio */
162         struct page             **pages;        /* list of used pages */
163         u64                     len;
164 };
165
166 /*
167  * a single device
168  */
169 struct rbd_device {
170         int                     id;             /* blkdev unique id */
171
172         int                     major;          /* blkdev assigned major */
173         struct gendisk          *disk;          /* blkdev's gendisk and rq */
174         struct request_queue    *q;
175
176         struct ceph_client      *client;
177         struct rbd_client       *rbd_client;
178
179         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
180
181         spinlock_t              lock;           /* queue lock */
182
183         struct rbd_image_header header;
184         char                    obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
185         int                     obj_len;
186         char                    obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
187         char                    pool_name[RBD_MAX_POOL_NAME_LEN];
188         int                     poolid;
189
190         char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
191         u32 cur_snap;   /* index+1 of current snapshot within snap context
192                            0 - for the head */
193         int read_only;
194
195         struct list_head        node;
196 };
197
198 static spinlock_t node_lock;      /* protects client get/put */
199
200 static struct class *class_rbd;   /* /sys/class/rbd */
201 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
202 static LIST_HEAD(rbd_dev_list);    /* devices */
203 static LIST_HEAD(rbd_client_list);      /* clients */
204
205
206 static int rbd_open(struct block_device *bdev, fmode_t mode)
207 {
208         struct gendisk *disk = bdev->bd_disk;
209         struct rbd_device *rbd_dev = disk->private_data;
210
211         set_device_ro(bdev, rbd_dev->read_only);
212
213         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
214                 return -EROFS;
215
216         return 0;
217 }
218
219 static const struct block_device_operations rbd_bd_ops = {
220         .owner                  = THIS_MODULE,
221         .open                   = rbd_open,
222 };
223
224 /*
225  * Initialize an rbd client instance.
226  * We own *opt.
227  */
228 static struct rbd_client *rbd_client_create(struct ceph_options *opt)
229 {
230         struct rbd_client *rbdc;
231         int ret = -ENOMEM;
232
233         dout("rbd_client_create\n");
234         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
235         if (!rbdc)
236                 goto out_opt;
237
238         kref_init(&rbdc->kref);
239         INIT_LIST_HEAD(&rbdc->node);
240
241         rbdc->client = ceph_create_client(opt, rbdc);
242         if (IS_ERR(rbdc->client))
243                 goto out_rbdc;
244         opt = NULL; /* Now rbdc->client is responsible for opt */
245
246         ret = ceph_open_session(rbdc->client);
247         if (ret < 0)
248                 goto out_err;
249
250         spin_lock(&node_lock);
251         list_add_tail(&rbdc->node, &rbd_client_list);
252         spin_unlock(&node_lock);
253
254         dout("rbd_client_create created %p\n", rbdc);
255         return rbdc;
256
257 out_err:
258         ceph_destroy_client(rbdc->client);
259 out_rbdc:
260         kfree(rbdc);
261 out_opt:
262         if (opt)
263                 ceph_destroy_options(opt);
264         return ERR_PTR(ret);
265 }
266
267 /*
268  * Find a ceph client with specific addr and configuration.
269  */
270 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
271 {
272         struct rbd_client *client_node;
273
274         if (opt->flags & CEPH_OPT_NOSHARE)
275                 return NULL;
276
277         list_for_each_entry(client_node, &rbd_client_list, node)
278                 if (ceph_compare_options(opt, client_node->client) == 0)
279                         return client_node;
280         return NULL;
281 }
282
283 /*
284  * Get a ceph client with specific addr and configuration, if one does
285  * not exist create it.
286  */
287 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
288                           char *options)
289 {
290         struct rbd_client *rbdc;
291         struct ceph_options *opt;
292         int ret;
293
294         ret = ceph_parse_options(&opt, options, mon_addr,
295                                  mon_addr + strlen(mon_addr), NULL, NULL);
296         if (ret < 0)
297                 return ret;
298
299         spin_lock(&node_lock);
300         rbdc = __rbd_client_find(opt);
301         if (rbdc) {
302                 ceph_destroy_options(opt);
303
304                 /* using an existing client */
305                 kref_get(&rbdc->kref);
306                 rbd_dev->rbd_client = rbdc;
307                 rbd_dev->client = rbdc->client;
308                 spin_unlock(&node_lock);
309                 return 0;
310         }
311         spin_unlock(&node_lock);
312
313         rbdc = rbd_client_create(opt);
314         if (IS_ERR(rbdc))
315                 return PTR_ERR(rbdc);
316
317         rbd_dev->rbd_client = rbdc;
318         rbd_dev->client = rbdc->client;
319         return 0;
320 }
321
322 /*
323  * Destroy ceph client
324  */
325 static void rbd_client_release(struct kref *kref)
326 {
327         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
328
329         dout("rbd_release_client %p\n", rbdc);
330         spin_lock(&node_lock);
331         list_del(&rbdc->node);
332         spin_unlock(&node_lock);
333
334         ceph_destroy_client(rbdc->client);
335         kfree(rbdc);
336 }
337
338 /*
339  * Drop reference to ceph client node. If it's not referenced anymore, release
340  * it.
341  */
342 static void rbd_put_client(struct rbd_device *rbd_dev)
343 {
344         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345         rbd_dev->rbd_client = NULL;
346         rbd_dev->client = NULL;
347 }
348
349
350 /*
351  * Create a new header structure, translate header format from the on-disk
352  * header.
353  */
354 static int rbd_header_from_disk(struct rbd_image_header *header,
355                                  struct rbd_image_header_ondisk *ondisk,
356                                  int allocated_snaps,
357                                  gfp_t gfp_flags)
358 {
359         int i;
360         u32 snap_count = le32_to_cpu(ondisk->snap_count);
361         int ret = -ENOMEM;
362
363         init_rwsem(&header->snap_rwsem);
364
365         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
367                                 snap_count *
368                                  sizeof(struct rbd_image_snap_ondisk),
369                                 gfp_flags);
370         if (!header->snapc)
371                 return -ENOMEM;
372         if (snap_count) {
373                 header->snap_names = kmalloc(header->snap_names_len,
374                                              GFP_KERNEL);
375                 if (!header->snap_names)
376                         goto err_snapc;
377                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
378                                              GFP_KERNEL);
379                 if (!header->snap_sizes)
380                         goto err_names;
381         } else {
382                 header->snap_names = NULL;
383                 header->snap_sizes = NULL;
384         }
385         memcpy(header->block_name, ondisk->block_name,
386                sizeof(ondisk->block_name));
387
388         header->image_size = le64_to_cpu(ondisk->image_size);
389         header->obj_order = ondisk->options.order;
390         header->crypt_type = ondisk->options.crypt_type;
391         header->comp_type = ondisk->options.comp_type;
392
393         atomic_set(&header->snapc->nref, 1);
394         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395         header->snapc->num_snaps = snap_count;
396         header->total_snaps = snap_count;
397
398         if (snap_count &&
399             allocated_snaps == snap_count) {
400                 for (i = 0; i < snap_count; i++) {
401                         header->snapc->snaps[i] =
402                                 le64_to_cpu(ondisk->snaps[i].id);
403                         header->snap_sizes[i] =
404                                 le64_to_cpu(ondisk->snaps[i].image_size);
405                 }
406
407                 /* copy snapshot names */
408                 memcpy(header->snap_names, &ondisk->snaps[i],
409                         header->snap_names_len);
410         }
411
412         return 0;
413
414 err_names:
415         kfree(header->snap_names);
416 err_snapc:
417         kfree(header->snapc);
418         return ret;
419 }
420
421 static int snap_index(struct rbd_image_header *header, int snap_num)
422 {
423         return header->total_snaps - snap_num;
424 }
425
426 static u64 cur_snap_id(struct rbd_device *rbd_dev)
427 {
428         struct rbd_image_header *header = &rbd_dev->header;
429
430         if (!rbd_dev->cur_snap)
431                 return 0;
432
433         return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
434 }
435
436 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
437                         u64 *seq, u64 *size)
438 {
439         int i;
440         char *p = header->snap_names;
441
442         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443                 if (strcmp(snap_name, p) == 0)
444                         break;
445         }
446         if (i == header->total_snaps)
447                 return -ENOENT;
448         if (seq)
449                 *seq = header->snapc->snaps[i];
450
451         if (size)
452                 *size = header->snap_sizes[i];
453
454         return i;
455 }
456
457 static int rbd_header_set_snap(struct rbd_device *dev,
458                                const char *snap_name,
459                                u64 *size)
460 {
461         struct rbd_image_header *header = &dev->header;
462         struct ceph_snap_context *snapc = header->snapc;
463         int ret = -ENOENT;
464
465         down_write(&header->snap_rwsem);
466
467         if (!snap_name ||
468             !*snap_name ||
469             strcmp(snap_name, "-") == 0 ||
470             strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471                 if (header->total_snaps)
472                         snapc->seq = header->snap_seq;
473                 else
474                         snapc->seq = 0;
475                 dev->cur_snap = 0;
476                 dev->read_only = 0;
477                 if (size)
478                         *size = header->image_size;
479         } else {
480                 ret = snap_by_name(header, snap_name, &snapc->seq, size);
481                 if (ret < 0)
482                         goto done;
483
484                 dev->cur_snap = header->total_snaps - ret;
485                 dev->read_only = 1;
486         }
487
488         ret = 0;
489 done:
490         up_write(&header->snap_rwsem);
491         return ret;
492 }
493
494 static void rbd_header_free(struct rbd_image_header *header)
495 {
496         kfree(header->snapc);
497         kfree(header->snap_names);
498         kfree(header->snap_sizes);
499 }
500
501 /*
502  * get the actual striped segment name, offset and length
503  */
504 static u64 rbd_get_segment(struct rbd_image_header *header,
505                            const char *block_name,
506                            u64 ofs, u64 len,
507                            char *seg_name, u64 *segofs)
508 {
509         u64 seg = ofs >> header->obj_order;
510
511         if (seg_name)
512                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513                          "%s.%012llx", block_name, seg);
514
515         ofs = ofs & ((1 << header->obj_order) - 1);
516         len = min_t(u64, len, (1 << header->obj_order) - ofs);
517
518         if (segofs)
519                 *segofs = ofs;
520
521         return len;
522 }
523
524 /*
525  * bio helpers
526  */
527
528 static void bio_chain_put(struct bio *chain)
529 {
530         struct bio *tmp;
531
532         while (chain) {
533                 tmp = chain;
534                 chain = chain->bi_next;
535                 bio_put(tmp);
536         }
537 }
538
539 /*
540  * zeros a bio chain, starting at specific offset
541  */
542 static void zero_bio_chain(struct bio *chain, int start_ofs)
543 {
544         struct bio_vec *bv;
545         unsigned long flags;
546         void *buf;
547         int i;
548         int pos = 0;
549
550         while (chain) {
551                 bio_for_each_segment(bv, chain, i) {
552                         if (pos + bv->bv_len > start_ofs) {
553                                 int remainder = max(start_ofs - pos, 0);
554                                 buf = bvec_kmap_irq(bv, &flags);
555                                 memset(buf + remainder, 0,
556                                        bv->bv_len - remainder);
557                                 bvec_kunmap_irq(bv, &flags);
558                         }
559                         pos += bv->bv_len;
560                 }
561
562                 chain = chain->bi_next;
563         }
564 }
565
566 /*
567  * bio_chain_clone - clone a chain of bios up to a certain length.
568  * might return a bio_pair that will need to be released.
569  */
570 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571                                    struct bio_pair **bp,
572                                    int len, gfp_t gfpmask)
573 {
574         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
575         int total = 0;
576
577         if (*bp) {
578                 bio_pair_release(*bp);
579                 *bp = NULL;
580         }
581
582         while (old_chain && (total < len)) {
583                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
584                 if (!tmp)
585                         goto err_out;
586
587                 if (total + old_chain->bi_size > len) {
588                         struct bio_pair *bp;
589
590                         /*
591                          * this split can only happen with a single paged bio,
592                          * split_bio will BUG_ON if this is not the case
593                          */
594                         dout("bio_chain_clone split! total=%d remaining=%d"
595                              "bi_size=%d\n",
596                              (int)total, (int)len-total,
597                              (int)old_chain->bi_size);
598
599                         /* split the bio. We'll release it either in the next
600                            call, or it will have to be released outside */
601                         bp = bio_split(old_chain, (len - total) / 512ULL);
602                         if (!bp)
603                                 goto err_out;
604
605                         __bio_clone(tmp, &bp->bio1);
606
607                         *next = &bp->bio2;
608                 } else {
609                         __bio_clone(tmp, old_chain);
610                         *next = old_chain->bi_next;
611                 }
612
613                 tmp->bi_bdev = NULL;
614                 gfpmask &= ~__GFP_WAIT;
615                 tmp->bi_next = NULL;
616
617                 if (!new_chain) {
618                         new_chain = tail = tmp;
619                 } else {
620                         tail->bi_next = tmp;
621                         tail = tmp;
622                 }
623                 old_chain = old_chain->bi_next;
624
625                 total += tmp->bi_size;
626         }
627
628         BUG_ON(total < len);
629
630         if (tail)
631                 tail->bi_next = NULL;
632
633         *old = old_chain;
634
635         return new_chain;
636
637 err_out:
638         dout("bio_chain_clone with err\n");
639         bio_chain_put(new_chain);
640         return NULL;
641 }
642
643 /*
644  * helpers for osd request op vectors.
645  */
646 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
647                             int num_ops,
648                             int opcode,
649                             u32 payload_len)
650 {
651         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
652                        GFP_NOIO);
653         if (!*ops)
654                 return -ENOMEM;
655         (*ops)[0].op = opcode;
656         /*
657          * op extent offset and length will be set later on
658          * in calc_raw_layout()
659          */
660         (*ops)[0].payload_len = payload_len;
661         return 0;
662 }
663
664 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
665 {
666         kfree(ops);
667 }
668
669 /*
670  * Send ceph osd request
671  */
672 static int rbd_do_request(struct request *rq,
673                           struct rbd_device *dev,
674                           struct ceph_snap_context *snapc,
675                           u64 snapid,
676                           const char *obj, u64 ofs, u64 len,
677                           struct bio *bio,
678                           struct page **pages,
679                           int num_pages,
680                           int flags,
681                           struct ceph_osd_req_op *ops,
682                           int num_reply,
683                           void (*rbd_cb)(struct ceph_osd_request *req,
684                                          struct ceph_msg *msg))
685 {
686         struct ceph_osd_request *req;
687         struct ceph_file_layout *layout;
688         int ret;
689         u64 bno;
690         struct timespec mtime = CURRENT_TIME;
691         struct rbd_request *req_data;
692         struct ceph_osd_request_head *reqhead;
693         struct rbd_image_header *header = &dev->header;
694
695         ret = -ENOMEM;
696         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
697         if (!req_data)
698                 goto done;
699
700         dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
701
702         down_read(&header->snap_rwsem);
703
704         req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
705                                       snapc,
706                                       ops,
707                                       false,
708                                       GFP_NOIO, pages, bio);
709         if (IS_ERR(req)) {
710                 up_read(&header->snap_rwsem);
711                 ret = PTR_ERR(req);
712                 goto done_pages;
713         }
714
715         req->r_callback = rbd_cb;
716
717         req_data->rq = rq;
718         req_data->bio = bio;
719         req_data->pages = pages;
720         req_data->len = len;
721
722         req->r_priv = req_data;
723
724         reqhead = req->r_request->front.iov_base;
725         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
726
727         strncpy(req->r_oid, obj, sizeof(req->r_oid));
728         req->r_oid_len = strlen(req->r_oid);
729
730         layout = &req->r_file_layout;
731         memset(layout, 0, sizeof(*layout));
732         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733         layout->fl_stripe_count = cpu_to_le32(1);
734         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735         layout->fl_pg_preferred = cpu_to_le32(-1);
736         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737         ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738                              ofs, &len, &bno, req, ops);
739
740         ceph_osdc_build_request(req, ofs, &len,
741                                 ops,
742                                 snapc,
743                                 &mtime,
744                                 req->r_oid, req->r_oid_len);
745         up_read(&header->snap_rwsem);
746
747         ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
748         if (ret < 0)
749                 goto done_err;
750
751         if (!rbd_cb) {
752                 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753                 ceph_osdc_put_request(req);
754         }
755         return ret;
756
757 done_err:
758         bio_chain_put(req_data->bio);
759         ceph_osdc_put_request(req);
760 done_pages:
761         kfree(req_data);
762 done:
763         if (rq)
764                 blk_end_request(rq, ret, len);
765         return ret;
766 }
767
768 /*
769  * Ceph osd op callback
770  */
771 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
772 {
773         struct rbd_request *req_data = req->r_priv;
774         struct ceph_osd_reply_head *replyhead;
775         struct ceph_osd_op *op;
776         __s32 rc;
777         u64 bytes;
778         int read_op;
779
780         /* parse reply */
781         replyhead = msg->front.iov_base;
782         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
783         op = (void *)(replyhead + 1);
784         rc = le32_to_cpu(replyhead->result);
785         bytes = le64_to_cpu(op->extent.length);
786         read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
787
788         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
789
790         if (rc == -ENOENT && read_op) {
791                 zero_bio_chain(req_data->bio, 0);
792                 rc = 0;
793         } else if (rc == 0 && read_op && bytes < req_data->len) {
794                 zero_bio_chain(req_data->bio, bytes);
795                 bytes = req_data->len;
796         }
797
798         blk_end_request(req_data->rq, rc, bytes);
799
800         if (req_data->bio)
801                 bio_chain_put(req_data->bio);
802
803         ceph_osdc_put_request(req);
804         kfree(req_data);
805 }
806
807 /*
808  * Do a synchronous ceph osd operation
809  */
810 static int rbd_req_sync_op(struct rbd_device *dev,
811                            struct ceph_snap_context *snapc,
812                            u64 snapid,
813                            int opcode,
814                            int flags,
815                            struct ceph_osd_req_op *orig_ops,
816                            int num_reply,
817                            const char *obj,
818                            u64 ofs, u64 len,
819                            char *buf)
820 {
821         int ret;
822         struct page **pages;
823         int num_pages;
824         struct ceph_osd_req_op *ops = orig_ops;
825         u32 payload_len;
826
827         num_pages = calc_pages_for(ofs , len);
828         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
829         if (!pages)
830                 return -ENOMEM;
831
832         if (!orig_ops) {
833                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
835                 if (ret < 0)
836                         goto done;
837
838                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
840                         if (ret < 0)
841                                 goto done_ops;
842                 }
843         }
844
845         ret = rbd_do_request(NULL, dev, snapc, snapid,
846                           obj, ofs, len, NULL,
847                           pages, num_pages,
848                           flags,
849                           ops,
850                           2,
851                           NULL);
852         if (ret < 0)
853                 goto done_ops;
854
855         if ((flags & CEPH_OSD_FLAG_READ) && buf)
856                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
857
858 done_ops:
859         if (!orig_ops)
860                 rbd_destroy_ops(ops);
861 done:
862         ceph_release_page_vector(pages, num_pages);
863         return ret;
864 }
865
866 /*
867  * Do an asynchronous ceph osd operation
868  */
869 static int rbd_do_op(struct request *rq,
870                      struct rbd_device *rbd_dev ,
871                      struct ceph_snap_context *snapc,
872                      u64 snapid,
873                      int opcode, int flags, int num_reply,
874                      u64 ofs, u64 len,
875                      struct bio *bio)
876 {
877         char *seg_name;
878         u64 seg_ofs;
879         u64 seg_len;
880         int ret;
881         struct ceph_osd_req_op *ops;
882         u32 payload_len;
883
884         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
885         if (!seg_name)
886                 return -ENOMEM;
887
888         seg_len = rbd_get_segment(&rbd_dev->header,
889                                   rbd_dev->header.block_name,
890                                   ofs, len,
891                                   seg_name, &seg_ofs);
892         if ((s64)seg_len < 0) {
893                 ret = seg_len;
894                 goto done;
895         }
896
897         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
898
899         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
900         if (ret < 0)
901                 goto done;
902
903         /* we've taken care of segment sizes earlier when we
904            cloned the bios. We should never have a segment
905            truncated at this point */
906         BUG_ON(seg_len < len);
907
908         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
909                              seg_name, seg_ofs, seg_len,
910                              bio,
911                              NULL, 0,
912                              flags,
913                              ops,
914                              num_reply,
915                              rbd_req_cb);
916 done:
917         kfree(seg_name);
918         return ret;
919 }
920
921 /*
922  * Request async osd write
923  */
924 static int rbd_req_write(struct request *rq,
925                          struct rbd_device *rbd_dev,
926                          struct ceph_snap_context *snapc,
927                          u64 ofs, u64 len,
928                          struct bio *bio)
929 {
930         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
931                          CEPH_OSD_OP_WRITE,
932                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
933                          2,
934                          ofs, len, bio);
935 }
936
937 /*
938  * Request async osd read
939  */
940 static int rbd_req_read(struct request *rq,
941                          struct rbd_device *rbd_dev,
942                          u64 snapid,
943                          u64 ofs, u64 len,
944                          struct bio *bio)
945 {
946         return rbd_do_op(rq, rbd_dev, NULL,
947                          (snapid ? snapid : CEPH_NOSNAP),
948                          CEPH_OSD_OP_READ,
949                          CEPH_OSD_FLAG_READ,
950                          2,
951                          ofs, len, bio);
952 }
953
954 /*
955  * Request sync osd read
956  */
957 static int rbd_req_sync_read(struct rbd_device *dev,
958                           struct ceph_snap_context *snapc,
959                           u64 snapid,
960                           const char *obj,
961                           u64 ofs, u64 len,
962                           char *buf)
963 {
964         return rbd_req_sync_op(dev, NULL,
965                                (snapid ? snapid : CEPH_NOSNAP),
966                                CEPH_OSD_OP_READ,
967                                CEPH_OSD_FLAG_READ,
968                                NULL,
969                                1, obj, ofs, len, buf);
970 }
971
972 /*
973  * Request sync osd read
974  */
975 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
976                                      u64 snapid,
977                                      const char *obj)
978 {
979         struct ceph_osd_req_op *ops;
980         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
981         if (ret < 0)
982                 return ret;
983
984         ops[0].snap.snapid = snapid;
985
986         ret = rbd_req_sync_op(dev, NULL,
987                                CEPH_NOSNAP,
988                                0,
989                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
990                                ops,
991                                1, obj, 0, 0, NULL);
992
993         rbd_destroy_ops(ops);
994
995         if (ret < 0)
996                 return ret;
997
998         return ret;
999 }
1000
1001 /*
1002  * Request sync osd read
1003  */
1004 static int rbd_req_sync_exec(struct rbd_device *dev,
1005                              const char *obj,
1006                              const char *cls,
1007                              const char *method,
1008                              const char *data,
1009                              int len)
1010 {
1011         struct ceph_osd_req_op *ops;
1012         int cls_len = strlen(cls);
1013         int method_len = strlen(method);
1014         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1015                                     cls_len + method_len + len);
1016         if (ret < 0)
1017                 return ret;
1018
1019         ops[0].cls.class_name = cls;
1020         ops[0].cls.class_len = (__u8)cls_len;
1021         ops[0].cls.method_name = method;
1022         ops[0].cls.method_len = (__u8)method_len;
1023         ops[0].cls.argc = 0;
1024         ops[0].cls.indata = data;
1025         ops[0].cls.indata_len = len;
1026
1027         ret = rbd_req_sync_op(dev, NULL,
1028                                CEPH_NOSNAP,
1029                                0,
1030                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1031                                ops,
1032                                1, obj, 0, 0, NULL);
1033
1034         rbd_destroy_ops(ops);
1035
1036         dout("cls_exec returned %d\n", ret);
1037         return ret;
1038 }
1039
1040 /*
1041  * block device queue callback
1042  */
1043 static void rbd_rq_fn(struct request_queue *q)
1044 {
1045         struct rbd_device *rbd_dev = q->queuedata;
1046         struct request *rq;
1047         struct bio_pair *bp = NULL;
1048
1049         rq = blk_fetch_request(q);
1050
1051         while (1) {
1052                 struct bio *bio;
1053                 struct bio *rq_bio, *next_bio = NULL;
1054                 bool do_write;
1055                 int size, op_size = 0;
1056                 u64 ofs;
1057
1058                 /* peek at request from block layer */
1059                 if (!rq)
1060                         break;
1061
1062                 dout("fetched request\n");
1063
1064                 /* filter out block requests we don't understand */
1065                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1066                         __blk_end_request_all(rq, 0);
1067                         goto next;
1068                 }
1069
1070                 /* deduce our operation (read, write) */
1071                 do_write = (rq_data_dir(rq) == WRITE);
1072
1073                 size = blk_rq_bytes(rq);
1074                 ofs = blk_rq_pos(rq) * 512ULL;
1075                 rq_bio = rq->bio;
1076                 if (do_write && rbd_dev->read_only) {
1077                         __blk_end_request_all(rq, -EROFS);
1078                         goto next;
1079                 }
1080
1081                 spin_unlock_irq(q->queue_lock);
1082
1083                 dout("%s 0x%x bytes at 0x%llx\n",
1084                      do_write ? "write" : "read",
1085                      size, blk_rq_pos(rq) * 512ULL);
1086
1087                 do {
1088                         /* a bio clone to be passed down to OSD req */
1089                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1090                         op_size = rbd_get_segment(&rbd_dev->header,
1091                                                   rbd_dev->header.block_name,
1092                                                   ofs, size,
1093                                                   NULL, NULL);
1094                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1095                                               op_size, GFP_ATOMIC);
1096                         if (!bio) {
1097                                 spin_lock_irq(q->queue_lock);
1098                                 __blk_end_request_all(rq, -ENOMEM);
1099                                 goto next;
1100                         }
1101
1102                         /* init OSD command: write or read */
1103                         if (do_write)
1104                                 rbd_req_write(rq, rbd_dev,
1105                                               rbd_dev->header.snapc,
1106                                               ofs,
1107                                               op_size, bio);
1108                         else
1109                                 rbd_req_read(rq, rbd_dev,
1110                                              cur_snap_id(rbd_dev),
1111                                              ofs,
1112                                              op_size, bio);
1113
1114                         size -= op_size;
1115                         ofs += op_size;
1116
1117                         rq_bio = next_bio;
1118                 } while (size > 0);
1119
1120                 if (bp)
1121                         bio_pair_release(bp);
1122
1123                 spin_lock_irq(q->queue_lock);
1124 next:
1125                 rq = blk_fetch_request(q);
1126         }
1127 }
1128
1129 /*
1130  * a queue callback. Makes sure that we don't create a bio that spans across
1131  * multiple osd objects. One exception would be with a single page bios,
1132  * which we handle later at bio_chain_clone
1133  */
1134 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1135                           struct bio_vec *bvec)
1136 {
1137         struct rbd_device *rbd_dev = q->queuedata;
1138         unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1139         sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1140         unsigned int bio_sectors = bmd->bi_size >> 9;
1141         int max;
1142
1143         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1144                                  + bio_sectors)) << 9;
1145         if (max < 0)
1146                 max = 0; /* bio_add cannot handle a negative return */
1147         if (max <= bvec->bv_len && bio_sectors == 0)
1148                 return bvec->bv_len;
1149         return max;
1150 }
1151
1152 static void rbd_free_disk(struct rbd_device *rbd_dev)
1153 {
1154         struct gendisk *disk = rbd_dev->disk;
1155
1156         if (!disk)
1157                 return;
1158
1159         rbd_header_free(&rbd_dev->header);
1160
1161         if (disk->flags & GENHD_FL_UP)
1162                 del_gendisk(disk);
1163         if (disk->queue)
1164                 blk_cleanup_queue(disk->queue);
1165         put_disk(disk);
1166 }
1167
1168 /*
1169  * reload the ondisk the header 
1170  */
1171 static int rbd_read_header(struct rbd_device *rbd_dev,
1172                            struct rbd_image_header *header)
1173 {
1174         ssize_t rc;
1175         struct rbd_image_header_ondisk *dh;
1176         int snap_count = 0;
1177         u64 snap_names_len = 0;
1178
1179         while (1) {
1180                 int len = sizeof(*dh) +
1181                           snap_count * sizeof(struct rbd_image_snap_ondisk) +
1182                           snap_names_len;
1183
1184                 rc = -ENOMEM;
1185                 dh = kmalloc(len, GFP_KERNEL);
1186                 if (!dh)
1187                         return -ENOMEM;
1188
1189                 rc = rbd_req_sync_read(rbd_dev,
1190                                        NULL, CEPH_NOSNAP,
1191                                        rbd_dev->obj_md_name,
1192                                        0, len,
1193                                        (char *)dh);
1194                 if (rc < 0)
1195                         goto out_dh;
1196
1197                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1198                 if (rc < 0)
1199                         goto out_dh;
1200
1201                 if (snap_count != header->total_snaps) {
1202                         snap_count = header->total_snaps;
1203                         snap_names_len = header->snap_names_len;
1204                         rbd_header_free(header);
1205                         kfree(dh);
1206                         continue;
1207                 }
1208                 break;
1209         }
1210
1211 out_dh:
1212         kfree(dh);
1213         return rc;
1214 }
1215
1216 /*
1217  * create a snapshot
1218  */
1219 static int rbd_header_add_snap(struct rbd_device *dev,
1220                                const char *snap_name,
1221                                gfp_t gfp_flags)
1222 {
1223         int name_len = strlen(snap_name);
1224         u64 new_snapid;
1225         int ret;
1226         void *data, *data_start, *data_end;
1227
1228         /* we should create a snapshot only if we're pointing at the head */
1229         if (dev->cur_snap)
1230                 return -EINVAL;
1231
1232         ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1233                                       &new_snapid);
1234         dout("created snapid=%lld\n", new_snapid);
1235         if (ret < 0)
1236                 return ret;
1237
1238         data = kmalloc(name_len + 16, gfp_flags);
1239         if (!data)
1240                 return -ENOMEM;
1241
1242         data_start = data;
1243         data_end = data + name_len + 16;
1244
1245         ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1246         ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1247
1248         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1249                                 data_start, data - data_start);
1250
1251         kfree(data_start);
1252
1253         if (ret < 0)
1254                 return ret;
1255
1256         dev->header.snapc->seq =  new_snapid;
1257
1258         return 0;
1259 bad:
1260         return -ERANGE;
1261 }
1262
1263 /*
1264  * only read the first part of the ondisk header, without the snaps info
1265  */
1266 static int rbd_update_snaps(struct rbd_device *rbd_dev)
1267 {
1268         int ret;
1269         struct rbd_image_header h;
1270         u64 snap_seq;
1271
1272         ret = rbd_read_header(rbd_dev, &h);
1273         if (ret < 0)
1274                 return ret;
1275
1276         down_write(&rbd_dev->header.snap_rwsem);
1277
1278         snap_seq = rbd_dev->header.snapc->seq;
1279
1280         kfree(rbd_dev->header.snapc);
1281         kfree(rbd_dev->header.snap_names);
1282         kfree(rbd_dev->header.snap_sizes);
1283
1284         rbd_dev->header.total_snaps = h.total_snaps;
1285         rbd_dev->header.snapc = h.snapc;
1286         rbd_dev->header.snap_names = h.snap_names;
1287         rbd_dev->header.snap_sizes = h.snap_sizes;
1288         rbd_dev->header.snapc->seq = snap_seq;
1289
1290         up_write(&rbd_dev->header.snap_rwsem);
1291
1292         return 0;
1293 }
1294
1295 static int rbd_init_disk(struct rbd_device *rbd_dev)
1296 {
1297         struct gendisk *disk;
1298         struct request_queue *q;
1299         int rc;
1300         u64 total_size = 0;
1301
1302         /* contact OSD, request size info about the object being mapped */
1303         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1304         if (rc)
1305                 return rc;
1306
1307         rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1308         if (rc)
1309                 return rc;
1310
1311         /* create gendisk info */
1312         rc = -ENOMEM;
1313         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1314         if (!disk)
1315                 goto out;
1316
1317         sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1318         disk->major = rbd_dev->major;
1319         disk->first_minor = 0;
1320         disk->fops = &rbd_bd_ops;
1321         disk->private_data = rbd_dev;
1322
1323         /* init rq */
1324         rc = -ENOMEM;
1325         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1326         if (!q)
1327                 goto out_disk;
1328         blk_queue_merge_bvec(q, rbd_merge_bvec);
1329         disk->queue = q;
1330
1331         q->queuedata = rbd_dev;
1332
1333         rbd_dev->disk = disk;
1334         rbd_dev->q = q;
1335
1336         /* finally, announce the disk to the world */
1337         set_capacity(disk, total_size / 512ULL);
1338         add_disk(disk);
1339
1340         pr_info("%s: added with size 0x%llx\n",
1341                 disk->disk_name, (unsigned long long)total_size);
1342         return 0;
1343
1344 out_disk:
1345         put_disk(disk);
1346 out:
1347         return rc;
1348 }
1349
1350 /********************************************************************
1351  * /sys/class/rbd/
1352  *                   add        map rados objects to blkdev
1353  *                   remove     unmap rados objects
1354  *                   list       show mappings
1355  *******************************************************************/
1356
1357 static void class_rbd_release(struct class *cls)
1358 {
1359         kfree(cls);
1360 }
1361
1362 static ssize_t class_rbd_list(struct class *c,
1363                               struct class_attribute *attr,
1364                               char *data)
1365 {
1366         int n = 0;
1367         struct list_head *tmp;
1368         int max = PAGE_SIZE;
1369
1370         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1371
1372         n += snprintf(data, max,
1373                       "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1374
1375         list_for_each(tmp, &rbd_dev_list) {
1376                 struct rbd_device *rbd_dev;
1377
1378                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1379                 n += snprintf(data+n, max-n,
1380                               "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1381                               rbd_dev->id,
1382                               rbd_dev->major,
1383                               ceph_client_id(rbd_dev->client),
1384                               rbd_dev->pool_name,
1385                               rbd_dev->obj, rbd_dev->snap_name,
1386                               rbd_dev->header.image_size >> 10);
1387                 if (n == max)
1388                         break;
1389         }
1390
1391         mutex_unlock(&ctl_mutex);
1392         return n;
1393 }
1394
1395 static ssize_t class_rbd_add(struct class *c,
1396                              struct class_attribute *attr,
1397                              const char *buf, size_t count)
1398 {
1399         struct ceph_osd_client *osdc;
1400         struct rbd_device *rbd_dev;
1401         ssize_t rc = -ENOMEM;
1402         int irc, new_id = 0;
1403         struct list_head *tmp;
1404         char *mon_dev_name;
1405         char *options;
1406
1407         if (!try_module_get(THIS_MODULE))
1408                 return -ENODEV;
1409
1410         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1411         if (!mon_dev_name)
1412                 goto err_out_mod;
1413
1414         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1415         if (!options)
1416                 goto err_mon_dev;
1417
1418         /* new rbd_device object */
1419         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1420         if (!rbd_dev)
1421                 goto err_out_opt;
1422
1423         /* static rbd_device initialization */
1424         spin_lock_init(&rbd_dev->lock);
1425         INIT_LIST_HEAD(&rbd_dev->node);
1426
1427         /* generate unique id: find highest unique id, add one */
1428         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1429
1430         list_for_each(tmp, &rbd_dev_list) {
1431                 struct rbd_device *rbd_dev;
1432
1433                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1434                 if (rbd_dev->id >= new_id)
1435                         new_id = rbd_dev->id + 1;
1436         }
1437
1438         rbd_dev->id = new_id;
1439
1440         /* add to global list */
1441         list_add_tail(&rbd_dev->node, &rbd_dev_list);
1442
1443         /* parse add command */
1444         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1445                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
1446                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1447                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1448                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1449                    mon_dev_name, options, rbd_dev->pool_name,
1450                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
1451                 rc = -EINVAL;
1452                 goto err_out_slot;
1453         }
1454
1455         if (rbd_dev->snap_name[0] == 0)
1456                 rbd_dev->snap_name[0] = '-';
1457
1458         rbd_dev->obj_len = strlen(rbd_dev->obj);
1459         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1460                  rbd_dev->obj, RBD_SUFFIX);
1461
1462         /* initialize rest of new object */
1463         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1464         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1465         if (rc < 0)
1466                 goto err_out_slot;
1467
1468         mutex_unlock(&ctl_mutex);
1469
1470         /* pick the pool */
1471         osdc = &rbd_dev->client->osdc;
1472         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1473         if (rc < 0)
1474                 goto err_out_client;
1475         rbd_dev->poolid = rc;
1476
1477         /* register our block device */
1478         irc = register_blkdev(0, rbd_dev->name);
1479         if (irc < 0) {
1480                 rc = irc;
1481                 goto err_out_client;
1482         }
1483         rbd_dev->major = irc;
1484
1485         /* set up and announce blkdev mapping */
1486         rc = rbd_init_disk(rbd_dev);
1487         if (rc)
1488                 goto err_out_blkdev;
1489
1490         return count;
1491
1492 err_out_blkdev:
1493         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1494 err_out_client:
1495         rbd_put_client(rbd_dev);
1496         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1497 err_out_slot:
1498         list_del_init(&rbd_dev->node);
1499         mutex_unlock(&ctl_mutex);
1500
1501         kfree(rbd_dev);
1502 err_out_opt:
1503         kfree(options);
1504 err_mon_dev:
1505         kfree(mon_dev_name);
1506 err_out_mod:
1507         dout("Error adding device %s\n", buf);
1508         module_put(THIS_MODULE);
1509         return rc;
1510 }
1511
1512 static struct rbd_device *__rbd_get_dev(unsigned long id)
1513 {
1514         struct list_head *tmp;
1515         struct rbd_device *rbd_dev;
1516
1517         list_for_each(tmp, &rbd_dev_list) {
1518                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1519                 if (rbd_dev->id == id)
1520                         return rbd_dev;
1521         }
1522         return NULL;
1523 }
1524
1525 static ssize_t class_rbd_remove(struct class *c,
1526                                 struct class_attribute *attr,
1527                                 const char *buf,
1528                                 size_t count)
1529 {
1530         struct rbd_device *rbd_dev = NULL;
1531         int target_id, rc;
1532         unsigned long ul;
1533
1534         rc = strict_strtoul(buf, 10, &ul);
1535         if (rc)
1536                 return rc;
1537
1538         /* convert to int; abort if we lost anything in the conversion */
1539         target_id = (int) ul;
1540         if (target_id != ul)
1541                 return -EINVAL;
1542
1543         /* remove object from list immediately */
1544         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1545
1546         rbd_dev = __rbd_get_dev(target_id);
1547         if (rbd_dev)
1548                 list_del_init(&rbd_dev->node);
1549
1550         mutex_unlock(&ctl_mutex);
1551
1552         if (!rbd_dev)
1553                 return -ENOENT;
1554
1555         rbd_put_client(rbd_dev);
1556
1557         /* clean up and free blkdev */
1558         rbd_free_disk(rbd_dev);
1559         unregister_blkdev(rbd_dev->major, rbd_dev->name);
1560         kfree(rbd_dev);
1561
1562         /* release module ref */
1563         module_put(THIS_MODULE);
1564
1565         return count;
1566 }
1567
1568 static ssize_t class_rbd_snaps_list(struct class *c,
1569                               struct class_attribute *attr,
1570                               char *data)
1571 {
1572         struct rbd_device *rbd_dev = NULL;
1573         struct list_head *tmp;
1574         struct rbd_image_header *header;
1575         int i, n = 0, max = PAGE_SIZE;
1576         int ret;
1577
1578         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1579
1580         n += snprintf(data, max, "#id\tsnap\tKB\n");
1581
1582         list_for_each(tmp, &rbd_dev_list) {
1583                 char *names, *p;
1584                 struct ceph_snap_context *snapc;
1585
1586                 rbd_dev = list_entry(tmp, struct rbd_device, node);
1587                 header = &rbd_dev->header;
1588
1589                 down_read(&header->snap_rwsem);
1590
1591                 names = header->snap_names;
1592                 snapc = header->snapc;
1593
1594                 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1595                               rbd_dev->id, RBD_SNAP_HEAD_NAME,
1596                               header->image_size >> 10,
1597                               (!rbd_dev->cur_snap ? " (*)" : ""));
1598                 if (n == max)
1599                         break;
1600
1601                 p = names;
1602                 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1603                         n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1604                               rbd_dev->id, p, header->snap_sizes[i] >> 10,
1605                               (rbd_dev->cur_snap &&
1606                                (snap_index(header, i) == rbd_dev->cur_snap) ?
1607                                " (*)" : ""));
1608                         if (n == max)
1609                                 break;
1610                 }
1611
1612                 up_read(&header->snap_rwsem);
1613         }
1614
1615
1616         ret = n;
1617         mutex_unlock(&ctl_mutex);
1618         return ret;
1619 }
1620
1621 static ssize_t class_rbd_snaps_refresh(struct class *c,
1622                                 struct class_attribute *attr,
1623                                 const char *buf,
1624                                 size_t count)
1625 {
1626         struct rbd_device *rbd_dev = NULL;
1627         int target_id, rc;
1628         unsigned long ul;
1629         int ret = count;
1630
1631         rc = strict_strtoul(buf, 10, &ul);
1632         if (rc)
1633                 return rc;
1634
1635         /* convert to int; abort if we lost anything in the conversion */
1636         target_id = (int) ul;
1637         if (target_id != ul)
1638                 return -EINVAL;
1639
1640         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1641
1642         rbd_dev = __rbd_get_dev(target_id);
1643         if (!rbd_dev) {
1644                 ret = -ENOENT;
1645                 goto done;
1646         }
1647
1648         rc = rbd_update_snaps(rbd_dev);
1649         if (rc < 0)
1650                 ret = rc;
1651
1652 done:
1653         mutex_unlock(&ctl_mutex);
1654         return ret;
1655 }
1656
1657 static ssize_t class_rbd_snap_create(struct class *c,
1658                                 struct class_attribute *attr,
1659                                 const char *buf,
1660                                 size_t count)
1661 {
1662         struct rbd_device *rbd_dev = NULL;
1663         int target_id, ret;
1664         char *name;
1665
1666         name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1667         if (!name)
1668                 return -ENOMEM;
1669
1670         /* parse snaps add command */
1671         if (sscanf(buf, "%d "
1672                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1673                    &target_id,
1674                    name) != 2) {
1675                 ret = -EINVAL;
1676                 goto done;
1677         }
1678
1679         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1680
1681         rbd_dev = __rbd_get_dev(target_id);
1682         if (!rbd_dev) {
1683                 ret = -ENOENT;
1684                 goto done_unlock;
1685         }
1686
1687         ret = rbd_header_add_snap(rbd_dev,
1688                                   name, GFP_KERNEL);
1689         if (ret < 0)
1690                 goto done_unlock;
1691
1692         ret = rbd_update_snaps(rbd_dev);
1693         if (ret < 0)
1694                 goto done_unlock;
1695
1696         ret = count;
1697 done_unlock:
1698         mutex_unlock(&ctl_mutex);
1699 done:
1700         kfree(name);
1701         return ret;
1702 }
1703
1704 static ssize_t class_rbd_rollback(struct class *c,
1705                                 struct class_attribute *attr,
1706                                 const char *buf,
1707                                 size_t count)
1708 {
1709         struct rbd_device *rbd_dev = NULL;
1710         int target_id, ret;
1711         u64 snapid;
1712         char snap_name[RBD_MAX_SNAP_NAME_LEN];
1713         u64 cur_ofs;
1714         char *seg_name;
1715
1716         /* parse snaps add command */
1717         if (sscanf(buf, "%d "
1718                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1719                    &target_id,
1720                    snap_name) != 2) {
1721                 return -EINVAL;
1722         }
1723
1724         ret = -ENOMEM;
1725         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1726         if (!seg_name)
1727                 return ret;
1728
1729         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1730
1731         rbd_dev = __rbd_get_dev(target_id);
1732         if (!rbd_dev) {
1733                 ret = -ENOENT;
1734                 goto done_unlock;
1735         }
1736
1737         ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1738         if (ret < 0)
1739                 goto done_unlock;
1740
1741         dout("snapid=%lld\n", snapid);
1742
1743         cur_ofs = 0;
1744         while (cur_ofs < rbd_dev->header.image_size) {
1745                 cur_ofs += rbd_get_segment(&rbd_dev->header,
1746                                            rbd_dev->obj,
1747                                            cur_ofs, (u64)-1,
1748                                            seg_name, NULL);
1749                 dout("seg_name=%s\n", seg_name);
1750
1751                 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1752                 if (ret < 0)
1753                         pr_warning("could not roll back obj %s err=%d\n",
1754                                    seg_name, ret);
1755         }
1756
1757         ret = rbd_update_snaps(rbd_dev);
1758         if (ret < 0)
1759                 goto done_unlock;
1760
1761         ret = count;
1762
1763 done_unlock:
1764         mutex_unlock(&ctl_mutex);
1765         kfree(seg_name);
1766
1767         return ret;
1768 }
1769
1770 static struct class_attribute class_rbd_attrs[] = {
1771         __ATTR(add,             0200, NULL, class_rbd_add),
1772         __ATTR(remove,          0200, NULL, class_rbd_remove),
1773         __ATTR(list,            0444, class_rbd_list, NULL),
1774         __ATTR(snaps_refresh,   0200, NULL, class_rbd_snaps_refresh),
1775         __ATTR(snap_create,     0200, NULL, class_rbd_snap_create),
1776         __ATTR(snaps_list,      0444, class_rbd_snaps_list, NULL),
1777         __ATTR(snap_rollback,   0200, NULL, class_rbd_rollback),
1778         __ATTR_NULL
1779 };
1780
1781 /*
1782  * create control files in sysfs
1783  * /sys/class/rbd/...
1784  */
1785 static int rbd_sysfs_init(void)
1786 {
1787         int ret = -ENOMEM;
1788
1789         class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1790         if (!class_rbd)
1791                 goto out;
1792
1793         class_rbd->name = DRV_NAME;
1794         class_rbd->owner = THIS_MODULE;
1795         class_rbd->class_release = class_rbd_release;
1796         class_rbd->class_attrs = class_rbd_attrs;
1797
1798         ret = class_register(class_rbd);
1799         if (ret)
1800                 goto out_class;
1801         return 0;
1802
1803 out_class:
1804         kfree(class_rbd);
1805         class_rbd = NULL;
1806         pr_err(DRV_NAME ": failed to create class rbd\n");
1807 out:
1808         return ret;
1809 }
1810
1811 static void rbd_sysfs_cleanup(void)
1812 {
1813         if (class_rbd)
1814                 class_destroy(class_rbd);
1815         class_rbd = NULL;
1816 }
1817
1818 int __init rbd_init(void)
1819 {
1820         int rc;
1821
1822         rc = rbd_sysfs_init();
1823         if (rc)
1824                 return rc;
1825         spin_lock_init(&node_lock);
1826         pr_info("loaded " DRV_NAME_LONG "\n");
1827         return 0;
1828 }
1829
1830 void __exit rbd_exit(void)
1831 {
1832         rbd_sysfs_cleanup();
1833 }
1834
1835 module_init(rbd_init);
1836 module_exit(rbd_exit);
1837
1838 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1839 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1840 MODULE_DESCRIPTION("rados block device");
1841
1842 /* following authorship retained from original osdblk.c */
1843 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1844
1845 MODULE_LICENSE("GPL");