Merge git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
authorLinus Torvalds <torvalds@linux-foundation.org>
Tue, 22 Mar 2011 23:25:25 +0000 (16:25 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Tue, 22 Mar 2011 23:25:25 +0000 (16:25 -0700)
* git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  rbd: use watch/notify for changes in rbd header
  libceph: add lingering request and watch/notify event framework
  rbd: update email address in Documentation
  ceph: rename dentry_release -> d_release, fix comment
  ceph: add request to the tail of unsafe write list
  ceph: remove request from unsafe list if it is canceled/timed out
  ceph: move readahead default to fs/ceph from libceph
  ceph: add ino32 mount option
  ceph: update common header files
  ceph: remove debugfs debug cruft
  libceph: fix osd request queuing on osdmap updates
  ceph: preserve I_COMPLETE across rename
  libceph: Fix base64-decoding when input ends in newline.

15 files changed:
Documentation/ABI/testing/sysfs-bus-rbd
drivers/block/rbd.c
fs/ceph/debugfs.c
fs/ceph/dir.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/super.c
fs/ceph/super.h
include/linux/ceph/ceph_fs.h
include/linux/ceph/libceph.h
include/linux/ceph/osd_client.h
include/linux/ceph/rados.h
net/ceph/armor.c
net/ceph/ceph_common.c
net/ceph/osd_client.c

index 90a87e2..fa72ccb 100644 (file)
@@ -1,6 +1,6 @@
 What:          /sys/bus/rbd/
 Date:          November 2010
-Contact:       Yehuda Sadeh <yehuda@hq.newdream.net>,
+Contact:       Yehuda Sadeh <yehuda@newdream.net>,
                Sage Weil <sage@newdream.net>
 Description:
 
index e1e38b1..16dc364 100644 (file)
@@ -31,6 +31,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
+#include <linux/parser.h>
 
 #include <linux/kernel.h>
 #include <linux/device.h>
@@ -54,6 +55,8 @@
 
 #define DEV_NAME_LEN           32
 
+#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
+
 /*
  * block device image metadata (in-memory version)
  */
@@ -71,6 +74,12 @@ struct rbd_image_header {
 
        char *snap_names;
        u64 *snap_sizes;
+
+       u64 obj_version;
+};
+
+struct rbd_options {
+       int     notify_timeout;
 };
 
 /*
@@ -78,6 +87,7 @@ struct rbd_image_header {
  */
 struct rbd_client {
        struct ceph_client      *client;
+       struct rbd_options      *rbd_opts;
        struct kref             kref;
        struct list_head        node;
 };
@@ -124,6 +134,9 @@ struct rbd_device {
        char                    pool_name[RBD_MAX_POOL_NAME_LEN];
        int                     poolid;
 
+       struct ceph_osd_event   *watch_event;
+       struct ceph_osd_request *watch_request;
+
        char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
        u32 cur_snap;   /* index+1 of current snapshot within snap context
                           0 - for the head */
@@ -177,6 +190,8 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
        put_device(&rbd_dev->dev);
 }
 
+static int __rbd_update_snaps(struct rbd_device *rbd_dev);
+
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
        struct gendisk *disk = bdev->bd_disk;
@@ -211,7 +226,8 @@ static const struct block_device_operations rbd_bd_ops = {
  * Initialize an rbd client instance.
  * We own *opt.
  */
-static struct rbd_client *rbd_client_create(struct ceph_options *opt)
+static struct rbd_client *rbd_client_create(struct ceph_options *opt,
+                                           struct rbd_options *rbd_opts)
 {
        struct rbd_client *rbdc;
        int ret = -ENOMEM;
@@ -233,6 +249,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *opt)
        if (ret < 0)
                goto out_err;
 
+       rbdc->rbd_opts = rbd_opts;
+
        spin_lock(&node_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&node_lock);
@@ -266,6 +284,59 @@ static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
        return NULL;
 }
 
+/*
+ * mount options
+ */
+enum {
+       Opt_notify_timeout,
+       Opt_last_int,
+       /* int args above */
+       Opt_last_string,
+       /* string args above */
+};
+
+static match_table_t rbdopt_tokens = {
+       {Opt_notify_timeout, "notify_timeout=%d"},
+       /* int args above */
+       /* string args above */
+       {-1, NULL}
+};
+
+static int parse_rbd_opts_token(char *c, void *private)
+{
+       struct rbd_options *rbdopt = private;
+       substring_t argstr[MAX_OPT_ARGS];
+       int token, intval, ret;
+
+       token = match_token((char *)c, rbdopt_tokens, argstr);
+       if (token < 0)
+               return -EINVAL;
+
+       if (token < Opt_last_int) {
+               ret = match_int(&argstr[0], &intval);
+               if (ret < 0) {
+                       pr_err("bad mount option arg (not int) "
+                              "at '%s'\n", c);
+                       return ret;
+               }
+               dout("got int token %d val %d\n", token, intval);
+       } else if (token > Opt_last_int && token < Opt_last_string) {
+               dout("got string token %d val %s\n", token,
+                    argstr[0].from);
+       } else {
+               dout("got token %d\n", token);
+       }
+
+       switch (token) {
+       case Opt_notify_timeout:
+               rbdopt->notify_timeout = intval;
+               break;
+       default:
+               BUG_ON(token);
+       }
+       return 0;
+}
+
 /*
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
@@ -276,11 +347,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
        struct rbd_client *rbdc;
        struct ceph_options *opt;
        int ret;
+       struct rbd_options *rbd_opts;
+
+       rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
+       if (!rbd_opts)
+               return -ENOMEM;
+
+       rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 
        ret = ceph_parse_options(&opt, options, mon_addr,
-                                mon_addr + strlen(mon_addr), NULL, NULL);
+                                mon_addr + strlen(mon_addr), parse_rbd_opts_token, rbd_opts);
        if (ret < 0)
-               return ret;
+               goto done_err;
 
        spin_lock(&node_lock);
        rbdc = __rbd_client_find(opt);
@@ -296,13 +374,18 @@ static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
        }
        spin_unlock(&node_lock);
 
-       rbdc = rbd_client_create(opt);
-       if (IS_ERR(rbdc))
-               return PTR_ERR(rbdc);
+       rbdc = rbd_client_create(opt, rbd_opts);
+       if (IS_ERR(rbdc)) {
+               ret = PTR_ERR(rbdc);
+               goto done_err;
+       }
 
        rbd_dev->rbd_client = rbdc;
        rbd_dev->client = rbdc->client;
        return 0;
+done_err:
+       kfree(rbd_opts);
+       return ret;
 }
 
 /*
@@ -318,6 +401,7 @@ static void rbd_client_release(struct kref *kref)
        spin_unlock(&node_lock);
 
        ceph_destroy_client(rbdc->client);
+       kfree(rbdc->rbd_opts);
        kfree(rbdc);
 }
 
@@ -666,7 +750,9 @@ static int rbd_do_request(struct request *rq,
                          struct ceph_osd_req_op *ops,
                          int num_reply,
                          void (*rbd_cb)(struct ceph_osd_request *req,
-                                        struct ceph_msg *msg))
+                                        struct ceph_msg *msg),
+                         struct ceph_osd_request **linger_req,
+                         u64 *ver)
 {
        struct ceph_osd_request *req;
        struct ceph_file_layout *layout;
@@ -729,12 +815,20 @@ static int rbd_do_request(struct request *rq,
                                req->r_oid, req->r_oid_len);
        up_read(&header->snap_rwsem);
 
+       if (linger_req) {
+               ceph_osdc_set_request_linger(&dev->client->osdc, req);
+               *linger_req = req;
+       }
+
        ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
        if (ret < 0)
                goto done_err;
 
        if (!rbd_cb) {
                ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+               if (ver)
+                       *ver = le64_to_cpu(req->r_reassert_version.version);
+               dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
                ceph_osdc_put_request(req);
        }
        return ret;
@@ -789,6 +883,11 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
        kfree(req_data);
 }
 
+static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+       ceph_osdc_put_request(req);
+}
+
 /*
  * Do a synchronous ceph osd operation
  */
@@ -801,7 +900,9 @@ static int rbd_req_sync_op(struct rbd_device *dev,
                           int num_reply,
                           const char *obj,
                           u64 ofs, u64 len,
-                          char *buf)
+                          char *buf,
+                          struct ceph_osd_request **linger_req,
+                          u64 *ver)
 {
        int ret;
        struct page **pages;
@@ -833,7 +934,8 @@ static int rbd_req_sync_op(struct rbd_device *dev,
                          flags,
                          ops,
                          2,
-                         NULL);
+                         NULL,
+                         linger_req, ver);
        if (ret < 0)
                goto done_ops;
 
@@ -893,7 +995,7 @@ static int rbd_do_op(struct request *rq,
                             flags,
                             ops,
                             num_reply,
-                            rbd_req_cb);
+                            rbd_req_cb, 0, NULL);
 done:
        kfree(seg_name);
        return ret;
