Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/lowpan...
[pandora-kernel.git] / net / sunrpc / clnt.c
index 5abab09..5bc2f45 100644 (file)
@@ -36,7 +36,9 @@
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/rpc_pipe_fs.h>
 #include <linux/sunrpc/metrics.h>
+#include <linux/sunrpc/bc_xprt.h>
 
+#include "sunrpc.h"
 
 #ifdef RPC_DEBUG
 # define RPCDBG_FACILITY       RPCDBG_CALL
@@ -63,6 +65,9 @@ static void   call_decode(struct rpc_task *task);
 static void    call_bind(struct rpc_task *task);
 static void    call_bind_status(struct rpc_task *task);
 static void    call_transmit(struct rpc_task *task);
+#if defined(CONFIG_NFS_V4_1)
+static void    call_bc_transmit(struct rpc_task *task);
+#endif /* CONFIG_NFS_V4_1 */
 static void    call_status(struct rpc_task *task);
 static void    call_transmit_status(struct rpc_task *task);
 static void    call_refresh(struct rpc_task *task);
@@ -613,6 +618,50 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
 }
 EXPORT_SYMBOL_GPL(rpc_call_async);
 
+#if defined(CONFIG_NFS_V4_1)
+/**
+ * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
+ * rpc_execute against it
+ * @ops: RPC call ops
+ */
+struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
+                                       const struct rpc_call_ops *tk_ops)
+{
+       struct rpc_task *task;
+       struct xdr_buf *xbufp = &req->rq_snd_buf;
+       struct rpc_task_setup task_setup_data = {
+               .callback_ops = tk_ops,
+       };
+
+       dprintk("RPC: rpc_run_bc_task req= %p\n", req);
+       /*
+        * Create an rpc_task to send the data
+        */
+       task = rpc_new_task(&task_setup_data);
+       if (!task) {
+               xprt_free_bc_request(req);
+               goto out;
+       }
+       task->tk_rqstp = req;
+
+       /*
+        * Set up the xdr_buf length.
+        * This also indicates that the buffer is XDR encoded already.
+        */
+       xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
+                       xbufp->tail[0].iov_len;
+
+       task->tk_action = call_bc_transmit;
+       atomic_inc(&task->tk_count);
+       BUG_ON(atomic_read(&task->tk_count) != 2);
+       rpc_execute(task);
+
+out:
+       dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
+       return task;
+}
+#endif /* CONFIG_NFS_V4_1 */
+
 void
 rpc_call_start(struct rpc_task *task)
 {
@@ -694,6 +743,19 @@ void rpc_force_rebind(struct rpc_clnt *clnt)
 }
 EXPORT_SYMBOL_GPL(rpc_force_rebind);
 
+/*
+ * Restart an (async) RPC call from the call_prepare state.
+ * Usually called from within the exit handler.
+ */
+void
+rpc_restart_call_prepare(struct rpc_task *task)
+{
+       if (RPC_ASSASSINATED(task))
+               return;
+       task->tk_action = rpc_prepare_task;
+}
+EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
+
 /*
  * Restart an (async) RPC call. Usually called from within the
  * exit handler.
@@ -1085,7 +1147,7 @@ call_transmit(struct rpc_task *task)
         * in order to allow access to the socket to other RPC requests.
         */
        call_transmit_status(task);
-       if (task->tk_msg.rpc_proc->p_decode != NULL)
+       if (rpc_reply_expected(task))
                return;
        task->tk_action = rpc_exit_task;
        rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
@@ -1120,6 +1182,72 @@ call_transmit_status(struct rpc_task *task)
        }
 }
 
+#if defined(CONFIG_NFS_V4_1)
+/*
+ * 5b. Send the backchannel RPC reply.  On error, drop the reply.  In
+ * addition, disconnect on connectivity errors.
+ */
+static void
+call_bc_transmit(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+
+       BUG_ON(task->tk_status != 0);
+       task->tk_status = xprt_prepare_transmit(task);
+       if (task->tk_status == -EAGAIN) {
+               /*
+                * Could not reserve the transport. Try again after the
+                * transport is released.
+                */
+               task->tk_status = 0;
+               task->tk_action = call_bc_transmit;
+               return;
+       }
+
+       task->tk_action = rpc_exit_task;
+       if (task->tk_status < 0) {
+               printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+                       "error: %d\n", task->tk_status);
+               return;
+       }
+
+       xprt_transmit(task);
+       xprt_end_transmit(task);
+       dprint_status(task);
+       switch (task->tk_status) {
+       case 0:
+               /* Success */
+               break;
+       case -EHOSTDOWN:
+       case -EHOSTUNREACH:
+       case -ENETUNREACH:
+       case -ETIMEDOUT:
+               /*
+                * Problem reaching the server.  Disconnect and let the
+                * forechannel reestablish the connection.  The server will
+                * have to retransmit the backchannel request and we'll
+                * reprocess it.  Since these ops are idempotent, there's no
+                * need to cache our reply at this time.
+                */
+               printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+                       "error: %d\n", task->tk_status);
+               xprt_conditional_disconnect(task->tk_xprt,
+                       req->rq_connect_cookie);
+               break;
+       default:
+               /*
+                * We were unable to reply and will have to drop the
+                * request.  The server should reconnect and retransmit.
+                */
+               BUG_ON(task->tk_status == -EAGAIN);
+               printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+                       "error: %d\n", task->tk_status);
+               break;
+       }
+       rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
+}
+#endif /* CONFIG_NFS_V4_1 */
+
 /*
  * 6.  Sort out the RPC call status
  */
@@ -1130,8 +1258,8 @@ call_status(struct rpc_task *task)
        struct rpc_rqst *req = task->tk_rqstp;
        int             status;
 
-       if (req->rq_received > 0 && !req->rq_bytes_sent)
-               task->tk_status = req->rq_received;
+       if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
+               task->tk_status = req->rq_reply_bytes_recvd;
 
        dprint_status(task);
 
@@ -1248,7 +1376,7 @@ call_decode(struct rpc_task *task)
 
        /*
         * Ensure that we see all writes made by xprt_complete_rqst()
-        * before it changed req->rq_received.
+        * before it changed req->rq_reply_bytes_recvd.
         */
        smp_rmb();
        req->rq_rcv_buf.len = req->rq_private_buf.len;
@@ -1289,7 +1417,7 @@ out_retry:
        task->tk_status = 0;
        /* Note: rpc_verify_header() may have freed the RPC slot */
        if (task->tk_rqstp == req) {
-               req->rq_received = req->rq_rcv_buf.len = 0;
+               req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
                if (task->tk_client->cl_discrtry)
                        xprt_conditional_disconnect(task->tk_xprt,
                                        req->rq_connect_cookie);
@@ -1377,13 +1505,14 @@ rpc_verify_header(struct rpc_task *task)
        }
        if ((len -= 3) < 0)
                goto out_overflow;
-       p += 1; /* skip XID */
 
+       p += 1; /* skip XID */
        if ((n = ntohl(*p++)) != RPC_REPLY) {
                dprintk("RPC: %5u %s: not an RPC reply: %x\n",
-                               task->tk_pid, __func__, n);
+                       task->tk_pid, __func__, n);
                goto out_garbage;
        }
+
        if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
                if (--len < 0)
                        goto out_overflow;