ceph: only release unused caps with mds requests
[pandora-kernel.git] / fs / ceph / caps.c
index e1e6df0..7d0a0d0 100644 (file)
@@ -5,6 +5,7 @@
 #include <linux/sched.h>
 #include <linux/vmalloc.h>
 #include <linux/wait.h>
+#include <linux/writeback.h>
 
 #include "super.h"
 #include "decode.h"
@@ -655,7 +656,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
  */
 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
 {
-       int have = ci->i_snap_caps;
+       int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
        struct ceph_cap *cap;
        struct rb_node *p;
 
@@ -856,8 +857,8 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 }
 
 /*
- * caller should hold i_lock, and session s_mutex.
- * returns true if this is the last cap.  if so, caller should iput.
+ * caller should hold i_lock.
+ * caller will not hold session s_mutex if called from destroy_inode.
  */
 void __ceph_remove_cap(struct ceph_cap *cap)
 {
@@ -974,15 +975,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
 }
 
 /*
- * Queue cap releases when an inode is dropped from our
- * cache.
+ * Queue cap releases when an inode is dropped from our cache.  Since
+ * inode is about to be destroyed, there is no need for i_lock.
  */
 void ceph_queue_caps_release(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct rb_node *p;
 
-       spin_lock(&inode->i_lock);
        p = rb_first(&ci->i_caps);
        while (p) {
                struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
@@ -1024,9 +1024,7 @@ void ceph_queue_caps_release(struct inode *inode)
                spin_unlock(&session->s_cap_lock);
                p = rb_next(p);
                __ceph_remove_cap(cap);
-
        }
-       spin_unlock(&inode->i_lock);
 }
 
 /*
@@ -1409,6 +1407,7 @@ static int try_nonblocking_invalidate(struct inode *inode)
  */
 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
                     struct ceph_mds_session *session)
+       __releases(session->s_mutex)
 {
        struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1416,7 +1415,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        struct ceph_cap *cap;
        int file_wanted, used;
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
-       int drop_session_lock = session ? 0 : 1;
        int issued, implemented, want, retain, revoking, flushing = 0;
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
@@ -1576,6 +1574,11 @@ retry_locked:
                }
 
 ack:
