Merge master.kernel.org:/pub/scm/linux/kernel/git/davej/cpufreq
[pandora-kernel.git] / arch / ia64 / sn / kernel / xpc_channel.c
1 /*
2  * This file is subject to the terms and conditions of the GNU General Public
3  * License.  See the file "COPYING" in the main directory of this archive
4  * for more details.
5  *
6  * Copyright (c) 2004-2006 Silicon Graphics, Inc.  All Rights Reserved.
7  */
8
9
10 /*
11  * Cross Partition Communication (XPC) channel support.
12  *
13  *      This is the part of XPC that manages the channels and
14  *      sends/receives messages across them to/from other partitions.
15  *
16  */
17
18
19 #include <linux/kernel.h>
20 #include <linux/init.h>
21 #include <linux/sched.h>
22 #include <linux/cache.h>
23 #include <linux/interrupt.h>
24 #include <linux/slab.h>
25 #include <linux/mutex.h>
26 #include <linux/completion.h>
27 #include <asm/sn/bte.h>
28 #include <asm/sn/sn_sal.h>
29 #include <asm/sn/xpc.h>
30
31
32 /*
33  * Set up the initial values for the XPartition Communication channels.
34  */
35 static void
36 xpc_initialize_channels(struct xpc_partition *part, partid_t partid)
37 {
38         int ch_number;
39         struct xpc_channel *ch;
40
41
42         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
43                 ch = &part->channels[ch_number];
44
45                 ch->partid = partid;
46                 ch->number = ch_number;
47                 ch->flags = XPC_C_DISCONNECTED;
48
49                 ch->local_GP = &part->local_GPs[ch_number];
50                 ch->local_openclose_args =
51                                         &part->local_openclose_args[ch_number];
52
53                 atomic_set(&ch->kthreads_assigned, 0);
54                 atomic_set(&ch->kthreads_idle, 0);
55                 atomic_set(&ch->kthreads_active, 0);
56
57                 atomic_set(&ch->references, 0);
58                 atomic_set(&ch->n_to_notify, 0);
59
60                 spin_lock_init(&ch->lock);
61                 mutex_init(&ch->msg_to_pull_mutex);
62                 init_completion(&ch->wdisconnect_wait);
63
64                 atomic_set(&ch->n_on_msg_allocate_wq, 0);
65                 init_waitqueue_head(&ch->msg_allocate_wq);
66                 init_waitqueue_head(&ch->idle_wq);
67         }
68 }
69
70
71 /*
72  * Setup the infrastructure necessary to support XPartition Communication
73  * between the specified remote partition and the local one.
74  */
75 enum xpc_retval
76 xpc_setup_infrastructure(struct xpc_partition *part)
77 {
78         int ret, cpuid;
79         struct timer_list *timer;
80         partid_t partid = XPC_PARTID(part);
81
82
83         /*
84          * Zero out MOST of the entry for this partition. Only the fields
85          * starting with `nchannels' will be zeroed. The preceding fields must
86          * remain `viable' across partition ups and downs, since they may be
87          * referenced during this memset() operation.
88          */
89         memset(&part->nchannels, 0, sizeof(struct xpc_partition) -
90                                 offsetof(struct xpc_partition, nchannels));
91
92         /*
93          * Allocate all of the channel structures as a contiguous chunk of
94          * memory.
95          */
96         part->channels = kmalloc(sizeof(struct xpc_channel) * XPC_NCHANNELS,
97                                                                 GFP_KERNEL);
98         if (part->channels == NULL) {
99                 dev_err(xpc_chan, "can't get memory for channels\n");
100                 return xpcNoMemory;
101         }
102         memset(part->channels, 0, sizeof(struct xpc_channel) * XPC_NCHANNELS);
103
104         part->nchannels = XPC_NCHANNELS;
105
106
107         /* allocate all the required GET/PUT values */
108
109         part->local_GPs = xpc_kmalloc_cacheline_aligned(XPC_GP_SIZE,
110                                         GFP_KERNEL, &part->local_GPs_base);
111         if (part->local_GPs == NULL) {
112                 kfree(part->channels);
113                 part->channels = NULL;
114                 dev_err(xpc_chan, "can't get memory for local get/put "
115                         "values\n");
116                 return xpcNoMemory;
117         }
118         memset(part->local_GPs, 0, XPC_GP_SIZE);
119
120         part->remote_GPs = xpc_kmalloc_cacheline_aligned(XPC_GP_SIZE,
121                                         GFP_KERNEL, &part->remote_GPs_base);
122         if (part->remote_GPs == NULL) {
123                 kfree(part->channels);
124                 part->channels = NULL;
125                 kfree(part->local_GPs_base);
126                 part->local_GPs = NULL;
127                 dev_err(xpc_chan, "can't get memory for remote get/put "
128                         "values\n");
129                 return xpcNoMemory;
130         }
131         memset(part->remote_GPs, 0, XPC_GP_SIZE);
132
133
134         /* allocate all the required open and close args */
135
136         part->local_openclose_args = xpc_kmalloc_cacheline_aligned(
137                                         XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
138                                         &part->local_openclose_args_base);
139         if (part->local_openclose_args == NULL) {
140                 kfree(part->channels);
141                 part->channels = NULL;
142                 kfree(part->local_GPs_base);
143                 part->local_GPs = NULL;
144                 kfree(part->remote_GPs_base);
145                 part->remote_GPs = NULL;
146                 dev_err(xpc_chan, "can't get memory for local connect args\n");
147                 return xpcNoMemory;
148         }
149         memset(part->local_openclose_args, 0, XPC_OPENCLOSE_ARGS_SIZE);
150
151         part->remote_openclose_args = xpc_kmalloc_cacheline_aligned(
152                                         XPC_OPENCLOSE_ARGS_SIZE, GFP_KERNEL,
153                                         &part->remote_openclose_args_base);
154         if (part->remote_openclose_args == NULL) {
155                 kfree(part->channels);
156                 part->channels = NULL;
157                 kfree(part->local_GPs_base);
158                 part->local_GPs = NULL;
159                 kfree(part->remote_GPs_base);
160                 part->remote_GPs = NULL;
161                 kfree(part->local_openclose_args_base);
162                 part->local_openclose_args = NULL;
163                 dev_err(xpc_chan, "can't get memory for remote connect args\n");
164                 return xpcNoMemory;
165         }
166         memset(part->remote_openclose_args, 0, XPC_OPENCLOSE_ARGS_SIZE);
167
168
169         xpc_initialize_channels(part, partid);
170
171         atomic_set(&part->nchannels_active, 0);
172         atomic_set(&part->nchannels_engaged, 0);
173
174
175         /* local_IPI_amo were set to 0 by an earlier memset() */
176
177         /* Initialize this partitions AMO_t structure */
178         part->local_IPI_amo_va = xpc_IPI_init(partid);
179
180         spin_lock_init(&part->IPI_lock);
181
182         atomic_set(&part->channel_mgr_requests, 1);
183         init_waitqueue_head(&part->channel_mgr_wq);
184
185         sprintf(part->IPI_owner, "xpc%02d", partid);
186         ret = request_irq(SGI_XPC_NOTIFY, xpc_notify_IRQ_handler, SA_SHIRQ,
187                                 part->IPI_owner, (void *) (u64) partid);
188         if (ret != 0) {
189                 kfree(part->channels);
190                 part->channels = NULL;
191                 kfree(part->local_GPs_base);
192                 part->local_GPs = NULL;
193                 kfree(part->remote_GPs_base);
194                 part->remote_GPs = NULL;
195                 kfree(part->local_openclose_args_base);
196                 part->local_openclose_args = NULL;
197                 kfree(part->remote_openclose_args_base);
198                 part->remote_openclose_args = NULL;
199                 dev_err(xpc_chan, "can't register NOTIFY IRQ handler, "
200                         "errno=%d\n", -ret);
201                 return xpcLackOfResources;
202         }
203
204         /* Setup a timer to check for dropped IPIs */
205         timer = &part->dropped_IPI_timer;
206         init_timer(timer);
207         timer->function = (void (*)(unsigned long)) xpc_dropped_IPI_check;
208         timer->data = (unsigned long) part;
209         timer->expires = jiffies + XPC_P_DROPPED_IPI_WAIT;
210         add_timer(timer);
211
212         /*
213          * With the setting of the partition setup_state to XPC_P_SETUP, we're
214          * declaring that this partition is ready to go.
215          */
216         part->setup_state = XPC_P_SETUP;
217
218
219         /*
220          * Setup the per partition specific variables required by the
221          * remote partition to establish channel connections with us.
222          *
223          * The setting of the magic # indicates that these per partition
224          * specific variables are ready to be used.
225          */
226         xpc_vars_part[partid].GPs_pa = __pa(part->local_GPs);
227         xpc_vars_part[partid].openclose_args_pa =
228                                         __pa(part->local_openclose_args);
229         xpc_vars_part[partid].IPI_amo_pa = __pa(part->local_IPI_amo_va);
230         cpuid = raw_smp_processor_id(); /* any CPU in this partition will do */
231         xpc_vars_part[partid].IPI_nasid = cpuid_to_nasid(cpuid);
232         xpc_vars_part[partid].IPI_phys_cpuid = cpu_physical_id(cpuid);
233         xpc_vars_part[partid].nchannels = part->nchannels;
234         xpc_vars_part[partid].magic = XPC_VP_MAGIC1;
235
236         return xpcSuccess;
237 }
238
239
240 /*
241  * Create a wrapper that hides the underlying mechanism for pulling a cacheline
242  * (or multiple cachelines) from a remote partition.
243  *
244  * src must be a cacheline aligned physical address on the remote partition.
245  * dst must be a cacheline aligned virtual address on this partition.
246  * cnt must be an cacheline sized
247  */
248 static enum xpc_retval
249 xpc_pull_remote_cachelines(struct xpc_partition *part, void *dst,
250                                 const void *src, size_t cnt)
251 {
252         bte_result_t bte_ret;
253
254
255         DBUG_ON((u64) src != L1_CACHE_ALIGN((u64) src));
256         DBUG_ON((u64) dst != L1_CACHE_ALIGN((u64) dst));
257         DBUG_ON(cnt != L1_CACHE_ALIGN(cnt));
258
259         if (part->act_state == XPC_P_DEACTIVATING) {
260                 return part->reason;
261         }
262
263         bte_ret = xp_bte_copy((u64) src, (u64) ia64_tpa((u64) dst),
264                                 (u64) cnt, (BTE_NORMAL | BTE_WACQUIRE), NULL);
265         if (bte_ret == BTE_SUCCESS) {
266                 return xpcSuccess;
267         }
268
269         dev_dbg(xpc_chan, "xp_bte_copy() from partition %d failed, ret=%d\n",
270                 XPC_PARTID(part), bte_ret);
271
272         return xpc_map_bte_errors(bte_ret);
273 }
274
275
/*
 * Pull the remote per partition specific variables from the specified
 * partition.
 *
 * Implements one side of the two-phase handshake: the remote side sets
 * its entry's magic to XPC_VP_MAGIC1 when its variables are ready, and
 * we set our own entry's magic to XPC_VP_MAGIC2 once we have successfully
 * pulled and validated theirs. Returns xpcRetry until both sides have
 * completed the exchange.
 */
