SUNRPC: Ensure that we grab the XPRT_LOCK before calling xprt_alloc_slot
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Sun, 17 Jul 2011 20:01:03 +0000 (16:01 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Sun, 17 Jul 2011 20:01:03 +0000 (16:01 -0400)
This throttles the allocation of new slots when the socket is busy
reconnecting and/or is out of buffer space.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
include/linux/sunrpc/xprt.h
net/sunrpc/xprt.c
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtsock.c

index 217b020..a876882 100644 (file)
@@ -111,7 +111,7 @@ struct rpc_rqst {
 
 struct rpc_xprt_ops {
        void            (*set_buffer_size)(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize);
-       int             (*reserve_xprt)(struct rpc_task *task);
+       int             (*reserve_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
        void            (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
        void            (*rpcbind)(struct rpc_task *task);
        void            (*set_port)(struct rpc_xprt *xprt, unsigned short port);
@@ -271,8 +271,8 @@ struct xprt_class {
 struct rpc_xprt                *xprt_create_transport(struct xprt_create *args);
 void                   xprt_connect(struct rpc_task *task);
 void                   xprt_reserve(struct rpc_task *task);
-int                    xprt_reserve_xprt(struct rpc_task *task);
-int                    xprt_reserve_xprt_cong(struct rpc_task *task);
+int                    xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
+int                    xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
 int                    xprt_prepare_transmit(struct rpc_task *task);
 void                   xprt_transmit(struct rpc_task *task);
 void                   xprt_end_transmit(struct rpc_task *task);
index fbdbaf2..ccd583a 100644 (file)
@@ -191,10 +191,9 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
  * transport connects from colliding with writes.  No congestion control
  * is provided.
  */
-int xprt_reserve_xprt(struct rpc_task *task)
+int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
-       struct rpc_xprt *xprt = req->rq_xprt;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
@@ -202,8 +201,10 @@ int xprt_reserve_xprt(struct rpc_task *task)
                goto out_sleep;
        }
        xprt->snd_task = task;
-       req->rq_bytes_sent = 0;
-       req->rq_ntrans++;
+       if (req != NULL) {
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
+       }
 
        return 1;
 
@@ -212,7 +213,7 @@ out_sleep:
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
-       if (req->rq_ntrans)
+       if (req != NULL && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
@@ -239,9 +240,8 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
  * integrated into the decision of whether a request is allowed to be
  * woken up and given access to the transport.
  */
-int xprt_reserve_xprt_cong(struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-       struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
@@ -249,12 +249,14 @@ int xprt_reserve_xprt_cong(struct rpc_task *task)
                        return 1;
                goto out_sleep;
        }
+       if (req == NULL) {
+               xprt->snd_task = task;
+               return 1;
+       }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
-               if (req) {
-                       req->rq_bytes_sent = 0;
-                       req->rq_ntrans++;
-               }
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
                return 1;
        }
        xprt_clear_locked(xprt);
@@ -262,7 +264,7 @@ out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
-       if (req && req->rq_ntrans)
+       if (req != NULL && req->rq_ntrans)
                rpc_sleep_on(&xprt->resend, task, NULL);
        else
                rpc_sleep_on(&xprt->sending, task, NULL);
@@ -275,7 +277,7 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
        int retval;
 
        spin_lock_bh(&xprt->transport_lock);
-       retval = xprt->ops->reserve_xprt(task);
+       retval = xprt->ops->reserve_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
 }
@@ -291,7 +293,7 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
-               if (!task)
+               if (task == NULL)
                        goto out_unlock;
        }
 
@@ -310,6 +312,7 @@ out_unlock:
 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
        struct rpc_task *task;
+       struct rpc_rqst *req;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
@@ -318,16 +321,19 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
        task = rpc_wake_up_next(&xprt->resend);
        if (!task) {
                task = rpc_wake_up_next(&xprt->sending);
-               if (!task)
+               if (task == NULL)
                        goto out_unlock;
        }
+
+       req = task->tk_rqstp;
+       if (req == NULL) {
+               xprt->snd_task = task;
+               return;
+       }
        if (__xprt_get_cong(xprt, task)) {
-               struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
-               if (req) {
-                       req->rq_bytes_sent = 0;
-                       req->rq_ntrans++;
-               }
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
                return;
        }
 out_unlock:
@@ -852,7 +858,7 @@ int xprt_prepare_transmit(struct rpc_task *task)
                err = req->rq_reply_bytes_recvd;
                goto out_unlock;
        }
-       if (!xprt->ops->reserve_xprt(task))
+       if (!xprt->ops->reserve_xprt(xprt, task))
                err = -EAGAIN;
 out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
@@ -933,8 +939,6 @@ static void xprt_alloc_slot(struct rpc_task *task)
        struct rpc_xprt *xprt = task->tk_xprt;
 
        task->tk_status = 0;
-       if (task->tk_rqstp)
-               return;
        if (!list_empty(&xprt->free)) {
                struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
                list_del_init(&req->rq_list);
@@ -944,7 +948,6 @@ static void xprt_alloc_slot(struct rpc_task *task)
        }
        dprintk("RPC:       waiting for request slot\n");
        task->tk_status = -EAGAIN;
-       task->tk_timeout = 0;
        rpc_sleep_on(&xprt->backlog, task, NULL);
 }
 
@@ -1001,10 +1004,25 @@ void xprt_reserve(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
+       task->tk_status = 0;
+       if (task->tk_rqstp != NULL)
+               return;
+
+       /* Note: grabbing the xprt_lock_write() here is not strictly needed,
+        * but ensures that we throttle new slot allocation if the transport
+        * is congested (e.g. if reconnecting or if we're out of socket
+        * write buffer space).
+        */
+       task->tk_timeout = 0;
+       task->tk_status = -EAGAIN;
+       if (!xprt_lock_write(xprt, task))
+               return;
+
        task->tk_status = -EIO;
        spin_lock(&xprt->reserve_lock);
        xprt_alloc_slot(task);
        spin_unlock(&xprt->reserve_lock);
+       xprt_release_write(xprt, task);
 }
 
 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
index 0867070..674a492 100644 (file)
@@ -452,9 +452,8 @@ xprt_rdma_connect(struct rpc_task *task)
 }
 
 static int
-xprt_rdma_reserve_xprt(struct rpc_task *task)
+xprt_rdma_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-       struct rpc_xprt *xprt = task->tk_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        int credits = atomic_read(&r_xprt->rx_buf.rb_credits);
 
@@ -466,7 +465,7 @@ xprt_rdma_reserve_xprt(struct rpc_task *task)
                BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
        }
        xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
-       return xprt_reserve_xprt_cong(task);
+       return xprt_reserve_xprt_cong(xprt, task);
 }
 
 /*
index cd6c410..adaa54c 100644 (file)
@@ -755,6 +755,8 @@ static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
        if (task == NULL)
                goto out_release;
        req = task->tk_rqstp;
+       if (req == NULL)
+               goto out_release;
        if (req->rq_bytes_sent == 0)
                goto out_release;
        if (req->rq_bytes_sent == req->rq_snd_buf.len)