bfb78fab06eacddac77b11c3725ef4274d7ce211
[pandora-kernel.git] / net / sunrpc / xprtsock.c
1 /*
2  * linux/net/sunrpc/xprtsock.c
3  *
4  * Client-side transport implementation for sockets.
5  *
6  * TCP callback races fixes (C) 1998 Red Hat
7  * TCP send fixes (C) 1998 Red Hat
8  * TCP NFS related read + write fixes
9  *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
10  *
11  * Rewrite of larges part of the code in order to stabilize TCP stuff.
12  * Fix behaviour when socket buffer is full.
13  *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
14  *
15  * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
16  *
17  * IPv6 support contributed by Gilles Quillard, Bull Open Source, 2005.
18  *   <gilles.quillard@bull.net>
19  */
20
21 #include <linux/types.h>
22 #include <linux/string.h>
23 #include <linux/slab.h>
24 #include <linux/module.h>
25 #include <linux/capability.h>
26 #include <linux/pagemap.h>
27 #include <linux/errno.h>
28 #include <linux/socket.h>
29 #include <linux/in.h>
30 #include <linux/net.h>
31 #include <linux/mm.h>
32 #include <linux/un.h>
33 #include <linux/udp.h>
34 #include <linux/tcp.h>
35 #include <linux/sunrpc/clnt.h>
36 #include <linux/sunrpc/sched.h>
37 #include <linux/sunrpc/svcsock.h>
38 #include <linux/sunrpc/xprtsock.h>
39 #include <linux/file.h>
40 #ifdef CONFIG_SUNRPC_BACKCHANNEL
41 #include <linux/sunrpc/bc_xprt.h>
42 #endif
43
44 #include <net/sock.h>
45 #include <net/checksum.h>
46 #include <net/udp.h>
47 #include <net/tcp.h>
48
49 #include "sunrpc.h"
50
51 static void xs_close(struct rpc_xprt *xprt);
52
53 /*
54  * xprtsock tunables
55  */
/* Default number of request slots per UDP transport */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
/* Initial number of request slots per TCP transport */
unsigned int xprt_tcp_slot_table_entries = RPC_MIN_SLOT_TABLE;
/* Upper bound on the number of TCP request slots */
unsigned int xprt_max_tcp_slot_table_entries = RPC_MAX_SLOT_TABLE;

/* Source port range used when binding a reserved port */
unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/* How long (in jiffies) a closing TCP connection may linger */
#define XS_TCP_LINGER_TO	(15U * HZ)
static unsigned int xs_tcp_fin_timeout __read_mostly = XS_TCP_LINGER_TO;
65
66 /*
67  * We can register our own files under /proc/sys/sunrpc by
68  * calling register_sysctl_table() again.  The files in that
69  * directory become the union of all files registered there.
70  *
71  * We simply need to make sure that we don't collide with
72  * someone else's file names!
73  */
74
75 #ifdef RPC_DEBUG
76
/* Bounds enforced by the proc_dointvec_minmax handlers below */
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int max_tcp_slot_table_limit = RPC_MAX_SLOT_TABLE_LIMIT;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

/* Handle returned by register_sysctl_table(), kept for unregistration */
static struct ctl_table_header *sunrpc_table_header;
84
85 /*
86  * FIXME: changing the UDP slot table size should also resize the UDP
87  *        socket buffers for existing UDP transports
88  */
/* Tunables exported under /proc/sys/sunrpc/ */
static ctl_table xs_tunables_table[] = {
	{
		/* Slots (concurrent requests) per UDP transport */
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		/* Initial slots per TCP transport */
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		/* Upper limit on dynamically grown TCP slot tables */
		.procname	= "tcp_max_slot_table_entries",
		.data		= &xprt_max_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_tcp_slot_table_limit
	},
	{
		/* Lower end of the reserved source-port range */
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		/* Upper end of the reserved source-port range */
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		/* TCP linger timeout, in jiffies (proc_dointvec_jiffies) */
		.procname	= "tcp_fin_timeout",
		.data		= &xs_tcp_fin_timeout,
		.maxlen		= sizeof(xs_tcp_fin_timeout),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_jiffies,
	},
	{ },	/* sentinel */
};
144
/* Parent directory entry: /proc/sys/sunrpc, holding the table above */
static ctl_table sunrpc_table[] = {
	{
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{ },	/* sentinel */
};
153
154 #endif
155
156 /*
157  * Wait duration for a reply from the RPC portmapper.
158  */
159 #define XS_BIND_TO              (60U * HZ)
160
161 /*
162  * Delay if a UDP socket connect error occurs.  This is most likely some
163  * kind of resource problem on the local host.
164  */
165 #define XS_UDP_REEST_TO         (2U * HZ)
166
167 /*
168  * The reestablish timeout allows clients to delay for a bit before attempting
169  * to reconnect to a server that just dropped our connection.
170  *
171  * We implement an exponential backoff when trying to reestablish a TCP
172  * transport connection with the server.  Some servers like to drop a TCP
173  * connection when they are overworked, so we start with a short timeout and
174  * increase over time if the server is down or not responding.
175  */
176 #define XS_TCP_INIT_REEST_TO    (3U * HZ)
177 #define XS_TCP_MAX_REEST_TO     (5U * 60 * HZ)
178
179 /*
180  * TCP idle timeout; client drops the transport socket if it is idle
181  * for this long.  Note that we also timeout UDP sockets to prevent
182  * holding port numbers when there is no RPC traffic.
183  */
184 #define XS_IDLE_DISC_TO         (5U * 60 * HZ)
185
186 #ifdef RPC_DEBUG
187 # undef  RPC_DEBUG_DATA
188 # define RPCDBG_FACILITY        RPCDBG_TRANS
189 #endif
190
#ifdef RPC_DEBUG_DATA
/*
 * Hex-dump up to the first 128 bytes of @packet (4 bytes per group,
 * 32 bytes per line) to the RPC debug log.  Debug builds only.
 */
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		/* Start a new line, prefixed with the offset, every 32 bytes */
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif
215
/*
 * Socket-based RPC transport.  Embeds the generic rpc_xprt as its first
 * member; the xs_* code recovers the container with container_of().
 */
struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;	/* kernel socket, NULL when torn down */
	struct sock *		inet;	/* underlying network-layer sock */

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,	/* record marker being assembled */
				tcp_xid,	/* XID of the reply in progress */
				tcp_calldir;	/* call direction word (presumably
						 * backchannel support — confirm
						 * against the receive path) */

	u32			tcp_offset,	/* bytes consumed of current record */
				tcp_reclen;	/* length of current record */

	unsigned long		tcp_copied,	/* bytes copied into the request */
				tcp_flags;	/* TCP_RCV_* / TCP_RPC_* bits below */

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;	/* deferred (re)connect attempts */
	struct sockaddr_storage	srcaddr;	/* local address to bind to */
	unsigned short		srcport;	/* bound source port; 0 = unset */

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses, restored by
	 * xs_restore_old_callbacks() on teardown
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
};
258
259 /*
260  * TCP receive state flags
261  */
262 #define TCP_RCV_LAST_FRAG       (1UL << 0)
263 #define TCP_RCV_COPY_FRAGHDR    (1UL << 1)
264 #define TCP_RCV_COPY_XID        (1UL << 2)
265 #define TCP_RCV_COPY_DATA       (1UL << 3)
266 #define TCP_RCV_READ_CALLDIR    (1UL << 4)
267 #define TCP_RCV_COPY_CALLDIR    (1UL << 5)
268
269 /*
270  * TCP RPC flags
271  */
272 #define TCP_RPC_REPLY           (1UL << 6)
273
274 static inline struct sockaddr *xs_addr(struct rpc_xprt *xprt)
275 {
276         return (struct sockaddr *) &xprt->addr;
277 }
278
279 static inline struct sockaddr_un *xs_addr_un(struct rpc_xprt *xprt)
280 {
281         return (struct sockaddr_un *) &xprt->addr;
282 }
283
284 static inline struct sockaddr_in *xs_addr_in(struct rpc_xprt *xprt)
285 {
286         return (struct sockaddr_in *) &xprt->addr;
287 }
288
289 static inline struct sockaddr_in6 *xs_addr_in6(struct rpc_xprt *xprt)
290 {
291         return (struct sockaddr_in6 *) &xprt->addr;
292 }
293
/*
 * Cache printable forms of the peer address: RPC_DISPLAY_ADDR and
 * RPC_DISPLAY_HEX_ADDR.  @buf is reused across the switch; after it,
 * buf holds the hex form (or, for AF_LOCAL, the socket path again),
 * which is what gets duplicated into RPC_DISPLAY_HEX_ADDR.
 */
static void xs_format_common_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr *sap = xs_addr(xprt);
	struct sockaddr_in6 *sin6;
	struct sockaddr_in *sin;
	struct sockaddr_un *sun;
	char buf[128];

	switch (sap->sa_family) {
	case AF_LOCAL:
		sun = xs_addr_un(xprt);
		strlcpy(buf, sun->sun_path, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		break;
	case AF_INET:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin = xs_addr_in(xprt);
		/* hex form: the 32-bit address in host byte order */
		snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
		break;
	case AF_INET6:
		(void)rpc_ntop(sap, buf, sizeof(buf));
		xprt->address_strings[RPC_DISPLAY_ADDR] =
						kstrdup(buf, GFP_KERNEL);
		sin6 = xs_addr_in6(xprt);
		snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
		break;
	default:
		/* Transport setup only ever uses the three families above */
		BUG();
	}

	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
}
329
330 static void xs_format_common_peer_ports(struct rpc_xprt *xprt)
331 {
332         struct sockaddr *sap = xs_addr(xprt);
333         char buf[128];
334
335         snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
336         xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
337
338         snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
339         xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
340 }
341
/*
 * Populate the full set of printable peer-address strings.
 * @protocol and @netid are stored by reference, not duplicated —
 * which is why xs_free_peer_addresses() must skip those two slots.
 */
