Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net-2.6
[pandora-kernel.git] / fs / ceph / osd_client.c
index dbe63db..3514f71 100644 (file)
@@ -413,11 +413,22 @@ static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
+       struct ceph_osd_request *req;
        int ret = 0;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests)) {
                __remove_osd(osdc, osd);
+       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
+                         &osd->o_con.peer_addr,
+                         sizeof(osd->o_con.peer_addr)) == 0 &&
+                  !ceph_con_opened(&osd->o_con)) {
+               dout(" osd addr hasn't changed and connection never opened,"
+                    " letting msgr retry");
+               /* touch each r_stamp for handle_timeout()'s benefit */
+               list_for_each_entry(req, &osd->o_requests, r_osd_item)
+                       req->r_stamp = jiffies;
+               ret = -EAGAIN;
        } else {
                ceph_con_close(&osd->o_con);
                ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -554,7 +565,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        struct ceph_pg pgid;
-       int o = -1;
+       int acting[CEPH_PG_MAX_SIZE];
+       int o = -1, num = 0;
        int err;
 
        dout("map_osds %p tid %lld\n", req, req->r_tid);
@@ -565,10 +577,16 @@ static int __map_osds(struct ceph_osd_client *osdc,
        pgid = reqhead->layout.ol_pgid;
        req->r_pgid = pgid;
 
-       o = ceph_calc_pg_primary(osdc->osdmap, pgid);
+       err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
+       if (err > 0) {
+               o = acting[0];
+               num = err;
+       }
 
        if ((req->r_osd && req->r_osd->o_osd == o &&
-            req->r_sent >= req->r_osd->o_incarnation) ||
+            req->r_sent >= req->r_osd->o_incarnation &&
+            req->r_num_pg_osds == num &&
+            memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
            (req->r_osd == NULL && o == -1))
                return 0;  /* no change */
 
@@ -576,6 +594,10 @@ static int __map_osds(struct ceph_osd_client *osdc,
             req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
             req->r_osd ? req->r_osd->o_osd : -1);
 
+       /* record full pg acting set */
+       memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
+       req->r_num_pg_osds = num;
+
        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
@@ -601,7 +623,7 @@ static int __map_osds(struct ceph_osd_client *osdc,
                __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
        }
-       err = 1;   /* osd changed */
+       err = 1;   /* osd or pg changed */
 
 out:
        return err;
@@ -633,7 +655,7 @@ static int __send_request(struct ceph_osd_client *osdc,
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_sent_stamp = jiffies;
+       req->r_stamp = jiffies;
        list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
@@ -660,7 +682,7 @@ static void handle_timeout(struct work_struct *work)
        unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
        unsigned long keepalive =
                osdc->client->mount_args->osd_keepalive_timeout * HZ;
-       unsigned long last_sent = 0;
+       unsigned long last_stamp = 0;
        struct rb_node *p;
        struct list_head slow_osds;
 
@@ -697,12 +719,12 @@ static void handle_timeout(struct work_struct *work)
                req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
                                 r_req_lru_item);
 
-               if (time_before(jiffies, req->r_sent_stamp + timeout))
+               if (time_before(jiffies, req->r_stamp + timeout))
                        break;
 
-               BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
+               BUG_ON(req == last_req && req->r_stamp == last_stamp);
                last_req = req;
-               last_sent = req->r_sent_stamp;
+               last_stamp = req->r_stamp;
 
                osd = req->r_osd;
                BUG_ON(!osd);
@@ -718,7 +740,7 @@ static void handle_timeout(struct work_struct *work)
         */
        INIT_LIST_HEAD(&slow_osds);
        list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
-               if (time_before(jiffies, req->r_sent_stamp + keepalive))
+               if (time_before(jiffies, req->r_stamp + keepalive))
                        break;
 
                osd = req->r_osd;
@@ -768,16 +790,18 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        struct ceph_osd_request *req;
        u64 tid;
        int numops, object_len, flags;
+       s32 result;
 
        tid = le64_to_cpu(msg->hdr.tid);
        if (msg->front.iov_len < sizeof(*rhead))
                goto bad;
        numops = le32_to_cpu(rhead->num_ops);
        object_len = le32_to_cpu(rhead->object_len);
+       result = le32_to_cpu(rhead->result);
        if (msg->front.iov_len != sizeof(*rhead) + object_len +
            numops * sizeof(struct ceph_osd_op))
                goto bad;
-       dout("handle_reply %p tid %llu\n", msg, tid);
+       dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
 
        /* lookup */
        mutex_lock(&osdc->request_mutex);
@@ -823,7 +847,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        dout("handle_reply tid %llu flags %d\n", tid, flags);
 
        /* either this is a read, or we got the safe response */
-       if ((flags & CEPH_OSD_FLAG_ONDISK) ||
+       if (result < 0 ||
+           (flags & CEPH_OSD_FLAG_ONDISK) ||
            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                __unregister_request(osdc, req);
 
@@ -862,7 +887,9 @@ static int __kick_requests(struct ceph_osd_client *osdc,
 
        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
        if (kickosd) {
-               __reset_osd(osdc, kickosd);
+               err = __reset_osd(osdc, kickosd);
+               if (err == -EAGAIN)
+                       return 1;
        } else {
                for (p = rb_first(&osdc->osds); p; p = n) {
                        struct ceph_osd *osd =
@@ -913,7 +940,7 @@ static int __kick_requests(struct ceph_osd_client *osdc,
 
 kick:
                dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
-                    req->r_osd->o_osd);
+                    req->r_osd ? req->r_osd->o_osd : -1);
                req->r_flags |= CEPH_OSD_FLAG_RETRY;
                err = __send_request(osdc, req);
                if (err) {