Merge branch 'upstream-fixes' of git://git.kernel.org/pub/scm/linux/kernel/git/linvil...
[pandora-kernel.git] / fs / dlm / lowcomms.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 /*
15  * lowcomms.c
16  *
17  * This is the "low-level" comms layer.
18  *
19  * It is responsible for sending/receiving messages
20  * from other nodes in the cluster.
21  *
22  * Cluster nodes are referred to by their nodeids. nodeids are
23  * simply 32 bit numbers to the locking module - if they need to
24  * be expanded for the cluster infrastructure then that is its
25  * responsibility. It is this layer's
26  * responsibility to resolve these into IP address or
27  * whatever it needs for inter-node communication.
28  *
29  * The comms level is two kernel threads that deal mainly with
30  * the receiving of messages from other nodes and passing them
31  * up to the mid-level comms layer (which understands the
32  * message format) for execution by the locking core, and
33  * a send thread which does all the setting up of connections
34  * to remote nodes and the sending of data. Threads are not allowed
35  * to send their own data because it may cause them to wait in times
36  * of high load. Also, this way, the sending thread can collect together
37  * messages bound for one node and send them in one block.
38  *
39  * I don't see any problem with the recv thread executing the locking
40  * code on behalf of remote processes as the locking code is
41  * short, efficient and never (well, hardly ever) waits.
42  *
43  */
44
45 #include <asm/ioctls.h>
46 #include <net/sock.h>
47 #include <net/tcp.h>
48 #include <net/sctp/user.h>
49 #include <linux/pagemap.h>
50 #include <linux/socket.h>
51 #include <linux/idr.h>
52
53 #include "dlm_internal.h"
54 #include "lowcomms.h"
55 #include "config.h"
56 #include "midcomms.h"
57
58 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
59 static int                      dlm_local_count;
60 static int                      dlm_local_nodeid;
61
62 /* One of these per connected node */
63
64 #define NI_INIT_PENDING 1
65 #define NI_WRITE_PENDING 2
66
67 struct nodeinfo {
68         spinlock_t              lock;
69         sctp_assoc_t            assoc_id;
70         unsigned long           flags;
71         struct list_head        write_list; /* nodes with pending writes */
72         struct list_head        writequeue; /* outgoing writequeue_entries */
73         spinlock_t              writequeue_lock;
74         int                     nodeid;
75 };
76
77 static DEFINE_IDR(nodeinfo_idr);
78 static struct rw_semaphore      nodeinfo_lock;
79 static int                      max_nodeid;
80
/* Bookkeeping for a power-of-two sized circular buffer (see CBUF_* macros) */
struct cbuf {
	unsigned int base;	/* offset of the first used byte */
	unsigned int len;	/* number of used bytes */
	unsigned int mask;	/* buffer size minus one */
};
86
87 /* Just the one of these, now. But this struct keeps
88    the connection-specific variables together */
89
90 #define CF_READ_PENDING 1
91
92 struct connection {
93         struct socket          *sock;
94         unsigned long           flags;
95         struct page            *rx_page;
96         atomic_t                waiting_requests;
97         struct cbuf             cb;
98         int                     eagain_flag;
99 };
100
101 /* An entry waiting to be sent */
102
103 struct writequeue_entry {
104         struct list_head        list;
105         struct page            *page;
106         int                     offset;
107         int                     len;
108         int                     end;
109         int                     users;
110         struct nodeinfo        *ni;
111 };
112
/*
 * Circular buffer helpers.  The buffer size must be a power of two so
 * that offsets can be wrapped with a simple mask.
 */

/* Reset the buffer to empty and record its size */
#define CBUF_INIT(cb, size)			\
do {						\
	(cb)->base = (cb)->len = 0;		\
	(cb)->mask = ((size)-1);		\
} while(0)

/* Account for n bytes appended at the write position */
#define CBUF_ADD(cb, n)				\
do {						\
	(cb)->len += (n);			\
} while(0)

/* Consume n bytes from the front, wrapping the base offset */
#define CBUF_EAT(cb, n)				\
do {						\
	(cb)->len  -= (n);			\
	(cb)->base += (n);			\
	(cb)->base &= (cb)->mask;		\
} while(0)

/* True when no bytes are stored */
#define CBUF_EMPTY(cb)		((cb)->len == 0)

/* True when n more bytes fit while keeping the buffer from becoming full */
#define CBUF_MAY_ADD(cb, n)	(((cb)->len + (n)) < ((cb)->mask + 1))

