[IA64-SGI] enforce proper ordering of callouts by XPC
arch/ia64/sn/kernel/xpc_channel.c
index 0c0a689..cdf6856 100644
@@ -22,6 +22,8 @@
 #include <linux/cache.h>
 #include <linux/interrupt.h>
 #include <linux/slab.h>
+#include <linux/mutex.h>
+#include <linux/completion.h>
 #include <asm/sn/bte.h>
 #include <asm/sn/sn_sal.h>
 #include <asm/sn/xpc.h>
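The two new headers back the two conversions this patch makes: semaphores initialized to 1 and used as mutexes become struct mutex, and semaphores initialized to 0 and used as one-shot event waits become struct completion. A minimal sketch of both replacement patterns (the names here are illustrative, not from XPC):

	#include <linux/mutex.h>
	#include <linux/completion.h>

	static DEFINE_MUTEX(example_mutex);	  /* was: sema_init(&sema, 1) */
	static DECLARE_COMPLETION(example_event); /* was: sema_init(&sema, 0) */

	static void enter_critical_section(void)
	{
		mutex_lock(&example_mutex);	/* was: down(&sema) */
		/* ... serialized work ... */
		mutex_unlock(&example_mutex);	/* was: up(&sema) */
	}

	static void wait_for_event(void)
	{
		wait_for_completion(&example_event);	/* was: down(&sema) */
	}

	static void signal_event(void)
	{
		complete(&example_event);	/* was: up(&sema) */
	}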
@@ -56,8 +58,8 @@ xpc_initialize_channels(struct xpc_partition *part, partid_t partid)
                atomic_set(&ch->n_to_notify, 0);
 
                spin_lock_init(&ch->lock);
-               sema_init(&ch->msg_to_pull_sema, 1);    /* mutex */
-               sema_init(&ch->wdisconnect_sema, 0);    /* event wait */
+               mutex_init(&ch->msg_to_pull_mutex);
+               init_completion(&ch->wdisconnect_wait);
 
                atomic_set(&ch->n_on_msg_allocate_wq, 0);
                init_waitqueue_head(&ch->msg_allocate_wq);
@@ -445,7 +447,7 @@ xpc_allocate_local_msgqueue(struct xpc_channel *ch)
 
                nbytes = nentries * ch->msg_size;
                ch->local_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
-                                               (GFP_KERNEL | GFP_DMA),
+                                               GFP_KERNEL,
                                                &ch->local_msgqueue_base);
                if (ch->local_msgqueue == NULL) {
                        continue;
@@ -453,7 +455,7 @@ xpc_allocate_local_msgqueue(struct xpc_channel *ch)
                memset(ch->local_msgqueue, 0, nbytes);
 
                nbytes = nentries * sizeof(struct xpc_notify);
-               ch->notify_queue = kmalloc(nbytes, (GFP_KERNEL | GFP_DMA));
+               ch->notify_queue = kmalloc(nbytes, GFP_KERNEL);
                if (ch->notify_queue == NULL) {
                        kfree(ch->local_msgqueue_base);
                        ch->local_msgqueue = NULL;
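GFP_DMA is dropped from all three queue allocations: on ia64, ZONE_DMA is the memory below 4GB, and nothing about these queues requires it, so plain GFP_KERNEL suffices and is less likely to fail on large machines. xpc_kmalloc_cacheline_aligned() returns an L1-cacheline-aligned buffer while handing back the raw pointer for the eventual kfree() (note the kfree(ch->local_msgqueue_base) above); a sketch of how such a helper can be written, as an assumption about the actual implementation in asm/sn/xpc.h:

	#include <linux/slab.h>
	#include <linux/cache.h>
	#include <linux/types.h>

	/* *base receives the pointer to pass to kfree(); the return value
	 * is the aligned pointer the caller actually uses */
	static void *
	kmalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
	{
		*base = kmalloc(size, flags);
		if (*base == NULL)
			return NULL;
		if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
			return *base;		/* already aligned */
		kfree(*base);

		/* over-allocate and align by hand */
		*base = kmalloc(size + L1_CACHE_BYTES, flags);
		if (*base == NULL)
			return NULL;
		return (void *)L1_CACHE_ALIGN((u64)*base);
	}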
@@ -500,7 +502,7 @@ xpc_allocate_remote_msgqueue(struct xpc_channel *ch)
 
                nbytes = nentries * ch->msg_size;
                ch->remote_msgqueue = xpc_kmalloc_cacheline_aligned(nbytes,
-                                               (GFP_KERNEL | GFP_DMA),
+                                               GFP_KERNEL,
                                                &ch->remote_msgqueue_base);
                if (ch->remote_msgqueue == NULL) {
                        continue;
@@ -534,7 +536,6 @@ static enum xpc_retval
 xpc_allocate_msgqueues(struct xpc_channel *ch)
 {
        unsigned long irq_flags;
-       int i;
        enum xpc_retval ret;
 
 
@@ -552,11 +553,6 @@ xpc_allocate_msgqueues(struct xpc_channel *ch)
                return ret;
        }
 
-       for (i = 0; i < ch->local_nentries; i++) {
-               /* use a semaphore as an event wait queue */
-               sema_init(&ch->notify_queue[i].sema, 0);
-       }
-
        spin_lock_irqsave(&ch->lock, irq_flags);
        ch->flags |= XPC_C_SETUP;
        spin_unlock_irqrestore(&ch->lock, irq_flags);
@@ -742,7 +738,9 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
 
        /* make sure all activity has settled down first */
 
-       if (atomic_read(&ch->references) > 0) {
+       if (atomic_read(&ch->references) > 0 ||
+                       ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
+                       !(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE))) {
                return;
        }
        DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
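This test is the heart of the ordering fix: teardown must not proceed in the window after the connected callout has been delivered to the channel's user but before the matching disconnecting callout has gone out. The _MADE suffix on the new flags suggests companion flags marking callouts in progress; a sketch of the gating logic (flag names are from the patch, the hex values and helper are illustrative, assuming definitions in asm/sn/xpc.h):

	/* illustrative values; the real definitions live in asm/sn/xpc.h */
	#define XPC_C_CONNECTEDCALLOUT_MADE	0x00000080 /* connected callout completed */
	#define XPC_C_DISCONNECTINGCALLOUT_MADE	0x00020000 /* disconnecting callout completed */

	/* teardown may proceed only if the user was never told the channel
	 * connected, or has since been told it is disconnecting */
	static int xpc_callouts_ordered(struct xpc_channel *ch)
	{
		return !(ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) ||
			(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE);
	}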
@@ -779,7 +777,7 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
 
        /* both sides are disconnected now */
 
-       if (ch->flags & XPC_C_CONNECTCALLOUT) {
+       if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
                spin_unlock_irqrestore(&ch->lock, *irq_flags);
                xpc_disconnect_callout(ch, xpcDisconnected);
                spin_lock_irqsave(&ch->lock, *irq_flags);
@@ -799,10 +797,8 @@ xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
        }
 
        if (ch->flags & XPC_C_WDISCONNECT) {
-               spin_unlock_irqrestore(&ch->lock, *irq_flags);
-               up(&ch->wdisconnect_sema);
-               spin_lock_irqsave(&ch->lock, *irq_flags);
-
+               /* we won't lose the CPU since we're holding ch->lock */
+               complete(&ch->wdisconnect_wait);
        } else if (ch->delayed_IPI_flags) {
                if (part->act_state != XPC_P_DEACTIVATING) {
                        /* time to take action on any delayed IPI flags */
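Unlike the old up() dance, complete() is safe with ch->lock held: it only marks the completion done and wakes the waiter, and never sleeps, so the unlock/relock around the wakeup can go away (hence the comment about not losing the CPU while holding the lock). The waiting side, presumably xpc_disconnect_wait() in xpc_main.c (the function name is an assumption), blocks until this point with the completion's counterpart:

	/* sketch of the waiter paired with the complete() above */
	wait_for_completion(&ch->wdisconnect_wait);	/* was: down(&ch->wdisconnect_sema) */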
@@ -1092,12 +1088,12 @@ xpc_connect_channel(struct xpc_channel *ch)
        struct xpc_registration *registration = &xpc_registrations[ch->number];
 
 
-       if (down_trylock(&registration->sema) != 0) {
+       if (mutex_trylock(&registration->mutex) == 0) {
                return xpcRetry;
        }
 
        if (!XPC_CHANNEL_REGISTERED(ch->number)) {
-               up(&registration->sema);
+               mutex_unlock(&registration->mutex);
                return xpcUnregistered;
        }
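Note the inverted test: down_trylock() returns 0 on success and nonzero on contention, while mutex_trylock() returns 1 on success and 0 on contention, so a mechanical s/down_trylock/mutex_trylock/ would silently reverse the logic. Side by side:

	/* semaphore: 0 means "got it" */
	if (down_trylock(&registration->sema) != 0)
		return xpcRetry;	/* contended, try again later */

	/* mutex: nonzero means "got it", so the sense of the test flips */
	if (mutex_trylock(&registration->mutex) == 0)
		return xpcRetry;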
 
@@ -1108,7 +1104,7 @@ xpc_connect_channel(struct xpc_channel *ch)
 
        if (ch->flags & XPC_C_DISCONNECTING) {
                spin_unlock_irqrestore(&ch->lock, irq_flags);
-               up(&registration->sema);
+               mutex_unlock(&registration->mutex);
                return ch->reason;
        }
 
@@ -1140,7 +1136,7 @@ xpc_connect_channel(struct xpc_channel *ch)
                         * channel lock be locked and will unlock and relock
                         * the channel lock as needed.
                         */
-                       up(&registration->sema);
+                       mutex_unlock(&registration->mutex);
                        XPC_DISCONNECT_CHANNEL(ch, xpcUnequalMsgSizes,
                                                                &irq_flags);
                        spin_unlock_irqrestore(&ch->lock, irq_flags);
@@ -1155,7 +1151,7 @@ xpc_connect_channel(struct xpc_channel *ch)
                atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
        }
 
-       up(&registration->sema);
+       mutex_unlock(&registration->mutex);
 
 
        /* initiate the connection */
@@ -1306,7 +1302,7 @@ xpc_process_msg_IPI(struct xpc_partition *part, int ch_number)
                                "delivered=%d, partid=%d, channel=%d\n",
                                nmsgs_sent, ch->partid, ch->number);
 
-                       if (ch->flags & XPC_C_CONNECTCALLOUT) {
+                       if (ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) {
                                xpc_activate_kthreads(ch, nmsgs_sent);
                        }
                }
@@ -2089,7 +2085,7 @@ xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
        enum xpc_retval ret;
 
 
-       if (down_interruptible(&ch->msg_to_pull_sema) != 0) {
+       if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0) {
                /* we were interrupted by a signal */
                return NULL;
        }
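mutex_lock_interruptible() keeps the semantics of down_interruptible(): it returns 0 once the lock is held and -EINTR if a signal arrives first, so the "nonzero means we were signalled" check carries over unchanged. The shape of the whole critical section (bodies elided; note that the error path below also unlocks before returning):

	if (mutex_lock_interruptible(&ch->msg_to_pull_mutex) != 0)
		return NULL;	/* interrupted by a signal; lock not held */

	/* ... pull and copy remote messages; every exit path must
	 * drop the mutex ... */

	mutex_unlock(&ch->msg_to_pull_mutex);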
@@ -2125,7 +2121,7 @@ xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
 
                        XPC_DEACTIVATE_PARTITION(part, ret);
 
-                       up(&ch->msg_to_pull_sema);
+                       mutex_unlock(&ch->msg_to_pull_mutex);
                        return NULL;
                }
 
@@ -2134,7 +2130,7 @@ xpc_pull_remote_msg(struct xpc_channel *ch, s64 get)
                ch->next_msg_to_pull += nmsgs;
        }
 
-       up(&ch->msg_to_pull_sema);
+       mutex_unlock(&ch->msg_to_pull_mutex);
 
        /* return the message we were looking for */
        msg_offset = (get % ch->remote_nentries) * ch->msg_size;