ceph: only release unused caps with mds requests
[pandora-kernel.git] / fs / ceph / caps.c
index 20b28dc..7d0a0d0 100644 (file)
@@ -5,6 +5,7 @@
 #include <linux/sched.h>
 #include <linux/vmalloc.h>
 #include <linux/wait.h>
+#include <linux/writeback.h>
 
 #include "super.h"
 #include "decode.h"
@@ -128,6 +129,7 @@ static int caps_total_count;        /* total caps allocated */
 static int caps_use_count;          /* in use */
 static int caps_reserve_count;      /* unused, reserved */
 static int caps_avail_count;        /* unused, unreserved */
+static int caps_min_count;          /* keep at least this many (unreserved) */
 
 void __init ceph_caps_init(void)
 {
@@ -149,6 +151,15 @@ void ceph_caps_finalize(void)
        caps_avail_count = 0;
        caps_use_count = 0;
        caps_reserve_count = 0;
+       caps_min_count = 0;
+       spin_unlock(&caps_list_lock);
+}
+
+void ceph_adjust_min_caps(int delta)
+{
+       spin_lock(&caps_list_lock);
+       caps_min_count += delta;
+       BUG_ON(caps_min_count < 0);
        spin_unlock(&caps_list_lock);
 }
 
@@ -256,30 +267,22 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
        return cap;
 }
 
