Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/ide
[pandora-kernel.git] / net / sunrpc / clnt.c
1 /*
2  *  linux/net/sunrpc/clnt.c
3  *
4  *  This file contains the high-level RPC interface.
5  *  It is modeled as a finite state machine to support both synchronous
6  *  and asynchronous requests.
7  *
8  *  -   RPC header generation and argument serialization.
9  *  -   Credential refresh.
10  *  -   TCP connect handling.
11  *  -   Retry of operation when it is suspected the operation failed because
12  *      of uid squashing on the server, or when the credentials were stale
13  *      and need to be refreshed, or when a packet was damaged in transit.
14  *      This may be have to be moved to the VFS layer.
15  *
16  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
17  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
18  */
19
20
21 #include <linux/module.h>
22 #include <linux/types.h>
23 #include <linux/kallsyms.h>
24 #include <linux/mm.h>
25 #include <linux/namei.h>
26 #include <linux/mount.h>
27 #include <linux/slab.h>
28 #include <linux/utsname.h>
29 #include <linux/workqueue.h>
30 #include <linux/in.h>
31 #include <linux/in6.h>
32 #include <linux/un.h>
33 #include <linux/rcupdate.h>
34
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/addr.h>
37 #include <linux/sunrpc/rpc_pipe_fs.h>
38 #include <linux/sunrpc/metrics.h>
39 #include <linux/sunrpc/bc_xprt.h>
40 #include <trace/events/sunrpc.h>
41
42 #include "sunrpc.h"
43 #include "netns.h"
44
45 #ifdef RPC_DEBUG
46 # define RPCDBG_FACILITY        RPCDBG_CALL
47 #endif
48
49 #define dprint_status(t)                                        \
50         dprintk("RPC: %5u %s (status %d)\n", t->tk_pid,         \
51                         __func__, t->tk_status)
52
53 /*
54  * All RPC clients are linked into this list
55  */
56
57 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
58
59
60 static void     call_start(struct rpc_task *task);
61 static void     call_reserve(struct rpc_task *task);
62 static void     call_reserveresult(struct rpc_task *task);
63 static void     call_allocate(struct rpc_task *task);
64 static void     call_decode(struct rpc_task *task);
65 static void     call_bind(struct rpc_task *task);
66 static void     call_bind_status(struct rpc_task *task);
67 static void     call_transmit(struct rpc_task *task);
68 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
69 static void     call_bc_transmit(struct rpc_task *task);
70 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
71 static void     call_status(struct rpc_task *task);
72 static void     call_transmit_status(struct rpc_task *task);
73 static void     call_refresh(struct rpc_task *task);
74 static void     call_refreshresult(struct rpc_task *task);
75 static void     call_timeout(struct rpc_task *task);
76 static void     call_connect(struct rpc_task *task);
77 static void     call_connect_status(struct rpc_task *task);
78
79 static __be32   *rpc_encode_header(struct rpc_task *task);
80 static __be32   *rpc_verify_header(struct rpc_task *task);
81 static int      rpc_ping(struct rpc_clnt *clnt);
82
83 static void rpc_register_client(struct rpc_clnt *clnt)
84 {
85         struct net *net = rpc_net_ns(clnt);
86         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
87
88         spin_lock(&sn->rpc_client_lock);
89         list_add(&clnt->cl_clients, &sn->all_clients);
90         spin_unlock(&sn->rpc_client_lock);
91 }
92
93 static void rpc_unregister_client(struct rpc_clnt *clnt)
94 {
95         struct net *net = rpc_net_ns(clnt);
96         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
97
98         spin_lock(&sn->rpc_client_lock);
99         list_del(&clnt->cl_clients);
100         spin_unlock(&sn->rpc_client_lock);
101 }
102
103 static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
104 {
105         if (clnt->cl_dentry) {
106                 if (clnt->cl_auth && clnt->cl_auth->au_ops->pipes_destroy)
107                         clnt->cl_auth->au_ops->pipes_destroy(clnt->cl_auth);
108                 rpc_remove_client_dir(clnt->cl_dentry);
109         }
110         clnt->cl_dentry = NULL;
111 }
112
113 static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
114 {
115         struct net *net = rpc_net_ns(clnt);
116         struct super_block *pipefs_sb;
117
118         pipefs_sb = rpc_get_sb_net(net);
119         if (pipefs_sb) {
120                 __rpc_clnt_remove_pipedir(clnt);
121                 rpc_put_sb_net(net);
122         }
123 }
124
/*
 * Create a uniquely named "clnt%x" directory for @clnt under
 * @dir_name in the pipefs superblock @sb.
 *
 * Returns the new dentry, NULL when @dir_name does not exist in @sb,
 * or an ERR_PTR on failure.  A -EEXIST collision on the generated
 * name is handled by retrying with the next clntid value.
 */
static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
                                    struct rpc_clnt *clnt,
                                    const char *dir_name)
{
        static uint32_t clntid;         /* monotonic source for unique names */
        char name[15];                  /* "clnt" + up to 8 hex digits + NUL */
        struct qstr q = { .name = name };
        struct dentry *dir, *dentry;
        int error;

        dir = rpc_d_lookup_sb(sb, dir_name);
        if (dir == NULL) {
                pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
                return dir;
        }
        for (;;) {
                q.len = snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
                /* snprintf already NUL-terminates; this is belt-and-braces */
                name[sizeof(name) - 1] = '\0';
                q.hash = full_name_hash(q.name, q.len);
                dentry = rpc_create_client_dir(dir, &q, clnt);
                if (!IS_ERR(dentry))
                        break;
                error = PTR_ERR(dentry);
                /* only a name collision is retried; anything else is fatal */
                if (error != -EEXIST) {
                        printk(KERN_INFO "RPC: Couldn't create pipefs entry"
                                        " %s/%s, error %d\n",
                                        dir_name, name, error);
                        break;
                }
        }
        dput(dir);
        return dentry;
}
158
159 static int
160 rpc_setup_pipedir(struct rpc_clnt *clnt, const char *dir_name,
161                   struct super_block *pipefs_sb)
162 {
163         struct dentry *dentry;
164
165         clnt->cl_dentry = NULL;
166         if (dir_name == NULL)
167                 return 0;
168         dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt, dir_name);
169         if (IS_ERR(dentry))
170                 return PTR_ERR(dentry);
171         clnt->cl_dentry = dentry;
172         return 0;
173 }
174
175 static inline int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
176 {
177         if (((event == RPC_PIPEFS_MOUNT) && clnt->cl_dentry) ||
178             ((event == RPC_PIPEFS_UMOUNT) && !clnt->cl_dentry))
179                 return 1;
180         if ((event == RPC_PIPEFS_MOUNT) && atomic_read(&clnt->cl_count) == 0)
181                 return 1;
182         return 0;
183 }
184
/*
 * React to one pipefs mount/umount event for @clnt.  On mount, create
 * the client directory and its auth pipes; on umount, tear them down.
 * Returns 0 or a negative errno.
 */
static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
                                   struct super_block *sb)
{
        struct dentry *dentry;
        int err = 0;

        switch (event) {
        case RPC_PIPEFS_MOUNT:
                dentry = rpc_setup_pipedir_sb(sb, clnt,
                                              clnt->cl_program->pipe_dir_name);
                if (!dentry)
                        return -ENOENT;
                if (IS_ERR(dentry))
                        return PTR_ERR(dentry);
                clnt->cl_dentry = dentry;
                if (clnt->cl_auth->au_ops->pipes_create) {
                        err = clnt->cl_auth->au_ops->pipes_create(clnt->cl_auth);
                        if (err)
                                /* undo the directory we just created */
                                __rpc_clnt_remove_pipedir(clnt);
                }
                break;
        case RPC_PIPEFS_UMOUNT:
                __rpc_clnt_remove_pipedir(clnt);
                break;
        default:
                printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
                return -ENOTSUPP;
        }
        return err;
}
215
/*
 * Apply a pipefs event to @clnt and each of its ancestors by walking
 * the cl_parent chain.  A root client's cl_parent points to itself
 * (see rpc_new_client()), which terminates the loop; the walk also
 * stops at the first error.
 */
static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
                                struct super_block *sb)
{
        int error = 0;

        for (;; clnt = clnt->cl_parent) {
                if (!rpc_clnt_skip_event(clnt, event))
                        error = __rpc_clnt_handle_event(clnt, event, sb);
                if (error || clnt == clnt->cl_parent)
                        break;
        }
        return error;
}
229
/*
 * Find the first client in @net that still needs to process @event,
 * i.e. one that uses a pipefs directory and has not already handled
 * the event.  Returns the client, or NULL when none remain.
 *
 * NOTE(review): the client is returned after rpc_client_lock is
 * dropped, without taking a reference — this appears to rely on the
 * pipefs notifier serializing against client teardown; confirm before
 * reusing this helper elsewhere.
 */