enum xpc_retval
xpc_pull_remote_vars_part(struct xpc_partition *part)
{
	/* buffer is twice a cacheline so an aligned cacheline fits inside it */
	u8 buffer[L1_CACHE_BYTES * 2];
	struct xpc_vars_part *pulled_entry_cacheline =
			(struct xpc_vars_part *) L1_CACHE_ALIGN((u64) buffer);
	struct xpc_vars_part *pulled_entry;
	u64 remote_entry_cacheline_pa, remote_entry_pa;
	partid_t partid = XPC_PARTID(part);
	enum xpc_retval ret;


	/* pull the cacheline that contains the variables we're interested in */

	DBUG_ON(part->remote_vars_part_pa !=
				L1_CACHE_ALIGN(part->remote_vars_part_pa));
	DBUG_ON(sizeof(struct xpc_vars_part) != L1_CACHE_BYTES / 2);

	/* our entry in the remote partition's xpc_vars_part[] array */
	remote_entry_pa = part->remote_vars_part_pa +
			sn_partition_id * sizeof(struct xpc_vars_part);

	/* round down to the start of the cacheline holding that entry */
	remote_entry_cacheline_pa = (remote_entry_pa & ~(L1_CACHE_BYTES - 1));

	/* entry's offset within the pulled cacheline */
	pulled_entry = (struct xpc_vars_part *) ((u64) pulled_entry_cacheline +
				(remote_entry_pa & (L1_CACHE_BYTES - 1)));

	ret = xpc_pull_remote_cachelines(part, pulled_entry_cacheline,
					(void *) remote_entry_cacheline_pa,
					L1_CACHE_BYTES);
	if (ret != xpcSuccess) {
		dev_dbg(xpc_chan, "failed to pull XPC vars_part from "
			"partition %d, ret=%d\n", partid, ret);
		return ret;
	}


	/* see if they've been set up yet */

	if (pulled_entry->magic != XPC_VP_MAGIC1 &&
				pulled_entry->magic != XPC_VP_MAGIC2) {

		/* a non-zero, non-magic value means the entry is corrupt */
		if (pulled_entry->magic != 0) {
			dev_dbg(xpc_chan, "partition %d's XPC vars_part for "
				"partition %d has bad magic value (=0x%lx)\n",
				partid, sn_partition_id, pulled_entry->magic);
			return xpcBadMagic;
		}

		/* they've not been initialized yet */
		return xpcRetry;
	}

	if (xpc_vars_part[partid].magic == XPC_VP_MAGIC1) {

		/* validate the variables */

		if (pulled_entry->GPs_pa == 0 ||
				pulled_entry->openclose_args_pa == 0 ||
					pulled_entry->IPI_amo_pa == 0) {

			dev_err(xpc_chan, "partition %d's XPC vars_part for "
				"partition %d are not valid\n", partid,
				sn_partition_id);
			return xpcInvalidAddress;
		}

		/* the variables we imported look to be valid */

		part->remote_GPs_pa = pulled_entry->GPs_pa;
		part->remote_openclose_args_pa =
					pulled_entry->openclose_args_pa;
		part->remote_IPI_amo_va =
				      (AMO_t *) __va(pulled_entry->IPI_amo_pa);
		part->remote_IPI_nasid = pulled_entry->IPI_nasid;
		part->remote_IPI_phys_cpuid = pulled_entry->IPI_phys_cpuid;

		/* both sides must agree on the lesser channel count */
		if (part->nchannels > pulled_entry->nchannels) {
			part->nchannels = pulled_entry->nchannels;
		}

		/* let the other side know that we've pulled their variables */

		xpc_vars_part[partid].magic = XPC_VP_MAGIC2;
	}

	/* the remote side hasn't pulled OUR variables yet; try again later */
	if (pulled_entry->magic == XPC_VP_MAGIC1) {
		return xpcRetry;
	}

	return xpcSuccess;
}
371
372
373 /*
374  * Get the IPI flags and pull the openclose args and/or remote GPs as needed.
375  */
376 static u64
377 xpc_get_IPI_flags(struct xpc_partition *part)
378 {
379         unsigned long irq_flags;
380         u64 IPI_amo;
381         enum xpc_retval ret;
382
383
384         /*
385          * See if there are any IPI flags to be handled.
386          */
387
388         spin_lock_irqsave(&part->IPI_lock, irq_flags);
389         if ((IPI_amo = part->local_IPI_amo) != 0) {
390                 part->local_IPI_amo = 0;
391         }
392         spin_unlock_irqrestore(&part->IPI_lock, irq_flags);
393
394
395         if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_amo)) {
396                 ret = xpc_pull_remote_cachelines(part,
397                                         part->remote_openclose_args,
398                                         (void *) part->remote_openclose_args_pa,
399                                         XPC_OPENCLOSE_ARGS_SIZE);
400                 if (ret != xpcSuccess) {
401                         XPC_DEACTIVATE_PARTITION(part, ret);
402
403                         dev_dbg(xpc_chan, "failed to pull openclose args from "
404                                 "partition %d, ret=%d\n", XPC_PARTID(part),
405                                 ret);
406
407                         /* don't bother processing IPIs anymore */
408                         IPI_amo = 0;
409                 }
410         }
411
412         if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_amo)) {
413                 ret = xpc_pull_remote_cachelines(part, part->remote_GPs,
414                                                 (void *) part->remote_GPs_pa,
415                                                 XPC_GP_SIZE);
416                 if (ret != xpcSuccess) {
417                         XPC_DEACTIVATE_PARTITION(part, ret);
418
419                         dev_dbg(xpc_chan, "failed to pull GPs from partition "
420                                 "%d, ret=%d\n", XPC_PARTID(part), ret);
421
422                         /* don't bother processing IPIs anymore */
423                         IPI_amo = 0;
424                 }
425         }
426
427         return IPI_amo;
428 }
429
430
431 /*
432  * Allocate the local message queue and the notify queue.
433  */
434 static enum xpc_retval
435 xpc_allocate_local_msgqueue(struct xpc_channel *ch)
436 {
437         unsigned long irq_flags;
438         int nentries;
439         size_t nbytes;
440
441
442         // >>> may want to check for ch->flags & XPC_C_DISCONNECTING between
443         // >>> iterations of the for-loop, bail if set?
444
445         // >>> should we impose a minumum #of entries? like 4 or 8?
446         for (nentries = ch->local_nentries; nentries > 0; nentries--) {
447
448                 nbytes = nentries * ch->msg_size;
449                 ch->local_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
450                                                 GFP_KERNEL,
451                                                 &ch->local_msgqueue_base);
452                 if (ch->local_msgqueue == NULL) {
453                         continue;
454                 }
455                 memset(ch->local_msgqueue, 0, nbytes);
456
457                 nbytes = nentries * sizeof(struct xpc_notify);
458                 ch->notify_queue = kmalloc(nbytes, GFP_KERNEL);
459                 if (ch->notify_queue == NULL) {
460                         kfree(ch->local_msgqueue_base);
461                         ch->local_msgqueue = NULL;
462                         continue;
463                 }
464                 memset(ch->notify_queue, 0, nbytes);
465
466                 spin_lock_irqsave(&ch->lock, irq_flags);
467                 if (nentries < ch->local_nentries) {
468                         dev_dbg(xpc_chan, "nentries=%d local_nentries=%d, "
469                                 "partid=%d, channel=%d\n", nentries,
470                                 ch->local_nentries, ch->partid, ch->number);
471
472                         ch->local_nentries = nentries;
473                 }
474                 spin_unlock_irqrestore(&ch->lock, irq_flags);
475                 return xpcSuccess;
476         }
477
478         dev_dbg(xpc_chan, "can't get memory for local message queue and notify "
479                 "queue, partid=%d, channel=%d\n", ch->partid, ch->number);
480         return xpcNoMemory;
481 }
482
483
484 /*
485  * Allocate the cached remote message queue.
486  */
487 static enum xpc_retval
488 xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
489 {
490         unsigned long irq_flags;
491         int nentries;
492         size_t nbytes;
493
494
495         DBUG_ON(ch->remote_nentries <= 0);
496
497         // >>> may want to check for ch->flags & XPC_C_DISCONNECTING between
498         // >>> iterations of the for-loop, bail if set?
499
500         // >>> should we impose a minumum #of entries? like 4 or 8?
501         for (nentries = ch->remote_nentries; nentries > 0; nentries--) {
502
503                 nbytes = nentries * ch->msg_size;
504                 ch->remote_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
505                                                 GFP_KERNEL,
506                                                 &ch->remote_msgqueue_base);
507                 if (ch->remote_msgqueue == NULL) {
508                         continue;
509                 }
510                 memset(ch->remote_msgqueue, 0, nbytes);
511
512                 spin_lock_irqsave(&ch->lock, irq_flags);
513                 if (nentries < ch->remote_nentries) {
514                         dev_dbg(xpc_chan, "nentries=%d remote_nentries=%d, "
515                                 "partid=%d, channel=%d\n", nentries,
516                                 ch->remote_nentries, ch->partid, ch->number);
517
518                         ch->remote_nentries = nentries;
519                 }
520                 spin_unlock_irqrestore(&ch->lock, irq_flags);
521                 return xpcSuccess;
522         }
523
524         dev_dbg(xpc_chan, "can't get memory for cached remote message queue, "
525                 "partid=%d, channel=%d\n", ch->partid, ch->number);
526         return xpcNoMemory;
527 }
528
529
530 /*
531  * Allocate message queues and other stuff associated with a channel.
532  *
533  * Note: Assumes all of the channel sizes are filled in.
534  */
535 static enum xpc_retval
536 xpc_allocate_msgqueues(struct xpc_channel *ch)
537 {
538         unsigned long irq_flags;
539         enum xpc_retval ret;
540
541
542         DBUG_ON(ch->flags & XPC_C_SETUP);
543
544         if ((ret = xpc_allocate_local_msgqueue(ch)) != xpcSuccess) {
545                 return ret;
546         }
547
548         if ((ret = xpc_allocate_remote_msgqueue(ch)) != xpcSuccess) {
549                 kfree(ch->local_msgqueue_base);
550                 ch->local_msgqueue = NULL;
551                 kfree(ch->notify_queue);
552                 ch->notify_queue = NULL;
553                 return ret;
554         }
555
556         spin_lock_irqsave(&ch->lock, irq_flags);
557         ch->flags |= XPC_C_SETUP;
558         spin_unlock_irqrestore(&ch->lock, irq_flags);
559
560         return xpcSuccess;
561 }
562
563
/*
 * Process a connect message from a remote partition.
 *
 * Note: xpc_process_connect() is expecting to be called with the
 * spin_lock_irqsave held and will leave it locked upon return.
 */