static void xs_format_peer_addresses(struct rpc_xprt *xprt,
				     const char *protocol,
				     const char *netid)
{
	xprt->address_strings[RPC_DISPLAY_PROTO] = protocol;
	xprt->address_strings[RPC_DISPLAY_NETID] = netid;
	xs_format_common_peer_addresses(xprt);
	xs_format_common_peer_ports(xprt);
}
351
352 static void xs_update_peer_port(struct rpc_xprt *xprt)
353 {
354         kfree(xprt->address_strings[RPC_DISPLAY_HEX_PORT]);
355         kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
356
357         xs_format_common_peer_ports(xprt);
358 }
359
360 static void xs_free_peer_addresses(struct rpc_xprt *xprt)
361 {
362         unsigned int i;
363
364         for (i = 0; i < RPC_DISPLAY_MAX; i++)
365                 switch (i) {
366                 case RPC_DISPLAY_PROTO:
367                 case RPC_DISPLAY_NETID:
368                         continue;
369                 default:
370                         kfree(xprt->address_strings[i]);
371                 }
372 }
373
374 #define XS_SENDMSG_FLAGS        (MSG_DONTWAIT | MSG_NOSIGNAL)
375
376 static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
377 {
378         struct msghdr msg = {
379                 .msg_name       = addr,
380                 .msg_namelen    = addrlen,
381                 .msg_flags      = XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
382         };
383         struct kvec iov = {
384                 .iov_base       = vec->iov_base + base,
385                 .iov_len        = vec->iov_len - base,
386         };
387
388         if (iov.iov_len != 0)
389                 return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
390         return kernel_sendmsg(sock, &msg, NULL, 0, 0);
391 }
392
/*
 * Send the page section of an xdr_buf, starting @base bytes into it.
 * Uses the socket's sendpage op when @zerocopy is allowed, otherwise
 * sock_no_sendpage(), which copies the data instead of taking page
 * references.
 *
 * Returns bytes sent, or a negative errno if nothing was sent at all.
 */
static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more, bool zerocopy)
{
	ssize_t (*do_sendpage)(struct socket *sock, struct page *page,
			int offset, size_t size, int flags);
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	/* Translate @base into a starting page and in-page offset */
	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	do_sendpage = sock->ops->sendpage;
	if (!zerocopy)
		do_sendpage = sock_no_sendpage;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		/* Tell the stack more data follows this chunk */
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = do_sendpage(sock, *ppage, base, len, flags);
		/* Stop when done, or on a short/failed send */
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	/* Report the error only if no bytes went out at all */
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
428
/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 * @zerocopy: true if it is safe to use sendpage()
 *
 * Sends head, page data, and tail of @xdr in order, starting @base
 * bytes in.  Returns bytes sent, or a negative errno if nothing was
 * sent at all.
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base, bool zerocopy)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTSOCK;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	/* A non-zero base means this continues an earlier partial send;
	 * the datagram destination was supplied on the first call. */
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	/* Head: send it, or skip past it when @base lies beyond it */
	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	/* Page data */
	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0, zerocopy);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	/* Tail */
	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	/* Report the error only if nothing at all went out */
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}
485
/*
 * RPC-layer callback run when a task queued by xs_nospace() stops
 * waiting for socket buffer space; undoes xs_nospace()'s accounting.
 */
static void xs_nospace_callback(struct rpc_task *task)
{
	struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);

	transport->inet->sk_write_pending--;
	clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
}
493
/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 * Returns -EAGAIN when the caller should retry the transmission later,
 * or -ENOTCONN when the transport was disconnected in the meantime.
 */
static int xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = -EAGAIN;

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	/* Protect against races with write_space */
	spin_lock_bh(&xprt->transport_lock);

	/* Don't race with disconnect */
	if (xprt_connected(xprt)) {
		/* Only sleep if the socket really is still full */
		if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
			/*
			 * Notify TCP that we're limited by the application
			 * window size
			 */
			set_bit(SOCK_NOSPACE, &transport->sock->flags);
			transport->inet->sk_write_pending++;
			/* ...and wait for more buffer space; the accounting
			 * above is undone by xs_nospace_callback() */
			xprt_wait_for_buffer_space(task, xs_nospace_callback);
		}
	} else {
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
		ret = -ENOTCONN;
	}

	spin_unlock_bh(&xprt->transport_lock);
	return ret;
}
533
534 /*
535  * Construct a stream transport record marker in @buf.
536  */
537 static inline void xs_encode_stream_record_marker(struct xdr_buf *buf)
538 {
539         u32 reclen = buf->len - sizeof(rpc_fraghdr);
540         rpc_fraghdr *base = buf->head[0].iov_base;
541         *base = cpu_to_be32(RPC_LAST_STREAM_FRAGMENT | reclen);
542 }
543
/**
 * xs_local_send_request - write an RPC request to an AF_LOCAL socket
 * @task: RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_local_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	/* Write the stream record marker at the head of the buffer */
	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
			req->rq_svec->iov_base, req->rq_svec->iov_len);

	/* rq_bytes_sent resumes a previous partial send, if any */
	status = xs_sendpages(transport->sock, NULL, 0,
						xdr, req->rq_bytes_sent, true);
	dprintk("RPC:       %s(%u) = %d\n",
			__func__, xdr->len - req->rq_bytes_sent, status);
	if (likely(status >= 0)) {
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}
		/* Partial send: caller must retry */
		status = -EAGAIN;
	}

	switch (status) {
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -EPIPE:
		xs_close(xprt);
		status = -ENOTCONN;
	}

	return status;
}
597
/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Can't send until we know the peer's port */
	if (!xprt_bound(xprt))
		return -ENOTCONN;
	status = xs_sendpages(transport->sock,
			      xs_addr(xprt),
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent, true);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (status >= 0) {
		req->rq_xmit_bytes_sent += status;
		/* Datagram: only a complete send counts */
		if (status >= req->rq_slen)
			return 0;
		/* Still some bytes left; set up for a retry later. */
		status = -EAGAIN;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
660
661 /**
662  * xs_tcp_shutdown - gracefully shut down a TCP socket
663  * @xprt: transport
664  *
665  * Initiates a graceful shutdown of the TCP socket by calling the
666  * equivalent of shutdown(SHUT_WR);
667  */
668 static void xs_tcp_shutdown(struct rpc_xprt *xprt)
669 {
670         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
671         struct socket *sock = transport->sock;
672
673         if (sock != NULL)
674                 kernel_sock_shutdown(sock, SHUT_WR);
675 }
676
/**
 * xs_tcp_send_request - write an RPC request to a TCP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 *
 * XXX: In the case of soft timeouts, should we eventually give up
 *	if sendmsg is not able to make progress?
 */
static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	bool zerocopy = true;
	int status;

	/* Write the stream record marker at the head of the buffer */
	xs_encode_stream_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);
	/* Don't use zero copy if this is a resend. If the RPC call
	 * completes while the socket holds a reference to the pages,
	 * then we may end up resending corrupted data.
	 */
	if (task->tk_flags & RPC_TASK_SENT)
		zerocopy = false;

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent,
					zerocopy);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		req->rq_xmit_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		/* Partial send: loop again while we make progress;
		 * a zero-byte send means the socket is full. */
		if (status != 0)
			continue;
		status = -EAGAIN;
		break;
	}

	switch (status) {
	case -ENOTSOCK:
		status = -ENOTCONN;
		/* Should we call xs_close() here? */
		break;
	case -EAGAIN:
		status = xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		/* fall through */
	case -ECONNRESET:
		xs_tcp_shutdown(xprt);
		/* fall through */
	case -ECONNREFUSED:
	case -ENOTCONN:
	case -EPIPE:
		clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
	}

	return status;
}
762
763 /**
764  * xs_tcp_release_xprt - clean up after a tcp transmission
765  * @xprt: transport
766  * @task: rpc task
767  *
768  * This cleans up if an error causes us to abort the transmission of a request.
769  * In this case, the socket may need to be reset in order to avoid confusing
770  * the server.
771  */
772 static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
773 {
774         struct rpc_rqst *req;
775
776         if (task != xprt->snd_task)
777                 return;
778         if (task == NULL)
779                 goto out_release;
780         req = task->tk_rqstp;
781         if (req == NULL)
782                 goto out_release;
783         if (req->rq_bytes_sent == 0)
784                 goto out_release;
785         if (req->rq_bytes_sent == req->rq_snd_buf.len)
786                 goto out_release;
787         set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
788 out_release:
789         xprt_release_xprt(xprt, task);
790 }
791
792 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
793 {
794         transport->old_data_ready = sk->sk_data_ready;
795         transport->old_state_change = sk->sk_state_change;
796         transport->old_write_space = sk->sk_write_space;
797 }
798
799 static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *sk)
800 {
801         sk->sk_data_ready = transport->old_data_ready;
802         sk->sk_state_change = transport->old_state_change;
803         sk->sk_write_space = transport->old_write_space;
804 }
805
/*
 * Detach and release the transport's socket.  The callback pointers
 * are restored under sk_callback_lock so no xs_* socket callback can
 * run against a half-torn-down transport.
 */
