2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DRV_NAME "rbd"
45 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49 #define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN 64
51 #define RBD_MAX_SNAP_NAME_LEN 32
52 #define RBD_MAX_OPT_LEN 1024
54 #define RBD_SNAP_HEAD_NAME "-"
57 * An RBD device name will be "rbd#", where the "rbd" comes from
58 * RBD_DRV_NAME above, and # is a unique integer identifier.
59 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
60 * enough to hold all possible device names.
62 #define DEV_NAME_LEN 32
63 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
65 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
68 * block device image metadata (in-memory version)
70 struct rbd_image_header {
76 struct rw_semaphore snap_rwsem;
77 struct ceph_snap_context *snapc;
78 size_t snap_names_len;
93 * an instance of the client. multiple devices may share an rbd client.
96 struct ceph_client *client;
97 struct rbd_options *rbd_opts;
99 struct list_head node;
103 * a request completion status
105 struct rbd_req_status {
112 * a collection of requests
114 struct rbd_req_coll {
118 struct rbd_req_status status[0];
122 * a single io request
125 struct request *rq; /* blk layer request */
126 struct bio *bio; /* cloned bio */
127 struct page **pages; /* list of used pages */
130 struct rbd_req_coll *coll;
137 struct list_head node;
145 int id; /* blkdev unique id */
147 int major; /* blkdev assigned major */
148 struct gendisk *disk; /* blkdev's gendisk and rq */
149 struct request_queue *q;
151 struct rbd_client *rbd_client;
153 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
155 spinlock_t lock; /* queue lock */
157 struct rbd_image_header header;
158 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
160 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
161 char pool_name[RBD_MAX_POOL_NAME_LEN];
164 struct ceph_osd_event *watch_event;
165 struct ceph_osd_request *watch_request;
167 char snap_name[RBD_MAX_SNAP_NAME_LEN];
168 u32 cur_snap; /* index+1 of current snapshot within snap context
172 struct list_head node;
174 /* list of snapshots */
175 struct list_head snaps;
181 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
183 static LIST_HEAD(rbd_dev_list); /* devices */
184 static DEFINE_SPINLOCK(rbd_dev_list_lock);
186 static LIST_HEAD(rbd_client_list); /* clients */
187 static DEFINE_SPINLOCK(rbd_client_list_lock);
189 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
190 static void rbd_dev_release(struct device *dev);
191 static ssize_t rbd_snap_add(struct device *dev,
192 struct device_attribute *attr,
195 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
196 struct rbd_snap *snap);
198 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
200 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
203 static struct bus_attribute rbd_bus_attrs[] = {
204 __ATTR(add, S_IWUSR, NULL, rbd_add),
205 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
209 static struct bus_type rbd_bus_type = {
211 .bus_attrs = rbd_bus_attrs,
214 static void rbd_root_dev_release(struct device *dev)
218 static struct device rbd_root_dev = {
220 .release = rbd_root_dev_release,
224 static struct rbd_device *dev_to_rbd(struct device *dev)
226 return container_of(dev, struct rbd_device, dev);
229 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
231 return get_device(&rbd_dev->dev);
234 static void rbd_put_dev(struct rbd_device *rbd_dev)
236 put_device(&rbd_dev->dev);
239 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
241 static int rbd_open(struct block_device *bdev, fmode_t mode)
243 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
245 rbd_get_dev(rbd_dev);
247 set_device_ro(bdev, rbd_dev->read_only);
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
255 static int rbd_release(struct gendisk *disk, fmode_t mode)
257 struct rbd_device *rbd_dev = disk->private_data;
259 rbd_put_dev(rbd_dev);
264 static const struct block_device_operations rbd_bd_ops = {
265 .owner = THIS_MODULE,
267 .release = rbd_release,
271 * Initialize an rbd client instance.
274 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
275 struct rbd_options *rbd_opts)
277 struct rbd_client *rbdc;
280 dout("rbd_client_create\n");
281 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285 kref_init(&rbdc->kref);
286 INIT_LIST_HEAD(&rbdc->node);
288 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
290 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
291 if (IS_ERR(rbdc->client))
293 opt = NULL; /* Now rbdc->client is responsible for opt */
295 ret = ceph_open_session(rbdc->client);
299 rbdc->rbd_opts = rbd_opts;
301 spin_lock(&rbd_client_list_lock);
302 list_add_tail(&rbdc->node, &rbd_client_list);
303 spin_unlock(&rbd_client_list_lock);
305 mutex_unlock(&ctl_mutex);
307 dout("rbd_client_create created %p\n", rbdc);
311 ceph_destroy_client(rbdc->client);
313 mutex_unlock(&ctl_mutex);
317 ceph_destroy_options(opt);
322 * Find a ceph client with specific addr and configuration.
324 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
326 struct rbd_client *client_node;
328 if (opt->flags & CEPH_OPT_NOSHARE)
331 list_for_each_entry(client_node, &rbd_client_list, node)
332 if (ceph_compare_options(opt, client_node->client) == 0)
345 /* string args above */
348 static match_table_t rbdopt_tokens = {
349 {Opt_notify_timeout, "notify_timeout=%d"},
351 /* string args above */
355 static int parse_rbd_opts_token(char *c, void *private)
357 struct rbd_options *rbdopt = private;
358 substring_t argstr[MAX_OPT_ARGS];
359 int token, intval, ret;
361 token = match_token(c, rbdopt_tokens, argstr);
365 if (token < Opt_last_int) {
366 ret = match_int(&argstr[0], &intval);
368 pr_err("bad mount option arg (not int) "
372 dout("got int token %d val %d\n", token, intval);
373 } else if (token > Opt_last_int && token < Opt_last_string) {
374 dout("got string token %d val %s\n", token,
377 dout("got token %d\n", token);
381 case Opt_notify_timeout:
382 rbdopt->notify_timeout = intval;
391 * Get a ceph client with specific addr and configuration, if one does
392 * not exist create it.
394 static struct rbd_client *rbd_get_client(const char *mon_addr,
398 struct rbd_client *rbdc;
399 struct ceph_options *opt;
400 struct rbd_options *rbd_opts;
402 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
404 return ERR_PTR(-ENOMEM);
406 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
408 opt = ceph_parse_options(options, mon_addr,
409 mon_addr + mon_addr_len,
410 parse_rbd_opts_token, rbd_opts);
413 return ERR_CAST(opt);
416 spin_lock(&rbd_client_list_lock);
417 rbdc = __rbd_client_find(opt);
419 /* using an existing client */
420 kref_get(&rbdc->kref);
421 spin_unlock(&rbd_client_list_lock);
423 ceph_destroy_options(opt);
428 spin_unlock(&rbd_client_list_lock);
430 rbdc = rbd_client_create(opt, rbd_opts);
439 * Destroy ceph client
441 * Caller must hold rbd_client_list_lock.
443 static void rbd_client_release(struct kref *kref)
445 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
447 dout("rbd_release_client %p\n", rbdc);
448 list_del(&rbdc->node);
450 ceph_destroy_client(rbdc->client);
451 kfree(rbdc->rbd_opts);
456 * Drop reference to ceph client node. If it's not referenced anymore, release
459 static void rbd_put_client(struct rbd_device *rbd_dev)
461 spin_lock(&rbd_client_list_lock);
462 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
463 spin_unlock(&rbd_client_list_lock);
464 rbd_dev->rbd_client = NULL;
468 * Destroy requests collection
470 static void rbd_coll_release(struct kref *kref)
472 struct rbd_req_coll *coll =
473 container_of(kref, struct rbd_req_coll, kref);
475 dout("rbd_coll_release %p\n", coll);
480 * Create a new header structure, translate header format from the on-disk
483 static int rbd_header_from_disk(struct rbd_image_header *header,
484 struct rbd_image_header_ondisk *ondisk,
489 u32 snap_count = le32_to_cpu(ondisk->snap_count);
492 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
495 init_rwsem(&header->snap_rwsem);
496 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
497 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
498 snap_count * sizeof (*ondisk),
503 header->snap_names = kmalloc(header->snap_names_len,
505 if (!header->snap_names)
507 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
509 if (!header->snap_sizes)
512 header->snap_names = NULL;
513 header->snap_sizes = NULL;
515 memcpy(header->block_name, ondisk->block_name,
516 sizeof(ondisk->block_name));
518 header->image_size = le64_to_cpu(ondisk->image_size);
519 header->obj_order = ondisk->options.order;
520 header->crypt_type = ondisk->options.crypt_type;
521 header->comp_type = ondisk->options.comp_type;
523 atomic_set(&header->snapc->nref, 1);
524 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
525 header->snapc->num_snaps = snap_count;
526 header->total_snaps = snap_count;
528 if (snap_count && allocated_snaps == snap_count) {
529 for (i = 0; i < snap_count; i++) {
530 header->snapc->snaps[i] =
531 le64_to_cpu(ondisk->snaps[i].id);
532 header->snap_sizes[i] =
533 le64_to_cpu(ondisk->snaps[i].image_size);
536 /* copy snapshot names */
537 memcpy(header->snap_names, &ondisk->snaps[i],
538 header->snap_names_len);
544 kfree(header->snap_names);
546 kfree(header->snapc);
550 static int snap_index(struct rbd_image_header *header, int snap_num)
552 return header->total_snaps - snap_num;
555 static u64 cur_snap_id(struct rbd_device *rbd_dev)
557 struct rbd_image_header *header = &rbd_dev->header;
559 if (!rbd_dev->cur_snap)
562 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
565 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
569 char *p = header->snap_names;
571 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
572 if (strcmp(snap_name, p) == 0)
575 if (i == header->total_snaps)
578 *seq = header->snapc->snaps[i];
581 *size = header->snap_sizes[i];
586 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
588 struct rbd_image_header *header = &dev->header;
589 struct ceph_snap_context *snapc = header->snapc;
592 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
594 down_write(&header->snap_rwsem);
596 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
597 sizeof (RBD_SNAP_HEAD_NAME))) {
598 if (header->total_snaps)
599 snapc->seq = header->snap_seq;
605 *size = header->image_size;
607 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
611 dev->cur_snap = header->total_snaps - ret;
617 up_write(&header->snap_rwsem);
621 static void rbd_header_free(struct rbd_image_header *header)
623 kfree(header->snapc);
624 kfree(header->snap_names);
625 kfree(header->snap_sizes);
629 * get the actual striped segment name, offset and length
631 static u64 rbd_get_segment(struct rbd_image_header *header,
632 const char *block_name,
634 char *seg_name, u64 *segofs)
636 u64 seg = ofs >> header->obj_order;
639 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
640 "%s.%012llx", block_name, seg);
642 ofs = ofs & ((1 << header->obj_order) - 1);
643 len = min_t(u64, len, (1 << header->obj_order) - ofs);
651 static int rbd_get_num_segments(struct rbd_image_header *header,
654 u64 start_seg = ofs >> header->obj_order;
655 u64 end_seg = (ofs + len - 1) >> header->obj_order;
656 return end_seg - start_seg + 1;
660 * returns the size of an object in the image
662 static u64 rbd_obj_bytes(struct rbd_image_header *header)
664 return 1 << header->obj_order;
671 static void bio_chain_put(struct bio *chain)
677 chain = chain->bi_next;
683 * zeros a bio chain, starting at specific offset
685 static void zero_bio_chain(struct bio *chain, int start_ofs)
694 bio_for_each_segment(bv, chain, i) {
695 if (pos + bv->bv_len > start_ofs) {
696 int remainder = max(start_ofs - pos, 0);
697 buf = bvec_kmap_irq(bv, &flags);
698 memset(buf + remainder, 0,
699 bv->bv_len - remainder);
700 bvec_kunmap_irq(buf, &flags);
705 chain = chain->bi_next;
710 * bio_chain_clone - clone a chain of bios up to a certain length.
711 * might return a bio_pair that will need to be released.
713 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
714 struct bio_pair **bp,
715 int len, gfp_t gfpmask)
717 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
721 bio_pair_release(*bp);
725 while (old_chain && (total < len)) {
726 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
730 if (total + old_chain->bi_size > len) {
734 * this split can only happen with a single paged bio,
735 * split_bio will BUG_ON if this is not the case
737 dout("bio_chain_clone split! total=%d remaining=%d"
739 (int)total, (int)len-total,
740 (int)old_chain->bi_size);
742 /* split the bio. We'll release it either in the next
743 call, or it will have to be released outside */
744 bp = bio_split(old_chain, (len - total) / 512ULL);
748 __bio_clone(tmp, &bp->bio1);
752 __bio_clone(tmp, old_chain);
753 *next = old_chain->bi_next;
757 gfpmask &= ~__GFP_WAIT;
761 new_chain = tail = tmp;
766 old_chain = old_chain->bi_next;
768 total += tmp->bi_size;
774 tail->bi_next = NULL;
781 dout("bio_chain_clone with err\n");
782 bio_chain_put(new_chain);
787 * helpers for osd request op vectors.
789 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
794 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
798 (*ops)[0].op = opcode;
800 * op extent offset and length will be set later on
801 * in calc_raw_layout()
803 (*ops)[0].payload_len = payload_len;
807 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
812 static void rbd_coll_end_req_index(struct request *rq,
813 struct rbd_req_coll *coll,
817 struct request_queue *q;
820 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
821 coll, index, ret, len);
827 blk_end_request(rq, ret, len);
833 spin_lock_irq(q->queue_lock);
834 coll->status[index].done = 1;
835 coll->status[index].rc = ret;
836 coll->status[index].bytes = len;
837 max = min = coll->num_done;
838 while (max < coll->total && coll->status[max].done)
841 for (i = min; i<max; i++) {
842 __blk_end_request(rq, coll->status[i].rc,
843 coll->status[i].bytes);
845 kref_put(&coll->kref, rbd_coll_release);
847 spin_unlock_irq(q->queue_lock);
850 static void rbd_coll_end_req(struct rbd_request *req,
853 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
857 * Send ceph osd request
859 static int rbd_do_request(struct request *rq,
860 struct rbd_device *dev,
861 struct ceph_snap_context *snapc,
863 const char *obj, u64 ofs, u64 len,
868 struct ceph_osd_req_op *ops,
870 struct rbd_req_coll *coll,
872 void (*rbd_cb)(struct ceph_osd_request *req,
873 struct ceph_msg *msg),
874 struct ceph_osd_request **linger_req,
877 struct ceph_osd_request *req;
878 struct ceph_file_layout *layout;
881 struct timespec mtime = CURRENT_TIME;
882 struct rbd_request *req_data;
883 struct ceph_osd_request_head *reqhead;
884 struct rbd_image_header *header = &dev->header;
885 struct ceph_osd_client *osdc;
887 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
890 rbd_coll_end_req_index(rq, coll, coll_index,
896 req_data->coll = coll;
897 req_data->coll_index = coll_index;
900 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
902 down_read(&header->snap_rwsem);
904 osdc = &dev->rbd_client->client->osdc;
905 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
906 false, GFP_NOIO, pages, bio);
908 up_read(&header->snap_rwsem);
913 req->r_callback = rbd_cb;
917 req_data->pages = pages;
920 req->r_priv = req_data;
922 reqhead = req->r_request->front.iov_base;
923 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
925 strncpy(req->r_oid, obj, sizeof(req->r_oid));
926 req->r_oid_len = strlen(req->r_oid);
928 layout = &req->r_file_layout;
929 memset(layout, 0, sizeof(*layout));
930 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_stripe_count = cpu_to_le32(1);
932 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
933 layout->fl_pg_preferred = cpu_to_le32(-1);
934 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
935 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
938 ceph_osdc_build_request(req, ofs, &len,
942 req->r_oid, req->r_oid_len);
943 up_read(&header->snap_rwsem);
946 ceph_osdc_set_request_linger(osdc, req);
950 ret = ceph_osdc_start_request(osdc, req, false);
955 ret = ceph_osdc_wait_request(osdc, req);
957 *ver = le64_to_cpu(req->r_reassert_version.version);
958 dout("reassert_ver=%lld\n",
959 le64_to_cpu(req->r_reassert_version.version));
960 ceph_osdc_put_request(req);
965 bio_chain_put(req_data->bio);
966 ceph_osdc_put_request(req);
968 rbd_coll_end_req(req_data, ret, len);
974 * Ceph osd op callback
976 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
978 struct rbd_request *req_data = req->r_priv;
979 struct ceph_osd_reply_head *replyhead;
980 struct ceph_osd_op *op;
986 replyhead = msg->front.iov_base;
987 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
988 op = (void *)(replyhead + 1);
989 rc = le32_to_cpu(replyhead->result);
990 bytes = le64_to_cpu(op->extent.length);
991 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
993 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
995 if (rc == -ENOENT && read_op) {
996 zero_bio_chain(req_data->bio, 0);
998 } else if (rc == 0 && read_op && bytes < req_data->len) {
999 zero_bio_chain(req_data->bio, bytes);
1000 bytes = req_data->len;
1003 rbd_coll_end_req(req_data, rc, bytes);
1006 bio_chain_put(req_data->bio);
1008 ceph_osdc_put_request(req);
1012 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1014 ceph_osdc_put_request(req);
1018 * Do a synchronous ceph osd operation
1020 static int rbd_req_sync_op(struct rbd_device *dev,
1021 struct ceph_snap_context *snapc,
1025 struct ceph_osd_req_op *orig_ops,
1030 struct ceph_osd_request **linger_req,
1034 struct page **pages;
1036 struct ceph_osd_req_op *ops = orig_ops;
1039 num_pages = calc_pages_for(ofs , len);
1040 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1042 return PTR_ERR(pages);
1045 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1046 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1050 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1051 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1057 ret = rbd_do_request(NULL, dev, snapc, snapid,
1058 obj, ofs, len, NULL,
1069 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1070 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1074 rbd_destroy_ops(ops);
1076 ceph_release_page_vector(pages, num_pages);
1081 * Do an asynchronous ceph osd operation
1083 static int rbd_do_op(struct request *rq,
1084 struct rbd_device *rbd_dev ,
1085 struct ceph_snap_context *snapc,
1087 int opcode, int flags, int num_reply,
1090 struct rbd_req_coll *coll,
1097 struct ceph_osd_req_op *ops;
1100 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1104 seg_len = rbd_get_segment(&rbd_dev->header,
1105 rbd_dev->header.block_name,
1107 seg_name, &seg_ofs);
1109 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1111 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1115 /* we've taken care of segment sizes earlier when we
1116 cloned the bios. We should never have a segment
1117 truncated at this point */
1118 BUG_ON(seg_len < len);
1120 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1121 seg_name, seg_ofs, seg_len,
1128 rbd_req_cb, 0, NULL);
1130 rbd_destroy_ops(ops);
1137 * Request async osd write
1139 static int rbd_req_write(struct request *rq,
1140 struct rbd_device *rbd_dev,
1141 struct ceph_snap_context *snapc,
1144 struct rbd_req_coll *coll,
1147 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1149 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1151 ofs, len, bio, coll, coll_index);
1155 * Request async osd read
1157 static int rbd_req_read(struct request *rq,
1158 struct rbd_device *rbd_dev,
1162 struct rbd_req_coll *coll,
1165 return rbd_do_op(rq, rbd_dev, NULL,
1166 (snapid ? snapid : CEPH_NOSNAP),
1170 ofs, len, bio, coll, coll_index);
1174 * Request sync osd read
1176 static int rbd_req_sync_read(struct rbd_device *dev,
1177 struct ceph_snap_context *snapc,
1184 return rbd_req_sync_op(dev, NULL,
1185 (snapid ? snapid : CEPH_NOSNAP),
1189 1, obj, ofs, len, buf, NULL, ver);
1193 * Request sync osd watch
1195 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1200 struct ceph_osd_req_op *ops;
1201 struct page **pages = NULL;
1204 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1208 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1209 ops[0].watch.cookie = notify_id;
1210 ops[0].watch.flag = 0;
1212 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1219 rbd_simple_req_cb, 0, NULL);
1221 rbd_destroy_ops(ops);
1225 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1227 struct rbd_device *dev = (struct rbd_device *)data;
1233 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1234 notify_id, (int)opcode);
1235 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1236 rc = __rbd_update_snaps(dev);
1237 mutex_unlock(&ctl_mutex);
1239 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1240 " update snaps: %d\n", dev->major, rc);
1242 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1246 * Request sync osd watch
1248 static int rbd_req_sync_watch(struct rbd_device *dev,
1252 struct ceph_osd_req_op *ops;
1253 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1255 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1259 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1260 (void *)dev, &dev->watch_event);
1264 ops[0].watch.ver = cpu_to_le64(ver);
1265 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1266 ops[0].watch.flag = 1;
1268 ret = rbd_req_sync_op(dev, NULL,
1271 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1274 &dev->watch_request, NULL);
1279 rbd_destroy_ops(ops);
1283 ceph_osdc_cancel_event(dev->watch_event);
1284 dev->watch_event = NULL;
1286 rbd_destroy_ops(ops);
1291 * Request sync osd unwatch
1293 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1296 struct ceph_osd_req_op *ops;
1298 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1302 ops[0].watch.ver = 0;
1303 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1304 ops[0].watch.flag = 0;
1306 ret = rbd_req_sync_op(dev, NULL,
1309 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1311 1, obj, 0, 0, NULL, NULL, NULL);
1313 rbd_destroy_ops(ops);
1314 ceph_osdc_cancel_event(dev->watch_event);
1315 dev->watch_event = NULL;
1319 struct rbd_notify_info {
1320 struct rbd_device *dev;
1323 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1325 struct rbd_device *dev = (struct rbd_device *)data;
1329 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1330 notify_id, (int)opcode);
1334 * Request sync osd notify
1336 static int rbd_req_sync_notify(struct rbd_device *dev,
1339 struct ceph_osd_req_op *ops;
1340 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1341 struct ceph_osd_event *event;
1342 struct rbd_notify_info info;
1343 int payload_len = sizeof(u32) + sizeof(u32);
1346 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1352 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1353 (void *)&info, &event);
1357 ops[0].watch.ver = 1;
1358 ops[0].watch.flag = 1;
1359 ops[0].watch.cookie = event->cookie;
1360 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1361 ops[0].watch.timeout = 12;
1363 ret = rbd_req_sync_op(dev, NULL,
1366 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1368 1, obj, 0, 0, NULL, NULL, NULL);
1372 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1373 dout("ceph_osdc_wait_event returned %d\n", ret);
1374 rbd_destroy_ops(ops);
1378 ceph_osdc_cancel_event(event);
1380 rbd_destroy_ops(ops);
1385 * Request sync osd read
1387 static int rbd_req_sync_exec(struct rbd_device *dev,
1395 struct ceph_osd_req_op *ops;
1396 int cls_len = strlen(cls);
1397 int method_len = strlen(method);
1398 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1399 cls_len + method_len + len);
1403 ops[0].cls.class_name = cls;
1404 ops[0].cls.class_len = (__u8)cls_len;
1405 ops[0].cls.method_name = method;
1406 ops[0].cls.method_len = (__u8)method_len;
1407 ops[0].cls.argc = 0;
1408 ops[0].cls.indata = data;
1409 ops[0].cls.indata_len = len;
1411 ret = rbd_req_sync_op(dev, NULL,
1414 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1416 1, obj, 0, 0, NULL, NULL, ver);
1418 rbd_destroy_ops(ops);
1420 dout("cls_exec returned %d\n", ret);
1424 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1426 struct rbd_req_coll *coll =
1427 kzalloc(sizeof(struct rbd_req_coll) +
1428 sizeof(struct rbd_req_status) * num_reqs,
1433 coll->total = num_reqs;
1434 kref_init(&coll->kref);
1439 * block device queue callback
1441 static void rbd_rq_fn(struct request_queue *q)
1443 struct rbd_device *rbd_dev = q->queuedata;
1445 struct bio_pair *bp = NULL;
1447 rq = blk_fetch_request(q);
1451 struct bio *rq_bio, *next_bio = NULL;
1453 int size, op_size = 0;
1455 int num_segs, cur_seg = 0;
1456 struct rbd_req_coll *coll;
1458 /* peek at request from block layer */
1462 dout("fetched request\n");
1464 /* filter out block requests we don't understand */
1465 if ((rq->cmd_type != REQ_TYPE_FS)) {
1466 __blk_end_request_all(rq, 0);
1470 /* deduce our operation (read, write) */
1471 do_write = (rq_data_dir(rq) == WRITE);
1473 size = blk_rq_bytes(rq);
1474 ofs = blk_rq_pos(rq) * 512ULL;
1476 if (do_write && rbd_dev->read_only) {
1477 __blk_end_request_all(rq, -EROFS);
1481 spin_unlock_irq(q->queue_lock);
1483 dout("%s 0x%x bytes at 0x%llx\n",
1484 do_write ? "write" : "read",
1485 size, blk_rq_pos(rq) * 512ULL);
1487 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1488 coll = rbd_alloc_coll(num_segs);
1490 spin_lock_irq(q->queue_lock);
1491 __blk_end_request_all(rq, -ENOMEM);
1496 /* a bio clone to be passed down to OSD req */
1497 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1498 op_size = rbd_get_segment(&rbd_dev->header,
1499 rbd_dev->header.block_name,
1502 kref_get(&coll->kref);
1503 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1504 op_size, GFP_ATOMIC);
1506 rbd_coll_end_req_index(rq, coll, cur_seg,
1512 /* init OSD command: write or read */
1514 rbd_req_write(rq, rbd_dev,
1515 rbd_dev->header.snapc,
1520 rbd_req_read(rq, rbd_dev,
1521 cur_snap_id(rbd_dev),
1533 kref_put(&coll->kref, rbd_coll_release);
1536 bio_pair_release(bp);
1537 spin_lock_irq(q->queue_lock);
1539 rq = blk_fetch_request(q);
1544 * a queue callback. Makes sure that we don't create a bio that spans across
1545 * multiple osd objects. One exception would be with a single page bios,
1546 * which we handle later at bio_chain_clone
1548 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1549 struct bio_vec *bvec)
1551 struct rbd_device *rbd_dev = q->queuedata;
1552 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1553 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1554 unsigned int bio_sectors = bmd->bi_size >> 9;
1557 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1558 + bio_sectors)) << 9;
1560 max = 0; /* bio_add cannot handle a negative return */
1561 if (max <= bvec->bv_len && bio_sectors == 0)
1562 return bvec->bv_len;
1566 static void rbd_free_disk(struct rbd_device *rbd_dev)
1568 struct gendisk *disk = rbd_dev->disk;
1573 rbd_header_free(&rbd_dev->header);
1575 if (disk->flags & GENHD_FL_UP)
1578 blk_cleanup_queue(disk->queue);
1583 * reload the ondisk the header
1585 static int rbd_read_header(struct rbd_device *rbd_dev,
1586 struct rbd_image_header *header)
1589 struct rbd_image_header_ondisk *dh;
1591 u64 snap_names_len = 0;
1595 int len = sizeof(*dh) +
1596 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1600 dh = kmalloc(len, GFP_KERNEL);
1604 rc = rbd_req_sync_read(rbd_dev,
1606 rbd_dev->obj_md_name,
1612 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1615 pr_warning("unrecognized header format"
1616 " for image %s", rbd_dev->obj);
1621 if (snap_count != header->total_snaps) {
1622 snap_count = header->total_snaps;
1623 snap_names_len = header->snap_names_len;
1624 rbd_header_free(header);
1630 header->obj_version = ver;
1640 static int rbd_header_add_snap(struct rbd_device *dev,
1641 const char *snap_name,
1644 int name_len = strlen(snap_name);
1649 struct ceph_mon_client *monc;
1651 /* we should create a snapshot only if we're pointing at the head */
1655 monc = &dev->rbd_client->client->monc;
1656 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1657 dout("created snapid=%lld\n", new_snapid);
1661 data = kmalloc(name_len + 16, gfp_flags);
1666 e = data + name_len + 16;
1668 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1669 ceph_encode_64_safe(&p, e, new_snapid, bad);
1671 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1672 data, p - data, &ver);
1679 dev->header.snapc->seq = new_snapid;
1686 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1688 struct rbd_snap *snap;
1690 while (!list_empty(&rbd_dev->snaps)) {
1691 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1692 __rbd_remove_snap_dev(rbd_dev, snap);
1697 * only read the first part of the ondisk header, without the snaps info
1699 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1702 struct rbd_image_header h;
1706 ret = rbd_read_header(rbd_dev, &h);
1711 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1713 down_write(&rbd_dev->header.snap_rwsem);
1715 snap_seq = rbd_dev->header.snapc->seq;
1716 if (rbd_dev->header.total_snaps &&
1717 rbd_dev->header.snapc->snaps[0] == snap_seq)
1718 /* pointing at the head, will need to follow that
1722 kfree(rbd_dev->header.snapc);
1723 kfree(rbd_dev->header.snap_names);
1724 kfree(rbd_dev->header.snap_sizes);
1726 rbd_dev->header.total_snaps = h.total_snaps;
1727 rbd_dev->header.snapc = h.snapc;
1728 rbd_dev->header.snap_names = h.snap_names;
1729 rbd_dev->header.snap_names_len = h.snap_names_len;
1730 rbd_dev->header.snap_sizes = h.snap_sizes;
1732 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1734 rbd_dev->header.snapc->seq = snap_seq;
1736 ret = __rbd_init_snaps_header(rbd_dev);
1738 up_write(&rbd_dev->header.snap_rwsem);
/*
 * Read the image header, pick the mapped snapshot, then allocate and
 * configure the gendisk and request queue for this rbd device.
 * NOTE(review): error-path lines are elided in this excerpt.
 */
1743 static int rbd_init_disk(struct rbd_device *rbd_dev)
1745 struct gendisk *disk;
1746 struct request_queue *q;
1750 /* contact OSD, request size info about the object being mapped */
1751 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1755 /* no need to lock here, as rbd_dev is not registered yet */
1756 rc = __rbd_init_snaps_header(rbd_dev);
1760 rc = rbd_header_set_snap(rbd_dev, &total_size);
1764 /* create gendisk info */
1766 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1770 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1772 disk->major = rbd_dev->major;
1773 disk->first_minor = 0;
1774 disk->fops = &rbd_bd_ops;
1775 disk->private_data = rbd_dev;
1779 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1783 /* set io sizes to object size */
1784 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1785 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1786 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1787 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1789 blk_queue_merge_bvec(q, rbd_merge_bvec);
1792 q->queuedata = rbd_dev;
1794 rbd_dev->disk = disk;
1797 /* finally, announce the disk to the world */
1798 set_capacity(disk, total_size / 512ULL);
1801 pr_info("%s: added with size 0x%llx\n",
1802 disk->disk_name, (unsigned long long)total_size);
/* sysfs "size": image size in bytes. */
1815 static ssize_t rbd_size_show(struct device *dev,
1816 struct device_attribute *attr, char *buf)
1818 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1820 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
/* sysfs "major": block device major number. */
1823 static ssize_t rbd_major_show(struct device *dev,
1824 struct device_attribute *attr, char *buf)
1826 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1828 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": ceph client instance id ("client<N>"). */
1831 static ssize_t rbd_client_id_show(struct device *dev,
1832 struct device_attribute *attr, char *buf)
1834 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1836 return sprintf(buf, "client%lld\n",
1837 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": rados pool name the image lives in. */
1840 static ssize_t rbd_pool_show(struct device *dev,
1841 struct device_attribute *attr, char *buf)
1843 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1845 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "name": rbd image (object) name. */
1848 static ssize_t rbd_name_show(struct device *dev,
1849 struct device_attribute *attr, char *buf)
1851 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1853 return sprintf(buf, "%s\n", rbd_dev->obj);
/* sysfs "current_snap": mapped snapshot name ("-" for the head). */
1856 static ssize_t rbd_snap_show(struct device *dev,
1857 struct device_attribute *attr,
1860 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1862 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/*
 * sysfs "refresh" (write-only): re-read header and snapshot list
 * under ctl_mutex.
 */
1865 static ssize_t rbd_image_refresh(struct device *dev,
1866 struct device_attribute *attr,
1870 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1874 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1876 rc = __rbd_update_snaps(rbd_dev);
1880 mutex_unlock(&ctl_mutex);
/* Per-device sysfs attributes (exposed under the rbd bus device). */
1884 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1885 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1886 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1887 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1888 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1889 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1890 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1891 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/* Attribute table grouping the DEVICE_ATTRs above. */
1893 static struct attribute *rbd_attrs[] = {
1894 &dev_attr_size.attr,
1895 &dev_attr_major.attr,
1896 &dev_attr_client_id.attr,
1897 &dev_attr_pool.attr,
1898 &dev_attr_name.attr,
1899 &dev_attr_current_snap.attr,
1900 &dev_attr_refresh.attr,
1901 &dev_attr_create_snap.attr,
/*
 * Driver-model glue: attribute group, group list, release callback and
 * device_type for rbd devices.  NOTE(review): initializer members are
 * partially elided in this excerpt.
 */
1905 static struct attribute_group rbd_attr_group = {
1909 static const struct attribute_group *rbd_attr_groups[] = {
1914 static void rbd_sysfs_dev_release(struct device *dev)
1918 static struct device_type rbd_device_type = {
1920 .groups = rbd_attr_groups,
1921 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size": size of this snapshot in bytes. */
1929 static ssize_t rbd_snap_size_show(struct device *dev,
1930 struct device_attribute *attr,
1933 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1935 return sprintf(buf, "%lld\n", (long long)snap->size);
/* sysfs "snap_id": numeric snapshot id. */
1938 static ssize_t rbd_snap_id_show(struct device *dev,
1939 struct device_attribute *attr,
1942 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1944 return sprintf(buf, "%lld\n", (long long)snap->id);
/* Per-snapshot sysfs attributes and their group. */
1947 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1948 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1950 static struct attribute *rbd_snap_attrs[] = {
1951 &dev_attr_snap_size.attr,
1952 &dev_attr_snap_id.attr,
1956 static struct attribute_group rbd_snap_attr_group = {
1957 .attrs = rbd_snap_attrs,
/*
 * Release callback and device_type for snapshot devices.  The release
 * body (freeing the snap) is elided in this excerpt.
 */
1960 static void rbd_snap_dev_release(struct device *dev)
1962 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1967 static const struct attribute_group *rbd_snap_attr_groups[] = {
1968 &rbd_snap_attr_group,
1972 static struct device_type rbd_snap_device_type = {
1973 .groups = rbd_snap_attr_groups,
1974 .release = rbd_snap_dev_release,
/*
 * Unlink a snapshot from the device's list and unregister its sysfs
 * device; the release callback frees the snap itself.
 */
1977 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1978 struct rbd_snap *snap)
1980 list_del(&snap->node);
1981 device_unregister(&snap->dev);
/*
 * Register a snapshot as a child device named "snap_<name>" under
 * the given parent.
 */
1984 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1985 struct rbd_snap *snap,
1986 struct device *parent)
1988 struct device *dev = &snap->dev;
1991 dev->type = &rbd_snap_device_type;
1992 dev->parent = parent;
/* NOTE(review): device_type already supplies a release; this direct
 * assignment looks redundant — confirm against the full source. */
1993 dev->release = rbd_snap_dev_release;
1994 dev_set_name(dev, "snap_%s", snap->name);
1995 ret = device_register(dev);
/*
 * Allocate an rbd_snap for header slot i, copy its name/size/id, and
 * register its sysfs device when the parent is already registered.
 * NOTE(review): allocation-failure checks are elided in this excerpt.
 */
2000 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2001 int i, const char *name,
2002 struct rbd_snap **snapp)
2005 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2008 snap->name = kstrdup(name, GFP_KERNEL);
2009 snap->size = rbd_dev->header.snap_sizes[i];
2010 snap->id = rbd_dev->header.snapc->snaps[i];
2011 if (device_is_registered(&rbd_dev->dev)) {
2012 ret = rbd_register_snap_dev(rbd_dev, snap,
/*
 * Search for the previous snap name in a NUL-delimited string list;
 * walks backward from "name", never before "start".
 */
2028 const char *rbd_prev_snap_name(const char *name, const char *start)
2030 if (name < start + 2)
/*
 * Compare the old list of snapshots that we have to what's in the
 * header and update it accordingly.  Note that the header holds the
 * snapshots in a reverse order (from newest to oldest) and we need to
 * go from older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new one
 * with the same name).
 */
2050 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2052 const char *name, *first_name;
2053 int i = rbd_dev->header.total_snaps;
2054 struct rbd_snap *snap, *old_snap = NULL;
2056 struct list_head *p, *n;
2058 first_name = rbd_dev->header.snap_names;
/* start one past the last name; rbd_prev_snap_name() steps backward */
2059 name = first_name + rbd_dev->header.snap_names_len;
2061 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2064 old_snap = list_entry(p, struct rbd_snap, node);
2067 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2069 if (!i || old_snap->id < cur_id) {
2070 /* old_snap->id was skipped, thus was removed */
2071 __rbd_remove_snap_dev(rbd_dev, old_snap);
2074 if (old_snap->id == cur_id) {
2075 /* we have this snapshot already */
2077 name = rbd_prev_snap_name(name, first_name);
2081 i--, name = rbd_prev_snap_name(name, first_name)) {
2086 cur_id = rbd_dev->header.snapc->snaps[i];
2087 /* snapshot removal? handle it above */
2088 if (cur_id >= old_snap->id)
2090 /* a new snapshot */
2091 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2095 /* note that we add it backward so using n and not p */
2096 list_add(&snap->node, n);
2100 /* we're done going over the old snap list, just add what's left */
2101 for (; i > 0; i--) {
2102 name = rbd_prev_snap_name(name, first_name);
2107 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2110 list_add(&snap->node, &rbd_dev->snaps);
/*
 * Register the rbd device on the rbd bus (named by its numeric id),
 * then register a sysfs device for each existing snapshot, all under
 * ctl_mutex.
 */
2116 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2120 struct rbd_snap *snap;
2122 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2123 dev = &rbd_dev->dev;
2125 dev->bus = &rbd_bus_type;
2126 dev->type = &rbd_device_type;
2127 dev->parent = &rbd_root_dev;
2128 dev->release = rbd_dev_release;
2129 dev_set_name(dev, "%d", rbd_dev->id);
2130 ret = device_register(dev);
2134 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2135 ret = rbd_register_snap_dev(rbd_dev, snap,
2141 mutex_unlock(&ctl_mutex);
/* Unregister the rbd device; rbd_dev_release() does the teardown. */
2145 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2147 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the image header object, refreshing the local
 * header and retrying while the OSD reports -ERANGE (stale version).
 */
2150 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2155 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2156 rbd_dev->header.obj_version);
2157 if (ret == -ERANGE) {
2158 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2159 rc = __rbd_update_snaps(rbd_dev);
2160 mutex_unlock(&ctl_mutex);
2164 } while (ret == -ERANGE);
/* Highest rbd id handed out so far; ids start at 1. */
2169 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2172 * Get a unique rbd identifier for the given new rbd_dev, and add
2173 * the rbd_dev to the global list. The minimum rbd id is 1.
2175 static void rbd_id_get(struct rbd_device *rbd_dev)
2177 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2179 spin_lock(&rbd_dev_list_lock);
2180 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2181 spin_unlock(&rbd_dev_list_lock);
2185 * Remove an rbd_dev from the global list, and record that its
2186 * identifier is no longer in use.
2188 static void rbd_id_put(struct rbd_device *rbd_dev)
2190 struct list_head *tmp;
2191 int rbd_id = rbd_dev->id;
2196 spin_lock(&rbd_dev_list_lock);
2197 list_del_init(&rbd_dev->node);
2200 * If the id being "put" is not the current maximum, there
2201 * is nothing special we need to do.
2203 if (rbd_id != atomic64_read(&rbd_id_max)) {
2204 spin_unlock(&rbd_dev_list_lock);
2209 * We need to update the current maximum id. Search the
2210 * list to find out what it is. We're more likely to find
2211 * the maximum at the end, so search the list backward.
2214 list_for_each_prev(tmp, &rbd_dev_list) {
2215 struct rbd_device *rbd_dev;
2217 rbd_dev = list_entry(tmp, struct rbd_device, node);
2218 if (rbd_id > max_id)
2221 spin_unlock(&rbd_dev_list_lock);
2224 * The max id could have been updated by rbd_id_get(), in
2225 * which case it now accurately reflects the new maximum.
2226 * Be careful not to overwrite the maximum value in that
/* cmpxchg only lowers rbd_id_max when no newer id was allocated */
2229 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2233 * Skips over white space at *buf, and updates *buf to point to the
2234 * first found non-space character (if any). Returns the length of
2235 * the token (string of non-white space characters) found.
2237 static inline size_t next_token(const char **buf)
2240 * These are the characters that produce nonzero for
2241 * isspace() in the "C" and "POSIX" locales.
2243 const char *spaces = " \f\n\r\t\v";
2245 *buf += strspn(*buf, spaces); /* Find start of token */
2247 return strcspn(*buf, spaces); /* Return token length */
2251 * Finds the next token in *buf, and if the provided token buffer is
2252 * big enough, copies the found token into it. The result, if
2253 * copied, is guaranteed to be terminated with '\0'.
2255 * Returns the length of the token found (not including the '\0').
2256 * Return value will be 0 if no token is found, and it will be >=
2257 * token_size if the token would not fit.
2259 * The *buf pointer will be updated point beyond the end of the
2260 * found token. Note that this occurs even if the token buffer is
2261 * too small to hold it.
2263 static inline size_t copy_token(const char **buf,
2269 len = next_token(buf);
/* copy only when the token (plus NUL) fits in the caller's buffer */
2270 if (len < token_size) {
2271 memcpy(token, *buf, len);
2272 *(token + len) = '\0';
2280 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2281 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2282 * on the list of monitor addresses and other options provided via
/* (NOTE(review): "obj_len" is listed twice in the comment above.) */
2285 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2287 const char **mon_addrs,
2288 size_t *mon_addrs_size,
2290 size_t options_size)
2294 /* The first four tokens are required */
2296 len = next_token(&buf);
2299 *mon_addrs_size = len + 1;
2304 len = copy_token(&buf, options, options_size);
2305 if (!len || len >= options_size)
2308 len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
2309 if (!len || len >= sizeof (rbd_dev->pool_name))
2312 len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
2313 if (!len || len >= sizeof (rbd_dev->obj))
2316 /* We have the object length in hand, save it. */
2318 rbd_dev->obj_len = len;
2320 BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
2321 < RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
2322 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2325 * The snapshot name is optional, but it's an error if it's
2326 * too long. If no snapshot is supplied, fill in the default.
2328 len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
2330 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2331 sizeof (RBD_SNAP_HEAD_NAME));
2332 else if (len >= sizeof (rbd_dev->snap_name))
/*
 * Bus "add" store: parse the user-supplied spec, create the client
 * connection, register the blkdev and sysfs device, set up the disk
 * and header watch.  The trailing statements are the goto-style error
 * unwind; the labels themselves are elided in this excerpt.
 */
2338 static ssize_t rbd_add(struct bus_type *bus,
2342 struct rbd_device *rbd_dev;
2343 const char *mon_addrs = NULL;
2344 size_t mon_addrs_size = 0;
2345 char *options = NULL;
2346 struct ceph_osd_client *osdc;
2349 if (!try_module_get(THIS_MODULE))
2352 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2355 options = kmalloc(count, GFP_KERNEL);
2359 /* static rbd_device initialization */
2360 spin_lock_init(&rbd_dev->lock);
2361 INIT_LIST_HEAD(&rbd_dev->node);
2362 INIT_LIST_HEAD(&rbd_dev->snaps);
2364 init_rwsem(&rbd_dev->header.snap_rwsem);
2366 /* generate unique id: find highest unique id, add one */
2367 rbd_id_get(rbd_dev);
2369 /* Fill in the device name, now that we have its id. */
2370 BUILD_BUG_ON(DEV_NAME_LEN
2371 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2372 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2374 /* parse add command */
2375 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2380 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2382 if (IS_ERR(rbd_dev->rbd_client)) {
2383 rc = PTR_ERR(rbd_dev->rbd_client);
2388 osdc = &rbd_dev->rbd_client->client->osdc;
2389 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2391 goto err_out_client;
2392 rbd_dev->poolid = rc;
2394 /* register our block device */
2395 rc = register_blkdev(0, rbd_dev->name);
2397 goto err_out_client;
2398 rbd_dev->major = rc;
2400 rc = rbd_bus_add_dev(rbd_dev);
2402 goto err_out_blkdev;
2404 /* set up and announce blkdev mapping */
2405 rc = rbd_init_disk(rbd_dev);
2409 rc = rbd_init_watch_dev(rbd_dev);
/* ---- error unwind, in reverse order of acquisition ---- */
2416 rbd_id_put(rbd_dev);
2418 /* this will also clean up rest of rbd_dev stuff */
2420 rbd_bus_del_dev(rbd_dev);
2425 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2427 rbd_put_client(rbd_dev);
2429 rbd_id_put(rbd_dev);
2434 dout("Error adding device %s\n", buf);
2435 module_put(THIS_MODULE);
2437 return (ssize_t) rc;
/*
 * Look up an rbd_device by numeric id under rbd_dev_list_lock;
 * returns with the lock dropped (the found-path unlock is visible,
 * the return itself is elided in this excerpt).
 */
2440 static struct rbd_device *__rbd_get_dev(unsigned long id)
2442 struct list_head *tmp;
2443 struct rbd_device *rbd_dev;
2445 spin_lock(&rbd_dev_list_lock);
2446 list_for_each(tmp, &rbd_dev_list) {
2447 rbd_dev = list_entry(tmp, struct rbd_device, node);
2448 if (rbd_dev->id == id) {
2449 spin_unlock(&rbd_dev_list_lock);
2453 spin_unlock(&rbd_dev_list_lock);
/*
 * device release callback: tear down the header watch, drop the ceph
 * client, free the disk/blkdev, and release the module reference taken
 * in rbd_add().
 */
2457 static void rbd_dev_release(struct device *dev)
2459 struct rbd_device *rbd_dev =
2460 container_of(dev, struct rbd_device, dev);
2462 if (rbd_dev->watch_request) {
2463 struct ceph_client *client = rbd_dev->rbd_client->client;
2465 ceph_osdc_unregister_linger_request(&client->osdc,
2466 rbd_dev->watch_request);
2468 if (rbd_dev->watch_event)
2469 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2471 rbd_put_client(rbd_dev);
2473 /* clean up and free blkdev */
2474 rbd_free_disk(rbd_dev);
2475 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2478 /* release module ref */
2479 module_put(THIS_MODULE);
/*
 * Bus "remove" store: parse the target device id, look it up, release
 * its id, and unregister its snapshots and device under ctl_mutex.
 */
2482 static ssize_t rbd_remove(struct bus_type *bus,
2486 struct rbd_device *rbd_dev = NULL;
2491 rc = strict_strtoul(buf, 10, &ul);
2495 /* convert to int; abort if we lost anything in the conversion */
2496 target_id = (int) ul;
2497 if (target_id != ul)
2500 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2502 rbd_dev = __rbd_get_dev(target_id);
2508 rbd_id_put(rbd_dev);
2510 __rbd_remove_all_snaps(rbd_dev);
2511 rbd_bus_del_dev(rbd_dev);
2514 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" store: create a snapshot named by the written
 * string, refresh the snap list, then notify watchers outside the
 * mutex.  NOTE(review): snprintf(name, count, ...) truncates the last
 * input byte; presumably intentional to strip the trailing newline —
 * confirm against the full source.
 */
2518 static ssize_t rbd_snap_add(struct device *dev,
2519 struct device_attribute *attr,
2523 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2525 char *name = kmalloc(count + 1, GFP_KERNEL);
2529 snprintf(name, count, "%s", buf);
2531 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2533 ret = rbd_header_add_snap(rbd_dev,
2538 ret = __rbd_update_snaps(rbd_dev);
2542 /* shouldn't hold ctl_mutex when notifying.. notify might
2543 trigger a watch callback that would need to get that mutex */
2544 mutex_unlock(&ctl_mutex);
2546 /* make a best effort, don't error if failed */
2547 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2554 mutex_unlock(&ctl_mutex);
2560 * create control files in sysfs
/* Register the rbd bus and root device under /sys. */
2563 static int rbd_sysfs_init(void)
2567 ret = bus_register(&rbd_bus_type);
2571 ret = device_register(&rbd_root_dev);
/* Undo rbd_sysfs_init(): device first, then the bus. */
2576 static void rbd_sysfs_cleanup(void)
2578 device_unregister(&rbd_root_dev);
2579 bus_unregister(&rbd_bus_type);
/* Module init: set up sysfs and announce the driver. */
2582 int __init rbd_init(void)
2586 rc = rbd_sysfs_init();
2589 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs bus and root device. */
2593 void __exit rbd_exit(void)
2595 rbd_sysfs_cleanup();
/* Module entry points and metadata. */
2598 module_init(rbd_init);
2599 module_exit(rbd_exit);
2601 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2602 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2603 MODULE_DESCRIPTION("rados block device");
2605 /* following authorship retained from original osdblk.c */
2606 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2608 MODULE_LICENSE("GPL");