Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Thu, 13 May 2010 01:47:29 +0000 (18:47 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Thu, 13 May 2010 01:47:29 +0000 (18:47 -0700)
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: preserve seq # on requeued messages after transient transport errors
  ceph: fix cap removal races
  ceph: zero unused message header, footer fields
  ceph: fix locking for waking session requests after reconnect
  ceph: resubmit requests on pg mapping change (not just primary change)
  ceph: fix open file counting on snapped inodes when mds returns no caps
  ceph: unregister osd request on failure
  ceph: don't use writeback_control in writepages completion
  ceph: unregister bdi before kill_anon_super releases device name

12 files changed:
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/messenger.c
fs/ceph/messenger.h
fs/ceph/osd_client.c
fs/ceph/osd_client.h
fs/ceph/osdmap.c
fs/ceph/osdmap.h
fs/ceph/rados.h
fs/ceph/super.c

index 4b42c2b..a9005d8 100644 (file)
@@ -504,7 +504,6 @@ static void writepages_finish(struct ceph_osd_request *req,
        int i;
        struct ceph_snap_context *snapc = req->r_snapc;
        struct address_space *mapping = inode->i_mapping;
-       struct writeback_control *wbc = req->r_wbc;
        __s32 rc = -EIO;
        u64 bytes = 0;
        struct ceph_client *client = ceph_inode_to_client(inode);
@@ -546,10 +545,6 @@ static void writepages_finish(struct ceph_osd_request *req,
                        clear_bdi_congested(&client->backing_dev_info,
                                            BLK_RW_ASYNC);
 
-               if (i >= wrote) {
-                       dout("inode %p skipping page %p\n", inode, page);
-                       wbc->pages_skipped++;
-               }
                ceph_put_snap_context((void *)page->private);
                page->private = 0;
                ClearPagePrivate(page);
@@ -799,7 +794,6 @@ get_more_pages:
                                alloc_page_vec(client, req);
                                req->r_callback = writepages_finish;
                                req->r_inode = inode;
-                               req->r_wbc = wbc;
                        }
 
                        /* note position of first page in pvec */
index 0c16818..d940053 100644 (file)
@@ -858,6 +858,8 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 }
 
 /*
+ * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
+ *
  * caller should hold i_lock.
  * caller will not hold session s_mutex if called from destroy_inode.
  */
@@ -866,15 +868,10 @@ void __ceph_remove_cap(struct ceph_cap *cap)
        struct ceph_mds_session *session = cap->session;
        struct ceph_inode_info *ci = cap->ci;
        struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+       int removed = 0;
 
        dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
-       /* remove from inode list */
-       rb_erase(&cap->ci_node, &ci->i_caps);
-       cap->ci = NULL;
-       if (ci->i_auth_cap == cap)
-               ci->i_auth_cap = NULL;
-
        /* remove from session list */
        spin_lock(&session->s_cap_lock);
        if (session->s_cap_iterator == cap) {
@@ -885,10 +882,18 @@ void __ceph_remove_cap(struct ceph_cap *cap)
                list_del_init(&cap->session_caps);
                session->s_nr_caps--;
                cap->session = NULL;
+               removed = 1;
        }
+       /* protect backpointer with s_cap_lock: see iterate_session_caps */
+       cap->ci = NULL;
        spin_unlock(&session->s_cap_lock);
 
-       if (cap->session == NULL)
+       /* remove from inode list */
+       rb_erase(&cap->ci_node, &ci->i_caps);
+       if (ci->i_auth_cap == cap)
+               ci->i_auth_cap = NULL;
+
+       if (removed)
                ceph_put_cap(cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
index 261f3e6..85b4d2f 100644 (file)
@@ -733,6 +733,10 @@ no_change:
                                __ceph_get_fmode(ci, cap_fmode);
                        spin_unlock(&inode->i_lock);
                }
+       } else if (cap_fmode >= 0) {
+               pr_warning("mds issued no caps on %llx.%llx\n",
+                          ceph_vinop(inode));
+               __ceph_get_fmode(ci, cap_fmode);
        }
 
        /* update delegation info? */