static void xs_reset_transport(struct sock_xprt *transport)
{
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	/* Nothing to do if the socket was never set up */
	if (sk == NULL)
		return;

	transport->srcport = 0;

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;

	xs_restore_old_callbacks(transport, sk);
	write_unlock_bh(&sk->sk_callback_lock);

	/* NOTE(review): re-enables checksumming on the sock before it is
	 * released — presumably undoing a UDP setup step not visible here */
	sk->sk_no_check = 0;

	sock_release(sock);
}
829
/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; ie, no DRC state remains
 * on the server we want to save.
 *
 * The caller _must_ be holding XPRT_LOCKED in order to avoid issues with
 * xs_reset_transport() zeroing the socket from underneath a writer.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	xs_reset_transport(transport);
	xprt->reestablish_timeout = 0;

	/* Barriers order these bit clears against surrounding accesses
	 * to the transport state by other CPUs */
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
	xprt_disconnect_done(xprt);
}
856
857 static void xs_tcp_close(struct rpc_xprt *xprt)
858 {
859         if (test_and_clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state))
860                 xs_close(xprt);
861         else
862                 xs_tcp_shutdown(xprt);
863 }
864
/**
 * xs_destroy - prepare to shutdown a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	/* Stop any pending connect attempt before tearing down the socket */
	cancel_delayed_work_sync(&transport->connect_worker);

	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	xprt_free(xprt);
	/* Drop the module reference (paired with a try_module_get() at
	 * transport setup — not visible in this file section) */
	module_put(THIS_MODULE);
}
883
884 static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
885 {
886         return (struct rpc_xprt *) sk->sk_user_data;
887 }
888
889 static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
890 {
891         struct xdr_skb_reader desc = {
892                 .skb            = skb,
893                 .offset         = sizeof(rpc_fraghdr),
894                 .count          = skb->len - sizeof(rpc_fraghdr),
895         };
896
897         if (xdr_partial_copy_from_skb(xdr, 0, &desc, xdr_skb_read_bits) < 0)
898                 return -1;
899         if (desc.count)
900                 return -1;
901         return 0;
902 }
903
904 /**
905  * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
906  * @sk: socket with data to read
907  * @len: how much data to read
908  *
909  * Currently this assumes we can read the whole reply in a single gulp.
910  */
911 static void xs_local_data_ready(struct sock *sk, int len)
912 {
913         struct rpc_task *task;
914         struct rpc_xprt *xprt;
915         struct rpc_rqst *rovr;
916         struct sk_buff *skb;
917         int err, repsize, copied;
918         u32 _xid;
919         __be32 *xp;
920
921         read_lock_bh(&sk->sk_callback_lock);
922         dprintk("RPC:       %s...\n", __func__);
923         xprt = xprt_from_sock(sk);
924         if (xprt == NULL)
925                 goto out;
926
927         skb = skb_recv_datagram(sk, 0, 1, &err);
928         if (skb == NULL)
929                 goto out;
930
931         if (xprt->shutdown)
932                 goto dropit;
933
934         repsize = skb->len - sizeof(rpc_fraghdr);
935         if (repsize < 4) {
936                 dprintk("RPC:       impossible RPC reply size %d\n", repsize);
937                 goto dropit;
938         }
939
940         /* Copy the XID from the skb... */
941         xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
942         if (xp == NULL)
943                 goto dropit;
944
945         /* Look up and lock the request corresponding to the given XID */
946         spin_lock(&xprt->transport_lock);
947         rovr = xprt_lookup_rqst(xprt, *xp);
948         if (!rovr)
949                 goto out_unlock;
950         task = rovr->rq_task;
951
952         copied = rovr->rq_private_buf.buflen;
953         if (copied > repsize)
954                 copied = repsize;
955
956         if (xs_local_copy_to_xdr(&rovr->rq_private_buf, skb)) {
957                 dprintk("RPC:       sk_buff copy failed\n");
958                 goto out_unlock;
959         }
960
961         xprt_complete_rqst(task, copied);
962
963  out_unlock:
964         spin_unlock(&xprt->transport_lock);
965  dropit:
966         skb_free_datagram(sk, skb);
967  out:
968         read_unlock_bh(&sk->sk_callback_lock);
969 }
970
971 /**
972  * xs_udp_data_ready - "data ready" callback for UDP sockets
973  * @sk: socket with data to read
974  * @len: how much data to read
975  *
976  */
977 static void xs_udp_data_ready(struct sock *sk, int len)
978 {
979         struct rpc_task *task;
980         struct rpc_xprt *xprt;
981         struct rpc_rqst *rovr;
982         struct sk_buff *skb;
983         int err, repsize, copied;
984         u32 _xid;
985         __be32 *xp;
986
987         read_lock_bh(&sk->sk_callback_lock);
988         dprintk("RPC:       xs_udp_data_ready...\n");
989         if (!(xprt = xprt_from_sock(sk)))
990                 goto out;
991
992         if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
993                 goto out;
994
995         if (xprt->shutdown)
996                 goto dropit;
997
998         repsize = skb->len - sizeof(struct udphdr);
999         if (repsize < 4) {
1000                 dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
1001                 goto dropit;
1002         }
1003
1004         /* Copy the XID from the skb... */
1005         xp = skb_header_pointer(skb, sizeof(struct udphdr),
1006                                 sizeof(_xid), &_xid);
1007         if (xp == NULL)
1008                 goto dropit;
1009
1010         /* Look up and lock the request corresponding to the given XID */
1011         spin_lock(&xprt->transport_lock);
1012         rovr = xprt_lookup_rqst(xprt, *xp);
1013         if (!rovr)
1014                 goto out_unlock;
1015         task = rovr->rq_task;
1016
1017         if ((copied = rovr->rq_private_buf.buflen) > repsize)
1018                 copied = repsize;
1019
1020         /* Suck it into the iovec, verify checksum if not done by hw. */
1021         if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) {
1022                 UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS);
1023                 goto out_unlock;
1024         }
1025
1026         UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS);
1027
1028         /* Something worked... */
1029         dst_confirm(skb_dst(skb));
1030
1031         xprt_adjust_cwnd(task, copied);
1032         xprt_complete_rqst(task, copied);
1033
1034  out_unlock:
1035         spin_unlock(&xprt->transport_lock);
1036  dropit:
1037         skb_free_datagram(sk, skb);
1038  out:
1039         read_unlock_bh(&sk->sk_callback_lock);
1040 }
1041
1042 /*
1043  * Helper function to force a TCP close if the server is sending
1044  * junk and/or it has put us in CLOSE_WAIT
1045  */
1046 static void xs_tcp_force_close(struct rpc_xprt *xprt)
1047 {
1048         set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
1049         xprt_force_disconnect(xprt);
1050 }
1051
/*
 * Read the 4-byte RPC record marker from the stream, possibly across
 * several invocations (tcp_offset tracks how much of it we have so far).
 */
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	/* Resume filling the marker at whatever offset we left off */
	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;		/* marker still incomplete; wait for more data */

	/* Top bit of the marker flags the last fragment of the record */
	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 8)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xs_tcp_force_close(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}
1084
1085 static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
1086 {
1087         if (transport->tcp_offset == transport->tcp_reclen) {
1088                 transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
1089                 transport->tcp_offset = 0;
1090                 if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
1091                         transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1092                         transport->tcp_flags |= TCP_RCV_COPY_XID;
1093                         transport->tcp_copied = 0;
1094                 }
1095         }
1096 }
1097
/*
 * Read the 4-byte XID from the stream, possibly across several
 * invocations, then advance the state machine to the call-direction field.
 */
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;		/* XID still incomplete; wait for more data */
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_READ_CALLDIR;
	/* The XID itself counts as the first 4 bytes copied */
	transport->tcp_copied = 4;
	dprintk("RPC:       reading %s XID %08x\n",
			(transport->tcp_flags & TCP_RPC_REPLY) ? "reply for"
							      : "request with",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}
1119
/*
 * Read the 4-byte call-direction word (RPC_CALL or RPC_REPLY) that
 * follows the XID, possibly across several invocations.
 */
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
				       struct xdr_skb_reader *desc)
{
	size_t len, used;
	u32 offset;
	char *p;