static void
xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	enum xpc_retval ret;


	DBUG_ON(!spin_is_locked(&ch->lock));

	/* both sides must have requested the open before we can proceed */
	if (!(ch->flags & XPC_C_OPENREQUEST) ||
				!(ch->flags & XPC_C_ROPENREQUEST)) {
		/* nothing more to do for now */
		return;
	}
	DBUG_ON(!(ch->flags & XPC_C_CONNECTING));

	if (!(ch->flags & XPC_C_SETUP)) {
		/*
		 * Drop ch->lock around the message queue allocation
		 * (GFP_KERNEL allocations inside it may sleep).
		 */
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		ret = xpc_allocate_msgqueues(ch);
		spin_lock_irqsave(&ch->lock, *irq_flags);

		if (ret != xpcSuccess) {
			XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);
		}
		/*
		 * The channel state may have changed while the lock was
		 * dropped (or via the disconnect just above); bail if so.
		 */
		if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING)) {
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_SETUP));
		DBUG_ON(ch->local_msgqueue == NULL);
		DBUG_ON(ch->remote_msgqueue == NULL);
	}

	/* send our openreply exactly once */
	if (!(ch->flags & XPC_C_OPENREPLY)) {
		ch->flags |= XPC_C_OPENREPLY;
		xpc_IPI_send_openreply(ch, irq_flags);
	}

	/* can't finish connecting until the remote side's openreply arrives */
	if (!(ch->flags & XPC_C_ROPENREPLY)) {
		return;
	}

	DBUG_ON(ch->remote_msgqueue_pa == 0);

	ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);	/* clear all else */

	dev_info(xpc_chan, "channel %d to partition %d connected\n",
		ch->number, ch->partid);

	/* drop ch->lock across the kthread creation, then retake it
	 * since our caller expects it held on return */
	spin_unlock_irqrestore(&ch->lock, *irq_flags);
	xpc_create_kthreads(ch, 1);
	spin_lock_irqsave(&ch->lock, *irq_flags);
}
622
623
/*
 * Notify those who wanted to be notified upon delivery of their message.
 *
 * Walks the notify queue entries from ch->w_remote_GP.get up to (but not
 * including) `put', invoking each entry's notify function (if any) with
 * the given reason.
 */