@@ -940,18 +1042,174 @@ static int rbd_req_sync_read(struct rbd_device *dev,
                          u64 snapid,
                          const char *obj,
                          u64 ofs, u64 len,
-                         char *buf)
+                         char *buf,
+                         u64 *ver)
 {
        return rbd_req_sync_op(dev, NULL,
                               (snapid ? snapid : CEPH_NOSNAP),
                               CEPH_OSD_OP_READ,
                               CEPH_OSD_FLAG_READ,
                               NULL,
-                              1, obj, ofs, len, buf);
+                              1, obj, ofs, len, buf, NULL, ver);
 }
 
 /*
- * Request sync osd read
+ * Request sync osd watch
+ */
+static int rbd_req_sync_notify_ack(struct rbd_device *dev,
+                                  u64 ver,
+                                  u64 notify_id,
+                                  const char *obj)
+{
+       struct ceph_osd_req_op *ops;
+       struct page **pages = NULL;
+       int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
+       if (ret < 0)
+               return ret;
+
+       ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
+       ops[0].watch.cookie = notify_id;
+       ops[0].watch.flag = 0;
+
+       ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
+                         obj, 0, 0, NULL,
+                         pages, 0,
+                         CEPH_OSD_FLAG_READ,
+                         ops,
+                         1,
+                         rbd_simple_req_cb, 0, NULL);
+
+       rbd_destroy_ops(ops);
+       return ret;
+}
+
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+       struct rbd_device *dev = (struct rbd_device *)data;
+       if (!dev)
+               return;
+
+       dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
+               notify_id, (int)opcode);
+       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+       __rbd_update_snaps(dev);
+       mutex_unlock(&ctl_mutex);
+
+       rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
+}
+
+/*
+ * Request sync osd watch
+ */
+static int rbd_req_sync_watch(struct rbd_device *dev,
+                             const char *obj,
+                             u64 ver)
+{
+       struct ceph_osd_req_op *ops;
+       struct ceph_osd_client *osdc = &dev->client->osdc;
+
+       int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
+       if (ret < 0)
+               return ret;
+
+       ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
+                                    (void *)dev, &dev->watch_event);
+       if (ret < 0)
+               goto fail;
+
+       ops[0].watch.ver = cpu_to_le64(ver);
+       ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
+       ops[0].watch.flag = 1;
+
+       ret = rbd_req_sync_op(dev, NULL,
+                             CEPH_NOSNAP,
+                             0,
+                             CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+                             ops,
+                             1, obj, 0, 0, NULL,
+                             &dev->watch_request, NULL);
+
+       if (ret < 0)
+               goto fail_event;
+
+       rbd_destroy_ops(ops);
+       return 0;
+
+fail_event:
+       ceph_osdc_cancel_event(dev->watch_event);
+       dev->watch_event = NULL;
+fail:
+       rbd_destroy_ops(ops);
+       return ret;
+}
+
+struct rbd_notify_info {
+       struct rbd_device *dev;
+};
+
+static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+       struct rbd_device *dev = (struct rbd_device *)data;
+       if (!dev)
+               return;
+
+       dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
+               notify_id, (int)opcode);
+}
+
+/*
+ * Request sync osd notify
+ */
+static int rbd_req_sync_notify(struct rbd_device *dev,
+                         const char *obj)
+{
+       struct ceph_osd_req_op *ops;
+       struct ceph_osd_client *osdc = &dev->client->osdc;
+       struct ceph_osd_event *event;
+       struct rbd_notify_info info;
+       int payload_len = sizeof(u32) + sizeof(u32);
+       int ret;
+
+       ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
+       if (ret < 0)
+               return ret;
+
+       info.dev = dev;
+
+       ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
+                                    (void *)&info, &event);
+       if (ret < 0)
+               goto fail;
+
+       ops[0].watch.ver = 1;
+       ops[0].watch.flag = 1;
+       ops[0].watch.cookie = event->cookie;
+       ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
+       ops[0].watch.timeout = 12;
+
+       ret = rbd_req_sync_op(dev, NULL,
+                              CEPH_NOSNAP,
+                              0,
+                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+                              ops,
+                              1, obj, 0, 0, NULL, NULL, NULL);
+       if (ret < 0)
+               goto fail_event;
+
+       ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
+       dout("ceph_osdc_wait_event returned %d\n", ret);
+       rbd_destroy_ops(ops);
+       return 0;
+
+fail_event:
+       ceph_osdc_cancel_event(event);
+fail:
+       rbd_destroy_ops(ops);
+       return ret;
+}
+
+/*
+ * Request sync osd rollback
  */
 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
                                     u64 snapid,
@@ -969,13 +1227,10 @@ static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
-                              1, obj, 0, 0, NULL);
+                              1, obj, 0, 0, NULL, NULL, NULL);
 
        rbd_destroy_ops(ops);
 
-       if (ret < 0)
-               return ret;
-
        return ret;
 }
 
@@ -987,7 +1242,8 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
                             const char *cls,
                             const char *method,
                             const char *data,
-                            int len)
+                            int len,
+                            u64 *ver)
 {
        struct ceph_osd_req_op *ops;
        int cls_len = strlen(cls);
@@ -1010,7 +1266,7 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
                               0,
                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                               ops,
-                              1, obj, 0, 0, NULL);
+                              1, obj, 0, 0, NULL, NULL, ver);
 
        rbd_destroy_ops(ops);
 
@@ -1156,6 +1412,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
        struct rbd_image_header_ondisk *dh;
        int snap_count = 0;
        u64 snap_names_len = 0;
+       u64 ver;
 
        while (1) {
                int len = sizeof(*dh) +
@@ -1171,7 +1428,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
                                       NULL, CEPH_NOSNAP,
                                       rbd_dev->obj_md_name,
                                       0, len,
-                                      (char *)dh);
+                                      (char *)dh, &ver);
                if (rc < 0)
                        goto out_dh;
 
@@ -1188,6 +1445,7 @@ static int rbd_read_header(struct rbd_device *rbd_dev,
                }
                break;
        }
+       header->obj_version = ver;
 
 out_dh:
        kfree(dh);
@@ -1205,6 +1463,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        u64 new_snapid;
        int ret;
        void *data, *data_start, *data_end;
+       u64 ver;
 
        /* we should create a snapshot only if we're pointing at the head */
        if (dev->cur_snap)
@@ -1227,7 +1486,7 @@ static int rbd_header_add_snap(struct rbd_device *dev,
        ceph_encode_64_safe(&data, data_end, new_snapid, bad);
 
        ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
-                               data_start, data - data_start);
+                               data_start, data - data_start, &ver);
 
        kfree(data_start);
 
@@ -1259,6 +1518,7 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
        int ret;
        struct rbd_image_header h;
        u64 snap_seq;
+       int follow_seq = 0;
 
        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
@@ -1267,6 +1527,11 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
        down_write(&rbd_dev->header.snap_rwsem);
 
        snap_seq = rbd_dev->header.snapc->seq;
+       if (rbd_dev->header.total_snaps &&
+           rbd_dev->header.snapc->snaps[0] == snap_seq)
+               /* pointing at the head, will need to follow that
+                  if head moves */
+               follow_seq = 1;
 
        kfree(rbd_dev->header.snapc);
        kfree(rbd_dev->header.snap_names);
@@ -1277,7 +1542,10 @@ static int __rbd_update_snaps(struct rbd_device *rbd_dev)
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_names_len = h.snap_names_len;
        rbd_dev->header.snap_sizes = h.snap_sizes;
-       rbd_dev->header.snapc->seq = snap_seq;
+       if (follow_seq)
+               rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
+       else
+               rbd_dev->header.snapc->seq = snap_seq;
 
        ret = __rbd_init_snaps_header(rbd_dev);
 
@@ -1699,7 +1967,28 @@ static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
        device_unregister(&rbd_dev->dev);
 }
 
-static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
+static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
+{
+       int ret, rc;
+
+       do {
+               ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
+                                        rbd_dev->header.obj_version);
+               if (ret == -ERANGE) {
+                       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+                       rc = __rbd_update_snaps(rbd_dev);
+                       mutex_unlock(&ctl_mutex);
+                       if (rc < 0)
+                               return rc;
+               }
+       } while (ret == -ERANGE);
+
+       return ret;
+}
+
+static ssize_t rbd_add(struct bus_type *bus,
+                      const char *buf,
+                      size_t count)
 {
        struct ceph_osd_client *osdc;
        struct rbd_device *rbd_dev;
@@ -1797,6 +2086,10 @@ static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
        if (rc)
                goto err_out_bus;
 
+       rc = rbd_init_watch_dev(rbd_dev);
+       if (rc)
+               goto err_out_bus;
+
        return count;
 
 err_out_bus:
@@ -1849,6 +2142,12 @@ static void rbd_dev_release(struct device *dev)
        struct rbd_device *rbd_dev =
                        container_of(dev, struct rbd_device, dev);
 
+       if (rbd_dev->watch_request)
+               ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
+                                                   rbd_dev->watch_request);
+       if (rbd_dev->watch_event)
+               ceph_osdc_cancel_event(rbd_dev->watch_event);
+
        rbd_put_client(rbd_dev);
 
        /* clean up and free blkdev */
