RDS: Do not mask address when pinning pages
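get_user_pages_fast() pins every page touched by the requested range
whether or not the start address is page-aligned, so masking the
address with PAGE_MASK at the call sites buys nothing. For context, a
sketch consistent with the rds_pin_pages() helper defined earlier in
this file (shown here for reference; it is not part of this commit):

    static int rds_pin_pages(unsigned long user_addr, unsigned int nr_pages,
                             struct page **pages, int write)
    {
            int ret;

            /* Pin the pages spanning the range; roll back and report
             * -EFAULT if only some of them could be pinned. */
            ret = get_user_pages_fast(user_addr, nr_pages, write, pages);
            if (ret >= 0 && ret < nr_pages) {
                    while (ret--)
                            put_page(pages[ret]);
                    ret = -EFAULT;
            }

            return ret;
    }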
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 75fd13b..1929cb8 100644
@@ -35,7 +35,7 @@
 #include <linux/rbtree.h>
 #include <linux/dma-mapping.h> /* for DMA_*_DEVICE */
 
-#include "rdma.h"
+#include "rds.h"
 
 /*
  * XXX
@@ -130,14 +130,22 @@ void rds_rdma_drop_keys(struct rds_sock *rs)
 {
        struct rds_mr *mr;
        struct rb_node *node;
+       unsigned long flags;
 
        /* Release any MRs associated with this socket */
+       spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        while ((node = rb_first(&rs->rs_rdma_keys))) {
                mr = container_of(node, struct rds_mr, r_rb_node);
                if (mr->r_trans == rs->rs_transport)
                        mr->r_invalidate = 0;
+               rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
+               RB_CLEAR_NODE(&mr->r_rb_node);
+               spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
+               rds_destroy_mr(mr);
                rds_mr_put(mr);
+               spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        }
+       spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
 
        if (rs->rs_transport && rs->rs_transport->flush_mrs)
                rs->rs_transport->flush_mrs();
@@ -181,7 +189,7 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
                goto out;
        }
 
-       if (rs->rs_transport->get_mr == NULL) {
+       if (!rs->rs_transport->get_mr) {
                ret = -EOPNOTSUPP;
                goto out;
        }
@@ -197,13 +205,13 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 
        /* XXX clamp nr_pages to limit the size of this alloc? */
        pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
-       if (pages == NULL) {
+       if (!pages) {
                ret = -ENOMEM;
                goto out;
        }
 
        mr = kzalloc(sizeof(struct rds_mr), GFP_KERNEL);
-       if (mr == NULL) {
+       if (!mr) {
                ret = -ENOMEM;
                goto out;
        }
@@ -230,13 +238,13 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
         * r/o or r/w. We need to assume r/w, or we'll do a lot of RDMA to
         * the zero page.
         */
-       ret = rds_pin_pages(args->vec.addr & PAGE_MASK, nr_pages, pages, 1);
+       ret = rds_pin_pages(args->vec.addr, nr_pages, pages, 1);
        if (ret < 0)
                goto out;
 
        nents = ret;
        sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
-       if (sg == NULL) {
+       if (!sg) {
                ret = -ENOMEM;
                goto out;
        }
@@ -406,27 +414,30 @@ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force)
 
        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
-       if (mr && (mr->r_use_once || force)) {
+       if (!mr) {
+               printk(KERN_ERR "rds: trying to unuse MR with unknown r_key %u!\n", r_key);
+               spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
+               return;
+       }
+
+       if (mr->r_use_once || force) {
                rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
                RB_CLEAR_NODE(&mr->r_rb_node);
                zot_me = 1;
-       } else if (mr)
-               atomic_inc(&mr->r_refcount);
+       }
        spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
 
        /* May have to issue a dma_sync on this memory region.
         * Note we could avoid this if the operation was a RDMA READ,
         * but at this point we can't tell. */
-       if (mr != NULL) {
-               if (mr->r_trans->sync_mr)
-                       mr->r_trans->sync_mr(mr->r_trans_private, DMA_FROM_DEVICE);
-
-               /* If the MR was marked as invalidate, this will
-                * trigger an async flush. */
-               if (zot_me)
-                       rds_destroy_mr(mr);
-               rds_mr_put(mr);
-       }
+       if (mr->r_trans->sync_mr)
+               mr->r_trans->sync_mr(mr->r_trans_private, DMA_FROM_DEVICE);
+
+       /* If the MR was marked as invalidate, this will
+        * trigger an async flush. */
+       if (zot_me)
+               rds_destroy_mr(mr);
+       rds_mr_put(mr);
 }
 
 void rds_rdma_free_op(struct rds_rdma_op *ro)
@@ -440,33 +451,67 @@ void rds_rdma_free_op(struct rds_rdma_op *ro)
                 * is the case for a RDMA_READ which copies from remote
                 * to local memory */
                if (!ro->r_write) {
-                       BUG_ON(in_interrupt());
+                       BUG_ON(irqs_disabled());
                        set_page_dirty(page);
                }
                put_page(page);
        }
 
        kfree(ro->r_notifier);
-       kfree(ro);
+       ro->r_notifier = NULL;
+       ro->r_active = 0;
+}
+
+/*
+ * Count the number of pages needed to describe an incoming iovec.
+ */
+static int rds_rdma_pages(struct rds_rdma_args *args)
+{
+       struct rds_iovec vec;
+       struct rds_iovec __user *local_vec;
+       unsigned int tot_pages = 0;
+       unsigned int nr_pages;
+       unsigned int i;
+
+       local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;
+
+       /* figure out the number of pages in the vector */
+       for (i = 0; i < args->nr_local; i++) {
+               if (copy_from_user(&vec, &local_vec[i],
+                                  sizeof(struct rds_iovec)))
+                       return -EFAULT;
+
+               nr_pages = rds_pages_in_vec(&vec);
+               if (nr_pages == 0)
+                       return -EINVAL;
+
+               tot_pages += nr_pages;
+
+               /*
+                * nr_pages for one entry is limited to (UINT_MAX>>PAGE_SHIFT)+1,
+                * so tot_pages cannot overflow without first going negative.
+                */
+               if ((int)tot_pages < 0)
+                       return -EINVAL;
+       }
+
+       return tot_pages;
+}
+
+int rds_rdma_extra_size(struct rds_rdma_args *args)
+{
+       int pages = rds_rdma_pages(args);
+
+       if (pages < 0)
+               return pages;
+
+       return pages * sizeof(struct scatterlist);
 }
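
rds_rdma_pages() above leans on rds_pages_in_vec() to turn one user
iovec into a page count; for example, a 10-byte vector starting 4
bytes before a page boundary spans 2 pages. A sketch consistent with
that helper's definition earlier in this file:

    static unsigned int rds_pages_in_vec(struct rds_iovec *vec)
    {
            /* Reject address wrap-around and absurdly large vectors. */
            if ((vec->addr + vec->bytes <= vec->addr) ||
                (vec->bytes > (u64)UINT_MAX))
                    return 0;

            /* Index of the last page (rounded up) minus the first. */
            return ((vec->addr + vec->bytes + PAGE_SIZE - 1) >> PAGE_SHIFT) -
                    (vec->addr >> PAGE_SHIFT);
    }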
 
 /*
  * args is a pointer to an in-kernel copy in the sendmsg cmsg.
  */
-static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
-                                           struct rds_rdma_args *args)
+static int rds_rdma_prepare(struct rds_message *rm,
+                           struct rds_sock *rs,
+                           struct rds_rdma_args *args)
 {
        struct rds_iovec vec;
-       struct rds_rdma_op *op = NULL;
+       struct rds_rdma_op *op = &rm->rdma.m_rdma_op;
-       unsigned int nr_pages;
-       unsigned int max_pages;
+       int nr_pages;
        unsigned int nr_bytes;
        struct page **pages = NULL;
        struct rds_iovec __user *local_vec;
-       struct scatterlist *sg;
        unsigned int nr;
        unsigned int i, j;
-       int ret;
+       int ret = 0;
 
 
        if (rs->rs_bound_addr == 0) {
@@ -479,37 +524,12 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
                goto out;
        }
 
-       nr_pages = 0;
-       max_pages = 0;
-
-       local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;
-
-       /* figure out the number of pages in the vector */
-       for (i = 0; i < args->nr_local; i++) {
-               if (copy_from_user(&vec, &local_vec[i],
-                                  sizeof(struct rds_iovec))) {
-                       ret = -EFAULT;
-                       goto out;
-               }
-
-               nr = rds_pages_in_vec(&vec);
-               if (nr == 0) {
-                       ret = -EINVAL;
-                       goto out;
-               }
-
-               max_pages = max(nr, max_pages);
-               nr_pages += nr;
-       }
-
-       pages = kcalloc(max_pages, sizeof(struct page *), GFP_KERNEL);
-       if (pages == NULL) {
-               ret = -ENOMEM;
-               goto out;
-       }
+       nr_pages = rds_rdma_pages(args);
+       if (nr_pages < 0) {
+               ret = nr_pages;
+               goto out;
+       }
 
-       op = kzalloc(offsetof(struct rds_rdma_op, r_sg[nr_pages]), GFP_KERNEL);
-       if (op == NULL) {
+       pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
+       if (!pages) {
                ret = -ENOMEM;
                goto out;
        }
@@ -517,9 +537,10 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
        op->r_write = !!(args->flags & RDS_RDMA_READWRITE);
        op->r_fence = !!(args->flags & RDS_RDMA_FENCE);
        op->r_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+       op->r_active = 1;
        op->r_recverr = rs->rs_recverr;
        WARN_ON(!nr_pages);
-       sg_init_table(op->r_sg, nr_pages);
+       op->r_sg = rds_message_alloc_sgs(rm, nr_pages);
 
        if (op->r_notify || op->r_recverr) {
                /* We allocate an uninitialized notifier here, because
@@ -553,6 +574,8 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
               (unsigned long long)args->remote_vec.addr,
               op->r_key);
 
+       local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;
+
        for (i = 0; i < args->nr_local; i++) {
                if (copy_from_user(&vec, &local_vec[i],
                                   sizeof(struct rds_iovec))) {
@@ -569,15 +592,10 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
                rs->rs_user_addr = vec.addr;
                rs->rs_user_bytes = vec.bytes;
 
-               /* did the user change the vec under us? */
-               if (nr > max_pages || op->r_nents + nr > nr_pages) {
-                       ret = -EINVAL;
-                       goto out;
-               }
                /* If it's a WRITE operation, we want to pin the pages for reading.
                 * If it's a READ operation, we need to pin the pages for writing.
                 */
-               ret = rds_pin_pages(vec.addr & PAGE_MASK, nr, pages, !op->r_write);
+               ret = rds_pin_pages(vec.addr, nr, pages, !op->r_write);
                if (ret < 0)
                        goto out;
 
@@ -588,6 +606,7 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
 
                for (j = 0; j < nr; j++) {
                        unsigned int offset = vec.addr & ~PAGE_MASK;
+                       struct scatterlist *sg;
 
                        sg = &op->r_sg[op->r_nents + j];
                        sg_set_page(sg, pages[j],
@@ -617,12 +636,10 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
        ret = 0;
 out:
        kfree(pages);
-       if (ret) {
-               if (op)
-                       rds_rdma_free_op(op);
-               op = ERR_PTR(ret);
-       }
-       return op;
+       if (ret)
+               rds_rdma_free_op(op);
+
+       return ret;
 }
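
With the rdma op now embedded in struct rds_message, the scatterlist
no longer comes from a per-op kzalloc(): rds_rdma_extra_size() sizes
the array up front and rds_message_alloc_sgs() hands out entries from
it, so rds_rdma_prepare() cannot fail on that allocation. A minimal
sketch, assuming the m_used_sgs/m_total_sgs bookkeeping kept by
net/rds/message.c:

    struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents)
    {
            /* The sg array is preallocated directly behind the
             * rds_message, sized via rds_rdma_extra_size(). */
            struct scatterlist *sg_first = (struct scatterlist *)&rm[1];
            struct scatterlist *sg_ret;

            WARN_ON(rm->m_used_sgs + nents > rm->m_total_sgs);

            sg_ret = &sg_first[rm->m_used_sgs];
            sg_init_table(sg_ret, nents);
            rm->m_used_sgs += nents;

            return sg_ret;
    }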
 
 /*
@@ -632,17 +649,17 @@ out:
 int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
                          struct cmsghdr *cmsg)
 {
-       struct rds_rdma_op *op;
+       int ret;
 
        if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args)) ||
-           rm->m_rdma_op != NULL)
+           rm->rdma.m_rdma_op.r_active)
                return -EINVAL;
 
-       op = rds_rdma_prepare(rs, CMSG_DATA(cmsg));
-       if (IS_ERR(op))
-               return PTR_ERR(op);
+       ret = rds_rdma_prepare(rm, rs, CMSG_DATA(cmsg));
+       if (ret)
+               return ret;
+
        rds_stats_inc(s_send_rdma);
-       rm->m_rdma_op = op;
        return 0;
 }
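
On the userspace side the args parsed above arrive as ancillary data
on sendmsg(). A hedged sketch (assuming the <linux/rds.h> structs
match the kernel-side field accesses in rds_rdma_prepare(); buf, len,
remote_addr, cookie and token are placeholders):

    struct rds_iovec iov = {
            .addr  = (uint64_t)(unsigned long)buf,
            .bytes = len,
    };
    struct rds_rdma_args rargs = {
            .cookie         = cookie,   /* from RDS_CMSG_RDMA_MAP */
            .remote_vec     = { .addr = remote_addr, .bytes = len },
            .local_vec_addr = (uint64_t)(unsigned long)&iov,
            .nr_local       = 1,
            .flags          = RDS_RDMA_READWRITE | RDS_RDMA_NOTIFY_ME,
            .user_token     = token,
    };
    char cbuf[CMSG_SPACE(sizeof(rargs))];
    struct msghdr msg = {
            .msg_control    = cbuf,
            .msg_controllen = sizeof(cbuf),
    };
    struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);

    cmsg->cmsg_level = SOL_RDS;
    cmsg->cmsg_type  = RDS_CMSG_RDMA_ARGS;
    cmsg->cmsg_len   = CMSG_LEN(sizeof(rargs));
    memcpy(CMSG_DATA(cmsg), &rargs, sizeof(rargs));
    /* then set msg_name to the destination sockaddr and msg_iov to
     * the RDS payload before calling sendmsg(fd, &msg, 0) */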
 
@@ -671,9 +688,10 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
         */
        r_key = rds_rdma_cookie_key(rm->m_rdma_cookie);
 
        spin_lock_irqsave(&rs->rs_rdma_lock, flags);
        mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
-       if (mr == NULL)
+       if (!mr)
                err = -EINVAL;  /* invalid r_key */
        else
                atomic_inc(&mr->r_refcount);
@@ -681,7 +699,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 
        if (mr) {
                mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
-               rm->m_rdma_mr = mr;
+               rm->rdma.m_rdma_mr = mr;
        }
        return err;
 }
@@ -699,5 +717,77 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
            rm->m_rdma_cookie != 0)
                return -EINVAL;
 
-       return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
+       return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.m_rdma_mr);
+}
+
+/*
+ * Fill in rds_message for an atomic request.
+ */
+int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
+                   struct cmsghdr *cmsg)
+{
+       struct page *page = NULL;
+       struct rds_atomic_args *args;
+       int ret = 0;
+
+       if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_atomic_args)) ||
+           rm->atomic.op_active)
+               return -EINVAL;
+
+       args = CMSG_DATA(cmsg);
+
+       if (cmsg->cmsg_type == RDS_CMSG_ATOMIC_CSWP) {
+               rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
+               rm->atomic.op_swap_add = args->cswp.swap;
+               rm->atomic.op_compare = args->cswp.compare;
+       } else {
+               rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
+               rm->atomic.op_swap_add = args->fadd.add;
+       }
+
+       rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+       rm->atomic.op_recverr = rs->rs_recverr;
+       rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
+
+       /* verify 8 byte-aligned */
+       if (args->local_addr & 0x7) {
+               ret = -EFAULT;
+               goto err;
+       }
+
+       ret = rds_pin_pages(args->local_addr, 1, &page, 1);
+       if (ret != 1)
+               goto err;
+       ret = 0;
+
+       sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr));
+
+       if (rm->atomic.op_notify || rm->atomic.op_recverr) {
+               /* We allocate an uninitialized notifier here, because
+                * we don't want to do that in the completion handler. We
+                * would have to use GFP_ATOMIC there, and don't want to deal
+                * with failed allocations.
+                */
+               rm->atomic.op_notifier = kmalloc(sizeof(*rm->atomic.op_notifier), GFP_KERNEL);
+               if (!rm->atomic.op_notifier) {
+                       ret = -ENOMEM;
+                       goto err;
+               }
+
+               rm->atomic.op_notifier->n_user_token = args->user_token;
+               rm->atomic.op_notifier->n_status = RDS_RDMA_SUCCESS;
+       }
+
+       rm->atomic.op_rkey = rds_rdma_cookie_key(args->cookie);
+       rm->atomic.op_remote_addr = args->remote_addr + rds_rdma_cookie_offset(args->cookie);
+
+       rm->atomic.op_active = 1;
+
+       return ret;
+err:
+       if (page)
+               put_page(page);
+       kfree(rm->atomic.op_notifier);
+
+       return ret;
 }
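
For reference, the cmsg payload rds_cmsg_atomic() consumes, as
reconstructed from the field accesses above (a sketch only; the exact
layout and field order in <linux/rds.h> are an assumption):

    struct rds_atomic_args {
            rds_rdma_cookie_t cookie;  /* MR cookie from RDS_CMSG_RDMA_MAP */
            uint64_t local_addr;       /* must be 8-byte aligned */
            uint64_t remote_addr;
            union {
                    struct {
                            uint64_t compare;
                            uint64_t swap;
                    } cswp;            /* RDS_CMSG_ATOMIC_CSWP */
                    struct {
                            uint64_t add;
                    } fadd;            /* RDS_CMSG_ATOMIC_FADD */
            };
            uint64_t flags;            /* e.g. RDS_RDMA_NOTIFY_ME */
            uint64_t user_token;       /* echoed in the completion notifier */
    };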