static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
{
        struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
        struct rpc_clnt *clnt;

        spin_lock(&sn->rpc_client_lock);
        list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
                /* clients without a pipe dir never care about pipefs events */
                if (clnt->cl_program->pipe_dir_name == NULL)
                        continue;
                if (rpc_clnt_skip_event(clnt, event))
                        continue;
                spin_unlock(&sn->rpc_client_lock);
                return clnt;
        }
        spin_unlock(&sn->rpc_client_lock);
        return NULL;
}
247
248 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
249                             void *ptr)
250 {
251         struct super_block *sb = ptr;
252         struct rpc_clnt *clnt;
253         int error = 0;
254
255         while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
256                 error = __rpc_pipefs_event(clnt, event, sb);
257                 if (error)
258                         break;
259         }
260         return error;
261 }
262
/* Receives rpc_pipefs (un)mount events so client dirs can be (re)created. */
static struct notifier_block rpc_clients_block = {
        .notifier_call  = rpc_pipefs_event,
        .priority       = SUNRPC_PIPEFS_RPC_PRIO,
};
267
/* Register for pipefs mount/umount notifications.  Returns 0 or -errno. */
int rpc_clients_notifier_register(void)
{
        return rpc_pipefs_notifier_register(&rpc_clients_block);
}
272
273 void rpc_clients_notifier_unregister(void)
274 {
275         return rpc_pipefs_notifier_unregister(&rpc_clients_block);
276 }
277
278 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
279 {
280         clnt->cl_nodelen = strlen(nodename);
281         if (clnt->cl_nodelen > UNX_MAXNODENAME)
282                 clnt->cl_nodelen = UNX_MAXNODENAME;
283         memcpy(clnt->cl_nodename, nodename, clnt->cl_nodelen);
284 }
285
/*
 * Final registration steps for a new client: create its pipefs
 * directory (when pipefs is mounted), create its auth handle, and add
 * it to the per-netns client list.  Returns 0 or a negative errno;
 * if auth creation fails, the pipefs directory is removed again.
 */
static int rpc_client_register(const struct rpc_create_args *args,
                               struct rpc_clnt *clnt)
{
        const struct rpc_program *program = args->program;
        struct rpc_auth *auth;
        struct net *net = rpc_net_ns(clnt);
        struct super_block *pipefs_sb;
        int err = 0;

        pipefs_sb = rpc_get_sb_net(net);
        if (pipefs_sb) {
                err = rpc_setup_pipedir(clnt, program->pipe_dir_name, pipefs_sb);
                if (err)
                        goto out;
        }

        auth = rpcauth_create(args->authflavor, clnt);
        if (IS_ERR(auth)) {
                dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
                                args->authflavor);
                err = PTR_ERR(auth);
                goto err_auth;
        }

        rpc_register_client(clnt);
out:
        /* drop the superblock pin taken above, success or failure */
        if (pipefs_sb)
                rpc_put_sb_net(net);
        return err;

err_auth:
        __rpc_clnt_remove_pipedir(clnt);
        goto out;
}
320
/*
 * Allocate and initialize a new struct rpc_clnt bound to @xprt.
 *
 * Consumes the caller's reference to @xprt on every path: on success
 * it is stored in clnt->cl_xprt; on failure xprt_put() is called
 * before returning an ERR_PTR.
 */
static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args, struct rpc_xprt *xprt)
{
        const struct rpc_program *program = args->program;
        const struct rpc_version *version;
        struct rpc_clnt         *clnt = NULL;
        int err;

        /* sanity check the name before trying to print it */
        dprintk("RPC:       creating %s client for %s (xprt %p)\n",
                        program->name, args->servername, xprt);

        err = rpciod_up();
        if (err)
                goto out_no_rpciod;

        err = -EINVAL;
        if (args->version >= program->nrvers)
                goto out_err;
        version = program->version[args->version];
        if (version == NULL)
                goto out_err;

        err = -ENOMEM;
        clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
        if (!clnt)
                goto out_err;
        /* a root client is its own parent (see __rpc_pipefs_event()) */
        clnt->cl_parent = clnt;

        rcu_assign_pointer(clnt->cl_xprt, xprt);
        clnt->cl_procinfo = version->procs;
        clnt->cl_maxproc  = version->nrprocs;
        clnt->cl_protname = program->name;
        clnt->cl_prog     = args->prognumber ? : program->number;
        clnt->cl_vers     = version->number;
        clnt->cl_stats    = program->stats;
        clnt->cl_metrics  = rpc_alloc_iostats(clnt);
        err = -ENOMEM;
        if (clnt->cl_metrics == NULL)
                goto out_no_stats;
        clnt->cl_program  = program;
        INIT_LIST_HEAD(&clnt->cl_tasks);
        spin_lock_init(&clnt->cl_lock);

        /* transport not yet bound to a port: enable autobind */
        if (!xprt_bound(xprt))
                clnt->cl_autobind = 1;

        /* caller-supplied timeouts override the transport's defaults */
        clnt->cl_timeout = xprt->timeout;
        if (args->timeout != NULL) {
                memcpy(&clnt->cl_timeout_default, args->timeout,
                                sizeof(clnt->cl_timeout_default));
                clnt->cl_timeout = &clnt->cl_timeout_default;
        }

        clnt->cl_rtt = &clnt->cl_rtt_default;
        rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
        clnt->cl_principal = NULL;
        if (args->client_name) {
                clnt->cl_principal = kstrdup(args->client_name, GFP_KERNEL);
                if (!clnt->cl_principal)
                        goto out_no_principal;
        }

        atomic_set(&clnt->cl_count, 1);

        /* save the nodename */
        rpc_clnt_set_nodename(clnt, utsname()->nodename);

        err = rpc_client_register(args, clnt);
        if (err)
                goto out_no_path;
        return clnt;

        /* error unwind: each label undoes one successful step above */
out_no_path:
        kfree(clnt->cl_principal);
out_no_principal:
        rpc_free_iostats(clnt->cl_metrics);
out_no_stats:
        kfree(clnt);
out_err:
        rpciod_down();
out_no_rpciod:
        xprt_put(xprt);
        return ERR_PTR(err);
}
405
406 /**
407  * rpc_create - create an RPC client and transport with one call
408  * @args: rpc_clnt create argument structure
409  *
410  * Creates and initializes an RPC transport and an RPC client.
411  *
412  * It can ping the server in order to determine if it is up, and to see if
413  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
414  * this behavior so asynchronous tasks can also use rpc_create.
415  */
struct rpc_clnt *rpc_create(struct rpc_create_args *args)
{
        struct rpc_xprt *xprt;
        struct rpc_clnt *clnt;
        struct xprt_create xprtargs = {
                .net = args->net,
                .ident = args->protocol,
                .srcaddr = args->saddress,
                .dstaddr = args->address,
                .addrlen = args->addrsize,
                .servername = args->servername,
                .bc_xprt = args->bc_xprt,
        };
        char servername[48];

        if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
                xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
        if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
                xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
        /*
         * If the caller chooses not to specify a hostname, whip
         * up a string representation of the passed-in address.
         */
        if (xprtargs.servername == NULL) {
                struct sockaddr_un *sun =
                                (struct sockaddr_un *)args->address;
                struct sockaddr_in *sin =
                                (struct sockaddr_in *)args->address;
                struct sockaddr_in6 *sin6 =
                                (struct sockaddr_in6 *)args->address;

                servername[0] = '\0';
                switch (args->address->sa_family) {
                case AF_LOCAL:
                        /* NOTE(review): sun_path can exceed this 48-byte
                         * buffer; snprintf truncates silently. */
                        snprintf(servername, sizeof(servername), "%s",
                                 sun->sun_path);
                        break;
                case AF_INET:
                        snprintf(servername, sizeof(servername), "%pI4",
                                 &sin->sin_addr.s_addr);
                        break;
                case AF_INET6:
                        snprintf(servername, sizeof(servername), "%pI6",
                                 &sin6->sin6_addr);
                        break;
                default:
                        /* caller wants default server name, but
                         * address family isn't recognized. */
                        return ERR_PTR(-EINVAL);
                }
                xprtargs.servername = servername;
        }

        xprt = xprt_create_transport(&xprtargs);
        if (IS_ERR(xprt))
                return (struct rpc_clnt *)xprt;

        /*
         * By default, kernel RPC client connects from a reserved port.
         * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
         * but it is always enabled for rpciod, which handles the connect
         * operation.
         */
        xprt->resvport = 1;
        if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
                xprt->resvport = 0;

        /* rpc_new_client() consumes the xprt reference on all paths */
        clnt = rpc_new_client(args, xprt);
        if (IS_ERR(clnt))
                return clnt;

        if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
                int err = rpc_ping(clnt);
                if (err != 0) {
                        rpc_shutdown_client(clnt);
                        return ERR_PTR(err);
                }
        }

        /* NOTE(review): these flags are applied after the ping above, so
         * the ping itself runs with default retry semantics — confirm
         * this ordering is intended. */
        clnt->cl_softrtry = 1;
        if (args->flags & RPC_CLNT_CREATE_HARDRTRY)
                clnt->cl_softrtry = 0;

        if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
                clnt->cl_autobind = 1;
        if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
                clnt->cl_discrtry = 1;
        if (!(args->flags & RPC_CLNT_CREATE_QUIET))
                clnt->cl_chatty = 1;

        return clnt;
}
EXPORT_SYMBOL_GPL(rpc_create);
509
510 /*
511  * This function clones the RPC client structure. It allows us to share the
512  * same transport while varying parameters such as the authentication
513  * flavour.
514  */
static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
                                           struct rpc_clnt *clnt)
{
        struct rpc_xprt *xprt;
        struct rpc_clnt *new;
        int err;

        err = -ENOMEM;
        rcu_read_lock();
        xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
        rcu_read_unlock();
        if (xprt == NULL)
                goto out_err;
        args->servername = xprt->servername;

        /* rpc_new_client() consumes the xprt reference taken above */
        new = rpc_new_client(args, xprt);
        if (IS_ERR(new)) {
                err = PTR_ERR(new);
                goto out_err;
        }

        /* the clone pins its parent until rpc_free_client() drops it */
        atomic_inc(&clnt->cl_count);
        new->cl_parent = clnt;