+               if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
+                       dout(" skipping %p I_NOFLUSH set\n", inode);
+                       continue;
+               }
+
                if (session && session != cap->session) {
                        dout("oops, wrong session %p mutex\n", session);
                        mutex_unlock(&session->s_mutex);
@@ -1636,7 +1639,7 @@ ack:
        if (queue_invalidate)
                ceph_queue_invalidate(inode);
 
-       if (session && drop_session_lock)
+       if (session)
                mutex_unlock(&session->s_mutex);
        if (took_snap_rwsem)
                up_read(&mdsc->snap_rwsem);
@@ -1655,6 +1658,10 @@ static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
 
 retry:
        spin_lock(&inode->i_lock);
+       if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
+               dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
+               goto out;
+       }
        if (ci->i_dirty_caps && ci->i_auth_cap) {
                struct ceph_cap *cap = ci->i_auth_cap;
                int used = __ceph_caps_used(ci);
@@ -1795,12 +1802,13 @@ int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
  * get by with fewer MDS messages if we wait for data writeback to
  * complete first.
  */
-int ceph_write_inode(struct inode *inode, int wait)
+int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        unsigned flush_tid;
        int err = 0;
        int dirty;
+       int wait = wbc->sync_mode == WB_SYNC_ALL;
 
        dout("write_inode %p wait=%d\n", inode, wait);
        if (wait) {
@@ -1926,14 +1934,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
        struct inode *inode = &ci->vfs_inode;
        int ret = 0;
        int have, implemented;
+       int file_wanted;
 
        dout("get_cap_refs %p need %s want %s\n", inode,
             ceph_cap_string(need), ceph_cap_string(want));
        spin_lock(&inode->i_lock);
 
-       /* make sure we _have_ some caps! */
-       if (!__ceph_is_any_caps(ci)) {
-               dout("get_cap_refs %p no real caps\n", inode);
+       /* make sure file is actually open */
+       file_wanted = __ceph_caps_file_wanted(ci);
+       if ((file_wanted & need) == 0) {
+               dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
+                    ceph_cap_string(need), ceph_cap_string(file_wanted));
                *err = -EBADF;
                ret = 1;
                goto out;
@@ -2184,18 +2195,19 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
  *
- * caller holds s_mutex.
+ * caller holds s_mutex and i_lock, we drop both.
+ *
  * return value:
  *  0 - ok
  *  1 - check_caps on auth cap only (writeback)
  *  2 - check_caps (ack revoke)
  */
-static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
-                           struct ceph_mds_session *session,
-                           struct ceph_cap *cap,
-                           struct ceph_buffer *xattr_buf)
+static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+                            struct ceph_mds_session *session,
+                            struct ceph_cap *cap,
+                            struct ceph_buffer *xattr_buf)
        __releases(inode->i_lock)
-
+       __releases(session->s_mutex)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -2205,12 +2217,11 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        u64 size = le64_to_cpu(grant->size);
        u64 max_size = le64_to_cpu(grant->max_size);
        struct timespec mtime, atime, ctime;
-       int reply = 0;
+       int check_caps = 0;
        int wake = 0;
        int writeback = 0;
        int revoked_rdcache = 0;
        int queue_invalidate = 0;
-       int tried_invalidate = 0;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2222,10 +2233,8 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
         * try to invalidate (once).  (If there are dirty buffers, we
         * will invalidate _after_ writeback.)
         */
-restart:
        if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
-           !ci->i_wrbuffer_ref && !tried_invalidate) {
-               tried_invalidate = 1;
+           !ci->i_wrbuffer_ref) {
                if (try_nonblocking_invalidate(inode) == 0) {
                        revoked_rdcache = 1;
                } else {
@@ -2236,7 +2245,6 @@ restart:
                                ci->i_rdcache_revoking = ci->i_rdcache_gen;
                        }
                }
-               goto restart;
        }
 
        /* side effects now are allowed */
@@ -2322,11 +2330,12 @@ restart:
                if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
                        writeback = 1; /* will delay ack */
                else if (dirty & ~newcaps)
-                       reply = 1;     /* initiate writeback in check_caps */
+                       check_caps = 1;  /* initiate writeback in check_caps */
                else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
                           revoked_rdcache)
-                       reply = 2;     /* send revoke ack in check_caps */
+                       check_caps = 2;     /* send revoke ack in check_caps */
                cap->issued = newcaps;
+               cap->implemented |= newcaps;
        } else if (cap->issued == newcaps) {
                dout("caps unchanged: %s -> %s\n",
                     ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
@@ -2339,6 +2348,7 @@ restart:
                                              * pending revocation */
                wake = 1;
        }
+       BUG_ON(cap->issued & ~cap->implemented);
 
        spin_unlock(&inode->i_lock);
        if (writeback)
@@ -2352,7 +2362,14 @@ restart:
                ceph_queue_invalidate(inode);
        if (wake)
                wake_up(&ci->i_cap_wq);
-       return reply;
+
+       if (check_caps == 1)
+               ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
+                               session);
+       else if (check_caps == 2)
+               ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
+       else
+               mutex_unlock(&session->s_mutex);
 }
 
 /*
@@ -2541,9 +2558,8 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        ci->i_cap_exporting_issued = cap->issued;
                }
                __ceph_remove_cap(cap);
-       } else {
-               WARN_ON(!cap);
        }
+       /* else, we already released it */
 
        spin_unlock(&inode->i_lock);
 }
@@ -2607,15 +2623,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct inode *inode;
        struct ceph_cap *cap;
        struct ceph_mds_caps *h;
-       int mds = le64_to_cpu(msg->hdr.src.name.num);
+       int mds = session->s_mds;
        int op;
        u32 seq;
        struct ceph_vino vino;
        u64 cap_id;
        u64 size, max_size;
        u64 tid;
-       int check_caps = 0;
-       int r;
+       void *snaptrace;
 
        dout("handle_caps from mds%d\n", mds);
 
