vfs: fix race in rcu lookup of pruned dentry
[pandora-kernel.git] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41
42 /* Connection management: */
43 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
44 #define CONFIRMED 0
45 #define PROBING 1
46
47 #define MAX_REJECT_SIZE 1024
48
49 static struct sk_buff *msg_queue_head;
50 static struct sk_buff *msg_queue_tail;
51
52 DEFINE_SPINLOCK(tipc_port_list_lock);
53 static DEFINE_SPINLOCK(queue_lock);
54
55 static LIST_HEAD(ports);
56 static void port_handle_node_down(unsigned long ref);
57 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
58 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
59 static void port_timeout(unsigned long ref);
60
61
62 static u32 port_peernode(struct tipc_port *p_ptr)
63 {
64         return msg_destnode(&p_ptr->phdr);
65 }
66
67 static u32 port_peerport(struct tipc_port *p_ptr)
68 {
69         return msg_destport(&p_ptr->phdr);
70 }
71
72 /**
73  * tipc_multicast - send a multicast message to local and remote destinations
74  */
75
76 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
77                    u32 num_sect, struct iovec const *msg_sect,
78                    unsigned int total_len)
79 {
80         struct tipc_msg *hdr;
81         struct sk_buff *buf;
82         struct sk_buff *ibuf = NULL;
83         struct port_list dports = {0, NULL, };
84         struct tipc_port *oport = tipc_port_deref(ref);
85         int ext_targets;
86         int res;
87
88         if (unlikely(!oport))
89                 return -EINVAL;
90
91         /* Create multicast message */
92
93         hdr = &oport->phdr;
94         msg_set_type(hdr, TIPC_MCAST_MSG);
95         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
96         msg_set_destport(hdr, 0);
97         msg_set_destnode(hdr, 0);
98         msg_set_nametype(hdr, seq->type);
99         msg_set_namelower(hdr, seq->lower);
100         msg_set_nameupper(hdr, seq->upper);
101         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
102         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
103                         !oport->user_port, &buf);
104         if (unlikely(!buf))
105                 return res;
106
107         /* Figure out where to send multicast message */
108
109         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
110                                                 TIPC_NODE_SCOPE, &dports);
111
112         /* Send message to destinations (duplicate it only if necessary) */
113
114         if (ext_targets) {
115                 if (dports.count != 0) {
116                         ibuf = skb_copy(buf, GFP_ATOMIC);
117                         if (ibuf == NULL) {
118                                 tipc_port_list_free(&dports);
119                                 buf_discard(buf);
120                                 return -ENOMEM;
121                         }
122                 }
123                 res = tipc_bclink_send_msg(buf);
124                 if ((res < 0) && (dports.count != 0))
125                         buf_discard(ibuf);
126         } else {
127                 ibuf = buf;
128         }
129
130         if (res >= 0) {
131                 if (ibuf)
132                         tipc_port_recv_mcast(ibuf, &dports);
133         } else {
134                 tipc_port_list_free(&dports);
135         }
136         return res;
137 }
138
139 /**
140  * tipc_port_recv_mcast - deliver multicast message to all destination ports
141  *
142  * If there is no port list, perform a lookup to create one
143  */
144
145 void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
146 {
147         struct tipc_msg *msg;
148         struct port_list dports = {0, NULL, };
149         struct port_list *item = dp;
150         int cnt = 0;
151
152         msg = buf_msg(buf);
153
154         /* Create destination port list, if one wasn't supplied */
155
156         if (dp == NULL) {
157                 tipc_nametbl_mc_translate(msg_nametype(msg),
158                                      msg_namelower(msg),
159                                      msg_nameupper(msg),
160                                      TIPC_CLUSTER_SCOPE,
161                                      &dports);
162                 item = dp = &dports;
163         }
164
165         /* Deliver a copy of message to each destination port */
166
167         if (dp->count != 0) {
168                 msg_set_destnode(msg, tipc_own_addr);
169                 if (dp->count == 1) {
170                         msg_set_destport(msg, dp->ports[0]);
171                         tipc_port_recv_msg(buf);
172                         tipc_port_list_free(dp);
173                         return;
174                 }
175                 for (; cnt < dp->count; cnt++) {
176                         int index = cnt % PLSIZE;
177                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
178
179                         if (b == NULL) {
180                                 warn("Unable to deliver multicast message(s)\n");
181                                 goto exit;
182                         }
183                         if ((index == 0) && (cnt != 0))
184                                 item = item->next;
185                         msg_set_destport(buf_msg(b), item->ports[index]);
186                         tipc_port_recv_msg(b);
187                 }
188         }
189 exit:
190         buf_discard(buf);
191         tipc_port_list_free(dp);
192 }
193
194 /**
195  * tipc_createport_raw - create a generic TIPC port
196  *
197  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
198  */
199
200 struct tipc_port *tipc_createport_raw(void *usr_handle,
201                         u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
202                         void (*wakeup)(struct tipc_port *),
203                         const u32 importance)
204 {
205         struct tipc_port *p_ptr;
206         struct tipc_msg *msg;
207         u32 ref;
208
209         p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
210         if (!p_ptr) {
211                 warn("Port creation failed, no memory\n");
212                 return NULL;
213         }
214         ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
215         if (!ref) {
216                 warn("Port creation failed, reference table exhausted\n");
217                 kfree(p_ptr);
218                 return NULL;
219         }
220
221         p_ptr->usr_handle = usr_handle;
222         p_ptr->max_pkt = MAX_PKT_DEFAULT;
223         p_ptr->ref = ref;
224         msg = &p_ptr->phdr;
225         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);
226         msg_set_origport(msg, ref);
227         INIT_LIST_HEAD(&p_ptr->wait_list);
228         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
229         p_ptr->dispatcher = dispatcher;
230         p_ptr->wakeup = wakeup;
231         p_ptr->user_port = NULL;
232         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
233         spin_lock_bh(&tipc_port_list_lock);
234         INIT_LIST_HEAD(&p_ptr->publications);
235         INIT_LIST_HEAD(&p_ptr->port_list);
236         list_add_tail(&p_ptr->port_list, &ports);
237         spin_unlock_bh(&tipc_port_list_lock);
238         return p_ptr;
239 }
240
241 int tipc_deleteport(u32 ref)
242 {
243         struct tipc_port *p_ptr;
244         struct sk_buff *buf = NULL;
245
246         tipc_withdraw(ref, 0, NULL);
247         p_ptr = tipc_port_lock(ref);
248         if (!p_ptr)
249                 return -EINVAL;
250
251         tipc_ref_discard(ref);
252         tipc_port_unlock(p_ptr);
253
254         k_cancel_timer(&p_ptr->timer);
255         if (p_ptr->connected) {
256                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
257                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
258         }
259         kfree(p_ptr->user_port);
260
261         spin_lock_bh(&tipc_port_list_lock);
262         list_del(&p_ptr->port_list);
263         list_del(&p_ptr->wait_list);
264         spin_unlock_bh(&tipc_port_list_lock);
265         k_term_timer(&p_ptr->timer);
266         kfree(p_ptr);
267         tipc_net_route_msg(buf);
268         return 0;
269 }
270
271 static int port_unreliable(struct tipc_port *p_ptr)
272 {
273         return msg_src_droppable(&p_ptr->phdr);
274 }
275
276 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
277 {
278         struct tipc_port *p_ptr;
279
280         p_ptr = tipc_port_lock(ref);
281         if (!p_ptr)
282                 return -EINVAL;
283         *isunreliable = port_unreliable(p_ptr);
284         tipc_port_unlock(p_ptr);
285         return 0;
286 }
287
288 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
289 {
290         struct tipc_port *p_ptr;
291
292         p_ptr = tipc_port_lock(ref);
293         if (!p_ptr)
294                 return -EINVAL;
295         msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
296         tipc_port_unlock(p_ptr);
297         return 0;
298 }
299
300 static int port_unreturnable(struct tipc_port *p_ptr)
301 {
302         return msg_dest_droppable(&p_ptr->phdr);
303 }
304
305 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
306 {
307         struct tipc_port *p_ptr;
308
309         p_ptr = tipc_port_lock(ref);
310         if (!p_ptr)
311                 return -EINVAL;
312         *isunrejectable = port_unreturnable(p_ptr);
313         tipc_port_unlock(p_ptr);
314         return 0;
315 }
316
317 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
318 {
319         struct tipc_port *p_ptr;
320
321         p_ptr = tipc_port_lock(ref);
322         if (!p_ptr)
323                 return -EINVAL;
324         msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
325         tipc_port_unlock(p_ptr);
326         return 0;
327 }
328
329 /*
330  * port_build_proto_msg(): build a port level protocol
331  * or a connection abortion message. Called with
332  * tipc_port lock on.
333  */
334 static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
335                                             u32 origport, u32 orignode,
336                                             u32 usr, u32 type, u32 err,
337                                             u32 ack)
338 {
339         struct sk_buff *buf;
340         struct tipc_msg *msg;
341
342         buf = tipc_buf_acquire(LONG_H_SIZE);
343         if (buf) {
344                 msg = buf_msg(buf);
345                 tipc_msg_init(msg, usr, type, LONG_H_SIZE, destnode);
346                 msg_set_errcode(msg, err);
347                 msg_set_destport(msg, destport);
348                 msg_set_origport(msg, origport);
349                 msg_set_orignode(msg, orignode);
350                 msg_set_msgcnt(msg, ack);
351         }
352         return buf;
353 }
354
355 int tipc_reject_msg(struct sk_buff *buf, u32 err)
356 {
357         struct tipc_msg *msg = buf_msg(buf);
358         struct sk_buff *rbuf;
359         struct tipc_msg *rmsg;
360         int hdr_sz;
361         u32 imp = msg_importance(msg);
362         u32 data_sz = msg_data_sz(msg);
363
364         if (data_sz > MAX_REJECT_SIZE)
365                 data_sz = MAX_REJECT_SIZE;
366         if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
367                 imp++;
368
369         /* discard rejected message if it shouldn't be returned to sender */
370         if (msg_errcode(msg) || msg_dest_droppable(msg)) {
371                 buf_discard(buf);
372                 return data_sz;
373         }
374
375         /* construct rejected message */
376         if (msg_mcast(msg))
377                 hdr_sz = MCAST_H_SIZE;
378         else
379                 hdr_sz = LONG_H_SIZE;
380         rbuf = tipc_buf_acquire(data_sz + hdr_sz);
381         if (rbuf == NULL) {
382                 buf_discard(buf);
383                 return data_sz;
384         }
385         rmsg = buf_msg(rbuf);
386         tipc_msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg));
387         msg_set_errcode(rmsg, err);
388         msg_set_destport(rmsg, msg_origport(msg));
389         msg_set_origport(rmsg, msg_destport(msg));
390         if (msg_short(msg)) {
391                 msg_set_orignode(rmsg, tipc_own_addr);
392                 /* leave name type & instance as zeroes */
393         } else {
394                 msg_set_orignode(rmsg, msg_destnode(msg));
395                 msg_set_nametype(rmsg, msg_nametype(msg));
396                 msg_set_nameinst(rmsg, msg_nameinst(msg));
397         }
398         msg_set_size(rmsg, data_sz + hdr_sz);
399         skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz);
400
401         /* send self-abort message when rejecting on a connected port */
402         if (msg_connected(msg)) {
403                 struct sk_buff *abuf = NULL;
404                 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
405
406                 if (p_ptr) {
407                         if (p_ptr->connected)
408                                 abuf = port_build_self_abort_msg(p_ptr, err);
409                         tipc_port_unlock(p_ptr);
410                 }
411                 tipc_net_route_msg(abuf);
412         }
413
414         /* send rejected message */
415         buf_discard(buf);
416         tipc_net_route_msg(rbuf);
417         return data_sz;
418 }
419
420 int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
421                               struct iovec const *msg_sect, u32 num_sect,
422                               unsigned int total_len, int err)
423 {
424         struct sk_buff *buf;
425         int res;
426
427         res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
428                         !p_ptr->user_port, &buf);
429         if (!buf)
430                 return res;
431
432         return tipc_reject_msg(buf, err);
433 }
434
435 static void port_timeout(unsigned long ref)
436 {
437         struct tipc_port *p_ptr = tipc_port_lock(ref);
438         struct sk_buff *buf = NULL;
439
440         if (!p_ptr)
441                 return;
442
443         if (!p_ptr->connected) {
444                 tipc_port_unlock(p_ptr);
445                 return;
446         }
447
448         /* Last probe answered ? */
449         if (p_ptr->probing_state == PROBING) {
450                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
451         } else {
452                 buf = port_build_proto_msg(port_peerport(p_ptr),
453                                            port_peernode(p_ptr),
454                                            p_ptr->ref,
455                                            tipc_own_addr,
456                                            CONN_MANAGER,
457                                            CONN_PROBE,
458                                            TIPC_OK,
459                                            0);
460                 p_ptr->probing_state = PROBING;
461                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
462         }
463         tipc_port_unlock(p_ptr);
464         tipc_net_route_msg(buf);
465 }
466
467
468 static void port_handle_node_down(unsigned long ref)
469 {
470         struct tipc_port *p_ptr = tipc_port_lock(ref);
471         struct sk_buff *buf = NULL;
472
473         if (!p_ptr)
474                 return;
475         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
476         tipc_port_unlock(p_ptr);
477         tipc_net_route_msg(buf);
478 }
479
480
481 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
482 {
483         u32 imp = msg_importance(&p_ptr->phdr);
484
485         if (!p_ptr->connected)
486                 return NULL;
487         if (imp < TIPC_CRITICAL_IMPORTANCE)
488                 imp++;
489         return port_build_proto_msg(p_ptr->ref,
490                                     tipc_own_addr,
491                                     port_peerport(p_ptr),
492                                     port_peernode(p_ptr),
493                                     imp,
494                                     TIPC_CONN_MSG,
495                                     err,
496                                     0);
497 }
498
499
500 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
501 {
502         u32 imp = msg_importance(&p_ptr->phdr);
503
504         if (!p_ptr->connected)
505                 return NULL;
506         if (imp < TIPC_CRITICAL_IMPORTANCE)
507                 imp++;
508         return port_build_proto_msg(port_peerport(p_ptr),
509                                     port_peernode(p_ptr),
510                                     p_ptr->ref,
511                                     tipc_own_addr,
512                                     imp,
513                                     TIPC_CONN_MSG,
514                                     err,
515                                     0);
516 }
517
518 void tipc_port_recv_proto_msg(struct sk_buff *buf)
519 {
520         struct tipc_msg *msg = buf_msg(buf);
521         struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
522         u32 err = TIPC_OK;
523         struct sk_buff *r_buf = NULL;
524         struct sk_buff *abort_buf = NULL;
525
526         if (!p_ptr) {
527                 err = TIPC_ERR_NO_PORT;
528         } else if (p_ptr->connected) {
529                 if ((port_peernode(p_ptr) != msg_orignode(msg)) ||
530                     (port_peerport(p_ptr) != msg_origport(msg))) {
531                         err = TIPC_ERR_NO_PORT;
532                 } else if (msg_type(msg) == CONN_ACK) {
533                         int wakeup = tipc_port_congested(p_ptr) &&
534                                      p_ptr->congested &&
535                                      p_ptr->wakeup;
536                         p_ptr->acked += msg_msgcnt(msg);
537                         if (tipc_port_congested(p_ptr))
538                                 goto exit;
539                         p_ptr->congested = 0;
540                         if (!wakeup)
541                                 goto exit;
542                         p_ptr->wakeup(p_ptr);
543                         goto exit;
544                 }
545         } else if (p_ptr->published) {
546                 err = TIPC_ERR_NO_PORT;
547         }
548         if (err) {
549                 r_buf = port_build_proto_msg(msg_origport(msg),
550                                              msg_orignode(msg),
551                                              msg_destport(msg),
552                                              tipc_own_addr,
553                                              TIPC_HIGH_IMPORTANCE,
554                                              TIPC_CONN_MSG,
555                                              err,
556                                              0);
557                 goto exit;
558         }
559
560         /* All is fine */
561         if (msg_type(msg) == CONN_PROBE) {
562                 r_buf = port_build_proto_msg(msg_origport(msg),
563                                              msg_orignode(msg),
564                                              msg_destport(msg),
565                                              tipc_own_addr,
566                                              CONN_MANAGER,
567                                              CONN_PROBE_REPLY,
568                                              TIPC_OK,
569                                              0);
570         }
571         p_ptr->probing_state = CONFIRMED;
572 exit:
573         if (p_ptr)
574                 tipc_port_unlock(p_ptr);
575         tipc_net_route_msg(r_buf);
576         tipc_net_route_msg(abort_buf);
577         buf_discard(buf);
578 }
579
580 static void port_print(struct tipc_port *p_ptr, struct print_buf *buf, int full_id)
581 {
582         struct publication *publ;
583
584         if (full_id)
585                 tipc_printf(buf, "<%u.%u.%u:%u>:",
586                             tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
587                             tipc_node(tipc_own_addr), p_ptr->ref);
588         else
589                 tipc_printf(buf, "%-10u:", p_ptr->ref);
590
591         if (p_ptr->connected) {
592                 u32 dport = port_peerport(p_ptr);
593                 u32 destnode = port_peernode(p_ptr);
594
595                 tipc_printf(buf, " connected to <%u.%u.%u:%u>",
596                             tipc_zone(destnode), tipc_cluster(destnode),
597                             tipc_node(destnode), dport);
598                 if (p_ptr->conn_type != 0)
599                         tipc_printf(buf, " via {%u,%u}",
600                                     p_ptr->conn_type,
601                                     p_ptr->conn_instance);
602         } else if (p_ptr->published) {
603                 tipc_printf(buf, " bound to");
604                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
605                         if (publ->lower == publ->upper)
606                                 tipc_printf(buf, " {%u,%u}", publ->type,
607                                             publ->lower);
608                         else
609                                 tipc_printf(buf, " {%u,%u,%u}", publ->type,
610                                             publ->lower, publ->upper);
611                 }
612         }
613         tipc_printf(buf, "\n");
614 }
615
616 #define MAX_PORT_QUERY 32768
617
618 struct sk_buff *tipc_port_get_ports(void)
619 {
620         struct sk_buff *buf;
621         struct tlv_desc *rep_tlv;
622         struct print_buf pb;
623         struct tipc_port *p_ptr;
624         int str_len;
625
626         buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
627         if (!buf)
628                 return NULL;
629         rep_tlv = (struct tlv_desc *)buf->data;
630
631         tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
632         spin_lock_bh(&tipc_port_list_lock);
633         list_for_each_entry(p_ptr, &ports, port_list) {
634                 spin_lock_bh(p_ptr->lock);
635                 port_print(p_ptr, &pb, 0);
636                 spin_unlock_bh(p_ptr->lock);
637         }
638         spin_unlock_bh(&tipc_port_list_lock);
639         str_len = tipc_printbuf_validate(&pb);
640
641         skb_put(buf, TLV_SPACE(str_len));
642         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
643
644         return buf;
645 }
646
647 void tipc_port_reinit(void)
648 {
649         struct tipc_port *p_ptr;
650         struct tipc_msg *msg;
651
652         spin_lock_bh(&tipc_port_list_lock);
653         list_for_each_entry(p_ptr, &ports, port_list) {
654                 msg = &p_ptr->phdr;
655                 if (msg_orignode(msg) == tipc_own_addr)
656                         break;
657                 msg_set_prevnode(msg, tipc_own_addr);
658                 msg_set_orignode(msg, tipc_own_addr);
659         }
660         spin_unlock_bh(&tipc_port_list_lock);
661 }
662
663
664 /*
665  *  port_dispatcher_sigh(): Signal handler for messages destinated
666  *                          to the tipc_port interface.
667  */
668
669 static void port_dispatcher_sigh(void *dummy)
670 {
671         struct sk_buff *buf;
672
673         spin_lock_bh(&queue_lock);
674         buf = msg_queue_head;
675         msg_queue_head = NULL;
676         spin_unlock_bh(&queue_lock);
677
678         while (buf) {
679                 struct tipc_port *p_ptr;
680                 struct user_port *up_ptr;
681                 struct tipc_portid orig;
682                 struct tipc_name_seq dseq;
683                 void *usr_handle;
684                 int connected;
685                 int published;
686                 u32 message_type;
687
688                 struct sk_buff *next = buf->next;
689                 struct tipc_msg *msg = buf_msg(buf);
690                 u32 dref = msg_destport(msg);
691
692                 message_type = msg_type(msg);
693                 if (message_type > TIPC_DIRECT_MSG)
694                         goto reject;    /* Unsupported message type */
695
696                 p_ptr = tipc_port_lock(dref);
697                 if (!p_ptr)
698                         goto reject;    /* Port deleted while msg in queue */
699
700                 orig.ref = msg_origport(msg);
701                 orig.node = msg_orignode(msg);
702                 up_ptr = p_ptr->user_port;
703                 usr_handle = up_ptr->usr_handle;
704                 connected = p_ptr->connected;
705                 published = p_ptr->published;
706
707                 if (unlikely(msg_errcode(msg)))
708                         goto err;
709
710                 switch (message_type) {
711
712                 case TIPC_CONN_MSG:{
713                                 tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
714                                 u32 peer_port = port_peerport(p_ptr);
715                                 u32 peer_node = port_peernode(p_ptr);
716                                 u32 dsz;
717
718                                 tipc_port_unlock(p_ptr);
719                                 if (unlikely(!cb))
720                                         goto reject;
721                                 if (unlikely(!connected)) {
722                                         if (tipc_connect2port(dref, &orig))
723                                                 goto reject;
724                                 } else if ((msg_origport(msg) != peer_port) ||
725                                            (msg_orignode(msg) != peer_node))
726                                         goto reject;
727                                 dsz = msg_data_sz(msg);
728                                 if (unlikely(dsz &&
729                                              (++p_ptr->conn_unacked >=
730                                               TIPC_FLOW_CONTROL_WIN)))
731                                         tipc_acknowledge(dref,
732                                                          p_ptr->conn_unacked);
733                                 skb_pull(buf, msg_hdr_sz(msg));
734                                 cb(usr_handle, dref, &buf, msg_data(msg), dsz);
735                                 break;
736                         }
737                 case TIPC_DIRECT_MSG:{
738                                 tipc_msg_event cb = up_ptr->msg_cb;
739
740                                 tipc_port_unlock(p_ptr);
741                                 if (unlikely(!cb || connected))
742                                         goto reject;
743                                 skb_pull(buf, msg_hdr_sz(msg));
744                                 cb(usr_handle, dref, &buf, msg_data(msg),
745                                    msg_data_sz(msg), msg_importance(msg),
746                                    &orig);
747                                 break;
748                         }
749                 case TIPC_MCAST_MSG:
750                 case TIPC_NAMED_MSG:{
751                                 tipc_named_msg_event cb = up_ptr->named_msg_cb;
752
753                                 tipc_port_unlock(p_ptr);
754                                 if (unlikely(!cb || connected || !published))
755                                         goto reject;
756                                 dseq.type =  msg_nametype(msg);
757                                 dseq.lower = msg_nameinst(msg);
758                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
759                                         ? dseq.lower : msg_nameupper(msg);
760                                 skb_pull(buf, msg_hdr_sz(msg));
761                                 cb(usr_handle, dref, &buf, msg_data(msg),
762                                    msg_data_sz(msg), msg_importance(msg),
763                                    &orig, &dseq);
764                                 break;
765                         }
766                 }
767                 if (buf)
768                         buf_discard(buf);
769                 buf = next;
770                 continue;
771 err:
772                 switch (message_type) {
773
774                 case TIPC_CONN_MSG:{
775                                 tipc_conn_shutdown_event cb =
776                                         up_ptr->conn_err_cb;
777                                 u32 peer_port = port_peerport(p_ptr);
778                                 u32 peer_node = port_peernode(p_ptr);
779
780                                 tipc_port_unlock(p_ptr);
781                                 if (!cb || !connected)
782                                         break;
783                                 if ((msg_origport(msg) != peer_port) ||
784                                     (msg_orignode(msg) != peer_node))
785                                         break;
786                                 tipc_disconnect(dref);
787                                 skb_pull(buf, msg_hdr_sz(msg));
788                                 cb(usr_handle, dref, &buf, msg_data(msg),
789                                    msg_data_sz(msg), msg_errcode(msg));
790                                 break;
791                         }
792                 case TIPC_DIRECT_MSG:{
793                                 tipc_msg_err_event cb = up_ptr->err_cb;
794
795                                 tipc_port_unlock(p_ptr);
796                                 if (!cb || connected)
797                                         break;
798                                 skb_pull(buf, msg_hdr_sz(msg));
799                                 cb(usr_handle, dref, &buf, msg_data(msg),
800                                    msg_data_sz(msg), msg_errcode(msg), &orig);
801                                 break;
802                         }
803                 case TIPC_MCAST_MSG:
804                 case TIPC_NAMED_MSG:{
805                                 tipc_named_msg_err_event cb =
806                                         up_ptr->named_err_cb;
807
808                                 tipc_port_unlock(p_ptr);
809                                 if (!cb || connected)
810                                         break;
811                                 dseq.type =  msg_nametype(msg);
812                                 dseq.lower = msg_nameinst(msg);
813                                 dseq.upper = (message_type == TIPC_NAMED_MSG)
814                                         ? dseq.lower : msg_nameupper(msg);
815                                 skb_pull(buf, msg_hdr_sz(msg));
816                                 cb(usr_handle, dref, &buf, msg_data(msg),
817                                    msg_data_sz(msg), msg_errcode(msg), &dseq);
818                                 break;
819                         }
820                 }
821                 if (buf)
822                         buf_discard(buf);
823                 buf = next;
824                 continue;
825 reject:
826                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
827                 buf = next;
828         }
829 }
830
831 /*
832  *  port_dispatcher(): Dispatcher for messages destinated
833  *  to the tipc_port interface. Called with port locked.
834  */
835
836 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
837 {
838         buf->next = NULL;
839         spin_lock_bh(&queue_lock);
840         if (msg_queue_head) {
841                 msg_queue_tail->next = buf;
842                 msg_queue_tail = buf;
843         } else {
844                 msg_queue_tail = msg_queue_head = buf;
845                 tipc_k_signal((Handler)port_dispatcher_sigh, 0);
846         }
847         spin_unlock_bh(&queue_lock);
848         return 0;
849 }
850
851 /*
852  * Wake up port after congestion: Called with port locked,
853  *
854  */
855
856 static void port_wakeup_sh(unsigned long ref)
857 {
858         struct tipc_port *p_ptr;
859         struct user_port *up_ptr;
860         tipc_continue_event cb = NULL;
861         void *uh = NULL;
862
863         p_ptr = tipc_port_lock(ref);
864         if (p_ptr) {
865                 up_ptr = p_ptr->user_port;
866                 if (up_ptr) {
867                         cb = up_ptr->continue_event_cb;
868                         uh = up_ptr->usr_handle;
869                 }
870                 tipc_port_unlock(p_ptr);
871         }
872         if (cb)
873                 cb(uh, ref);
874 }
875
876
877 static void port_wakeup(struct tipc_port *p_ptr)
878 {
879         tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
880 }
881
882 void tipc_acknowledge(u32 ref, u32 ack)
883 {
884         struct tipc_port *p_ptr;
885         struct sk_buff *buf = NULL;
886
887         p_ptr = tipc_port_lock(ref);
888         if (!p_ptr)
889                 return;
890         if (p_ptr->connected) {
891                 p_ptr->conn_unacked -= ack;
892                 buf = port_build_proto_msg(port_peerport(p_ptr),
893                                            port_peernode(p_ptr),
894                                            ref,
895                                            tipc_own_addr,
896                                            CONN_MANAGER,
897                                            CONN_ACK,
898                                            TIPC_OK,
899                                            ack);
900         }
901         tipc_port_unlock(p_ptr);
902         tipc_net_route_msg(buf);
903 }
904
905 /*
906  * tipc_createport(): user level call.
907  */
908
909 int tipc_createport(void *usr_handle,
910                     unsigned int importance,
911                     tipc_msg_err_event error_cb,
912                     tipc_named_msg_err_event named_error_cb,
913                     tipc_conn_shutdown_event conn_error_cb,
914                     tipc_msg_event msg_cb,
915                     tipc_named_msg_event named_msg_cb,
916                     tipc_conn_msg_event conn_msg_cb,
917                     tipc_continue_event continue_event_cb,/* May be zero */
918                     u32 *portref)
919 {
920         struct user_port *up_ptr;
921         struct tipc_port *p_ptr;
922
923         up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
924         if (!up_ptr) {
925                 warn("Port creation failed, no memory\n");
926                 return -ENOMEM;
927         }
928         p_ptr = (struct tipc_port *)tipc_createport_raw(NULL, port_dispatcher,
929                                                    port_wakeup, importance);
930         if (!p_ptr) {
931                 kfree(up_ptr);
932                 return -ENOMEM;
933         }
934
935         p_ptr->user_port = up_ptr;
936         up_ptr->usr_handle = usr_handle;
937         up_ptr->ref = p_ptr->ref;
938         up_ptr->err_cb = error_cb;
939         up_ptr->named_err_cb = named_error_cb;
940         up_ptr->conn_err_cb = conn_error_cb;
941         up_ptr->msg_cb = msg_cb;
942         up_ptr->named_msg_cb = named_msg_cb;
943         up_ptr->conn_msg_cb = conn_msg_cb;
944         up_ptr->continue_event_cb = continue_event_cb;
945         *portref = p_ptr->ref;
946         tipc_port_unlock(p_ptr);
947         return 0;
948 }
949
950 int tipc_portimportance(u32 ref, unsigned int *importance)
951 {
952         struct tipc_port *p_ptr;
953
954         p_ptr = tipc_port_lock(ref);
955         if (!p_ptr)
956                 return -EINVAL;
957         *importance = (unsigned int)msg_importance(&p_ptr->phdr);
958         tipc_port_unlock(p_ptr);
959         return 0;
960 }
961
962 int tipc_set_portimportance(u32 ref, unsigned int imp)
963 {
964         struct tipc_port *p_ptr;
965
966         if (imp > TIPC_CRITICAL_IMPORTANCE)
967                 return -EINVAL;
968
969         p_ptr = tipc_port_lock(ref);
970         if (!p_ptr)
971                 return -EINVAL;
972         msg_set_importance(&p_ptr->phdr, (u32)imp);
973         tipc_port_unlock(p_ptr);
974         return 0;
975 }
976
977
978 int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
979 {
980         struct tipc_port *p_ptr;
981         struct publication *publ;
982         u32 key;
983         int res = -EINVAL;
984
985         p_ptr = tipc_port_lock(ref);
986         if (!p_ptr)
987                 return -EINVAL;
988
989         if (p_ptr->connected)
990                 goto exit;
991         if (seq->lower > seq->upper)
992                 goto exit;
993         if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
994                 goto exit;
995         key = ref + p_ptr->pub_count + 1;
996         if (key == ref) {
997                 res = -EADDRINUSE;
998                 goto exit;
999         }
1000         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
1001                                     scope, p_ptr->ref, key);
1002         if (publ) {
1003                 list_add(&publ->pport_list, &p_ptr->publications);
1004                 p_ptr->pub_count++;
1005                 p_ptr->published = 1;
1006                 res = 0;
1007         }
1008 exit:
1009         tipc_port_unlock(p_ptr);
1010         return res;
1011 }
1012
1013 int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1014 {
1015         struct tipc_port *p_ptr;
1016         struct publication *publ;
1017         struct publication *tpubl;
1018         int res = -EINVAL;
1019
1020         p_ptr = tipc_port_lock(ref);
1021         if (!p_ptr)
1022                 return -EINVAL;
1023         if (!seq) {
1024                 list_for_each_entry_safe(publ, tpubl,
1025                                          &p_ptr->publications, pport_list) {
1026                         tipc_nametbl_withdraw(publ->type, publ->lower,
1027                                               publ->ref, publ->key);
1028                 }
1029                 res = 0;
1030         } else {
1031                 list_for_each_entry_safe(publ, tpubl,
1032                                          &p_ptr->publications, pport_list) {
1033                         if (publ->scope != scope)
1034                                 continue;
1035                         if (publ->type != seq->type)
1036                                 continue;
1037                         if (publ->lower != seq->lower)
1038                                 continue;
1039                         if (publ->upper != seq->upper)
1040                                 break;
1041                         tipc_nametbl_withdraw(publ->type, publ->lower,
1042                                               publ->ref, publ->key);
1043                         res = 0;
1044                         break;
1045                 }
1046         }
1047         if (list_empty(&p_ptr->publications))
1048                 p_ptr->published = 0;
1049         tipc_port_unlock(p_ptr);
1050         return res;
1051 }
1052
1053 int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1054 {
1055         struct tipc_port *p_ptr;
1056         struct tipc_msg *msg;
1057         int res = -EINVAL;
1058
1059         p_ptr = tipc_port_lock(ref);
1060         if (!p_ptr)
1061                 return -EINVAL;
1062         if (p_ptr->published || p_ptr->connected)
1063                 goto exit;
1064         if (!peer->ref)
1065                 goto exit;
1066
1067         msg = &p_ptr->phdr;
1068         msg_set_destnode(msg, peer->node);
1069         msg_set_destport(msg, peer->ref);
1070         msg_set_orignode(msg, tipc_own_addr);
1071         msg_set_origport(msg, p_ptr->ref);
1072         msg_set_type(msg, TIPC_CONN_MSG);
1073         msg_set_lookup_scope(msg, 0);
1074         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1075
1076         p_ptr->probing_interval = PROBING_INTERVAL;
1077         p_ptr->probing_state = CONFIRMED;
1078         p_ptr->connected = 1;
1079         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1080
1081         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1082                           (void *)(unsigned long)ref,
1083                           (net_ev_handler)port_handle_node_down);
1084         res = 0;
1085 exit:
1086         tipc_port_unlock(p_ptr);
1087         p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1088         return res;
1089 }
1090
1091 /**
1092  * tipc_disconnect_port - disconnect port from peer
1093  *
1094  * Port must be locked.
1095  */
1096
1097 int tipc_disconnect_port(struct tipc_port *tp_ptr)
1098 {
1099         int res;
1100
1101         if (tp_ptr->connected) {
1102                 tp_ptr->connected = 0;
1103                 /* let timer expire on it's own to avoid deadlock! */
1104                 tipc_nodesub_unsubscribe(
1105                         &((struct tipc_port *)tp_ptr)->subscription);
1106                 res = 0;
1107         } else {
1108                 res = -ENOTCONN;
1109         }
1110         return res;
1111 }
1112
1113 /*
1114  * tipc_disconnect(): Disconnect port form peer.
1115  *                    This is a node local operation.
1116  */
1117
1118 int tipc_disconnect(u32 ref)
1119 {
1120         struct tipc_port *p_ptr;
1121         int res;
1122
1123         p_ptr = tipc_port_lock(ref);
1124         if (!p_ptr)
1125                 return -EINVAL;
1126         res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1127         tipc_port_unlock(p_ptr);
1128         return res;
1129 }
1130
1131 /*
1132  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1133  */
1134 int tipc_shutdown(u32 ref)
1135 {
1136         struct tipc_port *p_ptr;
1137         struct sk_buff *buf = NULL;
1138
1139         p_ptr = tipc_port_lock(ref);
1140         if (!p_ptr)
1141                 return -EINVAL;
1142
1143         if (p_ptr->connected) {
1144                 u32 imp = msg_importance(&p_ptr->phdr);
1145                 if (imp < TIPC_CRITICAL_IMPORTANCE)
1146                         imp++;
1147                 buf = port_build_proto_msg(port_peerport(p_ptr),
1148                                            port_peernode(p_ptr),
1149                                            ref,
1150                                            tipc_own_addr,
1151                                            imp,
1152                                            TIPC_CONN_MSG,
1153                                            TIPC_CONN_SHUTDOWN,
1154                                            0);
1155         }
1156         tipc_port_unlock(p_ptr);
1157         tipc_net_route_msg(buf);
1158         return tipc_disconnect(ref);
1159 }
1160
1161 /*
1162  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1163  *                        message for this node.
1164  */
1165
1166 static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1167                                    struct iovec const *msg_sect,
1168                                    unsigned int total_len)
1169 {
1170         struct sk_buff *buf;
1171         int res;
1172
1173         res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1174                         MAX_MSG_SIZE, !sender->user_port, &buf);
1175         if (likely(buf))
1176                 tipc_port_recv_msg(buf);
1177         return res;
1178 }
1179
1180 /**
1181  * tipc_send - send message sections on connection
1182  */
1183
1184 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1185               unsigned int total_len)
1186 {
1187         struct tipc_port *p_ptr;
1188         u32 destnode;
1189         int res;
1190
1191         p_ptr = tipc_port_deref(ref);
1192         if (!p_ptr || !p_ptr->connected)
1193                 return -EINVAL;
1194
1195         p_ptr->congested = 1;
1196         if (!tipc_port_congested(p_ptr)) {
1197                 destnode = port_peernode(p_ptr);
1198                 if (likely(destnode != tipc_own_addr))
1199                         res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1200                                                            total_len, destnode);
1201                 else
1202                         res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1203                                                       total_len);
1204
1205                 if (likely(res != -ELINKCONG)) {
1206                         p_ptr->congested = 0;
1207                         if (res > 0)
1208                                 p_ptr->sent++;
1209                         return res;
1210                 }
1211         }
1212         if (port_unreliable(p_ptr)) {
1213                 p_ptr->congested = 0;
1214                 return total_len;
1215         }
1216         return -ELINKCONG;
1217 }
1218
1219 /**
1220  * tipc_send2name - send message sections to port name
1221  */
1222
1223 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1224                    unsigned int num_sect, struct iovec const *msg_sect,
1225                    unsigned int total_len)
1226 {
1227         struct tipc_port *p_ptr;
1228         struct tipc_msg *msg;
1229         u32 destnode = domain;
1230         u32 destport;
1231         int res;
1232
1233         p_ptr = tipc_port_deref(ref);
1234         if (!p_ptr || p_ptr->connected)
1235                 return -EINVAL;
1236
1237         msg = &p_ptr->phdr;
1238         msg_set_type(msg, TIPC_NAMED_MSG);
1239         msg_set_orignode(msg, tipc_own_addr);
1240         msg_set_origport(msg, ref);
1241         msg_set_hdr_sz(msg, LONG_H_SIZE);
1242         msg_set_nametype(msg, name->type);
1243         msg_set_nameinst(msg, name->instance);
1244         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1245         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1246         msg_set_destnode(msg, destnode);
1247         msg_set_destport(msg, destport);
1248
1249         if (likely(destport)) {
1250                 if (likely(destnode == tipc_own_addr))
1251                         res = tipc_port_recv_sections(p_ptr, num_sect,
1252                                                       msg_sect, total_len);
1253                 else
1254                         res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1255                                                            num_sect, total_len,
1256                                                            destnode);
1257                 if (likely(res != -ELINKCONG)) {
1258                         if (res > 0)
1259                                 p_ptr->sent++;
1260                         return res;
1261                 }
1262                 if (port_unreliable(p_ptr)) {
1263                         return total_len;
1264                 }
1265                 return -ELINKCONG;
1266         }
1267         return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1268                                          total_len, TIPC_ERR_NO_NAME);
1269 }
1270
1271 /**
1272  * tipc_send2port - send message sections to port identity
1273  */
1274
1275 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1276                    unsigned int num_sect, struct iovec const *msg_sect,
1277                    unsigned int total_len)
1278 {
1279         struct tipc_port *p_ptr;
1280         struct tipc_msg *msg;
1281         int res;
1282
1283         p_ptr = tipc_port_deref(ref);
1284         if (!p_ptr || p_ptr->connected)
1285                 return -EINVAL;
1286
1287         msg = &p_ptr->phdr;
1288         msg_set_type(msg, TIPC_DIRECT_MSG);
1289         msg_set_lookup_scope(msg, 0);
1290         msg_set_orignode(msg, tipc_own_addr);
1291         msg_set_origport(msg, ref);
1292         msg_set_destnode(msg, dest->node);
1293         msg_set_destport(msg, dest->ref);
1294         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1295
1296         if (dest->node == tipc_own_addr)
1297                 res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1298                                                total_len);
1299         else
1300                 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1301                                                    total_len, dest->node);
1302         if (likely(res != -ELINKCONG)) {
1303                 if (res > 0)
1304                         p_ptr->sent++;
1305                 return res;
1306         }
1307         if (port_unreliable(p_ptr)) {
1308                 return total_len;
1309         }
1310         return -ELINKCONG;
1311 }
1312
1313 /**
1314  * tipc_send_buf2port - send message buffer to port identity
1315  */
1316
1317 int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1318                struct sk_buff *buf, unsigned int dsz)
1319 {
1320         struct tipc_port *p_ptr;
1321         struct tipc_msg *msg;
1322         int res;
1323
1324         p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1325         if (!p_ptr || p_ptr->connected)
1326                 return -EINVAL;
1327
1328         msg = &p_ptr->phdr;
1329         msg_set_type(msg, TIPC_DIRECT_MSG);
1330         msg_set_orignode(msg, tipc_own_addr);
1331         msg_set_origport(msg, ref);
1332         msg_set_destnode(msg, dest->node);
1333         msg_set_destport(msg, dest->ref);
1334         msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1335         msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1336         if (skb_cow(buf, DIR_MSG_H_SIZE))
1337                 return -ENOMEM;
1338
1339         skb_push(buf, DIR_MSG_H_SIZE);
1340         skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE);
1341
1342         if (dest->node == tipc_own_addr)
1343                 res = tipc_port_recv_msg(buf);
1344         else
1345                 res = tipc_send_buf_fast(buf, dest->node);
1346         if (likely(res != -ELINKCONG)) {
1347                 if (res > 0)
1348                         p_ptr->sent++;
1349                 return res;
1350         }
1351         if (port_unreliable(p_ptr))
1352                 return dsz;
1353         return -ELINKCONG;
1354 }
1355