        /* Turn off autobind on clones */
        new->cl_autobind = 0;
        new->cl_softrtry = clnt->cl_softrtry;
        new->cl_discrtry = clnt->cl_discrtry;
        new->cl_chatty = clnt->cl_chatty;
        return new;

out_err:
        dprintk("RPC:       %s: returned error %d\n", __func__, err);
        return ERR_PTR(err);
}
550
/**
 * rpc_clone_client - Clone an RPC client structure
 *
 * @clnt: RPC client whose parameters are copied
 *
 * Returns a fresh RPC client or an ERR_PTR.  The clone shares the
 * parent's transport and copies its program, version, auth flavor and
 * principal.
 */
struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
{
        struct rpc_create_args args = {
                .program        = clnt->cl_program,
                .prognumber     = clnt->cl_prog,
                .version        = clnt->cl_vers,
                .authflavor     = clnt->cl_auth->au_flavor,
                .client_name    = clnt->cl_principal,
        };
        return __rpc_clone_client(&args, clnt);
}
EXPORT_SYMBOL_GPL(rpc_clone_client);
570
/**
 * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
 *
 * @clnt: RPC client whose parameters are copied
 * @flavor: security flavor for new client
 *
 * Returns a fresh RPC client or an ERR_PTR.  Identical to
 * rpc_clone_client() except that the clone authenticates with @flavor
 * instead of inheriting the parent's flavor.
 */
struct rpc_clnt *
rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
{
        struct rpc_create_args args = {
                .program        = clnt->cl_program,
                .prognumber     = clnt->cl_prog,
                .version        = clnt->cl_vers,
                .authflavor     = flavor,
                .client_name    = clnt->cl_principal,
        };
        return __rpc_clone_client(&args, clnt);
}
EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
592
/*
 * Kill all tasks for the given client.
 * XXX: kill their descendants as well?
 */
void rpc_killall_tasks(struct rpc_clnt *clnt)
{
        struct rpc_task *rovr;


        /* unlocked fast path; rpc_shutdown_client() re-invokes us in a
         * loop, so a racing insertion is not lost */
        if (list_empty(&clnt->cl_tasks))
                return;
        dprintk("RPC:       killing all tasks for client %p\n", clnt);
        /*
         * Spin lock all_tasks to prevent changes...
         */
        spin_lock(&clnt->cl_lock);
        list_for_each_entry(rovr, &clnt->cl_tasks, tk_task) {
                if (!RPC_IS_ACTIVATED(rovr))
                        continue;
                if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        /* force the task to exit with -EIO ... */
                        rpc_exit(rovr, -EIO);
                        /* ... and wake it if it is parked on a wait queue */
                        if (RPC_IS_QUEUED(rovr))
                                rpc_wake_up_queued_task(rovr->tk_waitqueue,
                                                        rovr);
                }
        }
        spin_unlock(&clnt->cl_lock);
}
EXPORT_SYMBOL_GPL(rpc_killall_tasks);
623
/*
 * Properly shut down an RPC client, terminating all outstanding
 * requests.  May sleep: loops until the task list drains, then drops
 * the caller's reference to the client.
 */
void rpc_shutdown_client(struct rpc_clnt *clnt)
{
        might_sleep();

        dprintk_rcu("RPC:       shutting down %s client for %s\n",
                        clnt->cl_protname,
                        rcu_dereference(clnt->cl_xprt)->servername);

        /* keep killing until every task has unlinked itself from
         * cl_tasks (see rpc_task_release_client()) */
        while (!list_empty(&clnt->cl_tasks)) {
                rpc_killall_tasks(clnt);
                wait_event_timeout(destroy_wait,
                        list_empty(&clnt->cl_tasks), 1*HZ);
        }

        rpc_release_client(clnt);
}
EXPORT_SYMBOL_GPL(rpc_shutdown_client);
645
/*
 * Free an RPC client: final teardown once the refcount has reached
 * zero and cl_auth has already been released (see rpc_free_auth()).
 */
static void
rpc_free_client(struct rpc_clnt *clnt)
{
        dprintk_rcu("RPC:       destroying %s client for %s\n",
                        clnt->cl_protname,
                        rcu_dereference(clnt->cl_xprt)->servername);
        /* a clone holds a reference on its parent; return it now */
        if (clnt->cl_parent != clnt)
                rpc_release_client(clnt->cl_parent);
        rpc_clnt_remove_pipedir(clnt);
        rpc_unregister_client(clnt);
        rpc_free_iostats(clnt->cl_metrics);
        kfree(clnt->cl_principal);
        clnt->cl_metrics = NULL;
        xprt_put(rcu_dereference_raw(clnt->cl_xprt));
        rpciod_down();
        kfree(clnt);
}
666
/*
 * Release the client's authentication handle, then free the client
 * itself once the temporary reference taken here is dropped.
 */
static void
rpc_free_auth(struct rpc_clnt *clnt)
{
        if (clnt->cl_auth == NULL) {
                rpc_free_client(clnt);
                return;
        }

        /*
         * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
         *       release remaining GSS contexts. This mechanism ensures
         *       that it can do so safely.
         */
        atomic_inc(&clnt->cl_count);
        rpcauth_release(clnt->cl_auth);
        clnt->cl_auth = NULL;
        if (atomic_dec_and_test(&clnt->cl_count))
                rpc_free_client(clnt);
}
689
/*
 * Release reference to the RPC client
 *
 * Wakes rpc_shutdown_client() waiters when the task list is empty,
 * and starts final teardown (auth first, then the client) when the
 * last reference is dropped.
 */
void
rpc_release_client(struct rpc_clnt *clnt)
{
        dprintk("RPC:       rpc_release_client(%p)\n", clnt);

        if (list_empty(&clnt->cl_tasks))
                wake_up(&destroy_wait);
        if (atomic_dec_and_test(&clnt->cl_count))
                rpc_free_auth(clnt);
}
EXPORT_SYMBOL_GPL(rpc_release_client);
704
/**
 * rpc_bind_new_program - bind a new RPC program to an existing client
 * @old: old rpc_client
 * @program: rpc program to set
 * @vers: rpc program version
 *
 * Clones the rpc client and sets up a new RPC program. This is mainly
 * of use for enabling different RPC programs to share the same transport.
 * The Sun NFSv2/v3 ACL protocol can do this.
 *
 * Returns the new client or an ERR_PTR.  The server is always pinged
 * to verify that it speaks the new program/version.
 */
struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
                                      const struct rpc_program *program,
                                      u32 vers)
{
        struct rpc_create_args args = {
                .program        = program,
                .prognumber     = program->number,
                .version        = vers,
                .authflavor     = old->cl_auth->au_flavor,
                .client_name    = old->cl_principal,
        };
        struct rpc_clnt *clnt;
        int err;

        clnt = __rpc_clone_client(&args, old);
        if (IS_ERR(clnt))
                goto out;
        err = rpc_ping(clnt);
        if (err != 0) {
                rpc_shutdown_client(clnt);
                clnt = ERR_PTR(err);
        }
out:
        return clnt;
}
EXPORT_SYMBOL_GPL(rpc_bind_new_program);
741
742 void rpc_task_release_client(struct rpc_task *task)
743 {
744         struct rpc_clnt *clnt = task->tk_client;
745
746         if (clnt != NULL) {
747                 /* Remove from client task list */
748                 spin_lock(&clnt->cl_lock);
749                 list_del(&task->tk_task);
750                 spin_unlock(&clnt->cl_lock);
751                 task->tk_client = NULL;
752
753                 rpc_release_client(clnt);
754         }
755 }
756
/*
 * Attach @task to @clnt (when non-NULL): release any previous client,
 * take a reference on the new one, inherit the soft-retry and swapper
 * flags, and add the task to the client's cl_tasks list.
 */
static
void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
        if (clnt != NULL) {
                rpc_task_release_client(task);
                task->tk_client = clnt;
                atomic_inc(&clnt->cl_count);
                if (clnt->cl_softrtry)
                        task->tk_flags |= RPC_TASK_SOFT;
                if (sk_memalloc_socks()) {
                        struct rpc_xprt *xprt;

                        /* cl_xprt is RCU-protected; sample it under the lock */
                        rcu_read_lock();
                        xprt = rcu_dereference(clnt->cl_xprt);
                        if (xprt->swapper)
                                task->tk_flags |= RPC_TASK_SWAPPER;
                        rcu_read_unlock();
                }
                /* Add to the client's list of all tasks */
                spin_lock(&clnt->cl_lock);
                list_add_tail(&task->tk_task, &clnt->cl_tasks);
                spin_unlock(&clnt->cl_lock);
        }
}
781
/*
 * Point @task at @clnt, dropping any previously attached client.
 * (rpc_task_set_client() also releases the old client when @clnt is
 * non-NULL; the explicit release here covers the @clnt == NULL case.)
 */
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt)
{
        rpc_task_release_client(task);
        rpc_task_set_client(task, clnt);
}
EXPORT_SYMBOL_GPL(rpc_task_reset_client);
788
789
790 static void
791 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
792 {
793         if (msg != NULL) {
794                 task->tk_msg.rpc_proc = msg->rpc_proc;
795                 task->tk_msg.rpc_argp = msg->rpc_argp;
796                 task->tk_msg.rpc_resp = msg->rpc_resp;
797                 if (msg->rpc_cred != NULL)
798                         task->tk_msg.rpc_cred = get_rpccred(msg->rpc_cred);
799         }
800 }
801
/*
 * Default callback for async RPC calls: intentionally does nothing.
 */
static void
rpc_default_callback(struct rpc_task *task, void *data)
{
}