@@ -2624,6 +2639,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = msg->front.iov_base;
+       snaptrace = h + 1;
        op = le32_to_cpu(h->op);
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
@@ -2658,10 +2674,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, session,
-                                 msg->middle,
-                                 le32_to_cpu(h->snap_trace_len));
-               check_caps = 1; /* we may have sent a RELEASE to the old auth */
-               goto done;
+                                 snaptrace, le32_to_cpu(h->snap_trace_len));
+               ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
+                               session);
+               goto done_unlocked;
        }
 
        /* the rest require a cap */
@@ -2678,16 +2694,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        switch (op) {
        case CEPH_CAP_OP_REVOKE:
        case CEPH_CAP_OP_GRANT:
-               r = handle_cap_grant(inode, h, session, cap, msg->middle);
-               if (r == 1)
-                       ceph_check_caps(ceph_inode(inode),
-                                       CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
-                                       session);
-               else if (r == 2)
-                       ceph_check_caps(ceph_inode(inode),
-                                       CHECK_CAPS_NODELAY,
-                                       session);
-               break;
+               handle_cap_grant(inode, h, session, cap, msg->middle);
+               goto done_unlocked;
 
        case CEPH_CAP_OP_FLUSH_ACK:
                handle_cap_flush_ack(inode, tid, h, session, cap);
@@ -2705,9 +2713,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
 done:
        mutex_unlock(&session->s_mutex);
-
-       if (check_caps)
-               ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, NULL);
+done_unlocked:
        if (inode)
                iput(inode);
        return;
@@ -2750,16 +2756,38 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
  */
 void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 {
-       struct ceph_inode_info *ci;
-       struct inode *inode;
+       struct ceph_inode_info *ci, *nci = NULL;
+       struct inode *inode, *ninode = NULL;
+       struct list_head *p, *n;
 
        dout("flush_dirty_caps\n");
        spin_lock(&mdsc->cap_dirty_lock);
-       while (!list_empty(&mdsc->cap_dirty)) {
-               ci = list_first_entry(&mdsc->cap_dirty,
-                                     struct ceph_inode_info,
-                                     i_dirty_item);
-               inode = igrab(&ci->vfs_inode);
+       list_for_each_safe(p, n, &mdsc->cap_dirty) {
+               if (nci) {
+                       ci = nci;
+                       inode = ninode;
+                       ci->i_ceph_flags &= ~CEPH_I_NOFLUSH;
+                       dout("flush_dirty_caps inode %p (was next inode)\n",
+                            inode);
+               } else {
+                       ci = list_entry(p, struct ceph_inode_info,
+                                       i_dirty_item);
+                       inode = igrab(&ci->vfs_inode);
+                       BUG_ON(!inode);
+                       dout("flush_dirty_caps inode %p\n", inode);
+               }
+               if (n != &mdsc->cap_dirty) {
+                       nci = list_entry(n, struct ceph_inode_info,
+                                        i_dirty_item);
+                       ninode = igrab(&nci->vfs_inode);
+                       BUG_ON(!ninode);
+                       nci->i_ceph_flags |= CEPH_I_NOFLUSH;
+                       dout("flush_dirty_caps next inode %p, noflush\n",
+                            ninode);
+               } else {
+                       nci = NULL;
+                       ninode = NULL;
+               }
                spin_unlock(&mdsc->cap_dirty_lock);
                if (inode) {
                        ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH,
@@ -2808,11 +2836,18 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
        struct ceph_cap *cap;
        struct ceph_mds_request_release *rel = *p;
        int ret = 0;
-
-       dout("encode_inode_release %p mds%d drop %s unless %s\n", inode,
-            mds, ceph_cap_string(drop), ceph_cap_string(unless));
+       int used = 0;
 
        spin_lock(&inode->i_lock);
+       used = __ceph_caps_used(ci);
+
+       dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode,
+            mds, ceph_cap_string(used), ceph_cap_string(drop),
+            ceph_cap_string(unless));
+
+       /* only drop unused caps */
+       drop &= ~used;
+
        cap = __get_cap_for_mds(ci, mds);
        if (cap && __cap_is_valid(cap)) {
                if (force ||