nfsd: make client_lock per net
[pandora-kernel.git] / net/sunrpc/svc_xprt.c
index bac973a..194d865 100644
@@ -208,6 +208,35 @@ static struct svc_xprt *__svc_xpo_create(struct svc_xprt_class *xcl,
        return xcl->xcl_ops->xpo_create(serv, net, sap, len, flags);
 }
 
+/*
+ * svc_xprt_received conditionally queues the transport for processing
+ * by another thread. The caller must hold the XPT_BUSY bit and must
+ * not thereafter touch transport data.
+ *
+ * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
+ * insufficient) data.
+ */
+static void svc_xprt_received(struct svc_xprt *xprt)
+{
+       BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
+       /* As soon as we clear busy, the xprt could be closed and
+        * 'put', so we need a reference to call svc_xprt_enqueue with:
+        */
+       svc_xprt_get(xprt);
+       clear_bit(XPT_BUSY, &xprt->xpt_flags);
+       svc_xprt_enqueue(xprt);
+       svc_xprt_put(xprt);
+}
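
The helper encodes the XPT_BUSY ownership protocol: the thread that owns the bit has exclusive access to the transport's receive state, and this call is the only sanctioned way to hand the transport back. A minimal sketch of the expected caller shape, using the xpo_recvfrom entry point the receive path in this file calls (the handler name here is hypothetical, standing in for the svc_handle_xprt() introduced later in this patch):

	/* Sketch only: my_handle_xprt() is a hypothetical stand-in. */
	static void my_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
	{
		/* XPT_BUSY was set when xprt was dequeued, so we have
		 * exclusive access to its receive state here. */
		xprt->xpt_ops->xpo_recvfrom(rqstp);

		/* Done touching transport data: clear XPT_BUSY and requeue.
		 * After this call xprt may be closed and freed at any time,
		 * hence the get/put pair inside svc_xprt_received(). */
		svc_xprt_received(xprt);
	}
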
+
+void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new)
+{
+       clear_bit(XPT_TEMP, &new->xpt_flags);
+       spin_lock_bh(&serv->sv_lock);
+       list_add(&new->xpt_list, &serv->sv_permsocks);
+       spin_unlock_bh(&serv->sv_lock);
+       svc_xprt_received(new);
+}
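
This helper is exactly the registration sequence the reworked svc_create_xprt performs in the next hunk. In isolation, a caller holding a freshly created (and therefore still XPT_BUSY) transport would do:

	/* Sketch mirroring svc_create_xprt below; error path abbreviated. */
	newxprt = xcl->xcl_ops->xpo_create(serv, net, sap, len, flags);
	if (IS_ERR(newxprt))
		return PTR_ERR(newxprt);
	svc_add_new_perm_xprt(serv, newxprt);	/* list it, then drop XPT_BUSY */
	return svc_xprt_local_port(newxprt);
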
+
 int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
                    struct net *net, const int family,
                    const unsigned short port, int flags)
@@ -232,13 +261,8 @@ int svc_create_xprt(struct svc_serv *serv, const char *xprt_name,
                        module_put(xcl->xcl_owner);
                        return PTR_ERR(newxprt);
                }
-
-               clear_bit(XPT_TEMP, &newxprt->xpt_flags);
-               spin_lock_bh(&serv->sv_lock);
-               list_add(&newxprt->xpt_list, &serv->sv_permsocks);
-               spin_unlock_bh(&serv->sv_lock);
+               svc_add_new_perm_xprt(serv, newxprt);
                newport = svc_xprt_local_port(newxprt);
-               clear_bit(XPT_BUSY, &newxprt->xpt_flags);
                return newport;
        }
  err:
@@ -394,27 +418,6 @@ static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
        return xprt;
 }
 
-/*
- * svc_xprt_received conditionally queues the transport for processing
- * by another thread. The caller must hold the XPT_BUSY bit and must
- * not thereafter touch transport data.
- *
- * Note: XPT_DATA only gets cleared when a read-attempt finds no (or
- * insufficient) data.
- */
-void svc_xprt_received(struct svc_xprt *xprt)
-{
-       BUG_ON(!test_bit(XPT_BUSY, &xprt->xpt_flags));
-       /* As soon as we clear busy, the xprt could be closed and
-        * 'put', so we need a reference to call svc_xprt_enqueue with:
-        */
-       svc_xprt_get(xprt);
-       clear_bit(XPT_BUSY, &xprt->xpt_flags);
-       svc_xprt_enqueue(xprt);
-       svc_xprt_put(xprt);
-}
-EXPORT_SYMBOL_GPL(svc_xprt_received);
-
 /**
  * svc_reserve - change the space reserved for the reply to a request.
  * @rqstp:  The request in question
@@ -565,33 +568,12 @@ static void svc_check_conn_limits(struct svc_serv *serv)
        }
 }
 
-/*
- * Receive the next request on any transport.  This code is carefully
- * organised not to touch any cachelines in the shared svc_serv
- * structure, only cachelines in the local svc_pool.
- */
-int svc_recv(struct svc_rqst *rqstp, long timeout)
+int svc_alloc_arg(struct svc_rqst *rqstp)
 {
-       struct svc_xprt         *xprt = NULL;
-       struct svc_serv         *serv = rqstp->rq_server;
-       struct svc_pool         *pool = rqstp->rq_pool;
-       int                     len, i;
-       int                     pages;
-       struct xdr_buf          *arg;
-       DECLARE_WAITQUEUE(wait, current);
-       long                    time_left;
-
-       dprintk("svc: server %p waiting for data (to = %ld)\n",
-               rqstp, timeout);
-
-       if (rqstp->rq_xprt)
-               printk(KERN_ERR
-                       "svc_recv: service %p, transport not NULL!\n",
-                        rqstp);
-       if (waitqueue_active(&rqstp->rq_wait))
-               printk(KERN_ERR
-                       "svc_recv: service %p, wait queue active!\n",
-                        rqstp);
+       struct svc_serv *serv = rqstp->rq_server;
+       struct xdr_buf *arg;
+       int pages;
+       int i;
 
        /* now allocate needed pages.  If we get a failure, sleep briefly */
        pages = (serv->sv_max_mesg + PAGE_SIZE) / PAGE_SIZE;
@@ -621,11 +603,15 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
        arg->page_len = (pages-2)*PAGE_SIZE;
        arg->len = (pages-1)*PAGE_SIZE;
        arg->tail[0].iov_len = 0;
+       return 0;
+}
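
A worked example of the xdr_buf this builds, assuming PAGE_SIZE == 4096, serv->sv_max_mesg == 1048576 (1MB), and arg->head[0] spanning the first page as the lines elided from this hunk arrange:

	/* pages         = (1048576 + 4096) / 4096 = 257
	 * arg->page_len = (257 - 2) * 4096        = 1044480
	 * arg->len      = (257 - 1) * 4096        = 1048576 == sv_max_mesg
	 *
	 * head (1 page) + page data (255 pages) adds up to arg->len; the
	 * final page is held in reserve rather than counted into the
	 * receive buffer.
	 */
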
 
-       try_to_freeze();
-       cond_resched();
-       if (signalled() || kthread_should_stop())
-               return -EINTR;
+struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
+{
+       struct svc_xprt *xprt;
+       struct svc_pool         *pool = rqstp->rq_pool;
+       DECLARE_WAITQUEUE(wait, current);
+       long                    time_left;
 
        /* Normally we will wait up to 5 seconds for any required
         * cache information to be provided.
@@ -663,7 +649,7 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
                if (kthread_should_stop()) {
                        set_current_state(TASK_RUNNING);
                        spin_unlock_bh(&pool->sp_lock);
-                       return -EINTR;
+                       return ERR_PTR(-EINTR);
                }
 
                add_wait_queue(&rqstp->rq_wait, &wait);
@@ -684,48 +670,58 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
                        spin_unlock_bh(&pool->sp_lock);
                        dprintk("svc: server %p, no data yet\n", rqstp);
                        if (signalled() || kthread_should_stop())
-                               return -EINTR;
+                               return ERR_PTR(-EINTR);
                        else
-                               return -EAGAIN;
+                               return ERR_PTR(-EAGAIN);
                }
        }
        spin_unlock_bh(&pool->sp_lock);
+       return xprt;
+}
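
Note the switch from returning an int to the ERR_PTR convention: errors now travel inside the pointer, so the caller in the new svc_recv (final hunk) consumes it as:

	xprt = svc_get_next_xprt(rqstp, timeout);
	if (IS_ERR(xprt))
		return PTR_ERR(xprt);	/* -EINTR or -EAGAIN from above */
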
+
+void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
+{
+       spin_lock_bh(&serv->sv_lock);
+       set_bit(XPT_TEMP, &newxpt->xpt_flags);
+       list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
+       serv->sv_tmpcnt++;
+       if (serv->sv_temptimer.function == NULL) {
+               /* setup timer to age temp transports */
+               setup_timer(&serv->sv_temptimer, svc_age_temp_xprts,
+                           (unsigned long)serv);
+               mod_timer(&serv->sv_temptimer,
+                         jiffies + svc_conn_age_period * HZ);
+       }
+       spin_unlock_bh(&serv->sv_lock);
+       svc_xprt_received(newxpt);
+}
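
The timer is armed lazily by whichever thread registers the first temporary transport; svc_age_temp_xprts (defined elsewhere in this file, not shown in this diff) then keeps itself alive by re-arming. A hedged sketch of that callback's shape, shown only to make the one-shot setup above legible:

	/* Sketch of the aging callback; the real svc_age_temp_xprts also
	 * marks idle transports for closing.  The key idiom is re-arming: */
	static void svc_age_temp_xprts(unsigned long closure)
	{
		struct svc_serv *serv = (struct svc_serv *)closure;

		/* ... walk serv->sv_tempsocks, close aged transports ... */

		mod_timer(&serv->sv_temptimer,
			  jiffies + svc_conn_age_period * HZ);
	}
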
+
+static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+       struct svc_serv *serv = rqstp->rq_server;
+       int len = 0;
 
-       len = 0;
        if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
                dprintk("svc_recv: found XPT_CLOSE\n");
                svc_delete_xprt(xprt);
                /* Leave XPT_BUSY set on the dead xprt: */
-               goto out;
+               return 0;
        }
        if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
                struct svc_xprt *newxpt;
+               /*
+                * We know this module_get will succeed because the
+                * listener holds a reference too
+                */
+               __module_get(xprt->xpt_class->xcl_owner);
+               svc_check_conn_limits(xprt->xpt_server);
                newxpt = xprt->xpt_ops->xpo_accept(xprt);
-               if (newxpt) {
-                       /*
-                        * We know this module_get will succeed because the
-                        * listener holds a reference too
-                        */
-                       __module_get(newxpt->xpt_class->xcl_owner);
-                       svc_check_conn_limits(xprt->xpt_server);
-                       spin_lock_bh(&serv->sv_lock);
-                       set_bit(XPT_TEMP, &newxpt->xpt_flags);
-                       list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
-                       serv->sv_tmpcnt++;
-                       if (serv->sv_temptimer.function == NULL) {
-                               /* setup timer to age temp transports */
-                               setup_timer(&serv->sv_temptimer,
-                                           svc_age_temp_xprts,
-                                           (unsigned long)serv);
-                               mod_timer(&serv->sv_temptimer,
-                                         jiffies + svc_conn_age_period * HZ);
-                       }
-                       spin_unlock_bh(&serv->sv_lock);
-                       svc_xprt_received(newxpt);
-               }
+               if (newxpt)
+                       svc_add_new_temp_xprt(serv, newxpt);
        } else if (xprt->xpt_ops->xpo_has_wspace(xprt)) {
+               /* XPT_DATA|XPT_DEFERRED case: */
                dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
-                       rqstp, pool->sp_id, xprt,
+                       rqstp, rqstp->rq_pool->sp_id, xprt,
                        atomic_read(&xprt->xpt_ref.refcount));
                rqstp->rq_deferred = svc_deferred_dequeue(xprt);
                if (rqstp->rq_deferred)
@@ -736,10 +732,51 @@ int svc_recv(struct svc_rqst *rqstp, long timeout)
                rqstp->rq_reserved = serv->sv_max_mesg;
                atomic_add(rqstp->rq_reserved, &xprt->xpt_reserved);
        }
+       /* clear XPT_BUSY: */
        svc_xprt_received(xprt);
+       return len;
+}
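
The return contract the new svc_recv relies on: a positive byte count means a request is ready for dispatch, while zero or negative (dead transport, fresh accept, no write space, incomplete TCP read) means there is nothing to process. The caller side, as it appears in the final hunk:

	len = svc_handle_xprt(rqstp, xprt);

	/* No data, incomplete (TCP) read, or accept() */
	if (len <= 0)
		goto out;	/* the out: label, elided from this hunk,
				 * drops the transport and returns -EAGAIN */
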
+
+/*
+ * Receive the next request on any transport.  This code is carefully
+ * organised not to touch any cachelines in the shared svc_serv
+ * structure, only cachelines in the local svc_pool.
+ */
+int svc_recv(struct svc_rqst *rqstp, long timeout)
+{
+       struct svc_xprt         *xprt = NULL;
+       struct svc_serv         *serv = rqstp->rq_server;
+       int                     len, err;
+
+       dprintk("svc: server %p waiting for data (to = %ld)\n",
+               rqstp, timeout);
+
+       if (rqstp->rq_xprt)
+               printk(KERN_ERR
+                       "svc_recv: service %p, transport not NULL!\n",
+                        rqstp);
+       if (waitqueue_active(&rqstp->rq_wait))
+               printk(KERN_ERR
+                       "svc_recv: service %p, wait queue active!\n",
+                        rqstp);
+
+       err = svc_alloc_arg(rqstp);
+       if (err)
+               return err;
+
+       try_to_freeze();
+       cond_resched();
+       if (signalled() || kthread_should_stop())
+               return -EINTR;
+
+       xprt = svc_get_next_xprt(rqstp, timeout);
+       if (IS_ERR(xprt))
+               return PTR_ERR(xprt);
+
+       len = svc_handle_xprt(rqstp, xprt);
 
        /* No data, incomplete (TCP) read, or accept() */
-       if (len == 0 || len == -EAGAIN)
+       if (len <= 0)
                goto out;
 
        clear_bit(XPT_OLD, &xprt->xpt_flags);
@@ -917,16 +954,18 @@ void svc_close_xprt(struct svc_xprt *xprt)
 }
 EXPORT_SYMBOL_GPL(svc_close_xprt);
 
-static void svc_close_list(struct list_head *xprt_list, struct net *net)
+static void svc_close_list(struct svc_serv *serv, struct list_head *xprt_list, struct net *net)
 {
        struct svc_xprt *xprt;
 
+       spin_lock(&serv->sv_lock);
        list_for_each_entry(xprt, xprt_list, xpt_list) {
                if (xprt->xpt_net != net)
                        continue;
                set_bit(XPT_CLOSE, &xprt->xpt_flags);
                set_bit(XPT_BUSY, &xprt->xpt_flags);
        }
+       spin_unlock(&serv->sv_lock);
 }
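
Taking sv_lock here closes a race with the new svc_add_new_temp_xprt and svc_add_new_perm_xprt above, which insert into these same lists from other threads:

	/* Writer side, running concurrently on an accept()ing thread: */
	spin_lock_bh(&serv->sv_lock);
	list_add(&newxpt->xpt_list, &serv->sv_tempsocks);
	spin_unlock_bh(&serv->sv_lock);

Without the lock, the previously unlocked walk in svc_close_list() could observe a half-linked entry.
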
 
 static void svc_clear_pools(struct svc_serv *serv, struct net *net)
@@ -949,24 +988,28 @@ static void svc_clear_pools(struct svc_serv *serv, struct net *net)
        }
 }
 
-static void svc_clear_list(struct list_head *xprt_list, struct net *net)
+static void svc_clear_list(struct svc_serv *serv, struct list_head *xprt_list, struct net *net)
 {
        struct svc_xprt *xprt;
        struct svc_xprt *tmp;
+       LIST_HEAD(victims);
 
+       spin_lock(&serv->sv_lock);
        list_for_each_entry_safe(xprt, tmp, xprt_list, xpt_list) {
                if (xprt->xpt_net != net)
                        continue;
-               svc_delete_xprt(xprt);
+               list_move(&xprt->xpt_list, &victims);
        }
-       list_for_each_entry(xprt, xprt_list, xpt_list)
-               BUG_ON(xprt->xpt_net == net);
+       spin_unlock(&serv->sv_lock);
+
+       list_for_each_entry_safe(xprt, tmp, &victims, xpt_list)
+               svc_delete_xprt(xprt);
 }
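
The rewrite replaces delete-in-place with a move-then-reap idiom, and the old trailing BUG_ON() pass becomes unnecessary because everything matching the net is unlinked in one sweep:

	/* The idiom in isolation:
	 *  1. under sv_lock, list_move() matching entries onto a private
	 *     victims list;
	 *  2. after dropping the lock, svc_delete_xprt() each victim --
	 *     svc_delete_xprt() takes sv_lock itself to unlink the xprt,
	 *     so calling it with the lock held would deadlock.
	 */
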
 
 void svc_close_net(struct svc_serv *serv, struct net *net)
 {
-       svc_close_list(&serv->sv_tempsocks, net);
-       svc_close_list(&serv->sv_permsocks, net);
+       svc_close_list(serv, &serv->sv_tempsocks, net);
+       svc_close_list(serv, &serv->sv_permsocks, net);
 
        svc_clear_pools(serv, net);
        /*
@@ -974,8 +1017,8 @@ void svc_close_net(struct svc_serv *serv, struct net *net)
         * svc_xprt_enqueue will not add new entries without taking the
         * sp_lock and checking XPT_BUSY.
         */
-       svc_clear_list(&serv->sv_tempsocks, net);
-       svc_clear_list(&serv->sv_permsocks, net);
+       svc_clear_list(serv, &serv->sv_tempsocks, net);
+       svc_clear_list(serv, &serv->sv_permsocks, net);
 }
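
Putting the pieces together, per-net shutdown is now a strict three-step ordering:

	/* 1. svc_close_list():  mark matching xprts XPT_CLOSE|XPT_BUSY,
	 *                       so no thread starts new work on them;
	 * 2. svc_clear_pools(): drop xprts already queued on pool lists;
	 * 3. svc_clear_list():  unlink the remainder and svc_delete_xprt()
	 *                       each one outside sv_lock.
	 */
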
 
 /*