SUNRPC: Add a framework to clean up management of rpc_pipefs directories
[pandora-kernel.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19
20
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/utsname.h>
29 #include <linux/workqueue.h>
30 #include <linux/in.h>
31 #include <linux/in6.h>
32 #include <linux/un.h>
33 #include <linux/rcupdate.h>
34
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/rpc_pipe_fs.h>
38 #include <linux/sunrpc/metrics.h>
39 #include <linux/sunrpc/bc_xprt.h>
40 #include <trace/events/sunrpc.h>
41
42 #include "sunrpc.h"
43 #include "netns.h"
44
/*
 * Select the debug facility for this file when RPC_DEBUG is enabled.
 * NOTE(review): presumably consumed by the dprintk() macros used below;
 * confirm against the sunrpc debug header.
 */
#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_CALL
#endif
48
/*
 * Debug helper: print the pid and current tk_status of RPC task @t,
 * tagged with the name of the calling function.
 *
 * @t is parenthesized at each use so that any pointer-valued expression
 * may be passed as the argument.
 */
#define dprint_status(t)					\
	dprintk("RPC: %5u %s (status %d)\n", (t)->tk_pid,	\
			__func__, (t)->tk_status)
52
/*
 * Wait queue used while shutting a client down: rpc_shutdown_client()
 * sleeps on it until the client's task list is empty, and
 * rpc_release_client() wakes it when it sees an empty task list.
 *
 * (The list of all RPC clients is kept in the sunrpc per-net data, see
 * rpc_register_client().)
 */

static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
/*
 * Forward declarations of the file-local state-machine steps (each takes
 * the rpc_task being processed; call_start and call_bc_transmit are
 * installed as task->tk_action in this file) and of the header/ping
 * helpers.
 */
static void	call_start(struct rpc_task *task);
static void	call_reserve(struct rpc_task *task);
static void	call_reserveresult(struct rpc_task *task);
static void	call_allocate(struct rpc_task *task);
static void	call_decode(struct rpc_task *task);
static void	call_bind(struct rpc_task *task);
static void	call_bind_status(struct rpc_task *task);
static void	call_transmit(struct rpc_task *task);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void	call_bc_transmit(struct rpc_task *task);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
static void	call_status(struct rpc_task *task);
static void	call_transmit_status(struct rpc_task *task);
static void	call_refresh(struct rpc_task *task);
static void	call_refreshresult(struct rpc_task *task);
static void	call_timeout(struct rpc_task *task);
static void	call_connect(struct rpc_task *task);
static void	call_connect_status(struct rpc_task *task);

static __be32	*rpc_encode_header(struct rpc_task *task);
static __be32	*rpc_verify_header(struct rpc_task *task);
static int	rpc_ping(struct rpc_clnt *clnt);
82
83 static void rpc_register_client(struct rpc_clnt *clnt)
84 {
85         struct net *net = rpc_net_ns(clnt);
86         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88         spin_lock(&sn->rpc_client_lock);
89         list_add(&clnt->cl_clients, &sn->all_clients);
90         spin_unlock(&sn->rpc_client_lock);
91 }
92
93 static void rpc_unregister_client(struct rpc_clnt *clnt)
94 {
95         struct net *net = rpc_net_ns(clnt);
96         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98         spin_lock(&sn->rpc_client_lock);
99         list_del(&clnt->cl_clients);
100         spin_unlock(&sn->rpc_client_lock);
101 }
102
103 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104 {
105         if (clnt->cl_dentry) {
106                 if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
107                         clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
108                 rpc_remove_client_dir(clnt->cl_dentry, clnt);
109         }
110         clnt->cl_dentry = NULL;
111 }
112
113 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
114 {
115         struct net *net = rpc_net_ns(clnt);
116         struct super_block *pipefs_sb;
117
118         pipefs_sb = rpc_get_sb_net(net);
119         if (pipefs_sb) {
120                 __rpc_clnt_remove_pipedir(clnt);
121                 rpc_put_sb_net(net);
122         }
123 }
124
125 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
126                                     struct rpc_clnt *clnt)
127 {
128         static uint32_t clntid;
129         const char *dir_name = clnt->cl_program->pipe_dir_name;
130         char name[15];
131         struct dentry *dir, *dentry;
132
133         dir = rpc_d_lookup_sb(sb, dir_name);
134         if (dir == NULL) {
135                 pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
136                 return dir;
137         }
138         for (;;) {
139                 snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
140                 name[sizeof(name) - 1] = '\0';
141                 dentry = rpc_create_client_dir(dir, name, clnt);
142                 if (!IS_ERR(dentry))
143                         break;
144                 if (dentry == ERR_PTR(-EEXIST))
145                         continue;
146                 printk(KERN_INFO "RPC: Couldn't create pipefs entry"
147                                 " %s/%s, error %ld\n",
148                                 dir_name, name, PTR_ERR(dentry));
149                 break;
150         }
151         dput(dir);
152         return dentry;
153 }
154
155 static int
156 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
157 {
158         struct dentry *dentry;
159
160         if (clnt->cl_program->pipe_dir_name == NULL)
161                 goto out;
162         clnt->cl_dentry = NULL;
163         dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
164         if (IS_ERR(dentry))
165                 return PTR_ERR(dentry);
166         clnt->cl_dentry = dentry;
167 out:
168         return 0;
169 }
170
171 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
172 {
173         if (clnt->cl_program->pipe_dir_name == NULL)
174                 return 1;
175
176         if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
177             ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
178                 return 1;
179         if ((event == RPC_PIPEFS_MOUNT) && atomic_read(&clnt->cl_count) == 0)
180                 return 1;
181         return 0;
182 }
183
184 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
185                                    struct super_block *sb)
186 {
187         struct dentry *dentry;
188         int err = 0;
189
190         switch (event) {
191         case RPC_PIPEFS_MOUNT:
192                 dentry = rpc_setup_pipedir_sb(sb, clnt);
193                 if (!dentry)
194                         return -ENOENT;
195                 if (IS_ERR(dentry))
196                         return PTR_ERR(dentry);
197                 clnt->cl_dentry = dentry;
198                 if (clnt->cl_auth->au_ops->pipes_create) {
199                         err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
200                         if (err)
201                                 __rpc_clnt_remove_pipedir(clnt);
202                 }
203                 break;
204         case RPC_PIPEFS_UMOUNT:
205                 __rpc_clnt_remove_pipedir(clnt);
206                 break;
207         default:
208                 printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
209                 return -ENOTSUPP;
210         }
211         return err;
212 }
213
214 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
215                                 struct super_block *sb)
216 {
217         int error = 0;
218
219         for (;; clnt = clnt->cl_parent) {
220                 if (!rpc_clnt_skip_event(clnt, event))
221                         error = __rpc_clnt_handle_event(clnt, event, sb);
222                 if (error || clnt == clnt->cl_parent)
223                         break;
224         }
225         return error;
226 }
227
228 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
229 {
230         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
231         struct rpc_clnt *clnt;
232
233         spin_lock(&sn->rpc_client_lock);
234         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
235                 if (rpc_clnt_skip_event(clnt, event))
236                         continue;
237                 spin_unlock(&sn->rpc_client_lock);
238                 return clnt;
239         }
240         spin_unlock(&sn->rpc_client_lock);
241         return NULL;
242 }
243
244 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
245                             void *ptr)
246 {
247         struct super_block *sb = ptr;
248         struct rpc_clnt *clnt;
249         int error = 0;
250
251         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
252                 error = __rpc_pipefs_event(clnt, event, sb);
253                 if (error)
254                         break;
255         }
256         return error;
257 }
258
/*
 * Notifier block that routes rpc_pipefs events to rpc_pipefs_event().
 * It is registered and unregistered by rpc_clients_notifier_register()
 * and rpc_clients_notifier_unregister().
 */
static struct notifier_block rpc_clients_block = {
	.notifier_call	= rpc_pipefs_event,
	.priority	= SUNRPC_PIPEFS_RPC_PRIO,
};
263
264 int rpc_clients_notifier_register(void)
265 {
266         return rpc_pipefs_notifier_register(&rpc_clients_block);
267 }
268
269 void rpc_clients_notifier_unregister(void)
270 {
271         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
272 }
273
/*
 * Record @nodename in @clnt: cl_nodelen is set to the string length,
 * capped at UNX_MAXNODENAME, and that many bytes are copied into
 * cl_nodename.  No terminating NUL is written here.
 */