	/*
	 * We want transport->tcp_offset to be 8 at the end of this routine
	 * (4 bytes for the xid and 4 bytes for the call/reply flag).
	 * When this function is called for the first time,
	 * transport->tcp_offset is 4 (after having already read the xid).
	 */
	offset = transport->tcp_offset - sizeof(transport->tcp_xid);
	len = sizeof(transport->tcp_calldir) - offset;
	dprintk("RPC:       reading CALL/REPLY flag (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_calldir) + offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;		/* flag still incomplete; wait for more data */
	transport->tcp_flags &= ~TCP_RCV_READ_CALLDIR;
	/*
	 * We don't yet have the XDR buffer, so we will write the calldir
	 * out after we get the buffer from the 'struct rpc_rqst'
	 */
	switch (ntohl(transport->tcp_calldir)) {
	case RPC_REPLY:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags |= TCP_RPC_REPLY;
		break;
	case RPC_CALL:
		transport->tcp_flags |= TCP_RCV_COPY_CALLDIR;
		transport->tcp_flags |= TCP_RCV_COPY_DATA;
		transport->tcp_flags &= ~TCP_RPC_REPLY;
		break;
	default:
		/* Garbage on the wire: drop the connection */
		dprintk("RPC:       invalid request message type\n");
		xs_tcp_force_close(&transport->xprt);
	}
	xs_tcp_check_fraghdr(transport);
}
1163
/*
 * Copy record payload from the stream into @req's private receive buffer.
 * Handles partial copies: desc may cover less than the record, and the
 * record may span multiple calls.
 */
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
				     struct xdr_skb_reader *desc,
				     struct rpc_rqst *req)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	rcvbuf = &req->rq_private_buf;

	if (transport->tcp_flags & TCP_RCV_COPY_CALLDIR) {
		/*
		 * Save the RPC direction in the XDR buffer
		 */
		memcpy(rcvbuf->head[0].iov_base + transport->tcp_copied,
			&transport->tcp_calldir,
			sizeof(transport->tcp_calldir));
		transport->tcp_copied += sizeof(transport->tcp_calldir);
		transport->tcp_flags &= ~TCP_RCV_COPY_CALLDIR;
	}

	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		/* desc spans beyond this record: copy with a clamped
		 * local descriptor so we stop at the record boundary */
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x truncated request\n",
				ntohl(transport->tcp_xid));
		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
		return;
	}

	dprintk("RPC:       XID %08x read %Zd bytes\n",
			ntohl(transport->tcp_xid), r);
	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);

	/* Stop copying once the receive buffer is full, or once the
	 * final fragment of the record has been consumed */
	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}
}
1239
1240 /*
1241  * Finds the request corresponding to the RPC xid and invokes the common
1242  * tcp read code to read the data.
1243  */
1244 static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
1245                                     struct xdr_skb_reader *desc)
1246 {
1247         struct sock_xprt *transport =
1248                                 container_of(xprt, struct sock_xprt, xprt);
1249         struct rpc_rqst *req;
1250
1251         dprintk("RPC:       read reply XID %08x\n", ntohl(transport->tcp_xid));
1252
1253         /* Find and lock the request corresponding to this xid */
1254         spin_lock(&xprt->transport_lock);
1255         req = xprt_lookup_rqst(xprt, transport->tcp_xid);
1256         if (!req) {
1257                 dprintk("RPC:       XID %08x request not found!\n",
1258                                 ntohl(transport->tcp_xid));
1259                 spin_unlock(&xprt->transport_lock);
1260                 return -1;
1261         }
1262
1263         xs_tcp_read_common(xprt, desc, req);
1264
1265         if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
1266                 xprt_complete_rqst(req->rq_task, transport->tcp_copied);
1267
1268         spin_unlock(&xprt->transport_lock);
1269         return 0;
1270 }
1271
1272 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
1273 /*
1274  * Obtains an rpc_rqst previously allocated and invokes the common
1275  * tcp read code to read the data.  The result is placed in the callback
1276  * queue.
1277  * If we're unable to obtain the rpc_rqst we schedule the closing of the
1278  * connection and return -1.
1279  */
static inline int xs_tcp_read_callback(struct rpc_xprt *xprt,
				       struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;

	/* Grab a preallocated backchannel request slot */
	req = xprt_alloc_bc_request(xprt);
	if (req == NULL) {
		printk(KERN_WARNING "Callback slot table overflowed\n");
		xprt_force_disconnect(xprt);
		return -1;
	}

	req->rq_xid = transport->tcp_xid;
	dprintk("RPC:       read callback  XID %08x\n", ntohl(req->rq_xid));
	xs_tcp_read_common(xprt, desc, req);

	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) {
		struct svc_serv *bc_serv = xprt->bc_serv;

		/*
		 * Add the callback request to the callback list.  The
		 * callback service sleeps on the sv_cb_waitq waiting for
		 * new requests; wake it up after enqueuing the request.
		 */
		dprintk("RPC:       add callback request to list\n");
		spin_lock(&bc_serv->sv_cb_lock);
		list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
		spin_unlock(&bc_serv->sv_cb_lock);
		wake_up(&bc_serv->sv_cb_waitq);
	}

	req->rq_private_buf.len = transport->tcp_copied;

	return 0;
}
1318
1319 static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
1320                                         struct xdr_skb_reader *desc)
1321 {
1322         struct sock_xprt *transport =
1323                                 container_of(xprt, struct sock_xprt, xprt);
1324
1325         return (transport->tcp_flags & TCP_RPC_REPLY) ?
1326                 xs_tcp_read_reply(xprt, desc) :
1327                 xs_tcp_read_callback(xprt, desc);
1328 }
1329 #else
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
					struct xdr_skb_reader *desc)
{
	/* No backchannel support compiled in: treat every record as a reply */
	return xs_tcp_read_reply(xprt, desc);
}
1335 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
1336
1337 /*
1338  * Read data off the transport.  This can be either an RPC_CALL or an
1339  * RPC_REPLY.  Relay the processing to helper functions.
1340  */
1341 static void xs_tcp_read_data(struct rpc_xprt *xprt,
1342                                     struct xdr_skb_reader *desc)
1343 {
1344         struct sock_xprt *transport =
1345                                 container_of(xprt, struct sock_xprt, xprt);
1346
1347         if (_xs_tcp_read_data(xprt, desc) == 0)
1348                 xs_tcp_check_fraghdr(transport);
1349         else {
1350                 /*
1351                  * The transport_lock protects the request handling.
1352                  * There's no need to hold it to update the tcp_flags.
1353                  */
1354                 transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
1355         }
1356 }
1357
1358 static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
1359 {
1360         size_t len;
1361
1362         len = transport->tcp_reclen - transport->tcp_offset;
1363         if (len > desc->count)
1364                 len = desc->count;
1365         desc->count -= len;
1366         desc->offset += len;
1367         transport->tcp_offset += len;
1368         dprintk("RPC:       discarded %Zu bytes\n", len);
1369         xs_tcp_check_fraghdr(transport);
1370 }
1371
/*
 * tcp_read_sock() actor: drive the record-parsing state machine over the
 * bytes in @skb.  The TCP_RCV_* flags determine which field is being
 * assembled next; returns the number of bytes consumed.
 */
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_skb_reader desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
	};

	dprintk("RPC:       xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the call/reply flag */
		if (transport->tcp_flags & TCP_RCV_READ_CALLDIR) {
			xs_tcp_read_calldir(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_data(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC:       xs_tcp_data_recv done\n");
	return len - desc.count;
}
1411
1412 /**
1413  * xs_tcp_data_ready - "data ready" callback for TCP sockets
1414  * @sk: socket with data to read
1415  * @bytes: how much data to read
1416  *
1417  */
1418 static void xs_tcp_data_ready(struct sock *sk, int bytes)
1419 {
1420         struct rpc_xprt *xprt;
1421         read_descriptor_t rd_desc;
1422         int read;
1423
1424         dprintk("RPC:       xs_tcp_data_ready...\n");
1425
1426         read_lock_bh(&sk->sk_callback_lock);
1427         if (!(xprt = xprt_from_sock(sk)))
1428                 goto out;
1429         if (xprt->shutdown)
1430                 goto out;
1431
1432         /* Any data means we had a useful conversation, so
1433          * the we don't need to delay the next reconnect
1434          */
1435         if (xprt->reestablish_timeout)
1436                 xprt->reestablish_timeout = 0;
1437
1438         /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
1439         rd_desc.arg.data = xprt;
1440         do {
1441                 rd_desc.count = 65536;
1442                 read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
1443         } while (read > 0);
1444 out:
1445         read_unlock_bh(&sk->sk_callback_lock);
1446 }
1447
1448 /*
1449  * Do the equivalent of linger/linger2 handling for dealing with
1450  * broken servers that don't close the socket in a timely
1451  * fashion
1452  */
1453 static void xs_tcp_schedule_linger_timeout(struct rpc_xprt *xprt,
1454                 unsigned long timeout)
1455 {
1456         struct sock_xprt *transport;
1457
1458         if (xprt_test_and_set_connecting(xprt))
1459                 return;
1460         set_bit(XPRT_CONNECTION_ABORT, &xprt->state);
1461         transport = container_of(xprt, struct sock_xprt, xprt);
1462         queue_delayed_work(rpciod_workqueue, &transport->connect_worker,
1463                            timeout);
1464 }
1465
/*
 * Cancel a pending linger timeout, if any, and drop the connecting state
 * it was holding.  A no-op if no abort was scheduled or the work already
 * started running.
 */
static void xs_tcp_cancel_linger_timeout(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport;

	transport = container_of(xprt, struct sock_xprt, xprt);

	if (!test_bit(XPRT_CONNECTION_ABORT, &xprt->state) ||
	    !cancel_delayed_work(&transport->connect_worker))
		return;
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	xprt_clear_connecting(xprt);
}
1478
/*
 * Clear all connection-teardown state bits.  The barriers pair with the
 * corresponding set_bit()/test_bit() users so other CPUs observe a
 * consistent transport state.
 */
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
{
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	clear_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	clear_bit(XPRT_CLOSING, &xprt->state);
	smp_mb__after_clear_bit();
}
1488
static void xs_sock_mark_closed(struct rpc_xprt *xprt)
{
	xs_sock_reset_connection_flags(xprt);
	/* Mark transport as closed and wake up all pending tasks */
	xprt_disconnect_done(xprt);
}
1495
1496 /**
1497  * xs_tcp_state_change - callback to handle TCP socket state changes
1498  * @sk: socket whose state has changed
1499  *
1500  */
1501 static void xs_tcp_state_change(struct sock *sk)
1502 {
1503         struct rpc_xprt *xprt;
1504
1505         read_lock_bh(&sk->sk_callback_lock);
1506         if (!(xprt = xprt_from_sock(sk)))
1507                 goto out;
1508         dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
1509         dprintk("RPC:       state %x conn %d dead %d zapped %d sk_shutdown %d\n",
1510                         sk->sk_state, xprt_connected(xprt),
1511                         sock_flag(sk, SOCK_DEAD),
1512                         sock_flag(sk, SOCK_ZAPPED),
1513                         sk->sk_shutdown);
1514
1515         switch (sk->sk_state) {
1516         case TCP_ESTABLISHED:
1517                 spin_lock(&xprt->transport_lock);
1518                 if (!xprt_test_and_set_connected(xprt)) {
1519                         struct sock_xprt *transport = container_of(xprt,
1520                                         struct sock_xprt, xprt);
1521
1522                         /* Reset TCP record info */
1523                         transport->tcp_offset = 0;
1524                         transport->tcp_reclen = 0;
1525                         transport->tcp_copied = 0;
1526                         transport->tcp_flags =
1527                                 TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
1528
1529                         xprt_wake_pending_tasks(xprt, -EAGAIN);
1530                 }
1531                 spin_unlock(&xprt->transport_lock);
1532                 break;
1533         case TCP_FIN_WAIT1:
1534                 /* The client initiated a shutdown of the socket */
1535                 xprt->connect_cookie++;
1536                 xprt->reestablish_timeout = 0;
1537                 set_bit(XPRT_CLOSING, &xprt->state);
1538                 smp_mb__before_clear_bit();
1539                 clear_bit(XPRT_CONNECTED, &xprt->state);
1540                 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
1541                 smp_mb__after_clear_bit();
1542                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1543                 break;
1544         case TCP_CLOSE_WAIT:
1545                 /* The server initiated a shutdown of the socket */
1546                 xprt->connect_cookie++;
1547                 clear_bit(XPRT_CONNECTED, &xprt->state);
1548                 xs_tcp_force_close(xprt);
1549         case TCP_CLOSING:
1550                 /*
1551                  * If the server closed down the connection, make sure that
1552                  * we back off before reconnecting
1553                  */
1554                 if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
1555                         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
1556                 break;
1557         case TCP_LAST_ACK:
1558                 set_bit(XPRT_CLOSING, &xprt->state);
1559                 xs_tcp_schedule_linger_timeout(xprt, xs_tcp_fin_timeout);
1560                 smp_mb__before_clear_bit();
1561                 clear_bit(XPRT_CONNECTED, &xprt->state);
1562                 smp_mb__after_clear_bit();
1563                 break;
1564         case TCP_CLOSE:
1565                 xs_tcp_cancel_linger_timeout(xprt);
1566                 xs_sock_mark_closed(xprt);
1567         }
1568  out:
1569         read_unlock_bh(&sk->sk_callback_lock);
1570 }
1571
1572 static void xs_write_space(struct sock *sk)
1573 {
1574         struct socket *sock;
1575         struct rpc_xprt *xprt;
1576
1577         if (unlikely(!(sock = sk->sk_socket)))
1578                 return;
1579         clear_bit(SOCK_NOSPACE, &sock->flags);
1580
1581         if (unlikely(!(xprt = xprt_from_sock(sk))))
1582                 return;
1583         if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
1584                 return;
1585
1586         xprt_write_space(xprt);
1587 }
1588
1589 /**
1590  * xs_udp_write_space - callback invoked when socket buffer space
1591  *                             becomes available
1592  * @sk: socket whose state has changed
1593  *
1594  * Called when more output buffer space is available for this socket.
1595  * We try not to wake our writers until they can make "significant"
1596  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1597  * with a bunch of small requests.
1598  */
1599 static void xs_udp_write_space(struct sock *sk)
1600 {
1601         read_lock_bh(&sk->sk_callback_lock);
1602
1603         /* from net/core/sock.c:sock_def_write_space */
1604         if (sock_writeable(sk))
1605                 xs_write_space(sk);
1606
1607         read_unlock_bh(&sk->sk_callback_lock);
1608 }
1609
1610 /**
1611  * xs_tcp_write_space - callback invoked when socket buffer space
1612  *                             becomes available
1613  * @sk: socket whose state has changed
1614  *
1615  * Called when more output buffer space is available for this socket.
1616  * We try not to wake our writers until they can make "significant"
1617  * progress, otherwise we'll waste resources thrashing kernel_sendmsg
1618  * with a bunch of small requests.
1619  */
1620 static void xs_tcp_write_space(struct sock *sk)
1621 {
1622         read_lock_bh(&sk->sk_callback_lock);
1623
1624         /* from net/core/stream.c:sk_stream_write_space */
1625         if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
1626                 xs_write_space(sk);
1627
1628         read_unlock_bh(&sk->sk_callback_lock);
1629 }
1630
/*
 * Apply the transport's cached snd/rcv sizes to the underlying socket.
 * The SOCK_*BUF_LOCK flags stop the network stack from auto-tuning the
 * buffers afterwards.
 */
static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		/* Wake up anyone waiting on send-buffer space */
		sk->sk_write_space(sk);
	}
}
1646
1647 /**
1648  * xs_udp_set_buffer_size - set send and receive limits
1649  * @xprt: generic transport
1650  * @sndsize: requested size of send buffer, in bytes
1651  * @rcvsize: requested size of receive buffer, in bytes
1652  *
1653  * Set socket send and receive buffer size limits.
1654  */
1655 static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
1656 {
1657         struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
1658
1659         transport->sndsize = 0;
1660         if (sndsize)
1661                 transport->sndsize = sndsize + 1024;
1662         transport->rcvsize = 0;
1663         if (rcvsize)
1664                 transport->rcvsize = rcvsize + 1024;
1665
1666         xs_udp_do_set_buffer_size(xprt);
1667 }
1668
1669 /**
1670  * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
1671  * @task: task that timed out
1672  *
1673  * Adjust the congestion window after a retransmit timeout has occurred.
1674  */
1675 static void xs_udp_timer(struct rpc_task *task)
1676 {
1677         xprt_adjust_cwnd(task, -ETIMEDOUT);
1678 }
1679
1680 static unsigned short xs_get_random_port(void)
1681 {
1682         unsigned short range = xprt_max_resvport - xprt_min_resvport;
1683         unsigned short rand = (unsigned short) net_random() % range;
1684         return rand + xprt_min_resvport;
1685 }
1686
/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	/* Rewrite the port in the cached sockaddr, then refresh the
	 * printable address strings so they stay in sync */
	rpc_set_port(xs_addr(xprt), port);
	xs_update_peer_port(xprt);
}
1700
1701 static unsigned short xs_get_srcport(struct sock_xprt *transport)
1702 {
1703         unsigned short port = transport->srcport;
1704
1705         if (port == 0 && transport->xprt.resvport)
1706                 port = xs_get_random_port();
1707         return port;
1708 }
1709
1710 static unsigned short xs_next_srcport(struct sock_xprt *transport, unsigned short port)
1711 {
1712         if (transport->srcport != 0)
1713                 transport->srcport = 0;
1714         if (!transport->xprt.resvport)
1715                 return 0;
1716         if (port <= xprt_min_resvport || port > xprt_max_resvport)
1717                 return xprt_max_resvport;
1718         return --port;
1719 }
/*
 * Bind @sock to the transport's source address, hunting for a usable
 * source port.  Starts from xs_get_srcport() and walks the reserved
 * range downward on -EADDRINUSE; the search gives up after the range
 * has wrapped twice.  A successful bind caches the port in
 * transport->srcport so later reconnects can reuse it.
 */