/* Used when the caller supplies no call ops of its own (rpc_call_sync). */
static const struct rpc_call_ops rpc_default_ops = {
        .rpc_call_done = rpc_default_callback,
};
813
/**
 * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
 * @task_setup_data: pointer to task initialisation data
 *
 * Returns the task (with an extra reference held for the caller) or an
 * ERR_PTR.  The caller must drop its reference with rpc_put_task().
 */
struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
{
        struct rpc_task *task;

        task = rpc_new_task(task_setup_data);
        if (IS_ERR(task))
                goto out;

        rpc_task_set_client(task, task_setup_data->rpc_client);
        rpc_task_set_rpc_message(task, task_setup_data->rpc_message);

        if (task->tk_action == NULL)
                rpc_call_start(task);

        /* extra reference so the task survives rpc_execute() for the caller */
        atomic_inc(&task->tk_count);
        rpc_execute(task);
out:
        return task;
}
EXPORT_SYMBOL_GPL(rpc_run_task);
838
/**
 * rpc_call_sync - Perform a synchronous RPC call
 * @clnt: pointer to RPC client
 * @msg: RPC call parameters
 * @flags: RPC call flags
 *
 * Returns the task's final tk_status, or a negative errno when the
 * task could not be set up.  RPC_TASK_ASYNC is rejected with -EINVAL.
 */
int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
{
        struct rpc_task *task;
        struct rpc_task_setup task_setup_data = {
                .rpc_client = clnt,
                .rpc_message = msg,
                .callback_ops = &rpc_default_ops,
                .flags = flags,
        };
        int status;

        /* a synchronous call must not also be marked asynchronous */
        WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
        if (flags & RPC_TASK_ASYNC) {
                /* release the calldata the caller expected us to consume */
                rpc_release_calldata(task_setup_data.callback_ops,
                        task_setup_data.callback_data);
                return -EINVAL;
        }

        task = rpc_run_task(&task_setup_data);
        if (IS_ERR(task))
                return PTR_ERR(task);
        status = task->tk_status;
        rpc_put_task(task);
        return status;
}
EXPORT_SYMBOL_GPL(rpc_call_sync);
871
/**
 * rpc_call_async - Perform an asynchronous RPC call
 * @clnt: pointer to RPC client
 * @msg: RPC call parameters
 * @flags: RPC call flags
 * @tk_ops: RPC call ops
 * @data: user call data
 *
 * Returns 0 once the task has been started (completion is reported
 * through @tk_ops), or a negative errno when setup fails.
 */
int
rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
               const struct rpc_call_ops *tk_ops, void *data)
{
        struct rpc_task *task;
        struct rpc_task_setup task_setup_data = {
                .rpc_client = clnt,
                .rpc_message = msg,
                .callback_ops = tk_ops,
                .callback_data = data,
                .flags = flags|RPC_TASK_ASYNC,
        };

        task = rpc_run_task(&task_setup_data);
        if (IS_ERR(task))
                return PTR_ERR(task);
        /* drop our reference; the task keeps running asynchronously */
        rpc_put_task(task);
        return 0;
}
EXPORT_SYMBOL_GPL(rpc_call_async);
900
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 * @tk_ops: RPC call ops
 *
 * On failure to allocate the task, @req is freed via
 * xprt_free_bc_request() and the error pointer is returned.
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
				const struct rpc_call_ops *tk_ops)
{
	struct rpc_task *task;
	struct xdr_buf *xbufp = &req->rq_snd_buf;
	struct rpc_task_setup task_setup_data = {
		.callback_ops = tk_ops,
	};

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
	/*
	 * Create an rpc_task to send the data
	 */
	task = rpc_new_task(&task_setup_data);
	if (IS_ERR(task)) {
		xprt_free_bc_request(req);
		goto out;
	}
	task->tk_rqstp = req;

	/*
	 * Set up the xdr_buf length.
	 * This also indicates that the buffer is XDR encoded already.
	 */
	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
			xbufp->tail[0].iov_len;

	/* Skip the forechannel FSM entry; transmit the reply directly */
	task->tk_action = call_bc_transmit;
	/*
	 * Take an extra reference so the task remains valid for the
	 * caller after rpc_execute(); the count is expected to be
	 * exactly 2 at this point.
	 */
	atomic_inc(&task->tk_count);
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

out:
	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
945
/*
 * rpc_call_start - restart an RPC task at the beginning of the FSM
 * @task: task to (re)start
 *
 * Typically invoked from an rpc_call_prepare callback to kick off
 * the normal call state machine (see call_start below).
 */
void
rpc_call_start(struct rpc_task *task)
{
	task->tk_action = call_start;
}
EXPORT_SYMBOL_GPL(rpc_call_start);
952
953 /**
954  * rpc_peeraddr - extract remote peer address from clnt's xprt
955  * @clnt: RPC client structure
956  * @buf: target buffer
957  * @bufsize: length of target buffer
958  *
959  * Returns the number of bytes that are actually in the stored address.
960  */
961 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
962 {
963         size_t bytes;
964         struct rpc_xprt *xprt;
965
966         rcu_read_lock();
967         xprt = rcu_dereference(clnt->cl_xprt);
968
969         bytes = xprt->addrlen;
970         if (bytes > bufsize)
971                 bytes = bufsize;
972         memcpy(buf, &xprt->addr, bytes);
973         rcu_read_unlock();
974
975         return bytes;
976 }
977 EXPORT_SYMBOL_GPL(rpc_peeraddr);
978
979 /**
980  * rpc_peeraddr2str - return remote peer address in printable format
981  * @clnt: RPC client structure
982  * @format: address format
983  *
984  * NB: the lifetime of the memory referenced by the returned pointer is
985  * the same as the rpc_xprt itself.  As long as the caller uses this
986  * pointer, it must hold the RCU read lock.
987  */
988 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
989                              enum rpc_display_format_t format)
990 {
991         struct rpc_xprt *xprt;
992
993         xprt = rcu_dereference(clnt->cl_xprt);
994
995         if (xprt->address_strings[format] != NULL)
996                 return xprt->address_strings[format];
997         else
998                 return "unprintable";
999 }
1000 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1001
/*
 * NB: despite the "_loopback" names, both of these are wildcard (ANY)
 * addresses (INADDR_ANY / IN6ADDR_ANY_INIT).  They serve as safe bind
 * targets in rpc_sockname() and as fallback results in rpc_anyaddr().
 */
static const struct sockaddr_in rpc_inaddr_loopback = {
	.sin_family		= AF_INET,
	.sin_addr.s_addr	= htonl(INADDR_ANY),
};