@@ -1914,14 +2213,24 @@ static ssize_t rbd_snap_add(struct device *dev,
        ret = rbd_header_add_snap(rbd_dev,
                                  name, GFP_KERNEL);
        if (ret < 0)
-               goto done_unlock;
+               goto err_unlock;
 
        ret = __rbd_update_snaps(rbd_dev);
        if (ret < 0)
-               goto done_unlock;
+               goto err_unlock;
+
+       /* shouldn't hold ctl_mutex when notifying.. notify might
+          trigger a watch callback that would need to get that mutex */
+       mutex_unlock(&ctl_mutex);
+
+       /* make a best effort, don't error if failed */
+       rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
 
        ret = count;
-done_unlock:
+       kfree(name);
+       return ret;
+
+err_unlock:
        mutex_unlock(&ctl_mutex);
        kfree(name);
        return ret;
index 08f65fa..0dba691 100644 (file)
@@ -210,8 +210,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
        if (!fsc->debugfs_congestion_kb)
                goto out;
 
-       dout("a\n");
-
        snprintf(name, sizeof(name), "../../bdi/%s",
                 dev_name(fsc->backing_dev_info.dev));
        fsc->debugfs_bdi =
@@ -221,7 +219,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
        if (!fsc->debugfs_bdi)
                goto out;
 
-       dout("b\n");
        fsc->debugfs_mdsmap = debugfs_create_file("mdsmap",
                                        0600,
                                        fsc->client->debugfs_dir,
@@ -230,7 +227,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
        if (!fsc->debugfs_mdsmap)
                goto out;
 
-       dout("ca\n");
        fsc->debugfs_mdsc = debugfs_create_file("mdsc",
                                                0600,
                                                fsc->client->debugfs_dir,
@@ -239,7 +235,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
        if (!fsc->debugfs_mdsc)
                goto out;
 
-       dout("da\n");
        fsc->debugfs_caps = debugfs_create_file("caps",
                                                   0400,
                                                   fsc->client->debugfs_dir,
@@ -248,7 +243,6 @@ int ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
        if (!fsc->debugfs_caps)
                goto out;
 
-       dout("ea\n");
        fsc->debugfs_dentry_lru = debugfs_create_file("dentry_lru",
                                        0600,
                                        fsc->client->debugfs_dir,
index ebafa65..1a867a3 100644 (file)
@@ -161,7 +161,7 @@ more:
        filp->f_pos = di->offset;
        err = filldir(dirent, dentry->d_name.name,
                      dentry->d_name.len, di->offset,
-                     dentry->d_inode->i_ino,
+                     ceph_translate_ino(dentry->d_sb, dentry->d_inode->i_ino),
                      dentry->d_inode->i_mode >> 12);
 
        if (last) {
@@ -245,15 +245,17 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
 
                dout("readdir off 0 -> '.'\n");
                if (filldir(dirent, ".", 1, ceph_make_fpos(0, 0),
-                           inode->i_ino, inode->i_mode >> 12) < 0)
+                           ceph_translate_ino(inode->i_sb, inode->i_ino),
+                           inode->i_mode >> 12) < 0)
                        return 0;
                filp->f_pos = 1;
                off = 1;
        }
        if (filp->f_pos == 1) {
+               ino_t ino = filp->f_dentry->d_parent->d_inode->i_ino;
                dout("readdir off 1 -> '..'\n");
                if (filldir(dirent, "..", 2, ceph_make_fpos(0, 1),
-                           filp->f_dentry->d_parent->d_inode->i_ino,
+                           ceph_translate_ino(inode->i_sb, ino),
                            inode->i_mode >> 12) < 0)
                        return 0;
                filp->f_pos = 2;
@@ -377,7 +379,8 @@ more:
                if (filldir(dirent,
                            rinfo->dir_dname[off - fi->offset],
                            rinfo->dir_dname_len[off - fi->offset],
-                           pos, ino, ftype) < 0) {
+                           pos,
+                           ceph_translate_ino(inode->i_sb, ino), ftype) < 0) {
                        dout("filldir stopping us...\n");
                        return 0;
                }
@@ -1024,14 +1027,13 @@ out_touch:
 }
 
 /*
- * When a dentry is released, clear the dir I_COMPLETE if it was part
- * of the current dir gen or if this is in the snapshot namespace.
+ * Release our ceph_dentry_info.
  */
-static void ceph_dentry_release(struct dentry *dentry)
+static void ceph_d_release(struct dentry *dentry)
 {
        struct ceph_dentry_info *di = ceph_dentry(dentry);
 
-       dout("dentry_release %p\n", dentry);
+       dout("d_release %p\n", dentry);
        if (di) {
                ceph_dentry_lru_del(dentry);
                if (di->lease_session)
@@ -1256,14 +1258,14 @@ const struct inode_operations ceph_dir_iops = {
 
 const struct dentry_operations ceph_dentry_ops = {
        .d_revalidate = ceph_d_revalidate,
-       .d_release = ceph_dentry_release,
+       .d_release = ceph_d_release,
 };
 
 const struct dentry_operations ceph_snapdir_dentry_ops = {
        .d_revalidate = ceph_snapdir_d_revalidate,
-       .d_release = ceph_dentry_release,
+       .d_release = ceph_d_release,
 };
 
 const struct dentry_operations ceph_snap_dentry_ops = {
-       .d_release = ceph_dentry_release,
+       .d_release = ceph_d_release,
 };
index 7d0e4a8..159b512 100644 (file)
@@ -564,11 +564,19 @@ more:
                         * start_request so that a tid has been assigned.
                         */
                        spin_lock(&ci->i_unsafe_lock);
-                       list_add(&req->r_unsafe_item, &ci->i_unsafe_writes);
+                       list_add_tail(&req->r_unsafe_item,
+                                     &ci->i_unsafe_writes);
                        spin_unlock(&ci->i_unsafe_lock);
                        ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR);
                }
+               
                ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+               if (ret < 0 && req->r_safe_callback) {
+                       spin_lock(&ci->i_unsafe_lock);
+                       list_del_init(&req->r_unsafe_item);
+                       spin_unlock(&ci->i_unsafe_lock);
+                       ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR);
+               }
        }
 
        if (file->f_flags & O_DIRECT)
index 193bfa5..b54c97d 100644 (file)
@@ -36,6 +36,13 @@ static void ceph_vmtruncate_work(struct work_struct *work);
 /*
  * find or create an inode, given the ceph ino number
  */
+static int ceph_set_ino_cb(struct inode *inode, void *data)
+{
+       ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
+       inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
+       return 0;
+}
+
 struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
 {
        struct inode *inode;
@@ -1030,9 +1037,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                        dout("fill_trace doing d_move %p -> %p\n",
                             req->r_old_dentry, dn);
 
-                       /* d_move screws up d_subdirs order */
-                       ceph_i_clear(dir, CEPH_I_COMPLETE);
-
                        d_move(req->r_old_dentry, dn);
                        dout(" src %p '%.*s' dst %p '%.*s'\n",
                             req->r_old_dentry,
@@ -1044,12 +1048,15 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
                           rehashing bug in vfs_rename_dir */
                        ceph_invalidate_dentry_lease(dn);
 
-                       /* take overwritten dentry's readdir offset */
-                       dout("dn %p gets %p offset %lld (old offset %lld)\n",
-                            req->r_old_dentry, dn, ceph_dentry(dn)->offset,
+                       /*
+                        * d_move() puts the renamed dentry at the end of
+                        * d_subdirs.  We need to assign it an appropriate
+                        * directory offset so we can behave when holding
+                        * I_COMPLETE.
+                        */
+                       ceph_set_dentry_offset(req->r_old_dentry);
+                       dout("dn %p gets new offset %lld\n", req->r_old_dentry, 
                             ceph_dentry(req->r_old_dentry)->offset);
-                       ceph_dentry(req->r_old_dentry)->offset =
-                               ceph_dentry(dn)->offset;
 
                        dn = req->r_old_dentry;  /* use old_dentry */
                        in = dn->d_inode;
@@ -1809,7 +1816,7 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
        err = ceph_do_getattr(inode, CEPH_STAT_CAP_INODE_ALL);
        if (!err) {
                generic_fillattr(inode, stat);
-               stat->ino = inode->i_ino;
+               stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
                if (ceph_snap(inode) != CEPH_NOSNAP)
                        stat->dev = ceph_snap(inode);
                else
index 9c50854..a9e78b4 100644 (file)
@@ -131,6 +131,7 @@ enum {
        Opt_rbytes,
        Opt_norbytes,
        Opt_noasyncreaddir,
+       Opt_ino32,
 };
 
 static match_table_t fsopt_tokens = {
@@ -150,6 +151,7 @@ static match_table_t fsopt_tokens = {
        {Opt_rbytes, "rbytes"},
        {Opt_norbytes, "norbytes"},
        {Opt_noasyncreaddir, "noasyncreaddir"},
+       {Opt_ino32, "ino32"},
        {-1, NULL}
 };
 
@@ -225,6 +227,9 @@ static int parse_fsopt_token(char *c, void *private)
        case Opt_noasyncreaddir:
                fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
                break;
+       case Opt_ino32:
+               fsopt->flags |= CEPH_MOUNT_OPT_INO32;
+               break;
        default:
                BUG_ON(token);
        }
@@ -288,7 +293,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
         fsopt->sb_flags = flags;
         fsopt->flags = CEPH_MOUNT_OPT_DEFAULT;
 
-        fsopt->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
+        fsopt->rsize = CEPH_RSIZE_DEFAULT;
         fsopt->snapdir_name = kstrdup(CEPH_SNAPDIRNAME_DEFAULT, GFP_KERNEL);
        fsopt->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
        fsopt->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
@@ -370,7 +375,7 @@ static int ceph_show_options(struct seq_file *m, struct vfsmount *mnt)
 
        if (fsopt->wsize)
                seq_printf(m, ",wsize=%d", fsopt->wsize);
-       if (fsopt->rsize != CEPH_MOUNT_RSIZE_DEFAULT)
+       if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
                seq_printf(m, ",rsize=%d", fsopt->rsize);
        if (fsopt->congestion_kb != default_congestion_kb())
                seq_printf(m, ",write_congestion_kb=%d", fsopt->congestion_kb);
index 20b907d..619fe71 100644 (file)
@@ -27,6 +27,7 @@
 #define CEPH_MOUNT_OPT_DIRSTAT         (1<<4) /* `cat dirname` for stats */
 #define CEPH_MOUNT_OPT_RBYTES          (1<<5) /* dir st_bytes = rbytes */
 #define CEPH_MOUNT_OPT_NOASYNCREADDIR  (1<<7) /* no dcache readdir */
+#define CEPH_MOUNT_OPT_INO32           (1<<8) /* 32 bit inos */
 
 #define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES)
 
@@ -35,6 +36,7 @@
 #define ceph_test_mount_opt(fsc, opt) \
        (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt))
 
+#define CEPH_RSIZE_DEFAULT             (512*1024) /* readahead */
 #define CEPH_MAX_READDIR_DEFAULT        1024
 #define CEPH_MAX_READDIR_BYTES_DEFAULT  (512*1024)
 #define CEPH_SNAPDIRNAME_DEFAULT        ".snap"
@@ -319,6 +321,16 @@ static inline struct ceph_inode_info *ceph_inode(struct inode *inode)
        return container_of(inode, struct ceph_inode_info, vfs_inode);
 }
 
+static inline struct ceph_fs_client *ceph_inode_to_client(struct inode *inode)
+{
+       return (struct ceph_fs_client *)inode->i_sb->s_fs_info;
+}
+
+static inline struct ceph_fs_client *ceph_sb_to_client(struct super_block *sb)
+{
+       return (struct ceph_fs_client *)sb->s_fs_info;
+}
+
 static inline struct ceph_vino ceph_vino(struct inode *inode)
 {
        return ceph_inode(inode)->i_vino;
@@ -327,19 +339,49 @@ static inline struct ceph_vino ceph_vino(struct inode *inode)
 /*
  * ino_t is <64 bits on many architectures, blech.
  *
- * don't include snap in ino hash, at least for now.
+ *               i_ino (kernel inode)   st_ino (userspace)
+ * i386          32                     32
+ * x86_64+ino32  64                     32
+ * x86_64        64                     64
+ */
+static inline u32 ceph_ino_to_ino32(ino_t ino)
+{
+       ino ^= ino >> (sizeof(ino) * 8 - 32);
+       if (!ino)
+               ino = 1;
+       return ino;
+}
+
+/*
+ * kernel i_ino value
  */
 static inline ino_t ceph_vino_to_ino(struct ceph_vino vino)
 {
        ino_t ino = (ino_t)vino.ino;  /* ^ (vino.snap << 20); */
 #if BITS_PER_LONG == 32
-       ino ^= vino.ino >> (sizeof(u64)-sizeof(ino_t)) * 8;
-       if (!ino)
-               ino = 1;
+       ino = ceph_ino_to_ino32(ino);
 #endif
        return ino;
 }
 
+/*
+ * user-visible ino (stat, filldir)
+ */
+#if BITS_PER_LONG == 32
+static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino)
+{
+       return ino;
+}
+#else
+static inline ino_t ceph_translate_ino(struct super_block *sb, ino_t ino)
+{
+       if (ceph_test_mount_opt(ceph_sb_to_client(sb), INO32))
+               ino = ceph_ino_to_ino32(ino);
+       return ino;
+}
+#endif
+
+
 /* for printf-style formatting */
 #define ceph_vinop(i) ceph_inode(i)->i_vino.ino, ceph_inode(i)->i_vino.snap
 
@@ -428,13 +470,6 @@ static inline loff_t ceph_make_fpos(unsigned frag, unsigned off)
        return ((loff_t)frag << 32) | (loff_t)off;
 }
 
-static inline int ceph_set_ino_cb(struct inode *inode, void *data)
-{
-       ceph_inode(inode)->i_vino = *(struct ceph_vino *)data;
-       inode->i_ino = ceph_vino_to_ino(*(struct ceph_vino *)data);
-       return 0;
-}
-
 /*
  * caps helpers
  */
@@ -503,15 +538,6 @@ extern void ceph_reservation_status(struct ceph_fs_client *client,
                                    int *total, int *avail, int *used,
                                    int *reserved, int *min);
 
-static inline struct ceph_fs_client *ceph_inode_to_client(struct inode *inode)
-{
-       return (struct ceph_fs_client *)inode->i_sb->s_fs_info;
-}
-
-static inline struct ceph_fs_client *ceph_sb_to_client(struct super_block *sb)
-{
-       return (struct ceph_fs_client *)sb->s_fs_info;
-}
 
 
 /*
index 09dcc0c..b8e995f 100644 (file)
@@ -136,9 +136,18 @@ struct ceph_dir_layout {
 
 
 /* osd */
-#define CEPH_MSG_OSD_MAP          41
-#define CEPH_MSG_OSD_OP           42
-#define CEPH_MSG_OSD_OPREPLY      43
+#define CEPH_MSG_OSD_MAP                41
+#define CEPH_MSG_OSD_OP                 42
+#define CEPH_MSG_OSD_OPREPLY            43
+#define CEPH_MSG_WATCH_NOTIFY           44
+
+
+/* watch-notify operations */
+enum {
+  WATCH_NOTIFY                         = 1, /* notifying watcher */
+  WATCH_NOTIFY_COMPLETE                        = 2, /* notifier notified when done */
+};
+
 
 /* pool operations */
 enum {
@@ -213,8 +222,10 @@ struct ceph_client_mount {
        struct ceph_mon_request_header monhdr;
 } __attribute__ ((packed));
 
+#define CEPH_SUBSCRIBE_ONETIME    1  /* i want only 1 update after have */
+
 struct ceph_mon_subscribe_item {
-       __le64 have_version;    __le64 have;
+       __le64 have_version;    __le64 have;
        __u8 onetime;
 } __attribute__ ((packed));
 
index 72c72bf..0d2e0ff 100644 (file)
@@ -71,7 +71,6 @@ struct ceph_options {
 #define CEPH_OSD_TIMEOUT_DEFAULT    60  /* seconds */
 #define CEPH_OSD_KEEPALIVE_DEFAULT  5
 #define CEPH_OSD_IDLE_TTL_DEFAULT    60
-#define CEPH_MOUNT_RSIZE_DEFAULT    (512*1024) /* readahead */
 
 #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
 #define CEPH_MSG_MAX_DATA_LEN  (16*1024*1024)
index a1af296..f88eacb 100644 (file)
@@ -32,6 +32,7 @@ struct ceph_osd {
        struct rb_node o_node;
        struct ceph_connection o_con;
        struct list_head o_requests;
+       struct list_head o_linger_requests;
        struct list_head o_osd_lru;
        struct ceph_authorizer *o_authorizer;
        void *o_authorizer_buf, *o_authorizer_reply_buf;
@@ -47,6 +48,8 @@ struct ceph_osd_request {
        struct rb_node  r_node;
        struct list_head r_req_lru_item;
        struct list_head r_osd_item;
+       struct list_head r_linger_item;
+       struct list_head r_linger_osd;
        struct ceph_osd *r_osd;
        struct ceph_pg   r_pgid;
        int              r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -59,6 +62,7 @@ struct ceph_osd_request {
        int               r_flags;     /* any additional flags for the osd */
        u32               r_sent;      /* >0 if r_request is sending/sent */
        int               r_got_reply;
+       int               r_linger;
 
        struct ceph_osd_client *r_osdc;
        struct kref       r_kref;
@@ -74,7 +78,6 @@ struct ceph_osd_request {
        char              r_oid[40];          /* object name */
        int               r_oid_len;
        unsigned long     r_stamp;            /* send OR check time */
-       bool              r_resend;           /* msg send failed, needs retry */
 
        struct ceph_file_layout r_file_layout;
        struct ceph_snap_context *r_snapc;    /* snap context for writes */
@@ -90,6 +93,26 @@ struct ceph_osd_request {
        struct ceph_pagelist *r_trail;        /* trailing part of the data */
 };
 
+struct ceph_osd_event {
+       u64 cookie;
+       int one_shot;
+       struct ceph_osd_client *osdc;
+       void (*cb)(u64, u64, u8, void *);
+       void *data;
+       struct rb_node node;
+       struct list_head osd_node;
+       struct kref kref;
+       struct completion completion;
+};
+
+struct ceph_osd_event_work {
+       struct work_struct work;
+       struct ceph_osd_event *event;
+        u64 ver;
+        u64 notify_id;
+        u8 opcode;
+};
+
 struct ceph_osd_client {
        struct ceph_client     *client;
 
@@ -104,7 +127,10 @@ struct ceph_osd_client {
        u64                    timeout_tid;   /* tid of timeout triggering rq */
        u64                    last_tid;      /* tid of last request */
        struct rb_root         requests;      /* pending requests */
-       struct list_head       req_lru;       /* pending requests lru */
+       struct list_head       req_lru;       /* in-flight lru */
+       struct list_head       req_unsent;    /* unsent/need-resend queue */
+       struct list_head       req_notarget;  /* map to no osd */
+       struct list_head       req_linger;    /* lingering requests */
        int                    num_requests;
        struct delayed_work    timeout_work;
        struct delayed_work    osds_timeout_work;
@@ -116,6 +142,12 @@ struct ceph_osd_client {
 
        struct ceph_msgpool     msgpool_op;
        struct ceph_msgpool     msgpool_op_reply;
+
+       spinlock_t              event_lock;
+       struct rb_root          event_tree;
+       u64                     event_count;
+
+       struct workqueue_struct *notify_wq;
 };
 
 struct ceph_osd_req_op {
@@ -150,6 +182,13 @@ struct ceph_osd_req_op {
                struct {
                        u64 snapid;
                } snap;
+               struct {
+                       u64 cookie;
+                       u64 ver;
+                       __u8 flag;
+                       u32 prot_ver;
+                       u32 timeout;
+               } watch;
        };
        u32 payload_len;
 };
@@ -198,6 +237,11 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
                                      bool use_mempool, int num_reply,
                                      int page_align);
 
+extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+                                        struct ceph_osd_request *req);
+extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+                                               struct ceph_osd_request *req);
+
 static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
 {
        kref_get(&req->r_kref);
@@ -233,5 +277,14 @@ extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
                                struct page **pages, int nr_pages,
                                int flags, int do_sync, bool nofail);
 
+/* watch/notify events */
+extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+                                 void (*event_cb)(u64, u64, u8, void *),
+                                 int one_shot, void *data,
+                                 struct ceph_osd_event **pevent);
+extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
+extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
+                               unsigned long timeout);
+extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif
 
index 6d5247f..0a99099 100644 (file)
@@ -12,9 +12,9 @@
  * osdmap encoding versions
  */
 #define CEPH_OSDMAP_INC_VERSION     5
-#define CEPH_OSDMAP_INC_VERSION_EXT 5
+#define CEPH_OSDMAP_INC_VERSION_EXT 6
 #define CEPH_OSDMAP_VERSION         5
-#define CEPH_OSDMAP_VERSION_EXT     5
+#define CEPH_OSDMAP_VERSION_EXT     6
 
 /*
  * fs id
@@ -181,9 +181,17 @@ enum {
        /* read */
        CEPH_OSD_OP_READ      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
        CEPH_OSD_OP_STAT      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,
+       CEPH_OSD_OP_MAPEXT    = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
 
        /* fancy read */
-       CEPH_OSD_OP_MASKTRUNC = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4,
+       CEPH_OSD_OP_MASKTRUNC   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4,
+       CEPH_OSD_OP_SPARSE_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 5,
+
+       CEPH_OSD_OP_NOTIFY    = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 6,
+       CEPH_OSD_OP_NOTIFY_ACK = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 7,
+
+       /* versioning */
+       CEPH_OSD_OP_ASSERT_VER = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 8,
 
        /* write */
        CEPH_OSD_OP_WRITE     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
@@ -205,6 +213,8 @@ enum {
        CEPH_OSD_OP_CREATE  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 13,
        CEPH_OSD_OP_ROLLBACK= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 14,
 
+       CEPH_OSD_OP_WATCH   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 15,
+
        /** attrs **/
        /* read */
        CEPH_OSD_OP_GETXATTR  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
@@ -218,11 +228,14 @@ enum {
        CEPH_OSD_OP_RMXATTR   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4,
 
        /** subop **/
-       CEPH_OSD_OP_PULL           = CEPH_OSD_OP_MODE_SUB | 1,
-       CEPH_OSD_OP_PUSH           = CEPH_OSD_OP_MODE_SUB | 2,
-       CEPH_OSD_OP_BALANCEREADS   = CEPH_OSD_OP_MODE_SUB | 3,
-       CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
-       CEPH_OSD_OP_SCRUB          = CEPH_OSD_OP_MODE_SUB | 5,
+       CEPH_OSD_OP_PULL            = CEPH_OSD_OP_MODE_SUB | 1,
+       CEPH_OSD_OP_PUSH            = CEPH_OSD_OP_MODE_SUB | 2,
+       CEPH_OSD_OP_BALANCEREADS    = CEPH_OSD_OP_MODE_SUB | 3,
+       CEPH_OSD_OP_UNBALANCEREADS  = CEPH_OSD_OP_MODE_SUB | 4,
+       CEPH_OSD_OP_SCRUB           = CEPH_OSD_OP_MODE_SUB | 5,
+       CEPH_OSD_OP_SCRUB_RESERVE   = CEPH_OSD_OP_MODE_SUB | 6,
+       CEPH_OSD_OP_SCRUB_UNRESERVE = CEPH_OSD_OP_MODE_SUB | 7,
+       CEPH_OSD_OP_SCRUB_STOP      = CEPH_OSD_OP_MODE_SUB | 8,
 
        /** lock **/
        CEPH_OSD_OP_WRLOCK    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
@@ -328,6 +341,8 @@ enum {
        CEPH_OSD_CMPXATTR_MODE_U64    = 2
 };
 
+#define RADOS_NOTIFY_VER       1
+
 /*
  * an individual object operation.  each may be accompanied by some data
  * payload
@@ -359,7 +374,12 @@ struct ceph_osd_op {
                struct {
                        __le64 snapid;
                } __attribute__ ((packed)) snap;
-       };
+               struct {
+                       __le64 cookie;
+                       __le64 ver;
+                       __u8 flag;      /* 0 = unwatch, 1 = watch */
+               } __attribute__ ((packed)) watch;
+};
        __le32 payload_len;
 } __attribute__ ((packed));
 
@@ -402,4 +422,5 @@ struct ceph_osd_reply_head {
 } __attribute__ ((packed));
 
 
+
 #endif
index eb2a666..1fc1ee1 100644 (file)
@@ -78,8 +78,10 @@ int ceph_unarmor(char *dst, const char *src, const char *end)
        while (src < end) {
                int a, b, c, d;
 
-               if (src < end && src[0] == '\n')
+               if (src[0] == '\n') {
                        src++;
+                       continue;
+               }
                if (src + 4 > end)
                        return -EINVAL;
                a = decode_bits(src[0]);
index f3e4a13..95f96ab 100644 (file)
@@ -62,6 +62,7 @@ const char *ceph_msg_type_name(int type)
        case CEPH_MSG_OSD_MAP: return "osd_map";
        case CEPH_MSG_OSD_OP: return "osd_op";
        case CEPH_MSG_OSD_OPREPLY: return "osd_opreply";
+       case CEPH_MSG_WATCH_NOTIFY: return "watch_notify";
        default: return "unknown";
        }
 }
index 3e20a12..02212ed 100644 (file)
 #define OSD_OPREPLY_FRONT_LEN  512
 
 static const struct ceph_connection_operations osd_con_ops;
-static int __kick_requests(struct ceph_osd_client *osdc,
-                         struct ceph_osd *kickosd);
 
-static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void send_queued(struct ceph_osd_client *osdc);
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void __register_request(struct ceph_osd_client *osdc,
+                              struct ceph_osd_request *req);
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+                                       struct ceph_osd_request *req);
+static int __send_request(struct ceph_osd_client *osdc,
+                         struct ceph_osd_request *req);
 
 static int op_needs_trail(int op)
 {
@@ -34,6 +39,7 @@ static int op_needs_trail(int op)
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
        case CEPH_OSD_OP_CALL:
+       case CEPH_OSD_OP_NOTIFY:
                return 1;
        default:
                return 0;
@@ -209,6 +215,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
+       INIT_LIST_HEAD(&req->r_linger_item);
+       INIT_LIST_HEAD(&req->r_linger_osd);
        req->r_flags = flags;
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -315,6 +323,24 @@ static void osd_req_encode_op(struct ceph_osd_request *req,
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
+       case CEPH_OSD_OP_NOTIFY:
+               {
+                       __le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
+                       __le32 timeout = cpu_to_le32(src->watch.timeout);
+
+                       BUG_ON(!req->r_trail);
+
+                       ceph_pagelist_append(req->r_trail,
+                                               &prot_ver, sizeof(prot_ver));
+                       ceph_pagelist_append(req->r_trail,
+                                               &timeout, sizeof(timeout));
+               }
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               dst->watch.cookie = cpu_to_le64(src->watch.cookie);
+               dst->watch.ver = cpu_to_le64(src->watch.ver);
+               dst->watch.flag = src->watch.flag;
+               break;
        default:
                pr_err("unrecognized osd opcode %d\n", dst->op);
                WARN_ON(1);
@@ -529,6 +555,45 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
        return NULL;
 }
 
+/*
+ * Resubmit requests pending on the given osd.
+ */
+static void __kick_osd_requests(struct ceph_osd_client *osdc,
+                               struct ceph_osd *osd)
+{
+       struct ceph_osd_request *req, *nreq;
+       int err;
+
+       dout("__kick_osd_requests osd%d\n", osd->o_osd);
+       err = __reset_osd(osdc, osd);
+       if (err == -EAGAIN)
+               return;
+
+       list_for_each_entry(req, &osd->o_requests, r_osd_item) {
+               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+               dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
+                    osd->o_osd);
+               if (!req->r_linger)
+                       req->r_flags |= CEPH_OSD_FLAG_RETRY;
+       }
+
+       list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
+                                r_linger_osd) {
+               __unregister_linger_request(osdc, req);
+               __register_request(osdc, req);
+               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+               dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
+                    osd->o_osd);
+       }
+}
+
+static void kick_osd_requests(struct ceph_osd_client *osdc,
+                             struct ceph_osd *kickosd)
+{
+       mutex_lock(&osdc->request_mutex);
+       __kick_osd_requests(osdc, kickosd);
+       mutex_unlock(&osdc->request_mutex);
+}
 
 /*
  * If the osd connection drops, we need to resubmit all requests.
@@ -543,7 +608,8 @@ static void osd_reset(struct ceph_connection *con)
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
-       kick_requests(osdc, osd);
+       kick_osd_requests(osdc, osd);
+       send_queued(osdc);
        up_read(&osdc->map_sem);
 }
 
@@ -561,6 +627,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        INIT_LIST_HEAD(&osd->o_requests);
+       INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
@@ -650,7 +717,8 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        int ret = 0;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
-       if (list_empty(&osd->o_requests)) {
+       if (list_empty(&osd->o_requests) &&
+           list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                          &osd->o_con.peer_addr,
@@ -723,10 +791,9 @@ static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
  * Register request, assign tid.  If this is the first request, set up
  * the timeout event.
  */