index 60a9a4a..24561a5 100644 (file)
@@ -736,9 +736,10 @@ static void cleanup_cap_releases(struct ceph_mds_session *session)
 }
 
 /*
- * Helper to safely iterate over all caps associated with a session.
+ * Helper to safely iterate over all caps associated with a session, with
+ * special care taken to handle a racing __ceph_remove_cap().
  *
- * caller must hold session s_mutex
+ * Caller must hold session s_mutex.
  */
 static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
@@ -2136,7 +2137,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
        struct ceph_mds_session *session = NULL;
        struct ceph_msg *reply;
        struct rb_node *p;
-       int err;
+       int err = -ENOMEM;
        struct ceph_pagelist *pagelist;
 
        pr_info("reconnect to recovering mds%d\n", mds);
@@ -2185,7 +2186,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
                goto fail;
        err = iterate_session_caps(session, encode_caps_cb, pagelist);
        if (err < 0)
-               goto out;
+               goto fail;
 
        /*
         * snaprealms.  we provide mds with the ino, seq (version), and
@@ -2213,28 +2214,31 @@ send:
        reply->nr_pages = calc_pages_for(0, pagelist->length);
        ceph_con_send(&session->s_con, reply);
 
-       if (session) {
-               session->s_state = CEPH_MDS_SESSION_OPEN;
-               __wake_requests(mdsc, &session->s_waiting);
-       }
+       session->s_state = CEPH_MDS_SESSION_OPEN;
+       mutex_unlock(&session->s_mutex);
+
+       mutex_lock(&mdsc->mutex);
+       __wake_requests(mdsc, &session->s_waiting);
+       mutex_unlock(&mdsc->mutex);
+
+       ceph_put_mds_session(session);
 
-out:
        up_read(&mdsc->snap_rwsem);
-       if (session) {
-               mutex_unlock(&session->s_mutex);
-               ceph_put_mds_session(session);
-       }
        mutex_lock(&mdsc->mutex);
        return;
 
 fail:
        ceph_msg_put(reply);
+       up_read(&mdsc->snap_rwsem);
+       mutex_unlock(&session->s_mutex);
+       ceph_put_mds_session(session);
 fail_nomsg:
        ceph_pagelist_release(pagelist);
        kfree(pagelist);
 fail_nopagelist:
-       pr_err("ENOMEM preparing reconnect for mds%d\n", mds);
-       goto out;
+       pr_err("error %d preparing reconnect for mds%d\n", err, mds);
+       mutex_lock(&mdsc->mutex);
+       return;
 }
 
 
index 509f57d..cd4fadb 100644 (file)
@@ -492,7 +492,14 @@ static void prepare_write_message(struct ceph_connection *con)
                list_move_tail(&m->list_head, &con->out_sent);
        }
 
-       m->hdr.seq = cpu_to_le64(++con->out_seq);
+       /*
+        * only assign outgoing seq # if we haven't sent this message
+        * yet.  if it is requeued, resend with it's original seq.
+        */
+       if (m->needs_out_seq) {
+               m->hdr.seq = cpu_to_le64(++con->out_seq);
+               m->needs_out_seq = false;
+       }
 
        dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
@@ -1986,6 +1993,8 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 
        BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
 
+       msg->needs_out_seq = true;
+
        /* queue */
        mutex_lock(&con->mutex);
        BUG_ON(!list_empty(&msg->list_head));
