l2tp: fix l2tp_eth module loading
[pandora-kernel.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of larges part of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20
21 #include <linux/types.h>
22 #include <linux/string.h>
23 #include <linux/slab.h>
24 #include <linux/module.h>
25 #include <linux/capability.h>
26 #include <linux/pagemap.h>
27 #include <linux/errno.h>
28 #include <linux/socket.h>
29 #include <linux/in.h>
30 #include <linux/net.h>
31 #include <linux/mm.h>
32 #include <linux/un.h>
33 #include <linux/udp.h>
34 #include <linux/tcp.h>
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/sched.h>
37 #include <linux/sunrpc/svcsock.h>
38 #include <linux/sunrpc/xprtsock.h>
39 #include <linux/file.h>
40 #ifdef CONFIG_SUNRPC_BACKCHANNEL
41 #include <linux/sunrpc/bc_xprt.h>
42 #endif
43
44 #include <net/sock.h>
45 #include <net/checksum.h>
46 #include <net/udp.h>
47 #include <net/tcp.h>
48
49 #include "sunrpc.h"
50
static void xs_close(struct rpc_xprt *xprt);

/*
 * xprtsock tunables: initial values for the knobs exported as sysctls
 * under /proc/sys/sunrpc (see xs_tunables_table below).
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/* How long a closing TCP transport lingers waiting for a FIN from the peer */
#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
65
66 /*
67  * We can register our own files under /proc/sys/sunrpc by
68  * calling register_sysctl_table() again.  The files in that
69  * directory become the union of all files registered there.
70  *
71  * We simply need to make sure that we don't collide with
72  * someone else's file names!
73  */
74
#ifdef RPC_DEBUG

/* Bounds enforced by proc_dointvec_minmax on the sysctls below */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

/* Handle from register_sysctl_table(), kept so the table can be removed */
static struct ctl_table_header *sunrpc_table_header;

/*
 * FIXME: changing the UDP slot table size should also resize the UDP
 *        socket buffers for existing UDP transports
 */
static ctl_table xs_tunables_table[] = {
	{
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.procname	= "tcp_max_slot_table_entries",
		.data		= &xprt_max_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_tcp_slot_table_limit
	},
	{
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		/* value is in jiffies; no min/max clamping is applied */
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },
};

/* Parent directory entry: /proc/sys/sunrpc */
static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },
};

#endif
155
/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
/* RPC_DEBUG_DATA is force-disabled here, so the hex-dump variant of
 * xs_pktdump below is normally compiled out even in debug builds. */
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
/*
 * xs_pktdump - hex-dump the start of a packet via dprintk
 * @msg: label printed before the dump
 * @packet: start of the packet data
 * @count: number of bytes available at @packet
 *
 * Dumps at most the first 128 bytes, in 4-byte groups, 32 bytes per
 * line, each line prefixed with its byte offset.
 */
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			/* start a new line every 32 bytes */
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
215
/*
 * struct sock_xprt - socket-based RPC transport
 *
 * Wraps the generic rpc_xprt with the socket state needed by the
 * UDP, TCP and AF_LOCAL transport implementations in this file.
 */
struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;

	/*
	 * State of TCP reply receive: the record fragment header, XID and
	 * call direction are accumulated here as data trickles in.
	 */
	__be32			tcp_fraghdr,
				tcp_xid,
				tcp_calldir;

	/* bytes consumed of the current record, and the record's length */
	u32			tcp_offset,
				tcp_reclen;

	/* bytes copied into the reply buffer, and TCP_RCV_*/TCP_RPC_* flags */
	unsigned long		tcp_copied,
				tcp_flags;

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	struct sockaddr_storage	srcaddr;
	unsigned short		srcport;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses, restored on transport reset
	 * (see xs_save_old_callbacks/xs_restore_old_callbacks).
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
};

/*
 * TCP receive state flags (stored in sock_xprt.tcp_flags)
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
#define TCP_RCV_READ_CALLDIR	(1UL << 4)
#define TCP_RCV_COPY_CALLDIR	(1UL << 5)

/*
 * TCP RPC flags (shares the tcp_flags word with the bits above)
 */
#define TCP_RPC_REPLY		(1UL << 6)
273
274 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
275 {
276         return (struct sockaddr *) &xprt->addr;
277 }
278
279 static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
280 {
281         return (struct sockaddr_un *) &xprt->addr;
282 }
283
284 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
285 {
286         return (struct sockaddr_in *) &xprt->addr;
287 }
288
289 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
290 {
291         return (struct sockaddr_in6 *) &xprt->addr;
292 }
293
/*
 * Build the printable RPC_DISPLAY_ADDR and RPC_DISPLAY_HEX_ADDR strings
 * for this transport from the address family stored in xprt->addr.
 *
 * All resulting strings are kstrdup()'d with GFP_KERNEL and released by
 * xs_free_peer_addresses().
 */
static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	struct sockaddr_un *sun;
	char buf[128];

	switch (sap->sa_family) {
	case AF_LOCAL:
		sun = xs_addr_un(xprt);
		strlcpy(buf, sun->sun_path, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		break;
	case AF_INET:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin = xs_addr_in(xprt);
		/* hex form: raw 32-bit address, host byte order */
		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin6 = xs_addr_in6(xprt);
		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		/* callers only create transports for the families above */
		BUG();
	}

	/* Note: for AF_LOCAL, buf still holds the socket path here, so the
	 * "hex" string is simply the path again. */
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}
329
/*
 * Build the printable RPC_DISPLAY_PORT (decimal) and RPC_DISPLAY_HEX_PORT
 * (hexadecimal) strings for this transport's destination port.
 * Both are kstrdup()'d; xs_free_peer_addresses() releases them.
 */
static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	char buf[128];

	snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);

	/* "%4hx": hex port, space-padded to a minimum of 4 characters */
	snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
}
341
/*
 * Populate all of this transport's display strings.
 *
 * @protocol and @netid are stored as-is (not duplicated), so callers must
 * pass strings that outlive the transport; xs_free_peer_addresses() skips
 * these two entries for that reason.
 */
static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}
351
352 static void xs_update_peer_port(struct rpc_xprt *xprt)
353 {
354         kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
355         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
356
357         xs_format_common_peer_ports(xprt);
358 }
359
360 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
361 {
362         unsigned int i;
363
364         for (i = 0; i < RPC_DISPLAY_MAX; i++)
365                 switch (i) {
366                 case RPC_DISPLAY_PROTO:
367                 case RPC_DISPLAY_NETID:
368                         continue;
369                 default:
370                         kfree(xprt->address_strings[i]);
371                 }
372 }
373
374 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
375
376 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
377 {
378         struct msghdr msg = {
379                 .msg_name       = addr,
380                 .msg_namelen    = addrlen,
381                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
382         };
383         struct kvec iov = {
384                 .iov_base       = vec->iov_base + base,
385                 .iov_len        = vec->iov_len - base,
386         };
387
388         if (iov.iov_len != 0)
389                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
390         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
391 }
392
/*
 * Send the page section of @xdr, starting @base bytes into it, over @sock.
 * Uses the socket's ->sendpage() (zero-copy) when @zerocopy is set,
 * otherwise falls back to sock_no_sendpage(), which copies.
 *
 * Returns the number of bytes sent, or a negative errno if nothing was
 * sent at all.  A short send terminates the loop and reports the partial
 * count to the caller.
 */
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy)
{
	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
			int offset, size_t size, int flags);
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;	/* offset within the first page */
	do_sendpage = sock->ops->sendpage;
	if (!zerocopy)
		do_sendpage = sock_no_sendpage;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;	/* more data follows this chunk */
		err = do_sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;	/* finished, or short/failed send */
		sent += err;
		ppage++;
		base = 0;	/* subsequent pages start at offset 0 */
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
428
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 * @zerocopy: true if it is safe to use sendpage()
 *
 * Walks the three sections of @xdr (head kvec, page array, tail kvec)
 * in order, skipping whatever @base already covers.  Returns bytes sent,
 * or a negative errno if nothing was sent.  A short send of any section
 * stops the walk and reports the partial count.
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	if (base != 0) {
		/* A retransmission mid-record: the datagram address was
		 * already supplied with the first fragment. */
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
485
486 static void xs_nospace_callback(struct rpc_task *task)
487 {
488         struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
489
490         transport->inet->sk_write_pending--;
491         clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
492 }
493
/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 * Returns -EAGAIN if the task was queued to wait for buffer space, or
 * -ENOTCONN if the transport disconnected underneath us.  Must only be
 * called after a send returned -EAGAIN.
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;
	int ret = -EAGAIN;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			sk->sk_write_pending++;
			/* ...and wait for more buffer space */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
		/* else: space already became available; retry immediately */
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);

	/* Race breaker in case memory is freed before above code is called */
	sk->sk_write_space(sk);
	return ret;
}
537
538 /*
539  * Construct a stream transport record marker in @buf.
540  */
541 static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
542 {
543         u32 reclen = buf->len - sizeof(rpc_fraghdr);
544         rpc_fraghdr *base = buf->head[0].iov_base;
545         *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
546 }
547
/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @task: RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occured, the request was not sent
 */