-static void register_request(struct ceph_osd_client *osdc,
-                            struct ceph_osd_request *req)
+static void __register_request(struct ceph_osd_client *osdc,
+                              struct ceph_osd_request *req)
 {
-       mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
        INIT_LIST_HEAD(&req->r_req_lru_item);
@@ -740,6 +807,13 @@ static void register_request(struct ceph_osd_client *osdc,
                dout(" first request, scheduling timeout\n");
                __schedule_osd_timeout(osdc);
        }
+}
+
+static void register_request(struct ceph_osd_client *osdc,
+                            struct ceph_osd_request *req)
+{
+       mutex_lock(&osdc->request_mutex);
+       __register_request(osdc, req);
        mutex_unlock(&osdc->request_mutex);
 }
 
@@ -758,9 +832,14 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 
                list_del_init(&req->r_osd_item);
-               if (list_empty(&req->r_osd->o_requests))
+               if (list_empty(&req->r_osd->o_requests) &&
+                   list_empty(&req->r_osd->o_linger_requests)) {
+                       dout("moving osd to %p lru\n", req->r_osd);
                        __move_osd_to_lru(osdc, req->r_osd);
-               req->r_osd = NULL;
+               }
+               if (list_empty(&req->r_osd_item) &&
+                   list_empty(&req->r_linger_item))
+                       req->r_osd = NULL;
        }
 
        ceph_osdc_put_request(req);
@@ -781,20 +860,72 @@ static void __cancel_request(struct ceph_osd_request *req)
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
                req->r_sent = 0;
        }
-       list_del_init(&req->r_req_lru_item);
 }
 
+static void __register_linger_request(struct ceph_osd_client *osdc,
+                                   struct ceph_osd_request *req)
+{
+       dout("__register_linger_request %p\n", req);
+       list_add_tail(&req->r_linger_item, &osdc->req_linger);
+       list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+}
+
+static void __unregister_linger_request(struct ceph_osd_client *osdc,
+                                       struct ceph_osd_request *req)
+{
+       dout("__unregister_linger_request %p\n", req);
+       if (req->r_osd) {
+               list_del_init(&req->r_linger_item);
+               list_del_init(&req->r_linger_osd);
+
+               if (list_empty(&req->r_osd->o_requests) &&
+                   list_empty(&req->r_osd->o_linger_requests)) {
+                       dout("moving osd to %p lru\n", req->r_osd);
+                       __move_osd_to_lru(osdc, req->r_osd);
+               }
+               req->r_osd = NULL;
+       }
+}
+
+void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+                                        struct ceph_osd_request *req)
+{
+       mutex_lock(&osdc->request_mutex);
+       if (req->r_linger) {
+               __unregister_linger_request(osdc, req);
+               ceph_osdc_put_request(req);
+       }
+       mutex_unlock(&osdc->request_mutex);
+}
+EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);
+
+void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+                                 struct ceph_osd_request *req)
+{
+       if (!req->r_linger) {
+               dout("set_request_linger %p\n", req);
+               req->r_linger = 1;
+               /*
+                * caller is now responsible for calling
+                * unregister_linger_request
+                */
+               ceph_osdc_get_request(req);
+       }
+}
+EXPORT_SYMBOL(ceph_osdc_set_request_linger);
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
- * no up osd, set r_osd to NULL.
+ * no up osd, set r_osd to NULL.  Move the request to the appropiate list
+ * (unsent, homeless) or leave on in-flight lru.
  *
  * Return 0 if unchanged, 1 if changed, or negative on error.
  *
  * Caller should hold map_sem for read and request_mutex.
  */