-static void put_cap(struct ceph_cap *cap,
-                   struct ceph_cap_reservation *ctx)
+void ceph_put_cap(struct ceph_cap *cap)
 {
        spin_lock(&caps_list_lock);
-       dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-            ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count,
+       dout("put_cap %p %d = %d used + %d resv + %d avail\n",
+            cap, caps_total_count, caps_use_count,
             caps_reserve_count, caps_avail_count);
        caps_use_count--;
        /*
-        * Keep some preallocated caps around, at least enough to do a
-        * readdir (which needs to preallocate lots of them), to avoid
-        * lots of free/alloc churn.
+        * Keep some preallocated caps around (ceph_min_count), to
+        * avoid lots of free/alloc churn.
         */
-       if (caps_avail_count >= caps_reserve_count +
-           ceph_client(cap->ci->vfs_inode.i_sb)->mount_args->max_readdir) {
+       if (caps_avail_count >= caps_reserve_count + caps_min_count) {
                caps_total_count--;
                kmem_cache_free(ceph_cap_cachep, cap);
        } else {
-               if (ctx) {
-                       ctx->count++;
-                       caps_reserve_count++;
-               } else {
-                       caps_avail_count++;
-               }
+               caps_avail_count++;
                list_add(&cap->caps_item, &caps_list);
        }
 
@@ -289,7 +292,8 @@ static void put_cap(struct ceph_cap *cap,
 }
 
 void ceph_reservation_status(struct ceph_client *client,
-                            int *total, int *avail, int *used, int *reserved)
+                            int *total, int *avail, int *used, int *reserved,
+                            int *min)
 {
        if (total)
                *total = caps_total_count;
@@ -299,6 +303,8 @@ void ceph_reservation_status(struct ceph_client *client,
                *used = caps_use_count;
        if (reserved)
                *reserved = caps_reserve_count;
+       if (min)
+               *min = caps_min_count;
 }
 
 /*
@@ -650,7 +656,7 @@ static int __cap_is_valid(struct ceph_cap *cap)
  */
 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
 {
-       int have = ci->i_snap_caps;
+       int have = ci->i_snap_caps | ci->i_cap_exporting_issued;
        struct ceph_cap *cap;
        struct rb_node *p;
 
@@ -698,7 +704,7 @@ static void __touch_cap(struct ceph_cap *cap)
        struct ceph_mds_session *s = cap->session;
 
        spin_lock(&s->s_cap_lock);
-       if (!s->s_iterating_caps) {
+       if (s->s_cap_iterator == NULL) {
                dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
                     s->s_mds);
                list_move_tail(&cap->session_caps, &s->s_caps);
@@ -851,11 +857,10 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
 }
 
 /*
- * caller should hold i_lock, and session s_mutex.
- * returns true if this is the last cap.  if so, caller should iput.
+ * caller should hold i_lock.
+ * caller will not hold session s_mutex if called from destroy_inode.
  */
-void __ceph_remove_cap(struct ceph_cap *cap,
-                      struct ceph_cap_reservation *ctx)
+void __ceph_remove_cap(struct ceph_cap *cap)
 {
        struct ceph_mds_session *session = cap->session;
        struct ceph_inode_info *ci = cap->ci;
@@ -863,19 +868,27 @@ void __ceph_remove_cap(struct ceph_cap *cap,
 
        dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
-       /* remove from session list */
-       spin_lock(&session->s_cap_lock);
-       list_del_init(&cap->session_caps);
-       session->s_nr_caps--;
-       spin_unlock(&session->s_cap_lock);
-
        /* remove from inode list */
        rb_erase(&cap->ci_node, &ci->i_caps);
-       cap->session = NULL;
+       cap->ci = NULL;
        if (ci->i_auth_cap == cap)
                ci->i_auth_cap = NULL;
 
-       put_cap(cap, ctx);
+       /* remove from session list */
+       spin_lock(&session->s_cap_lock);
+       if (session->s_cap_iterator == cap) {
+               /* not yet, we are iterating over this very cap */
+               dout("__ceph_remove_cap  delaying %p removal from session %p\n",
+                    cap, cap->session);
+       } else {
+               list_del_init(&cap->session_caps);
+               session->s_nr_caps--;
+               cap->session = NULL;
+       }
+       spin_unlock(&session->s_cap_lock);
+
+       if (cap->session == NULL)
+               ceph_put_cap(cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
                struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -962,15 +975,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
 }
 
 /*
- * Queue cap releases when an inode is dropped from our
- * cache.
+ * Queue cap releases when an inode is dropped from our cache.  Since
+ * inode is about to be destroyed, there is no need for i_lock.
  */
 void ceph_queue_caps_release(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct rb_node *p;
 
-       spin_lock(&inode->i_lock);
        p = rb_first(&ci->i_caps);
        while (p) {
                struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
@@ -1011,10 +1023,8 @@ void ceph_queue_caps_release(struct inode *inode)
                }
                spin_unlock(&session->s_cap_lock);
                p = rb_next(p);
-               __ceph_remove_cap(cap, NULL);
-
+               __ceph_remove_cap(cap);
        }
-       spin_unlock(&inode->i_lock);
 }
 
 /*
@@ -1137,12 +1147,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
 
        spin_unlock(&inode->i_lock);
 
-       if (dropping & CEPH_CAP_FILE_CACHE) {
-               /* invalidate what we can */
-               dout("invalidating pages on %p\n", inode);
-               invalidate_mapping_pages(&inode->i_data, 0, -1);
-       }
-
        ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
                op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
                size, max_size, &mtime, &atime, time_warp_seq,
@@ -1355,6 +1359,41 @@ static int __mark_caps_flushing(struct inode *inode,
        return flushing;
 }
 
+/*
+ * try to invalidate mapping pages without blocking.
+ */
+static int mapping_is_empty(struct address_space *mapping)
+{
+       struct page *page = find_get_page(mapping, 0);
+
+       if (!page)
+               return 1;
+
+       put_page(page);
+       return 0;
+}
+
+static int try_nonblocking_invalidate(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       u32 invalidating_gen = ci->i_rdcache_gen;
+
+       spin_unlock(&inode->i_lock);
+       invalidate_mapping_pages(&inode->i_data, 0, -1);
+       spin_lock(&inode->i_lock);
+
+       if (mapping_is_empty(&inode->i_data) &&
+           invalidating_gen == ci->i_rdcache_gen) {
+               /* success. */
+               dout("try_nonblocking_invalidate %p success\n", inode);
+               ci->i_rdcache_gen = 0;
+               ci->i_rdcache_revoking = 0;
+               return 0;
+       }
+       dout("try_nonblocking_invalidate %p failed\n", inode);
+       return -1;
+}
+
 /*
  * Swiss army knife function to examine currently used and wanted
  * versus held caps.  Release, flush, ack revoked caps to mds as
@@ -1368,6 +1407,7 @@ static int __mark_caps_flushing(struct inode *inode,
  */
 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
                     struct ceph_mds_session *session)
+       __releases(session->s_mutex)
 {
        struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
        struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1375,7 +1415,6 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        struct ceph_cap *cap;
        int file_wanted, used;
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
-       int drop_session_lock = session ? 0 : 1;
        int issued, implemented, want, retain, revoking, flushing = 0;
        int mds = -1;   /* keep track of how far we've gone through i_caps list
                           to avoid an infinite loop on retry */
@@ -1445,27 +1484,19 @@ retry_locked:
            (file_wanted == 0 ||                     /* no open files */
             (revoking & CEPH_CAP_FILE_CACHE)) &&     /*  or revoking cache */
            !tried_invalidate) {
-               u32 invalidating_gen = ci->i_rdcache_gen;
-               int ret;
-
                dout("check_caps trying to invalidate on %p\n", inode);
-               spin_unlock(&inode->i_lock);
-               ret = invalidate_mapping_pages(&inode->i_data, 0, -1);
-               spin_lock(&inode->i_lock);
-               if (ret == 0 && invalidating_gen == ci->i_rdcache_gen) {
-                       /* success. */
-                       ci->i_rdcache_gen = 0;
-                       ci->i_rdcache_revoking = 0;
-               } else if (revoking & CEPH_CAP_FILE_CACHE) {
-                       dout("check_caps queuing invalidate\n");
-                       queue_invalidate = 1;
-                       ci->i_rdcache_revoking = ci->i_rdcache_gen;
-               } else {
-                       dout("check_caps failed to invalidate pages\n");
-                       /* we failed to invalidate pages.  check these
-                          caps again later. */
-                       force_requeue = 1;
-                       __cap_set_timeouts(mdsc, ci);
+               if (try_nonblocking_invalidate(inode) < 0) {
+                       if (revoking & CEPH_CAP_FILE_CACHE) {
+                               dout("check_caps queuing invalidate\n");
+                               queue_invalidate = 1;
+                               ci->i_rdcache_revoking = ci->i_rdcache_gen;
+                       } else {
+                               dout("check_caps failed to invalidate pages\n");
+                               /* we failed to invalidate pages.  check these
+                                  caps again later. */
+                               force_requeue = 1;
+                               __cap_set_timeouts(mdsc, ci);
+                       }
                }
                tried_invalidate = 1;
                goto retry_locked;
@@ -1543,6 +1574,11 @@ retry_locked:
                }
 
 ack:
+               if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
+                       dout(" skipping %p I_NOFLUSH set\n", inode);
+                       continue;
+               }
+
                if (session && session != cap->session) {
                        dout("oops, wrong session %p mutex\n", session);
                        mutex_unlock(&session->s_mutex);
@@ -1603,7 +1639,7 @@ ack:
        if (queue_invalidate)
                ceph_queue_invalidate(inode);
 
-       if (session && drop_session_lock)
+       if (session)
                mutex_unlock(&session->s_mutex);
        if (took_snap_rwsem)
                up_read(&mdsc->snap_rwsem);
@@ -1622,6 +1658,10 @@ static int try_flush_caps(struct inode *inode, struct ceph_mds_session *session,
 
 retry:
        spin_lock(&inode->i_lock);
+       if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
+               dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
+               goto out;
+       }
        if (ci->i_dirty_caps && ci->i_auth_cap) {
                struct ceph_cap *cap = ci->i_auth_cap;
                int used = __ceph_caps_used(ci);
@@ -1762,12 +1802,13 @@ int ceph_fsync(struct file *file, struct dentry *dentry, int datasync)
  * get by with fewer MDS messages if we wait for data writeback to
  * complete first.
  */
-int ceph_write_inode(struct inode *inode, int wait)
+int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        unsigned flush_tid;
        int err = 0;
        int dirty;
+       int wait = wbc->sync_mode == WB_SYNC_ALL;
 
        dout("write_inode %p wait=%d\n", inode, wait);
        if (wait) {
@@ -1893,14 +1934,17 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
        struct inode *inode = &ci->vfs_inode;
        int ret = 0;
        int have, implemented;
+       int file_wanted;
 
        dout("get_cap_refs %p need %s want %s\n", inode,
             ceph_cap_string(need), ceph_cap_string(want));
        spin_lock(&inode->i_lock);
 
-       /* make sure we _have_ some caps! */
-       if (!__ceph_is_any_caps(ci)) {
-               dout("get_cap_refs %p no real caps\n", inode);
+       /* make sure file is actually open */
+       file_wanted = __ceph_caps_file_wanted(ci);
+       if ((file_wanted & need) == 0) {
+               dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
+                    ceph_cap_string(need), ceph_cap_string(file_wanted));
                *err = -EBADF;
                ret = 1;
                goto out;
@@ -2151,18 +2195,19 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
  *
- * caller holds s_mutex.
+ * caller holds s_mutex and i_lock, we drop both.
+ *
  * return value:
  *  0 - ok
  *  1 - check_caps on auth cap only (writeback)
  *  2 - check_caps (ack revoke)
  */
-static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
-                           struct ceph_mds_session *session,
-                           struct ceph_cap *cap,
-                           struct ceph_buffer *xattr_buf)
+static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+                            struct ceph_mds_session *session,
+                            struct ceph_cap *cap,
+                            struct ceph_buffer *xattr_buf)
        __releases(inode->i_lock)
-
+       __releases(session->s_mutex)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        int mds = session->s_mds;
@@ -2172,13 +2217,11 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        u64 size = le64_to_cpu(grant->size);
        u64 max_size = le64_to_cpu(grant->max_size);
        struct timespec mtime, atime, ctime;
-       int reply = 0;
+       int check_caps = 0;
        int wake = 0;
        int writeback = 0;
        int revoked_rdcache = 0;
        int queue_invalidate = 0;
-       int tried_invalidate = 0;
-       int ret;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2190,29 +2233,18 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
         * try to invalidate (once).  (If there are dirty buffers, we
         * will invalidate _after_ writeback.)
         */
-restart:
        if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
-           !ci->i_wrbuffer_ref && !tried_invalidate) {
-               dout("CACHE invalidation\n");
-               spin_unlock(&inode->i_lock);
-               tried_invalidate = 1;
-
-               ret = invalidate_mapping_pages(&inode->i_data, 0, -1);
-               spin_lock(&inode->i_lock);
-               if (ret < 0) {
+           !ci->i_wrbuffer_ref) {
+               if (try_nonblocking_invalidate(inode) == 0) {
+                       revoked_rdcache = 1;
+               } else {
                        /* there were locked pages.. invalidate later
                           in a separate thread. */
                        if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
                                queue_invalidate = 1;
                                ci->i_rdcache_revoking = ci->i_rdcache_gen;
                        }
-               } else {
-                       /* we successfully invalidated those pages */
-                       revoked_rdcache = 1;
-                       ci->i_rdcache_gen = 0;
-                       ci->i_rdcache_revoking = 0;
                }
-               goto restart;
        }
 
        /* side effects now are allowed */
@@ -2298,11 +2330,12 @@ restart:
                if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
                        writeback = 1; /* will delay ack */
                else if (dirty & ~newcaps)
-                       reply = 1;     /* initiate writeback in check_caps */
+                       check_caps = 1;  /* initiate writeback in check_caps */
                else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
                           revoked_rdcache)
-                       reply = 2;     /* send revoke ack in check_caps */
+                       check_caps = 2;     /* send revoke ack in check_caps */
                cap->issued = newcaps;
+               cap->implemented |= newcaps;
        } else if (cap->issued == newcaps) {
                dout("caps unchanged: %s -> %s\n",
                     ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
@@ -2315,6 +2348,7 @@ restart:
                                              * pending revocation */
                wake = 1;
        }
+       BUG_ON(cap->issued & ~cap->implemented);
 
        spin_unlock(&inode->i_lock);
        if (writeback)
@@ -2328,7 +2362,14 @@ restart:
                ceph_queue_invalidate(inode);
        if (wake)
                wake_up(&ci->i_cap_wq);
-       return reply;
+
+       if (check_caps == 1)
+               ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
+                               session);
+       else if (check_caps == 2)
+               ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
+       else
+               mutex_unlock(&session->s_mutex);
 }
 
 /*
@@ -2516,10 +2557,9 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        ci->i_cap_exporting_mseq = mseq;
                        ci->i_cap_exporting_issued = cap->issued;
                }
-               __ceph_remove_cap(cap, NULL);
-       } else {
-               WARN_ON(!cap);
+               __ceph_remove_cap(cap);
        }
+       /* else, we already released it */
 
        spin_unlock(&inode->i_lock);
 }
