Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Sat, 28 Aug 2010 21:07:20 +0000 (14:07 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Sat, 28 Aug 2010 21:07:20 +0000 (14:07 -0700)
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: fix get_ticket_handler() error handling
  ceph: don't BUG on ENOMEM during mds reconnect
  ceph: ceph_mdsc_build_path() returns an ERR_PTR
  ceph: Fix warnings
  ceph: ceph_get_inode() returns an ERR_PTR
  ceph: initialize fields on new dentry_infos
  ceph: maintain i_head_snapc when any caps are dirty, not just for data
  ceph: fix osd request lru adjustment when sending request
  ceph: don't improperly set dir complete when holding EXCL cap
  mm: exporting account_page_dirty
  ceph: direct requests in snapped namespace based on nonsnap parent
  ceph: queue cap snap writeback for realm children on snap update
  ceph: include dirty xattrs state in snapped caps
  ceph: fix xattr cap writeback
  ceph: fix multiple mds session shutdown

14 files changed:
fs/ceph/addr.c
fs/ceph/auth_x.c
fs/ceph/caps.c
fs/ceph/debugfs.c
fs/ceph/dir.c
fs/ceph/inode.c
fs/ceph/locks.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/osd_client.c
fs/ceph/snap.c
fs/ceph/super.h
fs/ceph/xattr.c
mm/page-writeback.c

index 5598a0d..4cfce1e 100644 (file)
@@ -87,7 +87,7 @@ static int ceph_set_page_dirty(struct page *page)
 
        /* dirty the head */
        spin_lock(&inode->i_lock);
-       if (ci->i_wrbuffer_ref_head == 0)
+       if (ci->i_head_snapc == NULL)
                ci->i_head_snapc = ceph_get_snap_context(snapc);
        ++ci->i_wrbuffer_ref_head;
        if (ci->i_wrbuffer_ref == 0)
@@ -105,13 +105,7 @@ static int ceph_set_page_dirty(struct page *page)
        spin_lock_irq(&mapping->tree_lock);
        if (page->mapping) {    /* Race with truncate? */
                WARN_ON_ONCE(!PageUptodate(page));
-
-               if (mapping_cap_account_dirty(mapping)) {
-                       __inc_zone_page_state(page, NR_FILE_DIRTY);
-                       __inc_bdi_stat(mapping->backing_dev_info,
-                                       BDI_RECLAIMABLE);
-                       task_io_account_write(PAGE_CACHE_SIZE);
-               }
+               account_page_dirtied(page, page->mapping);
                radix_tree_tag_set(&mapping->page_tree,
                                page_index(page), PAGECACHE_TAG_DIRTY);
 
@@ -352,7 +346,7 @@ static struct ceph_snap_context *get_oldest_context(struct inode *inode,
                        break;
                }
        }
-       if (!snapc && ci->i_head_snapc) {
+       if (!snapc && ci->i_wrbuffer_ref_head) {
                snapc = ceph_get_snap_context(ci->i_head_snapc);
                dout(" head snapc %p has %d dirty pages\n",
                     snapc, ci->i_wrbuffer_ref_head);
index 582e0b2..a2d002c 100644 (file)
@@ -376,7 +376,7 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
 
                th = get_ticket_handler(ac, service);
 
-               if (!th) {
+               if (IS_ERR(th)) {
                        *pneed |= service;
                        continue;
                }
@@ -399,6 +399,9 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
        struct ceph_x_ticket_handler *th =
                get_ticket_handler(ac, CEPH_ENTITY_TYPE_AUTH);
 
+       if (IS_ERR(th))
+               return PTR_ERR(th);
+
        ceph_x_validate_tickets(ac, &need);
 
        dout("build_request want %x have %x need %x\n",
@@ -450,7 +453,6 @@ static int ceph_x_build_request(struct ceph_auth_client *ac,
                        return -ERANGE;
                head->op = cpu_to_le16(CEPHX_GET_PRINCIPAL_SESSION_KEY);
 
-               BUG_ON(!th);
                ret = ceph_x_build_authorizer(ac, th, &xi->auth_authorizer);
                if (ret)
                        return ret;
@@ -505,7 +507,8 @@ static int ceph_x_handle_reply(struct ceph_auth_client *ac, int result,
 
        case CEPHX_GET_PRINCIPAL_SESSION_KEY:
                th = get_ticket_handler(ac, CEPH_ENTITY_TYPE_AUTH);
-               BUG_ON(!th);
+               if (IS_ERR(th))
+                       return PTR_ERR(th);
                ret = ceph_x_proc_ticket_reply(ac, &th->session_key,
                                               buf + sizeof(*head), end);
                break;
@@ -563,8 +566,8 @@ static int ceph_x_verify_authorizer_reply(struct ceph_auth_client *ac,
        void *end = p + sizeof(au->reply_buf);
 
        th = get_ticket_handler(ac, au->service);
-       if (!th)
-               return -EIO;  /* hrm! */
+       if (IS_ERR(th))
+               return PTR_ERR(th);
        ret = ceph_x_decrypt(&th->session_key, &p, end, &reply, sizeof(reply));
        if (ret < 0)
                return ret;
@@ -626,7 +629,7 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
        struct ceph_x_ticket_handler *th;
 
        th = get_ticket_handler(ac, peer_type);
-       if (th && !IS_ERR(th))
+       if (!IS_ERR(th))
                remove_ticket_handler(ac, th);
 }
 
index 7bf182b..a2069b6 100644 (file)
@@ -1082,6 +1082,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        gid_t gid;
        struct ceph_mds_session *session;
        u64 xattr_version = 0;
+       struct ceph_buffer *xattr_blob = NULL;
        int delayed = 0;
        u64 flush_tid = 0;
        int i;
@@ -1142,6 +1143,10 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
                for (i = 0; i < CEPH_CAP_BITS; i++)
                        if (flushing & (1 << i))
                                ci->i_cap_flush_tid[i] = flush_tid;
+
+               follows = ci->i_head_snapc->seq;
+       } else {
+               follows = 0;
        }
 
        keep = cap->implemented;
@@ -1155,14 +1160,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        mtime = inode->i_mtime;
        atime = inode->i_atime;
        time_warp_seq = ci->i_time_warp_seq;
-       follows = ci->i_snap_realm->cached_context->seq;
        uid = inode->i_uid;
        gid = inode->i_gid;
        mode = inode->i_mode;
 
-       if (dropping & CEPH_CAP_XATTR_EXCL) {
+       if (flushing & CEPH_CAP_XATTR_EXCL) {
                __ceph_build_xattrs_blob(ci);
-               xattr_version = ci->i_xattrs.version + 1;
+               xattr_blob = ci->i_xattrs.blob;
+               xattr_version = ci->i_xattrs.version;
        }
 
        spin_unlock(&inode->i_lock);
@@ -1170,9 +1175,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
                op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
                size, max_size, &mtime, &atime, time_warp_seq,
-               uid, gid, mode,
-               xattr_version,
-               (flushing & CEPH_CAP_XATTR_EXCL) ? ci->i_xattrs.blob : NULL,
+               uid, gid, mode, xattr_version, xattr_blob,
                follows);
        if (ret < 0) {
                dout("error sending cap msg, must requeue %p\n", inode);
@@ -1282,7 +1285,7 @@ retry:
                             &capsnap->mtime, &capsnap->atime,
                             capsnap->time_warp_seq,
                             capsnap->uid, capsnap->gid, capsnap->mode,
-                            0, NULL,
+                            capsnap->xattr_version, capsnap->xattr_blob,
                             capsnap->follows);
 
                next_follows = capsnap->follows + 1;
@@ -1332,7 +1335,11 @@ void __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
             ceph_cap_string(was | mask));
        ci->i_dirty_caps |= mask;
        if (was == 0) {
-               dout(" inode %p now dirty\n", &ci->vfs_inode);
+               if (!ci->i_head_snapc)
+                       ci->i_head_snapc = ceph_get_snap_context(
+                               ci->i_snap_realm->cached_context);
+               dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
+                       ci->i_head_snapc);
                BUG_ON(!list_empty(&ci->i_dirty_item));
                spin_lock(&mdsc->cap_dirty_lock);
                list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
@@ -2190,7 +2197,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 
        if (ci->i_head_snapc == snapc) {
                ci->i_wrbuffer_ref_head -= nr;
-               if (!ci->i_wrbuffer_ref_head) {
+               if (ci->i_wrbuffer_ref_head == 0 &&
+                   ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
+                       BUG_ON(!ci->i_head_snapc);
                        ceph_put_snap_context(ci->i_head_snapc);
                        ci->i_head_snapc = NULL;
                }
@@ -2483,6 +2492,11 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                        dout(" inode %p now clean\n", inode);
                        BUG_ON(!list_empty(&ci->i_dirty_item));
                        drop = 1;
+                       if (ci->i_wrbuffer_ref_head == 0) {
+                               BUG_ON(!ci->i_head_snapc);
+                               ceph_put_snap_context(ci->i_head_snapc);
+                               ci->i_head_snapc = NULL;
+                       }
                } else {
                        BUG_ON(list_empty(&ci->i_dirty_item));
                }
index 360c4f2..6fd8b20 100644 (file)
@@ -171,6 +171,8 @@ static int mdsc_show(struct seq_file *s, void *p)
                } else if (req->r_dentry) {
                        path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
                                                    &pathbase, 0);
+                       if (IS_ERR(path))
+                               path = NULL;
                        spin_lock(&req->r_dentry->d_lock);
                        seq_printf(s, " #%llx/%.*s (%s)",
                                   ceph_ino(req->r_dentry->d_parent->d_inode),
@@ -187,6 +189,8 @@ static int mdsc_show(struct seq_file *s, void *p)
                if (req->r_old_dentry) {
                        path = ceph_mdsc_build_path(req->r_old_dentry, &pathlen,
                                                    &pathbase, 0);
+                       if (IS_ERR(path))
+                               path = NULL;
                        spin_lock(&req->r_old_dentry->d_lock);
                        seq_printf(s, " #%llx/%.*s (%s)",
                           ceph_ino(req->r_old_dentry->d_parent->d_inode),
index 67bbb41..6e4f43f 100644 (file)
@@ -46,7 +46,7 @@ int ceph_init_dentry(struct dentry *dentry)
        else
                dentry->d_op = &ceph_snap_dentry_ops;
 
-       di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS);
+       di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO);
        if (!di)
                return -ENOMEM;          /* oh well */
 
index 5d893d3..e7cca41 100644 (file)
@@ -677,6 +677,7 @@ static int fill_inode(struct inode *inode,
                if (ci->i_files == 0 && ci->i_subdirs == 0 &&
                    ceph_snap(inode) == CEPH_NOSNAP &&
                    (le32_to_cpu(info->cap.caps) & CEPH_CAP_FILE_SHARED) &&
+                   (issued & CEPH_CAP_FILE_EXCL) == 0 &&
                    (ci->i_ceph_flags & CEPH_I_COMPLETE) == 0) {
                        dout(" marking %p complete (empty)\n", inode);
                        ci->i_ceph_flags |= CEPH_I_COMPLETE;
@@ -1229,11 +1230,11 @@ retry_lookup:
                        in = dn->d_inode;
                } else {
                        in = ceph_get_inode(parent->d_sb, vino);
-                       if (in == NULL) {
+                       if (IS_ERR(in)) {
                                dout("new_inode badness\n");
                                d_delete(dn);
                                dput(dn);
-                               err = -ENOMEM;
+                               err = PTR_ERR(in);
                                goto out;
                        }
                        dn = splice_dentry(dn, in, NULL);
index ae85af0..ff4e753 100644 (file)
@@ -82,7 +82,8 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
                length = fl->fl_end - fl->fl_start + 1;
 
        err = ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
-                               (u64)fl->fl_pid, (u64)fl->fl_nspid,
+                               (u64)fl->fl_pid,
+                               (u64)(unsigned long)fl->fl_nspid,
                                lock_cmd, fl->fl_start,
                                length, wait);
        if (!err) {
@@ -92,7 +93,8 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
                        /* undo! This should only happen if the kernel detects
                         * local deadlock. */
                        ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
-                                         (u64)fl->fl_pid, (u64)fl->fl_nspid,
+                                         (u64)fl->fl_pid,
+                                         (u64)(unsigned long)fl->fl_nspid,
                                          CEPH_LOCK_UNLOCK, fl->fl_start,
                                          length, 0);
                        dout("got %d on posix_lock_file, undid lock", err);
@@ -132,7 +134,8 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
                length = fl->fl_end - fl->fl_start + 1;
 
        err = ceph_lock_message(CEPH_LOCK_FLOCK, CEPH_MDS_OP_SETFILELOCK,
-                               file, (u64)fl->fl_pid, (u64)fl->fl_nspid,
+                               file, (u64)fl->fl_pid,
+                               (u64)(unsigned long)fl->fl_nspid,
                                lock_cmd, fl->fl_start,
                                length, wait);
        if (!err) {
@@ -141,7 +144,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
                        ceph_lock_message(CEPH_LOCK_FLOCK,
                                          CEPH_MDS_OP_SETFILELOCK,
                                          file, (u64)fl->fl_pid,
-                                         (u64)fl->fl_nspid,
+                                         (u64)(unsigned long)fl->fl_nspid,
                                          CEPH_LOCK_UNLOCK, fl->fl_start,
                                          length, 0);
                        dout("got %d on flock_lock_file_wait, undid lock", err);
@@ -235,7 +238,8 @@ int lock_to_ceph_filelock(struct file_lock *lock,
        cephlock->length = cpu_to_le64(lock->fl_end - lock->fl_start + 1);
        cephlock->client = cpu_to_le64(0);
        cephlock->pid = cpu_to_le64(lock->fl_pid);
-       cephlock->pid_namespace = cpu_to_le64((u64)lock->fl_nspid);
+       cephlock->pid_namespace =
+               cpu_to_le64((u64)(unsigned long)lock->fl_nspid);
 
        switch (lock->fl_type) {
        case F_RDLCK:
index a75ddbf..f091b13 100644 (file)
@@ -560,6 +560,13 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
  *
  * Called under mdsc->mutex.
  */
+struct dentry *get_nonsnap_parent(struct dentry *dentry)
+{
+       while (!IS_ROOT(dentry) && ceph_snap(dentry->d_inode) != CEPH_NOSNAP)
+               dentry = dentry->d_parent;
+       return dentry;
+}
+
 static int __choose_mds(struct ceph_mds_client *mdsc,
                        struct ceph_mds_request *req)
 {
@@ -590,14 +597,29 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
        if (req->r_inode) {
                inode = req->r_inode;
        } else if (req->r_dentry) {
-               if (req->r_dentry->d_inode) {
+               struct inode *dir = req->r_dentry->d_parent->d_inode;
+
+               if (dir->i_sb != mdsc->client->sb) {
+                       /* not this fs! */
+                       inode = req->r_dentry->d_inode;
+               } else if (ceph_snap(dir) != CEPH_NOSNAP) {
+                       /* direct snapped/virtual snapdir requests
+                        * based on parent dir inode */
+                       struct dentry *dn =
+                               get_nonsnap_parent(req->r_dentry->d_parent);
+                       inode = dn->d_inode;
+                       dout("__choose_mds using nonsnap parent %p\n", inode);
+               } else if (req->r_dentry->d_inode) {
+                       /* dentry target */
                        inode = req->r_dentry->d_inode;
                } else {
-                       inode = req->r_dentry->d_parent->d_inode;
+                       /* dir + name */
+                       inode = dir;
                        hash = req->r_dentry->d_name.hash;
                        is_hash = true;
                }
        }
+
        dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash,
             (int)hash, mode);
        if (!inode)
@@ -2208,7 +2230,7 @@ static void handle_session(struct ceph_mds_session *session,
                        pr_info("mds%d reconnect denied\n", session->s_mds);
                remove_session_caps(session);
                wake = 1; /* for good measure */
-               complete_all(&mdsc->session_close_waiters);
+               wake_up_all(&mdsc->session_close_wq);
                kick_requests(mdsc, mds);
                break;
 
@@ -2302,7 +2324,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
                if (IS_ERR(path)) {
                        err = PTR_ERR(path);
-                       BUG_ON(err);
+                       goto out_dput;
                }
        } else {
                path = NULL;
@@ -2310,7 +2332,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        }
        err = ceph_pagelist_encode_string(pagelist, path, pathlen);
        if (err)
-               goto out;
+               goto out_free;
 
        spin_lock(&inode->i_lock);
        cap->seq = 0;        /* reset cap seq */
@@ -2354,8 +2376,9 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
                unlock_kernel();
        }
 
-out:
+out_free:
        kfree(path);
+out_dput:
        dput(dentry);
        return err;
 }
@@ -2876,7 +2899,7 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
                return -ENOMEM;
 
        init_completion(&mdsc->safe_umount_waiters);
-       init_completion(&mdsc->session_close_waiters);
+       init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
        mdsc->sessions = NULL;
        mdsc->max_sessions = 0;
@@ -3021,6 +3044,23 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
        wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
 }
 
+/*
+ * true if all sessions are closed, or we force unmount
+ */
+bool done_closing_sessions(struct ceph_mds_client *mdsc)
+{
+       int i, n = 0;
+
+       if (mdsc->client->mount_state == CEPH_MOUNT_SHUTDOWN)
+               return true;
+
+       mutex_lock(&mdsc->mutex);
+       for (i = 0; i < mdsc->max_sessions; i++)
+               if (mdsc->sessions[i])
+                       n++;
+       mutex_unlock(&mdsc->mutex);
+       return n == 0;
+}
 
 /*
  * called after sb is ro.
@@ -3029,45 +3069,32 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
 {
        struct ceph_mds_session *session;
        int i;
-       int n;
        struct ceph_client *client = mdsc->client;
-       unsigned long started, timeout = client->mount_args->mount_timeout * HZ;
+       unsigned long timeout = client->mount_args->mount_timeout * HZ;
 
        dout("close_sessions\n");
 
-       mutex_lock(&mdsc->mutex);
-
        /* close sessions */
-       started = jiffies;
-       while (time_before(jiffies, started + timeout)) {
-               dout("closing sessions\n");
-               n = 0;
-               for (i = 0; i < mdsc->max_sessions; i++) {
-                       session = __ceph_lookup_mds_session(mdsc, i);
-                       if (!session)
-                               continue;
-                       mutex_unlock(&mdsc->mutex);
-                       mutex_lock(&session->s_mutex);
-                       __close_session(mdsc, session);
-                       mutex_unlock(&session->s_mutex);
-                       ceph_put_mds_session(session);
-                       mutex_lock(&mdsc->mutex);
-                       n++;
-               }
-               if (n == 0)
-                       break;
-
-               if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
-                       break;
-
-               dout("waiting for sessions to close\n");
+       mutex_lock(&mdsc->mutex);
+       for (i = 0; i < mdsc->max_sessions; i++) {
+               session = __ceph_lookup_mds_session(mdsc, i);
+               if (!session)
+                       continue;
                mutex_unlock(&mdsc->mutex);
-               wait_for_completion_timeout(&mdsc->session_close_waiters,
-                                           timeout);
+               mutex_lock(&session->s_mutex);
+               __close_session(mdsc, session);
+               mutex_unlock(&session->s_mutex);
+               ceph_put_mds_session(session);
                mutex_lock(&mdsc->mutex);
        }
+       mutex_unlock(&mdsc->mutex);
+
+       dout("waiting for sessions to close\n");
+       wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc),
+                          timeout);
 
        /* tear down remaining sessions */
+       mutex_lock(&mdsc->mutex);
        for (i = 0; i < mdsc->max_sessions; i++) {
                if (mdsc->sessions[i]) {
                        session = get_session(mdsc->sessions[i]);
@@ -3080,9 +3107,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
                        mutex_lock(&mdsc->mutex);
                }
        }
-
        WARN_ON(!list_empty(&mdsc->cap_delay_list));
-
        mutex_unlock(&mdsc->mutex);
 
        ceph_cleanup_empty_realms(mdsc);
index ab7e89f..c98267c 100644 (file)
@@ -234,7 +234,8 @@ struct ceph_mds_client {
        struct mutex            mutex;         /* all nested structures */
 
        struct ceph_mdsmap      *mdsmap;
-       struct completion       safe_umount_waiters, session_close_waiters;
+       struct completion       safe_umount_waiters;
+       wait_queue_head_t       session_close_wq;
        struct list_head        waiting_for_map;
 
        struct ceph_mds_session **sessions;    /* NULL for mds if no session */
index bed6391..dfced1d 100644 (file)
@@ -661,7 +661,7 @@ static int __send_request(struct ceph_osd_client *osdc,
        reqhead->reassert_version = req->r_reassert_version;
 
        req->r_stamp = jiffies;
-       list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
+       list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
index c0b26b6..4868b9d 100644 (file)
@@ -435,7 +435,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 {
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap_snap *capsnap;
-       int used;
+       int used, dirty;
 
        capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
        if (!capsnap) {
@@ -445,6 +445,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 
        spin_lock(&inode->i_lock);
        used = __ceph_caps_used(ci);
+       dirty = __ceph_caps_dirty(ci);
        if (__ceph_have_pending_cap_snap(ci)) {
                /* there is no point in queuing multiple "pending" cap_snaps,
                   as no new writes are allowed to start when pending, so any
@@ -452,11 +453,15 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
                   cap_snap.  lucky us. */
                dout("queue_cap_snap %p already pending\n", inode);
                kfree(capsnap);
-       } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR)) {
+       } else if (ci->i_wrbuffer_ref_head || (used & CEPH_CAP_FILE_WR) ||
+                  (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
+                            CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
                struct ceph_snap_context *snapc = ci->i_head_snapc;
 
+               dout("queue_cap_snap %p cap_snap %p queuing under %p\n", inode,
+                    capsnap, snapc);
                igrab(inode);
-
+               
                atomic_set(&capsnap->nref, 1);
                capsnap->ci = ci;
                INIT_LIST_HEAD(&capsnap->ci_item);
@@ -464,15 +469,21 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 
                capsnap->follows = snapc->seq - 1;
                capsnap->issued = __ceph_caps_issued(ci, NULL);
-               capsnap->dirty = __ceph_caps_dirty(ci);
+               capsnap->dirty = dirty;
 
                capsnap->mode = inode->i_mode;
                capsnap->uid = inode->i_uid;
                capsnap->gid = inode->i_gid;
 
-               /* fixme? */
-               capsnap->xattr_blob = NULL;
-               capsnap->xattr_len = 0;
+               if (dirty & CEPH_CAP_XATTR_EXCL) {
+                       __ceph_build_xattrs_blob(ci);
+                       capsnap->xattr_blob =
+                               ceph_buffer_get(ci->i_xattrs.blob);
+                       capsnap->xattr_version = ci->i_xattrs.version;
+               } else {
+                       capsnap->xattr_blob = NULL;
+                       capsnap->xattr_version = 0;
+               }
 
                /* dirty page count moved from _head to this cap_snap;
                   all subsequent writes page dirties occur _after_ this
@@ -480,7 +491,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
                capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
                ci->i_wrbuffer_ref_head = 0;
                capsnap->context = snapc;
-               ci->i_head_snapc = NULL;
+               ci->i_head_snapc =
+                       ceph_get_snap_context(ci->i_snap_realm->cached_context);
+               dout(" new snapc is %p\n", ci->i_head_snapc);
                list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
 
                if (used & CEPH_CAP_FILE_WR) {
@@ -539,6 +552,41 @@ int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
        return 1;  /* caller may want to ceph_flush_snaps */
 }
 
+/*
+ * Queue cap_snaps for snap writeback for this realm and its children.
+ * Called under snap_rwsem, so realm topology won't change.
+ */
+static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
+{
+       struct ceph_inode_info *ci;
+       struct inode *lastinode = NULL;
+       struct ceph_snap_realm *child;
+
+       dout("queue_realm_cap_snaps %p %llx inodes\n", realm, realm->ino);
+
+       spin_lock(&realm->inodes_with_caps_lock);
+       list_for_each_entry(ci, &realm->inodes_with_caps,
+                           i_snap_realm_item) {
+               struct inode *inode = igrab(&ci->vfs_inode);
+               if (!inode)
+                       continue;
+               spin_unlock(&realm->inodes_with_caps_lock);
+               if (lastinode)
+                       iput(lastinode);
+               lastinode = inode;
+               ceph_queue_cap_snap(ci);
+               spin_lock(&realm->inodes_with_caps_lock);
+       }
+       spin_unlock(&realm->inodes_with_caps_lock);
+       if (lastinode)
+               iput(lastinode);
+
+       dout("queue_realm_cap_snaps %p %llx children\n", realm, realm->ino);
+       list_for_each_entry(child, &realm->children, child_item)
+               queue_realm_cap_snaps(child);
+
+       dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
+}
 
 /*
  * Parse and apply a snapblob "snap trace" from the MDS.  This specifies
@@ -589,29 +637,8 @@ more:
                 *
                 * ...unless it's a snap deletion!
                 */
-               if (!deletion) {
-                       struct ceph_inode_info *ci;
-                       struct inode *lastinode = NULL;
-
-                       spin_lock(&realm->inodes_with_caps_lock);
-                       list_for_each_entry(ci, &realm->inodes_with_caps,
-                                           i_snap_realm_item) {
-                               struct inode *inode = igrab(&ci->vfs_inode);
-                               if (!inode)
-                                       continue;
-                               spin_unlock(&realm->inodes_with_caps_lock);
-                               if (lastinode)
-                                       iput(lastinode);
-                               lastinode = inode;
-                               ceph_queue_cap_snap(ci);
-                               spin_lock(&realm->inodes_with_caps_lock);
-                       }
-                       spin_unlock(&realm->inodes_with_caps_lock);
-                       if (lastinode)
-                               iput(lastinode);
-                       dout("update_snap_trace cap_snaps queued\n");
-               }
-
+               if (!deletion)
+                       queue_realm_cap_snaps(realm);
        } else {
                dout("update_snap_trace %llx %p seq %lld unchanged\n",
                     realm->ino, realm, realm->seq);
index 2482d69..c33897a 100644 (file)
@@ -216,8 +216,7 @@ struct ceph_cap_snap {
        uid_t uid;
        gid_t gid;
 
-       void *xattr_blob;
-       int xattr_len;
+       struct ceph_buffer *xattr_blob;
        u64 xattr_version;
 
        u64 size;
@@ -229,8 +228,11 @@ struct ceph_cap_snap {
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
 {
-       if (atomic_dec_and_test(&capsnap->nref))
+       if (atomic_dec_and_test(&capsnap->nref)) {
+               if (capsnap->xattr_blob)
+                       ceph_buffer_put(capsnap->xattr_blob);
                kfree(capsnap);
+       }
 }
 
 /*
@@ -342,7 +344,8 @@ struct ceph_inode_info {
        unsigned i_cap_exporting_issued;
        struct ceph_cap_reservation i_cap_migration_resv;
        struct list_head i_cap_snaps;   /* snapped state pending flush to mds */
-       struct ceph_snap_context *i_head_snapc;  /* set if wr_buffer_head > 0 */
+       struct ceph_snap_context *i_head_snapc;  /* set if wr_buffer_head > 0 or
+                                                   dirty|flushing caps */
        unsigned i_snap_caps;           /* cap bits for snapped files */
 
        int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
index 097a265..9578af6 100644 (file)
@@ -485,6 +485,7 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
                ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob;
                ci->i_xattrs.prealloc_blob = NULL;
                ci->i_xattrs.dirty = false;
+               ci->i_xattrs.version++;
        }
 }
 
index a803f5e..e3bccac 100644 (file)
@@ -1126,6 +1126,7 @@ void account_page_dirtied(struct page *page, struct address_space *mapping)
                task_io_account_write(PAGE_CACHE_SIZE);
        }
 }
+EXPORT_SYMBOL(account_page_dirtied);
 
 /*
  * For address_spaces which do not use buffers.  Just tag the page as dirty in