static int xs_local_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	/* AF_LOCAL is a stream transport: prepend the record marker */
	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	status = xs_sendpages(transport->sock, NULL, 0,
						xdr, req->rq_bytes_sent, true);
	dprintk("RPC:       %s(%u) = %d\n",
			__func__, xdr->len - req->rq_bytes_sent, status);
	if (likely(status >= 0)) {
		/* track partial progress so a retry resumes mid-record */
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}
		status = -EAGAIN;
	}

	switch (status) {
	case -EAGAIN:
		status = -EAGAIN;
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through: unrecognized errors close the transport */
	case -EPIPE:
		xs_close(xprt);
		status = -ENOTCONN;
	}

	return status;
}
601
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* can't send until rpcbind has supplied the destination port */
	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent, true);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		req->rq_xmit_bytes_sent += status;
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through: treat like a transient network error */
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
664
665 /**
666  * xs_tcp_shutdown - gracefully shut down a TCP socket
667  * @xprt: transport
668  *
669  * Initiates a graceful shutdown of the TCP socket by calling the
670  * equivalent of shutdown(SHUT_WR);
671  */
672 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
673 {
674         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
675         struct socket *sock = transport->sock;
676
677         if (sock != NULL)
678                 kernel_sock_shutdown(sock, SHUT_WR);
679 }
680
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:    The request has been sent
 *   EAGAIN:    The socket was blocked, please call again later to
 *              complete the request
 * ENOTCONN:    Caller needs to invoke connect logic then call again
 *    other:    Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *      if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	bool zerocopy = true;
	int status;

	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);
	/* Don't use zero copy if this is a resend. If the RPC call
	 * completes while the socket holds a reference to the pages,
	 * then we may end up resending corrupted data.
	 */
	if (task->tk_flags & RPC_TASK_SENT)
		zerocopy = false;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent,
					zerocopy);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial send: keep looping while we make progress,
		 * give up with EAGAIN once a send moves zero bytes. */
		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -ECONNRESET:
		xs_tcp_shutdown(xprt);
		/* fall through */
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EPIPE:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
766
767 /**
768  * xs_tcp_release_xprt - clean up after a tcp transmission
769  * @xprt: transport
770  * @task: rpc task
771  *
772  * This cleans up if an error causes us to abort the transmission of a request.
773  * In this case, the socket may need to be reset in order to avoid confusing
774  * the server.
775  */
776 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
777 {
778         struct rpc_rqst *req;
779
780         if (task != xprt->snd_task)
781                 return;
782         if (task == NULL)
783                 goto out_release;
784         req = task->tk_rqstp;
785         if (req == NULL)
786                 goto out_release;
787         if (req->rq_bytes_sent == 0)
788                 goto out_release;
789         if (req->rq_bytes_sent == req->rq_snd_buf.len)
790                 goto out_release;
791         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
792 out_release:
793         xprt_release_xprt(xprt, task);
794 }
795
/* Remember the socket's original callbacks before we install our own,
 * so xs_restore_old_callbacks() can put them back on transport reset. */
static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	transport->old_data_ready = sk->sk_data_ready;
	transport->old_state_change = sk->sk_state_change;
	transport->old_write_space = sk->sk_write_space;
}
802
/* Undo xs_save_old_callbacks(): reinstate the socket's original
 * callbacks when the transport lets go of the socket. */
static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
{
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
}
809
/*
 * Detach the transport from its socket and release the socket.
 *
 * All detachment is done under sk_callback_lock so that socket
 * callbacks running concurrently see a consistent state.  A transport
 * with no socket attached (inet == NULL) is a no-op.
 */
static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;
	struct rpc_xprt *xprt = &transport->xprt;

	if (sk == NULL)
		return;

	/* forget the cached source port; the next connect picks a new one */
	transport->srcport = 0;

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	xprt_clear_connected(xprt);
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
}
835
/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;

	/* barriers pair the state-bit updates with readers of xprt->state */
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}
862
863 static void xs_tcp_close(struct rpc_xprt *xprt)
864 {
865         if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
866                 xs_close(xprt);
867         else
868                 xs_tcp_shutdown(xprt);
869 }
870
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 * Cancels any pending connect work first so it cannot race with the
 * teardown, then closes the socket, frees the display strings and the
 * xprt itself, and finally drops this module's reference.
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	cancel_delayed_work_sync(&transport->connect_worker);

	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
	module_put(THIS_MODULE);
}
889
/* Map a socket back to its owning rpc_xprt via sk_user_data (cleared in
 * xs_reset_transport(), so this may return NULL during teardown). */
static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}
894
895 static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
896 {
897         struct xdr_skb_reader desc = {
898                 .skb            = skb,
899                 .offset         = sizeof(rpc_fraghdr),
900                 .count          = skb->len - sizeof(rpc_fraghdr),
901         };
902
903         if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
904                 return -1;
905         if (desc.count)
906                 return -1;
907         return 0;
908 }
909
/**
 * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 * Currently this assumes we can read the whole reply in a single gulp.
 */
static void xs_local_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock_bh(&sk->sk_callback_lock);
	dprintk("RPC:       %s...\n", __func__);
	/* Socket may have been detached from the transport already */
	xprt = xprt_from_sock(sk);
	if (xprt == NULL)
		goto out;

	skb = skb_recv_datagram(sk, 0, 1, &err);
	if (skb == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	/* The reply must carry at least a 4-byte XID past the marker */
	repsize = skb->len - sizeof(rpc_fraghdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	/* Never report more than the receive buffer can hold */
	copied = rovr->rq_private_buf.buflen;
	if (copied > repsize)
		copied = repsize;

	if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		dprintk("RPC:       sk_buff copy failed\n");
		goto out_unlock;
	}

	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock_bh(&sk->sk_callback_lock);
}
976
/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 * Pulls one datagram off the socket, matches it to a pending request by
 * XID and completes that request.
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock_bh(&sk->sk_callback_lock);
	dprintk("RPC:       xs_udp_data_ready...\n");
	/* Socket may already have been detached from the transport */
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	/* The reply must carry at least a 4-byte XID past the UDP header */
	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	/* Never report more than the receive buffer can hold */
	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
		UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
		goto out_unlock;
	}

	UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);

	/* Something worked... */
	dst_confirm(skb_dst(skb));

	/* A reply arrived: open the congestion window back up */
	xprt_adjust_cwnd(task, copied);
	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock_bh(&sk->sk_callback_lock);
}
1047
/*
 * Helper function to force a TCP close if the server is sending
 * junk and/or it has put us in CLOSE_WAIT
 *
 * Setting XPRT_CONNECTION_CLOSE makes the next xs_tcp_close() do a
 * full xs_close() instead of a graceful shutdown.
 */
