perf: Fix race in swevent hash
[pandora-kernel.git] / net / ceph / osd_client.c
index 16836a7..2df98a6 100644 (file)
@@ -217,6 +217,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd);
+       INIT_LIST_HEAD(&req->r_req_lru_item);
        req->r_flags = flags;
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -226,7 +227,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
-                                  OSD_OPREPLY_FRONT_LEN, gfp_flags);
+                                  OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
@@ -243,13 +244,13 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                ceph_pagelist_init(req->r_trail);
        }
        /* create request message; allow space for oid */
-       msg_size += 40;
+       msg_size += MAX_OBJ_NAME_SIZE;
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
        if (use_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
-               msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
+               msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
        if (!msg) {
                ceph_osdc_put_request(req);
                return NULL;
@@ -677,12 +678,22 @@ static void put_osd(struct ceph_osd *osd)
  */
 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       dout("__remove_osd %p\n", osd);
+       dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
        BUG_ON(!list_empty(&osd->o_requests));
-       rb_erase(&osd->o_node, &osdc->osds);
        list_del_init(&osd->o_osd_lru);
-       ceph_con_close(&osd->o_con);
-       put_osd(osd);
+       rb_erase(&osd->o_node, &osdc->osds);
+       RB_CLEAR_NODE(&osd->o_node);
+}
+
+static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+{
+       dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
+
+       if (!RB_EMPTY_NODE(&osd->o_node)) {
+               ceph_con_close(&osd->o_con);
+               __remove_osd(osdc, osd);
+               put_osd(osd);
+       }
 }
 
 static void remove_all_osds(struct ceph_osd_client *osdc)
@@ -692,7 +703,7 @@ static void remove_all_osds(struct ceph_osd_client *osdc)
        while (!RB_EMPTY_ROOT(&osdc->osds)) {
                struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
                                                struct ceph_osd, o_node);
-               __remove_osd(osdc, osd);
+               remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -722,7 +733,7 @@ static void remove_old_osds(struct ceph_osd_client *osdc)
        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
                if (time_before(jiffies, osd->lru_ttl))
                        break;
-               __remove_osd(osdc, osd);
+               remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -738,7 +749,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
-               __remove_osd(osdc, osd);
+               remove_osd(osdc, osd);
        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                          &osd->o_con.peer_addr,
                          sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -816,13 +827,10 @@ static void __register_request(struct ceph_osd_client *osdc,
 {
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
-       INIT_LIST_HEAD(&req->r_req_lru_item);
-
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
-
        if (osdc->num_requests == 1) {
                dout(" first request, scheduling timeout\n");
                __schedule_osd_timeout(osdc);
@@ -945,7 +953,7 @@ EXPORT_SYMBOL(ceph_osdc_set_request_linger);
  * Caller should hold map_sem for read and request_mutex.
  */
 static int __map_request(struct ceph_osd_client *osdc,
-                        struct ceph_osd_request *req)
+                        struct ceph_osd_request *req, int force_resend)
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
@@ -969,7 +977,8 @@ static int __map_request(struct ceph_osd_client *osdc,
                num = err;
        }
 
-       if ((req->r_osd && req->r_osd->o_osd == o &&
+       if ((!force_resend &&
+            req->r_osd && req->r_osd->o_osd == o &&
             req->r_sent >= req->r_osd->o_incarnation &&
             req->r_num_pg_osds == num &&
             memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
@@ -1272,6 +1281,7 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
 {
        struct rb_node *p, *n;
 
+       dout("%s %p\n", __func__, osdc);
        for (p = rb_first(&osdc->osds); p; p = n) {
                struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
 
@@ -1291,18 +1301,18 @@ static void reset_changed_osds(struct ceph_osd_client *osdc)
  *
  * Caller should hold map_sem for read and request_mutex.
  */
-static void kick_requests(struct ceph_osd_client *osdc)
+static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
 {
        struct ceph_osd_request *req, *nreq;
        struct rb_node *p;
        int needmap = 0;
        int err;
 
-       dout("kick_requests\n");
+       dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
        mutex_lock(&osdc->request_mutex);
        for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
                req = rb_entry(p, struct ceph_osd_request, r_node);
-               err = __map_request(osdc, req);
+               err = __map_request(osdc, req, force_resend);
                if (err < 0)
                        continue;  /* error */
                if (req->r_osd == NULL) {
@@ -1320,7 +1330,7 @@ static void kick_requests(struct ceph_osd_client *osdc)
                                 r_linger_item) {
                dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
 
-               err = __map_request(osdc, req);
+               err = __map_request(osdc, req, force_resend);
                if (err == 0)
                        continue;  /* no change and no osd was specified */
                if (err < 0)
@@ -1397,7 +1407,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                ceph_osdmap_destroy(osdc->osdmap);
                                osdc->osdmap = newmap;
                        }
-                       kick_requests(osdc);
+                       kick_requests(osdc, 0);
                        reset_changed_osds(osdc);
                } else {
                        dout("ignoring incremental map %u len %d\n",
@@ -1425,6 +1435,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                             "older than our %u\n", epoch, maplen,
                             osdc->osdmap->epoch);
                } else {
+                       int skipped_map = 0;
+
                        dout("taking full map %u len %d\n", epoch, maplen);
                        newmap = osdmap_decode(&p, p+maplen);
                        if (IS_ERR(newmap)) {
@@ -1434,9 +1446,12 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                        BUG_ON(!newmap);
                        oldmap = osdc->osdmap;
                        osdc->osdmap = newmap;
-                       if (oldmap)
+                       if (oldmap) {
+                               if (oldmap->epoch + 1 < newmap->epoch)
+                                       skipped_map = 1;
                                ceph_osdmap_destroy(oldmap);
-                       kick_requests(osdc);
+                       }
+                       kick_requests(osdc, skipped_map);
                }
                p += maplen;
                nr_maps--;
@@ -1709,12 +1724,14 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
         * the request still han't been touched yet.
         */
        if (req->r_sent == 0) {
-               rc = __map_request(osdc, req);
+               rc = __map_request(osdc, req, 0);
                if (rc < 0) {
                        if (nofail) {
                                dout("osdc_start_request failed map, "
                                     " will retry %lld\n", req->r_tid);
                                rc = 0;
+                       } else {
+                               __unregister_request(osdc, req);
                        }
                        goto out_unlock;
                }
@@ -2034,7 +2051,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        if (front > req->r_reply->front.iov_len) {
                pr_warning("get_reply front %d > preallocated %d\n",
                           front, (int)req->r_reply->front.iov_len);
-               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
+               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
                if (!m)
                        goto out;
                ceph_msg_put(req->r_reply);
@@ -2082,7 +2099,7 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        switch (type) {
        case CEPH_MSG_OSD_MAP:
        case CEPH_MSG_WATCH_NOTIFY:
-               return ceph_msg_new(type, front, GFP_NOFS);
+               return ceph_msg_new(type, front, GFP_NOFS, false);
        case CEPH_MSG_OSD_OPREPLY:
                return get_reply(con, hdr, skip);
        default: