Merge branch 'x86-fixes-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git...
[pandora-kernel.git] / fs / ceph / osd_client.c
index ffe1f40..c7b4ded 100644 (file)
@@ -17,6 +17,8 @@
 #define OSD_OPREPLY_FRONT_LEN  512
 
 const static struct ceph_connection_operations osd_con_ops;
+static int __kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd);
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
@@ -228,6 +230,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        }
 
        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+       msg_size = p - msg->front.iov_base;
+       msg->front.iov_len = msg_size;
+       msg->hdr.front_len = cpu_to_le32(msg_size);
        return req;
 }
 
@@ -336,6 +341,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        osd->o_con.ops = &osd_con_ops;
        osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
 
+       INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
 }
 
@@ -407,11 +413,22 @@ static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
+       struct ceph_osd_request *req;
        int ret = 0;
 
        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests)) {
                __remove_osd(osdc, osd);
+       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
+                         &osd->o_con.peer_addr,
+                         sizeof(osd->o_con.peer_addr)) == 0 &&
+                  !ceph_con_opened(&osd->o_con)) {
+               dout(" osd addr hasn't changed and connection never opened,"
+                    " letting msgr retry");
+               /* touch each r_stamp for handle_timeout()'s benfit */
+               list_for_each_entry(req, &osd->o_requests, r_osd_item)
+                       req->r_stamp = jiffies;
+               ret = -EAGAIN;
        } else {
                ceph_con_close(&osd->o_con);
                ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -458,6 +475,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
        return NULL;
 }
 
+static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
+{
+       schedule_delayed_work(&osdc->timeout_work,
+                       osdc->client->mount_args->osd_keepalive_timeout * HZ);
+}
+
+static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
+{
+       cancel_delayed_work(&osdc->timeout_work);
+}
 
 /*
  * Register request, assign tid.  If this is the first request, set up
@@ -469,21 +496,16 @@ static void register_request(struct ceph_osd_client *osdc,
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
        req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+       INIT_LIST_HEAD(&req->r_req_lru_item);
 
        dout("register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
 
-       req->r_timeout_stamp =
-               jiffies + osdc->client->mount_args->osd_timeout*HZ;
-
        if (osdc->num_requests == 1) {
-               osdc->timeout_tid = req->r_tid;
-               dout("  timeout on tid %llu at %lu\n", req->r_tid,
-                    req->r_timeout_stamp);
-               schedule_delayed_work(&osdc->timeout_work,
-                     round_jiffies_relative(req->r_timeout_stamp - jiffies));
+               dout(" first request, scheduling timeout\n");
+               __schedule_osd_timeout(osdc);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -510,21 +532,10 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
        ceph_osdc_put_request(req);
 
-       if (req->r_tid == osdc->timeout_tid) {
-               if (osdc->num_requests == 0) {
-                       dout("no requests, canceling timeout\n");
-                       osdc->timeout_tid = 0;
-                       cancel_delayed_work(&osdc->timeout_work);
-               } else {
-                       req = rb_entry(rb_first(&osdc->requests),
-                                      struct ceph_osd_request, r_node);
-                       osdc->timeout_tid = req->r_tid;
-                       dout("rescheduled timeout on tid %llu at %lu\n",
-                            req->r_tid, req->r_timeout_stamp);
-                       schedule_delayed_work(&osdc->timeout_work,
-                             round_jiffies_relative(req->r_timeout_stamp -
-                                                    jiffies));
-               }
+       list_del_init(&req->r_req_lru_item);
+       if (osdc->num_requests == 0) {
+               dout(" no requests, canceling timeout\n");
+               __cancel_osd_timeout(osdc);
        }
 }
 
@@ -537,6 +548,7 @@ static void __cancel_request(struct ceph_osd_request *req)
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
                req->r_sent = 0;
        }
+       list_del_init(&req->r_req_lru_item);
 }
 
 /*
@@ -632,7 +644,8 @@ static int __send_request(struct ceph_osd_client *osdc,
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
+       req->r_stamp = jiffies;
+       list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -653,11 +666,14 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *last_req = NULL;
        struct ceph_osd *osd;
        unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
-       unsigned long next_timeout = timeout + jiffies;
+       unsigned long keepalive =
+               osdc->client->mount_args->osd_keepalive_timeout * HZ;
+       unsigned long last_stamp = 0;
        struct rb_node *p;
+       struct list_head slow_osds;
 
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -680,25 +696,56 @@ static void handle_timeout(struct work_struct *work)
                        continue;
                }
        }
-       for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
-               osd = rb_entry(p, struct ceph_osd, o_node);
-               if (list_empty(&osd->o_requests))
-                       continue;
-               req = list_first_entry(&osd->o_requests,
-                                      struct ceph_osd_request, r_osd_item);
-               if (time_before(jiffies, req->r_timeout_stamp))
-                       continue;
 
-               dout(" tid %llu (at least) timed out on osd%d\n",
+       /*
+        * reset osds that appear to be _really_ unresponsive.  this
+        * is a failsafe measure.. we really shouldn't be getting to
+        * this point if the system is working properly.  the monitors
+        * should mark the osd as failed and we should find out about
+        * it from an updated osd map.
+        */
+       while (!list_empty(&osdc->req_lru)) {
+               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
+                                r_req_lru_item);
+
+               if (time_before(jiffies, req->r_stamp + timeout))
+                       break;
+
+               BUG_ON(req == last_req && req->r_stamp == last_stamp);
+               last_req = req;
+               last_stamp = req->r_stamp;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
+                          req->r_tid, osd->o_osd);
+               __kick_requests(osdc, osd);
+       }
+
+       /*
+        * ping osds that are a bit slow.  this ensures that if there
+        * is a break in the TCP connection we will notice, and reopen
+        * a connection with that osd (from the fault callback).
+        */
+       INIT_LIST_HEAD(&slow_osds);
+       list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
+               if (time_before(jiffies, req->r_stamp + keepalive))
+                       break;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               dout(" tid %llu is slow, will send keepalive on osd%d\n",
                     req->r_tid, osd->o_osd);
