[TIPC] Initial merge
[pandora-kernel.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  * 
4  * Copyright (c) 2003-2005, Ericsson Research Canada
5  * Copyright (c) 2004-2005, Wind River Systems
6  * Copyright (c) 2005-2006, Ericsson AB
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without 
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * Redistributions of source code must retain the above copyright notice, this 
13  * list of conditions and the following disclaimer.
14  * Redistributions in binary form must reproduce the above copyright notice, 
15  * this list of conditions and the following disclaimer in the documentation 
16  * and/or other materials provided with the distribution.
17  * Neither the names of the copyright holders nor the names of its 
18  * contributors may be used to endorse or promote products derived from this 
19  * software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
24  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "core.h"
35 #include "dbg.h"
36 #include "link.h"
37 #include "net.h"
38 #include "node.h"
39 #include "port.h"
40 #include "addr.h"
41 #include "node_subscr.h"
42 #include "name_distr.h"
43 #include "bearer.h"
44 #include "name_table.h"
45 #include "discover.h"
46 #include "config.h"
47 #include "bcast.h"
48
49
50 /*
51  * Limit for deferred reception queue:
52  */
53
54 #define DEF_QUEUE_LIMIT 256u
55
56 /*
57  * Link state events:
 *
 * These are passed as the 'event' argument of link_state_event(), which
 * also accepts protocol message types (e.g. RESET_MSG, ACTIVATE_MSG) as
 * events.
58  */
59
60 #define  STARTING_EVT    856384768      /* link processing trigger; raised by link_start() */
61 #define  TRAFFIC_MSG_EVT 560815u        /* traffic message event ("TRF" in debug output); presumably raised on receipt of a data message -- TODO confirm against the receive path */
62 #define  TIMEOUT_EVT     560817u        /* link timer expired; raised by link_timeout() */
63
64 /*
65  * The following two 'message types' are really just implementation
66  * data conveniently stored in the message header.
67  * They must not be considered part of the protocol.
68  */
69 #define OPEN_MSG   0
70 #define CLOSED_MSG 1
71
72 /*
73  * State value stored in 'exp_msg_count'
 *
 * link_reset() stores this value when the owning node still has active
 * links and permits changeover; any non-zero 'exp_msg_count' makes
 * link_blocked() report the link as blocked.
74  */
75
76 #define START_CHANGEOVER 100000u
77
78 /**
79  * struct link_name - deconstructed link name
80  * @addr_local: network address of node at this end
81  * @if_local: name of interface at this end
82  * @addr_peer: network address of node at far end
83  * @if_peer: name of interface at far end
84  */
85
86 struct link_name {
87         u32 addr_local;
88         char if_local[TIPC_MAX_IF_NAME];
89         u32 addr_peer;
90         char if_peer[TIPC_MAX_IF_NAME];
91 };
92
93 #if 0
94
95 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
96
97 /**
98  * struct link_event - link up/down event notification
 * @addr: network address copied from the link (l_ptr->addr)
 * @up: up/down indication handed on to @fcn
 * @fcn: routine invoked with (@addr, @name, @up) when the event is delivered
 * @name: copy of the link's name
 *
 * This definition is compiled out; it is only used by the (equally
 * disabled) link_send_event()/link_recv_event() routines.
99  */
100
101 struct link_event {
102         u32 addr;
103         int up;
104         void (*fcn)(u32, char *, int);
105         char name[TIPC_MAX_LINK_NAME];
106 };
107
108 #endif
109
/*
 * Forward declarations of file-local (static) routines, so they can be
 * referenced ahead of their definitions.
 */
110 static void link_handle_out_of_seq_msg(struct link *l_ptr,
111                                        struct sk_buff *buf);
112 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
113 static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
114 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
115 static int  link_send_sections_long(struct port *sender,
116                                     struct iovec const *msg_sect,
117                                     u32 num_sect, u32 destnode);
118 static void link_check_defragm_bufs(struct link *l_ptr);
119 static void link_state_event(struct link *l_ptr, u32 event);
120 static void link_reset_statistics(struct link *l_ptr);
121 static void link_print(struct link *l_ptr, struct print_buf *buf, 
122                        const char *str);
123
124 /*
125  * Debugging code used by link routines only
126  *
127  * When debugging link problems on a system that has multiple links,
128  * the standard TIPC debugging routines may not be useful since they
129  * allow the output from multiple links to be intermixed.  For this reason
130  * routines of the form "dbg_link_XXX()" have been created that will capture
131  * debug info into a link's personal print buffer, which can then be dumped
132  * into the TIPC system log (LOG) upon request.
133  *
134  * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
135  * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
136  * the dbg_link_XXX() routines simply send their output to the standard 
137  * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
138  * when there is only a single link in the system being debugged.
 *
 * NOTE(review): as the macros are written below, every one of them is
 * guarded by "if (LINK_LOG_BUF_SIZE)", so with LINK_LOG_BUF_SIZE set to 0
 * they produce no output at all rather than falling back to DBG_OUTPUT.
 * Confirm which behaviour is intended.
139  *
140  * Notes:
141  * - When enabled, LINK_LOG_BUF_SIZE should be set to at least 1000 (bytes)
142  * - "l_ptr" must be valid when using dbg_link_XXX() macros  
143  */
144
145 #define LINK_LOG_BUF_SIZE 0
146
/* dbg_link(): printf-style output into the link's own print buffer */
147 #define dbg_link(fmt, arg...)  do {if (LINK_LOG_BUF_SIZE) tipc_printf(&l_ptr->print_buf, fmt, ## arg); } while(0)
/* dbg_link_msg(): print message 'msg', with text 'txt', into the link's print buffer */
148 #define dbg_link_msg(msg, txt) do {if (LINK_LOG_BUF_SIZE) msg_print(&l_ptr->print_buf, msg, txt); } while(0)
/* dbg_link_state(): run link_print() for the link into its own print buffer */
149 #define dbg_link_state(txt) do {if (LINK_LOG_BUF_SIZE) link_print(l_ptr, &l_ptr->print_buf, txt); } while(0)
/* dbg_link_dump(): write a heading to LOG, then move the link's print buffer into LOG */
150 #define dbg_link_dump() do { \
151         if (LINK_LOG_BUF_SIZE) { \
152                 tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
153                 printbuf_move(LOG, &l_ptr->print_buf); \
154         } \
155 } while (0)
156
157 static inline void dbg_print_link(struct link *l_ptr, const char *str)
158 {
159         if (DBG_OUTPUT)
160                 link_print(l_ptr, DBG_OUTPUT, str);
161 }
162
163 static inline void dbg_print_buf_chain(struct sk_buff *root_buf)
164 {
165         if (DBG_OUTPUT) {
166                 struct sk_buff *buf = root_buf;
167
168                 while (buf) {
169                         msg_dbg(buf_msg(buf), "In chain: ");
170                         buf = buf->next;
171                 }
172         }
173 }
174
175 /*
176  *  Simple inlined link routines
177  */
178
/*
 * align - round a value up to the next multiple of 4
 *
 * Values that are already a multiple of 4 are returned unchanged.  The
 * addition is done in unsigned arithmetic, so values within 3 of the
 * maximum wrap around to 0.
 */
static inline unsigned int align(unsigned int i)
{
	const unsigned int mask = 3u;

	return (i + mask) & ~mask;
}
183
184 static inline int link_working_working(struct link *l_ptr)
185 {
186         return (l_ptr->state == WORKING_WORKING);
187 }
188
189 static inline int link_working_unknown(struct link *l_ptr)
190 {
191         return (l_ptr->state == WORKING_UNKNOWN);
192 }
193
194 static inline int link_reset_unknown(struct link *l_ptr)
195 {
196         return (l_ptr->state == RESET_UNKNOWN);
197 }
198
199 static inline int link_reset_reset(struct link *l_ptr)
200 {
201         return (l_ptr->state == RESET_RESET);
202 }
203
204 static inline int link_blocked(struct link *l_ptr)
205 {
206         return (l_ptr->exp_msg_count || l_ptr->blocked);
207 }
208
209 static inline int link_congested(struct link *l_ptr)
210 {
211         return (l_ptr->out_queue_size >= l_ptr->queue_limit[0]);
212 }
213
214 static inline u32 link_max_pkt(struct link *l_ptr)
215 {
216         return l_ptr->max_pkt;
217 }
218
219 static inline void link_init_max_pkt(struct link *l_ptr)
220 {
221         u32 max_pkt;
222         
223         max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
224         if (max_pkt > MAX_MSG_SIZE)
225                 max_pkt = MAX_MSG_SIZE;
226
227         l_ptr->max_pkt_target = max_pkt;
228         if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
229                 l_ptr->max_pkt = l_ptr->max_pkt_target;
230         else 
231                 l_ptr->max_pkt = MAX_PKT_DEFAULT;
232
233         l_ptr->max_pkt_probes = 0;
234 }
235
236 static inline u32 link_next_sent(struct link *l_ptr)
237 {
238         if (l_ptr->next_out)
239                 return msg_seqno(buf_msg(l_ptr->next_out));
240         return mod(l_ptr->next_out_no);
241 }
242
243 static inline u32 link_last_sent(struct link *l_ptr)
244 {
245         return mod(link_next_sent(l_ptr) - 1);
246 }
247
248 /*
249  *  Simple non-inlined link routines (i.e. referenced outside this file)
250  */
251
/*
 * link_is_up - test whether a link is in one of the two WORKING states
 *
 * Returns 1 if the state is WORKING_WORKING or WORKING_UNKNOWN, and 0
 * otherwise (including when l_ptr is NULL).
 */