static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
{
	clnt->cl_nodelen = strlen(nodename);
	/* Truncate over-long names rather than overflow the buffer. */
	if (clnt->cl_nodelen > UNX_MAXNODENAME)
		clnt->cl_nodelen = UNX_MAXNODENAME;
	memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
}
281
282 static int rpc_client_register(const struct rpc_create_args *args,
283                                struct rpc_clnt *clnt)
284 {
285         struct rpc_auth_create_args auth_args = {
286                 .pseudoflavor = args->authflavor,
287                 .target_name = args->client_name,
288         };
289         struct rpc_auth *auth;
290         struct net *net = rpc_net_ns(clnt);
291         struct super_block *pipefs_sb;
292         int err;
293
294         pipefs_sb = rpc_get_sb_net(net);
295         if (pipefs_sb) {
296                 err = rpc_setup_pipedir(pipefs_sb, clnt);
297                 if (err)
298                         goto out;
299         }
300
301         rpc_register_client(clnt);
302         if (pipefs_sb)
303                 rpc_put_sb_net(net);
304
305         auth = rpcauth_create(&auth_args, clnt);
306         if (IS_ERR(auth)) {
307                 dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
308                                 args->authflavor);
309                 err = PTR_ERR(auth);
310                 goto err_auth;
311         }
312         return 0;
313 err_auth:
314         pipefs_sb = rpc_get_sb_net(net);
315         rpc_unregister_client(clnt);
316         __rpc_clnt_remove_pipedir(clnt);
317 out:
318         if (pipefs_sb)
319                 rpc_put_sb_net(net);
320         return err;
321 }
322
323 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
324 {
325         const struct rpc_program *program = args->program;
326         const struct rpc_version *version;
327         struct rpc_clnt         *clnt = NULL;
328         int err;
329
330         /* sanity check the name before trying to print it */
331         dprintk("RPC:       creating %s client for %s (xprt %p)\n",
332                         program->name, args->servername, xprt);
333
334         err = rpciod_up();
335         if (err)
336                 goto out_no_rpciod;
337
338         err = -EINVAL;
339         if (args->version >= program->nrvers)
340                 goto out_err;
341         version = program->version[args->version];
342         if (version == NULL)
343                 goto out_err;
344
345         err = -ENOMEM;
346         clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
347         if (!clnt)
348                 goto out_err;
349         clnt->cl_parent = clnt;
350
351         rcu_assign_pointer(clnt->cl_xprt, xprt);
352         clnt->cl_procinfo = version->procs;
353         clnt->cl_maxproc  = version->nrprocs;
354         clnt->cl_prog     = args->prognumber ? : program->number;
355         clnt->cl_vers     = version->number;
356         clnt->cl_stats    = program->stats;
357         clnt->cl_metrics  = rpc_alloc_iostats(clnt);
358         rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
359         err = -ENOMEM;
360         if (clnt->cl_metrics == NULL)
361                 goto out_no_stats;
362         clnt->cl_program  = program;
363         INIT_LIST_HEAD(&clnt->cl_tasks);
364         spin_lock_init(&clnt->cl_lock);
365
366         if (!xprt_bound(xprt))
367                 clnt->cl_autobind = 1;
368
369         clnt->cl_timeout = xprt->timeout;
370         if (args->timeout != NULL) {
371                 memcpy(&clnt->cl_timeout_default, args->timeout,
372                                 sizeof(clnt->cl_timeout_default));
373                 clnt->cl_timeout = &clnt->cl_timeout_default;
374         }
375
376         clnt->cl_rtt = &clnt->cl_rtt_default;
377         rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
378
379         atomic_set(&clnt->cl_count, 1);
380
381         /* save the nodename */
382         rpc_clnt_set_nodename(clnt, utsname()->nodename);
383
384         err = rpc_client_register(args, clnt);
385         if (err)
386                 goto out_no_path;
387         return clnt;
388
389 out_no_path:
390         rpc_free_iostats(clnt->cl_metrics);
391 out_no_stats:
392         kfree(clnt);
393 out_err:
394         rpciod_down();
395 out_no_rpciod:
396         xprt_put(xprt);
397         return ERR_PTR(err);
398 }
399
400 /**
401  * rpc_create - create an RPC client and transport with one call
402  * @args: rpc_clnt create argument structure
403  *
404  * Creates and initializes an RPC transport and an RPC client.
405  *
406  * It can ping the server in order to determine if it is up, and to see if
407  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
408  * this behavior so asynchronous tasks can also use rpc_create.
409  */
410 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
411 {
412         struct rpc_xprt *xprt;
413         struct rpc_clnt *clnt;
414         struct xprt_create xprtargs = {
415                 .net = args->net,
416                 .ident = args->protocol,
417                 .srcaddr = args->saddress,
418                 .dstaddr = args->address,
419                 .addrlen = args->addrsize,
420                 .servername = args->servername,
421                 .bc_xprt = args->bc_xprt,
422         };
423         char servername[48];
424
425         if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
426                 xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
427         if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
428                 xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
429         /*
430          * If the caller chooses not to specify a hostname, whip
431          * up a string representation of the passed-in address.
432          */
433         if (xprtargs.servername == NULL) {
434                 struct sockaddr_un *sun =
435                                 (struct sockaddr_un *)args->address;
436                 struct sockaddr_in *sin =
437                                 (struct sockaddr_in *)args->address;
438                 struct sockaddr_in6 *sin6 =
439                                 (struct sockaddr_in6 *)args->address;
440
441                 servername[0] = '\0';
442                 switch (args->address->sa_family) {
443                 case AF_LOCAL:
444                         snprintf(servername, sizeof(servername), "%s",
445                                  sun->sun_path);
446                         break;
447                 case AF_INET:
448                         snprintf(servername, sizeof(servername), "%pI4",
449                                  &sin->sin_addr.s_addr);
450                         break;
451                 case AF_INET6:
452                         snprintf(servername, sizeof(servername), "%pI6",
453                                  &sin6->sin6_addr);
454                         break;
455                 default:
456                         /* caller wants default server name, but
457                          * address family isn't recognized. */
458                         return ERR_PTR(-EINVAL);
459                 }
460                 xprtargs.servername = servername;
461         }
462
463         xprt = xprt_create_transport(&xprtargs);
464         if (IS_ERR(xprt))
465                 return (struct rpc_clnt *)xprt;
466
467         /*
468          * By default, kernel RPC client connects from a reserved port.
469          * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
470          * but it is always enabled for rpciod, which handles the connect
471          * operation.
472          */
473         xprt->resvport = 1;
474         if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
475                 xprt->resvport = 0;
476
477         clnt = rpc_new_client(args, xprt);
478         if (IS_ERR(clnt))
479                 return clnt;
480
481         if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
482                 int err = rpc_ping(clnt);
483                 if (err != 0) {
484                         rpc_shutdown_client(clnt);
485                         return ERR_PTR(err);
486                 }
487         }
488
489         clnt->cl_softrtry = 1;
490         if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
491                 clnt->cl_softrtry = 0;
492
493         if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
494                 clnt->cl_autobind = 1;
495         if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
496                 clnt->cl_discrtry = 1;
497         if (!(args->flags & RPC_CLNT_CREATE_QUIET))
498                 clnt->cl_chatty = 1;
499
500         return clnt;
501 }
502 EXPORT_SYMBOL_GPL(rpc_create);
503
504 /*
505  * This function clones the RPC client structure. It allows us to share the
506  * same transport while varying parameters such as the authentication
507  * flavour.
508  */
509 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
510                                            struct rpc_clnt *clnt)
511 {
512         struct rpc_xprt *xprt;
513         struct rpc_clnt *new;
514         int err;
515
516         err = -ENOMEM;
517         rcu_read_lock();
518         xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
519         rcu_read_unlock();
520         if (xprt == NULL)
521                 goto out_err;
522         args->servername = xprt->servername;
523
524         new = rpc_new_client(args, xprt);
525         if (IS_ERR(new)) {
526                 err = PTR_ERR(new);
527                 goto out_err;
528         }
529
530         atomic_inc(&clnt->cl_count);
531         new->cl_parent = clnt;
532
533         /* Turn off autobind on clones */
534         new->cl_autobind = 0;
535         new->cl_softrtry = clnt->cl_softrtry;
536         new->cl_discrtry = clnt->cl_discrtry;
537         new->cl_chatty = clnt->cl_chatty;
538         return new;
539
540 out_err:
541         dprintk("RPC:       %s: returned error %d\n", __func__, err);
542         return ERR_PTR(err);
543 }
544
545 /**
546  * rpc_clone_client - Clone an RPC client structure
547  *
548  * @clnt: RPC client whose parameters are copied
549  *
550  * Returns a fresh RPC client or an ERR_PTR.
551  */
552 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
553 {
554         struct rpc_create_args args = {
555                 .program        = clnt->cl_program,
556                 .prognumber     = clnt->cl_prog,
557                 .version        = clnt->cl_vers,
558                 .authflavor     = clnt->cl_auth->au_flavor,
559         };
560         return __rpc_clone_client(&args, clnt);
561 }
562 EXPORT_SYMBOL_GPL(rpc_clone_client);
563
564 /**
565  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
566  *
567  * @clnt: RPC client whose parameters are copied
568  * @flavor: security flavor for new client
569  *
570  * Returns a fresh RPC client or an ERR_PTR.
571  */
572 struct rpc_clnt *
573 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
574 {
575         struct rpc_create_args args = {
576                 .program        = clnt->cl_program,
577                 .prognumber     = clnt->cl_prog,
578                 .version        = clnt->cl_vers,
579                 .authflavor     = flavor,
580         };
581         return __rpc_clone_client(&args, clnt);
582 }
583 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
584
585 /*
586  * Kill all tasks for the given client.
587  * XXX: kill their descendants as well?
588  */
589 void rpc_killall_tasks(struct rpc_clnt *clnt)
590 {
591         struct rpc_task *rovr;
592
593
594         if (list_empty(&clnt->cl_tasks))
595                 return;
596         dprintk("RPC:       killing all tasks for client %p\n", clnt);
597         /*
598          * Spin lock all_tasks to prevent changes...
599          */
600         spin_lock(&clnt->cl_lock);
601         list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
602                 if (!RPC_IS_ACTIVATED(rovr))
603                         continue;
604                 if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
605                         rovr->tk_flags |= RPC_TASK_KILLED;
606                         rpc_exit(rovr, -EIO);
607                         if (RPC_IS_QUEUED(rovr))
608                                 rpc_wake_up_queued_task(rovr->tk_waitqueue,
609                                                         rovr);
610                 }
611         }
612         spin_unlock(&clnt->cl_lock);
613 }
614 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
615
616 /*
617  * Properly shut down an RPC client, terminating all outstanding
618  * requests.
619  */
620 void rpc_shutdown_client(struct rpc_clnt *clnt)
621 {
622         might_sleep();
623
624         dprintk_rcu("RPC:       shutting down %s client for %s\n",
625                         clnt->cl_program->name,
626                         rcu_dereference(clnt->cl_xprt)->servername);
627
628         while (!list_empty(&clnt->cl_tasks)) {
629                 rpc_killall_tasks(clnt);
630                 wait_event_timeout(destroy_wait,
631                         list_empty(&clnt->cl_tasks), 1*HZ);
632         }
633
634         rpc_release_client(clnt);
635 }
636 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
637
638 /*
639  * Free an RPC client
640  */
641 static void
642 rpc_free_client(struct rpc_clnt *clnt)
643 {
644         dprintk_rcu("RPC:       destroying %s client for %s\n",
645                         clnt->cl_program->name,
646                         rcu_dereference(clnt->cl_xprt)->servername);
647         if (clnt->cl_parent != clnt)
648                 rpc_release_client(clnt->cl_parent);
649         rpc_clnt_remove_pipedir(clnt);
650         rpc_unregister_client(clnt);
651         rpc_free_iostats(clnt->cl_metrics);
652         clnt->cl_metrics = NULL;
653         xprt_put(rcu_dereference_raw(clnt->cl_xprt));
654         rpciod_down();
655         kfree(clnt);
656 }
657
658 /*
659  * Free an RPC client
660  */
661 static void
662 rpc_free_auth(struct rpc_clnt *clnt)
663 {
664         if (clnt->cl_auth == NULL) {
665                 rpc_free_client(clnt);
666                 return;
667         }
668
669         /*
670          * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
671          *       release remaining GSS contexts. This mechanism ensures
672          *       that it can do so safely.
673          */
674         atomic_inc(&clnt->cl_count);
675         rpcauth_release(clnt->cl_auth);
676         clnt->cl_auth = NULL;
677         if (atomic_dec_and_test(&clnt->cl_count))
678                 rpc_free_client(clnt);
679 }
680
681 /*
682  * Release reference to the RPC client
683  */
684 void
685 rpc_release_client(struct rpc_clnt *clnt)
686 {
687         dprintk("RPC:       rpc_release_client(%p)\n", clnt);
688
689         if (list_empty(&clnt->cl_tasks))
690                 wake_up(&destroy_wait);
691         if (atomic_dec_and_test(&clnt->cl_count))
692                 rpc_free_auth(clnt);
693 }
694 EXPORT_SYMBOL_GPL(rpc_release_client);
695
696 /**
697  * rpc_bind_new_program - bind a new RPC program to an existing client
698  * @old: old rpc_client
699  * @program: rpc program to set
700  * @vers: rpc program version
701  *
702  * Clones the rpc client and sets up a new RPC program. This is mainly
703  * of use for enabling different RPC programs to share the same transport.
704  * The Sun NFSv2/v3 ACL protocol can do this.
705  */
706 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
707                                       const struct rpc_program *program,
708                                       u32 vers)
709 {
710         struct rpc_create_args args = {
711                 .program        = program,
712                 .prognumber     = program->number,
713                 .version        = vers,
714                 .authflavor     = old->cl_auth->au_flavor,
715         };
716         struct rpc_clnt *clnt;
717         int err;
718
719         clnt = __rpc_clone_client(&args, old);
720         if (IS_ERR(clnt))
721                 goto out;
722         err = rpc_ping(clnt);
723         if (err != 0) {
724                 rpc_shutdown_client(clnt);
725                 clnt = ERR_PTR(err);
726         }
727 out:
728         return clnt;
729 }
730 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
731
732 void rpc_task_release_client(struct rpc_task *task)
733 {
734         struct rpc_clnt *clnt = task->tk_client;
735
736         if (clnt != NULL) {
737                 /* Remove from client task list */
738                 spin_lock(&clnt->cl_lock);
739                 list_del(&task->tk_task);
740                 spin_unlock(&clnt->cl_lock);
741                 task->tk_client = NULL;
742
743                 rpc_release_client(clnt);
744         }
745 }
746
747 static
748 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
749 {
750         if (clnt != NULL) {
751                 rpc_task_release_client(task);
752                 task->tk_client = clnt;
753                 atomic_inc(&clnt->cl_count);
754                 if (clnt->cl_softrtry)
755                         task->tk_flags |= RPC_TASK_SOFT;
756                 if (sk_memalloc_socks()) {
757                         struct rpc_xprt *xprt;
758
759                         rcu_read_lock();
760                         xprt = rcu_dereference(clnt->cl_xprt);
761                         if (xprt->swapper)
762                                 task->tk_flags |= RPC_TASK_SWAPPER;
763                         rcu_read_unlock();
764                 }
765                 /* Add to the client's list of all tasks */
766                 spin_lock(&clnt->cl_lock);
767                 list_add_tail(&task->tk_task, &clnt->cl_tasks);
768                 spin_unlock(&clnt->cl_lock);
769         }
770 }
771
/*
 * Move @task over to @clnt: the task's current client is released first,
 * then @clnt is attached.  With a NULL @clnt the task simply ends up
 * without a client.
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
	rpc_task_release_client(task);
	rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
778
779
780 static void
781 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
782 {
783         if (msg != NULL) {
784                 task->tk_msg.rpc_proc = msg->rpc_proc;
785                 task->tk_msg.rpc_argp = msg->rpc_argp;
786                 task->tk_msg.rpc_resp = msg->rpc_resp;
787                 if (msg->rpc_cred != NULL)
788                         task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
789         }
790 }
791
/*
 * Default callback for async RPC calls.
 *
 * Intentionally empty; it is installed as the rpc_call_done hook of
 * rpc_default_ops.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}
799
/*
 * Call ops used when the caller supplies none of its own (see
 * rpc_call_sync()); the only hook set is the no-op rpc_default_callback().
 */