@@ -2583,15 +2623,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        struct inode *inode;
        struct ceph_cap *cap;
        struct ceph_mds_caps *h;
-       int mds = le64_to_cpu(msg->hdr.src.name.num);
+       int mds = session->s_mds;
        int op;
        u32 seq;
        struct ceph_vino vino;
        u64 cap_id;
        u64 size, max_size;
        u64 tid;
-       int check_caps = 0;
-       int r;
+       void *snaptrace;
 
        dout("handle_caps from mds%d\n", mds);
 
@@ -2600,6 +2639,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        h = msg->front.iov_base;
+       snaptrace = h + 1;
        op = le32_to_cpu(h->op);
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
@@ -2634,10 +2674,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
        case CEPH_CAP_OP_IMPORT:
                handle_cap_import(mdsc, inode, h, session,
-                                 msg->middle,
-                                 le32_to_cpu(h->snap_trace_len));
-               check_caps = 1; /* we may have sent a RELEASE to the old auth */
-               goto done;
+                                 snaptrace, le32_to_cpu(h->snap_trace_len));
+               ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
+                               session);
+               goto done_unlocked;
        }
 
        /* the rest require a cap */
@@ -2654,16 +2694,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
        switch (op) {
        case CEPH_CAP_OP_REVOKE:
        case CEPH_CAP_OP_GRANT:
-               r = handle_cap_grant(inode, h, session, cap, msg->middle);
-               if (r == 1)
-                       ceph_check_caps(ceph_inode(inode),
-                                       CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
-                                       session);
-               else if (r == 2)
-                       ceph_check_caps(ceph_inode(inode),
-                                       CHECK_CAPS_NODELAY,
-                                       session);
-               break;
+               handle_cap_grant(inode, h, session, cap, msg->middle);
+               goto done_unlocked;
 
        case CEPH_CAP_OP_FLUSH_ACK:
                handle_cap_flush_ack(inode, tid, h, session, cap);
@@ -2681,9 +2713,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
 
 done:
        mutex_unlock(&session->s_mutex);
-
-       if (check_caps)
-               ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, NULL);
+done_unlocked:
        if (inode)
                iput(inode);
        return;
