SUNRPC: Replace xprt->resend and xprt->sending with a priority queue
[pandora-kernel.git] / net / sunrpc / xprt.c
index ce5eb68..9b6a4d1 100644 (file)
@@ -62,6 +62,7 @@
 /*
  * Local functions
  */
+static void     xprt_init(struct rpc_xprt *xprt, struct net *net);
 static void    xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 static void    xprt_connect_status(struct rpc_task *task);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
@@ -191,10 +192,10 @@ EXPORT_SYMBOL_GPL(xprt_load_transport);
  * transport connects from colliding with writes.  No congestion control
  * is provided.
  */
-int xprt_reserve_xprt(struct rpc_task *task)
+int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        struct rpc_rqst *req = task->tk_rqstp;
-       struct rpc_xprt *xprt = req->rq_xprt;
+       int priority;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
@@ -202,8 +203,10 @@ int xprt_reserve_xprt(struct rpc_task *task)
                goto out_sleep;
        }
        xprt->snd_task = task;
-       req->rq_bytes_sent = 0;
-       req->rq_ntrans++;
+       if (req != NULL) {
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
+       }
 
        return 1;
 
@@ -212,10 +215,13 @@ out_sleep:
                        task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
-       if (req->rq_ntrans)
-               rpc_sleep_on(&xprt->resend, task, NULL);
+       if (req == NULL)
+               priority = RPC_PRIORITY_LOW;
+       else if (!req->rq_ntrans)
+               priority = RPC_PRIORITY_NORMAL;
        else
-               rpc_sleep_on(&xprt->sending, task, NULL);
+               priority = RPC_PRIORITY_HIGH;
+       rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
        return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
@@ -239,22 +245,24 @@ static void xprt_clear_locked(struct rpc_xprt *xprt)
  * integrated into the decision of whether a request is allowed to be
  * woken up and given access to the transport.
  */
-int xprt_reserve_xprt_cong(struct rpc_task *task)
+int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
 {
-       struct rpc_xprt *xprt = task->tk_xprt;
        struct rpc_rqst *req = task->tk_rqstp;
+       int priority;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
                if (task == xprt->snd_task)
                        return 1;
                goto out_sleep;
        }
+       if (req == NULL) {
+               xprt->snd_task = task;
+               return 1;
+       }
        if (__xprt_get_cong(xprt, task)) {
                xprt->snd_task = task;
-               if (req) {
-                       req->rq_bytes_sent = 0;
-                       req->rq_ntrans++;
-               }
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
                return 1;
        }
        xprt_clear_locked(xprt);
@@ -262,10 +270,13 @@ out_sleep:
        dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
        task->tk_timeout = 0;
        task->tk_status = -EAGAIN;
-       if (req && req->rq_ntrans)
-               rpc_sleep_on(&xprt->resend, task, NULL);
+       if (req == NULL)
+               priority = RPC_PRIORITY_LOW;
+       else if (!req->rq_ntrans)
+               priority = RPC_PRIORITY_NORMAL;
        else
-               rpc_sleep_on(&xprt->sending, task, NULL);
+               priority = RPC_PRIORITY_HIGH;
+       rpc_sleep_on_priority(&xprt->sending, task, NULL, priority);
        return 0;
 }
 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
@@ -275,7 +286,7 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
        int retval;
 
        spin_lock_bh(&xprt->transport_lock);
-       retval = xprt->ops->reserve_xprt(task);
+       retval = xprt->ops->reserve_xprt(xprt, task);
        spin_unlock_bh(&xprt->transport_lock);
        return retval;
 }
@@ -288,12 +299,9 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
 
-       task = rpc_wake_up_next(&xprt->resend);
-       if (!task) {
-               task = rpc_wake_up_next(&xprt->sending);
-               if (!task)
-                       goto out_unlock;
-       }
+       task = rpc_wake_up_next(&xprt->sending);
+       if (task == NULL)
+               goto out_unlock;
 
        req = task->tk_rqstp;
        xprt->snd_task = task;