static void xs_tcp_force_close(struct rpc_xprt *xprt)
{
	set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
	xprt_force_disconnect(xprt);
}
1057
/*
 * Read (possibly a piece of) the 4-byte RPC record marker from the
 * stream.  The marker carries the fragment length and a "last fragment"
 * bit.  May be called repeatedly until the full marker has arrived.
 */
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	/* Resume at whatever part of the marker we already have */
	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	/* Marker still incomplete: wait for more data */
	if (used != len)
		return;

	/* Top bit of the marker flags the final fragment of a record */
	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 8)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xs_tcp_force_close(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}
1090
1091 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
1092 {
1093         if (transport->tcp_offset == transport->tcp_reclen) {
1094                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
1095                 transport->tcp_offset = 0;
1096                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
1097                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1098                         transport->tcp_flags |= TCP_RCV_COPY_XID;
1099                         transport->tcp_copied = 0;
1100                 }
1101         }
1102 }
1103
/*
 * Read (possibly a piece of) the 4-byte XID that opens every record.
 * Once complete, move on to reading the call/reply direction word.
 */
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	/* XID still incomplete: wait for more data */
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
	/* The 4 XID bytes count as already copied toward the buffer */
	transport->tcp_copied = 4;
	dprintk("RPC:       reading %s XID %08x\n",
			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
							      : "request with",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}
1125
/*
 * Read the 4-byte call/reply direction word that follows the XID and
 * set the receive flags accordingly.  An unrecognized direction forces
 * the connection closed.
 */
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
				       struct xdr_skb_reader *desc)
{
	size_t len, used;
	u32 offset;
	char *p;

	/*
	 * We want transport->tcp_offset to be 8 at the end of this routine
	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
	 * When this function is called for the first time,
	 * transport->tcp_offset is 4 (after having already read the xid).
	 */
	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
	len = sizeof(transport->tcp_calldir) - offset;
	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_calldir) + offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	/* Direction word still incomplete: wait for more data */
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
	/*
	 * We don't yet have the XDR buffer, so we will write the calldir
	 * out after we get the buffer from the 'struct rpc_rqst'
	 */
	switch (ntohl(transport->tcp_calldir)) {
	case RPC_REPLY:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags |= TCP_RPC_REPLY;
		break;
	case RPC_CALL:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags &= ~TCP_RPC_REPLY;
		break;
	default:
		dprintk("RPC:       invalid request message type\n");
		xs_tcp_force_close(&transport->xprt);
	}
	xs_tcp_check_fraghdr(transport);
}
1169
/*
 * Copy record payload from the stream into @req's receive buffer.
 * Handles partial arrivals: may be called repeatedly for one record,
 * tracking progress in tcp_copied/tcp_offset.  On a short copy (e.g.
 * buffer page allocation failure) the request is marked so it receives
 * no further data and will time out.
 */
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
				     struct xdr_skb_reader *desc,
				     struct rpc_rqst *req)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	rcvbuf = &req->rq_private_buf;

	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
		/*
		 * Save the RPC direction in the XDR buffer
		 */
		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
			&transport->tcp_calldir,
			sizeof(transport->tcp_calldir));
		transport->tcp_copied += sizeof(transport->tcp_calldir);
		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
	}

	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		/* The skb holds more than the rest of this record: clamp
		 * the copy with a private descriptor so we don't consume
		 * bytes belonging to the next record */
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x truncated request\n",
				ntohl(transport->tcp_xid));
		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
		return;
	}

	dprintk("RPC:       XID %08x read %Zd bytes\n",
			ntohl(transport->tcp_xid), r);
	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);

	/* Stop copying once the receive buffer is full, or once the
	 * last fragment of the record has been consumed */
	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}
}
1245
/*
 * Finds the request corresponding to the RPC xid and invokes the common
 * tcp read code to read the data.
 *
 * Returns 0 on success, -1 if no request matches the XID (the data is
 * then discarded by the caller).
 */
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
				    struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
	if (!req) {
		dprintk("RPC:       XID %08x request not found!\n",
				ntohl(transport->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return -1;
	}

	xs_tcp_read_common(xprt, desc, req);

	/* Record fully received: complete the waiting task */
	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);

	spin_unlock(&xprt->transport_lock);
	return 0;
}
1277
1278 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1279 /*
1280  * Obtains an rpc_rqst previously allocated and invokes the common
1281  * tcp read code to read the data.  The result is placed in the callback
1282  * queue.
1283  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1284  * connection and return -1.
1285  */
static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
				       struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	/* Grab a preallocated backchannel request slot */
	req = xprt_alloc_bc_request(xprt);
	if (req == NULL) {
		printk(KERN_WARNING "Callback slot table overflowed\n");
		xprt_force_disconnect(xprt);
		return -1;
	}

	req->rq_xid = transport->tcp_xid;
	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
	xs_tcp_read_common(xprt, desc, req);

	/* Hand the request off once the record is fully received */
	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
		struct svc_serv *bc_serv = xprt->bc_serv;

		/*
		 * Add callback request to callback list.  The callback
		 * service sleeps on the sv_cb_waitq waiting for new
		 * requests.  Wake it up after adding enqueing the
		 * request.
		 */
		dprintk("RPC:       add callback request to list\n");
		spin_lock(&bc_serv->sv_cb_lock);
		list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
		spin_unlock(&bc_serv->sv_cb_lock);
		wake_up(&bc_serv->sv_cb_waitq);
	}

	req->rq_private_buf.len = transport->tcp_copied;

	return 0;
}
1324
1325 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1326                                         struct xdr_skb_reader *desc)
1327 {
1328         struct sock_xprt *transport =
1329                                 container_of(xprt, struct sock_xprt, xprt);
1330
1331         return (transport->tcp_flags & TCP_RPC_REPLY) ?
1332                 xs_tcp_read_reply(xprt, desc) :
1333                 xs_tcp_read_callback(xprt, desc);
1334 }
1335 #else
/* Without backchannel support, every record is treated as a reply */
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
					struct xdr_skb_reader *desc)
{
	return xs_tcp_read_reply(xprt, desc);
}
1341 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1342
1343 /*
1344  * Read data off the transport.  This can be either an RPC_CALL or an
1345  * RPC_REPLY.  Relay the processing to helper functions.
1346  */
1347 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1348                                     struct xdr_skb_reader *desc)
1349 {
1350         struct sock_xprt *transport =
1351                                 container_of(xprt, struct sock_xprt, xprt);
1352
1353         if (_xs_tcp_read_data(xprt, desc) == 0)
1354                 xs_tcp_check_fraghdr(transport);
1355         else {
1356                 /*
1357                  * The transport_lock protects the request handling.
1358                  * There's no need to hold it to update the tcp_flags.
1359                  */
1360                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1361         }
1362 }
1363
1364 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1365 {
1366         size_t len;
1367
1368         len = transport->tcp_reclen - transport->tcp_offset;
1369         if (len > desc->count)
1370                 len = desc->count;
1371         desc->count -= len;
1372         desc->offset += len;
1373         transport->tcp_offset += len;
1374         dprintk("RPC:       discarded %Zu bytes\n", len);
1375         xs_tcp_check_fraghdr(transport);
1376 }
1377
/*
 * tcp_read_sock() callback: run the receive state machine over one skb.
 * Each loop iteration consumes whichever piece (record marker, XID,
 * call direction, payload, or discard) the flags say comes next.
 * Returns the number of bytes consumed.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_skb_reader desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
	};

	dprintk("RPC:       xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the call/reply flag */
		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
			xs_tcp_read_calldir(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_data(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC:       xs_tcp_data_recv done\n");
	return len - desc.count;
}
1417
/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 * Drains the socket through tcp_read_sock()/xs_tcp_data_recv() until no
 * more data is available.
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;
	int read;

	dprintk("RPC:       xs_tcp_data_ready...\n");

	read_lock_bh(&sk->sk_callback_lock);
	/* Socket may already have been detached from the transport */
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* Any data means we had a useful conversation, so
	 * the we don't need to delay the next reconnect
	 */
	if (xprt->reestablish_timeout)
		xprt->reestablish_timeout = 0;

	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
	rd_desc.arg.data = xprt;
	do {
		rd_desc.count = 65536;
		read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
	} while (read > 0);
out:
	read_unlock_bh(&sk->sk_callback_lock);
}
1453
/*
 * Do the equivalent of linger/linger2 handling for dealing with
 * broken servers that don't close the socket in a timely
 * fashion
 *
 * Schedules the connect worker after @timeout with the ABORT bit set,
 * unless a connect attempt is already in progress.
 */