@@ -2085,15 +2094,19 @@ struct ceph_msg *ceph_msg_new(int type, int front_len,
        kref_init(&m->kref);
        INIT_LIST_HEAD(&m->list_head);
 
+       m->hdr.tid = 0;
        m->hdr.type = cpu_to_le16(type);
+       m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
+       m->hdr.version = 0;
        m->hdr.front_len = cpu_to_le32(front_len);
        m->hdr.middle_len = 0;
        m->hdr.data_len = cpu_to_le32(page_len);
        m->hdr.data_off = cpu_to_le16(page_off);
-       m->hdr.priority = cpu_to_le16(CEPH_MSG_PRIO_DEFAULT);
+       m->hdr.reserved = 0;
        m->footer.front_crc = 0;
        m->footer.middle_crc = 0;
        m->footer.data_crc = 0;
+       m->footer.flags = 0;
        m->front_max = front_len;
        m->front_is_vmalloc = false;
        m->more_to_follow = false;
index a343dae..a5caf91 100644 (file)
@@ -86,6 +86,7 @@ struct ceph_msg {
        struct kref kref;
        bool front_is_vmalloc;
        bool more_to_follow;
+       bool needs_out_seq;
        int front_max;
 
        struct ceph_msgpool *pool;
index c7b4ded..3514f71 100644 (file)
@@ -565,7 +565,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
-       int o = -1;
+       int acting[CEPH_PG_MAX_SIZE];
+       int o = -1, num = 0;
        int err;
 
        dout("map_osds %p tid %lld\n", req, req->r_tid);
@@ -576,10 +577,16 @@ static int __map_osds(struct ceph_osd_client *osdc,
        pgid = reqhead->layout.ol_pgid;
        req->r_pgid = pgid;
 
-       o = ceph_calc_pg_primary(osdc->osdmap, pgid);
+       err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
+       if (err > 0) {
+               o = acting[0];
+               num = err;
+       }
 
        if ((req->r_osd && req->r_osd->o_osd == o &&
-            req->r_sent >= req->r_osd->o_incarnation) ||
+            req->r_sent >= req->r_osd->o_incarnation &&
+            req->r_num_pg_osds == num &&
+            memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */
 
@@ -587,6 +594,10 @@ static int __map_osds(struct ceph_osd_client *osdc,
             req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
             req->r_osd ? req->r_osd->o_osd : -1);
 
+       /* record full pg acting set */
+       memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
+       req->r_num_pg_osds = num;
+
        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
@@ -612,7 +623,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
                __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
        }
-       err = 1;   /* osd changed */
+       err = 1;   /* osd or pg changed */
 
 out:
        return err;
@@ -779,16 +790,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        struct ceph_osd_request *req;
        u64 tid;
        int numops, object_len, flags;
+       s32 result;
 
        tid = le64_to_cpu(msg->hdr.tid);
        if (msg->front.iov_len < sizeof(*rhead))
                goto bad;
        numops = le32_to_cpu(rhead->num_ops);
        object_len = le32_to_cpu(rhead->object_len);
+       result = le32_to_cpu(rhead->result);
        if (msg->front.iov_len != sizeof(*rhead) + object_len +
            numops * sizeof(struct ceph_osd_op))
                goto bad;
-       dout("handle_reply %p tid %llu\n", msg, tid);
+       dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
 
        /* lookup */
        mutex_lock(&osdc->request_mutex);
@@ -834,7 +847,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        dout("handle_reply tid %llu flags %d\n", tid, flags);
 
        /* either this is a read, or we got the safe response */
-       if ((flags & CEPH_OSD_FLAG_ONDISK) ||
+       if (result < 0 ||
+           (flags & CEPH_OSD_FLAG_ONDISK) ||
            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                __unregister_request(osdc, req);
 
index b075991..ce77698 100644 (file)
@@ -48,6 +48,8 @@ struct ceph_osd_request {
        struct list_head r_osd_item;
        struct ceph_osd *r_osd;
        struct ceph_pg   r_pgid;
+       int              r_pg_osds[CEPH_PG_MAX_SIZE];
+       int              r_num_pg_osds;
 
        struct ceph_connection *r_con_filling_msg;
 
@@ -66,7 +68,6 @@ struct ceph_osd_request {
        struct list_head  r_unsafe_item;
 
        struct inode *r_inode;                /* for use by callbacks */
-       struct writeback_control *r_wbc;      /* ditto */
 
        char              r_oid[40];          /* object name */
        int               r_oid_len;
index 2e2c15e..cfdd8f4 100644 (file)
@@ -1040,13 +1040,34 @@ static int *calc_pg_raw(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
        return osds;
 }
 
+/*
+ * Return acting set for given pgid.
+ */
+int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
+                       int *acting)
+{
+       int rawosds[CEPH_PG_MAX_SIZE], *osds;
+       int i, o, num = CEPH_PG_MAX_SIZE;
+
+       osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
+       if (!osds)
+               return -1;
+
+       /* primary is first up osd */
+       o = 0;
+       for (i = 0; i < num; i++)
+               if (ceph_osd_is_up(osdmap, osds[i]))
+                       acting[o++] = osds[i];
+       return o;
+}
+
 /*
  * Return primary osd for given pgid, or -1 if none.
  */
 int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
 {
-       int rawosds[10], *osds;
-       int i, num = ARRAY_SIZE(rawosds);
+       int rawosds[CEPH_PG_MAX_SIZE], *osds;
+       int i, num = CEPH_PG_MAX_SIZE;
 
        osds = calc_pg_raw(osdmap, pgid, rawosds, &num);
        if (!osds)
@@ -1054,9 +1075,7 @@ int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
 
        /* primary is first up osd */
        for (i = 0; i < num; i++)
-               if (ceph_osd_is_up(osdmap, osds[i])) {
+               if (ceph_osd_is_up(osdmap, osds[i]))
                        return osds[i];
-                       break;
-               }
        return -1;
 }
index 8bc9f1e..970b547 100644 (file)
@@ -120,6 +120,8 @@ extern int ceph_calc_object_layout(struct ceph_object_layout *ol,
                                   const char *oid,
                                   struct ceph_file_layout *fl,
                                   struct ceph_osdmap *osdmap);
+extern int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
+                              int *acting);
 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
                                struct ceph_pg pgid);
 
index a1fc1d0..fd56451 100644 (file)
@@ -58,6 +58,7 @@ struct ceph_timespec {
 #define CEPH_PG_LAYOUT_LINEAR 2
 #define CEPH_PG_LAYOUT_HYBRID 3
 
+#define CEPH_PG_MAX_SIZE      16  /* max # osds in a single pg */
 
 /*
  * placement group.
index f888cf4..110857b 100644 (file)
@@ -47,10 +47,20 @@ const char *ceph_file_part(const char *s, int len)
  */
 static void ceph_put_super(struct super_block *s)
 {
-       struct ceph_client *cl = ceph_client(s);
+       struct ceph_client *client = ceph_sb_to_client(s);
 
        dout("put_super\n");
-       ceph_mdsc_close_sessions(&cl->mdsc);
+       ceph_mdsc_close_sessions(&client->mdsc);
+
+       /*
+        * ensure we release the bdi before put_anon_super releases
+        * the device name.
+        */
+       if (s->s_bdi == &client->backing_dev_info) {
+               bdi_unregister(&client->backing_dev_info);
+               s->s_bdi = NULL;
+       }
+
        return;
 }
 
@@ -636,6 +646,8 @@ static void ceph_destroy_client(struct ceph_client *client)
        destroy_workqueue(client->pg_inv_wq);
        destroy_workqueue(client->trunc_wq);
 
+       bdi_destroy(&client->backing_dev_info);
+
        if (client->msgr)
                ceph_messenger_destroy(client->msgr);
        mempool_destroy(client->wb_pagevec_pool);
@@ -876,14 +888,14 @@ static int ceph_register_bdi(struct super_block *sb, struct ceph_client *client)
 {
        int err;
 
-       sb->s_bdi = &client->backing_dev_info;
-
        /* set ra_pages based on rsize mount option? */
        if (client->mount_args->rsize >= PAGE_CACHE_SIZE)
                client->backing_dev_info.ra_pages =
                        (client->mount_args->rsize + PAGE_CACHE_SIZE - 1)
                        >> PAGE_SHIFT;
        err = bdi_register_dev(&client->backing_dev_info, sb->s_dev);
+       if (!err)
+               sb->s_bdi = &client->backing_dev_info;
        return err;
 }
 
@@ -957,9 +969,6 @@ static void ceph_kill_sb(struct super_block *s)
        dout("kill_sb %p\n", s);
        ceph_mdsc_pre_umount(&client->mdsc);
        kill_anon_super(s);    /* will call put_super after sb is r/o */
-       if (s->s_bdi == &client->backing_dev_info)
-               bdi_unregister(&client->backing_dev_info);
-       bdi_destroy(&client->backing_dev_info);
        ceph_destroy_client(client);
 }