-static int __map_osds(struct ceph_osd_client *osdc,
-                     struct ceph_osd_request *req)
+static int __map_request(struct ceph_osd_client *osdc,
+                        struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
@@ -802,11 +933,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
        int o = -1, num = 0;
        int err;
 
-       dout("map_osds %p tid %lld\n", req, req->r_tid);
+       dout("map_request %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
                                      &req->r_file_layout, osdc->osdmap);
-       if (err)
+       if (err) {
+               list_move(&req->r_req_lru_item, &osdc->req_notarget);
                return err;
+       }
        pgid = reqhead->layout.ol_pgid;
        req->r_pgid = pgid;
 
@@ -823,7 +956,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */
 
-       dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
+       dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
             req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
             req->r_osd ? req->r_osd->o_osd : -1);
 
@@ -841,10 +974,12 @@ static int __map_osds(struct ceph_osd_client *osdc,
        if (!req->r_osd && o >= 0) {
                err = -ENOMEM;
                req->r_osd = create_osd(osdc);
-               if (!req->r_osd)
+               if (!req->r_osd) {
+                       list_move(&req->r_req_lru_item, &osdc->req_notarget);
                        goto out;
+               }
 
-               dout("map_osds osd %p is osd%d\n", req->r_osd, o);
+               dout("map_request osd %p is osd%d\n", req->r_osd, o);
                req->r_osd->o_osd = o;
                req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
                __insert_osd(osdc, req->r_osd);
@@ -855,6 +990,9 @@ static int __map_osds(struct ceph_osd_client *osdc,
        if (req->r_osd) {
                __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
+               list_move(&req->r_req_lru_item, &osdc->req_unsent);
+       } else {
+               list_move(&req->r_req_lru_item, &osdc->req_notarget);
        }
        err = 1;   /* osd or pg changed */
 
@@ -869,16 +1007,6 @@ static int __send_request(struct ceph_osd_client *osdc,
                          struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead;
-       int err;
-
-       err = __map_osds(osdc, req);
-       if (err < 0)
-               return err;
-       if (req->r_osd == NULL) {
-               dout("send_request %p no up osds in pg\n", req);
-               ceph_monc_request_next_osdmap(&osdc->client->monc);
-               return 0;
-       }
 
        dout("send_request %p tid %llu to osd%d flags %d\n",
             req, req->r_tid, req->r_osd->o_osd, req->r_flags);
@@ -897,6 +1025,21 @@ static int __send_request(struct ceph_osd_client *osdc,
        return 0;
 }
 
+/*
+ * Send any requests in the queue (req_unsent).
+ */
+static void send_queued(struct ceph_osd_client *osdc)
+{
+       struct ceph_osd_request *req, *tmp;
+
+       dout("send_queued\n");
+       mutex_lock(&osdc->request_mutex);
+       list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+               __send_request(osdc, req);
+       }
+       mutex_unlock(&osdc->request_mutex);
+}
+
 /*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
@@ -916,30 +1059,13 @@ static void handle_timeout(struct work_struct *work)
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
        unsigned long last_stamp = 0;
-       struct rb_node *p;
        struct list_head slow_osds;
-
        dout("timeout\n");
        down_read(&osdc->map_sem);
 
        ceph_monc_request_next_osdmap(&osdc->client->monc);
 
        mutex_lock(&osdc->request_mutex);
-       for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-               req = rb_entry(p, struct ceph_osd_request, r_node);
-
-               if (req->r_resend) {
-                       int err;
-
-                       dout("osdc resending prev failed %lld\n", req->r_tid);
-                       err = __send_request(osdc, req);
-                       if (err)
-                               dout("osdc failed again on %lld\n", req->r_tid);
-                       else
-                               req->r_resend = false;
-                       continue;
-               }
-       }
 
        /*
         * reset osds that appear to be _really_ unresponsive.  this
@@ -963,7 +1089,7 @@ static void handle_timeout(struct work_struct *work)
                BUG_ON(!osd);
                pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
                           req->r_tid, osd->o_osd);
-               __kick_requests(osdc, osd);
+               __kick_osd_requests(osdc, osd);
        }
 
        /*
@@ -991,7 +1117,7 @@ static void handle_timeout(struct work_struct *work)
 
        __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
-
+       send_queued(osdc);
        up_read(&osdc->map_sem);
 }
 
@@ -1035,7 +1161,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
            numops * sizeof(struct ceph_osd_op))
                goto bad;
        dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
-
        /* lookup */
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
@@ -1079,6 +1204,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
 
        dout("handle_reply tid %llu flags %d\n", tid, flags);
 
+       if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
+               __register_linger_request(osdc, req);
+
        /* either this is a read, or we got the safe response */
        if (result < 0 ||
            (flags & CEPH_OSD_FLAG_ONDISK) ||
@@ -1099,6 +1227,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        }
 
 done:
+       dout("req=%p req->r_linger=%d\n", req, req->r_linger);
        ceph_osdc_put_request(req);
        return;
 
@@ -1109,108 +1238,83 @@ bad:
        ceph_msg_dump(msg);
 }
 
-
-static int __kick_requests(struct ceph_osd_client *osdc,
-                         struct ceph_osd *kickosd)
+static void reset_changed_osds(struct ceph_osd_client *osdc)
 {
-       struct ceph_osd_request *req;
        struct rb_node *p, *n;
-       int needmap = 0;
-       int err;
 
-       dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-       if (kickosd) {
-               err = __reset_osd(osdc, kickosd);
-               if (err == -EAGAIN)
-                       return 1;
-       } else {
-               for (p = rb_first(&osdc->osds); p; p = n) {
-                       struct ceph_osd *osd =
-                               rb_entry(p, struct ceph_osd, o_node);
-
-                       n = rb_next(p);
-                       if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
-                           memcmp(&osd->o_con.peer_addr,
-                                  ceph_osd_addr(osdc->osdmap,
-                                                osd->o_osd),
-                                  sizeof(struct ceph_entity_addr)) != 0)
-                               __reset_osd(osdc, osd);
-               }
+       for (p = rb_first(&osdc->osds); p; p = n) {
+               struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
+
+               n = rb_next(p);
+               if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+                   memcmp(&osd->o_con.peer_addr,
+                          ceph_osd_addr(osdc->osdmap,
+                                        osd->o_osd),
+                          sizeof(struct ceph_entity_addr)) != 0)
+                       __reset_osd(osdc, osd);
        }
+}
+
+/*
+ * Requeue requests whose mapping to an OSD has changed.  If requests map to
+ * no osd, request a new map.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc)
+{
+       struct ceph_osd_request *req, *nreq;
+       struct rb_node *p;
+       int needmap = 0;
+       int err;
 
+       dout("kick_requests\n");
+       mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
-
-               if (req->r_resend) {
-                       dout(" r_resend set on tid %llu\n", req->r_tid);
-                       __cancel_request(req);
-                       goto kick;
-               }
-               if (req->r_osd && kickosd == req->r_osd) {
-                       __cancel_request(req);
-                       goto kick;
+               err = __map_request(osdc, req);
+               if (err < 0)
+                       continue;  /* error */
+               if (req->r_osd == NULL) {
+                       dout("%p tid %llu maps to no osd\n", req, req->r_tid);
+                       needmap++;  /* request a newer map */
+               } else if (err > 0) {
+                       dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
+                            req->r_osd ? req->r_osd->o_osd : -1);
+                       if (!req->r_linger)
+                               req->r_flags |= CEPH_OSD_FLAG_RETRY;
                }