static void
xpc_notify_senders(struct xpc_channel *ch, enum xpc_retval reason, s64 put)
{
	struct xpc_notify *notify;
	u8 notify_type;
	/* start one before w_remote_GP.get so the pre-increment in the
	 * loop condition below begins at w_remote_GP.get itself */
	s64 get = ch->w_remote_GP.get - 1;


	while (++get < put && atomic_read(&ch->n_to_notify) > 0) {

		notify = &ch->notify_queue[get % ch->local_nentries];

		/*
		 * See if the notify entry indicates it was associated with
		 * a message whose sender wants to be notified. It is possible
		 * that it is, but someone else is doing or has done the
		 * notification.
		 */
		notify_type = notify->type;
		if (notify_type == 0 ||
				cmpxchg(&notify->type, notify_type, 0) !=
								notify_type) {
			/* entry not in use, or someone else atomically
			 * claimed it first (the cmpxchg lost the race) */
			continue;
		}

		DBUG_ON(notify_type != XPC_N_CALL);

		atomic_dec(&ch->n_to_notify);

		if (notify->func != NULL) {
			dev_dbg(xpc_chan, "notify->func() called, notify=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				(void *) notify, get, ch->partid, ch->number);

			notify->func(reason, ch->partid, ch->number,
								notify->key);

			dev_dbg(xpc_chan, "notify->func() returned, "
				"notify=0x%p, msg_number=%ld, partid=%d, "
				"channel=%d\n", (void *) notify, get,
				ch->partid, ch->number);
		}
	}
}
671
672
673 /*
674  * Free up message queues and other stuff that were allocated for the specified
675  * channel.
676  *
677  * Note: ch->reason and ch->reason_line are left set for debugging purposes,
678  * they're cleared when XPC_C_DISCONNECTED is cleared.
679  */
680 static void
681 xpc_free_msgqueues(struct xpc_channel *ch)
682 {
683         DBUG_ON(!spin_is_locked(&ch->lock));
684         DBUG_ON(atomic_read(&ch->n_to_notify) != 0);
685
686         ch->remote_msgqueue_pa = 0;
687         ch->func = NULL;
688         ch->key = NULL;
689         ch->msg_size = 0;
690         ch->local_nentries = 0;
691         ch->remote_nentries = 0;
692         ch->kthreads_assigned_limit = 0;
693         ch->kthreads_idle_limit = 0;
694
695         ch->local_GP->get = 0;
696         ch->local_GP->put = 0;
697         ch->remote_GP.get = 0;
698         ch->remote_GP.put = 0;
699         ch->w_local_GP.get = 0;
700         ch->w_local_GP.put = 0;
701         ch->w_remote_GP.get = 0;
702         ch->w_remote_GP.put = 0;
703         ch->next_msg_to_pull = 0;
704
705         if (ch->flags & XPC_C_SETUP) {
706                 ch->flags &= ~XPC_C_SETUP;
707
708                 dev_dbg(xpc_chan, "ch->flags=0x%x, partid=%d, channel=%d\n",
709                         ch->flags, ch->partid, ch->number);
710
711                 kfree(ch->local_msgqueue_base);
712                 ch->local_msgqueue = NULL;
713                 kfree(ch->remote_msgqueue_base);
714                 ch->remote_msgqueue = NULL;
715                 kfree(ch->notify_queue);
716                 ch->notify_queue = NULL;
717         }
718 }
719
720
/*
 * Complete the teardown of a disconnecting channel once all activity on it
 * has settled: run the close handshake with the remote side, deliver any
 * outstanding sender notifications, free the message queues, and mark the
 * channel disconnected.
 *
 * spin_lock_irqsave() is expected to be held on entry.
 */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);


	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_DISCONNECTING)) {
		return;
	}

	DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

	/* make sure all activity has settled down first */

	if (atomic_read(&ch->references) > 0) {
		return;
	}
	DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);

	if (part->act_state == XPC_P_DEACTIVATING) {
		/* can't proceed until the other side disengages from us */
		if (xpc_partition_engaged(1UL << ch->partid)) {
			return;
		}

	} else {

		/* as long as the other side is up do the full protocol */

		/* wait for the remote side's closerequest first */
		if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
			return;
		}

		/* send our closereply exactly once */
		if (!(ch->flags & XPC_C_CLOSEREPLY)) {
			ch->flags |= XPC_C_CLOSEREPLY;
			xpc_IPI_send_closereply(ch, irq_flags);
		}

		/* wait for the remote side's closereply to complete */
		if (!(ch->flags & XPC_C_RCLOSEREPLY)) {
			return;
		}
	}

	/* wake those waiting for notify completion */
	if (atomic_read(&ch->n_to_notify) > 0) {
		/* >>> we do callout while holding ch->lock */
		xpc_notify_senders(ch, ch->reason, ch->w_local_GP.put);
	}

	/* both sides are disconnected now */

	/* the disconnect callout is only made if the connect one was */
	if (ch->flags & XPC_C_CONNECTCALLOUT) {
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		xpc_disconnect_callout(ch, xpcDisconnected);
		spin_lock_irqsave(&ch->lock, *irq_flags);
	}

	/* it's now safe to free the channel's message queues */
	xpc_free_msgqueues(ch);

	/* mark disconnected, clear all other flags except XPC_C_WDISCONNECT */
	ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

	atomic_dec(&part->nchannels_active);

	if (channel_was_connected) {
		dev_info(xpc_chan, "channel %d to partition %d disconnected, "
			"reason=%d\n", ch->number, ch->partid, ch->reason);
	}

	if (ch->flags & XPC_C_WDISCONNECT) {
		/* we won't lose the CPU since we're holding ch->lock */
		complete(&ch->wdisconnect_wait);
	} else if (ch->delayed_IPI_flags) {
		if (part->act_state != XPC_P_DEACTIVATING) {
			/* time to take action on any delayed IPI flags */
			spin_lock(&part->IPI_lock);
			XPC_SET_IPI_FLAGS(part->local_IPI_amo, ch->number,
							ch->delayed_IPI_flags);
			spin_unlock(&part->IPI_lock);
		}
		ch->delayed_IPI_flags = 0;
	}
}
811
812
813 /*
814  * Process a change in the channel's remote connection state.
815  */
816 static void
817 xpc_process_openclose_IPI(struct xpc_partition *part, int ch_number,
818                                 u8 IPI_flags)
819 {
820         unsigned long irq_flags;
821         struct xpc_openclose_args *args =
822                                 &part->remote_openclose_args[ch_number];
823         struct xpc_channel *ch = &part->channels[ch_number];
824         enum xpc_retval reason;
825
826
827
828         spin_lock_irqsave(&ch->lock, irq_flags);
829
830 again:
831
832         if ((ch->flags & XPC_C_DISCONNECTED) &&
833                                         (ch->flags & XPC_C_WDISCONNECT)) {
834                 /*
835                  * Delay processing IPI flags until thread waiting disconnect
836                  * has had a chance to see that the channel is disconnected.
837                  */
838                 ch->delayed_IPI_flags |= IPI_flags;
839                 spin_unlock_irqrestore(&ch->lock, irq_flags);
840                 return;
841         }
842
843
844         if (IPI_flags & XPC_IPI_CLOSEREQUEST) {
845
846                 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREQUEST (reason=%d) received "
847                         "from partid=%d, channel=%d\n", args->reason,
848                         ch->partid, ch->number);
849
850                 /*
851                  * If RCLOSEREQUEST is set, we're probably waiting for
852                  * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
853                  * with this RCLOSEREQUEST in the IPI_flags.
854                  */
855
856                 if (ch->flags & XPC_C_RCLOSEREQUEST) {
857                         DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
858                         DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
859                         DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
860                         DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);
861
862                         DBUG_ON(!(IPI_flags & XPC_IPI_CLOSEREPLY));
863                         IPI_flags &= ~XPC_IPI_CLOSEREPLY;
864                         ch->flags |= XPC_C_RCLOSEREPLY;
865
866                         /* both sides have finished disconnecting */
867                         xpc_process_disconnect(ch, &irq_flags);
868                         DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
869                         goto again;
870                 }
871
872                 if (ch->flags & XPC_C_DISCONNECTED) {
873                         if (!(IPI_flags & XPC_IPI_OPENREQUEST)) {
874                                 if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo,
875                                          ch_number) & XPC_IPI_OPENREQUEST)) {
876
877                                         DBUG_ON(ch->delayed_IPI_flags != 0);
878                                         spin_lock(&part->IPI_lock);
879                                         XPC_SET_IPI_FLAGS(part->local_IPI_amo,
880                                                         ch_number,
881                                                         XPC_IPI_CLOSEREQUEST);
882                                         spin_unlock(&part->IPI_lock);
883                                 }
884                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
885                                 return;
886                         }
887
888                         XPC_SET_REASON(ch, 0, 0);
889                         ch->flags &= ~XPC_C_DISCONNECTED;
890
891                         atomic_inc(&part->nchannels_active);
892                         ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
893                 }
894
895                 IPI_flags &= ~(XPC_IPI_OPENREQUEST | XPC_IPI_OPENREPLY);
896
897                 /*
898                  * The meaningful CLOSEREQUEST connection state fields are:
899                  *      reason = reason connection is to be closed
900                  */
901
902                 ch->flags |= XPC_C_RCLOSEREQUEST;
903
904                 if (!(ch->flags & XPC_C_DISCONNECTING)) {
905                         reason = args->reason;
906                         if (reason <= xpcSuccess || reason > xpcUnknownReason) {
907                                 reason = xpcUnknownReason;
908                         } else if (reason == xpcUnregistering) {
909                                 reason = xpcOtherUnregistering;
910                         }
911
912                         XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
913
914                         DBUG_ON(IPI_flags & XPC_IPI_CLOSEREPLY);
915                         spin_unlock_irqrestore(&ch->lock, irq_flags);
916                         return;
917                 }
918
919                 xpc_process_disconnect(ch, &irq_flags);
920         }
921
922
923         if (IPI_flags & XPC_IPI_CLOSEREPLY) {
924
925                 dev_dbg(xpc_chan, "XPC_IPI_CLOSEREPLY received from partid=%d,"
926                         " channel=%d\n", ch->partid, ch->number);
927
928                 if (ch->flags & XPC_C_DISCONNECTED) {
929                         DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
930                         spin_unlock_irqrestore(&ch->lock, irq_flags);
931                         return;
932                 }
933
934                 DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
935
936                 if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
937                         if ((XPC_GET_IPI_FLAGS(part->local_IPI_amo, ch_number)
938                                                 & XPC_IPI_CLOSEREQUEST)) {
939
940                                 DBUG_ON(ch->delayed_IPI_flags != 0);
941                                 spin_lock(&part->IPI_lock);
942                                 XPC_SET_IPI_FLAGS(part->local_IPI_amo,
943                                                 ch_number, XPC_IPI_CLOSEREPLY);
944                                 spin_unlock(&part->IPI_lock);
945                         }
946                         spin_unlock_irqrestore(&ch->lock, irq_flags);
947                         return;
948                 }
949
950                 ch->flags |= XPC_C_RCLOSEREPLY;
951
952                 if (ch->flags & XPC_C_CLOSEREPLY) {
953                         /* both sides have finished disconnecting */
954                         xpc_process_disconnect(ch, &irq_flags);
955                 }
956         }
957
958
959         if (IPI_flags & XPC_IPI_OPENREQUEST) {
960
961                 dev_dbg(xpc_chan, "XPC_IPI_OPENREQUEST (msg_size=%d, "
962                         "local_nentries=%d) received from partid=%d, "
963                         "channel=%d\n", args->msg_size, args->local_nentries,
964                         ch->partid, ch->number);
965
966                 if (part->act_state == XPC_P_DEACTIVATING ||
967                                         (ch->flags & XPC_C_ROPENREQUEST)) {
968                         spin_unlock_irqrestore(&ch->lock, irq_flags);
969                         return;
970                 }
971
972                 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
973                         ch->delayed_IPI_flags |= XPC_IPI_OPENREQUEST;
974                         spin_unlock_irqrestore(&ch->lock, irq_flags);
975                         return;
976                 }
977                 DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
978                                                         XPC_C_OPENREQUEST)));
979                 DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
980                                         XPC_C_OPENREPLY | XPC_C_CONNECTED));
981
982                 /*
983                  * The meaningful OPENREQUEST connection state fields are:
984                  *      msg_size = size of channel's messages in bytes
985                  *      local_nentries = remote partition's local_nentries
986                  */
987                 if (args->msg_size == 0 || args->local_nentries == 0) {
988                         /* assume OPENREQUEST was delayed by mistake */
989                         spin_unlock_irqrestore(&ch->lock, irq_flags);
990                         return;
991                 }
992
993                 ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
994                 ch->remote_nentries = args->local_nentries;
995
996
997                 if (ch->flags & XPC_C_OPENREQUEST) {
998                         if (args->msg_size != ch->msg_size) {
999                                 XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes,
1000                                                                 &irq_flags);
1001                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
1002                                 return;
1003                         }
1004                 } else {
1005                         ch->msg_size = args->msg_size;
1006
1007                         XPC_SET_REASON(ch, 0, 0);
1008                         ch->flags &= ~XPC_C_DISCONNECTED;
1009
1010                         atomic_inc(&part->nchannels_active);
1011                 }
1012
1013                 xpc_process_connect(ch, &irq_flags);
1014         }
1015
1016
1017         if (IPI_flags & XPC_IPI_OPENREPLY) {
1018
1019                 dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY (local_msgqueue_pa=0x%lx, "
1020                         "local_nentries=%d, remote_nentries=%d) received from "
1021                         "partid=%d, channel=%d\n", args->local_msgqueue_pa,
1022                         args->local_nentries, args->remote_nentries,
1023                         ch->partid, ch->number);
1024
1025                 if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
1026                         spin_unlock_irqrestore(&ch->lock, irq_flags);
1027                         return;
1028                 }
1029                 if (!(ch->flags & XPC_C_OPENREQUEST)) {
1030                         XPC_DISCONNECT_CHANNEL(ch, xpcOpenCloseError,
1031                                                                 &irq_flags);
1032                         spin_unlock_irqrestore(&ch->lock, irq_flags);
1033                         return;
1034                 }
1035
1036                 DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
1037                 DBUG_ON(ch->flags & XPC_C_CONNECTED);
1038
1039                 /*
1040                  * The meaningful OPENREPLY connection state fields are:
1041                  *      local_msgqueue_pa = physical address of remote
1042                  *                          partition's local_msgqueue
1043                  *      local_nentries = remote partition's local_nentries
1044                  *      remote_nentries = remote partition's remote_nentries
1045                  */
1046                 DBUG_ON(args->local_msgqueue_pa == 0);
1047                 DBUG_ON(args->local_nentries == 0);
1048                 DBUG_ON(args->remote_nentries == 0);
1049
1050                 ch->flags |= XPC_C_ROPENREPLY;
1051                 ch->remote_msgqueue_pa = args->local_msgqueue_pa;
1052
1053                 if (args->local_nentries < ch->remote_nentries) {
1054                         dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
1055                                 "remote_nentries=%d, old remote_nentries=%d, "
1056                                 "partid=%d, channel=%d\n",
1057                                 args->local_nentries, ch->remote_nentries,
1058                                 ch->partid, ch->number);
1059
1060                         ch->remote_nentries = args->local_nentries;
1061                 }
1062                 if (args->remote_nentries < ch->local_nentries) {
1063                         dev_dbg(xpc_chan, "XPC_IPI_OPENREPLY: new "
1064                                 "local_nentries=%d, old local_nentries=%d, "
1065                                 "partid=%d, channel=%d\n",
1066                                 args->remote_nentries, ch->local_nentries,
1067                                 ch->partid, ch->number);
1068
1069                         ch->local_nentries = args->remote_nentries;
1070                 }
1071
1072                 xpc_process_connect(ch, &irq_flags);
1073         }
1074
1075         spin_unlock_irqrestore(&ch->lock, irq_flags);
1076 }
1077
1078
1079 /*
1080  * Attempt to establish a channel connection to a remote partition.
1081  */
1082 static enum xpc_retval
1083 xpc_connect_channel(struct xpc_channel *ch)
1084 {
1085         unsigned long irq_flags;
1086         struct xpc_registration *registration = &xpc_registrations[ch->number];
1087
1088
1089         if (mutex_trylock(&registration->mutex) == 0) {
1090                 return xpcRetry;
1091         }
1092
1093         if (!XPC_CHANNEL_REGISTERED(ch->number)) {
1094                 mutex_unlock(&registration->mutex);
1095                 return xpcUnregistered;
1096         }
1097
1098         spin_lock_irqsave(&ch->lock, irq_flags);
1099
1100         DBUG_ON(ch->flags & XPC_C_CONNECTED);
1101         DBUG_ON(ch->flags & XPC_C_OPENREQUEST);
1102
1103         if (ch->flags & XPC_C_DISCONNECTING) {
1104                 spin_unlock_irqrestore(&ch->lock, irq_flags);
1105                 mutex_unlock(&registration->mutex);
1106                 return ch->reason;
1107         }
1108
1109
1110         /* add info from the channel connect registration to the channel */
1111
1112         ch->kthreads_assigned_limit = registration->assigned_limit;
1113         ch->kthreads_idle_limit = registration->idle_limit;
1114         DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
1115         DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
1116         DBUG_ON(atomic_read(&ch->kthreads_active) != 0);
1117
1118         ch->func = registration->func;
1119         DBUG_ON(registration->func == NULL);
1120         ch->key = registration->key;
1121
1122         ch->local_nentries = registration->nentries;
1123
1124         if (ch->flags & XPC_C_ROPENREQUEST) {
1125                 if (registration->msg_size != ch->msg_size) {
1126                         /* the local and remote sides aren't the same */
1127
1128                         /*
1129                          * Because XPC_DISCONNECT_CHANNEL() can block we're
1130                          * forced to up the registration sema before we unlock
1131                          * the channel lock. But that's okay here because we're
1132                          * done with the part that required the registration
1133                          * sema. XPC_DISCONNECT_CHANNEL() requires that the
1134                          * channel lock be locked and will unlock and relock
1135                          * the channel lock as needed.
1136                          */
1137                         mutex_unlock(&registration->mutex);
1138                         XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes,
1139                                                                 &irq_flags);
1140                         spin_unlock_irqrestore(&ch->lock, irq_flags);
1141                         return xpcUnequalMsgSizes;
1142                 }
1143         } else {
1144                 ch->msg_size = registration->msg_size;
1145
1146                 XPC_SET_REASON(ch, 0, 0);
1147                 ch->flags &= ~XPC_C_DISCONNECTED;
1148
1149                 atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
1150         }
1151
1152         mutex_unlock(&registration->mutex);
1153
1154
1155         /* initiate the connection */
1156
1157         ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
1158         xpc_IPI_send_openrequest(ch, &irq_flags);
1159
1160         xpc_process_connect(ch, &irq_flags);
1161
1162         spin_unlock_irqrestore(&ch->lock, irq_flags);
1163
1164         return xpcSuccess;
1165 }
1166
1167
1168 /*
1169  * Clear some of the msg flags in the local message queue.
1170  */
1171 static inline void
1172 xpc_clear_local_msgqueue_flags(struct xpc_channel *ch)
1173 {
1174         struct xpc_msg *msg;
1175         s64 get;
1176
1177
1178         get = ch->w_remote_GP.get;
1179         do {
1180                 msg = (struct xpc_msg *) ((u64) ch->local_msgqueue +
1181                                 (get % ch->local_nentries) * ch->msg_size);
1182                 msg->flags = 0;
1183         } while (++get < (volatile s64) ch->remote_GP.get);
1184 }
1185
1186
1187 /*
1188  * Clear some of the msg flags in the remote message queue.
1189  */
1190 static inline void
1191 xpc_clear_remote_msgqueue_flags(struct xpc_channel *ch)
1192 {
1193         struct xpc_msg *msg;
1194         s64 put;
1195
1196
1197         put = ch->w_remote_GP.put;
1198         do {
1199                 msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue +
1200                                 (put % ch->remote_nentries) * ch->msg_size);
1201                 msg->flags = 0;
1202         } while (++put < (volatile s64) ch->remote_GP.put);
1203 }
1204
1205
1206 static void
1207 xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
1208 {
1209         struct xpc_channel *ch = &part->channels[ch_number];
1210         int nmsgs_sent;
1211
1212
1213         ch->remote_GP = part->remote_GPs[ch_number];
1214
1215
1216         /* See what, if anything, has changed for each connected channel */
1217
1218         xpc_msgqueue_ref(ch);
1219
1220         if (ch->w_remote_GP.get == ch->remote_GP.get &&
1221                                 ch->w_remote_GP.put == ch->remote_GP.put) {
1222                 /* nothing changed since GPs were last pulled */
1223                 xpc_msgqueue_deref(ch);
1224                 return;
1225         }
1226
1227         if (!(ch->flags & XPC_C_CONNECTED)){
1228                 xpc_msgqueue_deref(ch);
1229                 return;
1230         }
1231
1232
1233         /*
1234          * First check to see if messages recently sent by us have been
1235          * received by the other side. (The remote GET value will have
1236          * changed since we last looked at it.)
1237          */
1238
1239         if (ch->w_remote_GP.get != ch->remote_GP.get) {
1240
1241                 /*
1242                  * We need to notify any senders that want to be notified
1243                  * that their sent messages have been received by their
1244                  * intended recipients. We need to do this before updating
1245                  * w_remote_GP.get so that we don't allocate the same message
1246                  * queue entries prematurely (see xpc_allocate_msg()).
1247                  */
1248                 if (atomic_read(&ch->n_to_notify) > 0) {
1249                         /*
1250                          * Notify senders that messages sent have been
1251                          * received and delivered by the other side.
1252                          */
1253                         xpc_notify_senders(ch, xpcMsgDelivered,
1254                                                         ch->remote_GP.get);
1255                 }
1256
1257                 /*
1258                  * Clear msg->flags in previously sent messages, so that
1259                  * they're ready for xpc_allocate_msg().
1260                  */
1261                 xpc_clear_local_msgqueue_flags(ch);
1262
1263                 ch->w_remote_GP.get = ch->remote_GP.get;
1264
1265                 dev_dbg(xpc_chan, "w_remote_GP.get changed to %ld, partid=%d, "
1266                         "channel=%d\n", ch->w_remote_GP.get, ch->partid,
1267                         ch->number);
1268
1269                 /*
1270                  * If anyone was waiting for message queue entries to become
1271                  * available, wake them up.
1272                  */
1273                 if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) {
1274                         wake_up(&ch->msg_allocate_wq);
1275                 }
1276         }
1277
1278
1279         /*
1280          * Now check for newly sent messages by the other side. (The remote
1281          * PUT value will have changed since we last looked at it.)
1282          */
1283
1284         if (ch->w_remote_GP.put != ch->remote_GP.put) {
1285                 /*
1286                  * Clear msg->flags in previously received messages, so that
1287                  * they're ready for xpc_get_deliverable_msg().
1288                  */
1289                 xpc_clear_remote_msgqueue_flags(ch);
1290
1291                 ch->w_remote_GP.put = ch->remote_GP.put;
1292
1293                 dev_dbg(xpc_chan, "w_remote_GP.put changed to %ld, partid=%d, "
1294                         "channel=%d\n", ch->w_remote_GP.put, ch->partid,
1295                         ch->number);
1296
1297                 nmsgs_sent = ch->w_remote_GP.put - ch->w_local_GP.get;
1298                 if (nmsgs_sent > 0) {
1299                         dev_dbg(xpc_chan, "msgs waiting to be copied and "
1300                                 "delivered=%d, partid=%d, channel=%d\n",
1301                                 nmsgs_sent, ch->partid, ch->number);
1302
1303                         if (ch->flags & XPC_C_CONNECTCALLOUT) {
1304                                 xpc_activate_kthreads(ch, nmsgs_sent);
1305                         }
1306                 }
1307         }
1308
1309         xpc_msgqueue_deref(ch);
1310 }
1311
1312
1313 void
1314 xpc_process_channel_activity(struct xpc_partition *part)
1315 {
1316         unsigned long irq_flags;
1317         u64 IPI_amo, IPI_flags;
1318         struct xpc_channel *ch;
1319         int ch_number;
1320         u32 ch_flags;
1321
1322
1323         IPI_amo = xpc_get_IPI_flags(part);
1324
1325         /*
1326          * Initiate channel connections for registered channels.
1327          *
1328          * For each connected channel that has pending messages activate idle
1329          * kthreads and/or create new kthreads as needed.
1330          */
1331
1332         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
1333                 ch = &part->channels[ch_number];
1334
1335
1336                 /*
1337                  * Process any open or close related IPI flags, and then deal
1338                  * with connecting or disconnecting the channel as required.
1339                  */
1340
1341                 IPI_flags = XPC_GET_IPI_FLAGS(IPI_amo, ch_number);
1342
1343                 if (XPC_ANY_OPENCLOSE_IPI_FLAGS_SET(IPI_flags)) {
1344                         xpc_process_openclose_IPI(part, ch_number, IPI_flags);
1345                 }
1346
1347                 ch_flags = ch->flags;   /* need an atomic snapshot of flags */
1348
1349                 if (ch_flags & XPC_C_DISCONNECTING) {
1350                         spin_lock_irqsave(&ch->lock, irq_flags);
1351                         xpc_process_disconnect(ch, &irq_flags);
1352                         spin_unlock_irqrestore(&ch->lock, irq_flags);
1353                         continue;
1354                 }
1355
1356                 if (part->act_state == XPC_P_DEACTIVATING) {
1357                         continue;
1358                 }
1359
1360                 if (!(ch_flags & XPC_C_CONNECTED)) {
1361                         if (!(ch_flags & XPC_C_OPENREQUEST)) {
1362                                 DBUG_ON(ch_flags & XPC_C_SETUP);
1363                                 (void) xpc_connect_channel(ch);
1364                         } else {
1365                                 spin_lock_irqsave(&ch->lock, irq_flags);
1366                                 xpc_process_connect(ch, &irq_flags);
1367                                 spin_unlock_irqrestore(&ch->lock, irq_flags);
1368                         }
1369                         continue;
1370                 }
1371
1372
1373                 /*
1374                  * Process any message related IPI flags, this may involve the
1375                  * activation of kthreads to deliver any pending messages sent
1376                  * from the other partition.
1377                  */
1378
1379                 if (XPC_ANY_MSG_IPI_FLAGS_SET(IPI_flags)) {
1380                         xpc_process_msg_IPI(part, ch_number);
1381                 }
1382         }
1383 }
1384
1385
1386 /*
1387  * XPC's heartbeat code calls this function to inform XPC that a partition is
1388  * going down.  XPC responds by tearing down the XPartition Communication
1389  * infrastructure used for the just downed partition.
1390  *
1391  * XPC's heartbeat code will never call this function and xpc_partition_up()
1392  * at the same time. Nor will it ever make multiple calls to either function
1393  * at the same time.
1394  */
1395 void
1396 xpc_partition_going_down(struct xpc_partition *part, enum xpc_retval reason)
1397 {
1398         unsigned long irq_flags;
1399         int ch_number;
1400         struct xpc_channel *ch;
1401
1402
1403         dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
1404                 XPC_PARTID(part), reason);
1405
1406         if (!xpc_part_ref(part)) {
1407                 /* infrastructure for this partition isn't currently set up */
1408                 return;
1409         }
1410
1411
1412         /* disconnect channels associated with the partition going down */
1413
1414         for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
1415                 ch = &part->channels[ch_number];
1416
1417                 xpc_msgqueue_ref(ch);
1418                 spin_lock_irqsave(&ch->lock, irq_flags);
1419
1420                 XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);
1421
1422                 spin_unlock_irqrestore(&ch->lock, irq_flags);
1423                 xpc_msgqueue_deref(ch);
1424         }
1425
1426         xpc_wakeup_channel_mgr(part);
1427
1428         xpc_part_deref(part);
1429 }
1430
1431
1432 /*
1433  * Teardown the infrastructure necessary to support XPartition Communication
1434  * between the specified remote partition and the local one.
1435  */
1436 void
1437 xpc_teardown_infrastructure(struct xpc_partition *part)
1438 {
1439         partid_t partid = XPC_PARTID(part);
1440
1441
1442         /*
1443          * We start off by making this partition inaccessible to local
1444          * processes by marking it as no longer setup. Then we make it
1445          * inaccessible to remote processes by clearing the XPC per partition
1446          * specific variable's magic # (which indicates that these variables
1447          * are no longer valid) and by ignoring all XPC notify IPIs sent to
1448          * this partition.
1449          */
1450
1451         DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
1452         DBUG_ON(atomic_read(&part->nchannels_active) != 0);
1453         DBUG_ON(part->setup_state != XPC_P_SETUP);
1454         part->setup_state = XPC_P_WTEARDOWN;
1455
1456         xpc_vars_part[partid].magic = 0;
1457
1458
1459         free_irq(SGI_XPC_NOTIFY, (void *) (u64) partid);
1460
1461
1462         /*
1463          * Before proceding with the teardown we have to wait until all
1464          * existing references cease.
1465          */
1466         wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));
1467
1468
1469         /* now we can begin tearing down the infrastructure */
1470
1471         part->setup_state = XPC_P_TORNDOWN;
1472
1473         /* in case we've still got outstanding timers registered... */
1474         del_timer_sync(&part->dropped_IPI_timer);
1475
1476         kfree(part->remote_openclose_args_base);
1477         part->remote_openclose_args = NULL;
1478         kfree(part->local_openclose_args_base);
1479         part->local_openclose_args = NULL;
1480         kfree(part->remote_GPs_base);
1481         part->remote_GPs = NULL;
1482         kfree(part->local_GPs_base);
1483         part->local_GPs = NULL;
1484         kfree(part->channels);
1485         part->channels = NULL;
1486         part->local_IPI_amo_va = NULL;
1487 }
1488
1489
1490 /*
1491  * Called by XP at the time of channel connection registration to cause
1492  * XPC to establish connections to all currently active partitions.
1493  */
1494 void
1495 xpc_initiate_connect(int ch_number)
1496 {
1497         partid_t partid;
1498         struct xpc_partition *part;
1499         struct xpc_channel *ch;
1500
1501
1502         DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);
1503
1504         for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
1505                 part = &xpc_partitions[partid];
1506
1507                 if (xpc_part_ref(part)) {
1508                         ch = &part->channels[ch_number];
1509
1510                         /*
1511                          * Initiate the establishment of a connection on the
1512                          * newly registered channel to the remote partition.
1513                          */
1514                         xpc_wakeup_channel_mgr(part);
1515                         xpc_part_deref(part);
1516                 }
1517         }
1518 }
1519
1520
1521 void
1522 xpc_connected_callout(struct xpc_channel *ch)
1523 {
1524         /* let the registerer know that a connection has been established */
1525
1526         if (ch->func != NULL) {
1527                 dev_dbg(xpc_chan, "ch->func() called, reason=xpcConnected, "
1528                         "partid=%d, channel=%d\n", ch->partid, ch->number);
1529
1530                 ch->func(xpcConnected, ch->partid, ch->number,
1531                                 (void *) (u64) ch->local_nentries, ch->key);
1532
1533                 dev_dbg(xpc_chan, "ch->func() returned, reason=xpcConnected, "
1534                         "partid=%d, channel=%d\n", ch->partid, ch->number);
1535         }
1536 }
1537
1538
1539 /*
1540  * Called by XP at the time of channel connection unregistration to cause
1541  * XPC to teardown all current connections for the specified channel.
1542  *
1543  * Before returning xpc_initiate_disconnect() will wait until all connections
1544  * on the specified channel have been closed/torndown. So the caller can be
1545  * assured that they will not be receiving any more callouts from XPC to the
1546  * function they registered via xpc_connect().
1547  *
1548  * Arguments:
1549  *
1550  *      ch_number - channel # to unregister.
1551  */
1552 void
1553 xpc_initiate_disconnect(int ch_number)
1554 {
1555         unsigned long irq_flags;
1556         partid_t partid;
1557         struct xpc_partition *part;
1558         struct xpc_channel *ch;
1559
1560
1561         DBUG_ON(ch_number < 0 || ch_number >= XPC_NCHANNELS);
1562
1563         /* initiate the channel disconnect for every active partition */
1564         for (partid = 1; partid < XP_MAX_PARTITIONS; partid++) {
1565                 part = &xpc_partitions[partid];
1566
1567                 if (xpc_part_ref(part)) {
1568                         ch = &part->channels[ch_number];
1569                         xpc_msgqueue_ref(ch);
1570
1571                         spin_lock_irqsave(&ch->lock, irq_flags);
1572
1573                         if (!(ch->flags & XPC_C_DISCONNECTED)) {
1574                                 ch->flags |= XPC_C_WDISCONNECT;
1575
1576                                 XPC_DISCONNECT_CHANNEL(ch, xpcUnregistering,
1577                                                                 &irq_flags);
1578                         }
1579
1580                         spin_unlock_irqrestore(&ch->lock, irq_flags);
1581
1582                         xpc_msgqueue_deref(ch);
1583                         xpc_part_deref(part);
1584                 }
1585         }
1586
1587         xpc_disconnect_wait(ch_number);
1588 }
1589
1590
1591 /*
1592  * To disconnect a channel, and reflect it back to all who may be waiting.
1593  *
1594  * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
1595  * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
1596  * xpc_disconnect_wait().
1597  *
1598  * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
1599  */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
                        enum xpc_retval reason, unsigned long *irq_flags)
{
        u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);


        DBUG_ON(!spin_is_locked(&ch->lock));

        /* a disconnect is already in progress (or finished); nothing to do */
        if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
                return;
        }
        DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

        dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
                reason, line, ch->partid, ch->number);

        /* record why and from where (caller's source line) we disconnected */
        XPC_SET_REASON(ch, reason, line);

        ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
        /* some of these may not have been set */
        ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
                        XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
                        XPC_C_CONNECTING | XPC_C_CONNECTED);

        /* tell the remote partition we are closing this channel */
        xpc_IPI_send_closerequest(ch, irq_flags);

        if (channel_was_connected) {
                ch->flags |= XPC_C_WASCONNECTED;
        }

        /* drop the channel lock while waking waiters (wakeups may sleep/lock) */
        spin_unlock_irqrestore(&ch->lock, *irq_flags);

        /* wake all idle kthreads so they can exit */
        if (atomic_read(&ch->kthreads_idle) > 0) {
                wake_up_all(&ch->idle_wq);
        }

        /* wake those waiting to allocate an entry from the local msg queue */
        if (atomic_read(&ch->n_on_msg_allocate_wq) > 0) {
                wake_up(&ch->msg_allocate_wq);
        }

        /* reacquire the lock; the caller expects it to be held upon return */
        spin_lock_irqsave(&ch->lock, *irq_flags);
}
1645
1646
1647 void
1648 xpc_disconnect_callout(struct xpc_channel *ch, enum xpc_retval reason)
1649 {
1650         /*
1651          * Let the channel's registerer know that the channel is being
1652          * disconnected. We don't want to do this if the registerer was never
1653          * informed of a connection being made.
1654          */
1655
1656         if (ch->func != NULL) {
1657                 dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
1658                         "channel=%d\n", reason, ch->partid, ch->number);
1659
1660                 ch->func(reason, ch->partid, ch->number, NULL, ch->key);
1661
1662                 dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
1663                         "channel=%d\n", reason, ch->partid, ch->number);
1664         }
1665 }
1666
1667
1668 /*
1669  * Wait for a message entry to become available for the specified channel,
1670  * but don't wait any longer than 1 jiffy.
1671  */
static enum xpc_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
        enum xpc_retval ret;


        if (ch->flags & XPC_C_DISCONNECTING) {
                /* channel is going away; report why instead of sleeping */
                DBUG_ON(ch->reason == xpcInterrupted);  // >>> Is this true?
                return ch->reason;
        }

        /*
         * n_on_msg_allocate_wq lets xpc_disconnect_channel() know there
         * are sleepers on msg_allocate_wq that need waking.
         * NOTE(review): interruptible_sleep_on_timeout() is inherently
         * racy (a wakeup between the flag check above and the sleep is
         * lost); the 1-jiffy timeout bounds the lost-wakeup window.
         */
        atomic_inc(&ch->n_on_msg_allocate_wq);
        ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
        atomic_dec(&ch->n_on_msg_allocate_wq);

        if (ch->flags & XPC_C_DISCONNECTING) {
                ret = ch->reason;
                DBUG_ON(ch->reason == xpcInterrupted);  // >>> Is this true?
        } else if (ret == 0) {
                /* the full jiffy elapsed without a wakeup */
                ret = xpcTimeout;
        } else {
                /* woke before the timeout expired (wakeup or signal) */
                ret = xpcInterrupted;
        }

        return ret;
}
1698
1699
1700 /*
1701  * Allocate an entry for a message from the message queue associated with the
1702  * specified channel.
1703  */
static enum xpc_retval
xpc_allocate_msg(struct xpc_channel *ch, u32 flags,
                        struct xpc_msg **address_of_msg)
{
        struct xpc_msg *msg;
        enum xpc_retval ret;
        s64 put;