static const struct sockaddr_in6 rpc_in6addr_loopback = {
	.sin6_family		= AF_INET6,
	.sin6_addr		= IN6ADDR_ANY_INIT,
};
1011
1012 /*
1013  * Try a getsockname() on a connected datagram socket.  Using a
1014  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1015  * This conserves the ephemeral port number space.
1016  *
1017  * Returns zero and fills in "buf" if successful; otherwise, a
1018  * negative errno is returned.
1019  */
1020 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1021                         struct sockaddr *buf, int buflen)
1022 {
1023         struct socket *sock;
1024         int err;
1025
1026         err = __sock_create(net, sap->sa_family,
1027                                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1028         if (err < 0) {
1029                 dprintk("RPC:       can't create UDP socket (%d)\n", err);
1030                 goto out;
1031         }
1032
1033         switch (sap->sa_family) {
1034         case AF_INET:
1035                 err = kernel_bind(sock,
1036                                 (struct sockaddr *)&rpc_inaddr_loopback,
1037                                 sizeof(rpc_inaddr_loopback));
1038                 break;
1039         case AF_INET6:
1040                 err = kernel_bind(sock,
1041                                 (struct sockaddr *)&rpc_in6addr_loopback,
1042                                 sizeof(rpc_in6addr_loopback));
1043                 break;
1044         default:
1045                 err = -EAFNOSUPPORT;
1046                 goto out;
1047         }
1048         if (err < 0) {
1049                 dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1050                 goto out_release;
1051         }
1052
1053         err = kernel_connect(sock, sap, salen, 0);
1054         if (err < 0) {
1055                 dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1056                 goto out_release;
1057         }
1058
1059         err = kernel_getsockname(sock, buf, &buflen);
1060         if (err < 0) {
1061                 dprintk("RPC:       getsockname failed (%d)\n", err);
1062                 goto out_release;
1063         }
1064
1065         err = 0;
1066         if (buf->sa_family == AF_INET6) {
1067                 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1068                 sin6->sin6_scope_id = 0;
1069         }
1070         dprintk("RPC:       %s succeeded\n", __func__);
1071
1072 out_release:
1073         sock_release(sock);
1074 out:
1075         return err;
1076 }
1077
1078 /*
1079  * Scraping a connected socket failed, so we don't have a useable
1080  * local address.  Fallback: generate an address that will prevent
1081  * the server from calling us back.
1082  *
1083  * Returns zero and fills in "buf" if successful; otherwise, a
1084  * negative errno is returned.
1085  */
1086 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1087 {
1088         switch (family) {
1089         case AF_INET:
1090                 if (buflen < sizeof(rpc_inaddr_loopback))
1091                         return -EINVAL;
1092                 memcpy(buf, &rpc_inaddr_loopback,
1093                                 sizeof(rpc_inaddr_loopback));
1094                 break;
1095         case AF_INET6:
1096                 if (buflen < sizeof(rpc_in6addr_loopback))
1097                         return -EINVAL;
1098                 memcpy(buf, &rpc_in6addr_loopback,
1099                                 sizeof(rpc_in6addr_loopback));
1100         default:
1101                 dprintk("RPC:       %s: address family not supported\n",
1102                         __func__);
1103                 return -EAFNOSUPPORT;
1104         }
1105         dprintk("RPC:       %s: succeeded\n", __func__);
1106         return 0;
1107 }
1108
/**
 * rpc_localaddr - discover local endpoint address for an RPC client
 * @clnt: RPC client structure
 * @buf: target buffer
 * @buflen: size of target buffer, in bytes
 *
 * Returns zero and fills in "buf" and "buflen" if successful;
 * otherwise, a negative errno is returned.
 *
 * This works even if the underlying transport is not currently connected,
 * or if the upper layer never previously provided a source address.
 *
 * The result of this function call is transient: multiple calls in
 * succession may give different results, depending on how local
 * networking configuration changes over time.
 */
int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
{
	struct sockaddr_storage address;
	struct sockaddr *sap = (struct sockaddr *)&address;
	struct rpc_xprt *xprt;
	struct net *net;
	size_t salen;
	int err;

	/*
	 * Snapshot the server address under RCU, and pin the network
	 * namespace with get_net() so it stays valid after we drop
	 * the RCU read lock.
	 */
	rcu_read_lock();
	xprt = rcu_dereference(clnt->cl_xprt);
	salen = xprt->addrlen;
	memcpy(sap, &xprt->addr, salen);
	net = get_net(xprt->xprt_net);
	rcu_read_unlock();

	/* Zero the port: we only care about the address part */
	rpc_set_port(sap, 0);
	err = rpc_sockname(net, sap, salen, buf, buflen);
	put_net(net);
	if (err != 0)
		/* Couldn't discover local address, return ANYADDR */
		return rpc_anyaddr(sap->sa_family, buf, buflen);
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_localaddr);
1150
1151 void
1152 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1153 {
1154         struct rpc_xprt *xprt;
1155
1156         rcu_read_lock();
1157         xprt = rcu_dereference(clnt->cl_xprt);
1158         if (xprt->ops->set_buffer_size)
1159                 xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1160         rcu_read_unlock();
1161 }
1162 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1163
1164 /**
1165  * rpc_protocol - Get transport protocol number for an RPC client
1166  * @clnt: RPC client to query
1167  *
1168  */
1169 int rpc_protocol(struct rpc_clnt *clnt)
1170 {
1171         int protocol;
1172
1173         rcu_read_lock();
1174         protocol = rcu_dereference(clnt->cl_xprt)->prot;
1175         rcu_read_unlock();
1176         return protocol;
1177 }
1178 EXPORT_SYMBOL_GPL(rpc_protocol);
1179
1180 /**
1181  * rpc_net_ns - Get the network namespace for this RPC client
1182  * @clnt: RPC client to query
1183  *
1184  */
1185 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1186 {
1187         struct net *ret;
1188
1189         rcu_read_lock();
1190         ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1191         rcu_read_unlock();
1192         return ret;
1193 }
1194 EXPORT_SYMBOL_GPL(rpc_net_ns);
1195
1196 /**
1197  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1198  * @clnt: RPC client to query
1199  *
1200  * For stream transports, this is one RPC record fragment (see RFC
1201  * 1831), as we don't support multi-record requests yet.  For datagram
1202  * transports, this is the size of an IP packet minus the IP, UDP, and
1203  * RPC header sizes.
1204  */
1205 size_t rpc_max_payload(struct rpc_clnt *clnt)
1206 {
1207         size_t ret;
1208
1209         rcu_read_lock();
1210         ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1211         rcu_read_unlock();
1212         return ret;
1213 }
1214 EXPORT_SYMBOL_GPL(rpc_max_payload);
1215
1216 /**
1217  * rpc_get_timeout - Get timeout for transport in units of HZ
1218  * @clnt: RPC client to query
1219  */
1220 unsigned long rpc_get_timeout(struct rpc_clnt *clnt)
1221 {
1222         unsigned long ret;
1223
1224         rcu_read_lock();
1225         ret = rcu_dereference(clnt->cl_xprt)->timeout->to_initval;
1226         rcu_read_unlock();
1227         return ret;
1228 }
1229 EXPORT_SYMBOL_GPL(rpc_get_timeout);
1230
1231 /**
1232  * rpc_force_rebind - force transport to check that remote port is unchanged
1233  * @clnt: client to rebind
1234  *
1235  */
1236 void rpc_force_rebind(struct rpc_clnt *clnt)
1237 {
1238         if (clnt->cl_autobind) {
1239                 rcu_read_lock();
1240                 xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1241                 rcu_read_unlock();
1242         }
1243 }
1244 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1245
1246 /*
1247  * Restart an (async) RPC call from the call_prepare state.
1248  * Usually called from within the exit handler.
1249  */
1250 int
1251 rpc_restart_call_prepare(struct rpc_task *task)
1252 {
1253         if (RPC_ASSASSINATED(task))
1254                 return 0;
1255         task->tk_action = call_start;
1256         if (task->tk_ops->rpc_call_prepare != NULL)
1257                 task->tk_action = rpc_prepare_task;
1258         return 1;
1259 }
1260 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1261
/*
 * Restart an (async) RPC call. Usually called from within the
 * exit handler.
 *
 * Returns 0 if the task has been assassinated and cannot be
 * restarted, 1 otherwise.
 */
int
rpc_restart_call(struct rpc_task *task)
{
	if (RPC_ASSASSINATED(task))
		return 0;
	/* Rewind the FSM to its initial state */
	task->tk_action = call_start;
	return 1;
}
EXPORT_SYMBOL_GPL(rpc_restart_call);
1275
#ifdef RPC_DEBUG
/*
 * Return a printable name for the task's RPC procedure, for use
 * in debug messages.  Never returns NULL.
 */
static const char *rpc_proc_name(const struct rpc_task *task)
{
	const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;

	if (proc == NULL)
		return "no proc";
	return proc->p_name ? proc->p_name : "NULL";
}
#endif
1290
/*
 * 0.  Initial state
 *
 *     Other FSM states can be visited zero or more times, but
 *     this state is visited exactly once for each RPC.
 *
 *     Bumps the per-procedure and per-client call counters, then
 *     advances to call_reserve.
 */
static void
call_start(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;

	/* rpc_proc_name() is only defined under RPC_DEBUG, matching dprintk */
	dprintk("RPC: %5u call_start %s%d proc %s (%s)\n", task->tk_pid,
			clnt->cl_protname, clnt->cl_vers,
			rpc_proc_name(task),
			(RPC_IS_ASYNC(task) ? "async" : "sync"));

	/* Increment call count */
	task->tk_msg.rpc_proc->p_count++;
	clnt->cl_stats->rpccnt++;
	task->tk_action = call_reserve;
}
1312
/*
 * 1.	Reserve an RPC call slot
 *
 *	Asks the transport for a request slot; the outcome is
 *	examined in call_reserveresult.
 */
static void
call_reserve(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_reserve(task);
}
1325
/* Forward declaration: defined below, but used by call_reserveresult() */
static void call_retry_reserve(struct rpc_task *task);
1327
/*
 * 1b.	Grok the result of xprt_reserve()
 *
 *	On success with a slot, move on to call_refresh; on -ENOMEM or
 *	-EAGAIN, retry the reservation; anything else terminates the task.
 */
static void
call_reserveresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	/*
	 * After a call to xprt_reserve(), we must have either
	 * a request slot or else an error status.
	 */
	task->tk_status = 0;
	if (status >= 0) {
		if (task->tk_rqstp) {
			task->tk_action = call_refresh;
			return;
		}

		printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n",
				__func__, status);
		rpc_exit(task, -EIO);
		return;
	}

	/*
	 * Even though there was an error, we may have acquired
	 * a request slot somehow.  Make sure not to leak it.
	 */
	if (task->tk_rqstp) {
		printk(KERN_ERR "%s: status=%d, request allocated anyway\n",
				__func__, status);
		xprt_release(task);
	}

	switch (status) {
	case -ENOMEM:
		rpc_delay(task, HZ >> 2);
		/* fall through: retry after the delay */
	case -EAGAIN:	/* woken up; retry */
		task->tk_action = call_retry_reserve;
		return;
	case -EIO:	/* probably a shutdown */
		break;
	default:
		printk(KERN_ERR "%s: unrecognized error %d, exiting\n",
				__func__, status);
		break;
	}
	rpc_exit(task, status);
}
1380
/*
 * 1c.	Retry reserving an RPC call slot
 *
 *	Same as call_reserve, but uses xprt_retry_reserve() for
 *	tasks that have already been through a failed reservation.
 */
static void
call_retry_reserve(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_status  = 0;
	task->tk_action  = call_reserveresult;
	xprt_retry_reserve(task);
}
1393
/*
 * 2.	Bind and/or refresh the credentials
 *
 *	Kicks off an auth credential refresh; the result is
 *	handled in call_refreshresult.
 */
static void
call_refresh(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_action = call_refreshresult;
	task->tk_status = 0;
	task->tk_client->cl_stats->rpcauthrefresh++;
	rpcauth_refreshcred(task);
}
1407
/*
 * 2a.	Process the results of a credential refresh
 *
 *	On success with up-to-date creds, advance to call_allocate;
 *	transient errors retry the refresh up to tk_cred_retry times,
 *	after which the task exits with -EACCES.
 */