+       }
+
+       list_for_each_entry_safe(req, nreq, &osdc->req_linger,
+                                r_linger_item) {
+               dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
-               err = __map_osds(osdc, req);
+               err = __map_request(osdc, req);
                if (err == 0)
-                       continue;  /* no change */
-               if (err < 0) {
-                       /*
-                        * FIXME: really, we should set the request
-                        * error and fail if this isn't a 'nofail'
-                        * request, but that's a fair bit more
-                        * complicated to do.  So retry!
-                        */
-                       dout(" setting r_resend on %llu\n", req->r_tid);
-                       req->r_resend = true;
-                       continue;
-               }
+                       continue;  /* no change and no osd was specified */
+               if (err < 0)
+                       continue;  /* hrm! */
                if (req->r_osd == NULL) {
                        dout("tid %llu maps to no valid osd\n", req->r_tid);
                        needmap++;  /* request a newer map */
                        continue;
                }
 
-kick:
-               dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
+               dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
                     req->r_osd ? req->r_osd->o_osd : -1);
-               req->r_flags |= CEPH_OSD_FLAG_RETRY;
-               err = __send_request(osdc, req);
-               if (err) {
-                       dout(" setting r_resend on %llu\n", req->r_tid);
-                       req->r_resend = true;
-               }
+               __unregister_linger_request(osdc, req);
+               __register_request(osdc, req);
        }