@@ -310,24 +318,25 @@ out_unlock:
 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
 {
        struct rpc_task *task;
+       struct rpc_rqst *req;
 
        if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
                return;
        if (RPCXPRT_CONGESTED(xprt))
                goto out_unlock;
-       task = rpc_wake_up_next(&xprt->resend);
-       if (!task) {
-               task = rpc_wake_up_next(&xprt->sending);
-               if (!task)
-                       goto out_unlock;
+       task = rpc_wake_up_next(&xprt->sending);
+       if (task == NULL)
+               goto out_unlock;
+
+       req = task->tk_rqstp;
+       if (req == NULL) {
+               xprt->snd_task = task;
+               return;
        }
        if (__xprt_get_cong(xprt, task)) {
-               struct rpc_rqst *req = task->tk_rqstp;
                xprt->snd_task = task;
-               if (req) {
-                       req->rq_bytes_sent = 0;
-                       req->rq_ntrans++;
-               }
+               req->rq_bytes_sent = 0;
+               req->rq_ntrans++;
                return;
        }
 out_unlock:
@@ -852,7 +861,7 @@ int xprt_prepare_transmit(struct rpc_task *task)
                err = req->rq_reply_bytes_recvd;
                goto out_unlock;
        }
-       if (!xprt->ops->reserve_xprt(task))
+       if (!xprt->ops->reserve_xprt(xprt, task))
                err = -EAGAIN;
 out_unlock:
        spin_unlock_bh(&xprt->transport_lock);
@@ -928,28 +937,66 @@ void xprt_transmit(struct rpc_task *task)
        spin_unlock_bh(&xprt->transport_lock);
 }
 
+static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{      /* Allocate one more rpc_rqst; returns it, or ERR_PTR(-EAGAIN) / ERR_PTR(-ENOMEM) */
+       struct rpc_rqst *req = ERR_PTR(-EAGAIN);        /* result when num_reqs cannot be raised */
+
+       if (!atomic_add_unless(&xprt->num_reqs, 1, xprt->max_reqs))     /* no increment: num_reqs == max_reqs */
+               goto out;
+       req = kzalloc(sizeof(struct rpc_rqst), gfp_flags);
+       if (req != NULL)
+               goto out;
+       atomic_dec(&xprt->num_reqs);    /* kzalloc failed: undo the increment taken above */
+       req = ERR_PTR(-ENOMEM);
+out:
+       return req;
+}
+
+static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
+{      /* kfree() req and return true, unless num_reqs is already at min_reqs */
+       if (atomic_add_unless(&xprt->num_reqs, -1, xprt->min_reqs)) {   /* non-zero: num_reqs was decremented */
+               kfree(req);
+               return true;
+       }
+       return false;   /* num_reqs == min_reqs: req was not freed */
+}
+
 static void xprt_alloc_slot(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
+       struct rpc_rqst *req;
 
-       task->tk_status = 0;
-       if (task->tk_rqstp)
-               return;
        if (!list_empty(&xprt->free)) {
-               struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
-               list_del_init(&req->rq_list);
-               task->tk_rqstp = req;
-               xprt_request_init(task, xprt);
-               return;
+               req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);    /* first entry of the free list */
+               list_del(&req->rq_list);
+               goto out_init_req;
+       }
+       req = xprt_dynamic_alloc_slot(xprt, GFP_NOWAIT);        /* free list empty: try a new allocation */
+       if (!IS_ERR(req))
+               goto out_init_req;
+       switch (PTR_ERR(req)) {
+       case -ENOMEM:   /* kzalloc() failed in xprt_dynamic_alloc_slot() */
+               rpc_delay(task, HZ >> 2);
+               dprintk("RPC:       dynamic allocation of request slot "
+                               "failed! Retrying\n");
+               break;
+       case -EAGAIN:   /* num_reqs is already at max_reqs */
+               rpc_sleep_on(&xprt->backlog, task, NULL);
+               dprintk("RPC:       waiting for request slot\n");
        }
-       dprintk("RPC:       waiting for request slot\n");
        task->tk_status = -EAGAIN;