-               req->r_timeout_stamp = next_timeout;
+               list_move_tail(&osd->o_keepalive_item, &slow_osds);
+       }
+       while (!list_empty(&slow_osds)) {
+               osd = list_entry(slow_osds.next, struct ceph_osd,
+                                o_keepalive_item);
+               list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }
 
-       if (osdc->timeout_tid)
-               schedule_delayed_work(&osdc->timeout_work,
-                                     round_jiffies_relative(timeout));
-
+       __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
 
        up_read(&osdc->map_sem);
@@ -816,18 +863,7 @@ bad:
 }
 
 
-/*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
+static int __kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *kickosd)
 {
        struct ceph_osd_request *req;
@@ -836,9 +872,10 @@ static void kick_requests(struct ceph_osd_client *osdc,
        int err;
 
        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-       mutex_lock(&osdc->request_mutex);
        if (kickosd) {
-               __reset_osd(osdc, kickosd);
+               err = __reset_osd(osdc, kickosd);
+               if (err == -EAGAIN)
+                       return 1;
        } else {
                for (p = rb_first(&osdc->osds); p; p = n) {
                        struct ceph_osd *osd =
@@ -889,7 +926,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
 
 kick:
                dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
-                    req->r_osd->o_osd);
+                    req->r_osd ? req->r_osd->o_osd : -1);
                req->r_flags |= CEPH_OSD_FLAG_RETRY;
                err = __send_request(osdc, req);
                if (err) {
@@ -897,14 +934,36 @@ kick:
                        req->r_resend = true;
                }
        }
+
+       return needmap;
+}
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * Close connections to down osds.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd)
+{
+       int needmap;
+
+       mutex_lock(&osdc->request_mutex);
+       needmap = __kick_requests(osdc, kickosd);
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-}
 
+}
 /*
  * Process updated osd map.
  *
@@ -1161,11 +1220,11 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        init_completion(&osdc->map_waiters);
        osdc->last_requested_map = 0;
        mutex_init(&osdc->request_mutex);
-       osdc->timeout_tid = 0;
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
        INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
+       INIT_LIST_HEAD(&osdc->req_lru);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
        INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);