int link_is_up(struct link *l_ptr)
{
	if (l_ptr == NULL)
		return 0;
	if (link_working_working(l_ptr))
		return 1;

	return link_working_unknown(l_ptr);
}
258
259 int link_is_active(struct link *l_ptr)
260 {
261         return ((l_ptr->owner->active_links[0] == l_ptr) ||
262                 (l_ptr->owner->active_links[1] == l_ptr));
263 }
264
265 /**
266  * link_name_validate - validate & (optionally) deconstruct link name
267  * @name - ptr to link name string
268  * @name_parts - ptr to area for link name components (or NULL if not needed)
269  * 
270  * Returns 1 if link name is valid, otherwise 0.
271  */
272
273 static int link_name_validate(const char *name, struct link_name *name_parts)
274 {
275         char name_copy[TIPC_MAX_LINK_NAME];
276         char *addr_local;
277         char *if_local;
278         char *addr_peer;
279         char *if_peer;
280         char dummy;
281         u32 z_local, c_local, n_local;
282         u32 z_peer, c_peer, n_peer;
283         u32 if_local_len;
284         u32 if_peer_len;
285
286         /* copy link name & ensure length is OK */
287
288         name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
289         /* need above in case non-Posix strncpy() doesn't pad with nulls */
290         strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
291         if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
292                 return 0;
293
294         /* ensure all component parts of link name are present */
295
296         addr_local = name_copy;
297         if ((if_local = strchr(addr_local, ':')) == NULL)
298                 return 0;
299         *(if_local++) = 0;
300         if ((addr_peer = strchr(if_local, '-')) == NULL)
301                 return 0;
302         *(addr_peer++) = 0;
303         if_local_len = addr_peer - if_local;
304         if ((if_peer = strchr(addr_peer, ':')) == NULL)
305                 return 0;
306         *(if_peer++) = 0;
307         if_peer_len = strlen(if_peer) + 1;
308
309         /* validate component parts of link name */
310
311         if ((sscanf(addr_local, "%u.%u.%u%c",
312                     &z_local, &c_local, &n_local, &dummy) != 3) ||
313             (sscanf(addr_peer, "%u.%u.%u%c",
314                     &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
315             (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
316             (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
317             (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) || 
318             (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) || 
319             (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
320             (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
321                 return 0;
322
323         /* return link name components, if necessary */
324
325         if (name_parts) {
326                 name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
327                 strcpy(name_parts->if_local, if_local);
328                 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
329                 strcpy(name_parts->if_peer, if_peer);
330         }
331         return 1;
332 }
333
334 /**
335  * link_timeout - handle expiration of link timer
336  * @l_ptr: pointer to link
337  * 
338  * This routine must not grab "net_lock" to avoid a potential deadlock conflict
339  * with link_delete().  (There is no risk that the node will be deleted by
340  * another thread because link_delete() always cancels the link timer before
341  * node_delete() is called.)
342  */
343
344 static void link_timeout(struct link *l_ptr)
345 {
346         node_lock(l_ptr->owner);
347
348         /* update counters used in statistical profiling of send traffic */
349
350         l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
351         l_ptr->stats.queue_sz_counts++;
352
353         if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
354                 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
355
356         if (l_ptr->first_out) {
357                 struct tipc_msg *msg = buf_msg(l_ptr->first_out);
358                 u32 length = msg_size(msg);
359
360                 if ((msg_user(msg) == MSG_FRAGMENTER)
361                     && (msg_type(msg) == FIRST_FRAGMENT)) {
362                         length = msg_size(msg_get_wrapped(msg));
363                 }
364                 if (length) {
365                         l_ptr->stats.msg_lengths_total += length;
366                         l_ptr->stats.msg_length_counts++;
367                         if (length <= 64)
368                                 l_ptr->stats.msg_length_profile[0]++;
369                         else if (length <= 256)
370                                 l_ptr->stats.msg_length_profile[1]++;
371                         else if (length <= 1024)
372                                 l_ptr->stats.msg_length_profile[2]++;
373                         else if (length <= 4096)
374                                 l_ptr->stats.msg_length_profile[3]++;
375                         else if (length <= 16384)
376                                 l_ptr->stats.msg_length_profile[4]++;
377                         else if (length <= 32768)
378                                 l_ptr->stats.msg_length_profile[5]++;
379                         else
380                                 l_ptr->stats.msg_length_profile[6]++;
381                 }
382         }
383
384         /* do all other link processing performed on a periodic basis */
385
386         link_check_defragm_bufs(l_ptr);
387
388         link_state_event(l_ptr, TIMEOUT_EVT);
389
390         if (l_ptr->next_out)
391                 link_push_queue(l_ptr);
392
393         node_unlock(l_ptr->owner);
394 }
395
396 static inline void link_set_timer(struct link *l_ptr, u32 time)
397 {
398         k_start_timer(&l_ptr->timer, time);
399 }
400
401 /**
402  * link_create - create a new link
403  * @b_ptr: pointer to associated bearer
404  * @peer: network address of node at other end of link
405  * @media_addr: media address to use when sending messages over link
406  * 
407  * Returns pointer to link.
408  */
409
410 struct link *link_create(struct bearer *b_ptr, const u32 peer,
411                          const struct tipc_media_addr *media_addr)
412 {
413         struct link *l_ptr;
414         struct tipc_msg *msg;
415         char *if_name;
416
417         l_ptr = (struct link *)kmalloc(sizeof(*l_ptr), GFP_ATOMIC);
418         if (!l_ptr) {
419                 warn("Memory squeeze; Failed to create link\n");
420                 return NULL;
421         }
422         memset(l_ptr, 0, sizeof(*l_ptr));
423
424         l_ptr->addr = peer;
425         if_name = strchr(b_ptr->publ.name, ':') + 1;
426         sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
427                 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
428                 tipc_node(tipc_own_addr), 
429                 if_name,
430                 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
431                 /* note: peer i/f is appended to link name by reset/activate */
432         memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
433         k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
434         list_add_tail(&l_ptr->link_list, &b_ptr->links);
435         l_ptr->checkpoint = 1;
436         l_ptr->b_ptr = b_ptr;
437         link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
438         l_ptr->state = RESET_UNKNOWN;
439
440         l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
441         msg = l_ptr->pmsg;
442         msg_init(msg, LINK_PROTOCOL, RESET_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
443         msg_set_size(msg, sizeof(l_ptr->proto_msg));
444         msg_set_session(msg, tipc_random);
445         msg_set_bearer_id(msg, b_ptr->identity);
446         strcpy((char *)msg_data(msg), if_name);
447
448         l_ptr->priority = b_ptr->priority;
449         link_set_queue_limits(l_ptr, b_ptr->media->window);
450
451         link_init_max_pkt(l_ptr);
452
453         l_ptr->next_out_no = 1;
454         INIT_LIST_HEAD(&l_ptr->waiting_ports);
455
456         link_reset_statistics(l_ptr);
457
458         l_ptr->owner = node_attach_link(l_ptr);
459         if (!l_ptr->owner) {
460                 kfree(l_ptr);
461                 return NULL;
462         }
463
464         if (LINK_LOG_BUF_SIZE) {
465                 char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
466
467                 if (!pb) {
468                         kfree(l_ptr);
469                         warn("Memory squeeze; Failed to create link\n");
470                         return NULL;
471                 }
472                 printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
473         }
474
475         k_signal((Handler)link_start, (unsigned long)l_ptr);
476
477         dbg("link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
478             l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
479         
480         return l_ptr;
481 }
482
483 /** 
484  * link_delete - delete a link
485  * @l_ptr: pointer to link
486  * 
487  * Note: 'net_lock' is write_locked, bearer is locked.
488  * This routine must not grab the node lock until after link timer cancellation
489  * to avoid a potential deadlock situation.  
490  */
491
492 void link_delete(struct link *l_ptr)
493 {
494         if (!l_ptr) {
495                 err("Attempt to delete non-existent link\n");
496                 return;
497         }
498
499         dbg("link_delete()\n");
500
501         k_cancel_timer(&l_ptr->timer);
502         
503         node_lock(l_ptr->owner);
504         link_reset(l_ptr);
505         node_detach_link(l_ptr->owner, l_ptr);
506         link_stop(l_ptr);
507         list_del_init(&l_ptr->link_list);
508         if (LINK_LOG_BUF_SIZE)
509                 kfree(l_ptr->print_buf.buf);
510         node_unlock(l_ptr->owner);
511         k_term_timer(&l_ptr->timer);
512         kfree(l_ptr);
513 }
514
515 void link_start(struct link *l_ptr)
516 {
517         dbg("link_start %x\n", l_ptr);
518         link_state_event(l_ptr, STARTING_EVT);
519 }
520
521 /**
522  * link_schedule_port - schedule port for deferred sending 
523  * @l_ptr: pointer to link
524  * @origport: reference to sending port
525  * @sz: amount of data to be sent
526  * 
527  * Schedules port for renewed sending of messages after link congestion 
528  * has abated.
529  */
530
531 static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
532 {
533         struct port *p_ptr;
534
535         spin_lock_bh(&port_list_lock);
536         p_ptr = port_lock(origport);
537         if (p_ptr) {
538                 if (!p_ptr->wakeup)
539                         goto exit;
540                 if (!list_empty(&p_ptr->wait_list))
541                         goto exit;
542                 p_ptr->congested_link = l_ptr;
543                 p_ptr->publ.congested = 1;
544                 p_ptr->waiting_pkts = 1 + ((sz - 1) / link_max_pkt(l_ptr));
545                 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
546                 l_ptr->stats.link_congs++;
547 exit:
548                 port_unlock(p_ptr);
549         }
550         spin_unlock_bh(&port_list_lock);
551         return -ELINKCONG;
552 }
553
554 void link_wakeup_ports(struct link *l_ptr, int all)
555 {
556         struct port *p_ptr;
557         struct port *temp_p_ptr;
558         int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
559
560         if (all)
561                 win = 100000;
562         if (win <= 0)
563                 return;
564         if (!spin_trylock_bh(&port_list_lock))
565                 return;
566         if (link_congested(l_ptr))
567                 goto exit;
568         list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, 
569                                  wait_list) {
570                 if (win <= 0)
571                         break;
572                 list_del_init(&p_ptr->wait_list);
573                 p_ptr->congested_link = 0;
574                 assert(p_ptr->wakeup);
575                 spin_lock_bh(p_ptr->publ.lock);
576                 p_ptr->publ.congested = 0;
577                 p_ptr->wakeup(&p_ptr->publ);
578                 win -= p_ptr->waiting_pkts;
579                 spin_unlock_bh(p_ptr->publ.lock);
580         }
581
582 exit:
583         spin_unlock_bh(&port_list_lock);
584 }
585
586 /** 
587  * link_release_outqueue - purge link's outbound message queue
588  * @l_ptr: pointer to link
589  */
590
591 static void link_release_outqueue(struct link *l_ptr)
592 {
593         struct sk_buff *buf = l_ptr->first_out;
594         struct sk_buff *next;
595
596         while (buf) {
597                 next = buf->next;
598                 buf_discard(buf);
599                 buf = next;
600         }
601         l_ptr->first_out = NULL;
602         l_ptr->out_queue_size = 0;
603 }
604
605 /**
606  * link_reset_fragments - purge link's inbound message fragments queue
607  * @l_ptr: pointer to link
608  */
609
610 void link_reset_fragments(struct link *l_ptr)
611 {
612         struct sk_buff *buf = l_ptr->defragm_buf;
613         struct sk_buff *next;
614
615         while (buf) {
616                 next = buf->next;
617                 buf_discard(buf);
618                 buf = next;
619         }
620         l_ptr->defragm_buf = NULL;
621 }
622
623 /** 
624  * link_stop - purge all inbound and outbound messages associated with link
625  * @l_ptr: pointer to link
626  */
627
628 void link_stop(struct link *l_ptr)
629 {
630         struct sk_buff *buf;
631         struct sk_buff *next;
632
633         buf = l_ptr->oldest_deferred_in;
634         while (buf) {
635                 next = buf->next;
636                 buf_discard(buf);
637                 buf = next;
638         }
639
640         buf = l_ptr->first_out;
641         while (buf) {
642                 next = buf->next;
643                 buf_discard(buf);
644                 buf = next;
645         }
646
647         link_reset_fragments(l_ptr);
648
649         buf_discard(l_ptr->proto_msg_queue);
650         l_ptr->proto_msg_queue = NULL;
651 }
652
653 #if 0
654
655 /* LINK EVENT CODE IS NOT SUPPORTED AT PRESENT */
656
/* Deliver a link event: call its handler with (addr, name, up), then free it */
657 static void link_recv_event(struct link_event *ev)
658 {
659         ev->fcn(ev->addr, ev->name, ev->up);
660         kfree(ev);
661 }
662
/*
 * Build a link event for 'l_ptr' (its address, its name, the 'up' flag
 * and the handler 'fcn') and hand it to link_recv_event() via k_signal().
 * The event is dropped, with a warning, if it cannot be allocated.
 */
663 static void link_send_event(void (*fcn)(u32 a, char *n, int up),
664                             struct link *l_ptr, int up)
665 {
666         struct link_event *ev;
667         
668         ev = kmalloc(sizeof(*ev), GFP_ATOMIC);
669         if (!ev) {
670                 warn("Link event allocation failure\n");
671                 return;
672         }
673         ev->addr = l_ptr->addr;
674         ev->up = up;
675         ev->fcn = fcn;
676         memcpy(ev->name, l_ptr->name, TIPC_MAX_LINK_NAME);
677         k_signal((Handler)link_recv_event, (unsigned long)ev);
678 }
679
680 #else
681
/* Link events are disabled: link_send_event() expands to an empty statement */
682 #define link_send_event(fcn, l_ptr, up) do { } while (0)
683
684 #endif
685
686 void link_reset(struct link *l_ptr)
687 {
688         struct sk_buff *buf;
689         u32 prev_state = l_ptr->state;
690         u32 checkpoint = l_ptr->next_in_no;
691         
692         msg_set_session(l_ptr->pmsg, msg_session(l_ptr->pmsg) + 1);
693
694         /* Link is down, accept any session: */
695         l_ptr->peer_session = 0;
696
697         /* Prepare for max packet size negotiation */
698         link_init_max_pkt(l_ptr);
699         
700         l_ptr->state = RESET_UNKNOWN;
701         dbg_link_state("Resetting Link\n");
702
703         if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
704                 return;
705
706         node_link_down(l_ptr->owner, l_ptr);
707         bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
708 #if 0
709         tipc_printf(CONS, "\nReset link <%s>\n", l_ptr->name);
710         dbg_link_dump();
711 #endif
712         if (node_has_active_links(l_ptr->owner) &&
713             l_ptr->owner->permit_changeover) {
714                 l_ptr->reset_checkpoint = checkpoint;
715                 l_ptr->exp_msg_count = START_CHANGEOVER;
716         }
717
718         /* Clean up all queues: */
719
720         link_release_outqueue(l_ptr);
721         buf_discard(l_ptr->proto_msg_queue);
722         l_ptr->proto_msg_queue = NULL;
723         buf = l_ptr->oldest_deferred_in;
724         while (buf) {
725                 struct sk_buff *next = buf->next;
726                 buf_discard(buf);
727                 buf = next;
728         }
729         if (!list_empty(&l_ptr->waiting_ports))
730                 link_wakeup_ports(l_ptr, 1);
731
732         l_ptr->retransm_queue_head = 0;
733         l_ptr->retransm_queue_size = 0;
734         l_ptr->last_out = NULL;
735         l_ptr->first_out = NULL;
736         l_ptr->next_out = NULL;
737         l_ptr->unacked_window = 0;
738         l_ptr->checkpoint = 1;
739         l_ptr->next_out_no = 1;
740         l_ptr->deferred_inqueue_sz = 0;
741         l_ptr->oldest_deferred_in = NULL;
742         l_ptr->newest_deferred_in = NULL;
743         l_ptr->fsm_msg_cnt = 0;
744         l_ptr->stale_count = 0;
745         link_reset_statistics(l_ptr);
746
747         link_send_event(cfg_link_event, l_ptr, 0);
748         if (!in_own_cluster(l_ptr->addr))
749                 link_send_event(disc_link_event, l_ptr, 0);
750 }
751
752
753 static void link_activate(struct link *l_ptr)
754 {
755         l_ptr->next_in_no = 1;
756         node_link_up(l_ptr->owner, l_ptr);
757         bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
758         link_send_event(cfg_link_event, l_ptr, 1);
759         if (!in_own_cluster(l_ptr->addr))
760                 link_send_event(disc_link_event, l_ptr, 1);
761 }
762
763 /**
764  * link_state_event - link finite state machine
765  * @l_ptr: pointer to link
766  * @event: state machine event to process
767  */
768
769 static void link_state_event(struct link *l_ptr, unsigned event)
770 {
771         struct link *other; 
772         u32 cont_intv = l_ptr->continuity_interval;
773
774         if (!l_ptr->started && (event != STARTING_EVT))
775                 return;         /* Not yet. */
776
777         if (link_blocked(l_ptr)) {
778                 if (event == TIMEOUT_EVT) {
779                         link_set_timer(l_ptr, cont_intv);
780                 }
781                 return;   /* Changeover going on */
782         }
783         dbg_link("STATE_EV: <%s> ", l_ptr->name);
784
785         switch (l_ptr->state) {
786         case WORKING_WORKING:
787                 dbg_link("WW/");
788                 switch (event) {
789                 case TRAFFIC_MSG_EVT:
790                         dbg_link("TRF-");
791                         /* fall through */
792                 case ACTIVATE_MSG:
793                         dbg_link("ACT\n");
794                         break;
795                 case TIMEOUT_EVT:
796                         dbg_link("TIM ");
797                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
798                                 l_ptr->checkpoint = l_ptr->next_in_no;
799                                 if (bclink_acks_missing(l_ptr->owner)) {
800                                         link_send_proto_msg(l_ptr, STATE_MSG, 
801                                                             0, 0, 0, 0, 0);
802                                         l_ptr->fsm_msg_cnt++;
803                                 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
804                                         link_send_proto_msg(l_ptr, STATE_MSG, 
805                                                             1, 0, 0, 0, 0);
806                                         l_ptr->fsm_msg_cnt++;
807                                 }
808                                 link_set_timer(l_ptr, cont_intv);
809                                 break;
810                         }
811                         dbg_link(" -> WU\n");
812                         l_ptr->state = WORKING_UNKNOWN;
813                         l_ptr->fsm_msg_cnt = 0;
814                         link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
815                         l_ptr->fsm_msg_cnt++;
816                         link_set_timer(l_ptr, cont_intv / 4);
817                         break;
818                 case RESET_MSG:
819                         dbg_link("RES -> RR\n");
820                         link_reset(l_ptr);
821                         l_ptr->state = RESET_RESET;
822                         l_ptr->fsm_msg_cnt = 0;
823                         link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
824                         l_ptr->fsm_msg_cnt++;
825                         link_set_timer(l_ptr, cont_intv);
826                         break;
827                 default:
828                         err("Unknown link event %u in WW state\n", event);
829                 }
830                 break;
831         case WORKING_UNKNOWN:
832                 dbg_link("WU/");
833                 switch (event) {
834                 case TRAFFIC_MSG_EVT:
835                         dbg_link("TRF-");
836                 case ACTIVATE_MSG:
837                         dbg_link("ACT -> WW\n");
838                         l_ptr->state = WORKING_WORKING;
839                         l_ptr->fsm_msg_cnt = 0;
840                         link_set_timer(l_ptr, cont_intv);
841                         break;
842                 case RESET_MSG:
843                         dbg_link("RES -> RR\n");
844                         link_reset(l_ptr);
845                         l_ptr->state = RESET_RESET;
846                         l_ptr->fsm_msg_cnt = 0;
847                         link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
848                         l_ptr->fsm_msg_cnt++;
849                         link_set_timer(l_ptr, cont_intv);
850                         break;
851                 case TIMEOUT_EVT:
852                         dbg_link("TIM ");
853                         if (l_ptr->next_in_no != l_ptr->checkpoint) {
854                                 dbg_link("-> WW \n");
855                                 l_ptr->state = WORKING_WORKING;
856                                 l_ptr->fsm_msg_cnt = 0;
857                                 l_ptr->checkpoint = l_ptr->next_in_no;
858                                 if (bclink_acks_missing(l_ptr->owner)) {
859                                         link_send_proto_msg(l_ptr, STATE_MSG,
860                                                             0, 0, 0, 0, 0);
861                                         l_ptr->fsm_msg_cnt++;
862                                 }
863                                 link_set_timer(l_ptr, cont_intv);
864                         } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
865                                 dbg_link("Probing %u/%u,timer = %u ms)\n",
866                                          l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
867                                          cont_intv / 4);
868                                 link_send_proto_msg(l_ptr, STATE_MSG, 
869                                                     1, 0, 0, 0, 0);
870                                 l_ptr->fsm_msg_cnt++;
871                                 link_set_timer(l_ptr, cont_intv / 4);
872                         } else {        /* Link has failed */
873                                 dbg_link("-> RU (%u probes unanswered)\n",
874                                          l_ptr->fsm_msg_cnt);
875                                 link_reset(l_ptr);
876                                 l_ptr->state = RESET_UNKNOWN;
877                                 l_ptr->fsm_msg_cnt = 0;
878                                 link_send_proto_msg(l_ptr, RESET_MSG,
879                                                     0, 0, 0, 0, 0);
880                                 l_ptr->fsm_msg_cnt++;
881                                 link_set_timer(l_ptr, cont_intv);
882                         }
883                         break;
884                 default:
885                         err("Unknown link event %u in WU state\n", event);
886                 }
887                 break;
888         case RESET_UNKNOWN:
889                 dbg_link("RU/");
890                 switch (event) {
891                 case TRAFFIC_MSG_EVT:
892                         dbg_link("TRF-\n");
893                         break;
894                 case ACTIVATE_MSG:
895                         other = l_ptr->owner->active_links[0];
896                         if (other && link_working_unknown(other)) {
897                                 dbg_link("ACT\n");
898                                 break;
899                         }
900                         dbg_link("ACT -> WW\n");
901                         l_ptr->state = WORKING_WORKING;
902                         l_ptr->fsm_msg_cnt = 0;
903                         link_activate(l_ptr);
904                         link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
905                         l_ptr->fsm_msg_cnt++;
906                         link_set_timer(l_ptr, cont_intv);
907                         break;
908                 case RESET_MSG:
909                         dbg_link("RES \n");
910                         dbg_link(" -> RR\n");
911                         l_ptr->state = RESET_RESET;
912                         l_ptr->fsm_msg_cnt = 0;
913                         link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
914                         l_ptr->fsm_msg_cnt++;
915                         link_set_timer(l_ptr, cont_intv);
916                         break;
917                 case STARTING_EVT:
918                         dbg_link("START-");
919                         l_ptr->started = 1;
920                         /* fall through */
921                 case TIMEOUT_EVT:
922                         dbg_link("TIM \n");
923                         link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
924                         l_ptr->fsm_msg_cnt++;
925                         link_set_timer(l_ptr, cont_intv);
926                         break;
927                 default:
928                         err("Unknown link event %u in RU state\n", event);
929                 }
930                 break;
931         case RESET_RESET:
932                 dbg_link("RR/ ");
933                 switch (event) {
934                 case TRAFFIC_MSG_EVT:
935                         dbg_link("TRF-");
936                         /* fall through */
937                 case ACTIVATE_MSG:
938                         other = l_ptr->owner->active_links[0];
939                         if (other && link_working_unknown(other)) {
940                                 dbg_link("ACT\n");
941                                 break;
942                         }
943                         dbg_link("ACT -> WW\n");
944                         l_ptr->state = WORKING_WORKING;
945                         l_ptr->fsm_msg_cnt = 0;
946                         link_activate(l_ptr);
947                         link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
948                         l_ptr->fsm_msg_cnt++;
949                         link_set_timer(l_ptr, cont_intv);
950                         break;
951                 case RESET_MSG:
952                         dbg_link("RES\n");
953                         break;
954                 case TIMEOUT_EVT:
955                         dbg_link("TIM\n");
956                         link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
957                         l_ptr->fsm_msg_cnt++;
958                         link_set_timer(l_ptr, cont_intv);
959                         dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
960                         break;
961                 default:
962                         err("Unknown link event %u in RR state\n", event);
963                 }
964                 break;
965         default:
966                 err("Unknown link state %u/%u\n", l_ptr->state, event);
967         }
968 }
969
970 /*
971  * link_bundle_buf(): Append contents of a buffer to
972  * the tail of an existing one. 
973  */
974
975 static int link_bundle_buf(struct link *l_ptr,
976                            struct sk_buff *bundler, 
977                            struct sk_buff *buf)
978 {
979         struct tipc_msg *bundler_msg = buf_msg(bundler);
980         struct tipc_msg *msg = buf_msg(buf);
981         u32 size = msg_size(msg);
982         u32 to_pos = align(msg_size(bundler_msg));
983         u32 rest = link_max_pkt(l_ptr) - to_pos;
984
985         if (msg_user(bundler_msg) != MSG_BUNDLER)
986                 return 0;
987         if (msg_type(bundler_msg) != OPEN_MSG)
988                 return 0;
989         if (rest < align(size))
990                 return 0;
991
992         skb_put(bundler, (to_pos - msg_size(bundler_msg)) + size);
993         memcpy(bundler->data + to_pos, buf->data, size);
994         msg_set_size(bundler_msg, to_pos + size);
995         msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
996         dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
997             msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
998         msg_dbg(msg, "PACKD:");
999         buf_discard(buf);
1000         l_ptr->stats.sent_bundled++;
1001         return 1;
1002 }
1003
1004 static inline void link_add_to_outqueue(struct link *l_ptr, 
1005                                         struct sk_buff *buf, 
1006                                         struct tipc_msg *msg)
1007 {
1008         u32 ack = mod(l_ptr->next_in_no - 1);
1009         u32 seqno = mod(l_ptr->next_out_no++);
1010
1011         msg_set_word(msg, 2, ((ack << 16) | seqno));
1012         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1013         buf->next = NULL;
1014         if (l_ptr->first_out) {
1015                 l_ptr->last_out->next = buf;
1016                 l_ptr->last_out = buf;
1017         } else
1018                 l_ptr->first_out = l_ptr->last_out = buf;
1019         l_ptr->out_queue_size++;
1020 }
1021
1022 /* 
1023  * link_send_buf() is the 'full path' for messages, called from 
1024  * inside TIPC when the 'fast path' in tipc_send_buf
1025  * has failed, and from link_send()
1026  */
1027
1028 int link_send_buf(struct link *l_ptr, struct sk_buff *buf)
1029 {
1030         struct tipc_msg *msg = buf_msg(buf);
1031         u32 size = msg_size(msg);
1032         u32 dsz = msg_data_sz(msg);
1033         u32 queue_size = l_ptr->out_queue_size;
1034         u32 imp = msg_tot_importance(msg);
1035         u32 queue_limit = l_ptr->queue_limit[imp];
1036         u32 max_packet = link_max_pkt(l_ptr);
1037
1038         msg_set_prevnode(msg, tipc_own_addr);   /* If routed message */
1039
1040         /* Match msg importance against queue limits: */
1041
1042         if (unlikely(queue_size >= queue_limit)) {
1043                 if (imp <= TIPC_CRITICAL_IMPORTANCE) {
1044                         return link_schedule_port(l_ptr, msg_origport(msg),
1045                                                   size);
1046                 }
1047                 msg_dbg(msg, "TIPC: Congestion, throwing away\n");
1048                 buf_discard(buf);
1049                 if (imp > CONN_MANAGER) {
1050                         warn("Resetting <%s>, send queue full", l_ptr->name);
1051                         link_reset(l_ptr);
1052                 }
1053                 return dsz;
1054         }
1055
1056         /* Fragmentation needed ? */
1057
1058         if (size > max_packet)
1059                 return link_send_long_buf(l_ptr, buf);
1060
1061         /* Packet can be queued or sent: */
1062
1063         if (queue_size > l_ptr->stats.max_queue_sz)
1064                 l_ptr->stats.max_queue_sz = queue_size;
1065
1066         if (likely(!bearer_congested(l_ptr->b_ptr, l_ptr) && 
1067                    !link_congested(l_ptr))) {
1068                 link_add_to_outqueue(l_ptr, buf, msg);
1069
1070                 if (likely(bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
1071                         l_ptr->unacked_window = 0;
1072                 } else {
1073                         bearer_schedule(l_ptr->b_ptr, l_ptr);
1074                         l_ptr->stats.bearer_congs++;
1075                         l_ptr->next_out = buf;
1076                 }
1077                 return dsz;
1078         }
1079         /* Congestion: can message be bundled ?: */
1080
1081         if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
1082             (msg_user(msg) != MSG_FRAGMENTER)) {
1083
1084                 /* Try adding message to an existing bundle */
1085
1086                 if (l_ptr->next_out && 
1087                     link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
1088                         bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1089                         return dsz;
1090                 }
1091
1092                 /* Try creating a new bundle */
1093
1094                 if (size <= max_packet * 2 / 3) {
1095                         struct sk_buff *bundler = buf_acquire(max_packet);
1096                         struct tipc_msg bundler_hdr;
1097
1098                         if (bundler) {
1099                                 msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
1100                                          TIPC_OK, INT_H_SIZE, l_ptr->addr);
1101                                 memcpy(bundler->data, (unchar *)&bundler_hdr, 
1102                                        INT_H_SIZE);
1103                                 skb_trim(bundler, INT_H_SIZE);
1104                                 link_bundle_buf(l_ptr, bundler, buf);
1105                                 buf = bundler;
1106                                 msg = buf_msg(buf);
1107                                 l_ptr->stats.sent_bundles++;
1108                         }
1109                 }
1110         }
1111         if (!l_ptr->next_out)
1112                 l_ptr->next_out = buf;
1113         link_add_to_outqueue(l_ptr, buf, msg);
1114         bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
1115         return dsz;
1116 }
1117
1118 /* 
1119  * link_send(): same as link_send_buf(), but the link to use has 
1120  * not been selected yet, and the the owner node is not locked
1121  * Called by TIPC internal users, e.g. the name distributor
1122  */
1123
1124 int link_send(struct sk_buff *buf, u32 dest, u32 selector)
1125 {
1126         struct link *l_ptr;
1127         struct node *n_ptr;
1128         int res = -ELINKCONG;
1129
1130         read_lock_bh(&net_lock);
1131         n_ptr = node_select(dest, selector);
1132         if (n_ptr) {
1133                 node_lock(n_ptr);
1134                 l_ptr = n_ptr->active_links[selector & 1];
1135                 dbg("link_send: found link %x for dest %x\n", l_ptr, dest);
1136                 if (l_ptr) {
1137                         res = link_send_buf(l_ptr, buf);
1138                 }
1139                 node_unlock(n_ptr);
1140         } else {
1141                 dbg("Attempt to send msg to unknown node:\n");
1142                 msg_dbg(buf_msg(buf),">>>");
1143                 buf_discard(buf);
1144         }
1145         read_unlock_bh(&net_lock);
1146         return res;
1147 }
1148
1149 /* 
1150  * link_send_buf_fast: Entry for data messages where the 
1151  * destination link is known and the header is complete,
1152  * inclusive total message length. Very time critical.
1153  * Link is locked. Returns user data length.
1154  */
1155
1156 static inline int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
1157                                      u32 *used_max_pkt)
1158 {
1159         struct tipc_msg *msg = buf_msg(buf);
1160         int res = msg_data_sz(msg);
1161
1162         if (likely(!link_congested(l_ptr))) {
1163                 if (likely(msg_size(msg) <= link_max_pkt(l_ptr))) {
1164                         if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
1165                                 link_add_to_outqueue(l_ptr, buf, msg);
1166                                 if (likely(bearer_send(l_ptr->b_ptr, buf,
1167                                                        &l_ptr->media_addr))) {
1168                                         l_ptr->unacked_window = 0;
1169                                         msg_dbg(msg,"SENT_FAST:");
1170                                         return res;
1171                                 }
1172                                 dbg("failed sent fast...\n");
1173                                 bearer_schedule(l_ptr->b_ptr, l_ptr);
1174                                 l_ptr->stats.bearer_congs++;
1175                                 l_ptr->next_out = buf;
1176                                 return res;
1177                         }
1178                 }
1179                 else
1180                         *used_max_pkt = link_max_pkt(l_ptr);
1181         }
1182         return link_send_buf(l_ptr, buf);  /* All other cases */
1183 }
1184
1185 /* 
1186  * tipc_send_buf_fast: Entry for data messages where the 
1187  * destination node is known and the header is complete,
1188  * inclusive total message length.
1189  * Returns user data length.
1190  */
1191 int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1192 {
1193         struct link *l_ptr;
1194         struct node *n_ptr;
1195         int res;
1196         u32 selector = msg_origport(buf_msg(buf)) & 1;
1197         u32 dummy;
1198
1199         if (destnode == tipc_own_addr)
1200                 return port_recv_msg(buf);
1201
1202         read_lock_bh(&net_lock);
1203         n_ptr = node_select(destnode, selector);
1204         if (likely(n_ptr)) {
1205                 node_lock(n_ptr);
1206                 l_ptr = n_ptr->active_links[selector];
1207                 dbg("send_fast: buf %x selected %x, destnode = %x\n",
1208                     buf, l_ptr, destnode);
1209                 if (likely(l_ptr)) {
1210                         res = link_send_buf_fast(l_ptr, buf, &dummy);
1211                         node_unlock(n_ptr);
1212                         read_unlock_bh(&net_lock);
1213                         return res;
1214                 }
1215                 node_unlock(n_ptr);
1216         }
1217         read_unlock_bh(&net_lock);
1218         res = msg_data_sz(buf_msg(buf));
1219         tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1220         return res;
1221 }
1222
1223
1224 /* 
1225  * link_send_sections_fast: Entry for messages where the 
1226  * destination processor is known and the header is complete,
1227  * except for total message length. 
1228  * Returns user data length or errno.
1229  */
1230 int link_send_sections_fast(struct port *sender, 
1231                             struct iovec const *msg_sect,
1232                             const u32 num_sect, 
1233                             u32 destaddr)
1234 {
1235         struct tipc_msg *hdr = &sender->publ.phdr;
1236         struct link *l_ptr;
1237         struct sk_buff *buf;
1238         struct node *node;
1239         int res;
1240         u32 selector = msg_origport(hdr) & 1;
1241
1242         assert(destaddr != tipc_own_addr);
1243
1244 again:
1245         /*
1246          * Try building message using port's max_pkt hint.
1247          * (Must not hold any locks while building message.)
1248          */
1249
1250         res = msg_build(hdr, msg_sect, num_sect, sender->max_pkt,
1251                         !sender->user_port, &buf);
1252
1253         read_lock_bh(&net_lock);
1254         node = node_select(destaddr, selector);
1255         if (likely(node)) {
1256                 node_lock(node);
1257                 l_ptr = node->active_links[selector];
1258                 if (likely(l_ptr)) {
1259                         if (likely(buf)) {
1260                                 res = link_send_buf_fast(l_ptr, buf,
1261                                                          &sender->max_pkt);
1262                                 if (unlikely(res < 0))
1263                                         buf_discard(buf);
1264 exit:
1265                                 node_unlock(node);
1266                                 read_unlock_bh(&net_lock);
1267                                 return res;
1268                         }
1269
1270                         /* Exit if build request was invalid */
1271
1272                         if (unlikely(res < 0))
1273                                 goto exit;
1274
1275                         /* Exit if link (or bearer) is congested */
1276
1277                         if (link_congested(l_ptr) || 
1278                             !list_empty(&l_ptr->b_ptr->cong_links)) {
1279                                 res = link_schedule_port(l_ptr,
1280                                                          sender->publ.ref, res);
1281                                 goto exit;
1282                         }
1283
1284                         /* 
1285                          * Message size exceeds max_pkt hint; update hint,
1286                          * then re-try fast path or fragment the message
1287                          */
1288
1289                         sender->max_pkt = link_max_pkt(l_ptr);
1290                         node_unlock(node);
1291                         read_unlock_bh(&net_lock);
1292
1293
1294                         if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
1295                                 goto again;
1296
1297                         return link_send_sections_long(sender, msg_sect,
1298                                                        num_sect, destaddr);
1299                 }
1300                 node_unlock(node);
1301         }
1302         read_unlock_bh(&net_lock);
1303
1304         /* Couldn't find a link to the destination node */
1305
1306         if (buf)
1307                 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1308         if (res >= 0)
1309                 return port_reject_sections(sender, hdr, msg_sect, num_sect,
1310                                             TIPC_ERR_NO_NODE);
1311         return res;
1312 }
1313
1314 /* 
1315  * link_send_sections_long(): Entry for long messages where the 
1316  * destination node is known and the header is complete,
1317  * inclusive total message length. 
1318  * Link and bearer congestion status have been checked to be ok,
1319  * and are ignored if they change.
1320  *
1321  * Note that fragments do not use the full link MTU so that they won't have
1322  * to undergo refragmentation if link changeover causes them to be sent
1323  * over another link with an additional tunnel header added as prefix.
1324  * (Refragmentation will still occur if the other link has a smaller MTU.)
1325  *
1326  * Returns user data length or errno.
1327  */
1328 static int link_send_sections_long(struct port *sender,
1329                                    struct iovec const *msg_sect,
1330                                    u32 num_sect,
1331                                    u32 destaddr)
1332 {
1333         struct link *l_ptr;
1334         struct node *node;
1335         struct tipc_msg *hdr = &sender->publ.phdr;
1336         u32 dsz = msg_data_sz(hdr);
1337         u32 max_pkt,fragm_sz,rest;
1338         struct tipc_msg fragm_hdr;
1339         struct sk_buff *buf,*buf_chain,*prev;
1340         u32 fragm_crs,fragm_rest,hsz,sect_rest;
1341         const unchar *sect_crs;
1342         int curr_sect;
1343         u32 fragm_no;
1344
1345 again:
1346         fragm_no = 1;
1347         max_pkt = sender->max_pkt - INT_H_SIZE;  
1348                 /* leave room for tunnel header in case of link changeover */
1349         fragm_sz = max_pkt - INT_H_SIZE; 
1350                 /* leave room for fragmentation header in each fragment */
1351         rest = dsz;
1352         fragm_crs = 0;
1353         fragm_rest = 0;
1354         sect_rest = 0;
1355         sect_crs = 0;
1356         curr_sect = -1;
1357
1358         /* Prepare reusable fragment header: */
1359
1360         msg_dbg(hdr, ">FRAGMENTING>");
1361         msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
1362                  TIPC_OK, INT_H_SIZE, msg_destnode(hdr));
1363         msg_set_link_selector(&fragm_hdr, sender->publ.ref);
1364         msg_set_size(&fragm_hdr, max_pkt);
1365         msg_set_fragm_no(&fragm_hdr, 1);
1366
1367         /* Prepare header of first fragment: */
1368
1369         buf_chain = buf = buf_acquire(max_pkt);
1370         if (!buf)
1371                 return -ENOMEM;
1372         buf->next = NULL;
1373         memcpy(buf->data, (unchar *)&fragm_hdr, INT_H_SIZE);
1374         hsz = msg_hdr_sz(hdr);
1375         memcpy(buf->data + INT_H_SIZE, (unchar *)hdr, hsz);
1376         msg_dbg(buf_msg(buf), ">BUILD>");
1377
1378         /* Chop up message: */
1379
1380         fragm_crs = INT_H_SIZE + hsz;
1381         fragm_rest = fragm_sz - hsz;
1382
1383         do {            /* For all sections */
1384                 u32 sz;
1385
1386                 if (!sect_rest) {
1387                         sect_rest = msg_sect[++curr_sect].iov_len;
1388                         sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
1389                 }
1390
1391                 if (sect_rest < fragm_rest)
1392                         sz = sect_rest;
1393                 else
1394                         sz = fragm_rest;
1395
1396                 if (likely(!sender->user_port)) {
1397                         if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
1398 error:
1399                                 for (; buf_chain; buf_chain = buf) {
1400                                         buf = buf_chain->next;
1401                                         buf_discard(buf_chain);
1402                                 }
1403                                 return -EFAULT;
1404                         }
1405                 } else
1406                         memcpy(buf->data + fragm_crs, sect_crs, sz);
1407
1408                 sect_crs += sz;
1409                 sect_rest -= sz;
1410                 fragm_crs += sz;
1411                 fragm_rest -= sz;
1412                 rest -= sz;
1413
1414                 if (!fragm_rest && rest) {
1415
1416                         /* Initiate new fragment: */
1417                         if (rest <= fragm_sz) {
1418                                 fragm_sz = rest;
1419                                 msg_set_type(&fragm_hdr,LAST_FRAGMENT);
1420                         } else {
1421                                 msg_set_type(&fragm_hdr, FRAGMENT);
1422                         }
1423                         msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
1424                         msg_set_fragm_no(&fragm_hdr, ++fragm_no);
1425                         prev = buf;
1426                         buf = buf_acquire(fragm_sz + INT_H_SIZE);
1427                         if (!buf)
1428                                 goto error;
1429
1430                         buf->next = NULL;                                
1431                         prev->next = buf;
1432                         memcpy(buf->data, (unchar *)&fragm_hdr, INT_H_SIZE);
1433                         fragm_crs = INT_H_SIZE;
1434                         fragm_rest = fragm_sz;
1435                         msg_dbg(buf_msg(buf),"  >BUILD>");
1436                 }
1437         }
1438         while (rest > 0);
1439
1440         /* 
1441          * Now we have a buffer chain. Select a link and check
1442          * that packet size is still OK
1443          */
1444         node = node_select(destaddr, sender->publ.ref & 1);
1445         if (likely(node)) {
1446                 node_lock(node);
1447                 l_ptr = node->active_links[sender->publ.ref & 1];
1448                 if (!l_ptr) {
1449                         node_unlock(node);
1450                         goto reject;
1451                 }
1452                 if (link_max_pkt(l_ptr) < max_pkt) {
1453                         sender->max_pkt = link_max_pkt(l_ptr);
1454                         node_unlock(node);
1455                         for (; buf_chain; buf_chain = buf) {
1456                                 buf = buf_chain->next;
1457                                 buf_discard(buf_chain);
1458                         }
1459                         goto again;
1460                 }
1461         } else {
1462 reject:
1463                 for (; buf_chain; buf_chain = buf) {
1464                         buf = buf_chain->next;
1465                         buf_discard(buf_chain);
1466                 }
1467                 return port_reject_sections(sender, hdr, msg_sect, num_sect,
1468                                             TIPC_ERR_NO_NODE);
1469         }
1470
1471         /* Append whole chain to send queue: */
1472
1473         buf = buf_chain;
1474         l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
1475         if (!l_ptr->next_out)
1476                 l_ptr->next_out = buf_chain;
1477         l_ptr->stats.sent_fragmented++;
1478         while (buf) {
1479                 struct sk_buff *next = buf->next;
1480                 struct tipc_msg *msg = buf_msg(buf);
1481
1482                 l_ptr->stats.sent_fragments++;
1483                 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
1484                 link_add_to_outqueue(l_ptr, buf, msg);
1485                 msg_dbg(msg, ">ADD>");
1486                 buf = next;
1487         }
1488
1489         /* Send it, if possible: */
1490
1491         link_push_queue(l_ptr);
1492         node_unlock(node);
1493         return dsz;
1494 }
1495
1496 /* 
1497  * link_push_packet: Push one unsent packet to the media
1498  */
1499 u32 link_push_packet(struct link *l_ptr)
1500 {
1501         struct sk_buff *buf = l_ptr->first_out;
1502         u32 r_q_size = l_ptr->retransm_queue_size;
1503         u32 r_q_head = l_ptr->retransm_queue_head;
1504
1505         /* Step to position where retransmission failed, if any,    */
1506         /* consider that buffers may have been released in meantime */
1507
1508         if (r_q_size && buf) {
1509                 u32 last = lesser(mod(r_q_head + r_q_size), 
1510                                   link_last_sent(l_ptr));
1511                 u32 first = msg_seqno(buf_msg(buf));
1512
1513                 while (buf && less(first, r_q_head)) {
1514                         first = mod(first + 1);
1515                         buf = buf->next;
1516                 }
1517                 l_ptr->retransm_queue_head = r_q_head = first;
1518                 l_ptr->retransm_queue_size = r_q_size = mod(last - first);
1519         }
1520
1521         /* Continue retransmission now, if there is anything: */
1522
1523         if (r_q_size && buf && !skb_cloned(buf)) {
1524                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1525                 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); 
1526                 if (bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1527                         msg_dbg(buf_msg(buf), ">DEF-RETR>");
1528                         l_ptr->retransm_queue_head = mod(++r_q_head);
1529                         l_ptr->retransm_queue_size = --r_q_size;
1530                         l_ptr->stats.retransmitted++;
1531                         return TIPC_OK;
1532                 } else {
1533                         l_ptr->stats.bearer_congs++;
1534                         msg_dbg(buf_msg(buf), "|>DEF-RETR>");
1535                         return PUSH_FAILED;
1536                 }
1537         }
1538
1539         /* Send deferred protocol message, if any: */
1540
1541         buf = l_ptr->proto_msg_queue;
1542         if (buf) {
1543                 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1544                 msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in); 
1545                 if (bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1546                         msg_dbg(buf_msg(buf), ">DEF-PROT>");
1547                         l_ptr->unacked_window = 0;
1548                         buf_discard(buf);
1549                         l_ptr->proto_msg_queue = 0;
1550                         return TIPC_OK;
1551                 } else {
1552                         msg_dbg(buf_msg(buf), "|>DEF-PROT>");
1553                         l_ptr->stats.bearer_congs++;
1554                         return PUSH_FAILED;
1555                 }
1556         }
1557
1558         /* Send one deferred data message, if send window not full: */
1559
1560         buf = l_ptr->next_out;
1561         if (buf) {
1562                 struct tipc_msg *msg = buf_msg(buf);
1563                 u32 next = msg_seqno(msg);
1564                 u32 first = msg_seqno(buf_msg(l_ptr->first_out));
1565
1566                 if (mod(next - first) < l_ptr->queue_limit[0]) {
1567                         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1568                         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 
1569                         if (bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1570                                 if (msg_user(msg) == MSG_BUNDLER)
1571                                         msg_set_type(msg, CLOSED_MSG);
1572                                 msg_dbg(msg, ">PUSH-DATA>");
1573                                 l_ptr->next_out = buf->next;
1574                                 return TIPC_OK;
1575                         } else {
1576                                 msg_dbg(msg, "|PUSH-DATA|");
1577                                 l_ptr->stats.bearer_congs++;
1578                                 return PUSH_FAILED;
1579                         }
1580                 }
1581         }
1582         return PUSH_FINISHED;
1583 }
1584
1585 /*
1586  * push_queue(): push out the unsent messages of a link where
1587  *               congestion has abated. Node is locked
1588  */
1589 void link_push_queue(struct link *l_ptr)
1590 {
1591         u32 res;
1592
1593         if (bearer_congested(l_ptr->b_ptr, l_ptr))
1594                 return;
1595
1596         do {
1597                 res = link_push_packet(l_ptr);
1598         }
1599         while (res == TIPC_OK);
1600         if (res == PUSH_FAILED)
1601                 bearer_schedule(l_ptr->b_ptr, l_ptr);
1602 }
1603
1604 void link_retransmit(struct link *l_ptr, struct sk_buff *buf, 
1605                      u32 retransmits)
1606 {
1607         struct tipc_msg *msg;
1608
1609         dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);
1610
1611         if (bearer_congested(l_ptr->b_ptr, l_ptr) && buf && !skb_cloned(buf)) {
1612                 msg_dbg(buf_msg(buf), ">NO_RETR->BCONG>");
1613                 dbg_print_link(l_ptr, "   ");
1614                 l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1615                 l_ptr->retransm_queue_size = retransmits;
1616                 return;
1617         }
1618         while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) {
1619                 msg = buf_msg(buf);
1620                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1621                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 
1622                 if (bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1623                         /* Catch if retransmissions fail repeatedly: */
1624                         if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1625                                 if (++l_ptr->stale_count > 100) {
1626                                         msg_print(CONS, buf_msg(buf), ">RETR>");
1627                                         info("...Retransmitted %u times\n",
1628                                              l_ptr->stale_count);
1629                                         link_print(l_ptr, CONS, "Resetting Link\n");;
1630                                         link_reset(l_ptr);
1631                                         break;
1632                                 }
1633                         } else {
1634                                 l_ptr->stale_count = 0;
1635                         }
1636                         l_ptr->last_retransmitted = msg_seqno(msg);
1637
1638                         msg_dbg(buf_msg(buf), ">RETR>");
1639                         buf = buf->next;
1640                         retransmits--;
1641                         l_ptr->stats.retransmitted++;
1642                 } else {
1643                         bearer_schedule(l_ptr->b_ptr, l_ptr);
1644                         l_ptr->stats.bearer_congs++;
1645                         l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
1646                         l_ptr->retransm_queue_size = retransmits;
1647                         return;
1648                 }
1649         }
1650         l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
1651 }
1652
1653 /* 
1654  * link_recv_non_seq: Receive packets which are outside
1655  *                    the link sequence flow
1656  */
1657
1658 static void link_recv_non_seq(struct sk_buff *buf)
1659 {
1660         struct tipc_msg *msg = buf_msg(buf);
1661
1662         if (msg_user(msg) ==  LINK_CONFIG)
1663                 disc_recv_msg(buf);
1664         else
1665                 bclink_recv_pkt(buf);
1666 }
1667
1668 /** 
1669  * link_insert_deferred_queue - insert deferred messages back into receive chain
1670  */
1671
1672 static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr, 
1673                                                   struct sk_buff *buf)
1674 {
1675         u32 seq_no;
1676
1677         if (l_ptr->oldest_deferred_in == NULL)
1678                 return buf;
1679
1680         seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1681         if (seq_no == mod(l_ptr->next_in_no)) {
1682                 l_ptr->newest_deferred_in->next = buf;
1683                 buf = l_ptr->oldest_deferred_in;
1684                 l_ptr->oldest_deferred_in = NULL;
1685                 l_ptr->deferred_inqueue_sz = 0;
1686         }
1687         return buf;
1688 }
1689
1690 void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1691 {
1692         read_lock_bh(&net_lock);
1693         while (head) {
1694                 struct bearer *b_ptr;
1695                 struct node *n_ptr;
1696                 struct link *l_ptr;
1697                 struct sk_buff *crs;
1698                 struct sk_buff *buf = head;
1699                 struct tipc_msg *msg = buf_msg(buf);
1700                 u32 seq_no = msg_seqno(msg);
1701                 u32 ackd = msg_ack(msg);
1702                 u32 released = 0;
1703                 int type;
1704
1705                 b_ptr = (struct bearer *)tb_ptr;
1706                 TIPC_SKB_CB(buf)->handle = b_ptr;
1707
1708                 head = head->next;
1709                 if (unlikely(msg_version(msg) != TIPC_VERSION))
1710                         goto cont;
1711 #if 0
1712                 if (msg_user(msg) != LINK_PROTOCOL)
1713 #endif
1714                         msg_dbg(msg,"<REC<");
1715
1716                 if (unlikely(msg_non_seq(msg))) {
1717                         link_recv_non_seq(buf);
1718                         continue;
1719                 }
1720                 n_ptr = node_find(msg_prevnode(msg));
1721                 if (unlikely(!n_ptr))
1722                         goto cont;
1723
1724                 node_lock(n_ptr);
1725                 l_ptr = n_ptr->links[b_ptr->identity];
1726                 if (unlikely(!l_ptr)) {
1727                         node_unlock(n_ptr);
1728                         goto cont;
1729                 }
1730                 /* 
1731                  * Release acked messages 
1732                  */
1733                 if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1734                         if (node_is_up(n_ptr) && n_ptr->bclink.supported)
1735                                 bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1736                 }
1737
1738                 crs = l_ptr->first_out;
1739                 while ((crs != l_ptr->next_out) && 
1740                        less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1741                         struct sk_buff *next = crs->next;
1742
1743                         buf_discard(crs);
1744                         crs = next;
1745                         released++;
1746                 }
1747                 if (released) {
1748                         l_ptr->first_out = crs;
1749                         l_ptr->out_queue_size -= released;
1750                 }
1751                 if (unlikely(l_ptr->next_out))
1752                         link_push_queue(l_ptr);
1753                 if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1754                         link_wakeup_ports(l_ptr, 0);
1755                 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1756                         l_ptr->stats.sent_acks++;
1757                         link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1758                 }
1759
1760 protocol_check:
1761                 if (likely(link_working_working(l_ptr))) {
1762                         if (likely(seq_no == mod(l_ptr->next_in_no))) {
1763                                 l_ptr->next_in_no++;
1764                                 if (unlikely(l_ptr->oldest_deferred_in))
1765                                         head = link_insert_deferred_queue(l_ptr,
1766                                                                           head);
1767                                 if (likely(msg_is_dest(msg, tipc_own_addr))) {
1768 deliver:
1769                                         if (likely(msg_isdata(msg))) {
1770                                                 node_unlock(n_ptr);
1771                                                 port_recv_msg(buf);
1772                                                 continue;
1773                                         }
1774                                         switch (msg_user(msg)) {
1775                                         case MSG_BUNDLER:
1776                                                 l_ptr->stats.recv_bundles++;
1777                                                 l_ptr->stats.recv_bundled += 
1778                                                         msg_msgcnt(msg);
1779                                                 node_unlock(n_ptr);
1780                                                 link_recv_bundle(buf);
1781                                                 continue;
1782                                         case ROUTE_DISTRIBUTOR:
1783                                                 node_unlock(n_ptr);
1784                                                 cluster_recv_routing_table(buf);
1785                                                 continue;
1786                                         case NAME_DISTRIBUTOR:
1787                                                 node_unlock(n_ptr);
1788                                                 named_recv(buf);
1789                                                 continue;
1790                                         case CONN_MANAGER:
1791                                                 node_unlock(n_ptr);
1792                                                 port_recv_proto_msg(buf);
1793                                                 continue;
1794                                         case MSG_FRAGMENTER:
1795                                                 l_ptr->stats.recv_fragments++;
1796                                                 if (link_recv_fragment(
1797                                                         &l_ptr->defragm_buf, 
1798                                                         &buf, &msg)) {
1799                                                         l_ptr->stats.recv_fragmented++;
1800                                                         goto deliver;
1801                                                 }
1802                                                 break;
1803                                         case CHANGEOVER_PROTOCOL:
1804                                                 type = msg_type(msg);
1805                                                 if (link_recv_changeover_msg(
1806                                                         &l_ptr, &buf)) {
1807                                                         msg = buf_msg(buf);
1808                                                         seq_no = msg_seqno(msg);
1809                                                         TIPC_SKB_CB(buf)->handle 
1810                                                                 = b_ptr;
1811                                                         if (type == ORIGINAL_MSG)
1812                                                                 goto deliver;
1813                                                         goto protocol_check;
1814                                                 }
1815                                                 break;
1816                                         }
1817                                 }
1818                                 node_unlock(n_ptr);
1819                                 net_route_msg(buf);
1820                                 continue;
1821                         }
1822                         link_handle_out_of_seq_msg(l_ptr, buf);
1823                         head = link_insert_deferred_queue(l_ptr, head);
1824                         node_unlock(n_ptr);
1825                         continue;
1826                 }
1827
1828                 if (msg_user(msg) == LINK_PROTOCOL) {
1829                         link_recv_proto_msg(l_ptr, buf);
1830                         head = link_insert_deferred_queue(l_ptr, head);
1831                         node_unlock(n_ptr);
1832                         continue;
1833                 }
1834                 msg_dbg(msg,"NSEQ<REC<");
1835                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1836
1837                 if (link_working_working(l_ptr)) {
1838                         /* Re-insert in front of queue */
1839                         msg_dbg(msg,"RECV-REINS:");
1840                         buf->next = head;
1841                         head = buf;
1842                         node_unlock(n_ptr);
1843                         continue;
1844                 }
1845                 node_unlock(n_ptr);
1846 cont:
1847                 buf_discard(buf);
1848         }
1849         read_unlock_bh(&net_lock);
1850 }
1851
1852 /* 
1853  * link_defer_buf(): Sort a received out-of-sequence packet 
1854  *                   into the deferred reception queue.
1855  * Returns the increase of the queue length,i.e. 0 or 1
1856  */
1857
1858 u32 link_defer_pkt(struct sk_buff **head,
1859                    struct sk_buff **tail,
1860                    struct sk_buff *buf)
1861 {
1862         struct sk_buff *prev = 0;
1863         struct sk_buff *crs = *head;
1864         u32 seq_no = msg_seqno(buf_msg(buf));
1865
1866         buf->next = NULL;
1867
1868         /* Empty queue ? */
1869         if (*head == NULL) {
1870                 *head = *tail = buf;
1871                 return 1;
1872         }
1873
1874         /* Last ? */
1875         if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
1876                 (*tail)->next = buf;
1877                 *tail = buf;
1878                 return 1;
1879         }
1880
1881         /* Scan through queue and sort it in */
1882         do {
1883                 struct tipc_msg *msg = buf_msg(crs);
1884
1885                 if (less(seq_no, msg_seqno(msg))) {
1886                         buf->next = crs;
1887                         if (prev)
1888                                 prev->next = buf;
1889                         else
1890                                 *head = buf;   
1891                         return 1;
1892                 }
1893                 if (seq_no == msg_seqno(msg)) {
1894                         break;
1895                 }
1896                 prev = crs;
1897                 crs = crs->next;
1898         }
1899         while (crs);
1900
1901         /* Message is a duplicate of an existing message */
1902
1903         buf_discard(buf);
1904         return 0;
1905 }
1906
1907 /** 
1908  * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1909  */
1910
1911 static void link_handle_out_of_seq_msg(struct link *l_ptr, 
1912                                        struct sk_buff *buf)
1913 {
1914         u32 seq_no = msg_seqno(buf_msg(buf));
1915
1916         if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1917                 link_recv_proto_msg(l_ptr, buf);
1918                 return;
1919         }
1920
1921         dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n", 
1922             seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
1923
1924         /* Record OOS packet arrival (force mismatch on next timeout) */
1925
1926         l_ptr->checkpoint--;
1927
1928         /* 
1929          * Discard packet if a duplicate; otherwise add it to deferred queue
1930          * and notify peer of gap as per protocol specification
1931          */
1932
1933         if (less(seq_no, mod(l_ptr->next_in_no))) {
1934                 l_ptr->stats.duplicates++;
1935                 buf_discard(buf);
1936                 return;
1937         }
1938
1939         if (link_defer_pkt(&l_ptr->oldest_deferred_in,
1940                            &l_ptr->newest_deferred_in, buf)) {
1941                 l_ptr->deferred_inqueue_sz++;
1942                 l_ptr->stats.deferred_recv++;
1943                 if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1944                         link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1945         } else
1946                 l_ptr->stats.duplicates++;
1947 }
1948
1949 /*
1950  * Send protocol message to the other endpoint.
1951  */
1952 void link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
1953                          u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1954 {
1955         struct sk_buff *buf = 0;
1956         struct tipc_msg *msg = l_ptr->pmsg;
1957         u32 msg_size = sizeof(l_ptr->proto_msg);
1958
1959         if (link_blocked(l_ptr))
1960                 return;
1961         msg_set_type(msg, msg_typ);
1962         msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
1963         msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); 
1964         msg_set_last_bcast(msg, bclink_get_last_sent());
1965
1966         if (msg_typ == STATE_MSG) {
1967                 u32 next_sent = mod(l_ptr->next_out_no);
1968
1969                 if (!link_is_up(l_ptr))
1970                         return;
1971                 if (l_ptr->next_out)
1972                         next_sent = msg_seqno(buf_msg(l_ptr->next_out));
1973                 msg_set_next_sent(msg, next_sent);
1974                 if (l_ptr->oldest_deferred_in) {
1975                         u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1976                         gap = mod(rec - mod(l_ptr->next_in_no));
1977                 }
1978                 msg_set_seq_gap(msg, gap);
1979                 if (gap)
1980                         l_ptr->stats.sent_nacks++;
1981                 msg_set_link_tolerance(msg, tolerance);
1982                 msg_set_linkprio(msg, priority);
1983                 msg_set_max_pkt(msg, ack_mtu);
1984                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1985                 msg_set_probe(msg, probe_msg != 0);
1986                 if (probe_msg) { 
1987                         u32 mtu = l_ptr->max_pkt;
1988
1989                         if ((mtu < l_ptr->max_pkt_target) &&
1990                             link_working_working(l_ptr) &&
1991                             l_ptr->fsm_msg_cnt) {
1992                                 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1993                                 if (l_ptr->max_pkt_probes == 10) {
1994                                         l_ptr->max_pkt_target = (msg_size - 4);
1995                                         l_ptr->max_pkt_probes = 0;
1996                                         msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1997                                 }
1998                                 l_ptr->max_pkt_probes++;
1999                         }
2000
2001                         l_ptr->stats.sent_probes++;
2002                 }
2003                 l_ptr->stats.sent_states++;
2004         } else {                /* RESET_MSG or ACTIVATE_MSG */
2005                 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
2006                 msg_set_seq_gap(msg, 0);
2007                 msg_set_next_sent(msg, 1);
2008                 msg_set_link_tolerance(msg, l_ptr->tolerance);
2009                 msg_set_linkprio(msg, l_ptr->priority);
2010                 msg_set_max_pkt(msg, l_ptr->max_pkt_target);
2011         }
2012
2013         if (node_has_redundant_links(l_ptr->owner)) {
2014                 msg_set_redundant_link(msg);
2015         } else {
2016                 msg_clear_redundant_link(msg);
2017         }
2018         msg_set_linkprio(msg, l_ptr->priority);
2019
2020         /* Ensure sequence number will not fit : */
2021
2022         msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
2023
2024         /* Congestion? */
2025
2026         if (bearer_congested(l_ptr->b_ptr, l_ptr)) {
2027                 if (!l_ptr->proto_msg_queue) {
2028                         l_ptr->proto_msg_queue =
2029                                 buf_acquire(sizeof(l_ptr->proto_msg));
2030                 }
2031                 buf = l_ptr->proto_msg_queue;
2032                 if (!buf)
2033                         return;
2034                 memcpy(buf->data, (unchar *)msg, sizeof(l_ptr->proto_msg));
2035                 return;
2036         }
2037         msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
2038
2039         /* Message can be sent */
2040
2041         msg_dbg(msg, ">>");
2042
2043         buf = buf_acquire(msg_size);
2044         if (!buf)
2045                 return;
2046
2047         memcpy(buf->data, (unchar *)msg, sizeof(l_ptr->proto_msg));
2048         msg_set_size(buf_msg(buf), msg_size);
2049
2050         if (bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
2051                 l_ptr->unacked_window = 0;
2052                 buf_discard(buf);
2053                 return;
2054         }
2055
2056         /* New congestion */
2057         bearer_schedule(l_ptr->b_ptr, l_ptr);
2058         l_ptr->proto_msg_queue = buf;
2059         l_ptr->stats.bearer_congs++;
2060 }
2061
2062 /*
2063  * Receive protocol message :
2064  * Note that network plane id propagates through the network, and may 
2065  * change at any time. The node with lowest address rules    
2066  */
2067
2068 static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
2069 {
2070         u32 rec_gap = 0;
2071         u32 max_pkt_info;
2072         u32 max_pkt_ack;
2073         u32 msg_tol;
2074         struct tipc_msg *msg = buf_msg(buf);
2075
2076         dbg("AT(%u):", jiffies_to_msecs(jiffies));
2077         msg_dbg(msg, "<<");
2078         if (link_blocked(l_ptr))
2079                 goto exit;
2080
2081         /* record unnumbered packet arrival (force mismatch on next timeout) */
2082
2083         l_ptr->checkpoint--;
2084
2085         if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
2086                 if (tipc_own_addr > msg_prevnode(msg))
2087                         l_ptr->b_ptr->net_plane = msg_net_plane(msg);
2088
2089         l_ptr->owner->permit_changeover = msg_redundant_link(msg);
2090
2091         switch (msg_type(msg)) {
2092         
2093         case RESET_MSG:
2094                 if (!link_working_unknown(l_ptr) && l_ptr->peer_session) {
2095                         if (msg_session(msg) == l_ptr->peer_session) {
2096                                 dbg("Duplicate RESET: %u<->%u\n",
2097                                     msg_session(msg), l_ptr->peer_session);                                     
2098                                 break; /* duplicate: ignore */
2099                         }
2100                 }
2101                 /* fall thru' */
2102         case ACTIVATE_MSG:
2103                 /* Update link settings according other endpoint's values */
2104
2105                 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
2106
2107                 if ((msg_tol = msg_link_tolerance(msg)) &&
2108                     (msg_tol > l_ptr->tolerance))
2109                         link_set_supervision_props(l_ptr, msg_tol);
2110
2111                 if (msg_linkprio(msg) > l_ptr->priority)
2112                         l_ptr->priority = msg_linkprio(msg);
2113
2114                 max_pkt_info = msg_max_pkt(msg);
2115                 if (max_pkt_info) {
2116                         if (max_pkt_info < l_ptr->max_pkt_target)
2117                                 l_ptr->max_pkt_target = max_pkt_info;
2118                         if (l_ptr->max_pkt > l_ptr->max_pkt_target)
2119                                 l_ptr->max_pkt = l_ptr->max_pkt_target;
2120                 } else {
2121                         l_ptr->max_pkt = l_ptr->max_pkt_target;
2122                 }
2123                 l_ptr->owner->bclink.supported = (max_pkt_info != 0);
2124
2125                 link_state_event(l_ptr, msg_type(msg));
2126
2127                 l_ptr->peer_session = msg_session(msg);
2128                 l_ptr->peer_bearer_id = msg_bearer_id(msg);
2129
2130                 /* Synchronize broadcast sequence numbers */
2131                 if (!node_has_redundant_links(l_ptr->owner)) {
2132                         l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
2133                 }
2134                 break;
2135         case STATE_MSG:
2136
2137                 if ((msg_tol = msg_link_tolerance(msg)))
2138                         link_set_supervision_props(l_ptr, msg_tol);
2139                 
2140                 if (msg_linkprio(msg) && 
2141                     (msg_linkprio(msg) != l_ptr->priority)) {
2142                         warn("Changing prio <%s>: %u->%u\n",
2143                              l_ptr->name, l_ptr->priority, msg_linkprio(msg));
2144                         l_ptr->priority = msg_linkprio(msg);
2145                         link_reset(l_ptr); /* Enforce change to take effect */
2146                         break;
2147                 }
2148                 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
2149                 l_ptr->stats.recv_states++;
2150                 if (link_reset_unknown(l_ptr))
2151                         break;
2152
2153                 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
2154                         rec_gap = mod(msg_next_sent(msg) - 
2155                                       mod(l_ptr->next_in_no));
2156                 }
2157
2158                 max_pkt_ack = msg_max_pkt(msg);
2159                 if (max_pkt_ack > l_ptr->max_pkt) {
2160                         dbg("Link <%s> updated MTU %u -> %u\n",
2161                             l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
2162                         l_ptr->max_pkt = max_pkt_ack;
2163                         l_ptr->max_pkt_probes = 0;
2164                 }
2165
2166                 max_pkt_ack = 0;
2167                 if (msg_probe(msg)) {
2168                         l_ptr->stats.recv_probes++;
2169                         if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
2170                                 max_pkt_ack = msg_size(msg);
2171                         }
2172                 }
2173
2174                 /* Protocol message before retransmits, reduce loss risk */
2175
2176                 bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));
2177
2178                 if (rec_gap || (msg_probe(msg))) {
2179                         link_send_proto_msg(l_ptr, STATE_MSG,
2180                                             0, rec_gap, 0, 0, max_pkt_ack);
2181                 }
2182                 if (msg_seq_gap(msg)) {
2183                         msg_dbg(msg, "With Gap:");
2184                         l_ptr->stats.recv_nacks++;
2185                         link_retransmit(l_ptr, l_ptr->first_out,
2186                                         msg_seq_gap(msg));
2187                 }
2188                 break;
2189         default:
2190                 msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
2191         }
2192 exit:
2193         buf_discard(buf);
2194 }
2195
2196
2197 /*
2198  * link_tunnel(): Send one message via a link belonging to 
2199  * another bearer. Owner node is locked.
2200  */
2201 void link_tunnel(struct link *l_ptr, 
2202             struct tipc_msg *tunnel_hdr, 
2203             struct tipc_msg  *msg,
2204             u32 selector)
2205 {
2206         struct link *tunnel;
2207         struct sk_buff *buf;
2208         u32 length = msg_size(msg);
2209
2210         tunnel = l_ptr->owner->active_links[selector & 1];
2211         if (!link_is_up(tunnel))
2212                 return;
2213         msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2214         buf = buf_acquire(length + INT_H_SIZE);
2215         if (!buf)
2216                 return;
2217         memcpy(buf->data, (unchar *)tunnel_hdr, INT_H_SIZE);
2218         memcpy(buf->data + INT_H_SIZE, (unchar *)msg, length);
2219         dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2220         msg_dbg(buf_msg(buf), ">SEND>");
2221         assert(tunnel);
2222         link_send_buf(tunnel, buf);
2223 }
2224
2225
2226
2227 /*
2228  * changeover(): Send whole message queue via the remaining link
2229  *               Owner node is locked.
2230  */
2231
2232 void link_changeover(struct link *l_ptr)
2233 {
2234         u32 msgcount = l_ptr->out_queue_size;
2235         struct sk_buff *crs = l_ptr->first_out;
2236         struct link *tunnel = l_ptr->owner->active_links[0];
2237         int split_bundles = node_has_redundant_links(l_ptr->owner);
2238         struct tipc_msg tunnel_hdr;
2239
2240         if (!tunnel)
2241                 return;
2242
2243         if (!l_ptr->owner->permit_changeover)
2244                 return;
2245
2246         msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2247                  ORIGINAL_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
2248         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2249         msg_set_msgcnt(&tunnel_hdr, msgcount);
2250         if (!l_ptr->first_out) {
2251                 struct sk_buff *buf;
2252
2253                 assert(!msgcount);
2254                 buf = buf_acquire(INT_H_SIZE);
2255                 if (buf) {
2256                         memcpy(buf->data, (unchar *)&tunnel_hdr, INT_H_SIZE);
2257                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
2258                         dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2259                             tunnel->b_ptr->net_plane);
2260                         msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2261                         link_send_buf(tunnel, buf);
2262                 } else {
2263                         warn("Memory squeeze; link changeover failed\n");
2264                 }
2265                 return;
2266         }
2267         while (crs) {
2268                 struct tipc_msg *msg = buf_msg(crs);
2269
2270                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2271                         u32 msgcount = msg_msgcnt(msg);
2272                         struct tipc_msg *m = msg_get_wrapped(msg);
2273                         unchar* pos = (unchar*)m;
2274
2275                         while (msgcount--) {
2276                                 msg_set_seqno(m,msg_seqno(msg));
2277                                 link_tunnel(l_ptr, &tunnel_hdr, m,
2278                                             msg_link_selector(m));
2279                                 pos += align(msg_size(m));
2280                                 m = (struct tipc_msg *)pos;
2281                         }
2282                 } else {
2283                         link_tunnel(l_ptr, &tunnel_hdr, msg,
2284                                     msg_link_selector(msg));
2285                 }
2286                 crs = crs->next;
2287         }
2288 }
2289
2290 void link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2291 {
2292         struct sk_buff *iter;
2293         struct tipc_msg tunnel_hdr;
2294
2295         msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2296                  DUPLICATE_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
2297         msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2298         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2299         iter = l_ptr->first_out;
2300         while (iter) {
2301                 struct sk_buff *outbuf;
2302                 struct tipc_msg *msg = buf_msg(iter);
2303                 u32 length = msg_size(msg);
2304
2305                 if (msg_user(msg) == MSG_BUNDLER)
2306                         msg_set_type(msg, CLOSED_MSG);
2307                 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
2308                 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 
2309                 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2310                 outbuf = buf_acquire(length + INT_H_SIZE);
2311                 if (outbuf == NULL) {
2312                         warn("Memory squeeze; buffer duplication failed\n");
2313                         return;
2314                 }
2315                 memcpy(outbuf->data, (unchar *)&tunnel_hdr, INT_H_SIZE);
2316                 memcpy(outbuf->data + INT_H_SIZE, iter->data, length);
2317                 dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2318                     tunnel->b_ptr->net_plane);
2319                 msg_dbg(buf_msg(outbuf), ">SEND>");
2320                 link_send_buf(tunnel, outbuf);
2321                 if (!link_is_up(l_ptr))
2322                         return;
2323                 iter = iter->next;
2324         }
2325 }
2326
2327
2328
2329 /**
2330  * buf_extract - extracts embedded TIPC message from another message
2331  * @skb: encapsulating message buffer
2332  * @from_pos: offset to extract from
2333  *
2334  * Returns a new message buffer containing an embedded message.  The 
2335  * encapsulating message itself is left unchanged.
2336  */
2337
2338 static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2339 {
2340         struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2341         u32 size = msg_size(msg);
2342         struct sk_buff *eb;
2343
2344         eb = buf_acquire(size);
2345         if (eb)
2346                 memcpy(eb->data, (unchar *)msg, size);
2347         return eb;
2348 }
2349
2350 /* 
2351  *  link_recv_changeover_msg(): Receive tunneled packet sent
2352  *  via other link. Node is locked. Return extracted buffer.
2353  */
2354
2355 static int link_recv_changeover_msg(struct link **l_ptr,
2356                                     struct sk_buff **buf)
2357 {
2358         struct sk_buff *tunnel_buf = *buf;
2359         struct link *dest_link;
2360         struct tipc_msg *msg;
2361         struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
2362         u32 msg_typ = msg_type(tunnel_msg);
2363         u32 msg_count = msg_msgcnt(tunnel_msg);
2364
2365         dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
2366         assert(dest_link != *l_ptr);
2367         if (!dest_link) {
2368                 msg_dbg(tunnel_msg, "NOLINK/<REC<");
2369                 goto exit;
2370         }
2371         dbg("%c<-%c:", dest_link->b_ptr->net_plane,
2372             (*l_ptr)->b_ptr->net_plane);
2373         *l_ptr = dest_link;
2374         msg = msg_get_wrapped(tunnel_msg);
2375
2376         if (msg_typ == DUPLICATE_MSG) {
2377                 if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
2378                         msg_dbg(tunnel_msg, "DROP/<REC<");
2379                         goto exit;
2380                 }
2381                 *buf = buf_extract(tunnel_buf,INT_H_SIZE);
2382                 if (*buf == NULL) {
2383                         warn("Memory squeeze; failed to extract msg\n");
2384                         goto exit;
2385                 }
2386                 msg_dbg(tunnel_msg, "TNL<REC<");
2387                 buf_discard(tunnel_buf);
2388                 return 1;
2389         }
2390
2391         /* First original message ?: */
2392
2393         if (link_is_up(dest_link)) {
2394                 msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
2395                 link_reset(dest_link);
2396                 dest_link->exp_msg_count = msg_count;
2397                 if (!msg_count)
2398                         goto exit;
2399         } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
2400                 msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
2401                 dest_link->exp_msg_count = msg_count;
2402                 if (!msg_count)
2403                         goto exit;
2404         }
2405
2406         /* Receive original message */
2407
2408         if (dest_link->exp_msg_count == 0) {
2409                 msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
2410                 dbg_print_link(dest_link, "LINK:");
2411                 goto exit;
2412         }
2413         dest_link->exp_msg_count--;
2414         if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
2415                 msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
2416                 goto exit;
2417         } else {
2418                 *buf = buf_extract(tunnel_buf, INT_H_SIZE);
2419                 if (*buf != NULL) {
2420                         msg_dbg(tunnel_msg, "TNL<REC<");
2421                         buf_discard(tunnel_buf);
2422                         return 1;
2423                 } else {
2424                         warn("Memory squeeze; dropped incoming msg\n");
2425                 }
2426         }
2427 exit:
2428         *buf = 0;
2429         buf_discard(tunnel_buf);
2430         return 0;
2431 }
2432
2433 /*
2434  *  Bundler functionality:
2435  */
2436 void link_recv_bundle(struct sk_buff *buf)
2437 {
2438         u32 msgcount = msg_msgcnt(buf_msg(buf));
2439         u32 pos = INT_H_SIZE;
2440         struct sk_buff *obuf;
2441
2442         msg_dbg(buf_msg(buf), "<BNDL<: ");
2443         while (msgcount--) {
2444                 obuf = buf_extract(buf, pos);
2445                 if (obuf == NULL) {
2446                         char addr_string[16];
2447
2448                         warn("Buffer allocation failure;\n");
2449                         warn("  incoming message(s) from %s lost\n",
2450                              addr_string_fill(addr_string, 
2451                                               msg_orignode(buf_msg(buf))));
2452                         return;
2453                 };
2454                 pos += align(msg_size(buf_msg(obuf)));
2455                 msg_dbg(buf_msg(obuf), "     /");
2456                 net_route_msg(obuf);
2457         }
2458         buf_discard(buf);
2459 }
2460
2461 /*
2462  *  Fragmentation/defragmentation:
2463  */
2464
2465
2466 /* 
2467  * link_send_long_buf: Entry for buffers needing fragmentation.
2468  * The buffer is complete, inclusive total message length. 
2469  * Returns user data length.
2470  */
2471 int link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2472 {
2473         struct tipc_msg *inmsg = buf_msg(buf);
2474         struct tipc_msg fragm_hdr;
2475         u32 insize = msg_size(inmsg);
2476         u32 dsz = msg_data_sz(inmsg);
2477         unchar *crs = buf->data;
2478         u32 rest = insize;
2479         u32 pack_sz = link_max_pkt(l_ptr);
2480         u32 fragm_sz = pack_sz - INT_H_SIZE;
2481         u32 fragm_no = 1;
2482         u32 destaddr = msg_destnode(inmsg);
2483
2484         if (msg_short(inmsg))
2485                 destaddr = l_ptr->addr;
2486
2487         if (msg_routed(inmsg))
2488                 msg_set_prevnode(inmsg, tipc_own_addr);
2489
2490         /* Prepare reusable fragment header: */
2491
2492         msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2493                  TIPC_OK, INT_H_SIZE, destaddr);
2494         msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2495         msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2496         msg_set_fragm_no(&fragm_hdr, fragm_no);
2497         l_ptr->stats.sent_fragmented++;
2498
2499         /* Chop up message: */
2500
2501         while (rest > 0) {
2502                 struct sk_buff *fragm;
2503
2504                 if (rest <= fragm_sz) {
2505                         fragm_sz = rest;
2506                         msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2507                 }
2508                 fragm = buf_acquire(fragm_sz + INT_H_SIZE);
2509                 if (fragm == NULL) {
2510                         warn("Memory squeeze; failed to fragment msg\n");
2511                         dsz = -ENOMEM;
2512                         goto exit;
2513                 }
2514                 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2515                 memcpy(fragm->data, (unchar *)&fragm_hdr, INT_H_SIZE);
2516                 memcpy(fragm->data + INT_H_SIZE, crs, fragm_sz);
2517
2518                 /*  Send queued messages first, if any: */
2519
2520                 l_ptr->stats.sent_fragments++;
2521                 link_send_buf(l_ptr, fragm);
2522                 if (!link_is_up(l_ptr))
2523                         return dsz;
2524                 msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2525                 rest -= fragm_sz;
2526                 crs += fragm_sz;
2527                 msg_set_type(&fragm_hdr, FRAGMENT);
2528         }
2529 exit:
2530         buf_discard(buf);
2531         return dsz;
2532 }
2533
2534 /* 
2535  * A pending message being re-assembled must store certain values 
2536  * to handle subsequent fragments correctly. The following functions 
2537  * help storing these values in unused, available fields in the
2538  * pending message. This makes dynamic memory allocation unecessary.
2539  */
2540
2541 static inline u32 get_long_msg_seqno(struct sk_buff *buf)
2542 {
2543         return msg_seqno(buf_msg(buf));
2544 }
2545
2546 static inline void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2547 {
2548         msg_set_seqno(buf_msg(buf), seqno);
2549 }
2550
2551 static inline u32 get_fragm_size(struct sk_buff *buf)
2552 {
2553         return msg_ack(buf_msg(buf));
2554 }
2555
2556 static inline void set_fragm_size(struct sk_buff *buf, u32 sz)
2557 {
2558         msg_set_ack(buf_msg(buf), sz);
2559 }
2560
2561 static inline u32 get_expected_frags(struct sk_buff *buf)
2562 {
2563         return msg_bcast_ack(buf_msg(buf));
2564 }
2565
2566 static inline void set_expected_frags(struct sk_buff *buf, u32 exp)
2567 {
2568         msg_set_bcast_ack(buf_msg(buf), exp);
2569 }
2570
2571 static inline u32 get_timer_cnt(struct sk_buff *buf)
2572 {
2573         return msg_reroute_cnt(buf_msg(buf));
2574 }
2575
2576 static inline void incr_timer_cnt(struct sk_buff *buf)
2577 {
2578         msg_incr_reroute_cnt(buf_msg(buf));
2579 }
2580
2581 /* 
2582  * link_recv_fragment(): Called with node lock on. Returns 
2583  * the reassembled buffer if message is complete.
2584  */
2585 int link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, 
2586                        struct tipc_msg **m)
2587 {
2588         struct sk_buff *prev = 0;
2589         struct sk_buff *fbuf = *fb;
2590         struct tipc_msg *fragm = buf_msg(fbuf);
2591         struct sk_buff *pbuf = *pending;
2592         u32 long_msg_seq_no = msg_long_msgno(fragm);
2593
2594         *fb = 0;
2595         msg_dbg(fragm,"FRG<REC<");
2596
2597         /* Is there an incomplete message waiting for this fragment? */
2598
2599         while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no)
2600                         || (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2601                 prev = pbuf;
2602                 pbuf = pbuf->next;
2603         }
2604
2605         if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2606                 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2607                 u32 msg_sz = msg_size(imsg);
2608                 u32 fragm_sz = msg_data_sz(fragm);
2609                 u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2610                 u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2611                 if (msg_type(imsg) == TIPC_MCAST_MSG)
2612                         max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2613                 if (msg_size(imsg) > max) {
2614                         msg_dbg(fragm,"<REC<Oversized: ");
2615                         buf_discard(fbuf);
2616                         return 0;
2617                 }
2618                 pbuf = buf_acquire(msg_size(imsg));
2619                 if (pbuf != NULL) {
2620                         pbuf->next = *pending;
2621                         *pending = pbuf;
2622                         memcpy(pbuf->data, (unchar *)imsg, msg_data_sz(fragm));
2623
2624                         /*  Prepare buffer for subsequent fragments. */
2625
2626                         set_long_msg_seqno(pbuf, long_msg_seq_no); 
2627                         set_fragm_size(pbuf,fragm_sz); 
2628                         set_expected_frags(pbuf,exp_fragm_cnt - 1); 
2629                 } else {
2630                         warn("Memory squeeze; got no defragmenting buffer\n");
2631                 }
2632                 buf_discard(fbuf);
2633                 return 0;
2634         } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2635                 u32 dsz = msg_data_sz(fragm);
2636                 u32 fsz = get_fragm_size(pbuf);
2637                 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2638                 u32 exp_frags = get_expected_frags(pbuf) - 1;
2639                 memcpy(pbuf->data + crs, msg_data(fragm), dsz);
2640                 buf_discard(fbuf);
2641
2642                 /* Is message complete? */
2643
2644                 if (exp_frags == 0) {
2645                         if (prev)
2646                                 prev->next = pbuf->next;
2647                         else
2648                                 *pending = pbuf->next;
2649                         msg_reset_reroute_cnt(buf_msg(pbuf));
2650                         *fb = pbuf;
2651                         *m = buf_msg(pbuf);
2652                         return 1;
2653                 }
2654                 set_expected_frags(pbuf,exp_frags);     
2655                 return 0;
2656         }
2657         dbg(" Discarding orphan fragment %x\n",fbuf);
2658         msg_dbg(fragm,"ORPHAN:");
2659         dbg("Pending long buffers:\n");
2660         dbg_print_buf_chain(*pending);
2661         buf_discard(fbuf);
2662         return 0;
2663 }
2664
2665 /**
2666  * link_check_defragm_bufs - flush stale incoming message fragments
2667  * @l_ptr: pointer to link
2668  */
2669
2670 static void link_check_defragm_bufs(struct link *l_ptr)
2671 {
2672         struct sk_buff *prev = 0;
2673         struct sk_buff *next = 0;
2674         struct sk_buff *buf = l_ptr->defragm_buf;
2675
2676         if (!buf)
2677                 return;
2678         if (!link_working_working(l_ptr))
2679                 return;
2680         while (buf) {
2681                 u32 cnt = get_timer_cnt(buf);
2682
2683                 next = buf->next;
2684                 if (cnt < 4) {
2685                         incr_timer_cnt(buf);
2686                         prev = buf;
2687                 } else {
2688                         dbg(" Discarding incomplete long buffer\n");
2689                         msg_dbg(buf_msg(buf), "LONG:");
2690                         dbg_print_link(l_ptr, "curr:");
2691                         dbg("Pending long buffers:\n");
2692                         dbg_print_buf_chain(l_ptr->defragm_buf);
2693                         if (prev)
2694                                 prev->next = buf->next;
2695                         else
2696                                 l_ptr->defragm_buf = buf->next;
2697                         buf_discard(buf);
2698                 }
2699                 buf = next;
2700         }
2701 }
2702
2703
2704
2705 static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2706 {
2707         l_ptr->tolerance = tolerance;
2708         l_ptr->continuity_interval =
2709                 ((tolerance / 4) > 500) ? 500 : tolerance / 4;
2710         l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2711 }
2712
2713
2714 void link_set_queue_limits(struct link *l_ptr, u32 window)
2715 {
2716         /* Data messages from this node, inclusive FIRST_FRAGM */
2717         l_ptr->queue_limit[DATA_LOW] = window;
2718         l_ptr->queue_limit[DATA_MEDIUM] = (window / 3) * 4;
2719         l_ptr->queue_limit[DATA_HIGH] = (window / 3) * 5;
2720         l_ptr->queue_limit[DATA_CRITICAL] = (window / 3) * 6;
2721         /* Transiting data messages,inclusive FIRST_FRAGM */
2722         l_ptr->queue_limit[DATA_LOW + 4] = 300;
2723         l_ptr->queue_limit[DATA_MEDIUM + 4] = 600;
2724         l_ptr->queue_limit[DATA_HIGH + 4] = 900;
2725         l_ptr->queue_limit[DATA_CRITICAL + 4] = 1200;
2726         l_ptr->queue_limit[CONN_MANAGER] = 1200;
2727         l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2728         l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2729         l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2730         /* FRAGMENT and LAST_FRAGMENT packets */
2731         l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2732 }
2733
2734 /**
2735  * link_find_link - locate link by name
2736  * @name - ptr to link name string
2737  * @node - ptr to area to be filled with ptr to associated node
2738  * 
2739  * Caller must hold 'net_lock' to ensure node and bearer are not deleted;
2740  * this also prevents link deletion.
2741  * 
2742  * Returns pointer to link (or 0 if invalid link name).
2743  */
2744
2745 static struct link *link_find_link(const char *name, struct node **node)
2746 {
2747         struct link_name link_name_parts;
2748         struct bearer *b_ptr;
2749         struct link *l_ptr; 
2750
2751         if (!link_name_validate(name, &link_name_parts))
2752                 return 0;
2753
2754         b_ptr = bearer_find_interface(link_name_parts.if_local);
2755         if (!b_ptr)
2756                 return 0;
2757
2758         *node = node_find(link_name_parts.addr_peer); 
2759         if (!*node)
2760                 return 0;
2761
2762         l_ptr = (*node)->links[b_ptr->identity];
2763         if (!l_ptr || strcmp(l_ptr->name, name))
2764                 return 0;
2765
2766         return l_ptr;
2767 }
2768
2769 struct sk_buff *link_cmd_config(const void *req_tlv_area, int req_tlv_space, 
2770                                 u16 cmd)
2771 {
2772         struct tipc_link_config *args;
2773         u32 new_value;
2774         struct link *l_ptr;
2775         struct node *node;
2776         int res;
2777
2778         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
2779                 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2780
2781         args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
2782         new_value = ntohl(args->value);
2783
2784         if (!strcmp(args->name, bc_link_name)) {
2785                 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
2786                     (bclink_set_queue_limits(new_value) == 0))
2787                         return cfg_reply_none();
2788                 return cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
2789                                               " (cannot change setting on broadcast link)");
2790         }
2791
2792         read_lock_bh(&net_lock);
2793         l_ptr = link_find_link(args->name, &node); 
2794         if (!l_ptr) {
2795                 read_unlock_bh(&net_lock);
2796                 return cfg_reply_error_string("link not found");
2797         }
2798
2799         node_lock(node);
2800         res = -EINVAL;
2801         switch (cmd) {
2802         case TIPC_CMD_SET_LINK_TOL: 
2803                 if ((new_value >= TIPC_MIN_LINK_TOL) && 
2804                     (new_value <= TIPC_MAX_LINK_TOL)) {
2805                         link_set_supervision_props(l_ptr, new_value);
2806                         link_send_proto_msg(l_ptr, STATE_MSG, 
2807                                             0, 0, new_value, 0, 0);
2808                         res = TIPC_OK;
2809                 }
2810                 break;
2811         case TIPC_CMD_SET_LINK_PRI: 
2812                 if (new_value < TIPC_NUM_LINK_PRI) {
2813                         l_ptr->priority = new_value;
2814                         link_send_proto_msg(l_ptr, STATE_MSG, 
2815                                             0, 0, 0, new_value, 0);
2816                         res = TIPC_OK;
2817                 }
2818                 break;
2819         case TIPC_CMD_SET_LINK_WINDOW: 
2820                 if ((new_value >= TIPC_MIN_LINK_WIN) && 
2821                     (new_value <= TIPC_MAX_LINK_WIN)) {
2822                         link_set_queue_limits(l_ptr, new_value);
2823                         res = TIPC_OK;
2824                 }
2825                 break;
2826         }
2827         node_unlock(node);
2828
2829         read_unlock_bh(&net_lock);
2830         if (res)
2831                 return cfg_reply_error_string("cannot change link setting");
2832
2833         return cfg_reply_none();
2834 }
2835
2836 /**
2837  * link_reset_statistics - reset link statistics
2838  * @l_ptr: pointer to link
2839  */
2840
2841 static void link_reset_statistics(struct link *l_ptr)
2842 {
2843         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2844         l_ptr->stats.sent_info = l_ptr->next_out_no;
2845         l_ptr->stats.recv_info = l_ptr->next_in_no;
2846 }
2847
2848 struct sk_buff *link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2849 {
2850         char *link_name;
2851         struct link *l_ptr; 
2852         struct node *node;
2853
2854         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2855                 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2856
2857         link_name = (char *)TLV_DATA(req_tlv_area);
2858         if (!strcmp(link_name, bc_link_name)) {
2859                 if (bclink_reset_stats())
2860                         return cfg_reply_error_string("link not found");
2861                 return cfg_reply_none();
2862         }
2863
2864         read_lock_bh(&net_lock);
2865         l_ptr = link_find_link(link_name, &node); 
2866         if (!l_ptr) {
2867                 read_unlock_bh(&net_lock);
2868                 return cfg_reply_error_string("link not found");
2869         }
2870
2871         node_lock(node);
2872         link_reset_statistics(l_ptr);
2873         node_unlock(node);
2874         read_unlock_bh(&net_lock);
2875         return cfg_reply_none();
2876 }
2877
2878 /**
2879  * percent - convert count to a percentage of total (rounding up or down)
2880  */
2881
2882 static u32 percent(u32 count, u32 total)
2883 {
2884         return (count * 100 + (total / 2)) / total;
2885 }
2886
2887 /**
2888  * link_stats - print link statistics
2889  * @name: link name
2890  * @buf: print buffer area
2891  * @buf_size: size of print buffer area
2892  * 
2893  * Returns length of print buffer data string (or 0 if error)
2894  */
2895
2896 static int link_stats(const char *name, char *buf, const u32 buf_size)
2897 {
2898         struct print_buf pb;
2899         struct link *l_ptr; 
2900         struct node *node;
2901         char *status;
2902         u32 profile_total = 0;
2903
2904         if (!strcmp(name, bc_link_name))
2905                 return bclink_stats(buf, buf_size);
2906
2907         printbuf_init(&pb, buf, buf_size);
2908
2909         read_lock_bh(&net_lock);
2910         l_ptr = link_find_link(name, &node); 
2911         if (!l_ptr) {
2912                 read_unlock_bh(&net_lock);
2913                 return 0;
2914         }
2915         node_lock(node);
2916
2917         if (link_is_active(l_ptr))
2918                 status = "ACTIVE";
2919         else if (link_is_up(l_ptr))
2920                 status = "STANDBY";
2921         else
2922                 status = "DEFUNCT";
2923         tipc_printf(&pb, "Link <%s>\n"
2924                          "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
2925                          "  Window:%u packets\n", 
2926                     l_ptr->name, status, link_max_pkt(l_ptr), 
2927                     l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
2928         tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n", 
2929                     l_ptr->next_in_no - l_ptr->stats.recv_info,
2930                     l_ptr->stats.recv_fragments,
2931                     l_ptr->stats.recv_fragmented,
2932                     l_ptr->stats.recv_bundles,
2933                     l_ptr->stats.recv_bundled);
2934         tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n", 
2935                     l_ptr->next_out_no - l_ptr->stats.sent_info,
2936                     l_ptr->stats.sent_fragments,
2937                     l_ptr->stats.sent_fragmented, 
2938                     l_ptr->stats.sent_bundles,
2939                     l_ptr->stats.sent_bundled);
2940         profile_total = l_ptr->stats.msg_length_counts;
2941         if (!profile_total)
2942                 profile_total = 1;
2943         tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
2944                          "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
2945                          "-16354:%u%% -32768:%u%% -66000:%u%%\n",
2946                     l_ptr->stats.msg_length_counts,
2947                     l_ptr->stats.msg_lengths_total / profile_total,
2948                     percent(l_ptr->stats.msg_length_profile[0], profile_total),
2949                     percent(l_ptr->stats.msg_length_profile[1], profile_total),
2950                     percent(l_ptr->stats.msg_length_profile[2], profile_total),
2951                     percent(l_ptr->stats.msg_length_profile[3], profile_total),
2952                     percent(l_ptr->stats.msg_length_profile[4], profile_total),
2953                     percent(l_ptr->stats.msg_length_profile[5], profile_total),
2954                     percent(l_ptr->stats.msg_length_profile[6], profile_total));
2955         tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n", 
2956                     l_ptr->stats.recv_states,
2957                     l_ptr->stats.recv_probes,
2958                     l_ptr->stats.recv_nacks,
2959                     l_ptr->stats.deferred_recv, 
2960                     l_ptr->stats.duplicates);
2961         tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n", 
2962                     l_ptr->stats.sent_states, 
2963                     l_ptr->stats.sent_probes, 
2964                     l_ptr->stats.sent_nacks, 
2965                     l_ptr->stats.sent_acks, 
2966                     l_ptr->stats.retransmitted);
2967         tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
2968                     l_ptr->stats.bearer_congs,
2969                     l_ptr->stats.link_congs, 
2970                     l_ptr->stats.max_queue_sz,
2971                     l_ptr->stats.queue_sz_counts
2972                     ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
2973                     : 0);
2974
2975         node_unlock(node);
2976         read_unlock_bh(&net_lock);
2977         return printbuf_validate(&pb);
2978 }
2979
2980 #define MAX_LINK_STATS_INFO 2000
2981
2982 struct sk_buff *link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
2983 {
2984         struct sk_buff *buf;
2985         struct tlv_desc *rep_tlv;
2986         int str_len;
2987
2988         if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2989                 return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2990
2991         buf = cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
2992         if (!buf)
2993                 return NULL;
2994
2995         rep_tlv = (struct tlv_desc *)buf->data;
2996
2997         str_len = link_stats((char *)TLV_DATA(req_tlv_area),
2998                              (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
2999         if (!str_len) {
3000                 buf_discard(buf);
3001                 return cfg_reply_error_string("link not found");
3002         }
3003
3004         skb_put(buf, TLV_SPACE(str_len));
3005         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3006
3007         return buf;
3008 }
3009
#if 0
/*
 * link_control(): remove, block or unblock the link with the given name.
 * Compiled out.  Returns TIPC_OK if the link was found, otherwise -EINVAL.
 */
int link_control(const char *name, u32 op, u32 val)
{
        struct node *node;
        struct link *l_ptr;
        u32 bearer_id;
        u32 a;
        int res = -EINVAL;

        a = link_name2addr(name, &bearer_id);
        read_lock_bh(&net_lock);
        node = node_find(a);
        if (!node)
                goto unlock_net;

        node_lock(node);
        l_ptr = node->links[bearer_id];
        if (!l_ptr)
                goto unlock_node;

        if (op == TIPC_REMOVE_LINK) {
                struct bearer *b_ptr = l_ptr->b_ptr;

                spin_lock_bh(&b_ptr->publ.lock);
                link_delete(l_ptr);
                spin_unlock_bh(&b_ptr->publ.lock);
        }
        if (op == TIPC_CMD_BLOCK_LINK) {
                link_reset(l_ptr);
                l_ptr->blocked = 1;
        }
        if (op == TIPC_CMD_UNBLOCK_LINK) {
                l_ptr->blocked = 0;
        }
        res = TIPC_OK;

unlock_node:
        node_unlock(node);
unlock_net:
        read_unlock_bh(&net_lock);
        return res;
}
#endif
3047
3048 /**
3049  * link_get_max_pkt - get maximum packet size to use when sending to destination
3050  * @dest: network address of destination node
3051  * @selector: used to select from set of active links
3052  * 
3053  * If no active link can be found, uses default maximum packet size.
3054  */
3055
3056 u32 link_get_max_pkt(u32 dest, u32 selector)
3057 {
3058         struct node *n_ptr;
3059         struct link *l_ptr;
3060         u32 res = MAX_PKT_DEFAULT;
3061         
3062         if (dest == tipc_own_addr)
3063                 return MAX_MSG_SIZE;
3064
3065         read_lock_bh(&net_lock);        
3066         n_ptr = node_select(dest, selector);
3067         if (n_ptr) {
3068                 node_lock(n_ptr);
3069                 l_ptr = n_ptr->active_links[selector & 1];
3070                 if (l_ptr)
3071                         res = link_max_pkt(l_ptr);
3072                 node_unlock(n_ptr);
3073         }
3074         read_unlock_bh(&net_lock);       
3075         return res;
3076 }
3077
#if 0
/*
 * link_dump_rec_queue(): print the contents of the deferred reception
 * queue.  Compiled out.
 */
static void link_dump_rec_queue(struct link *l_ptr)
{
        struct sk_buff *crs = l_ptr->oldest_deferred_in;

        if (crs == NULL) {
                info("Reception queue empty\n");
                return;
        }

        info("Contents of Reception queue:\n");
        for (; crs != NULL; crs = crs->next) {
                /* Stop at a buffer whose data pointer holds this marker value */
                if (crs->data == (void *)0x0000a3a3) {
                        info("buffer %x invalid\n", crs);
                        return;
                }
                msg_dbg(buf_msg(crs), "In rec queue: \n");
        }
}
#endif
3099
3100 static void link_dump_send_queue(struct link *l_ptr)
3101 {
3102         if (l_ptr->next_out) {
3103                 info("\nContents of unsent queue:\n");
3104                 dbg_print_buf_chain(l_ptr->next_out);
3105         }
3106         info("\nContents of send queue:\n");
3107         if (l_ptr->first_out) {
3108                 dbg_print_buf_chain(l_ptr->first_out);
3109         }
3110         info("Empty send queue\n");
3111 }
3112
3113 static void link_print(struct link *l_ptr, struct print_buf *buf,
3114                        const char *str)
3115 {
3116         tipc_printf(buf, str);
3117         if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
3118                 return;
3119         tipc_printf(buf, "Link %x<%s>:",
3120                     l_ptr->addr, l_ptr->b_ptr->publ.name);
3121         tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
3122         tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
3123         tipc_printf(buf, "SQUE");
3124         if (l_ptr->first_out) {
3125                 tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
3126                 if (l_ptr->next_out)
3127                         tipc_printf(buf, "%u..",
3128                                     msg_seqno(buf_msg(l_ptr->next_out)));
3129                 tipc_printf(buf, "%u]",
3130                             msg_seqno(buf_msg
3131                                       (l_ptr->last_out)), l_ptr->out_queue_size);
3132                 if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) - 
3133                          msg_seqno(buf_msg(l_ptr->first_out))) 
3134                      != (l_ptr->out_queue_size - 1))
3135                     || (l_ptr->last_out->next != 0)) {
3136                         tipc_printf(buf, "\nSend queue inconsistency\n");
3137                         tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
3138                         tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
3139                         tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
3140                         link_dump_send_queue(l_ptr);
3141                 }
3142         } else
3143                 tipc_printf(buf, "[]");
3144         tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
3145         if (l_ptr->oldest_deferred_in) {
3146                 u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
3147                 u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
3148                 tipc_printf(buf, ":RQUE[%u..%u]", o, n);
3149                 if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
3150                         tipc_printf(buf, ":RQSIZ(%u)",
3151                                     l_ptr->deferred_inqueue_sz);
3152                 }
3153         }
3154         if (link_working_unknown(l_ptr))
3155                 tipc_printf(buf, ":WU");
3156         if (link_reset_reset(l_ptr))
3157                 tipc_printf(buf, ":RR");
3158         if (link_reset_unknown(l_ptr))
3159                 tipc_printf(buf, ":RU");
3160         if (link_working_working(l_ptr))
3161                 tipc_printf(buf, ":WW");
3162         tipc_printf(buf, "\n");
3163 }
3164