static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
		unsigned long timeout)
{
	struct sock_xprt *transport;

	/* Someone is already (re)connecting: leave it to them */
	if (xprt_test_and_set_connecting(xprt))
		return;
	set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	transport = container_of(xprt, struct sock_xprt, xprt);
	queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
			   timeout);
}
1471
/*
 * Undo xs_tcp_schedule_linger_timeout(): if the pending abort work can
 * still be cancelled, clear the abort/connecting state it had set up.
 */
static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport;

	transport = container_of(xprt, struct sock_xprt, xprt);

	/* If no linger timeout was armed, or the worker already started
	 * running, there is nothing to unwind here */
	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
	    !cancel_delayed_work(&transport->connect_worker))
		return;
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	xprt_clear_connecting(xprt);
}
1484
/*
 * Clear every connection-teardown flag in one place.  The barriers pair
 * with testers of these bits so the cleared flags are observed together.
 */
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
}
1494
/* Reset all teardown flags, then mark the transport disconnected */
static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
	xs_sock_reset_connection_flags(xprt);
	/* Mark transport as closed and wake up all pending tasks */
	xprt_disconnect_done(xprt);
}
1501
/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 * Tracks the TCP state machine to keep the transport's connected/closing
 * flags in sync and to schedule linger handling for slow peers.
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock_bh(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED),
			sk->sk_shutdown);

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock(&xprt->transport_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			struct sock_xprt *transport = container_of(xprt,
					struct sock_xprt, xprt);

			/* Reset TCP record info */
			transport->tcp_offset = 0;
			transport->tcp_reclen = 0;
			transport->tcp_copied = 0;
			transport->tcp_flags =
				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

			xprt_wake_pending_tasks(xprt, -EAGAIN);
		}
		spin_unlock(&xprt->transport_lock);
		break;
	case TCP_FIN_WAIT1:
		/* The client initiated a shutdown of the socket */
		xprt->connect_cookie++;
		xprt->reestablish_timeout = 0;
		set_bit(XPRT_CLOSING, &xprt->state);
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
		smp_mb__after_clear_bit();
		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
		break;
	case TCP_CLOSE_WAIT:
		/* The server initiated a shutdown of the socket */
		xprt->connect_cookie++;
		clear_bit(XPRT_CONNECTED, &xprt->state);
		xs_tcp_force_close(xprt);
		/* fall through */
	case TCP_CLOSING:
		/*
		 * If the server closed down the connection, make sure that
		 * we back off before reconnecting
		 */
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		break;
	case TCP_LAST_ACK:
		set_bit(XPRT_CLOSING, &xprt->state);
		xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
		smp_mb__before_clear_bit();
		clear_bit(XPRT_CONNECTED, &xprt->state);
		smp_mb__after_clear_bit();
		break;
	case TCP_CLOSE:
		xs_tcp_cancel_linger_timeout(xprt);
		xs_sock_mark_closed(xprt);
	}
 out:
	read_unlock_bh(&sk->sk_callback_lock);
}
1577
/*
 * Common write-space handling: clear the NOSPACE marks and, if a writer
 * had registered interest (SOCK_ASYNC_NOSPACE was set), wake it up.
 */
static void xs_write_space(struct sock *sk)
{
	struct socket *sock;
	struct rpc_xprt *xprt;

	if (unlikely(!(sock = sk->sk_socket)))
		return;
	clear_bit(SOCK_NOSPACE, &sock->flags);

	if (unlikely(!(xprt = xprt_from_sock(sk))))
		return;
	/* Only wake writers that were actually waiting for space */
	if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
		return;

	xprt_write_space(xprt);
}
1594
/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	read_lock_bh(&sk->sk_callback_lock);

	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk))
		xs_write_space(sk);

	read_unlock_bh(&sk->sk_callback_lock);
}
1615
/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock_bh(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
		xs_write_space(sk);

	read_unlock_bh(&sk->sk_callback_lock);
}
1636
/*
 * Apply the transport's requested snd/rcv buffer sizes to the socket,
 * locking them so the stack won't auto-tune them away.  Sizes scale
 * with the number of request slots.
 */
static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		/* Send space may have grown: poke any waiting writers */
		sk->sk_write_space(sk);
	}
}
1652
1653 /**
1654  * xs_udp_set_buffer_size - set send and receive limits
1655  * @xprt: generic transport
1656  * @sndsize: requested size of send buffer, in bytes
1657  * @rcvsize: requested size of receive buffer, in bytes
1658  *
1659  * Set socket send and receive buffer size limits.
1660  */
1661 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1662 {
1663         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1664
1665         transport->sndsize = 0;
1666         if (sndsize)
1667                 transport->sndsize = sndsize + 1024;
1668         transport->rcvsize = 0;
1669         if (rcvsize)
1670                 transport->rcvsize = rcvsize + 1024;
1671
1672         xs_udp_do_set_buffer_size(xprt);
1673 }
1674
1675 /**
1676  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1677  * @task: task that timed out
1678  *
1679  * Adjust the congestion window after a retransmit timeout has occurred.
1680  */
static void xs_udp_timer(struct rpc_task *task)
{
	/* A timeout counts as congestion: shrink the window */
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}
1685
1686 static unsigned short xs_get_random_port(void)
1687 {
1688         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1689         unsigned short rand = (unsigned short) net_random() % range;
1690         return rand + xprt_min_resvport;
1691 }
1692
1693 /**
1694  * xs_set_port - reset the port number in the remote endpoint address
1695  * @xprt: generic transport
1696  * @port: new port number
1697  *
1698  */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	/* Rewrite the port in the stored destination sockaddr, then
	 * refresh any state derived from it (cached port strings). */
	rpc_set_port(xs_addr(xprt), port);
	xs_update_peer_port(xprt);
}
1706
1707 static unsigned short xs_get_srcport(struct sock_xprt *transport)
1708 {
1709         unsigned short port = transport->srcport;
1710
1711         if (port == 0 && transport->xprt.resvport)
1712                 port = xs_get_random_port();
1713         return port;
1714 }
1715
1716 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1717 {
1718         if (transport->srcport != 0)
1719                 transport->srcport = 0;
1720         if (!transport->xprt.resvport)
1721                 return 0;
1722         if (port <= xprt_min_resvport || port > xprt_max_resvport)
1723                 return xprt_max_resvport;
1724         return --port;
1725 }
/*
 * Bind @sock to the transport's source address, searching for a usable
 * source port.  On EADDRINUSE the reserved range is walked downward,
 * allowing at most two wrap-arounds before giving up.  Returns the
 * result of the final kernel_bind() attempt.
 */
static int xs_bind(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_storage myaddr;
	int err, nloop = 0;
	unsigned short port = xs_get_srcport(transport);
	unsigned short last;

	memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
	do {
		rpc_set_port((struct sockaddr *)&myaddr, port);
		err = kernel_bind(sock, (struct sockaddr *)&myaddr,
				transport->xprt.addrlen);
		/* port 0 means "any port": take whatever the kernel gave us */
		if (port == 0)
			break;
		if (err == 0) {
			/* Remember the port so reconnects can reuse it */
			transport->srcport = port;
			break;
		}
		last = port;
		port = xs_next_srcport(transport, port);
		/* port > last means we wrapped back to the top of the range */
		if (port > last)
			nloop++;
	} while (err == -EADDRINUSE && nloop != 2);

	if (myaddr.ss_family == AF_INET)
		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in *)&myaddr)->sin_addr,
				port, err ? "failed" : "ok", err);
	else
		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
				port, err ? "failed" : "ok", err);
	return err;
}
1760
1761 /*
1762  * We don't support autobind on AF_LOCAL sockets
1763  */
static void xs_local_rpcbind(struct rpc_task *task)
{
	/* AF_LOCAL has no portmapper: just mark the transport bound */
	xprt_set_bound(task->tk_xprt);
}
1768
/* AF_LOCAL endpoints have no port numbers: deliberately a no-op. */
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
}
1772
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * lockdep class keys for RPC client sockets.  Index 0 is used for
 * AF_INET; index 1 is shared by AF_INET6 and AF_LOCAL.
 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