/* Offset of the write position (first free byte) */
#define CBUF_DATA(cb)		(((cb)->base + (cb)->len) & (cb)->mask)
130
131
132 /* List of nodes which have writes pending */
133 static struct list_head write_nodes;
134 static spinlock_t write_nodes_lock;
135
136 /* Maximum number of incoming messages to process before
137  * doing a schedule()
138  */
139 #define MAX_RX_MSG_COUNT 25
140
141 /* Manage daemons */
142 static struct task_struct *recv_task;
143 static struct task_struct *send_task;
144 static wait_queue_head_t lowcomms_recv_wait;
145 static atomic_t accepting;
146
147 /* The SCTP connection */
148 static struct connection sctp_con;
149
150
151 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
152 {
153         struct sockaddr_storage addr;
154         int error;
155
156         if (!dlm_local_count)
157                 return -1;
158
159         error = dlm_nodeid_to_addr(nodeid, &addr);
160         if (error)
161                 return error;
162
163         if (dlm_local_addr[0]->ss_family == AF_INET) {
164                 struct sockaddr_in *in4  = (struct sockaddr_in *) &addr;
165                 struct sockaddr_in *ret4 = (struct sockaddr_in *) retaddr;
166                 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
167         } else {
168                 struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &addr;
169                 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) retaddr;
170                 memcpy(&ret6->sin6_addr, &in6->sin6_addr,
171                        sizeof(in6->sin6_addr));
172         }
173
174         return 0;
175 }
176
177 static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
178 {
179         struct nodeinfo *ni;
180         int r;
181         int n;
182
183         down_read(&nodeinfo_lock);
184         ni = idr_find(&nodeinfo_idr, nodeid);
185         up_read(&nodeinfo_lock);
186
187         if (!ni && alloc) {
188                 down_write(&nodeinfo_lock);
189
190                 ni = idr_find(&nodeinfo_idr, nodeid);
191                 if (ni)
192                         goto out_up;
193
194                 r = idr_pre_get(&nodeinfo_idr, alloc);
195                 if (!r)
196                         goto out_up;
197
198                 ni = kmalloc(sizeof(struct nodeinfo), alloc);
199                 if (!ni)
200                         goto out_up;
201
202                 r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
203                 if (r) {
204                         kfree(ni);
205                         ni = NULL;
206                         goto out_up;
207                 }
208                 if (n != nodeid) {
209                         idr_remove(&nodeinfo_idr, n);
210                         kfree(ni);
211                         ni = NULL;
212                         goto out_up;
213                 }
214                 memset(ni, 0, sizeof(struct nodeinfo));
215                 spin_lock_init(&ni->lock);
216                 INIT_LIST_HEAD(&ni->writequeue);
217                 spin_lock_init(&ni->writequeue_lock);
218                 ni->nodeid = nodeid;
219
220                 if (nodeid > max_nodeid)
221                         max_nodeid = nodeid;
222         out_up:
223                 up_write(&nodeinfo_lock);
224         }
225
226         return ni;
227 }
228
229 /* Don't call this too often... */
230 static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
231 {
232         int i;
233         struct nodeinfo *ni;
234
235         for (i=1; i<=max_nodeid; i++) {
236                 ni = nodeid2nodeinfo(i, 0);
237                 if (ni && ni->assoc_id == assoc)
238                         return ni;
239         }
240         return NULL;
241 }
242
243 /* Data or notification available on socket */
244 static void lowcomms_data_ready(struct sock *sk, int count_unused)
245 {
246         atomic_inc(&sctp_con.waiting_requests);
247         if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
248                 return;
249
250         wake_up_interruptible(&lowcomms_recv_wait);
251 }
252
253
254 /* Add the port number to an IP6 or 4 sockaddr and return the address length.
255    Also padd out the struct with zeros to make comparisons meaningful */
256
257 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
258                           int *addr_len)
259 {
260         struct sockaddr_in *local4_addr;
261         struct sockaddr_in6 *local6_addr;
262
263         if (!dlm_local_count)
264                 return;
265
266         if (!port) {
267                 if (dlm_local_addr[0]->ss_family == AF_INET) {
268                         local4_addr = (struct sockaddr_in *)dlm_local_addr[0];
269                         port = be16_to_cpu(local4_addr->sin_port);
270                 } else {
271                         local6_addr = (struct sockaddr_in6 *)dlm_local_addr[0];
272                         port = be16_to_cpu(local6_addr->sin6_port);
273                 }
274         }
275
276         saddr->ss_family = dlm_local_addr[0]->ss_family;
277         if (dlm_local_addr[0]->ss_family == AF_INET) {
278                 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
279                 in4_addr->sin_port = cpu_to_be16(port);
280                 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
281                 memset(in4_addr+1, 0, sizeof(struct sockaddr_storage) -
282                                       sizeof(struct sockaddr_in));
283                 *addr_len = sizeof(struct sockaddr_in);
284         } else {
285                 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
286                 in6_addr->sin6_port = cpu_to_be16(port);
287                 memset(in6_addr+1, 0, sizeof(struct sockaddr_storage) -
288                                       sizeof(struct sockaddr_in6));
289                 *addr_len = sizeof(struct sockaddr_in6);
290         }
291 }
292
293 /* Close the connection and tidy up */
294 static void close_connection(void)
295 {
296         if (sctp_con.sock) {
297                 sock_release(sctp_con.sock);
298                 sctp_con.sock = NULL;
299         }
300
301         if (sctp_con.rx_page) {
302                 __free_page(sctp_con.rx_page);
303                 sctp_con.rx_page = NULL;
304         }
305 }
306
307 /* We only send shutdown messages to nodes that are not part of the cluster */
308 static void send_shutdown(sctp_assoc_t associd)
309 {
310         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
311         struct msghdr outmessage;
312         struct cmsghdr *cmsg;
313         struct sctp_sndrcvinfo *sinfo;
314         int ret;
315
316         outmessage.msg_name = NULL;
317         outmessage.msg_namelen = 0;
318         outmessage.msg_control = outcmsg;
319         outmessage.msg_controllen = sizeof(outcmsg);
320         outmessage.msg_flags = MSG_EOR;
321
322         cmsg = CMSG_FIRSTHDR(&outmessage);
323         cmsg->cmsg_level = IPPROTO_SCTP;
324         cmsg->cmsg_type = SCTP_SNDRCV;
325         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
326         outmessage.msg_controllen = cmsg->cmsg_len;
327         sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
328         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
329
330         sinfo->sinfo_flags |= MSG_EOF;
331         sinfo->sinfo_assoc_id = associd;
332
333         ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
334
335         if (ret != 0)
336                 log_print("send EOF to node failed: %d", ret);
337 }
338
339
340 /* INIT failed but we don't know which node...
341    restart INIT on all pending nodes */
342 static void init_failed(void)
343 {
344         int i;
345         struct nodeinfo *ni;
346
347         for (i=1; i<=max_nodeid; i++) {
348                 ni = nodeid2nodeinfo(i, 0);
349                 if (!ni)
350                         continue;
351
352                 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
353                         ni->assoc_id = 0;
354                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
355                                 spin_lock_bh(&write_nodes_lock);
356                                 list_add_tail(&ni->write_list, &write_nodes);
357                                 spin_unlock_bh(&write_nodes_lock);
358                         }
359                 }
360         }
361         wake_up_process(send_task);
362 }
363
364 /* Something happened to an association */
365 static void process_sctp_notification(struct msghdr *msg, char *buf)
366 {
367         union sctp_notification *sn = (union sctp_notification *)buf;
368
369         if (sn->sn_header.sn_type == SCTP_ASSOC_CHANGE) {
370                 switch (sn->sn_assoc_change.sac_state) {
371
372                 case SCTP_COMM_UP:
373                 case SCTP_RESTART:
374                 {
375                         /* Check that the new node is in the lockspace */
376                         struct sctp_prim prim;
377                         mm_segment_t fs;
378                         int nodeid;
379                         int prim_len, ret;
380                         int addr_len;
381                         struct nodeinfo *ni;
382
383                         /* This seems to happen when we received a connection
384                          * too early... or something...  anyway, it happens but
385                          * we always seem to get a real message too, see
386                          * receive_from_sock */
387
388                         if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
389                                 log_print("COMM_UP for invalid assoc ID %d",
390                                          (int)sn->sn_assoc_change.sac_assoc_id);
391                                 init_failed();
392                                 return;
393                         }
394                         memset(&prim, 0, sizeof(struct sctp_prim));
395                         prim_len = sizeof(struct sctp_prim);
396                         prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
397
398                         fs = get_fs();
399                         set_fs(get_ds());
400                         ret = sctp_con.sock->ops->getsockopt(sctp_con.sock,
401                                                 IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
402                                                 (char*)&prim, &prim_len);
403                         set_fs(fs);
404                         if (ret < 0) {
405                                 struct nodeinfo *ni;
406
407                                 log_print("getsockopt/sctp_primary_addr on "
408                                           "new assoc %d failed : %d",
409                                     (int)sn->sn_assoc_change.sac_assoc_id, ret);
410
411                                 /* Retry INIT later */
412                                 ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
413                                 if (ni)
414                                         clear_bit(NI_INIT_PENDING, &ni->flags);
415                                 return;
416                         }
417                         make_sockaddr(&prim.ssp_addr, 0, &addr_len);
418                         if (dlm_addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
419                                 log_print("reject connect from unknown addr");
420                                 send_shutdown(prim.ssp_assoc_id);
421                                 return;
422                         }
423
424                         ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
425                         if (!ni)
426                                 return;
427
428                         /* Save the assoc ID */
429                         spin_lock(&ni->lock);
430                         ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
431                         spin_unlock(&ni->lock);
432
433                         log_print("got new/restarted association %d nodeid %d",
434                                (int)sn->sn_assoc_change.sac_assoc_id, nodeid);
435
436                         /* Send any pending writes */
437                         clear_bit(NI_INIT_PENDING, &ni->flags);
438                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
439                                 spin_lock_bh(&write_nodes_lock);
440                                 list_add_tail(&ni->write_list, &write_nodes);
441                                 spin_unlock_bh(&write_nodes_lock);
442                         }
443                         wake_up_process(send_task);
444                 }
445                 break;
446
447                 case SCTP_COMM_LOST:
448                 case SCTP_SHUTDOWN_COMP:
449                 {
450                         struct nodeinfo *ni;
451
452                         ni = assoc2nodeinfo(sn->sn_assoc_change.sac_assoc_id);
453                         if (ni) {
454                                 spin_lock(&ni->lock);
455                                 ni->assoc_id = 0;
456                                 spin_unlock(&ni->lock);
457                         }
458                 }
459                 break;
460
461                 /* We don't know which INIT failed, so clear the PENDING flags
462                  * on them all.  if assoc_id is zero then it will then try
463                  * again */
464
465                 case SCTP_CANT_STR_ASSOC:
466                 {
467                         log_print("Can't start SCTP association - retrying");
468                         init_failed();
469                 }
470                 break;
471
472                 default:
473                         log_print("unexpected SCTP assoc change id=%d state=%d",
474                                   (int)sn->sn_assoc_change.sac_assoc_id,
475                                   sn->sn_assoc_change.sac_state);
476                 }
477         }
478 }
479
480 /* Data received from remote end */
481 static int receive_from_sock(void)
482 {
483         int ret = 0;
484         struct msghdr msg;
485         struct kvec iov[2];
486         unsigned len;
487         int r;
488         struct sctp_sndrcvinfo *sinfo;
489         struct cmsghdr *cmsg;
490         struct nodeinfo *ni;
491
492         /* These two are marginally too big for stack allocation, but this
493          * function is (currently) only called by dlm_recvd so static should be
494          * OK.
495          */
496         static struct sockaddr_storage msgname;
497         static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
498
499         if (sctp_con.sock == NULL)
500                 goto out;
501
502         if (sctp_con.rx_page == NULL) {
503                 /*
504                  * This doesn't need to be atomic, but I think it should
505                  * improve performance if it is.
506                  */
507                 sctp_con.rx_page = alloc_page(GFP_ATOMIC);
508                 if (sctp_con.rx_page == NULL)
509                         goto out_resched;
510                 CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE);
511         }
512
513         memset(&incmsg, 0, sizeof(incmsg));
514         memset(&msgname, 0, sizeof(msgname));
515
516         memset(incmsg, 0, sizeof(incmsg));
517         msg.msg_name = &msgname;
518         msg.msg_namelen = sizeof(msgname);
519         msg.msg_flags = 0;
520         msg.msg_control = incmsg;
521         msg.msg_controllen = sizeof(incmsg);
522         msg.msg_iovlen = 1;
523
524         /* I don't see why this circular buffer stuff is necessary for SCTP
525          * which is a packet-based protocol, but the whole thing breaks under
526          * load without it! The overhead is minimal (and is in the TCP lowcomms
527          * anyway, of course) so I'll leave it in until I can figure out what's
528          * really happening.
529          */
530
531         /*
532          * iov[0] is the bit of the circular buffer between the current end
533          * point (cb.base + cb.len) and the end of the buffer.
534          */
535         iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb);
536         iov[0].iov_base = page_address(sctp_con.rx_page) +
537                           CBUF_DATA(&sctp_con.cb);
538         iov[1].iov_len = 0;
539
540         /*
541          * iov[1] is the bit of the circular buffer between the start of the
542          * buffer and the start of the currently used section (cb.base)
543          */
544         if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) {
545                 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb);
546                 iov[1].iov_len = sctp_con.cb.base;
547                 iov[1].iov_base = page_address(sctp_con.rx_page);
548                 msg.msg_iovlen = 2;
549         }
550         len = iov[0].iov_len + iov[1].iov_len;
551
552         r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
553                                  MSG_NOSIGNAL | MSG_DONTWAIT);
554         if (ret <= 0)
555                 goto out_close;
556
557         msg.msg_control = incmsg;
558         msg.msg_controllen = sizeof(incmsg);
559         cmsg = CMSG_FIRSTHDR(&msg);
560         sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
561
562         if (msg.msg_flags & MSG_NOTIFICATION) {
563                 process_sctp_notification(&msg, page_address(sctp_con.rx_page));
564                 return 0;
565         }
566
567         /* Is this a new association ? */
568         ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
569         if (ni) {
570                 ni->assoc_id = sinfo->sinfo_assoc_id;
571                 if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
572
573                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
574                                 spin_lock_bh(&write_nodes_lock);
575                                 list_add_tail(&ni->write_list, &write_nodes);
576                                 spin_unlock_bh(&write_nodes_lock);
577                         }
578                         wake_up_process(send_task);
579                 }
580         }
581
582         /* INIT sends a message with length of 1 - ignore it */
583         if (r == 1)
584                 return 0;
585
586         CBUF_ADD(&sctp_con.cb, ret);
587         ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
588                                           page_address(sctp_con.rx_page),
589                                           sctp_con.cb.base, sctp_con.cb.len,
590                                           PAGE_CACHE_SIZE);
591         if (ret < 0)
592                 goto out_close;
593         CBUF_EAT(&sctp_con.cb, ret);
594
595       out:
596         ret = 0;
597         goto out_ret;
598
599       out_resched:
600         lowcomms_data_ready(sctp_con.sock->sk, 0);
601         ret = 0;
602         schedule();
603         goto out_ret;
604
605       out_close:
606         if (ret != -EAGAIN)
607                 log_print("error reading from sctp socket: %d", ret);
608       out_ret:
609         return ret;
610 }
611
612 /* Bind to an IP address. SCTP allows multiple address so it can do multi-homing */
613 static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
614 {
615         mm_segment_t fs;
616         int result = 0;
617
618         fs = get_fs();
619         set_fs(get_ds());
620         if (num == 1)
621                 result = sctp_con.sock->ops->bind(sctp_con.sock,
622                                         (struct sockaddr *) addr, addr_len);
623         else
624                 result = sctp_con.sock->ops->setsockopt(sctp_con.sock, SOL_SCTP,
625                                 SCTP_SOCKOPT_BINDX_ADD, (char *)addr, addr_len);
626         set_fs(fs);
627
628         if (result < 0)
629                 log_print("Can't bind to port %d addr number %d",
630                           dlm_config.tcp_port, num);
631
632         return result;
633 }
634
635 static void init_local(void)
636 {
637         struct sockaddr_storage sas, *addr;
638         int i;
639
640         dlm_local_nodeid = dlm_our_nodeid();
641
642         for (i = 0; i < DLM_MAX_ADDR_COUNT - 1; i++) {
643                 if (dlm_our_addr(&sas, i))
644                         break;
645
646                 addr = kmalloc(sizeof(*addr), GFP_KERNEL);
647                 if (!addr)
648                         break;
649                 memcpy(addr, &sas, sizeof(*addr));
650                 dlm_local_addr[dlm_local_count++] = addr;
651         }
652 }
653
654 /* Initialise SCTP socket and bind to all interfaces */
655 static int init_sock(void)
656 {
657         mm_segment_t fs;
658         struct socket *sock = NULL;
659         struct sockaddr_storage localaddr;
660         struct sctp_event_subscribe subscribe;
661         int result = -EINVAL, num = 1, i, addr_len;
662
663         if (!dlm_local_count) {
664                 init_local();
665                 if (!dlm_local_count) {
666                         log_print("no local IP address has been set");
667                         goto out;
668                 }
669         }
670
671         result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
672                                   IPPROTO_SCTP, &sock);
673         if (result < 0) {
674                 log_print("Can't create comms socket, check SCTP is loaded");
675                 goto out;
676         }
677
678         /* Listen for events */
679         memset(&subscribe, 0, sizeof(subscribe));
680         subscribe.sctp_data_io_event = 1;
681         subscribe.sctp_association_event = 1;
682         subscribe.sctp_send_failure_event = 1;
683         subscribe.sctp_shutdown_event = 1;
684         subscribe.sctp_partial_delivery_event = 1;
685
686         fs = get_fs();
687         set_fs(get_ds());
688         result = sock->ops->setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
689                                        (char *)&subscribe, sizeof(subscribe));
690         set_fs(fs);
691
692         if (result < 0) {
693                 log_print("Failed to set SCTP_EVENTS on socket: result=%d",
694                           result);
695                 goto create_delsock;
696         }
697
698         /* Init con struct */
699         sock->sk->sk_user_data = &sctp_con;
700         sctp_con.sock = sock;
701         sctp_con.sock->sk->sk_data_ready = lowcomms_data_ready;
702
703         /* Bind to all interfaces. */
704         for (i = 0; i < dlm_local_count; i++) {
705                 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
706                 make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len);
707
708                 result = add_bind_addr(&localaddr, addr_len, num);
709                 if (result)
710                         goto create_delsock;
711                 ++num;
712         }
713
714         result = sock->ops->listen(sock, 5);
715         if (result < 0) {
716                 log_print("Can't set socket listening");
717                 goto create_delsock;
718         }
719
720         return 0;
721
722  create_delsock:
723         sock_release(sock);
724         sctp_con.sock = NULL;
725  out:
726         return result;
727 }
728
729
730 static struct writequeue_entry *new_writequeue_entry(gfp_t allocation)
731 {
732         struct writequeue_entry *entry;
733
734         entry = kmalloc(sizeof(struct writequeue_entry), allocation);
735         if (!entry)
736                 return NULL;
737
738         entry->page = alloc_page(allocation);
739         if (!entry->page) {
740                 kfree(entry);
741                 return NULL;
742         }
743
744         entry->offset = 0;
745         entry->len = 0;
746         entry->end = 0;
747         entry->users = 0;
748
749         return entry;
750 }
751
752 void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
753 {
754         struct writequeue_entry *e;
755         int offset = 0;
756         int users = 0;
757         struct nodeinfo *ni;
758
759         if (!atomic_read(&accepting))
760                 return NULL;
761
762         ni = nodeid2nodeinfo(nodeid, allocation);
763         if (!ni)
764                 return NULL;
765
766         spin_lock(&ni->writequeue_lock);
767         e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
768         if (((struct list_head *) e == &ni->writequeue) ||
769             (PAGE_CACHE_SIZE - e->end < len)) {
770                 e = NULL;
771         } else {
772                 offset = e->end;
773                 e->end += len;
774                 users = e->users++;
775         }
776         spin_unlock(&ni->writequeue_lock);
777
778         if (e) {
779               got_one:
780                 if (users == 0)
781                         kmap(e->page);
782                 *ppc = page_address(e->page) + offset;
783                 return e;
784         }
785
786         e = new_writequeue_entry(allocation);
787         if (e) {
788                 spin_lock(&ni->writequeue_lock);
789                 offset = e->end;
790                 e->end += len;
791                 e->ni = ni;
792                 users = e->users++;
793                 list_add_tail(&e->list, &ni->writequeue);
794                 spin_unlock(&ni->writequeue_lock);
795                 goto got_one;
796         }
797         return NULL;
798 }
799
800 void dlm_lowcomms_commit_buffer(void *arg)
801 {
802         struct writequeue_entry *e = (struct writequeue_entry *) arg;
803         int users;
804         struct nodeinfo *ni = e->ni;
805
806         if (!atomic_read(&accepting))
807                 return;
808
809         spin_lock(&ni->writequeue_lock);
810         users = --e->users;
811         if (users)
812                 goto out;
813         e->len = e->end - e->offset;
814         kunmap(e->page);
815         spin_unlock(&ni->writequeue_lock);
816
817         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
818                 spin_lock_bh(&write_nodes_lock);
819                 list_add_tail(&ni->write_list, &write_nodes);
820                 spin_unlock_bh(&write_nodes_lock);
821                 wake_up_process(send_task);
822         }
823         return;
824
825       out:
826         spin_unlock(&ni->writequeue_lock);
827         return;
828 }
829
830 static void free_entry(struct writequeue_entry *e)
831 {
832         __free_page(e->page);
833         kfree(e);
834 }
835
836 /* Initiate an SCTP association. In theory we could just use sendmsg() on
837    the first IP address and it should work, but this allows us to set up the
838    association before sending any valuable data that we can't afford to lose.
839    It also keeps the send path clean as it can now always use the association ID */
840 static void initiate_association(int nodeid)
841 {
842         struct sockaddr_storage rem_addr;
843         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
844         struct msghdr outmessage;
845         struct cmsghdr *cmsg;
846         struct sctp_sndrcvinfo *sinfo;
847         int ret;
848         int addrlen;
849         char buf[1];
850         struct kvec iov[1];
851         struct nodeinfo *ni;
852
853         log_print("Initiating association with node %d", nodeid);
854
855         ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
856         if (!ni)
857                 return;
858
859         if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
860                 log_print("no address for nodeid %d", nodeid);
861                 return;
862         }
863
864         make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen);
865
866         outmessage.msg_name = &rem_addr;
867         outmessage.msg_namelen = addrlen;
868         outmessage.msg_control = outcmsg;
869         outmessage.msg_controllen = sizeof(outcmsg);
870         outmessage.msg_flags = MSG_EOR;
871
872         iov[0].iov_base = buf;
873         iov[0].iov_len = 1;
874
875         /* Real INIT messages seem to cause trouble. Just send a 1 byte message
876            we can afford to lose */
877         cmsg = CMSG_FIRSTHDR(&outmessage);
878         cmsg->cmsg_level = IPPROTO_SCTP;
879         cmsg->cmsg_type = SCTP_SNDRCV;
880         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
881         sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
882         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
883         sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
884
885         outmessage.msg_controllen = cmsg->cmsg_len;
886         ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
887         if (ret < 0) {
888                 log_print("send INIT to node failed: %d", ret);
889                 /* Try again later */
890                 clear_bit(NI_INIT_PENDING, &ni->flags);
891         }
892 }
893
894 /* Send a message */
895 static int send_to_sock(struct nodeinfo *ni)
896 {
897         int ret = 0;
898         struct writequeue_entry *e;
899         int len, offset;
900         struct msghdr outmsg;
901         static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
902         struct cmsghdr *cmsg;
903         struct sctp_sndrcvinfo *sinfo;
904         struct kvec iov;
905
906         /* See if we need to init an association before we start
907            sending precious messages */
908         spin_lock(&ni->lock);
909         if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
910                 spin_unlock(&ni->lock);
911                 initiate_association(ni->nodeid);
912                 return 0;
913         }
914         spin_unlock(&ni->lock);
915
916         outmsg.msg_name = NULL; /* We use assoc_id */
917         outmsg.msg_namelen = 0;
918         outmsg.msg_control = outcmsg;
919         outmsg.msg_controllen = sizeof(outcmsg);
920         outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
921
922         cmsg = CMSG_FIRSTHDR(&outmsg);
923         cmsg->cmsg_level = IPPROTO_SCTP;
924         cmsg->cmsg_type = SCTP_SNDRCV;
925         cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
926         sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
927         memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
928         sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
929         sinfo->sinfo_assoc_id = ni->assoc_id;
930         outmsg.msg_controllen = cmsg->cmsg_len;
931
932         spin_lock(&ni->writequeue_lock);
933         for (;;) {
934                 if (list_empty(&ni->writequeue))
935                         break;
936                 e = list_entry(ni->writequeue.next, struct writequeue_entry,
937                                list);
938                 len = e->len;
939                 offset = e->offset;
940                 BUG_ON(len == 0 && e->users == 0);
941                 spin_unlock(&ni->writequeue_lock);
942                 kmap(e->page);
943
944                 ret = 0;
945                 if (len) {
946                         iov.iov_base = page_address(e->page)+offset;
947                         iov.iov_len = len;
948
949                         ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
950                                              len);
951                         if (ret == -EAGAIN) {
952                                 sctp_con.eagain_flag = 1;
953                                 goto out;
954                         } else if (ret < 0)
955                                 goto send_error;
956                 } else {
957                         /* Don't starve people filling buffers */
958                         schedule();
959                 }
960
961                 spin_lock(&ni->writequeue_lock);
962                 e->offset += ret;
963                 e->len -= ret;
964
965                 if (e->len == 0 && e->users == 0) {
966                         list_del(&e->list);
967                         free_entry(e);
968                         continue;
969                 }
970         }
971         spin_unlock(&ni->writequeue_lock);
972  out:
973         return ret;
974
975  send_error:
976         log_print("Error sending to node %d %d", ni->nodeid, ret);
977         spin_lock(&ni->lock);
978         if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
979                 ni->assoc_id = 0;
980                 spin_unlock(&ni->lock);
981                 initiate_association(ni->nodeid);
982         } else
983                 spin_unlock(&ni->lock);
984
985         return ret;
986 }
987
988 /* Try to send any messages that are pending */
989 static void process_output_queue(void)
990 {
991         struct list_head *list;
992         struct list_head *temp;
993
994         spin_lock_bh(&write_nodes_lock);
995         list_for_each_safe(list, temp, &write_nodes) {
996                 struct nodeinfo *ni =
997                     list_entry(list, struct nodeinfo, write_list);
998                 clear_bit(NI_WRITE_PENDING, &ni->flags);
999                 list_del(&ni->write_list);
1000
1001                 spin_unlock_bh(&write_nodes_lock);
1002
1003                 send_to_sock(ni);
1004                 spin_lock_bh(&write_nodes_lock);
1005         }
1006         spin_unlock_bh(&write_nodes_lock);
1007 }
1008
1009 /* Called after we've had -EAGAIN and been woken up */
1010 static void refill_write_queue(void)
1011 {
1012         int i;
1013
1014         for (i=1; i<=max_nodeid; i++) {
1015                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1016
1017                 if (ni) {
1018                         if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
1019                                 spin_lock_bh(&write_nodes_lock);
1020                                 list_add_tail(&ni->write_list, &write_nodes);
1021                                 spin_unlock_bh(&write_nodes_lock);
1022                         }
1023                 }
1024         }
1025 }
1026
1027 static void clean_one_writequeue(struct nodeinfo *ni)
1028 {
1029         struct list_head *list;
1030         struct list_head *temp;
1031
1032         spin_lock(&ni->writequeue_lock);
1033         list_for_each_safe(list, temp, &ni->writequeue) {
1034                 struct writequeue_entry *e =
1035                         list_entry(list, struct writequeue_entry, list);
1036                 list_del(&e->list);
1037                 free_entry(e);
1038         }
1039         spin_unlock(&ni->writequeue_lock);
1040 }
1041
1042 static void clean_writequeues(void)
1043 {
1044         int i;
1045
1046         for (i=1; i<=max_nodeid; i++) {
1047                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1048                 if (ni)
1049                         clean_one_writequeue(ni);
1050         }
1051 }
1052
1053
1054 static void dealloc_nodeinfo(void)
1055 {
1056         int i;
1057
1058         for (i=1; i<=max_nodeid; i++) {
1059                 struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
1060                 if (ni) {
1061                         idr_remove(&nodeinfo_idr, i);
1062                         kfree(ni);
1063                 }
1064         }
1065 }
1066
1067 int dlm_lowcomms_close(int nodeid)
1068 {
1069         struct nodeinfo *ni;
1070
1071         ni = nodeid2nodeinfo(nodeid, 0);
1072         if (!ni)
1073                 return -1;
1074
1075         spin_lock(&ni->lock);
1076         if (ni->assoc_id) {
1077                 ni->assoc_id = 0;
1078                 /* Don't send shutdown here, sctp will just queue it
1079                    till the node comes back up! */
1080         }
1081         spin_unlock(&ni->lock);
1082
1083         clean_one_writequeue(ni);
1084         clear_bit(NI_INIT_PENDING, &ni->flags);
1085         return 0;
1086 }
1087
1088 static int write_list_empty(void)
1089 {
1090         int status;
1091
1092         spin_lock_bh(&write_nodes_lock);
1093         status = list_empty(&write_nodes);
1094         spin_unlock_bh(&write_nodes_lock);
1095
1096         return status;
1097 }
1098
1099 static int dlm_recvd(void *data)
1100 {
1101         DECLARE_WAITQUEUE(wait, current);
1102
1103         while (!kthread_should_stop()) {
1104                 int count = 0;
1105
1106                 set_current_state(TASK_INTERRUPTIBLE);
1107                 add_wait_queue(&lowcomms_recv_wait, &wait);
1108                 if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
1109                         schedule();
1110                 remove_wait_queue(&lowcomms_recv_wait, &wait);
1111                 set_current_state(TASK_RUNNING);
1112
1113                 if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
1114                         int ret;
1115
1116                         do {
1117                                 ret = receive_from_sock();
1118
1119                                 /* Don't starve out everyone else */
1120                                 if (++count >= MAX_RX_MSG_COUNT) {
1121                                         schedule();
1122                                         count = 0;
1123                                 }
1124                         } while (!kthread_should_stop() && ret >=0);
1125                 }
1126                 schedule();
1127         }
1128
1129         return 0;
1130 }
1131
1132 static int dlm_sendd(void *data)
1133 {
1134         DECLARE_WAITQUEUE(wait, current);
1135
1136         add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
1137
1138         while (!kthread_should_stop()) {
1139                 set_current_state(TASK_INTERRUPTIBLE);
1140                 if (write_list_empty())
1141                         schedule();
1142                 set_current_state(TASK_RUNNING);
1143
1144                 if (sctp_con.eagain_flag) {
1145                         sctp_con.eagain_flag = 0;
1146                         refill_write_queue();
1147                 }
1148                 process_output_queue();
1149         }
1150
1151         remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
1152
1153         return 0;
1154 }
1155
1156 static void daemons_stop(void)
1157 {
1158         kthread_stop(recv_task);
1159         kthread_stop(send_task);
1160 }
1161
1162 static int daemons_start(void)
1163 {
1164         struct task_struct *p;
1165         int error;
1166
1167         p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1168         error = IS_ERR(p);
1169         if (error) {
1170                 log_print("can't start dlm_recvd %d", error);
1171                 return error;
1172         }
1173         recv_task = p;
1174
1175         p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1176         error = IS_ERR(p);
1177         if (error) {
1178                 log_print("can't start dlm_sendd %d", error);
1179                 kthread_stop(recv_task);
1180                 return error;
1181         }
1182         send_task = p;
1183
1184         return 0;
1185 }
1186
1187 /*
1188  * This is quite likely to sleep...
1189  */
1190 int dlm_lowcomms_start(void)
1191 {
1192         int error;
1193
1194         error = init_sock();
1195         if (error)
1196                 goto fail_sock;
1197         error = daemons_start();
1198         if (error)
1199                 goto fail_sock;
1200         atomic_set(&accepting, 1);
1201         return 0;
1202
1203  fail_sock:
1204         close_connection();
1205         return error;
1206 }
1207
1208 /* Set all the activity flags to prevent any socket activity. */
1209
1210 void dlm_lowcomms_stop(void)
1211 {
1212         atomic_set(&accepting, 0);
1213         sctp_con.flags = 0x7;
1214         daemons_stop();
1215         clean_writequeues();
1216         close_connection();
1217         dealloc_nodeinfo();
1218         max_nodeid = 0;
1219 }
1220
1221 int dlm_lowcomms_init(void)
1222 {
1223         init_waitqueue_head(&lowcomms_recv_wait);
1224         spin_lock_init(&write_nodes_lock);
1225         INIT_LIST_HEAD(&write_nodes);
1226         init_rwsem(&nodeinfo_lock);
1227         return 0;
1228 }
1229
1230 void dlm_lowcomms_exit(void)
1231 {
1232         int i;
1233
1234         for (i = 0; i < dlm_local_count; i++)
1235                 kfree(dlm_local_addr[i]);
1236         dlm_local_count = 0;
1237         dlm_local_nodeid = 0;
1238 }
1239