static int xs_bind(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_storage myaddr;
	int err, nloop = 0;
	unsigned short port = xs_get_srcport(transport);
	unsigned short last;

	memcpy(&myaddr, &transport->srcaddr, transport->xprt.addrlen);
	do {
		rpc_set_port((struct sockaddr *)&myaddr, port);
		err = kernel_bind(sock, (struct sockaddr *)&myaddr,
				transport->xprt.addrlen);
		/* Port 0 means "any port": accept the kernel's choice */
		if (port == 0)
			break;
		if (err == 0) {
			transport->srcport = port;
			break;
		}
		last = port;
		port = xs_next_srcport(transport, port);
		/* The candidate only increases when the search wraps */
		if (port > last)
			nloop++;
	} while (err == -EADDRINUSE && nloop != 2);

	if (myaddr.ss_family == AF_INET)
		dprintk("RPC:       %s %pI4:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in *)&myaddr)->sin_addr,
				port, err ? "failed" : "ok", err);
	else
		dprintk("RPC:       %s %pI6:%u: %s (%d)\n", __func__,
				&((struct sockaddr_in6 *)&myaddr)->sin6_addr,
				port, err ? "failed" : "ok", err);
	return err;
}
1754
/*
 * We don't support autobind on AF_LOCAL sockets
 */
static void xs_local_rpcbind(struct rpc_task *task)
{
	/* The endpoint is a fixed filesystem path, so there is nothing
	 * to look up: just mark the transport as bound. */
	xprt_set_bound(task->tk_xprt);
}
1762
/* AF_LOCAL addresses carry no port number: nothing to do */
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
{
}
1766
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/*
 * Lockdep groups socket locks by address family, which lumps RPC
 * client sockets together with ordinary user sockets and can produce
 * false positive lock-order warnings.  Give our sockets dedicated
 * lock classes: index 0 is used for AF_INET, index 1 for AF_INET6
 * and AF_LOCAL.
 */
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

static inline void xs_reclassify_socketu(struct socket *sock)
{
	struct sock *sk = sock->sk;

	/* The socket is brand new: nobody may hold its lock yet */
	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_LOCAL-RPC",
		&xs_slock_key[1], "sk_lock-AF_LOCAL-RPC", &xs_key[1]);
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET-RPC",
		&xs_slock_key[0], "sk_lock-AF_INET-RPC", &xs_key[0]);
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
	struct sock *sk = sock->sk;

	BUG_ON(sock_owned_by_user(sk));
	sock_lock_init_class_and_name(sk, "slock-AF_INET6-RPC",
		&xs_slock_key[1], "sk_lock-AF_INET6-RPC", &xs_key[1]);
}

/* Dispatch to the per-family reclassifier above */
static inline void xs_reclassify_socket(int family, struct socket *sock)
{
	switch (family) {
	case AF_LOCAL:
		xs_reclassify_socketu(sock);
		break;
	case AF_INET:
		xs_reclassify_socket4(sock);
		break;
	case AF_INET6:
		xs_reclassify_socket6(sock);
		break;
	}
}
#else
/* Lockdep is disabled: reclassification is a no-op */
static inline void xs_reclassify_socketu(struct socket *sock)
{
}

static inline void xs_reclassify_socket4(struct socket *sock)
{
}

static inline void xs_reclassify_socket6(struct socket *sock)
{
}

static inline void xs_reclassify_socket(int family, struct socket *sock)
{
}
#endif
1829
/*
 * Create a socket of the given family/type/protocol in the transport's
 * network namespace and bind it to the transport's source address.
 * Returns the new socket, or an ERR_PTR on failure (a half-created
 * socket is released before returning).
 */
static struct socket *xs_create_sock(struct rpc_xprt *xprt,
		struct sock_xprt *transport, int family, int type, int protocol)
{
	struct socket *sock;
	int err;

	err = __sock_create(xprt->xprt_net, family, type, protocol, &sock, 1);
	if (err < 0) {
		dprintk("RPC:       can't create %d transport socket (%d).\n",
				protocol, -err);
		goto out;
	}
	/* Keep lockdep happy about RPC-owned socket locks */
	xs_reclassify_socket(family, sock);

	err = xs_bind(transport, sock);
	if (err) {
		sock_release(sock);
		goto out;
	}

	return sock;
out:
	return ERR_PTR(err);
}
1854
/*
 * Attach @sock to the transport (installing our AF_LOCAL socket
 * callbacks on first use) and initiate the connect.  Returns the
 * result of kernel_connect().
 */
static int xs_local_finish_connecting(struct rpc_xprt *xprt,
				      struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
									xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		/* Remember the socket's original callbacks so they can
		 * be restored when the transport is torn down */
		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_local_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		/* Callbacks run in softirq context: no sleeping allocations */
		sk->sk_allocation = GFP_ATOMIC;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, 0);
}
1887
/**
 * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
 * @work: queued work item embedded in the transport's connect_worker
 *
 * Invoked by a work queue tasklet.
 */