/* Give an AF_LOCAL RPC socket its own lockdep class names. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
	struct sock *sk = sock->sk;

	/* Reclassifying is only safe while nobody holds the socket lock */
	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
		&xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]);
}

/* Give an AF_INET RPC socket its own lockdep class names. */
static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

/* Give an AF_INET6 RPC socket its own lockdep class names. */
static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}

/* Dispatch to the per-family helper; other families are silently ignored. */
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
	switch (family) {
	case AF_LOCAL:
		xs_reclassify_socketu(sock);
		break;
	case AF_INET:
		xs_reclassify_socket4(sock);
		break;
	case AF_INET6:
		xs_reclassify_socket6(sock);
		break;
	}
}
#else
/* Without lockdep, socket reclassification is a no-op. */
static inline void xs_reclassify_socketu(struct socket *sock)
{
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}

static inline void xs_reclassify_socket(int family, struct socket *sock)
{
}
#endif
1835
1836 static struct socket *xs_create_sock(struct rpc_xprt *xprt,
1837                 struct sock_xprt *transport, int family, int type, int protocol)
1838 {
1839         struct socket *sock;
1840         int err;
1841
1842         err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
1843         if (err < 0) {
1844                 dprintk("RPC:       can't create %d transport socket (%d).\n",
1845                                 protocol, -err);
1846                 goto out;
1847         }
1848         xs_reclassify_socket(family, sock);
1849
1850         err = xs_bind(transport, sock);
1851         if (err) {
1852                 sock_release(sock);
1853                 goto out;
1854         }
1855
1856         return sock;
1857 out:
1858         return ERR_PTR(err);
1859 }
1860
/*
 * Attach the freshly created AF_LOCAL socket to the transport (installing
 * the RPC callbacks, first time only) and start the connect.  Returns the
 * result of kernel_connect().
 */
static int xs_local_finish_connecting(struct rpc_xprt *xprt,
				      struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
									xprt);

	/* Only wire up callbacks once per transport */
	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		/* Save originals so xs_restore_old_callbacks can undo this */
		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_local_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		/* Callbacks may allocate in softirq context: no sleeping */
		sk->sk_allocation = GFP_ATOMIC;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
}
1893
/**
 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
 * @work: work struct embedded in the sock_xprt being connected
 *
 * Invoked by a work queue tasklet.
 */
static void xs_local_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* Flag this worker as a fs transaction helper so memory reclaim
	 * cannot recurse back into the filesystem while we connect */
	current->flags |= PF_FSTRANS;

	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	status = __sock_create(xprt->xprt_net, AF_LOCAL,
					SOCK_STREAM, 0, &sock, 1);
	if (status < 0) {
		dprintk("RPC:       can't create AF_LOCAL "
			"transport socket (%d).\n", -status);
		goto out;
	}
	xs_reclassify_socketu(sock);

	dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);

	status = xs_local_finish_connecting(xprt, sock);
	switch (status) {
	case 0:
		dprintk("RPC:       xprt %p connected to %s\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		xprt_set_connected(xprt);
		break;
	case -ENOENT:
		/* The named socket path is absent (server not up yet) */
		dprintk("RPC:       xprt %p: socket %s does not exist\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		break;
	default:
		printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
				__func__, -status,
				xprt->address_strings[RPC_DISPLAY_ADDR]);
	}

out:
	/* Always clear the connecting flag and wake waiters with the result */
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
1950
/*
 * Attach a newly created UDP socket to the transport: install the RPC
 * callbacks (first time only) and apply any requested buffer sizes.
 */
static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		/* Save originals so xs_restore_old_callbacks can undo this */
		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		/* Don't require checksums on received UDP datagrams */
		sk->sk_no_check = UDP_CSUM_NORCV;
		/* Callbacks may allocate in softirq context: no sleeping */
		sk->sk_allocation = GFP_ATOMIC;

		/* UDP is connectionless: consider the transport connected now */
		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
}
1978
1979 static void xs_udp_setup_socket(struct work_struct *work)
1980 {
1981         struct sock_xprt *transport =
1982                 container_of(work, struct sock_xprt, connect_worker.work);
1983         struct rpc_xprt *xprt = &transport->xprt;
1984         struct socket *sock = transport->sock;
1985         int status = -EIO;
1986
1987         if (xprt->shutdown)
1988                 goto out;
1989
1990         current->flags |= PF_FSTRANS;
1991
1992         /* Start by resetting any existing state */
1993         xs_reset_transport(transport);
1994         sock = xs_create_sock(xprt, transport,
1995                         xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
1996         if (IS_ERR(sock))
1997                 goto out;
1998
1999         dprintk("RPC:       worker connecting xprt %p via %s to "
2000                                 "%s (port %s)\n", xprt,
2001                         xprt->address_strings[RPC_DISPLAY_PROTO],
2002                         xprt->address_strings[RPC_DISPLAY_ADDR],
2003                         xprt->address_strings[RPC_DISPLAY_PORT]);
2004
2005         xs_udp_finish_connecting(xprt, sock);
2006         status = 0;
2007 out:
2008         xprt_clear_connecting(xprt);
2009         xprt_wake_pending_tasks(xprt, status);
2010         current->flags &= ~PF_FSTRANS;
2011 }
2012
2013 /*
2014  * We need to preserve the port number so the reply cache on the server can
2015  * find our cached RPC replies when we get around to reconnecting.
2016  */
static void xs_abort_connection(struct sock_xprt *transport)
{
	int result;
	struct sockaddr any;

	dprintk("RPC:       disconnecting xprt %p to reuse port\n", transport);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	if (!result)
		/* Only clear the connection flags if the disconnect worked */
		xs_sock_reset_connection_flags(&transport->xprt);
	dprintk("RPC:       AF_UNSPEC connect return code %d\n", result);
}
2035
/*
 * "Close" the TCP connection while keeping the socket (and hence the
 * bound source port) around for the next connect attempt.  Sockets that
 * never underwent a shutdown are left alone.
 */
static void xs_tcp_reuse_connection(struct sock_xprt *transport)
{
	unsigned int state = transport->inet->sk_state;

	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC:       %s: TCP_CLOSEd and sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC:       %s: ESTABLISHED/SYN_SENT "
				"sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	/* AF_UNSPEC connect tears down the connection but keeps the port */
	xs_abort_connection(transport);
}
2061
/*
 * Attach the TCP socket to the transport (installing RPC callbacks and
 * socket options, first time only) and start a non-blocking connect.
 * Returns 0 or -EINPROGRESS when the SYN was sent, -ENOTCONN if the
 * transport is not yet bound, or a connect error.
 */
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = -ENOTCONN;

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		/* Save originals so xs_restore_old_callbacks can undo this */
		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		/* Callbacks may allocate in softirq context: no sleeping */
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		/* Keep the bound source port across close for reply caching */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		/* Disable Nagle: RPC requests should go out immediately */
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	if (!xprt_bound(xprt))
		goto out;

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
	switch (ret) {
	case 0:
	case -EINPROGRESS:
		/* SYN_SENT! */
		xprt->connect_cookie++;
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	}
out:
	return ret;
}
2113
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: work struct embedded in the sock_xprt being connected
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct socket *sock = transport->sock;
	struct rpc_xprt *xprt = &transport->xprt;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* Flag this worker as a fs transaction helper so memory reclaim
	 * cannot recurse back into the filesystem while we connect */
	current->flags |= PF_FSTRANS;

	if (!sock) {
		/* First attempt: create a fresh socket */
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = xs_create_sock(xprt, transport,
				xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(transport);

		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		xs_tcp_force_close(xprt);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
		/* fall through */
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		/* Connect is still in progress (or will be retried): leave
		 * pending tasks asleep until the state change callback runs */
		xprt_clear_connecting(xprt);
		current->flags &= ~PF_FSTRANS;
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
2198
2199 /**
2200  * xs_connect - connect a socket to a remote endpoint
2201  * @task: address of RPC task that manages state of connect request
2202  *
2203  * TCP: If the remote end dropped the connection, delay reconnecting.
2204  *
2205  * UDP socket connects are synchronous, but we use a work queue anyway
2206  * to guarantee that even unprivileged user processes can set up a
2207  * socket on a privileged port.
2208  *
2209  * If a UDP socket connect fails, the delay behavior here prevents
2210  * retry floods (hard mounts).
2211  */
static void xs_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
		/* A socket already exists: this is a reconnect, so back off
		 * before retrying to avoid hammering a restarting server */
		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
				"seconds\n",
				xprt, xprt->reestablish_timeout / HZ);
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker,
				   xprt->reestablish_timeout);
		/* Exponential backoff, clamped to [INIT, MAX] */
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
	} else {
		/* First connect (or soft connect): schedule immediately */
		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker, 0);
	}
}
2235
2236 /**
 * xs_local_print_stats - display AF_LOCAL socket-specific stats
2238  * @xprt: rpc_xprt struct containing statistics
2239  * @seq: output file
2240  *
2241  */
static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	long idle_time = 0;

	/* Idle time only makes sense while the transport is connected */
	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
			"%llu %llu\n",
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
2261
2262 /**
 * xs_udp_print_stats - display UDP socket-specific stats
2264  * @xprt: rpc_xprt struct containing statistics
2265  * @seq: output file
2266  *
2267  */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	/* Field order here is part of the /proc interface: do not reorder */
	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