        /* this reference will be dropped in xpc_send_msg() */
        xpc_msgqueue_ref(ch);

        if (ch->flags & XPC_C_DISCONNECTING) {
                xpc_msgqueue_deref(ch);
                return ch->reason;
        }
        if (!(ch->flags & XPC_C_CONNECTED)) {
                xpc_msgqueue_deref(ch);
                return xpcNotConnected;
        }


        /*
         * Get the next available message entry from the local message queue.
         * If none are available, we'll make sure that we grab the latest
         * GP values.
         */
        ret = xpcTimeout;

        while (1) {

                /* queue has room while put hasn't lapped the remote get */
                put = (volatile s64) ch->w_local_GP.put;
                if (put - (volatile s64) ch->w_remote_GP.get <
                                                        ch->local_nentries) {

                        /* There are available message entries. We need to try
                         * to secure one for ourselves. We'll do this by trying
                         * to increment w_local_GP.put as long as someone else
                         * doesn't beat us to it. If they do, we'll have to
                         * try again.
                         */
                        if (cmpxchg(&ch->w_local_GP.put, put, put + 1) ==
                                                                        put) {
                                /* we got the entry referenced by put */
                                break;
                        }
                        continue;       /* try again */
                }


                /*
                 * There aren't any available msg entries at this time.
                 *
                 * In waiting for a message entry to become available,
                 * we set a timeout in case the other side is not
                 * sending completion IPIs. This lets us fake an IPI
                 * that will cause the IPI handler to fetch the latest
                 * GP values as if an IPI was sent by the other side.
                 */
                if (ret == xpcTimeout) {
                        xpc_IPI_send_local_msgrequest(ch);
                }

                if (flags & XPC_NOWAIT) {
                        /* caller asked us not to block for an entry */
                        xpc_msgqueue_deref(ch);
                        return xpcNoWait;
                }

                ret = xpc_allocate_msg_wait(ch);
                if (ret != xpcInterrupted && ret != xpcTimeout) {
                        /* channel is disconnecting; give up */
                        xpc_msgqueue_deref(ch);
                        return ret;
                }
        }