static void
call_refreshresult(struct rpc_task *task)
{
	int status = task->tk_status;

	dprint_status(task);

	task->tk_status = 0;
	/* Default next state: try the refresh again */
	task->tk_action = call_refresh;
	switch (status) {
	case 0:
		if (rpcauth_uptodatecred(task))
			task->tk_action = call_allocate;
		return;
	case -ETIMEDOUT:
		rpc_delay(task, 3*HZ);
		/* fall through: handled like the other transient errors */
	case -EKEYEXPIRED:
	case -EAGAIN:
		status = -EACCES;
		if (!task->tk_cred_retry)
			break;
		task->tk_cred_retry--;
		dprintk("RPC: %5u %s: retry refresh creds\n",
				task->tk_pid, __func__);
		return;
	}
	dprintk("RPC: %5u %s: refresh creds failed with error %d\n",
				task->tk_pid, __func__, status);
	rpc_exit(task, status);
}
1441
/*
 * 2b.	Allocate the buffer. For details, see sched.c:rpc_malloc.
 *	(Note: buffer memory is freed in xprt_release).
 */
static void
call_allocate(struct rpc_task *task)
{
	/* Auth flavor's call-side slack, in quads */
	unsigned int slack = task->tk_rqstp->rq_cred->cr_auth->au_cslack;
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_procinfo *proc = task->tk_msg.rpc_proc;

	dprint_status(task);

	task->tk_status = 0;
	task->tk_action = call_bind;

	/* Buffer already allocated (e.g. on a retransmission)? */
	if (req->rq_buffer)
		return;

	if (proc->p_proc != 0) {
		BUG_ON(proc->p_arglen == 0);
		if (proc->p_decode != NULL)
			BUG_ON(proc->p_replen == 0);
	}

	/*
	 * Calculate the size (in quads) of the RPC call
	 * and reply headers, and convert both values
	 * to byte sizes.
	 */
	req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen;
	req->rq_callsize <<= 2;
	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
	req->rq_rcvsize <<= 2;

	/* One allocation holds both the send and receive buffers */
	req->rq_buffer = xprt->ops->buf_alloc(task,
					req->rq_callsize + req->rq_rcvsize);
	if (req->rq_buffer != NULL)
		return;

	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);

	/* Retry the allocation after a short delay, unless a fatal
	 * signal is pending on a synchronous task */
	if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
		task->tk_action = call_allocate;
		rpc_delay(task, HZ>>4);
		return;
	}

	rpc_exit(task, -ERESTARTSYS);
}
1493
/* A zero-length send buffer means the request has not been encoded yet */
static inline int
rpc_task_need_encode(struct rpc_task *task)
{
	return task->tk_rqstp->rq_snd_buf.len == 0;
}
1499
/* Reset the send state so the next call_transmit re-encodes the request */
static inline void
rpc_task_force_reencode(struct rpc_task *task)
{
	task->tk_rqstp->rq_snd_buf.len = 0;
	task->tk_rqstp->rq_bytes_sent = 0;
}
1506
/*
 * Initialize an xdr_buf over the region [start, start+len): the head
 * iovec covers the whole region, and tail/pages/len are cleared.
 */
static inline void
rpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len)
{
	buf->head[0].iov_base = start;
	buf->head[0].iov_len = len;
	buf->tail[0].iov_len = 0;
	buf->page_len = 0;
	buf->flags = 0;
	buf->len = 0;
	buf->buflen = len;
}
1518
/*
 * 3.	Encode arguments of an RPC call
 *
 *	Splits the single rq_buffer allocation into send and receive
 *	xdr_bufs, writes the RPC header, then lets the auth layer wrap
 *	the procedure's argument encoder.  Errors land in tk_status.
 */
static void
rpc_xdr_encode(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	kxdreproc_t	encode;
	__be32		*p;

	dprint_status(task);

	/* First rq_callsize bytes: send buffer; the rest: receive buffer
	 * (sizes were computed in call_allocate) */
	rpc_xdr_buf_init(&req->rq_snd_buf,
			 req->rq_buffer,
			 req->rq_callsize);
	rpc_xdr_buf_init(&req->rq_rcv_buf,
			 (char *)req->rq_buffer + req->rq_callsize,
			 req->rq_rcvsize);

	p = rpc_encode_header(task);
	if (p == NULL) {
		printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
		rpc_exit(task, -EIO);
		return;
	}

	/* Procedures with no arguments have no encoder */
	encode = task->tk_msg.rpc_proc->p_encode;
	if (encode == NULL)
		return;

	task->tk_status = rpcauth_wrap_req(task, encode, req, p,
			task->tk_msg.rpc_argp);
}
1552
/*
 * 4.	Get the server port number if not yet set
 *
 *	If the transport is already bound, skip straight to
 *	call_connect; otherwise start an rpcbind query and wait for
 *	its result in call_bind_status.
 */
static void
call_bind(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	dprint_status(task);

	task->tk_action = call_connect;
	if (!xprt_bound(xprt)) {
		task->tk_action = call_bind_status;
		task->tk_timeout = xprt->bind_timeout;
		xprt->ops->rpcbind(task);
	}
}
1570
/*
 * 4a.	Sort out bind result
 *
 *	Transient rpcbind failures are retried (possibly after a
 *	delay) via call_timeout or call_bind; hard failures exit the
 *	task with an appropriate error.
 */
static void
call_bind_status(struct rpc_task *task)
{
	int status = -EIO;

	if (task->tk_status >= 0) {
		dprint_status(task);
		task->tk_status = 0;
		task->tk_action = call_connect;
		return;
	}

	trace_rpc_bind_status(task);
	switch (task->tk_status) {
	case -ENOMEM:
		dprintk("RPC: %5u rpcbind out of memory\n", task->tk_pid);
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EACCES:
		dprintk("RPC: %5u remote rpcbind: RPC program/version "
				"unavailable\n", task->tk_pid);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			status = -EOPNOTSUPP;
			break;
		}
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -ETIMEDOUT:
		dprintk("RPC: %5u rpcbind request timed out\n",
				task->tk_pid);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		dprintk("RPC: %5u unrecognized remote rpcbind service\n",
				task->tk_pid);
		break;
	case -EPROTONOSUPPORT:
		dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n",
				task->tk_pid);
		task->tk_status = 0;
		task->tk_action = call_bind;
		return;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		dprintk("RPC: %5u remote rpcbind unreachable: %d\n",
				task->tk_pid, task->tk_status);
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto retry_timeout;
		}
		status = task->tk_status;
		break;
	default:
		dprintk("RPC: %5u unrecognized rpcbind error (%d)\n",
				task->tk_pid, -task->tk_status);
	}

	rpc_exit(task, status);
	return;

retry_timeout:
	/* Let the timeout handler decide whether to retry or give up */
	task->tk_action = call_timeout;
}
1646
/*
 * 4b.	Connect to the RPC server
 *
 *	If already connected, go straight to call_transmit; otherwise
 *	start a connect and handle its result in call_connect_status.
 */
static void
call_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;

	dprintk("RPC: %5u call_connect xprt %p %s connected\n",
			task->tk_pid, xprt,
			(xprt_connected(xprt) ? "is" : "is not"));

	task->tk_action = call_transmit;
	if (!xprt_connected(xprt)) {
		task->tk_action = call_connect_status;
		/* A prior error short-circuits straight to status handling */
		if (task->tk_status < 0)
			return;
		xprt_connect(task);
	}
}
1667
/*
 * 4c.	Sort out connect result
 *
 *	Success and transient errors proceed to call_transmit;
 *	timeouts defer to call_timeout; hard connection errors on
 *	SOFTCONN tasks terminate the call.
 */
static void
call_connect_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	int status = task->tk_status;

	dprint_status(task);

	trace_rpc_connect_status(task, status);
	switch (status) {
		/* if soft mounted, test if we've timed out */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		return;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		if (RPC_IS_SOFTCONN(task))
			break;
		/* retry with existing socket, after a delay */
		/* fall through: non-SOFTCONN tasks retry the transmit */
	case 0:
	case -EAGAIN:
		task->tk_status = 0;
		clnt->cl_stats->netreconn++;
		task->tk_action = call_transmit;
		return;
	}
	rpc_exit(task, status);
}
1700
/*
 * 5.	Transmit the RPC request, and wait for reply
 *
 *	Encodes the request lazily (so rpcsec_gss sees the right
 *	sequence number), transmits it, and releases the transport
 *	write lock before sleeping for the reply.
 */
static void
call_transmit(struct rpc_task *task)
{
	dprint_status(task);

	task->tk_action = call_status;
	if (task->tk_status < 0)
		return;
	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status != 0)
		return;
	task->tk_action = call_transmit_status;
	/* Encode here so that rpcsec_gss can use correct sequence number. */
	if (rpc_task_need_encode(task)) {
		rpc_xdr_encode(task);
		/* Did the encode result in an error condition? */
		if (task->tk_status != 0) {
			/* Was the error nonfatal? */
			if (task->tk_status == -EAGAIN)
				rpc_delay(task, HZ >> 4);
			else
				rpc_exit(task, task->tk_status);
			return;
		}
	}
	xprt_transmit(task);
	if (task->tk_status < 0)
		return;
	/*
	 * On success, ensure that we call xprt_end_transmit() before sleeping
	 * in order to allow access to the socket to other RPC requests.
	 */
	call_transmit_status(task);
	if (rpc_reply_expected(task))
		return;
	/* No reply expected: finish the task once the transmit completes */
	task->tk_action = rpc_exit_task;
	rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
}
1742
/*
 * 5a.	Handle cleanup after a transmission
 *
 *	Decides whether to release the transport write lock and
 *	whether the request must be re-encoded before a retransmit.
 *	Note the deliberate ordering: the "default" arm precedes the
 *	connection-error cases, which must keep the transport lock.
 */