static void xs_local_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* Avoid deadlocking against memory reclaim writing to NFS */
	current->flags |= PF_FSTRANS;

	clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
	status = __sock_create(xprt->xprt_net, AF_LOCAL,
					SOCK_STREAM, 0, &sock, 1);
	if (status < 0) {
		dprintk("RPC:       can't create AF_LOCAL "
			"transport socket (%d).\n", -status);
		goto out;
	}
	xs_reclassify_socketu(sock);

	dprintk("RPC:       worker connecting xprt %p via AF_LOCAL to %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);

	status = xs_local_finish_connecting(xprt, sock);
	switch (status) {
	case 0:
		dprintk("RPC:       xprt %p connected to %s\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		xprt_set_connected(xprt);
		break;
	case -ENOENT:
		/* The named socket path is missing (server not up yet) */
		dprintk("RPC:       xprt %p: socket %s does not exist\n",
				xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
		break;
	default:
		printk(KERN_ERR "%s: unhandled error (%d) connecting to %s\n",
				__func__, -status,
				xprt->address_strings[RPC_DISPLAY_ADDR]);
	}

out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
1944
/*
 * Attach @sock to the transport (installing our UDP socket callbacks
 * on first use) and apply the configured socket buffer sizes.
 */
static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		/* Remember the socket's original callbacks so they can
		 * be restored when the transport is torn down */
		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		/* We don't require UDP checksums on receive */
		sk->sk_no_check = UDP_CSUM_NORCV;
		/* Callbacks run in softirq context: no sleeping allocations */
		sk->sk_allocation = GFP_ATOMIC;

		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
}
1972
1973 static void xs_udp_setup_socket(struct work_struct *work)
1974 {
1975         struct sock_xprt *transport =
1976                 container_of(work, struct sock_xprt, connect_worker.work);
1977         struct rpc_xprt *xprt = &transport->xprt;
1978         struct socket *sock = transport->sock;
1979         int status = -EIO;
1980
1981         if (xprt->shutdown)
1982                 goto out;
1983
1984         current->flags |= PF_FSTRANS;
1985
1986         /* Start by resetting any existing state */
1987         xs_reset_transport(transport);
1988         sock = xs_create_sock(xprt, transport,
1989                         xs_addr(xprt)->sa_family, SOCK_DGRAM, IPPROTO_UDP);
1990         if (IS_ERR(sock))
1991                 goto out;
1992
1993         dprintk("RPC:       worker connecting xprt %p via %s to "
1994                                 "%s (port %s)\n", xprt,
1995                         xprt->address_strings[RPC_DISPLAY_PROTO],
1996                         xprt->address_strings[RPC_DISPLAY_ADDR],
1997                         xprt->address_strings[RPC_DISPLAY_PORT]);
1998
1999         xs_udp_finish_connecting(xprt, sock);
2000         status = 0;
2001 out:
2002         xprt_clear_connecting(xprt);
2003         xprt_wake_pending_tasks(xprt, status);
2004         current->flags &= ~PF_FSTRANS;
2005 }
2006
/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_abort_connection(struct sock_xprt *transport)
{
	int result;
	struct sockaddr any;

	dprintk("RPC:       disconnecting xprt %p to reuse port\n", transport);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	/* Only clear the connection state bits if the disconnect took;
	 * otherwise leave them so the error path handles teardown */
	if (!result)
		xs_sock_reset_connection_flags(&transport->xprt);
	dprintk("RPC:       AF_UNSPEC connect return code %d\n", result);
}
2029
/*
 * "Close" the TCP socket while preserving its bound source port so the
 * server's duplicate request cache still recognizes us on reconnect.
 * If the socket has not undergone a shutdown there is nothing to abort.
 */
static void xs_tcp_reuse_connection(struct sock_xprt *transport)
{
	unsigned int state = transport->inet->sk_state;

	if (state == TCP_CLOSE && transport->sock->state == SS_UNCONNECTED) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC:       %s: TCP_CLOSEd and sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	if ((1 << state) & (TCPF_ESTABLISHED|TCPF_SYN_SENT)) {
		/* we don't need to abort the connection if the socket
		 * hasn't undergone a shutdown
		 */
		if (transport->inet->sk_shutdown == 0)
			return;
		dprintk("RPC:       %s: ESTABLISHED/SYN_SENT "
				"sk_shutdown set to %d\n",
				__func__, transport->inet->sk_shutdown);
	}
	/* Force a disconnect via an AF_UNSPEC connect, keeping the port */
	xs_abort_connection(transport);
}
2055
/*
 * Attach @sock to the transport (installing our TCP socket callbacks
 * and socket options on first use) and kick off a non-blocking
 * connect.  Returns 0 or -EINPROGRESS once the connect is under way,
 * or a negative errno on failure.
 */
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	int ret = -ENOTCONN;

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		/* Remember the socket's original callbacks so they can
		 * be restored when the transport is torn down */
		xs_save_old_callbacks(transport, sk);

		sk->sk_user_data = xprt;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		/* Callbacks run in softirq context: no sleeping allocations */
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		/* Keep the bound source port locked across reconnects,
		 * don't linger on close, and disable Nagle so small
		 * RPC requests go out immediately */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	if (!xprt_bound(xprt))
		goto out;

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	ret = kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);
	switch (ret) {
	case 0:
	case -EINPROGRESS:
		/* SYN_SENT! */
		xprt->connect_cookie++;
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	}
out:
	return ret;
}
2107
/**
 * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
 * @work: queued work item embedded in the transport's connect_worker
 *
 * Invoked by a work queue tasklet.
 */
static void xs_tcp_setup_socket(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct socket *sock = transport->sock;
	struct rpc_xprt *xprt = &transport->xprt;
	int status = -EIO;

	if (xprt->shutdown)
		goto out;

	/* Avoid deadlocking against memory reclaim writing to NFS */
	current->flags |= PF_FSTRANS;

	if (!sock) {
		clear_bit(XPRT_CONNECTION_ABORT, &xprt->state);
		sock = xs_create_sock(xprt, transport,
				xs_addr(xprt)->sa_family, SOCK_STREAM, IPPROTO_TCP);
		if (IS_ERR(sock)) {
			status = PTR_ERR(sock);
			goto out;
		}
	} else {
		int abort_and_exit;

		abort_and_exit = test_and_clear_bit(XPRT_CONNECTION_ABORT,
				&xprt->state);
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(transport);

		if (abort_and_exit)
			goto out_eagain;
	}

	dprintk("RPC:       worker connecting xprt %p via %s to "
				"%s (port %s)\n", xprt,
			xprt->address_strings[RPC_DISPLAY_PROTO],
			xprt->address_strings[RPC_DISPLAY_ADDR],
			xprt->address_strings[RPC_DISPLAY_PORT]);

	status = xs_tcp_finish_connecting(xprt, sock);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	switch (status) {
	default:
		printk("%s: connect returned unhandled error %d\n",
			__func__, status);
		/* fall through: treat like -EADDRNOTAVAIL */
	case -EADDRNOTAVAIL:
		/* We're probably in TIME_WAIT. Get rid of existing socket,
		 * and retry
		 */
		xs_tcp_force_close(xprt);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENETUNREACH:
		/* retry with existing socket, after a delay */
		/* fall through */
	case 0:
	case -EINPROGRESS:
	case -EALREADY:
		xprt_clear_connecting(xprt);
		current->flags &= ~PF_FSTRANS;
		return;
	case -EINVAL:
		/* Happens, for instance, if the user specified a link
		 * local IPv6 address without a scope-id.
		 */
		goto out;
	}
out_eagain:
	status = -EAGAIN;
out:
	xprt_clear_connecting(xprt);
	xprt_wake_pending_tasks(xprt, status);
	current->flags &= ~PF_FSTRANS;
}
2192
/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
 */
static void xs_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
		/* Reconnecting: back off before trying again */
		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
				"seconds\n",
				xprt, xprt->reestablish_timeout / HZ);
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker,
				   xprt->reestablish_timeout);
		/* Binary exponential backoff, clamped to
		 * [XS_TCP_INIT_REEST_TO, XS_TCP_MAX_REEST_TO] */
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
	} else {
		/* First connect (or soft connect): no delay */
		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
		queue_delayed_work(rpciod_workqueue,
				   &transport->connect_worker, 0);
	}
}
2229
/**
 * xs_local_print_stats - display AF_LOCAL socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_local_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	long idle_time = 0;

	/* Idle time (in seconds) is only meaningful while connected */
	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\tlocal %lu %lu %lu %ld %lu %lu %lu "
			"%llu %llu\n",
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
2255
/**
 * xs_udp_print_stats - display UDP socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	/* UDP has no connection, so no connect/idle fields here */
	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
2275
/**
 * xs_tcp_print_stats - display TCP socket-specifc stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	long idle_time = 0;

	/* Idle time (in seconds) is only meaningful while connected */
	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
			transport->srcport,
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
2302
2303 /*
2304  * Allocate a bunch of pages for a scratch buffer for the rpc code. The reason
2305  * we allocate pages instead doing a kmalloc like rpc_malloc is because we want
2306  * to use the server side send routines.
2307  */
2308 static void *bc_malloc(struct rpc_task *task, size_t size)
2309 {
2310         struct page *page;
2311         struct rpc_buffer *buf;
2312
2313         BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
2314         page = alloc_page(GFP_KERNEL);
2315
2316         if (!page)
2317                 return NULL;
2318
2319         buf = page_address(page);
2320         buf->len = PAGE_SIZE;
2321
2322         return buf->data;
2323 }
2324
2325 /*
2326  * Free the space allocated in the bc_alloc routine
2327  */
2328 static void bc_free(void *buffer)
2329 {
2330         struct rpc_buffer *buf;
2331
2332         if (!buffer)
2333                 return;
2334
2335         buf = container_of(buffer, struct rpc_buffer, data);
2336         free_page((unsigned long)buf);
2337 }
2338
/*
 * Use the svc_sock to send the callback. Must be called with svsk->sk_mutex
 * held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
 */
static int bc_sendto(struct rpc_rqst *req)
{
	int len;
	struct xdr_buf *xbufp = &req->rq_snd_buf;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport =
				container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	unsigned long headoff;
	unsigned long tailoff;

	/* Prepend the RPC-over-TCP record marker to the send buffer */
	xs_encode_stream_record_marker(xbufp);

	/* In-page offsets of the head and tail iovecs, needed by the
	 * page-based server-side send path */
	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
	len = svc_send_common(sock, xbufp,
			      virt_to_page(xbufp->head[0].iov_base), headoff,
			      xbufp->tail[0].iov_base, tailoff);

	/* Partial sends are not retried here: report -EAGAIN so the
	 * RPC layer can resend the whole request */
	if (len != xbufp->len) {
		printk(KERN_NOTICE "Error sending entire callback!\n");
		len = -EAGAIN;
	}

	return len;
}
2369
2370 /*
2371  * The send routine. Borrows from svc_send
2372  */
2373 static int bc_send_request(struct rpc_task *task)
2374 {
2375         struct rpc_rqst *req = task->tk_rqstp;
2376         struct svc_xprt *xprt;
2377         struct svc_sock         *svsk;
2378         u32                     len;
2379
2380         dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
2381         /*
2382          * Get the server socket associated with this callback xprt
2383          */
2384         xprt = req->rq_xprt->bc_xprt;
2385         svsk = container_of(xprt, struct svc_sock, sk_xprt);
2386
2387         /*
2388          * Grab the mutex to serialize data as the connection is shared
2389          * with the fore channel
2390          */
2391         if (!mutex_trylock(&xprt->xpt_mutex)) {
2392                 rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
2393                 if (!mutex_trylock(&xprt->xpt_mutex))
2394                         return -EAGAIN;
2395                 rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
2396         }
2397         if (test_bit(XPT_DEAD, &xprt->xpt_flags))
2398                 len = -ENOTCONN;
2399         else
2400                 len = bc_sendto(req);
2401         mutex_unlock(&xprt->xpt_mutex);
2402
2403         if (len > 0)
2404                 len = 0;
2405
2406         return len;
2407 }
2408
/*
 * The close routine. Since this is client initiated, we do nothing
 */