2281
2282 /**
 * xs_tcp_print_stats - display TCP socket-specific stats
2284  * @xprt: rpc_xprt struct containing statistics
2285  * @seq: output file
2286  *
2287  */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	long idle_time = 0;

	/* Idle time only makes sense while the transport is connected */
	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	/* Field order here is part of the /proc interface: do not reorder */
	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
2308
2309 /*
2310  * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
 * we allocate pages instead of doing a kmalloc like rpc_malloc is because we want
2312  * to use the server side send routines.
2313  */
2314 static void *bc_malloc(struct rpc_task *task, size_t size)
2315 {
2316         struct page *page;
2317         struct rpc_buffer *buf;
2318
2319         BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2320         page = alloc_page(GFP_KERNEL);
2321
2322         if (!page)
2323                 return NULL;
2324
2325         buf = page_address(page);
2326         buf->len = PAGE_SIZE;
2327
2328         return buf->data;
2329 }
2330
2331 /*
2332  * Free the space allocated in the bc_alloc routine
2333  */
2334 static void bc_free(void *buffer)
2335 {
2336         struct rpc_buffer *buf;
2337
2338         if (!buffer)
2339                 return;
2340
2341         buf = container_of(buffer, struct rpc_buffer, data);
2342         free_page((unsigned long)buf);
2343 }
2344
2345 /*
2346  * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
2347  * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
2348  */
static int bc_sendto(struct rpc_rqst *req)
{
	int len;
	struct xdr_buf *xbufp = &req->rq_snd_buf;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	unsigned long headoff;
	unsigned long tailoff;

	/* Prepend the 4-byte TCP record marker to the reply */
	xs_encode_stream_record_marker(xbufp);

	/* svc_send_common wants page offsets for head and tail iovecs */
	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
	len = svc_send_common(sock, xbufp,
			      virt_to_page(xbufp->head[0].iov_base), headoff,
			      xbufp->tail[0].iov_base, tailoff);

	/* Partial sends are treated as failures: ask the caller to retry */
	if (len != xbufp->len) {
		printk(KERN_NOTICE "Error sending entire callback!\n");
		len = -EAGAIN;
	}

	return len;
}
2375
2376 /*
2377  * The send routine. Borrows from svc_send
2378  */
2379 static int bc_send_request(struct rpc_task *task)
2380 {
2381         struct rpc_rqst *req = task->tk_rqstp;
2382         struct svc_xprt *xprt;
2383         struct svc_sock         *svsk;
2384         u32                     len;
2385
2386         dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2387         /*
2388          * Get the server socket associated with this callback xprt
2389          */
2390         xprt = req->rq_xprt->bc_xprt;
2391         svsk = container_of(xprt, struct svc_sock, sk_xprt);
2392
2393         /*
2394          * Grab the mutex to serialize data as the connection is shared
2395          * with the fore channel
2396          */
2397         if (!mutex_trylock(&xprt->xpt_mutex)) {
2398                 rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2399                 if (!mutex_trylock(&xprt->xpt_mutex))
2400                         return -EAGAIN;
2401                 rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2402         }
2403         if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2404                 len = -ENOTCONN;
2405         else
2406                 len = bc_sendto(req);
2407         mutex_unlock(&xprt->xpt_mutex);
2408
2409         if (len > 0)
2410                 len = 0;
2411
2412         return len;
2413 }
2414
2415 /*
2416  * The close routine. Since this is client initiated, we do nothing
2417  */
2418
/* Intentionally empty: the fore channel owns the connection lifetime. */
static void bc_close(struct rpc_xprt *xprt)
{
}
2422
2423 /*
2424  * The xprt destroy routine. Again, because this connection is client
2425  * initiated, we do nothing
2426  */
2427
/* Intentionally empty: the fore channel owns the connection lifetime. */
static void bc_destroy(struct rpc_xprt *xprt)
{
}
2431
/* Transport operations for AF_LOCAL (unix-domain) sockets. */
static struct rpc_xprt_ops xs_local_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.rpcbind		= xs_local_rpcbind,
	.set_port		= xs_local_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_local_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_local_print_stats,
};
2447
/* Transport operations for UDP sockets (congestion-controlled). */
static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};
2466
/* Transport operations for TCP sockets. */
static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.alloc_slot		= xprt_lock_and_alloc_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_tcp_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};
2482
2483 /*
2484  * The rpc_xprt_ops for the server backchannel
2485  */
2486
static struct rpc_xprt_ops bc_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xprt_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.buf_alloc		= bc_malloc,
	.buf_free		= bc_free,
	.send_request		= bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	/* close/destroy are no-ops: the fore channel owns the connection */
	.close			= bc_close,
	.destroy		= bc_destroy,
	.print_stats		= xs_tcp_print_stats,
};
2499
2500 static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2501 {
2502         static const struct sockaddr_in sin = {
2503                 .sin_family             = AF_INET,
2504                 .sin_addr.s_addr        = htonl(INADDR_ANY),
2505         };
2506         static const struct sockaddr_in6 sin6 = {
2507                 .sin6_family            = AF_INET6,
2508                 .sin6_addr              = IN6ADDR_ANY_INIT,
2509         };
2510
2511         switch (family) {
2512         case AF_LOCAL:
2513                 break;
2514         case AF_INET:
2515                 memcpy(sap, &sin, sizeof(sin));
2516                 break;
2517         case AF_INET6:
2518                 memcpy(sap, &sin6, sizeof(sin6));
2519                 break;
2520         default:
2521                 dprintk("RPC:       %s: Bad address family\n", __func__);
2522                 return -EAFNOSUPPORT;
2523         }
2524         return 0;
2525 }
2526
2527 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2528                                       unsigned int slot_table_size,
2529                                       unsigned int max_slot_table_size)
2530 {
2531         struct rpc_xprt *xprt;
2532         struct sock_xprt *new;
2533
2534         if (args->addrlen > sizeof(xprt->addr)) {
2535                 dprintk("RPC:       xs_setup_xprt: address too large\n");
2536                 return ERR_PTR(-EBADF);
2537         }
2538
2539         xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
2540                         max_slot_table_size);
2541         if (xprt == NULL) {
2542                 dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2543                                 "rpc_xprt\n");
2544                 return ERR_PTR(-ENOMEM);
2545         }
2546
2547         new = container_of(xprt, struct sock_xprt, xprt);
2548         memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2549         xprt->addrlen = args->addrlen;
2550         if (args->srcaddr)
2551                 memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2552         else {
2553                 int err;
2554                 err = xs_init_anyaddr(args->dstaddr->sa_family,
2555                                         (struct sockaddr *)&new->srcaddr);
2556                 if (err != 0) {
2557                         xprt_free(xprt);
2558                         return ERR_PTR(err);
2559                 }
2560         }
2561
2562         return xprt;
2563 }
2564
/* Default RPC timeout for AF_LOCAL: 10s per try (no growth), 2 retries. */
static const struct rpc_timeout xs_local_default_timeout = {
	.to_initval = 10 * HZ,
	.to_maxval = 10 * HZ,
	.to_retries = 2,
};
2570
2571 /**
2572  * xs_setup_local - Set up transport to use an AF_LOCAL socket
2573  * @args: rpc transport creation arguments
2574  *
2575  * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
2576  */
2577 static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
2578 {
2579         struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
2580         struct sock_xprt *transport;
2581         struct rpc_xprt *xprt;
2582         struct rpc_xprt *ret;
2583
2584         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2585                         xprt_max_tcp_slot_table_entries);
2586         if (IS_ERR(xprt))
2587                 return xprt;
2588         transport = container_of(xprt, struct sock_xprt, xprt);
2589
2590         xprt->prot = 0;
2591         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2592         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2593
2594         xprt->bind_timeout = XS_BIND_TO;
2595         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2596         xprt->idle_timeout = XS_IDLE_DISC_TO;
2597
2598         xprt->ops = &xs_local_ops;
2599         xprt->timeout = &xs_local_default_timeout;
2600
2601         switch (sun->sun_family) {
2602         case AF_LOCAL:
2603                 if (sun->sun_path[0] != '/') {
2604                         dprintk("RPC:       bad AF_LOCAL address: %s\n",
2605                                         sun->sun_path);
2606                         ret = ERR_PTR(-EINVAL);
2607                         goto out_err;
2608                 }
2609                 xprt_set_bound(xprt);
2610                 INIT_DELAYED_WORK(&transport->connect_worker,
2611                                         xs_local_setup_socket);
2612                 xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
2613                 break;
2614         default:
2615                 ret = ERR_PTR(-EAFNOSUPPORT);
2616                 goto out_err;
2617         }
2618
2619         dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
2620                         xprt->address_strings[RPC_DISPLAY_ADDR]);
2621
2622         if (try_module_get(THIS_MODULE))
2623                 return xprt;
2624         ret = ERR_PTR(-EINVAL);
2625 out_err:
2626         xprt_free(xprt);
2627         return ret;
2628 }
2629
/* Default RPC timeout for UDP: 5s initial, +5s per retry, 30s cap, 5 retries. */
static const struct rpc_timeout xs_udp_default_timeout = {
	.to_initval = 5 * HZ,
	.to_maxval = 30 * HZ,
	.to_increment = 5 * HZ,
	.to_retries = 5,
};
2636
2637 /**
2638  * xs_setup_udp - Set up transport to use a UDP socket
2639  * @args: rpc transport creation arguments
2640  *
2641  */
2642 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2643 {
2644         struct sockaddr *addr = args->dstaddr;
2645         struct rpc_xprt *xprt;
2646         struct sock_xprt *transport;
2647         struct rpc_xprt *ret;
2648
2649         xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
2650                         xprt_udp_slot_table_entries);
2651         if (IS_ERR(xprt))
2652                 return xprt;
2653         transport = container_of(xprt, struct sock_xprt, xprt);
2654
2655         xprt->prot = IPPROTO_UDP;
2656         xprt->tsh_size = 0;
2657         /* XXX: header size can vary due to auth type, IPv6, etc. */
2658         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2659
2660         xprt->bind_timeout = XS_BIND_TO;
2661         xprt->reestablish_timeout = XS_UDP_REEST_TO;
2662         xprt->idle_timeout = XS_IDLE_DISC_TO;
2663
2664         xprt->ops = &xs_udp_ops;
2665
2666         xprt->timeout = &xs_udp_default_timeout;
2667
2668         switch (addr->sa_family) {
2669         case AF_INET:
2670                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2671                         xprt_set_bound(xprt);
2672
2673                 INIT_DELAYED_WORK(&transport->connect_worker,
2674                                         xs_udp_setup_socket);
2675                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2676                 break;
2677         case AF_INET6:
2678                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2679                         xprt_set_bound(xprt);
2680
2681                 INIT_DELAYED_WORK(&transport->connect_worker,
2682                                         xs_udp_setup_socket);
2683                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2684                 break;
2685         default:
2686                 ret = ERR_PTR(-EAFNOSUPPORT);
2687                 goto out_err;
2688         }
2689
2690         if (xprt_bound(xprt))
2691                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2692                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2693                                 xprt->address_strings[RPC_DISPLAY_PORT],
2694                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2695         else
2696                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2697                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2698                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2699
2700         if (try_module_get(THIS_MODULE))
2701                 return xprt;
2702         ret = ERR_PTR(-EINVAL);
2703 out_err:
2704         xprt_free(xprt);
2705         return ret;
2706 }
2707
/* Default RPC timeout for TCP: 60s per try (no growth), 2 retries. */
static const struct rpc_timeout xs_tcp_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
	.to_retries = 2,
};
2713
2714 /**
2715  * xs_setup_tcp - Set up transport to use a TCP socket
2716  * @args: rpc transport creation arguments
2717  *
2718  */
2719 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2720 {
2721         struct sockaddr *addr = args->dstaddr;
2722         struct rpc_xprt *xprt;
2723         struct sock_xprt *transport;
2724         struct rpc_xprt *ret;
2725
2726         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2727                         xprt_max_tcp_slot_table_entries);
2728         if (IS_ERR(xprt))
2729                 return xprt;
2730         transport = container_of(xprt, struct sock_xprt, xprt);
2731
2732         xprt->prot = IPPROTO_TCP;
2733         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2734         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2735
2736         xprt->bind_timeout = XS_BIND_TO;
2737         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2738         xprt->idle_timeout = XS_IDLE_DISC_TO;
2739
2740         xprt->ops = &xs_tcp_ops;
2741         xprt->timeout = &xs_tcp_default_timeout;
2742
2743         switch (addr->sa_family) {
2744         case AF_INET:
2745                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2746                         xprt_set_bound(xprt);
2747
2748                 INIT_DELAYED_WORK(&transport->connect_worker,
2749                                         xs_tcp_setup_socket);
2750                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2751                 break;
2752         case AF_INET6:
2753                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2754                         xprt_set_bound(xprt);
2755
2756                 INIT_DELAYED_WORK(&transport->connect_worker,
2757                                         xs_tcp_setup_socket);
2758                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2759                 break;
2760         default:
2761                 ret = ERR_PTR(-EAFNOSUPPORT);
2762                 goto out_err;
2763         }
2764
2765         if (xprt_bound(xprt))
2766                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2767                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2768                                 xprt->address_strings[RPC_DISPLAY_PORT],
2769                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2770         else
2771                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2772                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2773                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2774
2775
2776         if (try_module_get(THIS_MODULE))
2777                 return xprt;
2778         ret = ERR_PTR(-EINVAL);
2779 out_err:
2780         xprt_free(xprt);
2781         return ret;
2782 }
2783
2784 /**
2785  * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2786  * @args: rpc transport creation arguments
2787  *
2788  */
2789 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2790 {
2791         struct sockaddr *addr = args->dstaddr;
2792         struct rpc_xprt *xprt;
2793         struct sock_xprt *transport;
2794         struct svc_sock *bc_sock;
2795         struct rpc_xprt *ret;
2796
2797         if (args->bc_xprt->xpt_bc_xprt) {
2798                 /*
2799                  * This server connection already has a backchannel
2800                  * export; we can't create a new one, as we wouldn't be
2801                  * able to match replies based on xid any more.  So,
2802                  * reuse the already-existing one:
2803                  */
2804                  return args->bc_xprt->xpt_bc_xprt;
2805         }
2806         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2807                         xprt_tcp_slot_table_entries);
2808         if (IS_ERR(xprt))
2809                 return xprt;
2810         transport = container_of(xprt, struct sock_xprt, xprt);
2811
2812         xprt->prot = IPPROTO_TCP;
2813         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2814         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2815         xprt->timeout = &xs_tcp_default_timeout;
2816
2817         /* backchannel */
2818         xprt_set_bound(xprt);
2819         xprt->bind_timeout = 0;
2820         xprt->reestablish_timeout = 0;
2821         xprt->idle_timeout = 0;
2822
2823         xprt->ops = &bc_tcp_ops;
2824
2825         switch (addr->sa_family) {
2826         case AF_INET:
2827                 xs_format_peer_addresses(xprt, "tcp",
2828                                          RPCBIND_NETID_TCP);
2829                 break;
2830         case AF_INET6:
2831                 xs_format_peer_addresses(xprt, "tcp",
2832                                    RPCBIND_NETID_TCP6);
2833                 break;
2834         default:
2835                 ret = ERR_PTR(-EAFNOSUPPORT);
2836                 goto out_err;
2837         }
2838
2839         dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2840                         xprt->address_strings[RPC_DISPLAY_ADDR],
2841                         xprt->address_strings[RPC_DISPLAY_PORT],
2842                         xprt->address_strings[RPC_DISPLAY_PROTO]);
2843
2844         /*
2845          * Once we've associated a backchannel xprt with a connection,
2846          * we want to keep it around as long as long as the connection
2847          * lasts, in case we need to start using it for a backchannel
2848          * again; this reference won't be dropped until bc_xprt is
2849          * destroyed.
2850          */
2851         xprt_get(xprt);
2852         args->bc_xprt->xpt_bc_xprt = xprt;
2853         xprt->bc_xprt = args->bc_xprt;
2854         bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2855         transport->sock = bc_sock->sk_sock;
2856         transport->inet = bc_sock->sk_sk;
2857
2858         /*
2859          * Since we don't want connections for the backchannel, we set
2860          * the xprt status to connected
2861          */
2862         xprt_set_connected(xprt);
2863
2864
2865         if (try_module_get(THIS_MODULE))
2866                 return xprt;
2867         xprt_put(xprt);
2868         ret = ERR_PTR(-EINVAL);
2869 out_err:
2870         xprt_free(xprt);
2871         return ret;
2872 }
2873
/* Transport class for RPC over named AF_LOCAL (UNIX domain) sockets. */
static struct xprt_class	xs_local_transport = {
	.list		= LIST_HEAD_INIT(xs_local_transport.list),
	.name		= "named UNIX socket",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_LOCAL,
	.setup		= xs_setup_local,
};
2881
/* Transport class for RPC over UDP sockets. */
static struct xprt_class	xs_udp_transport = {
	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
	.name		= "udp",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_UDP,
	.setup		= xs_setup_udp,
};
2889
/* Transport class for RPC over TCP sockets. */
static struct xprt_class	xs_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
	.name		= "tcp",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_TCP,
	.setup		= xs_setup_tcp,
};
2897
/* Transport class for the NFSv4.1 TCP backchannel. */
static struct xprt_class	xs_bc_tcp_transport = {
	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
	.name		= "tcp NFSv4.1 backchannel",
	.owner		= THIS_MODULE,
	.ident		= XPRT_TRANSPORT_BC_TCP,
	.setup		= xs_setup_bc_tcp,
};
2905
2906 /**
2907  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
2908  *
2909  */
2910 int init_socket_xprt(void)
2911 {
2912 #ifdef RPC_DEBUG
2913         if (!sunrpc_table_header)
2914                 sunrpc_table_header = register_sysctl_table(sunrpc_table);
2915 #endif
2916
2917         xprt_register_transport(&xs_local_transport);
2918         xprt_register_transport(&xs_udp_transport);
2919         xprt_register_transport(&xs_tcp_transport);
2920         xprt_register_transport(&xs_bc_tcp_transport);
2921
2922         return 0;
2923 }
2924
2925 /**
2926  * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
2927  *
2928  */
2929 void cleanup_socket_xprt(void)
2930 {
2931 #ifdef RPC_DEBUG
2932         if (sunrpc_table_header) {
2933                 unregister_sysctl_table(sunrpc_table_header);
2934                 sunrpc_table_header = NULL;
2935         }
2936 #endif
2937
2938         xprt_unregister_transport(&xs_local_transport);
2939         xprt_unregister_transport(&xs_udp_transport);
2940         xprt_unregister_transport(&xs_tcp_transport);
2941         xprt_unregister_transport(&xs_bc_tcp_transport);
2942 }
2943
2944 static int param_set_uint_minmax(const char *val,
2945                 const struct kernel_param *kp,
2946                 unsigned int min, unsigned int max)
2947 {
2948         unsigned long num;
2949         int ret;
2950
2951         if (!val)
2952                 return -EINVAL;
2953         ret = strict_strtoul(val, 0, &num);
2954         if (ret == -EINVAL || num < min || num > max)
2955                 return -EINVAL;
2956         *((unsigned int *)kp->arg) = num;
2957         return 0;
2958 }
2959
/*
 * Setter for the "portnr" module-param type: accepts only values within
 * the RPC reserved-port range.
 */