-
-       return needmap;
-}
-
-/*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
-                         struct ceph_osd *kickosd)
-{
-       int needmap;
-
-       mutex_lock(&osdc->request_mutex);
-       needmap = __kick_requests(osdc, kickosd);
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-
 }
+
+
 /*
  * Process updated osd map.
  *
@@ -1263,6 +1367,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                ceph_osdmap_destroy(osdc->osdmap);
                                osdc->osdmap = newmap;
                        }
+                       kick_requests(osdc);
+                       reset_changed_osds(osdc);
                } else {
                        dout("ignoring incremental map %u len %d\n",
                             epoch, maplen);
@@ -1300,6 +1406,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                        osdc->osdmap = newmap;
                        if (oldmap)
                                ceph_osdmap_destroy(oldmap);
+                       kick_requests(osdc);
                }
                p += maplen;
                nr_maps--;
@@ -1308,8 +1415,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 done:
        downgrade_write(&osdc->map_sem);
        ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
-       if (newmap)
-               kick_requests(osdc, NULL);
+       send_queued(osdc);
        up_read(&osdc->map_sem);
        wake_up_all(&osdc->client->auth_wq);
        return;
@@ -1321,6 +1427,223 @@ bad:
        return;
 }
 
+/*
+ * watch/notify callback event infrastructure
+ *
+ * These callbacks are used both for watch and notify operations.
+ */
+static void __release_event(struct kref *kref)
+{
+       struct ceph_osd_event *event =
+               container_of(kref, struct ceph_osd_event, kref);
+
+       dout("__release_event %p\n", event);
+       kfree(event);
+}
+
+static void get_event(struct ceph_osd_event *event)
+{
+       kref_get(&event->kref);
+}
+
+void ceph_osdc_put_event(struct ceph_osd_event *event)
+{
+       kref_put(&event->kref, __release_event);
+}
+EXPORT_SYMBOL(ceph_osdc_put_event);
+
+static void __insert_event(struct ceph_osd_client *osdc,
+                            struct ceph_osd_event *new)
+{
+       struct rb_node **p = &osdc->event_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_osd_event *event = NULL;
+
+       while (*p) {
+               parent = *p;
+               event = rb_entry(parent, struct ceph_osd_event, node);
+               if (new->cookie < event->cookie)
+                       p = &(*p)->rb_left;
+               else if (new->cookie > event->cookie)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&new->node, parent, p);
+       rb_insert_color(&new->node, &osdc->event_tree);
+}
+
+static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
+                                               u64 cookie)
+{
+       struct rb_node **p = &osdc->event_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_osd_event *event = NULL;
+
+       while (*p) {
+               parent = *p;
+               event = rb_entry(parent, struct ceph_osd_event, node);
+               if (cookie < event->cookie)
+                       p = &(*p)->rb_left;
+               else if (cookie > event->cookie)
+                       p = &(*p)->rb_right;
+               else
+                       return event;
+       }
+       return NULL;
+}
+
+static void __remove_event(struct ceph_osd_event *event)
+{
+       struct ceph_osd_client *osdc = event->osdc;
+
+       if (!RB_EMPTY_NODE(&event->node)) {
+               dout("__remove_event removed %p\n", event);
+               rb_erase(&event->node, &osdc->event_tree);
+               ceph_osdc_put_event(event);
+       } else {
+               dout("__remove_event didn't remove %p\n", event);
+       }
+}
+
+int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+                          void (*event_cb)(u64, u64, u8, void *),
+                          int one_shot, void *data,
+                          struct ceph_osd_event **pevent)
+{
+       struct ceph_osd_event *event;
+
+       event = kmalloc(sizeof(*event), GFP_NOIO);
+       if (!event)
+               return -ENOMEM;
+
+       dout("create_event %p\n", event);
+       event->cb = event_cb;
+       event->one_shot = one_shot;
+       event->data = data;
+       event->osdc = osdc;
+       INIT_LIST_HEAD(&event->osd_node);
+       kref_init(&event->kref);   /* one ref for us */
+       kref_get(&event->kref);    /* one ref for the caller */
+       init_completion(&event->completion);
+
+       spin_lock(&osdc->event_lock);
+       event->cookie = ++osdc->event_count;
+       __insert_event(osdc, event);
+       spin_unlock(&osdc->event_lock);
+
+       *pevent = event;
+       return 0;
+}
+EXPORT_SYMBOL(ceph_osdc_create_event);
+
+void ceph_osdc_cancel_event(struct ceph_osd_event *event)
+{
+       struct ceph_osd_client *osdc = event->osdc;
+
+       dout("cancel_event %p\n", event);
+       spin_lock(&osdc->event_lock);
+       __remove_event(event);
+       spin_unlock(&osdc->event_lock);
+       ceph_osdc_put_event(event); /* caller's */
+}
+EXPORT_SYMBOL(ceph_osdc_cancel_event);
+
+
+static void do_event_work(struct work_struct *work)
+{
+       struct ceph_osd_event_work *event_work =
+               container_of(work, struct ceph_osd_event_work, work);
+       struct ceph_osd_event *event = event_work->event;
+       u64 ver = event_work->ver;
+       u64 notify_id = event_work->notify_id;
+       u8 opcode = event_work->opcode;
+
+       dout("do_event_work completing %p\n", event);
+       event->cb(ver, notify_id, opcode, event->data);
+       complete(&event->completion);
+       dout("do_event_work completed %p\n", event);
+       ceph_osdc_put_event(event);
+       kfree(event_work);
+}
+
+
+/*
+ * Process osd watch notifications
+ */
+void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+       void *p, *end;
+       u8 proto_ver;
+       u64 cookie, ver, notify_id;
+       u8 opcode;
+       struct ceph_osd_event *event;
+       struct ceph_osd_event_work *event_work;
+
+       p = msg->front.iov_base;
+       end = p + msg->front.iov_len;
+
+       ceph_decode_8_safe(&p, end, proto_ver, bad);
+       ceph_decode_8_safe(&p, end, opcode, bad);
+       ceph_decode_64_safe(&p, end, cookie, bad);
+       ceph_decode_64_safe(&p, end, ver, bad);
+       ceph_decode_64_safe(&p, end, notify_id, bad);
+
+       spin_lock(&osdc->event_lock);
+       event = __find_event(osdc, cookie);
+       if (event) {
+               get_event(event);
+               if (event->one_shot)
+                       __remove_event(event);
+       }
+       spin_unlock(&osdc->event_lock);
+       dout("handle_watch_notify cookie %lld ver %lld event %p\n",
+            cookie, ver, event);
+       if (event) {
+               event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
+               INIT_WORK(&event_work->work, do_event_work);
+               if (!event_work) {
+                       dout("ERROR: could not allocate event_work\n");
+                       goto done_err;
+               }
+               event_work->event = event;
+               event_work->ver = ver;
+               event_work->notify_id = notify_id;
+               event_work->opcode = opcode;
+               if (!queue_work(osdc->notify_wq, &event_work->work)) {
+                       dout("WARNING: failed to queue notify event work\n");
+                       goto done_err;
+               }
+       }
+
+       return;
+
+done_err:
+       complete(&event->completion);
+       ceph_osdc_put_event(event);
+       return;
+
+bad:
+       pr_err("osdc handle_watch_notify corrupt msg\n");
+       return;
+}
+
+int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
+{
+       int err;
+
+       dout("wait_event %p\n", event);
+       err = wait_for_completion_interruptible_timeout(&event->completion,
+                                                       timeout * HZ);
+       ceph_osdc_put_event(event);
+       if (err > 0)
+               err = 0;
+       dout("wait_event %p returns %d\n", event, err);
+       return err;
+}
+EXPORT_SYMBOL(ceph_osdc_wait_event);
+
 /*
  * Register request, send initial attempt.
  */
@@ -1347,15 +1670,22 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
         * the request still han't been touched yet.
         */
        if (req->r_sent == 0) {
-               rc = __send_request(osdc, req);
-               if (rc) {
-                       if (nofail) {
-                               dout("osdc_start_request failed send, "
-                                    " marking %lld\n", req->r_tid);
-                               req->r_resend = true;
-                               rc = 0;
-                       } else {
-                               __unregister_request(osdc, req);
+               rc = __map_request(osdc, req);
+               if (rc < 0)
+                       return rc;
+               if (req->r_osd == NULL) {
+                       dout("send_request %p no up osds in pg\n", req);
+                       ceph_monc_request_next_osdmap(&osdc->client->monc);
+               } else {
+                       rc = __send_request(osdc, req);
+                       if (rc) {
+                               if (nofail) {
+                                       dout("osdc_start_request failed send, "
+                                            " will retry %lld\n", req->r_tid);
+                                       rc = 0;
+                               } else {
+                                       __unregister_request(osdc, req);
+                               }
                        }
                }
        }
@@ -1441,9 +1771,15 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
        INIT_LIST_HEAD(&osdc->req_lru);
+       INIT_LIST_HEAD(&osdc->req_unsent);
+       INIT_LIST_HEAD(&osdc->req_notarget);
+       INIT_LIST_HEAD(&osdc->req_linger);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+       spin_lock_init(&osdc->event_lock);
+       osdc->event_tree = RB_ROOT;
+       osdc->event_count = 0;
 
        schedule_delayed_work(&osdc->osds_timeout_work,
           round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
@@ -1463,6 +1799,13 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
                                "osd_op_reply");
        if (err < 0)
                goto out_msgpool;
+
+       osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
+       if (IS_ERR(osdc->notify_wq)) {
+               err = PTR_ERR(osdc->notify_wq);
+               osdc->notify_wq = NULL;
+               goto out_msgpool;
+       }
        return 0;
 
 out_msgpool:
@@ -1476,6 +1819,8 @@ EXPORT_SYMBOL(ceph_osdc_init);
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
+       flush_workqueue(osdc->notify_wq);
+       destroy_workqueue(osdc->notify_wq);
        cancel_delayed_work_sync(&osdc->timeout_work);
        cancel_delayed_work_sync(&osdc->osds_timeout_work);
        if (osdc->osdmap) {
@@ -1483,6 +1828,7 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
                osdc->osdmap = NULL;
        }
        remove_old_osds(osdc, 1);
+       WARN_ON(!RB_EMPTY_ROOT(&osdc->osds));
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1591,6 +1937,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        case CEPH_MSG_OSD_OPREPLY:
                handle_reply(osdc, msg, con);
                break;
+       case CEPH_MSG_WATCH_NOTIFY:
+               handle_watch_notify(osdc, msg);
+               break;
 
        default:
                pr_err("received unknown message type %d %s\n", type,
@@ -1684,6 +2033,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
 
        switch (type) {
        case CEPH_MSG_OSD_MAP:
+       case CEPH_MSG_WATCH_NOTIFY:
                return ceph_msg_new(type, front, GFP_NOFS);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);