static const struct rpc_call_ops rpc_default_ops = {
	.rpc_call_done = rpc_default_callback,
};
803
804 /**
805  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
806  * @task_setup_data: pointer to task initialisation data
807  */
808 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
809 {
810         struct rpc_task *task;
811
812         task = rpc_new_task(task_setup_data);
813         if (IS_ERR(task))
814                 goto out;
815
816         rpc_task_set_client(task, task_setup_data->rpc_client);
817         rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
818
819         if (task->tk_action == NULL)
820                 rpc_call_start(task);
821
822         atomic_inc(&task->tk_count);
823         rpc_execute(task);
824 out:
825         return task;
826 }
827 EXPORT_SYMBOL_GPL(rpc_run_task);
828
829 /**
830  * rpc_call_sync - Perform a synchronous RPC call
831  * @clnt: pointer to RPC client
832  * @msg: RPC call parameters
833  * @flags: RPC call flags
834  */
835 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
836 {
837         struct rpc_task *task;
838         struct rpc_task_setup task_setup_data = {
839                 .rpc_client = clnt,
840                 .rpc_message = msg,
841                 .callback_ops = &rpc_default_ops,
842                 .flags = flags,
843         };
844         int status;
845
846         WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
847         if (flags & RPC_TASK_ASYNC) {
848                 rpc_release_calldata(task_setup_data.callback_ops,
849                         task_setup_data.callback_data);
850                 return -EINVAL;
851         }
852
853         task = rpc_run_task(&task_setup_data);
854         if (IS_ERR(task))
855                 return PTR_ERR(task);
856         status = task->tk_status;
857         rpc_put_task(task);
858         return status;
859 }
860 EXPORT_SYMBOL_GPL(rpc_call_sync);
861
862 /**
863  * rpc_call_async - Perform an asynchronous RPC call
864  * @clnt: pointer to RPC client
865  * @msg: RPC call parameters
866  * @flags: RPC call flags
867  * @tk_ops: RPC call ops
868  * @data: user call data
869  */
870 int
871 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
872                const struct rpc_call_ops *tk_ops, void *data)
873 {
874         struct rpc_task *task;
875         struct rpc_task_setup task_setup_data = {
876                 .rpc_client = clnt,
877                 .rpc_message = msg,
878                 .callback_ops = tk_ops,
879                 .callback_data = data,
880                 .flags = flags|RPC_TASK_ASYNC,
881         };
882
883         task = rpc_run_task(&task_setup_data);
884         if (IS_ERR(task))
885                 return PTR_ERR(task);
886         rpc_put_task(task);
887         return 0;
888 }
889 EXPORT_SYMBOL_GPL(rpc_call_async);
890
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 * @tk_ops: RPC call ops
 *
 * If the task cannot be allocated, @req is freed with
 * xprt_free_bc_request().  Returns the task, or the ERR_PTR produced by
 * rpc_new_task().
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
				const struct rpc_call_ops *tk_ops)
{
	struct rpc_task_setup task_setup_data = {
		.callback_ops = tk_ops,
	};
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	struct rpc_task *task;

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);

	/* Create an rpc_task to send the data */
	task = rpc_new_task(&task_setup_data);
	if (IS_ERR(task)) {
		xprt_free_bc_request(req);
		goto out;
	}
	task->tk_rqstp = req;

	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	sndbuf->len = sndbuf->head[0].iov_len + sndbuf->page_len +
			sndbuf->tail[0].iov_len;

	task->tk_action = call_bc_transmit;
	atomic_inc(&task->tk_count);
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