@@ -2726,16 +2756,38 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
  */
 void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
 {
-       struct ceph_inode_info *ci;
-       struct inode *inode;
+       struct ceph_inode_info *ci, *nci = NULL;
+       struct inode *inode, *ninode = NULL;
+       struct list_head *p, *n;
 
        dout("flush_dirty_caps\n");
        spin_lock(&mdsc->cap_dirty_lock);
-       while (!list_empty(&mdsc->cap_dirty)) {
-               ci = list_first_entry(&mdsc->cap_dirty,
-                                     struct ceph_inode_info,
-                                     i_dirty_item);
-               inode = igrab(&ci->vfs_inode);
+       list_for_each_safe(p, n, &mdsc->cap_dirty) {
+               if (nci) {
+                       ci = nci;
+                       inode = ninode;
+                       ci->i_ceph_flags &= ~CEPH_I_NOFLUSH;
+                       dout("flush_dirty_caps inode %p (was next inode)\n",
+                            inode);
+               } else {
+                       ci = list_entry(p, struct ceph_inode_info,
+                                       i_dirty_item);
+                       inode = igrab(&ci->vfs_inode);
+                       BUG_ON(!inode);
+                       dout("flush_dirty_caps inode %p\n", inode);
+               }
+               if (n != &mdsc->cap_dirty) {
+                       nci = list_entry(n, struct ceph_inode_info,
+                                        i_dirty_item);
+                       ninode = igrab(&nci->vfs_inode);
+                       BUG_ON(!ninode);
+                       nci->i_ceph_flags |= CEPH_I_NOFLUSH;
+                       dout("flush_dirty_caps next inode %p, noflush\n",
+                            ninode);
+               } else {
+                       nci = NULL;
+                       ninode = NULL;
+               }
                spin_unlock(&mdsc->cap_dirty_lock);
                if (inode) {
                        ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH,
@@ -2784,11 +2836,18 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
        struct ceph_cap *cap;
        struct ceph_mds_request_release *rel = *p;
        int ret = 0;
-
-       dout("encode_inode_release %p mds%d drop %s unless %s\n", inode,
-            mds, ceph_cap_string(drop), ceph_cap_string(unless));
+       int used = 0;
 
        spin_lock(&inode->i_lock);
+       used = __ceph_caps_used(ci);
+
+       dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode,
+            mds, ceph_cap_string(used), ceph_cap_string(drop),
+            ceph_cap_string(unless));
+
+       /* only drop unused caps */
+       drop &= ~used;
+
        cap = __get_cap_for_mds(ci, mds);
        if (cap && __cap_is_valid(cap)) {
                if (force ||