        /* get the message's address and initialize it */
        msg = (struct xpc_msg *) ((u64) ch->local_msgqueue +
                                (put % ch->local_nentries) * ch->msg_size);


        DBUG_ON(msg->flags != 0);
        msg->number = put;

        dev_dbg(xpc_chan, "w_local_GP.put changed to %ld; msg=0x%p, "
                "msg_number=%ld, partid=%d, channel=%d\n", put + 1,
                (void *) msg, msg->number, ch->partid, ch->number);

        *address_of_msg = msg;

        return xpcSuccess;
}
1796
1797
1798 /*
1799  * Allocate an entry for a message from the message queue associated with the
1800  * specified channel. NOTE that this routine can sleep waiting for a message
1801  * entry to become available. To not sleep, pass in the XPC_NOWAIT flag.
1802  *
1803  * Arguments:
1804  *
1805  *      partid - ID of partition to which the channel is connected.
1806  *      ch_number - channel #.
1807  *      flags - see xpc.h for valid flags.
1808  *      payload - address of the allocated payload area pointer (filled in on
1809  *                return) in which the user-defined message is constructed.
1810  */
1811 enum xpc_retval
1812 xpc_initiate_allocate(partid_t partid, int ch_number, u32 flags, void **payload)
1813 {
1814         struct xpc_partition *part = &xpc_partitions[partid];
1815         enum xpc_retval ret = xpcUnknownReason;
1816         struct xpc_msg *msg;
1817
1818
1819         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
1820         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
1821
1822         *payload = NULL;
1823
1824         if (xpc_part_ref(part)) {
1825                 ret = xpc_allocate_msg(&part->channels[ch_number], flags, &msg);
1826                 xpc_part_deref(part);
1827
1828                 if (msg != NULL) {
1829                         *payload = &msg->payload;
1830                 }
1831         }
1832
1833         return ret;
1834 }
1835
1836
1837 /*
1838  * Now we actually send the messages that are ready to be sent by advancing
1839  * the local message queue's Put value and then send an IPI to the recipient
1840  * partition.
1841  */
static void
xpc_send_msgs(struct xpc_channel *ch, s64 initial_put)
{
        struct xpc_msg *msg;
        s64 put = initial_put + 1;
        int send_IPI = 0;


        while (1) {

                /* advance put past every consecutive XPC_M_READY message */
                while (1) {
                        if (put == (volatile s64) ch->w_local_GP.put) {
                                break;
                        }

                        msg = (struct xpc_msg *) ((u64) ch->local_msgqueue +
                               (put % ch->local_nentries) * ch->msg_size);

                        if (!(msg->flags & XPC_M_READY)) {
                                break;
                        }

                        put++;
                }

                if (put == initial_put) {
                        /* nothing's changed */
                        break;
                }

                if (cmpxchg_rel(&ch->local_GP->put, initial_put, put) !=
                                                                initial_put) {
                        /* someone else beat us to it */
                        DBUG_ON((volatile s64) ch->local_GP->put < initial_put);
                        break;
                }

                /* we just set the new value of local_GP->put */

                dev_dbg(xpc_chan, "local_GP->put changed to %ld, partid=%d, "
                        "channel=%d\n", put, ch->partid, ch->number);

                send_IPI = 1;

                /*
                 * We need to ensure that the message referenced by
                 * local_GP->put is not XPC_M_READY or that local_GP->put
                 * equals w_local_GP.put, so we'll go have a look.
                 */
                initial_put = put;
        }

        if (send_IPI) {
                /* notify the recipient partition that messages are ready */
                xpc_IPI_send_msgrequest(ch);
        }
}
1898
1899
1900 /*
1901  * Common code that does the actual sending of the message by advancing the
1902  * local message queue's Put value and sends an IPI to the partition the
1903  * message is being sent to.
1904  */
static enum xpc_retval
xpc_send_msg(struct xpc_channel *ch, struct xpc_msg *msg, u8 notify_type,
                        xpc_notify_func func, void *key)
{
        enum xpc_retval ret = xpcSuccess;
        /* self-assignment: notify is only dereferenced after being set in
         * the notify_type != 0 branch below; the self-init presumably just
         * silences a "may be used uninitialized" compiler warning */
        struct xpc_notify *notify = notify;
        s64 put, msg_number = msg->number;


        DBUG_ON(notify_type == XPC_N_CALL && func == NULL);
        /* msg must reside at its msg_number slot of the local msg queue */
        DBUG_ON((((u64) msg - (u64) ch->local_msgqueue) / ch->msg_size) !=
                                        msg_number % ch->local_nentries);
        DBUG_ON(msg->flags & XPC_M_READY);

        if (ch->flags & XPC_C_DISCONNECTING) {
                /* drop the reference grabbed in xpc_allocate_msg() */
                xpc_msgqueue_deref(ch);
                return ch->reason;
        }

        if (notify_type != 0) {
                /*
                 * Tell the remote side to send an ACK interrupt when the
                 * message has been delivered.
                 */
                msg->flags |= XPC_M_INTERRUPT;

                atomic_inc(&ch->n_to_notify);

                notify = &ch->notify_queue[msg_number % ch->local_nentries];
                notify->func = func;
                notify->key = key;
                notify->type = notify_type;

                // >>> is a mb() needed here?

                if (ch->flags & XPC_C_DISCONNECTING) {
                        /*
                         * An error occurred between our last error check and
                         * this one. We will try to clear the type field from
                         * the notify entry. If we succeed then
                         * xpc_disconnect_channel() didn't already process
                         * the notify entry.
                         */
                        if (cmpxchg(&notify->type, notify_type, 0) ==
                                                                notify_type) {
                                atomic_dec(&ch->n_to_notify);
                                ret = ch->reason;
                        }

                        /* drop the reference grabbed in xpc_allocate_msg() */
                        xpc_msgqueue_deref(ch);
                        return ret;
                }
        }

        msg->flags |= XPC_M_READY;

        /*
         * The preceding store of msg->flags must occur before the following
         * load of ch->local_GP->put.
         */
        mb();

        /* see if the message is next in line to be sent, if so send it */

        put = ch->local_GP->put;
        if (put == msg_number) {
                xpc_send_msgs(ch, put);
        }

        /* drop the reference grabbed in xpc_allocate_msg() */
        xpc_msgqueue_deref(ch);
        return ret;
}
1980
1981
1982 /*
1983  * Send a message previously allocated using xpc_initiate_allocate() on the
1984  * specified channel connected to the specified partition.
1985  *
1986  * This routine will not wait for the message to be received, nor will
1987  * notification be given when it does happen. Once this routine has returned
1988  * the message entry allocated via xpc_initiate_allocate() is no longer
 * accessible to the caller.
1990  *
1991  * This routine, although called by users, does not call xpc_part_ref() to
1992  * ensure that the partition infrastructure is in place. It relies on the
1993  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
1994  *
1995  * Arguments:
1996  *
1997  *      partid - ID of partition to which the channel is connected.
1998  *      ch_number - channel # to send message on.
1999  *      payload - pointer to the payload area allocated via
2000  *                      xpc_initiate_allocate().
2001  */
2002 enum xpc_retval
2003 xpc_initiate_send(partid_t partid, int ch_number, void *payload)
2004 {
2005         struct xpc_partition *part = &xpc_partitions[partid];
2006         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
2007         enum xpc_retval ret;
2008
2009
2010         dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg,
2011                 partid, ch_number);
2012
2013         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
2014         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
2015         DBUG_ON(msg == NULL);
2016
2017         ret = xpc_send_msg(&part->channels[ch_number], msg, 0, NULL, NULL);
2018
2019         return ret;
2020 }
2021
2022
2023 /*
2024  * Send a message previously allocated using xpc_initiate_allocate on the
2025  * specified channel connected to the specified partition.
2026  *
2027  * This routine will not wait for the message to be sent. Once this routine
2028  * has returned the message entry allocated via xpc_initiate_allocate() is no
 * longer accessible to the caller.
2030  *
2031  * Once the remote end of the channel has received the message, the function
2032  * passed as an argument to xpc_initiate_send_notify() will be called. This
2033  * allows the sender to free up or re-use any buffers referenced by the
2034  * message, but does NOT mean the message has been processed at the remote
2035  * end by a receiver.
2036  *
2037  * If this routine returns an error, the caller's function will NOT be called.
2038  *
2039  * This routine, although called by users, does not call xpc_part_ref() to
2040  * ensure that the partition infrastructure is in place. It relies on the
2041  * fact that we called xpc_msgqueue_ref() in xpc_allocate_msg().
2042  *
2043  * Arguments:
2044  *
2045  *      partid - ID of partition to which the channel is connected.
2046  *      ch_number - channel # to send message on.
2047  *      payload - pointer to the payload area allocated via
2048  *                      xpc_initiate_allocate().
2049  *      func - function to call with asynchronous notification of message
2050  *                receipt. THIS FUNCTION MUST BE NON-BLOCKING.
2051  *      key - user-defined key to be passed to the function when it's called.
2052  */
2053 enum xpc_retval
2054 xpc_initiate_send_notify(partid_t partid, int ch_number, void *payload,
2055                                 xpc_notify_func func, void *key)
2056 {
2057         struct xpc_partition *part = &xpc_partitions[partid];
2058         struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
2059         enum xpc_retval ret;
2060
2061
2062         dev_dbg(xpc_chan, "msg=0x%p, partid=%d, channel=%d\n", (void *) msg,
2063                 partid, ch_number);
2064
2065         DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
2066         DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
2067         DBUG_ON(msg == NULL);
2068         DBUG_ON(func == NULL);
2069
2070         ret = xpc_send_msg(&part->channels[ch_number], msg, XPC_N_CALL,
2071                                                                 func, key);
2072         return ret;
2073 }
2074
2075
static struct xpc_msg *
xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
{
        struct xpc_partition *part = &xpc_partitions[ch->partid];
        struct xpc_msg *remote_msg, *msg;
        u32 msg_index, nmsgs;
        u64 msg_offset;
        enum xpc_retval ret;


        /* serialize concurrent pullers of this channel's messages */
        if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) {
                /* we were interrupted by a signal */
                return NULL;
        }

        while (get >= ch->next_msg_to_pull) {

                /* pull as many messages as are ready and able to be pulled */

                msg_index = ch->next_msg_to_pull % ch->remote_nentries;

                DBUG_ON(ch->next_msg_to_pull >=
                                        (volatile s64) ch->w_remote_GP.put);
                nmsgs =  (volatile s64) ch->w_remote_GP.put -
                                                ch->next_msg_to_pull;
                if (msg_index + nmsgs > ch->remote_nentries) {
                        /* ignore the ones that wrap the msg queue for now */
                        nmsgs = ch->remote_nentries - msg_index;
                }

                /* local cached copy vs. remote physical source addresses */
                msg_offset = msg_index * ch->msg_size;
                msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue +
                                                                msg_offset);
                remote_msg = (struct xpc_msg *) (ch->remote_msgqueue_pa +
                                                                msg_offset);

                if ((ret = xpc_pull_remote_cachelines(part, msg, remote_msg,
                                nmsgs * ch->msg_size)) != xpcSuccess) {

                        dev_dbg(xpc_chan, "failed to pull %d msgs starting with"
                                " msg %ld from partition %d, channel=%d, "
                                "ret=%d\n", nmsgs, ch->next_msg_to_pull,
                                ch->partid, ch->number, ret);

                        /* pull failures are fatal for the partition */
                        XPC_DEACTIVATE_PARTITION(part, ret);

                        mutex_unlock(&ch->msg_to_pull_mutex);
                        return NULL;
                }

                mb();   /* >>> this may not be needed, we're not sure */

                ch->next_msg_to_pull += nmsgs;
        }

        mutex_unlock(&ch->msg_to_pull_mutex);

        /* return the message we were looking for */
        msg_offset = (get % ch->remote_nentries) * ch->msg_size;
        msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue + msg_offset);

        return msg;
}
2139
2140
2141 /*
2142  * Get a message to be delivered.
2143  */
static struct xpc_msg *
xpc_get_deliverable_msg(struct xpc_channel *ch)
{
        struct xpc_msg *msg = NULL;
        s64 get;


        do {
                if ((volatile u32) ch->flags & XPC_C_DISCONNECTING) {
                        break;
                }

                /* nothing to deliver if get has caught up with remote put */
                get = (volatile s64) ch->w_local_GP.get;
                if (get == (volatile s64) ch->w_remote_GP.put) {
                        break;
                }

                /* There are messages waiting to be pulled and delivered.
                 * We need to try to secure one for ourselves. We'll do this
                 * by trying to increment w_local_GP.get and hope that no one
                 * else beats us to it. If they do, we'll simply have
                 * to try again for the next one.
                 */

                if (cmpxchg(&ch->w_local_GP.get, get, get + 1) == get) {
                        /* we got the entry referenced by get */

                        dev_dbg(xpc_chan, "w_local_GP.get changed to %ld, "
                                "partid=%d, channel=%d\n", get + 1,
                                ch->partid, ch->number);

                        /* pull the message from the remote partition */

                        msg = xpc_pull_remote_msg(ch, get);

                        DBUG_ON(msg != NULL && msg->number != get);
                        DBUG_ON(msg != NULL && (msg->flags & XPC_M_DONE));
                        DBUG_ON(msg != NULL && !(msg->flags & XPC_M_READY));

                        break;
                }

        } while (1);

        return msg;
}
2190
2191
2192 /*
2193  * Deliver a message to its intended recipient.
2194  */
2195 void
2196 xpc_deliver_msg(struct xpc_channel *ch)
2197 {
2198         struct xpc_msg *msg;
2199
2200
2201         if ((msg = xpc_get_deliverable_msg(ch)) != NULL) {
2202
2203                 /*
2204                  * This ref is taken to protect the payload itself from being
2205                  * freed before the user is finished with it, which the user
2206                  * indicates by calling xpc_initiate_received().
2207                  */
2208                 xpc_msgqueue_ref(ch);
2209
2210                 atomic_inc(&ch->kthreads_active);
2211
2212                 if (ch->func != NULL) {
2213                         dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
2214                                 "msg_number=%ld, partid=%d, channel=%d\n",
2215                                 (void *) msg, msg->number, ch->partid,
2216                                 ch->number);
2217
2218                         /* deliver the message to its intended recipient */
2219                         ch->func(xpcMsgReceived, ch->partid, ch->number,
2220                                         &msg->payload, ch->key);
2221
2222                         dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
2223                                 "msg_number=%ld, partid=%d, channel=%d\n",
2224                                 (void *) msg, msg->number, ch->partid,
2225                                 ch->number);
2226                 }
2227
2228                 atomic_dec(&ch->kthreads_active);
2229         }
2230 }
2231
2232
2233 /*
2234  * Now we actually acknowledge the messages that have been delivered and ack'd
2235  * by advancing the cached remote message queue's Get value and if requested
2236  * send an IPI to the message sender's partition.
2237  */
static void
xpc_acknowledge_msgs(struct xpc_channel *ch, s64 initial_get, u8 msg_flags)
{
        struct xpc_msg *msg;
        s64 get = initial_get + 1;
        int send_IPI = 0;


        while (1) {

                /* advance get past every consecutive XPC_M_DONE message */
                while (1) {
                        if (get == (volatile s64) ch->w_local_GP.get) {
                                break;
                        }

                        msg = (struct xpc_msg *) ((u64) ch->remote_msgqueue +
                               (get % ch->remote_nentries) * ch->msg_size);

                        if (!(msg->flags & XPC_M_DONE)) {
                                break;
                        }

                        /* accumulate flags so an XPC_M_INTERRUPT isn't lost */
                        msg_flags |= msg->flags;
                        get++;
                }

                if (get == initial_get) {
                        /* nothing's changed */
                        break;
                }

                if (cmpxchg_rel(&ch->local_GP->get, initial_get, get) !=
                                                                initial_get) {
                        /* someone else beat us to it */
                        DBUG_ON((volatile s64) ch->local_GP->get <=
                                                                initial_get);
                        break;
                }

                /* we just set the new value of local_GP->get */

                dev_dbg(xpc_chan, "local_GP->get changed to %ld, partid=%d, "
                        "channel=%d\n", get, ch->partid, ch->number);

                send_IPI = (msg_flags & XPC_M_INTERRUPT);

                /*
                 * We need to ensure that the message referenced by
                 * local_GP->get is not XPC_M_DONE or that local_GP->get
                 * equals w_local_GP.get, so we'll go have a look.
                 */
                initial_get = get;
        }

        if (send_IPI) {
                /* at least one acked message requested a sender interrupt */
                xpc_IPI_send_msgrequest(ch);
        }
}
2296
2297
2298 /*
2299  * Acknowledge receipt of a delivered message.
2300  *
2301  * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
2302  * that sent the message.
2303  *
2304  * This function, although called by users, does not call xpc_part_ref() to
2305  * ensure that the partition infrastructure is in place. It relies on the
2306  * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
2307  *
2308  * Arguments:
2309  *
2310  *      partid - ID of partition to which the channel is connected.
2311  *      ch_number - channel # message received on.
2312  *      payload - pointer to the payload area allocated via
2313  *                      xpc_initiate_allocate().
2314  */
void
xpc_initiate_received(partid_t partid, int ch_number, void *payload)
{
        struct xpc_partition *part = &xpc_partitions[partid];
        struct xpc_channel *ch;
        struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);
        s64 get, msg_number = msg->number;


        DBUG_ON(partid <= 0 || partid >= XP_MAX_PARTITIONS);
        DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);

        ch = &part->channels[ch_number];

        dev_dbg(xpc_chan, "msg=0x%p, msg_number=%ld, partid=%d, channel=%d\n",
                (void *) msg, msg_number, ch->partid, ch->number);

        /* msg must reside at its msg_number slot of the cached remote queue */
        DBUG_ON((((u64) msg - (u64) ch->remote_msgqueue) / ch->msg_size) !=
                                        msg_number % ch->remote_nentries);
        DBUG_ON(msg->flags & XPC_M_DONE);

        msg->flags |= XPC_M_DONE;

        /*
         * The preceding store of msg->flags must occur before the following
         * load of ch->local_GP->get.
         */
        mb();

        /*
         * See if this message is next in line to be acknowledged as having
         * been delivered.
         */
        get = ch->local_GP->get;
        if (get == msg_number) {
                xpc_acknowledge_msgs(ch, get, msg->flags);
        }

        /* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg()  */
        xpc_msgqueue_deref(ch);
}
2356