out:
	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
935
936 void
937 rpc_call_start(struct rpc_task *task)
938 {
939         task->tk_action = call_start;
940 }
941 EXPORT_SYMBOL_GPL(rpc_call_start);
942
943 /**
944  * rpc_peeraddr - extract remote peer address from clnt's xprt
945  * @clnt: RPC client structure
946  * @buf: target buffer
947  * @bufsize: length of target buffer
948  *
949  * Returns the number of bytes that are actually in the stored address.
950  */
951 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
952 {
953         size_t bytes;
954         struct rpc_xprt *xprt;
955
956         rcu_read_lock();
957         xprt = rcu_dereference(clnt->cl_xprt);
958
959         bytes = xprt->addrlen;
960         if (bytes > bufsize)
961                 bytes = bufsize;
962         memcpy(buf, &xprt->addr, bytes);
963         rcu_read_unlock();
964
965         return bytes;
966 }
967 EXPORT_SYMBOL_GPL(rpc_peeraddr);
968
969 /**
970  * rpc_peeraddr2str - return remote peer address in printable format
971  * @clnt: RPC client structure
972  * @format: address format
973  *
974  * NB: the lifetime of the memory referenced by the returned pointer is
975  * the same as the rpc_xprt itself.  As long as the caller uses this
976  * pointer, it must hold the RCU read lock.
977  */
978 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
979                              enum rpc_display_format_t format)
980 {
981         struct rpc_xprt *xprt;
982
983         xprt = rcu_dereference(clnt->cl_xprt);
984
985         if (xprt->address_strings[format] != NULL)
986                 return xprt->address_strings[format];
987         else
988                 return "unprintable";
989 }
990 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
991
992 static const struct sockaddr_in rpc_inaddr_loopback = {
993         .sin_family             = AF_INET,
994         .sin_addr.s_addr        = htonl(INADDR_ANY),
995 };
996
997 static const struct sockaddr_in6 rpc_in6addr_loopback = {
998         .sin6_family            = AF_INET6,
999         .sin6_addr              = IN6ADDR_ANY_INIT,
1000 };
1001
1002 /*
1003  * Try a getsockname() on a connected datagram socket.  Using a
1004  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1005  * This conserves the ephemeral port number space.
1006  *
1007  * Returns zero and fills in "buf" if successful; otherwise, a
1008  * negative errno is returned.
1009  */
1010 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1011                         struct sockaddr *buf, int buflen)
1012 {
1013         struct socket *sock;
1014         int err;
1015
1016         err = __sock_create(net, sap->sa_family,
1017                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1018         if (err < 0) {
1019                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1020                 goto out;
1021         }
1022
1023         switch (sap->sa_family) {
1024         case AF_INET:
1025                 err = kernel_bind(sock,
1026                                 (struct sockaddr *)&rpc_inaddr_loopback,
1027                                 sizeof(rpc_inaddr_loopback));
1028                 break;
1029         case AF_INET6:
1030                 err = kernel_bind(sock,
1031                                 (struct sockaddr *)&rpc_in6addr_loopback,
1032                                 sizeof(rpc_in6addr_loopback));
1033                 break;
1034         default:
1035                 err = -EAFNOSUPPORT;
1036                 goto out;
1037         }
1038         if (err < 0) {
1039                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1040                 goto out_release;
1041         }
1042
1043         err = kernel_connect(sock, sap, salen, 0);
1044         if (err < 0) {
1045                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1046                 goto out_release;
1047         }
1048
1049         err = kernel_getsockname(sock, buf, &buflen);
1050         if (err < 0) {
1051                 dprintk("RPC:       getsockname failed (%d)\n", err);
1052                 goto out_release;
1053         }
1054
1055         err = 0;
1056         if (buf->sa_family == AF_INET6) {
1057                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1058                 sin6->sin6_scope_id = 0;
1059         }
1060         dprintk("RPC:       %s succeeded\n", __func__);
1061
1062 out_release:
1063         sock_release(sock);
1064 out:
1065         return err;
1066 }
1067
1068 /*
1069  * Scraping a connected socket failed, so we don't have a useable
1070  * local address.  Fallback: generate an address that will prevent
1071  * the server from calling us back.
1072  *
1073  * Returns zero and fills in "buf" if successful; otherwise, a
1074  * negative errno is returned.
1075  */
1076 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1077 {
1078         switch (family) {
1079         case AF_INET:
1080                 if (buflen < sizeof(rpc_inaddr_loopback))
1081                         return -EINVAL;
1082                 memcpy(buf, &rpc_inaddr_loopback,
1083                                 sizeof(rpc_inaddr_loopback));
1084                 break;
1085         case AF_INET6:
1086                 if (buflen < sizeof(rpc_in6addr_loopback))
1087                         return -EINVAL;
1088                 memcpy(buf, &rpc_in6addr_loopback,
1089                                 sizeof(rpc_in6addr_loopback));
1090         default:
1091                 dprintk("RPC:       %s: address family not supported\n",
1092                         __func__);
1093                 return -EAFNOSUPPORT;
1094         }
1095         dprintk("RPC:       %s: succeeded\n", __func__);
1096         return 0;
1097 }
1098
1099 /**
1100  * rpc_localaddr - discover local endpoint address for an RPC client
1101  * @clnt: RPC client structure
1102  * @buf: target buffer
1103  * @buflen: size of target buffer, in bytes
1104  *
1105  * Returns zero and fills in "buf" and "buflen" if successful;
1106  * otherwise, a negative errno is returned.
1107  *
1108  * This works even if the underlying transport is not currently connected,
1109  * or if the upper layer never previously provided a source address.
1110  *
1111  * The result of this function call is transient: multiple calls in
1112  * succession may give different results, depending on how local
1113  * networking configuration changes over time.
1114  */
1115 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1116 {
1117         struct sockaddr_storage address;
1118         struct sockaddr *sap = (struct sockaddr *)&address;
1119         struct rpc_xprt *xprt;
1120         struct net *net;
1121         size_t salen;
1122         int err;
1123
1124         rcu_read_lock();
1125         xprt = rcu_dereference(clnt->cl_xprt);
1126         salen = xprt->addrlen;
1127         memcpy(sap, &xprt->addr, salen);
1128         net = get_net(xprt->xprt_net);
1129         rcu_read_unlock();
1130
1131         rpc_set_port(sap, 0);
1132         err = rpc_sockname(net, sap, salen, buf, buflen);
1133         put_net(net);
1134         if (err != 0)
1135                 /* Couldn't discover local address, return ANYADDR */
1136                 return rpc_anyaddr(sap->sa_family, buf, buflen);
1137         return 0;
1138 }
1139 EXPORT_SYMBOL_GPL(rpc_localaddr);
1140
1141 void
1142 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1143 {
1144         struct rpc_xprt *xprt;
1145
1146         rcu_read_lock();
1147         xprt = rcu_dereference(clnt->cl_xprt);
1148         if (xprt->ops->set_buffer_size)
1149                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1150         rcu_read_unlock();
1151 }
1152 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1153
1154 /**
1155  * rpc_protocol - Get transport protocol number for an RPC client
1156  * @clnt: RPC client to query
1157  *
1158  */
1159 int rpc_protocol(struct rpc_clnt *clnt)
1160 {
1161         int protocol;
1162
1163         rcu_read_lock();
1164         protocol = rcu_dereference(clnt->cl_xprt)->prot;
1165         rcu_read_unlock();
1166         return protocol;
1167 }
1168 EXPORT_SYMBOL_GPL(rpc_protocol);
1169
1170 /**
1171  * rpc_net_ns - Get the network namespace for this RPC client
1172  * @clnt: RPC client to query
1173  *
1174  */
1175 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1176 {
1177         struct net *ret;
1178
1179         rcu_read_lock();
1180         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1181         rcu_read_unlock();
1182         return ret;
1183 }
1184 EXPORT_SYMBOL_GPL(rpc_net_ns);
1185
1186 /**
1187  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1188  * @clnt: RPC client to query
1189  *
1190  * For stream transports, this is one RPC record fragment (see RFC
1191  * 1831), as we don't support multi-record requests yet.  For datagram
1192  * transports, this is the size of an IP packet minus the IP, UDP, and
1193  * RPC header sizes.
1194  */
1195 size_t rpc_max_payload(struct rpc_clnt *clnt)
1196 {
1197         size_t ret;
1198
1199         rcu_read_lock();
1200         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1201         rcu_read_unlock();
1202         return ret;
1203 }
1204 EXPORT_SYMBOL_GPL(rpc_max_payload);
1205
1206 /**
1207  * rpc_get_timeout - Get timeout for transport in units of HZ
1208  * @clnt: RPC client to query
1209  */
1210 unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1211 {
1212         unsigned long ret;
1213
1214         rcu_read_lock();
1215         ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1216         rcu_read_unlock();
1217         return ret;
1218 }
1219 EXPORT_SYMBOL_GPL(rpc_get_timeout);
1220
1221 /**
1222  * rpc_force_rebind - force transport to check that remote port is unchanged
1223  * @clnt: client to rebind
1224  *
1225  */
1226 void rpc_force_rebind(struct rpc_clnt *clnt)
1227 {
1228         if (clnt->cl_autobind) {
1229                 rcu_read_lock();
1230                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1231                 rcu_read_unlock();
1232         }
1233 }
1234 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1235
1236 /*
1237  * Restart an (async) RPC call from the call_prepare state.
1238  * Usually called from within the exit handler.
1239  */
1240 int
1241 rpc_restart_call_prepare(struct rpc_task *task)
1242 {
1243         if (RPC_ASSASSINATED(task))
1244                 return 0;
1245         task->tk_action = call_start;
1246         if (task->tk_ops->rpc_call_prepare != NULL)
1247                 task->tk_action = rpc_prepare_task;
1248         return 1;
1249 }
1250 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1251
1252 /*
1253  * Restart an (async) RPC call. Usually called from within the
1254  * exit handler.
1255  */
1256 int
1257 rpc_restart_call(struct rpc_task *task)
1258 {
1259         if (RPC_ASSASSINATED(task))
1260                 return 0;
1261         task->tk_action = call_start;
1262         return 1;
1263 }
1264 EXPORT_SYMBOL_GPL(rpc_restart_call);
1265
1266 #ifdef RPC_DEBUG
1267 static const char *rpc_proc_name(const struct rpc_task *task)
1268 {
1269         const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1270
1271         if (proc) {
1272                 if (proc->p_name)
1273                         return proc->p_name;
1274                 else
1275                         return "NULL";
1276         } else
1277                 return "no proc";
1278 }
1279 #endif
1280
1281 /*
1282  * 0.  Initial state
1283  *
1284  *     Other FSM states can be visited zero or more times, but
1285  *     this state is visited exactly once for each RPC.
1286  */
1287 static void
1288 call_start(struct rpc_task *task)
1289 {
1290         struct rpc_clnt *clnt = task->tk_client;
1291
1292         dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
1293                         clnt->cl_program->name, clnt->cl_vers,
1294                         rpc_proc_name(task),
1295                         (RPC_IS_ASYNC(task) ? "async" : "sync"));
1296
1297         /* Increment call count */
1298         task->tk_msg.rpc_proc->p_count++;
1299         clnt->cl_stats->rpccnt++;
1300         task->tk_action = call_reserve;
1301 }
1302
1303 /*
1304  * 1.   Reserve an RPC call slot
1305  */
1306 static void
1307 call_reserve(struct rpc_task *task)
1308 {
1309         dprint_status(task);
1310
1311         task->tk_status  = 0;
1312         task->tk_action  = call_reserveresult;
1313         xprt_reserve(task);
1314 }
1315
1316 static void call_retry_reserve(struct rpc_task *task);
1317
1318 /*
1319  * 1b.  Grok the result of xprt_reserve()
1320  */
1321 static void
1322 call_reserveresult(struct rpc_task *task)
1323 {
1324         int status = task->tk_status;
1325
1326         dprint_status(task);
1327
1328         /*
1329          * After a call to xprt_reserve(), we must have either
1330          * a request slot or else an error status.
1331          */
1332         task->tk_status = 0;
1333         if (status >= 0) {
1334                 if (task->tk_rqstp) {
1335                         task->tk_action = call_refresh;
1336                         return;
1337                 }
1338
1339                 printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
1340                                 __func__, status);
1341                 rpc_exit(task, -EIO);
1342                 return;
1343         }
1344
1345         /*
1346          * Even though there was an error, we may have acquired
1347          * a request slot somehow.  Make sure not to leak it.
1348          */
1349         if (task->tk_rqstp) {
1350                 printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
1351                                 __func__, status);
1352                 xprt_release(task);
1353         }
1354
1355         switch (status) {
1356         case -ENOMEM:
1357                 rpc_delay(task, HZ >> 2);
1358         case -EAGAIN:   /* woken up; retry */
1359                 task->tk_action = call_retry_reserve;
1360                 return;
1361         case -EIO:      /* probably a shutdown */
1362                 break;
1363         default:
1364                 printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
1365                                 __func__, status);
1366                 break;
1367         }
1368         rpc_exit(task, status);
1369 }
1370
1371 /*
1372  * 1c.  Retry reserving an RPC call slot
1373  */
1374 static void
1375 call_retry_reserve(struct rpc_task *task)
1376 {
1377         dprint_status(task);
1378
1379         task->tk_status  = 0;
1380         task->tk_action  = call_reserveresult;
1381         xprt_retry_reserve(task);
1382 }
1383
1384 /*
1385  * 2.   Bind and/or refresh the credentials
1386  */
1387 static void
1388 call_refresh(struct rpc_task *task)
1389 {
1390         dprint_status(task);
1391
1392         task->tk_action = call_refreshresult;
1393         task->tk_status = 0;
1394         task->tk_client->cl_stats->rpcauthrefresh++;
1395         rpcauth_refreshcred(task);
1396 }
1397
1398 /*
1399  * 2a.  Process the results of a credential refresh
1400  */
1401 static void
1402 call_refreshresult(struct rpc_task *task)
1403 {
1404         int status = task->tk_status;
1405
1406         dprint_status(task);
1407
1408         task->tk_status = 0;
1409         task->tk_action = call_refresh;
1410         switch (status) {
1411         case 0:
1412                 if (rpcauth_uptodatecred(task))
1413                         task->tk_action = call_allocate;
1414                 return;
1415         case -ETIMEDOUT:
1416                 rpc_delay(task, 3*HZ);
1417         case -EKEYEXPIRED:
1418         case -EAGAIN:
1419                 status = -EACCES;
1420                 if (!task->tk_cred_retry)
1421                         break;
1422                 task->tk_cred_retry--;
1423                 dprintk("RPC: %5u %s: retry refresh creds\n",
1424                                 task->tk_pid, __func__);
1425                 return;
1426         }
1427         dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
1428                                 task->tk_pid, __func__, status);
1429         rpc_exit(task, status);
1430 }
1431
1432 /*
1433  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1434  *      (Note: buffer memory is freed in xprt_release).
1435  */
1436 static void
1437 call_allocate(struct rpc_task *task)
1438 {
1439         unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
1440         struct rpc_rqst *req = task->tk_rqstp;
1441         struct rpc_xprt *xprt = req->rq_xprt;
1442         struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1443
1444         dprint_status(task);
1445
1446         task->tk_status = 0;
1447         task->tk_action = call_bind;
1448
1449         if (req->rq_buffer)
1450                 return;
1451
1452         if (proc->p_proc != 0) {
1453                 BUG_ON(proc->p_arglen == 0);
1454                 if (proc->p_decode != NULL)
1455                         BUG_ON(proc->p_replen == 0);
1456         }
1457
1458         /*
1459          * Calculate the size (in quads) of the RPC call
1460          * and reply headers, and convert both values
1461          * to byte sizes.
1462          */
1463         req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
1464         req->rq_callsize <<= 2;
1465         req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
1466         req->rq_rcvsize <<= 2;
1467
1468         req->rq_buffer = xprt->ops->buf_alloc(task,
1469                                         req->rq_callsize + req->rq_rcvsize);
1470         if (req->rq_buffer != NULL)
1471                 return;
1472
1473         dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
1474
1475         if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1476                 task->tk_action = call_allocate;
1477                 rpc_delay(task, HZ>>4);
1478                 return;
1479         }
1480
1481         rpc_exit(task, -ERESTARTSYS);
1482 }
1483
1484 static inline int
1485 rpc_task_need_encode(struct rpc_task *task)
1486 {
1487         return task->tk_rqstp->rq_snd_buf.len == 0;
1488 }
1489
1490 static inline void
1491 rpc_task_force_reencode(struct rpc_task *task)
1492 {
1493         task->tk_rqstp->rq_snd_buf.len = 0;
1494         task->tk_rqstp->rq_bytes_sent = 0;
1495 }
1496
1497 static inline void
1498 rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
1499 {
1500         buf->head[0].iov_base = start;
1501         buf->head[0].iov_len = len;
1502         buf->tail[0].iov_len = 0;
1503         buf->page_len = 0;
1504         buf->flags = 0;
1505         buf->len = 0;
1506         buf->buflen = len;
1507 }
1508
1509 /*
1510  * 3.   Encode arguments of an RPC call
1511  */
1512 static void
1513 rpc_xdr_encode(struct rpc_task *task)
1514 {
1515         struct rpc_rqst *req = task->tk_rqstp;
1516         kxdreproc_t     encode;
1517         __be32          *p;
1518
1519         dprint_status(task);
1520
1521         rpc_xdr_buf_init(&req->rq_snd_buf,
1522                          req->rq_buffer,
1523                          req->rq_callsize);
1524         rpc_xdr_buf_init(&req->rq_rcv_buf,
1525                          (char *)req->rq_buffer + req->rq_callsize,
1526                          req->rq_rcvsize);
1527
1528         p = rpc_encode_header(task);
1529         if (p == NULL) {
1530                 printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
1531                 rpc_exit(task, -EIO);
1532                 return;
1533         }
1534
1535         encode = task->tk_msg.rpc_proc->p_encode;
1536         if (encode == NULL)
1537                 return;
1538
1539         task->tk_status = rpcauth_wrap_req(task, encode, req, p,
1540                         task->tk_msg.rpc_argp);
1541 }
1542
1543 /*
1544  * 4.   Get the server port number if not yet set
1545  */
1546 static void
1547 call_bind(struct rpc_task *task)
1548 {
1549         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1550
1551         dprint_status(task);
1552
1553         task->tk_action = call_connect;
1554         if (!xprt_bound(xprt)) {
1555                 task->tk_action = call_bind_status;
1556                 task->tk_timeout = xprt->bind_timeout;
1557                 xprt->ops->rpcbind(task);
1558         }
1559 }
1560
/*
 * 4a.  Sort out bind result
 *
 *      A non-negative status advances to call_connect.  Errors either
 *      go through call_timeout (the "retry" label), go back to
 *      call_bind, or end the task via rpc_exit().
 */