static void bc_close(struct rpc_xprt *xprt)
{
	/* The fore channel owns the connection; nothing to tear down */
}
2416
/*
 * The xprt destroy routine. Again, because this connection is client
 * initiated, we do nothing
 */

static void bc_destroy(struct rpc_xprt *xprt)
{
	/* The fore channel owns the connection; nothing to free here */
}
2425
/*
 * Transport methods for AF_LOCAL (named UNIX socket) transports.
 * No rpcbind or port handling: the endpoint is a filesystem path.
 */
static struct rpc_xprt_ops xs_local_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.rpcbind		= xs_local_rpcbind,
	.set_port		= xs_local_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_local_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_local_print_stats,
};
2441
/*
 * Transport methods for UDP transports.  UDP uses the congestion-aware
 * reserve/release variants, RTT-based retransmit timeouts, and the
 * xs_udp_timer congestion-window callback.
 */
static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.alloc_slot		= xprt_alloc_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};
2460
/*
 * Transport methods for TCP transports.  TCP relies on the socket
 * layer for congestion control, so no timer/release_request hooks.
 */
static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.alloc_slot		= xprt_lock_and_alloc_slot,
	.rpcbind		= rpcb_getport_async,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_tcp_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};
2476
/*
 * The rpc_xprt_ops for the server backchannel
 */

static struct rpc_xprt_ops bc_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xprt_release_xprt,
	.alloc_slot		= xprt_alloc_slot,
	.buf_alloc		= bc_malloc,
	.buf_free		= bc_free,
	.send_request		= bc_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= bc_close,
	.destroy		= bc_destroy,
	.print_stats		= xs_tcp_print_stats,
};
2493
2494 static int xs_init_anyaddr(const int family, struct sockaddr *sap)
2495 {
2496         static const struct sockaddr_in sin = {
2497                 .sin_family             = AF_INET,
2498                 .sin_addr.s_addr        = htonl(INADDR_ANY),
2499         };
2500         static const struct sockaddr_in6 sin6 = {
2501                 .sin6_family            = AF_INET6,
2502                 .sin6_addr              = IN6ADDR_ANY_INIT,
2503         };
2504
2505         switch (family) {
2506         case AF_LOCAL:
2507                 break;
2508         case AF_INET:
2509                 memcpy(sap, &sin, sizeof(sin));
2510                 break;
2511         case AF_INET6:
2512                 memcpy(sap, &sin6, sizeof(sin6));
2513                 break;
2514         default:
2515                 dprintk("RPC:       %s: Bad address family\n", __func__);
2516                 return -EAFNOSUPPORT;
2517         }
2518         return 0;
2519 }
2520
2521 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
2522                                       unsigned int slot_table_size,
2523                                       unsigned int max_slot_table_size)
2524 {
2525         struct rpc_xprt *xprt;
2526         struct sock_xprt *new;
2527
2528         if (args->addrlen > sizeof(xprt->addr)) {
2529                 dprintk("RPC:       xs_setup_xprt: address too large\n");
2530                 return ERR_PTR(-EBADF);
2531         }
2532
2533         xprt = xprt_alloc(args->net, sizeof(*new), slot_table_size,
2534                         max_slot_table_size);
2535         if (xprt == NULL) {
2536                 dprintk("RPC:       xs_setup_xprt: couldn't allocate "
2537                                 "rpc_xprt\n");
2538                 return ERR_PTR(-ENOMEM);
2539         }
2540
2541         new = container_of(xprt, struct sock_xprt, xprt);
2542         memcpy(&xprt->addr, args->dstaddr, args->addrlen);
2543         xprt->addrlen = args->addrlen;
2544         if (args->srcaddr)
2545                 memcpy(&new->srcaddr, args->srcaddr, args->addrlen);
2546         else {
2547                 int err;
2548                 err = xs_init_anyaddr(args->dstaddr->sa_family,
2549                                         (struct sockaddr *)&new->srcaddr);
2550                 if (err != 0) {
2551                         xprt_free(xprt);
2552                         return ERR_PTR(err);
2553                 }
2554         }
2555
2556         return xprt;
2557 }
2558
/* Default AF_LOCAL timeout: 10s initial and max (no increment), 2 retries. */
static const struct rpc_timeout xs_local_default_timeout = {
        .to_initval = 10 * HZ,
        .to_maxval = 10 * HZ,
        .to_retries = 2,
};
2564
/**
 * xs_setup_local - Set up transport to use an AF_LOCAL socket
 * @args: rpc transport creation arguments
 *
 * AF_LOCAL is a "tpi_cots_ord" transport, just like TCP
 */
static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
{
        struct sockaddr_un *sun = (struct sockaddr_un *)args->dstaddr;
        struct sock_xprt *transport;
        struct rpc_xprt *xprt;
        struct rpc_xprt *ret;

        /* AF_LOCAL reuses the TCP slot-table sizing tunables. */
        xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
                        xprt_max_tcp_slot_table_entries);
        if (IS_ERR(xprt))
                return xprt;
        transport = container_of(xprt, struct sock_xprt, xprt);

        xprt->prot = 0;
        /* Stream transport: RPC record-marking fragment header in use. */
        xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
        xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;

        xprt->bind_timeout = XS_BIND_TO;
        xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
        xprt->idle_timeout = XS_IDLE_DISC_TO;

        xprt->ops = &xs_local_ops;
        xprt->timeout = &xs_local_default_timeout;

        switch (sun->sun_family) {
        case AF_LOCAL:
                /* Only absolute socket paths are accepted. */
                if (sun->sun_path[0] != '/') {
                        dprintk("RPC:       bad AF_LOCAL address: %s\n",
                                        sun->sun_path);
                        ret = ERR_PTR(-EINVAL);
                        goto out_err;
                }
                /* AF_LOCAL transports are always considered bound. */
                xprt_set_bound(xprt);
                INIT_DELAYED_WORK(&transport->connect_worker,
                                        xs_local_setup_socket);
                xs_format_peer_addresses(xprt, "local", RPCBIND_NETID_LOCAL);
                break;
        default:
                ret = ERR_PTR(-EAFNOSUPPORT);
                goto out_err;
        }

        dprintk("RPC:       set up xprt to %s via AF_LOCAL\n",
                        xprt->address_strings[RPC_DISPLAY_ADDR]);

        /* Pin the module for the lifetime of the transport. */
        if (try_module_get(THIS_MODULE))
                return xprt;
        ret = ERR_PTR(-EINVAL);