static void
call_transmit_status(struct rpc_task *task)
{
	task->tk_action = call_status;

	/*
	 * Common case: success.  Force the compiler to put this
	 * test first.
	 */
	if (task->tk_status == 0) {
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		return;
	}

	switch (task->tk_status) {
	case -EAGAIN:
		break;
	default:
		dprint_status(task);
		xprt_end_transmit(task);
		rpc_task_force_reencode(task);
		break;
		/*
		 * Special cases: if we've been waiting on the
		 * socket's write_space() callback, or if the
		 * socket just returned a connection error,
		 * then hold onto the transport lock.
		 */
	case -ECONNREFUSED:
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		if (RPC_IS_SOFTCONN(task)) {
			xprt_end_transmit(task);
			rpc_exit(task, task->tk_status);
			break;
		}
		/* fall through: non-SOFTCONN tasks only force a re-encode */
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		rpc_task_force_reencode(task);
	}
}
1790
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
 * addition, disconnect on connectivity errors.
 */
static void
call_bc_transmit(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	task->tk_status = xprt_prepare_transmit(task);
	if (task->tk_status == -EAGAIN) {
		/*
		 * Could not reserve the transport. Try again after the
		 * transport is released.
		 */
		task->tk_status = 0;
		task->tk_action = call_bc_transmit;
		return;
	}

	/* Whatever happens next, this task finishes after the transmit */
	task->tk_action = rpc_exit_task;
	if (task->tk_status < 0) {
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		return;
	}

	xprt_transmit(task);
	xprt_end_transmit(task);
	dprint_status(task);
	switch (task->tk_status) {
	case 0:
		/* Success */
		break;
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -ETIMEDOUT:
		/*
		 * Problem reaching the server.  Disconnect and let the
		 * forechannel reestablish the connection.  The server will
		 * have to retransmit the backchannel request and we'll
		 * reprocess it.  Since these ops are idempotent, there's no
		 * need to cache our reply at this time.
		 */
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		xprt_conditional_disconnect(req->rq_xprt,
			req->rq_connect_cookie);
		break;
	default:
		/*
		 * We were unable to reply and will have to drop the
		 * request.  The server should reconnect and retransmit.
		 */
		WARN_ON_ONCE(task->tk_status == -EAGAIN);
		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
			"error: %d\n", task->tk_status);
		break;
	}
	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1855
1856 /*
1857  * 6.   Sort out the RPC call status
1858  */
static void
call_status(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	int		status;

	/* If a reply already arrived before the request was fully sent,
	 * treat the call as successful and report the bytes received. */
	if (req->rq_reply_bytes_recvd > 0 && !req->rq_bytes_sent)
		task->tk_status = req->rq_reply_bytes_recvd;

	dprint_status(task);

	status = task->tk_status;
	if (status >= 0) {
		/* Non-negative status: proceed to decode the reply. */
		task->tk_action = call_decode;
		return;
	}

	trace_rpc_call_status(task);
	task->tk_status = 0;
	switch(status) {
	case -EHOSTDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
		/*
		 * Delay any retries for 3 seconds, then handle as if it
		 * were a timeout.
		 */
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -ETIMEDOUT:
		task->tk_action = call_timeout;
		/* Optionally force a reconnect before the retransmit. */
		if (task->tk_client->cl_discrtry)
			xprt_conditional_disconnect(req->rq_xprt,
					req->rq_connect_cookie);
		break;
	case -ECONNRESET:
	case -ECONNREFUSED:
		/* Server may have moved ports: rebind after a short delay. */
		rpc_force_rebind(clnt);
		rpc_delay(task, 3*HZ);
		/* fall through */
	case -EPIPE:
	case -ENOTCONN:
		task->tk_action = call_bind;
		break;
	case -EAGAIN:
		/* Transmission was incomplete; retry the send. */
		task->tk_action = call_transmit;
		break;
	case -EIO:
		/* shutdown or soft timeout */
		rpc_exit(task, status);
		break;
	default:
		/* Unexpected error: log (if chatty) and terminate the task. */
		if (clnt->cl_chatty)
			printk("%s: RPC call returned error %d\n",
			       clnt->cl_protname, -status);
		rpc_exit(task, status);
	}
}
1916
1917 /*
1918  * 6a.  Handle RPC timeout
1919  *      We do not release the request slot, so we keep using the
1920  *      same XID for all retransmits.
1921  */
static void
call_timeout(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;

	/* Minor timeout: the request's timeout could still be extended,
	 * so just retransmit without escalating. */
	if (xprt_adjust_timeout(task->tk_rqstp) == 0) {
		dprintk("RPC: %5u call_timeout (minor)\n", task->tk_pid);
		goto retry;
	}

	dprintk("RPC: %5u call_timeout (major)\n", task->tk_pid);
	task->tk_timeouts++;

	/* SOFTCONN tasks give up as soon as connecting times out. */
	if (RPC_IS_SOFTCONN(task)) {
		rpc_exit(task, -ETIMEDOUT);
		return;
	}
	/* Soft tasks fail after a major timeout rather than retry forever;
	 * RPC_TASK_TIMEOUT selects -ETIMEDOUT over the generic -EIO. */
	if (RPC_IS_SOFT(task)) {
		if (clnt->cl_chatty) {
			rcu_read_lock();
			printk(KERN_NOTICE "%s: server %s not responding, timed out\n",
				clnt->cl_protname,
				rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
		}
		if (task->tk_flags & RPC_TASK_TIMEOUT)
			rpc_exit(task, -ETIMEDOUT);
		else
			rpc_exit(task, -EIO);
		return;
	}

	/* Hard task: warn once per major-timeout episode, then keep trying. */
	if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
		task->tk_flags |= RPC_CALL_MAJORSEEN;
		if (clnt->cl_chatty) {
			rcu_read_lock();
			printk(KERN_NOTICE "%s: server %s not responding, still trying\n",
			clnt->cl_protname,
			rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
		}
	}
	rpc_force_rebind(clnt);
	/*
	 * Did our request time out due to an RPCSEC_GSS out-of-sequence
	 * event? RFC2203 requires the server to drop all such requests.
	 */
	rpcauth_invalcred(task);

retry:
	clnt->cl_stats->rpcretrans++;
	task->tk_action = call_bind;
	task->tk_status = 0;
}
1976
1977 /*
1978  * 7.   Decode the RPC reply
1979  */
static void
call_decode(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	kxdrdproc_t	decode = task->tk_msg.rpc_proc->p_decode;
	__be32		*p;

	dprint_status(task);

	/* If we previously warned that the server stopped responding,
	 * announce that it is back and clear the flag. */
	if (task->tk_flags & RPC_CALL_MAJORSEEN) {
		if (clnt->cl_chatty) {
			rcu_read_lock();
			printk(KERN_NOTICE "%s: server %s OK\n",
				clnt->cl_protname,
				rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
		}
		task->tk_flags &= ~RPC_CALL_MAJORSEEN;
	}

	/*
	 * Ensure that we see all writes made by xprt_complete_rqst()
	 * before it changed req->rq_reply_bytes_recvd.
	 */
	smp_rmb();
	req->rq_rcv_buf.len = req->rq_private_buf.len;

	/* Check that the softirq receive buffer is valid */
	WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
				sizeof(req->rq_rcv_buf)) != 0);

	/* A reply shorter than 12 bytes cannot hold a minimal RPC reply
	 * header; hard tasks retransmit, soft tasks go through timeout. */
	if (req->rq_rcv_buf.len < 12) {
		if (!RPC_IS_SOFT(task)) {
			task->tk_action = call_bind;
			clnt->cl_stats->rpcretrans++;
			goto out_retry;
		}
		dprintk("RPC:       %s: too small RPC reply size (%d bytes)\n",
				clnt->cl_protname, task->tk_status);
		task->tk_action = call_timeout;
		goto out_retry;
	}

	/* Validate the RPC reply header and verifier; on -EAGAIN retry,
	 * on any other error the task has already been terminated. */
	p = rpc_verify_header(task);
	if (IS_ERR(p)) {
		if (p == ERR_PTR(-EAGAIN))
			goto out_retry;
		return;
	}

	task->tk_action = rpc_exit_task;

	/* Unwrap and decode the procedure-specific results. */
	if (decode) {
		task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
						      task->tk_msg.rpc_resp);
	}
	dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
			task->tk_status);
	return;
out_retry:
	task->tk_status = 0;
	/* Note: rpc_verify_header() may have freed the RPC slot */
	if (task->tk_rqstp == req) {
		req->rq_reply_bytes_recvd = req->rq_rcv_buf.len = 0;
		if (task->tk_client->cl_discrtry)
			xprt_conditional_disconnect(req->rq_xprt,
					req->rq_connect_cookie);
	}
}
2050
/*
 * Marshal the fixed portion of the RPC call header (XID, message type,
 * RPC version, program, version, procedure) followed by the credential,
 * into the head of the send buffer.  Returns a pointer just past the
 * marshalled header, where the argument data goes.
 */
static __be32 *
rpc_encode_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct rpc_rqst *req = task->tk_rqstp;
	__be32		*p = req->rq_svec[0].iov_base;

	/* FIXME: check buffer size? */

	/* Leave room for any transport-specific header (e.g. TCP record marker). */
	p = xprt_skip_transport_header(req->rq_xprt, p);
	*p++ = req->rq_xid;		/* XID */
	*p++ = htonl(RPC_CALL);		/* CALL */
	*p++ = htonl(RPC_VERSION);	/* RPC version */
	*p++ = htonl(clnt->cl_prog);	/* program number */
	*p++ = htonl(clnt->cl_vers);	/* program version */
	*p++ = htonl(task->tk_msg.rpc_proc->p_proc);	/* procedure */
	p = rpcauth_marshcred(task, p);
	/* Record the marshalled header length in the first iovec. */
	req->rq_slen = xdr_adjust_iovec(&req->rq_svec[0], p);
	return p;
}
2071
/*
 * Verify the RPC reply header: check the reply is well-formed XDR,
 * handle rejected calls (auth errors, version mismatch), verify the
 * server's verifier, and check the accept status.
 *
 * Returns a pointer just past the verified header on success,
 * ERR_PTR(-EAGAIN) when the caller should retry (garbled reply or
 * refreshed credentials), or another ERR_PTR after terminating the
 * task via rpc_exit().
 */