static void
call_bind_status(struct rpc_task *task)
{
	/* Exit code used unless one of the cases below overrides it. */
	int exit_status = -EIO;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		goto retry;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			/* fail immediately if this is an RPC ping */
			exit_status = -EOPNOTSUPP;
		} else if (task->tk_rebind_retry != 0) {
			task->tk_rebind_retry--;
			rpc_delay(task, 3*HZ);
			goto retry;
		}
		break;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto retry;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		task->tk_status = 0;
		task->tk_action = call_bind;
		return;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		if (RPC_IS_SOFTCONN(task)) {
			/* Soft-connect tasks give up with the real error. */
			exit_status = task->tk_status;
			break;
		}
		rpc_delay(task, 5*HZ);
		goto retry;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
	}

	rpc_exit(task, exit_status);
	return;

retry:
	task->tk_action = call_timeout;
}
1636
1637 /*
1638  * 4b.  Connect to the RPC server
1639  */
1640 static void
1641 call_connect(struct rpc_task *task)
1642 {
1643         struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1644
1645         dprintk("RPC: %5u call_connect xprt %p %s connected\n",
1646                         task->tk_pid, xprt,
1647                         (xprt_connected(xprt) ? "is" : "is not"));
1648
1649         task->tk_action = call_transmit;
1650         if (!xprt_connected(xprt)) {
1651                 task->tk_action = call_connect_status;
1652                 if (task->tk_status < 0)
1653                         return;
1654                 xprt_connect(task);
1655         }
1656 }
1657
/*
 * 4c.  Sort out connect result
 *
 *      -ETIMEDOUT goes to call_timeout.  A zero or -EAGAIN status
 *      proceeds to call_transmit, as do -ECONNREFUSED, -ECONNRESET and
 *      -ENETUNREACH unless the task is a soft-connect one.  Every other
 *      outcome ends the task with the status it arrived with.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;
	int proceed;

	dprint_status(task);

	trace_rpc_connect_status(task, status);
	switch (status) {
	case -ETIMEDOUT:
		/* if soft mounted, test if we've timed out */
		task->tk_action = call_timeout;
		return;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, unless soft-connect */
		proceed = !RPC_IS_SOFTCONN(task);
		break;
	case 0:
	case -EAGAIN:
		proceed = 1;
		break;
	default:
		proceed = 0;
		break;
	}

	if (!proceed) {
		rpc_exit(task, status);
		return;
	}

	task->tk_status = 0;
	clnt->cl_stats->netreconn++;
	task->tk_action = call_transmit;
}
1690
1691 /*
1692  * 5.   Transmit the RPC request, and wait for reply
1693  */
1694 static void
1695 call_transmit(struct rpc_task *task)
1696 {
1697         dprint_status(task);
1698
1699         task->tk_action = call_status;
1700         if (task->tk_status < 0)
1701                 return;
1702         task->tk_status = xprt_prepare_transmit(task);
1703         if (task->tk_status != 0)
1704                 return;
1705         task->tk_action = call_transmit_status;
1706         /* Encode here so that rpcsec_gss can use correct sequence number. */
1707         if (rpc_task_need_encode(task)) {
1708                 rpc_xdr_encode(task);
1709                 /* Did the encode result in an error condition? */
1710                 if (task->tk_status != 0) {
1711                         /* Was the error nonfatal? */
1712                         if (task->tk_status == -EAGAIN)
1713                                 rpc_delay(task, HZ >> 4);
1714                         else
1715                                 rpc_exit(task, task->tk_status);
1716                         return;
1717                 }
1718         }
1719         xprt_transmit(task);
1720         if (task->tk_status < 0)
1721                 return;
1722         /*
1723          * On success, ensure that we call xprt_end_transmit() before sleeping
1724          * in order to allow access to the socket to other RPC requests.
1725          */
1726         call_transmit_status(task);
1727         if (rpc_reply_expected(task))
1728                 return;
1729         task->tk_action = rpc_exit_task;
1730         rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
1731 }
1732
/*
 * 5a.  Handle cleanup after a transmission
 *
 *      Always queues call_status as the next state.  What is cleaned up
 *      depends on the status the transmit attempt left in the task.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	switch (task->tk_status) {
	case -EAGAIN:
		/* No cleanup is performed for this status. */
		break;
	/*
	 * Special cases: if we've been waiting on the
	 * socket's write_space() callback, or if the
	 * socket just returned a connection error,
	 * then hold onto the transport lock.
	 */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
	}
}
1780
1781 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1782 /*
1783  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
1784  * addition, disconnect on connectivity errors.
1785  */
1786 static void
1787 call_bc_transmit(struct rpc_task *task)
1788 {
1789         struct rpc_rqst *req = task->tk_rqstp;
1790
1791         task->tk_status = xprt_prepare_transmit(task);
1792         if (task->tk_status == -EAGAIN) {
1793                 /*
1794                  * Could not reserve the transport. Try again after the
1795                  * transport is released.
1796                  */
1797                 task->tk_status = 0;
1798                 task->tk_action = call_bc_transmit;
1799                 return;
1800         }
1801
1802         task->tk_action = rpc_exit_task;
1803         if (task->tk_status < 0) {
1804                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1805                         "error: %d\n", task->tk_status);
1806                 return;
1807         }
1808
1809         xprt_transmit(task);
1810         xprt_end_transmit(task);
1811         dprint_status(task);
1812         switch (task->tk_status) {
1813         case 0:
1814                 /* Success */
1815                 break;
1816         case -EHOSTDOWN:
1817         case -EHOSTUNREACH:
1818         case -ENETUNREACH:
1819         case -ETIMEDOUT:
1820                 /*
1821                  * Problem reaching the server.  Disconnect and let the
1822                  * forechannel reestablish the connection.  The server will
1823                  * have to retransmit the backchannel request and we'll
1824                  * reprocess it.  Since these ops are idempotent, there's no
1825                  * need to cache our reply at this time.
1826                  */
1827                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1828                         "error: %d\n", task->tk_status);
1829                 xprt_conditional_disconnect(req->rq_xprt,
1830                         req->rq_connect_cookie);
1831                 break;
1832         default:
1833                 /*
1834                  * We were unable to reply and will have to drop the
1835                  * request.  The server should reconnect and retransmit.
1836                  */
1837                 WARN_ON_ONCE(task->tk_status == -EAGAIN);
1838                 printk(KERN_NOTICE "RPC: Could not send backchannel reply "
1839                         "error: %d\n", task->tk_status);
1840                 break;
1841         }
1842         rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
1843 }
1844 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1845
/*
 * 6.	Sort out the RPC call status
 *
 *	A non-negative status sends the task on to call_decode.  A negative
 *	status is cleared from the task and mapped onto the next state:
 *	call_timeout, call_bind, call_transmit, or an exit with that error.
 */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt	*clnt = task->tk_client;
	struct rpc_rqst	*req = task->tk_rqstp;
	int		err;

	/* Reply bytes are in and no sent-byte count is pending: use the reply size */
	if (req->rq_reply_bytes_recvd > 0 && req->rq_bytes_sent == 0)
		task->tk_status = req->rq_reply_bytes_recvd;

	dprint_status(task);

	err = task->tk_status;
	if (err >= 0) {
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	/* The error is handled here; do not leak it into the next state */
	task->tk_status = 0;

	switch (err) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		if (clnt->cl_discrtry)
			xprt_conditional_disconnect(req->rq_xprt,
						    req->rq_connect_cookie);
		break;

	case -ECONNRESET:
	case -ECONNREFUSED:
		rpc_force_rebind(clnt);
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EPIPE:
	case -ENOTCONN:
		task->tk_action = call_bind;
		break;

	case -EAGAIN:
		task->tk_action = call_transmit;
		break;

	default:
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_program->name, -err);
		/* fall through */
	case -EIO:
		/* -EIO: shutdown or soft timeout */
		rpc_exit(task, err);
		break;
	}
}
1906
1907 /*
1908  * 6a.  Handle RPC timeout
1909  *      We do not release the request slot, so we keep using the
1910  *      same XID for all retransmits.
1911  */
1912 static void
1913 call_timeout(struct rpc_task *task)
1914 {
1915         struct rpc_clnt *clnt = task->tk_client;
1916
1917         if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
1918                 dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
1919                 goto retry;
1920         }
1921
1922         dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
1923         task->tk_timeouts++;
1924
1925         if (RPC_IS_SOFTCONN(task)) {
1926                 rpc_exit(task, -ETIMEDOUT);
1927                 return;
1928         }
1929         if (RPC_IS_SOFT(task)) {
1930                 if (clnt->cl_chatty) {
1931                         rcu_read_lock();
1932                         printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
1933                                 clnt->cl_program->name,
1934                                 rcu_dereference(clnt->cl_xprt)->servername);
1935                         rcu_read_unlock();
1936                 }
1937                 if (task->tk_flags & RPC_TASK_TIMEOUT)
1938                         rpc_exit(task, -ETIMEDOUT);
1939                 else
1940                         rpc_exit(task, -EIO);
1941                 return;
1942         }
1943
1944         if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
1945                 task->tk_flags |= RPC_CALL_MAJORSEEN;
1946                 if (clnt->cl_chatty) {
1947                         rcu_read_lock();
1948                         printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
1949                         clnt->cl_program->name,
1950                         rcu_dereference(clnt->cl_xprt)->servername);
1951                         rcu_read_unlock();
1952                 }
1953         }
1954         rpc_force_rebind(clnt);
1955         /*
1956          * Did our request time out due to an RPCSEC_GSS out-of-sequence
1957          * event? RFC2203 requires the server to drop all such requests.
1958          */
1959         rpcauth_invalcred(task);
1960
1961 retry:
1962         clnt->cl_stats->rpcretrans++;
1963         task->tk_action = call_bind;
1964         task->tk_status = 0;
1965 }
1966
1967 /*
1968  * 7.   Decode the RPC reply
1969  */
1970 static void
1971 call_decode(struct rpc_task *task)
1972 {
1973         struct rpc_clnt *clnt = task->tk_client;
1974         struct rpc_rqst *req = task->tk_rqstp;
1975         kxdrdproc_t     decode = task->tk_msg.rpc_proc->p_decode;
1976         __be32          *p;
1977
1978         dprint_status(task);
1979
1980         if (task->tk_flags & RPC_CALL_MAJORSEEN) {
1981                 if (clnt->cl_chatty) {
1982                         rcu_read_lock();
1983                         printk(KERN_NOTICE "%s: server %s OK\n",
1984                                 clnt->cl_program->name,
1985                                 rcu_dereference(clnt->cl_xprt)->servername);
1986                         rcu_read_unlock();
1987                 }
1988                 task->tk_flags &= ~RPC_CALL_MAJORSEEN;
1989         }
1990
1991         /*
1992          * Ensure that we see all writes made by xprt_complete_rqst()
1993          * before it changed req->rq_reply_bytes_recvd.
1994          */
1995         smp_rmb();
1996         req->rq_rcv_buf.len = req->rq_private_buf.len;
1997
1998         /* Check that the softirq receive buffer is valid */
1999         WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2000                                 sizeof(req->rq_rcv_buf)) != 0);
2001
2002         if (req->rq_rcv_buf.len < 12) {
2003                 if (!RPC_IS_SOFT(task)) {
2004                         task->tk_action = call_bind;
2005                         clnt->cl_stats->rpcretrans++;
2006                         goto out_retry;
2007                 }
2008                 dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
2009                                 clnt->cl_program->name, task->tk_status);
2010                 task->tk_action = call_timeout;
2011                 goto out_retry;
2012         }
2013
2014         p = rpc_verify_header(task);
2015         if (IS_ERR(p)) {
2016                 if (p == ERR_PTR(-EAGAIN))
2017                         goto out_retry;
2018                 return;
2019         }
2020
2021         task->tk_action = rpc_exit_task;
2022
2023         if (decode) {
2024                 task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
2025                                                       task->tk_msg.rpc_resp);
2026         }
2027         dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
2028                         task->tk_status);
2029         return;
2030 out_retry:
2031         task->tk_status = 0;
2032         /* Note: rpc_verify_header() may have freed the RPC slot */
2033         if (task->tk_rqstp == req) {
2034                 req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
2035                 if (task->tk_client->cl_discrtry)
2036                         xprt_conditional_disconnect(req->rq_xprt,
2037                                         req->rq_connect_cookie);
2038         }
2039 }
2040
2041 static __be32 *
2042 rpc_encode_header(struct rpc_task *task)
2043 {
2044         struct rpc_clnt *clnt = task->tk_client;
2045         struct rpc_rqst *req = task->tk_rqstp;
2046         __be32          *p = req->rq_svec[0].iov_base;
2047
2048         /* FIXME: check buffer size? */
2049
2050         p = xprt_skip_transport_header(req->rq_xprt, p);
2051         *p++ = req->rq_xid;             /* XID */
2052         *p++ = htonl(RPC_CALL);         /* CALL */
2053         *p++ = htonl(RPC_VERSION);      /* RPC version */
2054         *p++ = htonl(clnt->cl_prog);    /* program number */
2055         *p++ = htonl(clnt->cl_vers);    /* program version */
2056         *p++ = htonl(task->tk_msg.rpc_proc->p_proc);    /* procedure */
2057         p = rpcauth_marshcred(task, p);
2058         req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
2059         return p;
2060 }
2061
2062 static __be32 *
2063 rpc_verify_header(struct rpc_task *task)
2064 {
2065         struct rpc_clnt *clnt = task->tk_client;
2066         struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
2067         int len = task->tk_rqstp->rq_rcv_buf.len >> 2;
2068         __be32  *p = iov->iov_base;
2069         u32 n;
2070         int error = -EACCES;
2071
2072         if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
2073                 /* RFC-1014 says that the representation of XDR data must be a
2074                  * multiple of four bytes
2075                  * - if it isn't pointer subtraction in the NFS client may give
2076                  *   undefined results
2077                  */
2078                 dprintk("RPC: %5u %s: XDR representation not a multiple of"
2079                        " 4 bytes: 0x%x\n", task->tk_pid, __func__,
2080                        task->tk_rqstp->rq_rcv_buf.len);
2081                 goto out_eio;
2082         }
2083         if ((len -= 3) < 0)
2084                 goto out_overflow;
2085
2086         p += 1; /* skip XID */
2087         if ((n = ntohl(*p++)) != RPC_REPLY) {
2088                 dprintk("RPC: %5u %s: not an RPC reply: %x\n",
2089                         task->tk_pid, __func__, n);
2090                 goto out_garbage;
2091         }
2092
2093         if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
2094                 if (--len < 0)
2095                         goto out_overflow;
2096                 switch ((n = ntohl(*p++))) {
2097                 case RPC_AUTH_ERROR:
2098                         break;
2099                 case RPC_MISMATCH:
2100                         dprintk("RPC: %5u %s: RPC call version mismatch!\n",
2101                                 task->tk_pid, __func__);
2102                         error = -EPROTONOSUPPORT;
2103                         goto out_err;
2104                 default:
2105                         dprintk("RPC: %5u %s: RPC call rejected, "
2106                                 "unknown error: %x\n",
2107                                 task->tk_pid, __func__, n);
2108                         goto out_eio;
2109                 }
2110                 if (--len < 0)
2111                         goto out_overflow;
2112                 switch ((n = ntohl(*p++))) {
2113                 case RPC_AUTH_REJECTEDCRED:
2114                 case RPC_AUTH_REJECTEDVERF:
2115                 case RPCSEC_GSS_CREDPROBLEM:
2116                 case RPCSEC_GSS_CTXPROBLEM:
2117                         if (!task->tk_cred_retry)
2118                                 break;
2119                         task->tk_cred_retry--;
2120                         dprintk("RPC: %5u %s: retry stale creds\n",
2121                                         task->tk_pid, __func__);
2122                         rpcauth_invalcred(task);
2123                         /* Ensure we obtain a new XID! */
2124                         xprt_release(task);
2125                         task->tk_action = call_reserve;
2126                         goto out_retry;
2127                 case RPC_AUTH_BADCRED:
2128                 case RPC_AUTH_BADVERF:
2129                         /* possibly garbled cred/verf? */
2130                         if (!task->tk_garb_retry)
2131                                 break;
2132                         task->tk_garb_retry--;
2133                         dprintk("RPC: %5u %s: retry garbled creds\n",
2134                                         task->tk_pid, __func__);
2135                         task->tk_action = call_bind;
2136                         goto out_retry;
2137                 case RPC_AUTH_TOOWEAK:
2138                         rcu_read_lock();
2139                         printk(KERN_NOTICE "RPC: server %s requires stronger "
2140                                "authentication.\n",
2141                                rcu_dereference(clnt->cl_xprt)->servername);
2142                         rcu_read_unlock();
2143                         break;
2144                 default:
2145                         dprintk("RPC: %5u %s: unknown auth error: %x\n",
2146                                         task->tk_pid, __func__, n);
2147                         error = -EIO;
2148                 }
2149                 dprintk("RPC: %5u %s: call rejected %d\n",
2150                                 task->tk_pid, __func__, n);
2151                 goto out_err;
2152         }
2153         if (!(p = rpcauth_checkverf(task, p))) {
2154                 dprintk("RPC: %5u %s: auth check failed\n",
2155                                 task->tk_pid, __func__);
2156                 goto out_garbage;               /* bad verifier, retry */
2157         }
2158         len = p - (__be32 *)iov->iov_base - 1;
2159         if (len < 0)
2160                 goto out_overflow;
2161         switch ((n = ntohl(*p++))) {
2162         case RPC_SUCCESS:
2163                 return p;
2164         case RPC_PROG_UNAVAIL:
2165                 dprintk_rcu("RPC: %5u %s: program %u is unsupported "
2166                                 "by server %s\n", task->tk_pid, __func__,
2167                                 (unsigned int)clnt->cl_prog,
2168                                 rcu_dereference(clnt->cl_xprt)->servername);
2169                 error = -EPFNOSUPPORT;
2170                 goto out_err;
2171         case RPC_PROG_MISMATCH:
2172                 dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
2173                                 "by server %s\n", task->tk_pid, __func__,
2174                                 (unsigned int)clnt->cl_prog,
2175                                 (unsigned int)clnt->cl_vers,
2176                                 rcu_dereference(clnt->cl_xprt)->servername);
2177                 error = -EPROTONOSUPPORT;
2178                 goto out_err;
2179         case RPC_PROC_UNAVAIL:
2180                 dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
2181                                 "version %u on server %s\n",
2182                                 task->tk_pid, __func__,
2183                                 rpc_proc_name(task),
2184                                 clnt->cl_prog, clnt->cl_vers,
2185                                 rcu_dereference(clnt->cl_xprt)->servername);
2186                 error = -EOPNOTSUPP;
2187                 goto out_err;
2188         case RPC_GARBAGE_ARGS:
2189                 dprintk("RPC: %5u %s: server saw garbage\n",
2190                                 task->tk_pid, __func__);
2191                 break;                  /* retry */
2192         default:
2193                 dprintk("RPC: %5u %s: server accept status: %x\n",
2194                                 task->tk_pid, __func__, n);
2195                 /* Also retry */
2196         }
2197
2198 out_garbage:
2199         clnt->cl_stats->rpcgarbage++;
2200         if (task->tk_garb_retry) {
2201                 task->tk_garb_retry--;
2202                 dprintk("RPC: %5u %s: retrying\n",
2203                                 task->tk_pid, __func__);
2204                 task->tk_action = call_bind;
2205 out_retry:
2206                 return ERR_PTR(-EAGAIN);
2207         }
2208 out_eio:
2209         error = -EIO;
2210 out_err:
2211         rpc_exit(task, error);
2212         dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
2213                         __func__, error);
2214         return ERR_PTR(error);
2215 out_overflow:
2216         dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
2217                         __func__);
2218         goto out_garbage;
2219 }
2220
/* Encoder installed in rpcproc_null: writes nothing to the stream. */
static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	return;
}
2224
/* Decoder installed in rpcproc_null: reads nothing and always reports success (0). */
static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	const int success = 0;

	return success;
}
2229
2230 static struct rpc_procinfo rpcproc_null = {
2231         .p_encode = rpcproc_encode_null,
2232         .p_decode = rpcproc_decode_null,
2233 };
2234
2235 static int rpc_ping(struct rpc_clnt *clnt)
2236 {
2237         struct rpc_message msg = {
2238                 .rpc_proc = &rpcproc_null,
2239         };
2240         int err;
2241         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2242         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2243         put_rpccred(msg.rpc_cred);
2244         return err;
2245 }
2246
2247 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2248 {
2249         struct rpc_message msg = {
2250                 .rpc_proc = &rpcproc_null,
2251                 .rpc_cred = cred,
2252         };
2253         struct rpc_task_setup task_setup_data = {
2254                 .rpc_client = clnt,
2255                 .rpc_message = &msg,
2256                 .callback_ops = &rpc_default_ops,
2257                 .flags = flags,
2258         };
2259         return rpc_run_task(&task_setup_data);
2260 }
2261 EXPORT_SYMBOL_GPL(rpc_call_null);
2262
2263 #ifdef RPC_DEBUG
2264 static void rpc_show_header(void)
2265 {
2266         printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
2267                 "-timeout ---ops--\n");
2268 }
2269
2270 static void rpc_show_task(const struct rpc_clnt *clnt,
2271                           const struct rpc_task *task)
2272 {
2273         const char *rpc_waitq = "none";
2274
2275         if (RPC_IS_QUEUED(task))
2276                 rpc_waitq = rpc_qname(task->tk_waitqueue);
2277
2278         printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
2279                 task->tk_pid, task->tk_flags, task->tk_status,
2280                 clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
2281                 clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
2282                 task->tk_action, rpc_waitq);
2283 }
2284
2285 void rpc_show_tasks(struct net *net)
2286 {
2287         struct rpc_clnt *clnt;
2288         struct rpc_task *task;
2289         int header = 0;
2290         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2291
2292         spin_lock(&sn->rpc_client_lock);
2293         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2294                 spin_lock(&clnt->cl_lock);
2295                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2296                         if (!header) {
2297                                 rpc_show_header();
2298                                 header++;
2299                         }
2300                         rpc_show_task(clnt, task);
2301                 }
2302                 spin_unlock(&clnt->cl_lock);
2303         }
2304         spin_unlock(&sn->rpc_client_lock);
2305 }
2306 #endif