-       task->tk_timeout = 0;
-       rpc_sleep_on(&xprt->backlog, task, NULL);
+       return;
+out_init_req:  /* a slot was obtained: attach it to the task and initialise it */
+       task->tk_status = 0;
+       task->tk_rqstp = req;
+       xprt_request_init(task, xprt);
 }
 
 static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
 {
+       if (xprt_dynamic_free_slot(xprt, req))
+               return;
+
        memset(req, 0, sizeof(*req));   /* mark unused */
 
        spin_lock(&xprt->reserve_lock);
@@ -958,25 +1005,49 @@ static void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
        spin_unlock(&xprt->reserve_lock);
 }
 
-struct rpc_xprt *xprt_alloc(struct net *net, int size, int max_req)
+static void xprt_free_all_slots(struct rpc_xprt *xprt)
+{      /* kfree() every rpc_rqst still linked on xprt->free */
+       struct rpc_rqst *req;
+       while (!list_empty(&xprt->free)) {
+               req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
+               list_del(&req->rq_list);        /* unlink the entry before freeing it */
+               kfree(req);
+       }
+}
+
+struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
+               unsigned int num_prealloc,      /* slots allocated up front; also stored as min_reqs */
+               unsigned int max_alloc) /* slot limit; num_prealloc is used instead if it is larger */
 {
        struct rpc_xprt *xprt;
+       struct rpc_rqst *req;
+       int i;
 
        xprt = kzalloc(size, GFP_KERNEL);
        if (xprt == NULL)
                goto out;
-       atomic_set(&xprt->count, 1);
 
-       xprt->max_reqs = max_req;
-       xprt->slot = kcalloc(max_req, sizeof(struct rpc_rqst), GFP_KERNEL);
-       if (xprt->slot == NULL)
+       xprt_init(xprt, net);
+
+       for (i = 0; i < num_prealloc; i++) {    /* preallocate slots onto xprt->free */
+               req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
+               if (!req)
+                       break;
+               list_add(&req->rq_list, &xprt->free);
+       }
+       if (i < num_prealloc)   /* loop left early: a slot allocation failed */
                goto out_free;
+       if (max_alloc > num_prealloc)   /* max_reqs = larger of max_alloc and num_prealloc */
+               xprt->max_reqs = max_alloc;
+       else
+               xprt->max_reqs = num_prealloc;
+       xprt->min_reqs = num_prealloc;
+       atomic_set(&xprt->num_reqs, num_prealloc);
 
-       xprt->xprt_net = get_net(net);
        return xprt;
 
 out_free:
-       kfree(xprt);
+       xprt_free(xprt);        /* drops the net reference, frees the slots on xprt->free, then xprt */
 out:
        return NULL;
 }
@@ -985,7 +1056,7 @@ EXPORT_SYMBOL_GPL(xprt_alloc);
 void xprt_free(struct rpc_xprt *xprt)
 {
        put_net(xprt->xprt_net);
-       kfree(xprt->slot);
+       xprt_free_all_slots(xprt);      /* kfree() each rpc_rqst left on xprt->free */
        kfree(xprt);
 }
 EXPORT_SYMBOL_GPL(xprt_free);
@@ -1001,10 +1072,24 @@ void xprt_reserve(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
-       task->tk_status = -EIO;
+       task->tk_status = 0;
+       if (task->tk_rqstp != NULL)
+               return;
+
+       /* Note: grabbing the xprt_lock_write() here is not strictly needed,
+        * but ensures that we throttle new slot allocation if the transport
+        * is congested (e.g. if reconnecting or if we're out of socket
+        * write buffer space).
+        */
+       task->tk_timeout = 0;
+       task->tk_status = -EAGAIN;
+       if (!xprt_lock_write(xprt, task))
+               return;
+
        spin_lock(&xprt->reserve_lock);
        xprt_alloc_slot(task);
        spin_unlock(&xprt->reserve_lock);
+       xprt_release_write(xprt, task);
 }
 
 static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
@@ -1021,6 +1106,7 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
 {
        struct rpc_rqst *req = task->tk_rqstp;
 
+       INIT_LIST_HEAD(&req->rq_list);
        req->rq_timeout = task->tk_client->cl_timeout->to_initval;
        req->rq_task    = task;
        req->rq_xprt    = xprt;
@@ -1073,6 +1159,34 @@ void xprt_release(struct rpc_task *task)
                xprt_free_bc_request(req);
 }
 