static __be32 *
rpc_verify_header(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;
	struct kvec *iov = &task->tk_rqstp->rq_rcv_buf.head[0];
	int len = task->tk_rqstp->rq_rcv_buf.len >> 2;	/* length in 32-bit words */
	__be32	*p = iov->iov_base;
	u32 n;
	int error = -EACCES;

	if ((task->tk_rqstp->rq_rcv_buf.len & 3) != 0) {
		/* RFC-1014 says that the representation of XDR data must be a
		 * multiple of four bytes
		 * - if it isn't pointer subtraction in the NFS client may give
		 *   undefined results
		 */
		dprintk("RPC: %5u %s: XDR representation not a multiple of"
		       " 4 bytes: 0x%x\n", task->tk_pid, __func__,
		       task->tk_rqstp->rq_rcv_buf.len);
		goto out_eio;
	}
	/* Need at least XID + msg type + reply status (3 words). */
	if ((len -= 3) < 0)
		goto out_overflow;

	p += 1; /* skip XID */
	if ((n = ntohl(*p++)) != RPC_REPLY) {
		dprintk("RPC: %5u %s: not an RPC reply: %x\n",
			task->tk_pid, __func__, n);
		goto out_garbage;
	}

	/* Rejected call: work out why and whether a retry can help. */
	if ((n = ntohl(*p++)) != RPC_MSG_ACCEPTED) {
		if (--len < 0)
			goto out_overflow;
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_ERROR:
			break;
		case RPC_MISMATCH:
			dprintk("RPC: %5u %s: RPC call version mismatch!\n",
				task->tk_pid, __func__);
			error = -EPROTONOSUPPORT;
			goto out_err;
		default:
			dprintk("RPC: %5u %s: RPC call rejected, "
				"unknown error: %x\n",
				task->tk_pid, __func__, n);
			goto out_eio;
		}
		if (--len < 0)
			goto out_overflow;
		/* Auth error: some are retryable with fresh credentials. */
		switch ((n = ntohl(*p++))) {
		case RPC_AUTH_REJECTEDCRED:
		case RPC_AUTH_REJECTEDVERF:
		case RPCSEC_GSS_CREDPROBLEM:
		case RPCSEC_GSS_CTXPROBLEM:
			if (!task->tk_cred_retry)
				break;
			task->tk_cred_retry--;
			dprintk("RPC: %5u %s: retry stale creds\n",
					task->tk_pid, __func__);
			rpcauth_invalcred(task);
			/* Ensure we obtain a new XID! */
			xprt_release(task);
			task->tk_action = call_reserve;
			goto out_retry;
		case RPC_AUTH_BADCRED:
		case RPC_AUTH_BADVERF:
			/* possibly garbled cred/verf? */
			if (!task->tk_garb_retry)
				break;
			task->tk_garb_retry--;
			dprintk("RPC: %5u %s: retry garbled creds\n",
					task->tk_pid, __func__);
			task->tk_action = call_bind;
			goto out_retry;
		case RPC_AUTH_TOOWEAK:
			rcu_read_lock();
			printk(KERN_NOTICE "RPC: server %s requires stronger "
			       "authentication.\n",
			       rcu_dereference(clnt->cl_xprt)->servername);
			rcu_read_unlock();
			break;
		default:
			dprintk("RPC: %5u %s: unknown auth error: %x\n",
					task->tk_pid, __func__, n);
			error = -EIO;
		}
		dprintk("RPC: %5u %s: call rejected %d\n",
				task->tk_pid, __func__, n);
		goto out_err;
	}
	/* Verify the server's verifier; bad verifier counts as garbage. */
	if (!(p = rpcauth_checkverf(task, p))) {
		dprintk("RPC: %5u %s: auth check failed\n",
				task->tk_pid, __func__);
		goto out_garbage;		/* bad verifier, retry */
	}
	len = p - (__be32 *)iov->iov_base - 1;
	if (len < 0)
		goto out_overflow;
	/* Accepted call: check the accept status word. */
	switch ((n = ntohl(*p++))) {
	case RPC_SUCCESS:
		return p;
	case RPC_PROG_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: program %u is unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPFNOSUPPORT;
		goto out_err;
	case RPC_PROG_MISMATCH:
		dprintk_rcu("RPC: %5u %s: program %u, version %u unsupported "
				"by server %s\n", task->tk_pid, __func__,
				(unsigned int)clnt->cl_prog,
				(unsigned int)clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EPROTONOSUPPORT;
		goto out_err;
	case RPC_PROC_UNAVAIL:
		dprintk_rcu("RPC: %5u %s: proc %s unsupported by program %u, "
				"version %u on server %s\n",
				task->tk_pid, __func__,
				rpc_proc_name(task),
				clnt->cl_prog, clnt->cl_vers,
				rcu_dereference(clnt->cl_xprt)->servername);
		error = -EOPNOTSUPP;
		goto out_err;
	case RPC_GARBAGE_ARGS:
		dprintk("RPC: %5u %s: server saw garbage\n",
				task->tk_pid, __func__);
		break;			/* retry */
	default:
		dprintk("RPC: %5u %s: server accept status: %x\n",
				task->tk_pid, __func__, n);
		/* Also retry */
	}

out_garbage:
	clnt->cl_stats->rpcgarbage++;
	if (task->tk_garb_retry) {
		task->tk_garb_retry--;
		dprintk("RPC: %5u %s: retrying\n",
				task->tk_pid, __func__);
		task->tk_action = call_bind;
/* Note: out_retry lives inside this if so -EAGAIN is only returned
 * while retries remain; otherwise we fall through to out_eio. */
out_retry:
		return ERR_PTR(-EAGAIN);
	}
out_eio:
	error = -EIO;
out_err:
	rpc_exit(task, error);
	dprintk("RPC: %5u %s: call failed with error %d\n", task->tk_pid,
			__func__, error);
	return ERR_PTR(error);
out_overflow:
	dprintk("RPC: %5u %s: server reply was truncated.\n", task->tk_pid,
			__func__);
	goto out_garbage;
}
2230
/* XDR encoder for the NULL procedure: nothing to marshal. */
static void rpcproc_encode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
}
2234
/* XDR decoder for the NULL procedure: nothing to unmarshal, always succeeds. */
static int rpcproc_decode_null(void *rqstp, struct xdr_stream *xdr, void *obj)
{
	return 0;
}
2239
/* The NULL procedure: carries no arguments and no results.  Used by
 * rpc_ping() and rpc_call_null() to probe a server. */
static struct rpc_procinfo rpcproc_null = {
	.p_encode = rpcproc_encode_null,
	.p_decode = rpcproc_decode_null,
};
2244
2245 static int rpc_ping(struct rpc_clnt *clnt)
2246 {
2247         struct rpc_message msg = {
2248                 .rpc_proc = &rpcproc_null,
2249         };
2250         int err;
2251         msg.rpc_cred = authnull_ops.lookup_cred(NULL, NULL, 0);
2252         err = rpc_call_sync(clnt, &msg, RPC_TASK_SOFT | RPC_TASK_SOFTCONN);
2253         put_rpccred(msg.rpc_cred);
2254         return err;
2255 }
2256
2257 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2258 {
2259         struct rpc_message msg = {
2260                 .rpc_proc = &rpcproc_null,
2261                 .rpc_cred = cred,
2262         };
2263         struct rpc_task_setup task_setup_data = {
2264                 .rpc_client = clnt,
2265                 .rpc_message = &msg,
2266                 .callback_ops = &rpc_default_ops,
2267                 .flags = flags,
2268         };
2269         return rpc_run_task(&task_setup_data);
2270 }
2271 EXPORT_SYMBOL_GPL(rpc_call_null);
2272
2273 #ifdef RPC_DEBUG
/* Print the column header for the task listing produced by rpc_show_tasks(). */
static void rpc_show_header(void)
{
	printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
		"-timeout ---ops--\n");
}
2279
/* Print one line describing @task for the debug task listing. */
static void rpc_show_task(const struct rpc_clnt *clnt,
			  const struct rpc_task *task)
{
	const char *rpc_waitq = "none";

	/* Only queued tasks have a wait queue to report. */
	if (RPC_IS_QUEUED(task))
		rpc_waitq = rpc_qname(task->tk_waitqueue);

	printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
		task->tk_pid, task->tk_flags, task->tk_status,
		clnt, task->tk_rqstp, task->tk_timeout, task->tk_ops,
		clnt->cl_protname, clnt->cl_vers, rpc_proc_name(task),
		task->tk_action, rpc_waitq);
}
2294
2295 void rpc_show_tasks(struct net *net)
2296 {
2297         struct rpc_clnt *clnt;
2298         struct rpc_task *task;
2299         int header = 0;
2300         struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
2301
2302         spin_lock(&sn->rpc_client_lock);
2303         list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
2304                 spin_lock(&clnt->cl_lock);
2305                 list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
2306                         if (!header) {
2307                                 rpc_show_header();
2308                                 header++;
2309                         }
2310                         rpc_show_task(clnt, task);
2311                 }
2312                 spin_unlock(&clnt->cl_lock);
2313         }
2314         spin_unlock(&sn->rpc_client_lock);
2315 }
2316 #endif