out_err:
        xprt_free(xprt);
        return ret;
}
2623
/* Default UDP timeout: 5s initial, 30s cap, 5s increment, 5 retries. */
static const struct rpc_timeout xs_udp_default_timeout = {
        .to_initval = 5 * HZ,
        .to_maxval = 30 * HZ,
        .to_increment = 5 * HZ,
        .to_retries = 5,
};
2630
2631 /**
2632  * xs_setup_udp - Set up transport to use a UDP socket
2633  * @args: rpc transport creation arguments
2634  *
2635  */
2636 static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
2637 {
2638         struct sockaddr *addr = args->dstaddr;
2639         struct rpc_xprt *xprt;
2640         struct sock_xprt *transport;
2641         struct rpc_xprt *ret;
2642
2643         xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries,
2644                         xprt_udp_slot_table_entries);
2645         if (IS_ERR(xprt))
2646                 return xprt;
2647         transport = container_of(xprt, struct sock_xprt, xprt);
2648
2649         xprt->prot = IPPROTO_UDP;
2650         xprt->tsh_size = 0;
2651         /* XXX: header size can vary due to auth type, IPv6, etc. */
2652         xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
2653
2654         xprt->bind_timeout = XS_BIND_TO;
2655         xprt->reestablish_timeout = XS_UDP_REEST_TO;
2656         xprt->idle_timeout = XS_IDLE_DISC_TO;
2657
2658         xprt->ops = &xs_udp_ops;
2659
2660         xprt->timeout = &xs_udp_default_timeout;
2661
2662         switch (addr->sa_family) {
2663         case AF_INET:
2664                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2665                         xprt_set_bound(xprt);
2666
2667                 INIT_DELAYED_WORK(&transport->connect_worker,
2668                                         xs_udp_setup_socket);
2669                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
2670                 break;
2671         case AF_INET6:
2672                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2673                         xprt_set_bound(xprt);
2674
2675                 INIT_DELAYED_WORK(&transport->connect_worker,
2676                                         xs_udp_setup_socket);
2677                 xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
2678                 break;
2679         default:
2680                 ret = ERR_PTR(-EAFNOSUPPORT);
2681                 goto out_err;
2682         }
2683
2684         if (xprt_bound(xprt))
2685                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2686                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2687                                 xprt->address_strings[RPC_DISPLAY_PORT],
2688                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2689         else
2690                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2691                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2692                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2693
2694         if (try_module_get(THIS_MODULE))
2695                 return xprt;
2696         ret = ERR_PTR(-EINVAL);
2697 out_err:
2698         xprt_free(xprt);
2699         return ret;
2700 }
2701
/* Default TCP timeout: 60s initial and max (no increment), 2 retries. */
static const struct rpc_timeout xs_tcp_default_timeout = {
        .to_initval = 60 * HZ,
        .to_maxval = 60 * HZ,
        .to_retries = 2,
};
2707
2708 /**
2709  * xs_setup_tcp - Set up transport to use a TCP socket
2710  * @args: rpc transport creation arguments
2711  *
2712  */
2713 static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
2714 {
2715         struct sockaddr *addr = args->dstaddr;
2716         struct rpc_xprt *xprt;
2717         struct sock_xprt *transport;
2718         struct rpc_xprt *ret;
2719
2720         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2721                         xprt_max_tcp_slot_table_entries);
2722         if (IS_ERR(xprt))
2723                 return xprt;
2724         transport = container_of(xprt, struct sock_xprt, xprt);
2725
2726         xprt->prot = IPPROTO_TCP;
2727         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2728         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2729
2730         xprt->bind_timeout = XS_BIND_TO;
2731         xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
2732         xprt->idle_timeout = XS_IDLE_DISC_TO;
2733
2734         xprt->ops = &xs_tcp_ops;
2735         xprt->timeout = &xs_tcp_default_timeout;
2736
2737         switch (addr->sa_family) {
2738         case AF_INET:
2739                 if (((struct sockaddr_in *)addr)->sin_port != htons(0))
2740                         xprt_set_bound(xprt);
2741
2742                 INIT_DELAYED_WORK(&transport->connect_worker,
2743                                         xs_tcp_setup_socket);
2744                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
2745                 break;
2746         case AF_INET6:
2747                 if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
2748                         xprt_set_bound(xprt);
2749
2750                 INIT_DELAYED_WORK(&transport->connect_worker,
2751                                         xs_tcp_setup_socket);
2752                 xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
2753                 break;
2754         default:
2755                 ret = ERR_PTR(-EAFNOSUPPORT);
2756                 goto out_err;
2757         }
2758
2759         if (xprt_bound(xprt))
2760                 dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2761                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2762                                 xprt->address_strings[RPC_DISPLAY_PORT],
2763                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2764         else
2765                 dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
2766                                 xprt->address_strings[RPC_DISPLAY_ADDR],
2767                                 xprt->address_strings[RPC_DISPLAY_PROTO]);
2768
2769
2770         if (try_module_get(THIS_MODULE))
2771                 return xprt;
2772         ret = ERR_PTR(-EINVAL);
2773 out_err:
2774         xprt_free(xprt);
2775         return ret;
2776 }
2777
2778 /**
2779  * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
2780  * @args: rpc transport creation arguments
2781  *
2782  */
2783 static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
2784 {
2785         struct sockaddr *addr = args->dstaddr;
2786         struct rpc_xprt *xprt;
2787         struct sock_xprt *transport;
2788         struct svc_sock *bc_sock;
2789         struct rpc_xprt *ret;
2790
2791         if (args->bc_xprt->xpt_bc_xprt) {
2792                 /*
2793                  * This server connection already has a backchannel
2794                  * export; we can't create a new one, as we wouldn't be
2795                  * able to match replies based on xid any more.  So,
2796                  * reuse the already-existing one:
2797                  */
2798                  return args->bc_xprt->xpt_bc_xprt;
2799         }
2800         xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries,
2801                         xprt_tcp_slot_table_entries);
2802         if (IS_ERR(xprt))
2803                 return xprt;
2804         transport = container_of(xprt, struct sock_xprt, xprt);
2805
2806         xprt->prot = IPPROTO_TCP;
2807         xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
2808         xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
2809         xprt->timeout = &xs_tcp_default_timeout;
2810
2811         /* backchannel */
2812         xprt_set_bound(xprt);
2813         xprt->bind_timeout = 0;
2814         xprt->reestablish_timeout = 0;
2815         xprt->idle_timeout = 0;
2816
2817         xprt->ops = &bc_tcp_ops;
2818
2819         switch (addr->sa_family) {
2820         case AF_INET:
2821                 xs_format_peer_addresses(xprt, "tcp",
2822                                          RPCBIND_NETID_TCP);
2823                 break;
2824         case AF_INET6:
2825                 xs_format_peer_addresses(xprt, "tcp",
2826                                    RPCBIND_NETID_TCP6);
2827                 break;
2828         default:
2829                 ret = ERR_PTR(-EAFNOSUPPORT);
2830                 goto out_err;
2831         }
2832
2833         dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
2834                         xprt->address_strings[RPC_DISPLAY_ADDR],
2835                         xprt->address_strings[RPC_DISPLAY_PORT],
2836                         xprt->address_strings[RPC_DISPLAY_PROTO]);
2837
2838         /*
2839          * Once we've associated a backchannel xprt with a connection,
2840          * we want to keep it around as long as long as the connection
2841          * lasts, in case we need to start using it for a backchannel
2842          * again; this reference won't be dropped until bc_xprt is
2843          * destroyed.
2844          */
2845         xprt_get(xprt);
2846         args->bc_xprt->xpt_bc_xprt = xprt;
2847         xprt->bc_xprt = args->bc_xprt;
2848         bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
2849         transport->sock = bc_sock->sk_sock;
2850         transport->inet = bc_sock->sk_sk;
2851
2852         /*
2853          * Since we don't want connections for the backchannel, we set
2854          * the xprt status to connected
2855          */
2856         xprt_set_connected(xprt);
2857
2858
2859         if (try_module_get(THIS_MODULE))
2860                 return xprt;
2861         xprt_put(xprt);
2862         ret = ERR_PTR(-EINVAL);
2863 out_err:
2864         xprt_free(xprt);
2865         return ret;
2866 }
2867
/*
 * Transport class descriptors registered with the RPC client core by
 * init_socket_xprt() below; .ident selects the class at xprt_create time.
 */
static struct xprt_class        xs_local_transport = {
        .list           = LIST_HEAD_INIT(xs_local_transport.list),
        .name           = "named UNIX socket",
        .owner          = THIS_MODULE,
        .ident          = XPRT_TRANSPORT_LOCAL,
        .setup          = xs_setup_local,
};

static struct xprt_class        xs_udp_transport = {
        .list           = LIST_HEAD_INIT(xs_udp_transport.list),
        .name           = "udp",
        .owner          = THIS_MODULE,
        .ident          = XPRT_TRANSPORT_UDP,
        .setup          = xs_setup_udp,
};

static struct xprt_class        xs_tcp_transport = {
        .list           = LIST_HEAD_INIT(xs_tcp_transport.list),
        .name           = "tcp",
        .owner          = THIS_MODULE,
        .ident          = XPRT_TRANSPORT_TCP,
        .setup          = xs_setup_tcp,
};

static struct xprt_class        xs_bc_tcp_transport = {
        .list           = LIST_HEAD_INIT(xs_bc_tcp_transport.list),
        .name           = "tcp NFSv4.1 backchannel",
        .owner          = THIS_MODULE,
        .ident          = XPRT_TRANSPORT_BC_TCP,
        .setup          = xs_setup_bc_tcp,
};
2899
/**
 * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
 *
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
        /* The sysctl table is shared; register it only once. */
        if (!sunrpc_table_header)
                sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

        /* NOTE(review): xprt_register_transport() return values are
         * ignored; a failed registration would go unnoticed here --
         * confirm that is intentional. */
        xprt_register_transport(&xs_local_transport);
        xprt_register_transport(&xs_udp_transport);
        xprt_register_transport(&xs_tcp_transport);
        xprt_register_transport(&xs_bc_tcp_transport);

        return 0;
}
2918
/**
 * cleanup_socket_xprt - remove xprtsock's sysctls, unregister
 *
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
        /* Drop the sysctl table registered by init_socket_xprt(). */
        if (sunrpc_table_header) {
                unregister_sysctl_table(sunrpc_table_header);
                sunrpc_table_header = NULL;
        }
#endif

        xprt_unregister_transport(&xs_local_transport);
        xprt_unregister_transport(&xs_udp_transport);
        xprt_unregister_transport(&xs_tcp_transport);
        xprt_unregister_transport(&xs_bc_tcp_transport);
}
2937
2938 static int param_set_uint_minmax(const char *val,
2939                 const struct kernel_param *kp,
2940                 unsigned int min, unsigned int max)
2941 {
2942         unsigned long num;
2943         int ret;
2944
2945         if (!val)
2946                 return -EINVAL;
2947         ret = strict_strtoul(val, 0, &num);
2948         if (ret == -EINVAL || num < min || num > max)
2949                 return -EINVAL;
2950         *((unsigned int *)kp->arg) = num;
2951         return 0;
2952 }
2953
2954 static int param_set_portnr(const char *val, const struct kernel_param *kp)
2955 {
2956         return param_set_uint_minmax(val, kp,
2957                         RPC_MIN_RESVPORT,
2958                         RPC_MAX_RESVPORT);
2959 }
2960
/* get/set operations backing the "portnr" module parameter type. */
static struct kernel_param_ops param_ops_portnr = {
        .set = param_set_portnr,
        .get = param_get_uint,
};

/* Compile-time type-check hook required by module_param_named(). */
#define param_check_portnr(name, p) \
        __param_check(name, p, unsigned int);

/* 0644: readable by all, writable by root at runtime. */
module_param_named(min_resvport, xprt_min_resvport, portnr, 0644);
module_param_named(max_resvport, xprt_max_resvport, portnr, 0644);
2971
2972 static int param_set_slot_table_size(const char *val,
2973                                      const struct kernel_param *kp)
2974 {
2975         return param_set_uint_minmax(val, kp,
2976                         RPC_MIN_SLOT_TABLE,
2977                         RPC_MAX_SLOT_TABLE);
2978 }
2979
/* get/set operations backing the "slot_table_size" parameter type. */
static struct kernel_param_ops param_ops_slot_table_size = {
        .set = param_set_slot_table_size,
        .get = param_get_uint,
};

/* Compile-time type-check hook required by module_param_named(). */
#define param_check_slot_table_size(name, p) \
        __param_check(name, p, unsigned int);
2987
2988 static int param_set_max_slot_table_size(const char *val,
2989                                      const struct kernel_param *kp)
2990 {
2991         return param_set_uint_minmax(val, kp,
2992                         RPC_MIN_SLOT_TABLE,
2993                         RPC_MAX_SLOT_TABLE_LIMIT);
2994 }
2995
/* get/set operations backing the "max_slot_table_size" parameter type. */
static struct kernel_param_ops param_ops_max_slot_table_size = {
        .set = param_set_max_slot_table_size,
        .get = param_get_uint,
};

/* Compile-time type-check hook required by module_param_named(). */
#define param_check_max_slot_table_size(name, p) \
        __param_check(name, p, unsigned int);

/* Runtime-tunable slot table sizes (0644: root-writable). */
module_param_named(tcp_slot_table_entries, xprt_tcp_slot_table_entries,
                   slot_table_size, 0644);
module_param_named(tcp_max_slot_table_entries, xprt_max_tcp_slot_table_entries,
                   max_slot_table_size, 0644);
module_param_named(udp_slot_table_entries, xprt_udp_slot_table_entries,
                   slot_table_size, 0644);
3010