static int param_set_portnr(const char *val, const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_RESVPORT,
			RPC_MAX_RESVPORT);
}
2966
2967 static struct kernel_param_ops param_ops_portnr = {
2968         .set = param_set_portnr,
2969         .get = param_get_uint,
2970 };
2971
/* Type-check hook required by module_param_named() for type "portnr". */
#define param_check_portnr(name, p) \
	__param_check(name, p, unsigned int);

/* Reserved source-port range limits, settable at load time or via sysfs. */
module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
2977
/*
 * Setter for the "slot_table_size" module-param type: bounds the value
 * to [RPC_MIN_SLOT_TABLE, RPC_MAX_SLOT_TABLE].
 */
static int param_set_slot_table_size(const char *val,
				     const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_SLOT_TABLE,
			RPC_MAX_SLOT_TABLE);
}
2985
2986 static struct kernel_param_ops param_ops_slot_table_size = {
2987         .set = param_set_slot_table_size,
2988         .get = param_get_uint,
2989 };
2990
/* Type-check hook for the "slot_table_size" module-param type. */
#define param_check_slot_table_size(name, p) \
	__param_check(name, p, unsigned int);
2993
/*
 * Setter for the "max_slot_table_size" module-param type: like
 * slot_table_size, but allows up to the larger RPC_MAX_SLOT_TABLE_LIMIT.
 */
static int param_set_max_slot_table_size(const char *val,
				     const struct kernel_param *kp)
{
	return param_set_uint_minmax(val, kp,
			RPC_MIN_SLOT_TABLE,
			RPC_MAX_SLOT_TABLE_LIMIT);
}
3001
3002 static struct kernel_param_ops param_ops_max_slot_table_size = {
3003         .set = param_set_max_slot_table_size,
3004         .get = param_get_uint,
3005 };
3006
/* Type-check hook for the "max_slot_table_size" module-param type. */
#define param_check_max_slot_table_size(name, p) \
	__param_check(name, p, unsigned int);

/* Slot-table sizing knobs, settable at load time or via sysfs. */
module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
		   slot_table_size, 0644);
module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
		   max_slot_table_size, 0644);
module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
		   slot_table_size, 0644);
3016