net/ceph: make ceph_msgr_wq non-reentrant
[pandora-kernel.git] / net / ceph / messenger.c
index 0e8157e..dff633d 100644 (file)
@@ -96,12 +96,10 @@ struct workqueue_struct *ceph_msgr_wq;
 
 int ceph_msgr_init(void)
 {
-       ceph_msgr_wq = create_workqueue("ceph-msgr");
-       if (IS_ERR(ceph_msgr_wq)) {
-               int ret = PTR_ERR(ceph_msgr_wq);
-               pr_err("msgr_init failed to create workqueue: %d\n", ret);
-               ceph_msgr_wq = NULL;
-               return ret;
+       ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
+       if (!ceph_msgr_wq) {
+               pr_err("msgr_init failed to create workqueue\n");
+               return -ENOMEM;
        }
        return 0;
 }
@@ -540,8 +538,7 @@ static void prepare_write_message(struct ceph_connection *con)
                /* initialize page iterator */
                con->out_msg_pos.page = 0;
                if (m->pages)
-                       con->out_msg_pos.page_pos =
-                               le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+                       con->out_msg_pos.page_pos = m->page_alignment;
                else
                        con->out_msg_pos.page_pos = 0;
                con->out_msg_pos.data_pos = 0;
@@ -1491,7 +1488,7 @@ static int read_partial_message(struct ceph_connection *con)
        struct ceph_msg *m = con->in_msg;
        int ret;
        int to, left;
-       unsigned front_len, middle_len, data_len, data_off;
+       unsigned front_len, middle_len, data_len;
        int datacrc = con->msgr->nocrc;
        int skip;
        u64 seq;
@@ -1527,19 +1524,17 @@ static int read_partial_message(struct ceph_connection *con)
        data_len = le32_to_cpu(con->in_hdr.data_len);
        if (data_len > CEPH_MSG_MAX_DATA_LEN)
                return -EIO;
-       data_off = le16_to_cpu(con->in_hdr.data_off);
 
        /* verify seq# */
        seq = le64_to_cpu(con->in_hdr.seq);
        if ((s64)seq - (s64)con->in_seq < 1) {
-               pr_info("skipping %s%lld %s seq %lld, expected %lld\n",
+               pr_info("skipping %s%lld %s seq %lld expected %lld\n",
                        ENTITY_NAME(con->peer_name),
                        ceph_pr_addr(&con->peer_addr.in_addr),
                        seq, con->in_seq + 1);
                con->in_base_pos = -front_len - middle_len - data_len -
                        sizeof(m->footer);
                con->in_tag = CEPH_MSGR_TAG_READY;
-               con->in_seq++;
                return 0;
        } else if ((s64)seq - (s64)con->in_seq > 1) {
                pr_err("read_partial_message bad seq %lld expected %lld\n",
@@ -1576,7 +1571,7 @@ static int read_partial_message(struct ceph_connection *con)
 
                con->in_msg_pos.page = 0;
                if (m->pages)
-                       con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+                       con->in_msg_pos.page_pos = m->page_alignment;
                else
                        con->in_msg_pos.page_pos = 0;
                con->in_msg_pos.data_pos = 0;
@@ -1925,20 +1920,6 @@ bad_tag:
 /*
  * Atomically queue work on a connection.  Bump @con reference to
  * avoid races with connection teardown.
- *
- * There is some trickery going on with QUEUED and BUSY because we
- * only want a _single_ thread operating on each connection at any
- * point in time, but we want to use all available CPUs.
- *
- * The worker thread only proceeds if it can atomically set BUSY.  It
- * clears QUEUED and does it's thing.  When it thinks it's done, it
- * clears BUSY, then rechecks QUEUED.. if it's set again, it loops
- * (tries again to set BUSY).
- *
- * To queue work, we first set QUEUED, _then_ if BUSY isn't set, we
- * try to queue work.  If that fails (work is already queued, or BUSY)
- * we give up (work also already being done or is queued) but leave QUEUED
- * set so that the worker thread will loop if necessary.
  */
 static void queue_con(struct ceph_connection *con)
 {
@@ -1953,11 +1934,7 @@ static void queue_con(struct ceph_connection *con)
                return;
        }
 
-       set_bit(QUEUED, &con->state);
-       if (test_bit(BUSY, &con->state)) {
-               dout("queue_con %p - already BUSY\n", con);
-               con->ops->put(con);
-       } else if (!queue_work(ceph_msgr_wq, &con->work.work)) {
+       if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
                dout("queue_con %p - already queued\n", con);
                con->ops->put(con);
        } else {
@@ -1972,15 +1949,6 @@ static void con_work(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                   work.work);
-       int backoff = 0;
-
-more:
-       if (test_and_set_bit(BUSY, &con->state) != 0) {
-               dout("con_work %p BUSY already set\n", con);
-               goto out;
-       }
-       dout("con_work %p start, clearing QUEUED\n", con);
-       clear_bit(QUEUED, &con->state);
 
        mutex_lock(&con->mutex);
 
@@ -1999,28 +1967,13 @@ more:
            try_read(con) < 0 ||
            try_write(con) < 0) {
                mutex_unlock(&con->mutex);
-               backoff = 1;
                ceph_fault(con);     /* error/fault path */
                goto done_unlocked;
        }
 
 done:
        mutex_unlock(&con->mutex);
-
 done_unlocked:
-       clear_bit(BUSY, &con->state);
-       dout("con->state=%lu\n", con->state);
-       if (test_bit(QUEUED, &con->state)) {
-               if (!backoff || test_bit(OPENING, &con->state)) {
-                       dout("con_work %p QUEUED reset, looping\n", con);
-                       goto more;
-               }
-               dout("con_work %p QUEUED reset, but just faulted\n", con);
-               clear_bit(QUEUED, &con->state);
-       }
-       dout("con_work %p done\n", con);
-
-out:
        con->ops->put(con);
 }
 
@@ -2301,6 +2254,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
 
        /* data */
        m->nr_pages = 0;
+       m->page_alignment = 0;
        m->pages = NULL;
        m->pagelist = NULL;
        m->bio = NULL;
@@ -2370,6 +2324,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                               type, front_len);
                        return NULL;
                }
+               msg->page_alignment = le16_to_cpu(hdr->data_off);
        }
        memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));