+static void xprt_init(struct rpc_xprt *xprt, struct net *net)
+{      /* Initialise the refcount, locks, lists, wait queues and defaults of a new xprt */
+       atomic_set(&xprt->count, 1);
+
+       spin_lock_init(&xprt->transport_lock);
+       spin_lock_init(&xprt->reserve_lock);
+
+       INIT_LIST_HEAD(&xprt->free);    /* starts empty; xprt_alloc() adds the preallocated slots */
+       INIT_LIST_HEAD(&xprt->recv);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+       spin_lock_init(&xprt->bc_pa_lock);
+       INIT_LIST_HEAD(&xprt->bc_pa_list);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
+       xprt->last_used = jiffies;
+       xprt->cwnd = RPC_INITCWND;
+       xprt->bind_index = 0;
+
+       rpc_init_wait_queue(&xprt->binding, "xprt_binding");
+       rpc_init_wait_queue(&xprt->pending, "xprt_pending");
+       rpc_init_priority_wait_queue(&xprt->sending, "xprt_sending");   /* tasks are queued here via rpc_sleep_on_priority() */
+       rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
+
+       xprt_init_xid(xprt);
+
+       xprt->xprt_net = get_net(net);  /* released by put_net() in xprt_free() */
+}
+
 /**
  * xprt_create_transport - create an RPC transport
  * @args: rpc transport creation arguments
@@ -1081,7 +1195,6 @@ void xprt_release(struct rpc_task *task)
 struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
 {
        struct rpc_xprt *xprt;
-       struct rpc_rqst *req;
        struct xprt_class *t;
 
        spin_lock(&xprt_list_lock);
@@ -1100,46 +1213,17 @@ found:
        if (IS_ERR(xprt)) {
                dprintk("RPC:       xprt_create_transport: failed, %ld\n",
                                -PTR_ERR(xprt));
-               return xprt;
+               goto out;
        }
-       if (test_and_set_bit(XPRT_INITIALIZED, &xprt->state))
-               /* ->setup returned a pre-initialized xprt: */
-               return xprt;
-
-       spin_lock_init(&xprt->transport_lock);
-       spin_lock_init(&xprt->reserve_lock);
-
-       INIT_LIST_HEAD(&xprt->free);
-       INIT_LIST_HEAD(&xprt->recv);
-#if defined(CONFIG_NFS_V4_1)
-       spin_lock_init(&xprt->bc_pa_lock);
-       INIT_LIST_HEAD(&xprt->bc_pa_list);
-#endif /* CONFIG_NFS_V4_1 */
-
        INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
        if (xprt_has_timer(xprt))
                setup_timer(&xprt->timer, xprt_init_autodisconnect,
                            (unsigned long)xprt);
        else
                init_timer(&xprt->timer);
-       xprt->last_used = jiffies;
-       xprt->cwnd = RPC_INITCWND;
-       xprt->bind_index = 0;
-
-       rpc_init_wait_queue(&xprt->binding, "xprt_binding");
-       rpc_init_wait_queue(&xprt->pending, "xprt_pending");
-       rpc_init_wait_queue(&xprt->sending, "xprt_sending");
-       rpc_init_wait_queue(&xprt->resend, "xprt_resend");
-       rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
-
-       /* initialize free list */
-       for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
-               list_add(&req->rq_list, &xprt->free);
-
-       xprt_init_xid(xprt);
-
        dprintk("RPC:       created transport %p with %u slots\n", xprt,
                        xprt->max_reqs);
+out:
        return xprt;
 }
 
@@ -1157,7 +1241,6 @@ static void xprt_destroy(struct rpc_xprt *xprt)
        rpc_destroy_wait_queue(&xprt->binding);
        rpc_destroy_wait_queue(&xprt->pending);
        rpc_destroy_wait_queue(&xprt->sending);
-       rpc_destroy_wait_queue(&xprt->resend);
        rpc_destroy_wait_queue(&xprt->backlog);
        cancel_work_sync(&xprt->task_cleanup);
        /*