SUNRPC: Close a race in __rpc_wait_for_completion_task()
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Mon, 21 Feb 2011 19:05:41 +0000 (11:05 -0800)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Thu, 10 Mar 2011 20:04:52 +0000 (15:04 -0500)
Although they run as rpciod background tasks, under normal operation
(i.e. no SIGKILL), functions like nfs_sillyrename(), nfs4_proc_unlck()
and nfs4_do_close() want to be fully synchronous. This means that when we
exit, we want all references to the rpc_task to be gone, and we want
any dentry references etc. held by that task to be released.

For this reason these functions call __rpc_wait_for_completion_task(),
followed by rpc_put_task() in the expectation that the latter will be
releasing the last reference to the rpc_task, and thus ensuring that the
callback_ops->rpc_release() has been called synchronously.

This patch fixes a race which exists due to the fact that
rpciod calls rpc_complete_task() (in order to wake up the callers of
__rpc_wait_for_completion_task()) and then subsequently calls
rpc_put_task() without ensuring that these two steps are done atomically.

In order to avoid adding new spin locks, the patch uses the existing
waitqueue spin lock to order the rpc_task reference count releases between
the waiting process and rpciod.
The common case where nobody is waiting for completion is optimised for by
checking if the RPC_TASK_ASYNC flag is cleared and/or if the rpc_task
reference count is 1: in those cases we drop trying to grab the spin lock,
and immediately free up the rpc_task.

Those few processes that need to put the rpc_task from inside an
asynchronous context and that do not care about ordering are given a new
helper: rpc_put_task_async().

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
fs/nfs/nfs4proc.c
fs/nfs/unlink.c
include/linux/sunrpc/sched.h
kernel/sched.c
net/sunrpc/sched.c

index 1ff76ac..d1ed671 100644 (file)
@@ -4150,7 +4150,7 @@ static void nfs4_lock_release(void *calldata)
                task = nfs4_do_unlck(&data->fl, data->ctx, data->lsp,
                                data->arg.lock_seqid);
                if (!IS_ERR(task))
-                       rpc_put_task(task);
+                       rpc_put_task_async(task);
                dprintk("%s: cancelling lock!\n", __func__);
        } else
                nfs_free_seqid(data->arg.lock_seqid);
@@ -5227,7 +5227,7 @@ static int nfs41_proc_async_sequence(struct nfs_client *clp, struct rpc_cred *cr
        if (IS_ERR(task))
                ret = PTR_ERR(task);
        else
-               rpc_put_task(task);
+               rpc_put_task_async(task);
        dprintk("<-- %s status=%d\n", __func__, ret);
        return ret;
 }
index e313a51..6481d53 100644 (file)
@@ -180,7 +180,7 @@ static int nfs_do_call_unlink(struct dentry *parent, struct inode *dir, struct n
        task_setup_data.rpc_client = NFS_CLIENT(dir);
        task = rpc_run_task(&task_setup_data);
        if (!IS_ERR(task))
-               rpc_put_task(task);
+               rpc_put_task_async(task);
        return 1;
 }
 
index 88513fd..d81db80 100644 (file)
@@ -212,6 +212,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
 struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
                                const struct rpc_call_ops *ops);
 void           rpc_put_task(struct rpc_task *);
+void           rpc_put_task_async(struct rpc_task *);
 void           rpc_exit_task(struct rpc_task *);
 void           rpc_exit(struct rpc_task *, int);
 void           rpc_release_calldata(const struct rpc_call_ops *, void *);
index 18d38e4..42eab5a 100644 (file)
@@ -4213,6 +4213,7 @@ void __wake_up_locked_key(wait_queue_head_t *q, unsigned int mode, void *key)
 {
        __wake_up_common(q, mode, 1, 0, key);
 }
+EXPORT_SYMBOL_GPL(__wake_up_locked_key);
 
 /**
  * __wake_up_sync_key - wake up threads blocked on a waitqueue.
index 243fc09..59e5994 100644 (file)
@@ -252,23 +252,37 @@ static void rpc_set_active(struct rpc_task *task)
 
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
+ * and then waking up all tasks that were sleeping.
  */
-static void rpc_mark_complete_task(struct rpc_task *task)
+static int rpc_complete_task(struct rpc_task *task)
 {
-       smp_mb__before_clear_bit();
+       void *m = &task->tk_runstate;
+       wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
+       struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
+       unsigned long flags;
+       int ret;
+
+       spin_lock_irqsave(&wq->lock, flags);
        clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
-       smp_mb__after_clear_bit();
-       wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
+       ret = atomic_dec_and_test(&task->tk_count);
+       if (waitqueue_active(wq))
+               __wake_up_locked_key(wq, TASK_NORMAL, &k);
+       spin_unlock_irqrestore(&wq->lock, flags);
+       return ret;
 }
 
 /*
  * Allow callers to wait for completion of an RPC call
+ *
+ * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
+ * to enforce taking of the wq->lock and hence avoid races with
+ * rpc_complete_task().
  */
 int __rpc_wait_for_completion_task(struct rpc_task *task, int (*action)(void *))
 {
        if (action == NULL)
                action = rpc_wait_bit_killable;
-       return wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
+       return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
                        action, TASK_KILLABLE);
 }
 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
@@ -857,34 +871,67 @@ static void rpc_async_release(struct work_struct *work)
        rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
 }
 
-void rpc_put_task(struct rpc_task *task)
+static void rpc_release_resources_task(struct rpc_task *task)
 {
-       if (!atomic_dec_and_test(&task->tk_count))
-               return;
-       /* Release resources */
        if (task->tk_rqstp)
                xprt_release(task);
        if (task->tk_msg.rpc_cred)
                put_rpccred(task->tk_msg.rpc_cred);
        rpc_task_release_client(task);
-       if (task->tk_workqueue != NULL) {
+}
+
+static void rpc_final_put_task(struct rpc_task *task,
+               struct workqueue_struct *q)
+{
+       if (q != NULL) {
                INIT_WORK(&task->u.tk_work, rpc_async_release);
-               queue_work(task->tk_workqueue, &task->u.tk_work);
+               queue_work(q, &task->u.tk_work);
        } else
                rpc_free_task(task);
 }
+
+static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
+{
+       if (atomic_dec_and_test(&task->tk_count)) {
+               rpc_release_resources_task(task);
+               rpc_final_put_task(task, q);
+       }
+}
+
+void rpc_put_task(struct rpc_task *task)
+{
+       rpc_do_put_task(task, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_put_task);
 
+void rpc_put_task_async(struct rpc_task *task)
+{
+       rpc_do_put_task(task, task->tk_workqueue);
+}
+EXPORT_SYMBOL_GPL(rpc_put_task_async);
+
 static void rpc_release_task(struct rpc_task *task)
 {
        dprintk("RPC: %5u release task\n", task->tk_pid);
 
        BUG_ON (RPC_IS_QUEUED(task));
 
-       /* Wake up anyone who is waiting for task completion */
-       rpc_mark_complete_task(task);
+       rpc_release_resources_task(task);
 
-       rpc_put_task(task);
+       /*
+        * Note: at this point we have been removed from rpc_clnt->cl_tasks,
+        * so it should be safe to use task->tk_count as a test for whether
+        * or not any other processes still hold references to our rpc_task.
+        */
+       if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
+               /* Wake up anyone who may be waiting for task completion */
+               if (!rpc_complete_task(task))
+                       return;
+       } else {
+               if (!atomic_dec_and_test(&task->tk_count))
+                       return;
+       }
+       rpc_final_put_task(task, task->tk_workqueue);
 }
 
 int rpciod_up(void)