Merge branch 'x86-fixes-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git...
[pandora-kernel.git] / fs / ceph / osd_client.c
index 7db14ba..c7b4ded 100644 (file)
 #include "osd_client.h"
 #include "messenger.h"
 #include "decode.h"
+#include "auth.h"
+
+#define OSD_OP_FRONT_LEN       4096
+#define OSD_OPREPLY_FRONT_LEN  512
 
 const static struct ceph_connection_operations osd_con_ops;
+static int __kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd);
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
@@ -72,29 +78,34 @@ static void calc_layout(struct ceph_osd_client *osdc,
             req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
 }
 
-
 /*
  * requests
  */
-void ceph_osdc_put_request(struct ceph_osd_request *req)
+void ceph_osdc_release_request(struct kref *kref)
 {
-       dout("osdc put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
-            atomic_read(&req->r_ref)-1);
-       BUG_ON(atomic_read(&req->r_ref) <= 0);
-       if (atomic_dec_and_test(&req->r_ref)) {
-               if (req->r_request)
-                       ceph_msg_put(req->r_request);
-               if (req->r_reply)
-                       ceph_msg_put(req->r_reply);
-               if (req->r_own_pages)
-                       ceph_release_page_vector(req->r_pages,
-                                                req->r_num_pages);
-               ceph_put_snap_context(req->r_snapc);
-               if (req->r_mempool)
-                       mempool_free(req, req->r_osdc->req_mempool);
-               else
-                       kfree(req);
+       struct ceph_osd_request *req = container_of(kref,
+                                                   struct ceph_osd_request,
+                                                   r_kref);
+
+       if (req->r_request)
+               ceph_msg_put(req->r_request);
+       if (req->r_reply)
+               ceph_msg_put(req->r_reply);
+       if (req->r_con_filling_msg) {
+               dout("release_request revoking pages %p from con %p\n",
+                    req->r_pages, req->r_con_filling_msg);
+               ceph_con_revoke_message(req->r_con_filling_msg,
+                                     req->r_reply);
+               ceph_con_put(req->r_con_filling_msg);
        }
+       if (req->r_own_pages)
+               ceph_release_page_vector(req->r_pages,
+                                        req->r_num_pages);
+       ceph_put_snap_context(req->r_snapc);
+       if (req->r_mempool)
+               mempool_free(req, req->r_osdc->req_mempool);
+       else
+               kfree(req);
 }
 
 /*
@@ -125,11 +136,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        struct ceph_osd_request_head *head;
        struct ceph_osd_op *op;
        void *p;
-       int do_trunc = truncate_seq && (off + *plen > truncate_size);
-       int num_op = 1 + do_sync + do_trunc;
+       int num_op = 1 + do_sync;
        size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
-       int err, i;
-       u64 prevofs;
+       int i;
 
        if (use_mempool) {
                req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
@@ -140,15 +149,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
-       err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
-       if (err) {
-               ceph_osdc_put_request(req);
-               return ERR_PTR(-ENOMEM);
-       }
-
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
-       atomic_set(&req->r_ref, 1);
+       kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
        INIT_LIST_HEAD(&req->r_unsafe_item);
@@ -156,7 +159,19 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
 
-       /* create message; allow space for oid */
+       /* create reply message */
+       if (use_mempool)
+               msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
+       else
+               msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
+                                  OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
+       if (IS_ERR(msg)) {
+               ceph_osdc_put_request(req);
+               return ERR_PTR(PTR_ERR(msg));
+       }
+       req->r_reply = msg;
+
+       /* create request message; allow space for oid */
        msg_size += 40;
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
@@ -165,7 +180,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
        if (IS_ERR(msg)) {
-               ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
                ceph_osdc_put_request(req);
                return ERR_PTR(PTR_ERR(msg));
        }
@@ -194,22 +208,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                req->r_request->hdr.data_len = cpu_to_le32(*plen);
                op->payload_len = cpu_to_le32(*plen);
        }
+       op->extent.truncate_size = cpu_to_le64(truncate_size);
+       op->extent.truncate_seq = cpu_to_le32(truncate_seq);
 
        /* fill in oid */
        head->object_len = cpu_to_le32(req->r_oid_len);
        memcpy(p, req->r_oid, req->r_oid_len);
        p += req->r_oid_len;
 
-       /* additional ops */
-       if (do_trunc) {
-               op++;
-               op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ?
-                            CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
-               op->trunc.truncate_seq = cpu_to_le32(truncate_seq);
-               prevofs = le64_to_cpu((op-1)->extent.offset);
-               op->trunc.truncate_size = cpu_to_le64(truncate_size -
-                                                     (off-prevofs));
-       }
        if (do_sync) {
                op++;
                op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
@@ -224,6 +230,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        }
 
        BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
+       msg_size = p - msg->front.iov_base;
+       msg->front.iov_len = msg_size;
+       msg->hdr.front_len = cpu_to_le32(msg_size);
        return req;
 }
 
@@ -305,7 +314,6 @@ static void osd_reset(struct ceph_connection *con)
                return;
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
-       osd->o_incarnation++;
        down_read(&osdc->map_sem);
        kick_requests(osdc, osd);
        up_read(&osdc->map_sem);
@@ -325,12 +333,15 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        INIT_LIST_HEAD(&osd->o_requests);
+       INIT_LIST_HEAD(&osd->o_osd_lru);
        osd->o_incarnation = 1;
 
        ceph_con_init(osdc->client->msgr, &osd->o_con);
        osd->o_con.private = osd;
        osd->o_con.ops = &osd_con_ops;
        osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+
+       INIT_LIST_HEAD(&osd->o_keepalive_item);
        return osd;
 }
 
@@ -350,34 +361,74 @@ static void put_osd(struct ceph_osd *osd)
 {
        dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
             atomic_read(&osd->o_ref) - 1);
-       if (atomic_dec_and_test(&osd->o_ref)) {
-               ceph_con_shutdown(&osd->o_con);
+       if (atomic_dec_and_test(&osd->o_ref))
                kfree(osd);
-       }
 }
 
 /*
  * remove an osd from our map
  */
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
-       dout("remove_osd %p\n", osd);
+       dout("__remove_osd %p\n", osd);
        BUG_ON(!list_empty(&osd->o_requests));
        rb_erase(&osd->o_node, &osdc->osds);
+       list_del_init(&osd->o_osd_lru);
        ceph_con_close(&osd->o_con);
        put_osd(osd);
 }
 
+static void __move_osd_to_lru(struct ceph_osd_client *osdc,
+                             struct ceph_osd *osd)
+{
+       dout("__move_osd_to_lru %p\n", osd);
+       BUG_ON(!list_empty(&osd->o_osd_lru));
+       list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
+       osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
+}
+
+static void __remove_osd_from_lru(struct ceph_osd *osd)
+{
+       dout("__remove_osd_from_lru %p\n", osd);
+       if (!list_empty(&osd->o_osd_lru))
+               list_del_init(&osd->o_osd_lru);
+}
+
+static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
+{
+       struct ceph_osd *osd, *nosd;
+
+       dout("remove_old_osds %p\n", osdc);
+       mutex_lock(&osdc->request_mutex);
+       list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
+               if (!remove_all && time_before(jiffies, osd->lru_ttl))
+                       break;
+               __remove_osd(osdc, osd);
+       }
+       mutex_unlock(&osdc->request_mutex);
+}
+
 /*
  * reset osd connect
  */
-static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
+       struct ceph_osd_request *req;
        int ret = 0;
 
-       dout("reset_osd %p osd%d\n", osd, osd->o_osd);
+       dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests)) {
-               remove_osd(osdc, osd);
+               __remove_osd(osdc, osd);
+       } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
+                         &osd->o_con.peer_addr,
+                         sizeof(osd->o_con.peer_addr)) == 0 &&
+                  !ceph_con_opened(&osd->o_con)) {
+               dout(" osd addr hasn't changed and connection never opened,"
+                    " letting msgr retry\n");
+               /* touch each r_stamp for handle_timeout()'s benefit */
+               list_for_each_entry(req, &osd->o_requests, r_osd_item)
+                       req->r_stamp = jiffies;
+               ret = -EAGAIN;
        } else {
                ceph_con_close(&osd->o_con);
                ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -424,6 +475,16 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
        return NULL;
 }
 
+static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
+{
+       schedule_delayed_work(&osdc->timeout_work,
+                       osdc->client->mount_args->osd_keepalive_timeout * HZ);
+}
+
+static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
+{
+       cancel_delayed_work(&osdc->timeout_work);
+}
 
 /*
  * Register request, assign tid.  If this is the first request, set up
@@ -432,26 +493,19 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
 static void register_request(struct ceph_osd_client *osdc,
                             struct ceph_osd_request *req)
 {
-       struct ceph_osd_request_head *head = req->r_request->front.iov_base;
-
        mutex_lock(&osdc->request_mutex);
        req->r_tid = ++osdc->last_tid;
-       head->tid = cpu_to_le64(req->r_tid);
+       req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
+       INIT_LIST_HEAD(&req->r_req_lru_item);
 
        dout("register_request %p tid %lld\n", req, req->r_tid);
        __insert_request(osdc, req);
        ceph_osdc_get_request(req);
        osdc->num_requests++;
 
-       req->r_timeout_stamp =
-               jiffies + osdc->client->mount_args->osd_timeout*HZ;
-
        if (osdc->num_requests == 1) {
-               osdc->timeout_tid = req->r_tid;
-               dout("  timeout on tid %llu at %lu\n", req->r_tid,
-                    req->r_timeout_stamp);
-               schedule_delayed_work(&osdc->timeout_work,
-                     round_jiffies_relative(req->r_timeout_stamp - jiffies));
+               dout(" first request, scheduling timeout\n");
+               __schedule_osd_timeout(osdc);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -472,27 +526,16 @@ static void __unregister_request(struct ceph_osd_client *osdc,
 
                list_del_init(&req->r_osd_item);
                if (list_empty(&req->r_osd->o_requests))
-                       remove_osd(osdc, req->r_osd);
+                       __move_osd_to_lru(osdc, req->r_osd);
                req->r_osd = NULL;
        }
 
        ceph_osdc_put_request(req);
 
-       if (req->r_tid == osdc->timeout_tid) {
-               if (osdc->num_requests == 0) {
-                       dout("no requests, canceling timeout\n");
-                       osdc->timeout_tid = 0;
-                       cancel_delayed_work(&osdc->timeout_work);
-               } else {
-                       req = rb_entry(rb_first(&osdc->requests),
-                                      struct ceph_osd_request, r_node);
-                       osdc->timeout_tid = req->r_tid;
-                       dout("rescheduled timeout on tid %llu at %lu\n",
-                            req->r_tid, req->r_timeout_stamp);
-                       schedule_delayed_work(&osdc->timeout_work,
-                             round_jiffies_relative(req->r_timeout_stamp -
-                                                    jiffies));
-               }
+       list_del_init(&req->r_req_lru_item);
+       if (osdc->num_requests == 0) {
+               dout(" no requests, canceling timeout\n");
+               __cancel_osd_timeout(osdc);
        }
 }
 
@@ -505,6 +548,7 @@ static void __cancel_request(struct ceph_osd_request *req)
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
                req->r_sent = 0;
        }
+       list_del_init(&req->r_req_lru_item);
 }
 
 /*
@@ -523,7 +567,6 @@ static int __map_osds(struct ceph_osd_client *osdc,
        struct ceph_pg pgid;
        int o = -1;
        int err;
-       struct ceph_osd *newosd = NULL;
 
        dout("map_osds %p tid %lld\n", req, req->r_tid);
        err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
@@ -531,6 +574,8 @@ static int __map_osds(struct ceph_osd_client *osdc,
        if (err)
                return err;
        pgid = reqhead->layout.ol_pgid;
+       req->r_pgid = pgid;
+
        o = ceph_calc_pg_primary(osdc->osdmap, pgid);
 
        if ((req->r_osd && req->r_osd->o_osd == o &&
@@ -545,25 +590,15 @@ static int __map_osds(struct ceph_osd_client *osdc,
        if (req->r_osd) {
                __cancel_request(req);
                list_del_init(&req->r_osd_item);
-               if (list_empty(&req->r_osd->o_requests)) {
-                       /* try to re-use r_osd if possible */
-                       newosd = get_osd(req->r_osd);
-                       remove_osd(osdc, newosd);
-               }
                req->r_osd = NULL;
        }
 
        req->r_osd = __lookup_osd(osdc, o);
        if (!req->r_osd && o >= 0) {
-               if (newosd) {
-                       req->r_osd = newosd;
-                       newosd = NULL;
-               } else {
-                       err = -ENOMEM;
-                       req->r_osd = create_osd(osdc);
-                       if (!req->r_osd)
-                               goto out;
-               }
+               err = -ENOMEM;
+               req->r_osd = create_osd(osdc);
+               if (!req->r_osd)
+                       goto out;
 
                dout("map_osds osd %p is osd%d\n", req->r_osd, o);
                req->r_osd->o_osd = o;
@@ -573,13 +608,13 @@ static int __map_osds(struct ceph_osd_client *osdc,
                ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
        }
 
-       if (req->r_osd)
+       if (req->r_osd) {
+               __remove_osd_from_lru(req->r_osd);
                list_add(&req->r_osd_item, &req->r_osd->o_requests);
+       }
        err = 1;   /* osd changed */
 
 out:
-       if (newosd)
-               put_osd(newosd);
        return err;
 }
 
@@ -609,7 +644,8 @@ static int __send_request(struct ceph_osd_client *osdc,
        reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
        reqhead->reassert_version = req->r_reassert_version;
 
-       req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;
+       req->r_stamp = jiffies;
+       list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
        ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -630,11 +666,14 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req;
+       struct ceph_osd_request *req, *last_req = NULL;
        struct ceph_osd *osd;
        unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
-       unsigned long next_timeout = timeout + jiffies;
+       unsigned long keepalive =
+               osdc->client->mount_args->osd_keepalive_timeout * HZ;
+       unsigned long last_stamp = 0;
        struct rb_node *p;
+       struct list_head slow_osds;
 
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -657,44 +696,93 @@ static void handle_timeout(struct work_struct *work)
                        continue;
                }
        }
-       for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
-               osd = rb_entry(p, struct ceph_osd, o_node);
-               if (list_empty(&osd->o_requests))
-                       continue;
-               req = list_first_entry(&osd->o_requests,
-                                      struct ceph_osd_request, r_osd_item);
-               if (time_before(jiffies, req->r_timeout_stamp))
-                       continue;
 
-               dout(" tid %llu (at least) timed out on osd%d\n",
+       /*
+        * reset osds that appear to be _really_ unresponsive.  this
+        * is a failsafe measure.. we really shouldn't be getting to
+        * this point if the system is working properly.  the monitors
+        * should mark the osd as failed and we should find out about
+        * it from an updated osd map.
+        */
+       while (!list_empty(&osdc->req_lru)) {
+               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
+                                r_req_lru_item);
+
+               if (time_before(jiffies, req->r_stamp + timeout))
+                       break;
+
+               BUG_ON(req == last_req && req->r_stamp == last_stamp);
+               last_req = req;
+               last_stamp = req->r_stamp;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
+                          req->r_tid, osd->o_osd);
+               __kick_requests(osdc, osd);
+       }
+
+       /*
+        * ping osds that are a bit slow.  this ensures that if there
+        * is a break in the TCP connection we will notice, and reopen
+        * a connection with that osd (from the fault callback).
+        */
+       INIT_LIST_HEAD(&slow_osds);
+       list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
+               if (time_before(jiffies, req->r_stamp + keepalive))
+                       break;
+
+               osd = req->r_osd;
+               BUG_ON(!osd);
+               dout(" tid %llu is slow, will send keepalive on osd%d\n",
                     req->r_tid, osd->o_osd);
-               req->r_timeout_stamp = next_timeout;
+               list_move_tail(&osd->o_keepalive_item, &slow_osds);
+       }
+       while (!list_empty(&slow_osds)) {
+               osd = list_entry(slow_osds.next, struct ceph_osd,
+                                o_keepalive_item);
+               list_del_init(&osd->o_keepalive_item);
                ceph_con_keepalive(&osd->o_con);
        }
 
-       if (osdc->timeout_tid)
-               schedule_delayed_work(&osdc->timeout_work,
-                                     round_jiffies_relative(timeout));
-
+       __schedule_osd_timeout(osdc);
        mutex_unlock(&osdc->request_mutex);
 
        up_read(&osdc->map_sem);
 }
 
+static void handle_osds_timeout(struct work_struct *work)
+{
+       struct ceph_osd_client *osdc =
+               container_of(work, struct ceph_osd_client,
+                            osds_timeout_work.work);
+       unsigned long delay =
+               osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
+
+       dout("osds timeout\n");
+       down_read(&osdc->map_sem);
+       remove_old_osds(osdc, 0);
+       up_read(&osdc->map_sem);
+
+       schedule_delayed_work(&osdc->osds_timeout_work,
+                             round_jiffies_relative(delay));
+}
+
 /*
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
  */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
+                        struct ceph_connection *con)
 {
        struct ceph_osd_reply_head *rhead = msg->front.iov_base;
        struct ceph_osd_request *req;
        u64 tid;
        int numops, object_len, flags;
 
+       tid = le64_to_cpu(msg->hdr.tid);
        if (msg->front.iov_len < sizeof(*rhead))
                goto bad;
-       tid = le64_to_cpu(rhead->tid);
        numops = le32_to_cpu(rhead->num_ops);
        object_len = le32_to_cpu(rhead->object_len);
        if (msg->front.iov_len != sizeof(*rhead) + object_len +
@@ -713,14 +801,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        ceph_osdc_get_request(req);
        flags = le32_to_cpu(rhead->flags);
 
-       if (req->r_reply) {
-               /*
-                * once we see the message has been received, we don't
-                * need a ref (which is only needed for revoking
-                * pages)
-                */
-               ceph_msg_put(req->r_reply);
-               req->r_reply = NULL;
+       /*
+        * if this connection filled our message, drop our reference now, to
+        * avoid a (safe but slower) revoke later.
+        */
+       if (req->r_con_filling_msg == con && req->r_reply == msg) {
+               dout(" dropping con_filling_msg ref %p\n", con);
+               req->r_con_filling_msg = NULL;
+               ceph_con_put(con);
        }
 
        if (!req->r_got_reply) {
@@ -739,6 +827,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                req->r_got_reply = 1;
        } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
                dout("handle_reply tid %llu dup ack\n", tid);
+               mutex_unlock(&osdc->request_mutex);
                goto done;
        }
 
@@ -770,21 +859,11 @@ bad:
        pr_err("corrupt osd_op_reply got %d %d expected %d\n",
               (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
               (int)sizeof(*rhead));
+       ceph_msg_dump(msg);
 }
 
 
-/*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static void kick_requests(struct ceph_osd_client *osdc,
+static int __kick_requests(struct ceph_osd_client *osdc,
                          struct ceph_osd *kickosd)
 {
        struct ceph_osd_request *req;
@@ -793,18 +872,22 @@ static void kick_requests(struct ceph_osd_client *osdc,
        int err;
 
        dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-       mutex_lock(&osdc->request_mutex);
-       if (!kickosd) {
+       if (kickosd) {
+               err = __reset_osd(osdc, kickosd);
+               if (err == -EAGAIN)
+                       return 1;
+       } else {
                for (p = rb_first(&osdc->osds); p; p = n) {
                        struct ceph_osd *osd =
                                rb_entry(p, struct ceph_osd, o_node);
 
                        n = rb_next(p);
                        if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
-                           !ceph_entity_addr_equal(&osd->o_con.peer_addr,
-                                           ceph_osd_addr(osdc->osdmap,
-                                                         osd->o_osd)))
-                               reset_osd(osdc, osd);
+                           memcmp(&osd->o_con.peer_addr,
+                                  ceph_osd_addr(osdc->osdmap,
+                                                osd->o_osd),
+                                  sizeof(struct ceph_entity_addr)) != 0)
+                               __reset_osd(osdc, osd);
                }
        }
 
@@ -843,7 +926,7 @@ static void kick_requests(struct ceph_osd_client *osdc,
 
 kick:
                dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
-                    req->r_osd->o_osd);
+                    req->r_osd ? req->r_osd->o_osd : -1);
                req->r_flags |= CEPH_OSD_FLAG_RETRY;
                err = __send_request(osdc, req);
                if (err) {
@@ -851,14 +934,36 @@ kick:
                        req->r_resend = true;
                }
        }
+
+       return needmap;
+}
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * Close connections to down osds.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ *
+ * Caller should hold map_sem for read and request_mutex.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+                         struct ceph_osd *kickosd)
+{
+       int needmap;
+
+       mutex_lock(&osdc->request_mutex);
+       needmap = __kick_requests(osdc, kickosd);
        mutex_unlock(&osdc->request_mutex);
 
        if (needmap) {
                dout("%d requests for down osds, need new map\n", needmap);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        }
-}
 
+}
 /*
  * Process updated osd map.
  *
@@ -882,10 +987,8 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
        /* verify fsid */
        ceph_decode_need(&p, end, sizeof(fsid), bad);
        ceph_decode_copy(&p, &fsid, sizeof(fsid));
-       if (ceph_fsid_compare(&fsid, &osdc->client->monc.monmap->fsid)) {
-               pr_err("got osdmap with wrong fsid, ignoring\n");
+       if (ceph_check_fsid(osdc->client, &fsid) < 0)
                return;
-       }
 
        down_write(&osdc->map_sem);
 
@@ -908,6 +1011,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                err = PTR_ERR(newmap);
                                goto bad;
                        }
+                       BUG_ON(!newmap);
                        if (newmap != osdc->osdmap) {
                                ceph_osdmap_destroy(osdc->osdmap);
                                osdc->osdmap = newmap;
@@ -944,6 +1048,7 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
                                err = PTR_ERR(newmap);
                                goto bad;
                        }
+                       BUG_ON(!newmap);
                        oldmap = osdc->osdmap;
                        osdc->osdmap = newmap;
                        if (oldmap)
@@ -963,6 +1068,7 @@ done:
 
 bad:
        pr_err("osdc handle_map corrupt msg\n");
+       ceph_msg_dump(msg);
        up_write(&osdc->map_sem);
        return;
 }
@@ -974,44 +1080,35 @@ bad:
  * find those pages.
  *  0 = success, -1 failure.
  */
-static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
-                        int want)
+static int __prepare_pages(struct ceph_connection *con,
+                        struct ceph_msg_header *hdr,
+                        struct ceph_osd_request *req,
+                        u64 tid,
+                        struct ceph_msg *m)
 {
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc;
-       struct ceph_osd_reply_head *rhead = m->front.iov_base;
-       struct ceph_osd_request *req;
-       u64 tid;
        int ret = -1;
-       int type = le16_to_cpu(m->hdr.type);
+       int data_len = le32_to_cpu(hdr->data_len);
+       unsigned data_off = le16_to_cpu(hdr->data_off);
+
+       int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
 
        if (!osd)
                return -1;
-       osdc = osd->o_osdc;
 
-       dout("prepare_pages on msg %p want %d\n", m, want);
-       if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
-               return -1;  /* hmm! */
+       osdc = osd->o_osdc;
 
-       tid = le64_to_cpu(rhead->tid);
-       mutex_lock(&osdc->request_mutex);
-       req = __lookup_request(osdc, tid);
-       if (!req) {
-               dout("prepare_pages unknown tid %llu\n", tid);
-               goto out;
-       }
-       dout("prepare_pages tid %llu has %d pages, want %d\n",
+       dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
             tid, req->r_num_pages, want);
-       if (likely(req->r_num_pages >= want && !req->r_prepared_pages)) {
-               m->pages = req->r_pages;
-               m->nr_pages = req->r_num_pages;
-               req->r_reply = m;  /* only for duration of read over socket */
-               ceph_msg_get(m);
-               req->r_prepared_pages = 1;
-               ret = 0; /* success */
-       }
+       if (unlikely(req->r_num_pages < want))
+               goto out;
+       m->pages = req->r_pages;
+       m->nr_pages = req->r_num_pages;
+       ret = 0; /* success */
 out:
-       mutex_unlock(&osdc->request_mutex);
+       BUG_ON(ret < 0 || m->nr_pages < want);
+
        return ret;
 }
 
@@ -1066,8 +1163,9 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
        if (rc < 0) {
                mutex_lock(&osdc->request_mutex);
                __cancel_request(req);
+               __unregister_request(osdc, req);
                mutex_unlock(&osdc->request_mutex);
-               dout("wait_request tid %llu timed out\n", req->r_tid);
+               dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
                return rc;
        }
 
@@ -1122,35 +1220,50 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        init_completion(&osdc->map_waiters);
        osdc->last_requested_map = 0;
        mutex_init(&osdc->request_mutex);
-       osdc->timeout_tid = 0;
        osdc->last_tid = 0;
        osdc->osds = RB_ROOT;
+       INIT_LIST_HEAD(&osdc->osd_lru);
        osdc->requests = RB_ROOT;
+       INIT_LIST_HEAD(&osdc->req_lru);
        osdc->num_requests = 0;
        INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+       INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+
+       schedule_delayed_work(&osdc->osds_timeout_work,
+          round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
 
+       err = -ENOMEM;
        osdc->req_mempool = mempool_create_kmalloc_pool(10,
                                        sizeof(struct ceph_osd_request));
        if (!osdc->req_mempool)
-               return -ENOMEM;
+               goto out;
 
-       err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
+       err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
        if (err < 0)
-               return -ENOMEM;
-       err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
+               goto out_mempool;
+       err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+                               OSD_OPREPLY_FRONT_LEN, 10, true);
        if (err < 0)
-               return -ENOMEM;
-
+               goto out_msgpool;
        return 0;
+
+out_msgpool:
+       ceph_msgpool_destroy(&osdc->msgpool_op);
+out_mempool:
+       mempool_destroy(osdc->req_mempool);
+out:
+       return err;
 }
 
 void ceph_osdc_stop(struct ceph_osd_client *osdc)
 {
        cancel_delayed_work_sync(&osdc->timeout_work);
+       cancel_delayed_work_sync(&osdc->osds_timeout_work);
        if (osdc->osdmap) {
                ceph_osdmap_destroy(osdc->osdmap);
                osdc->osdmap = NULL;
        }
+       remove_old_osds(osdc, 1);
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
        ceph_msgpool_destroy(&osdc->msgpool_op_reply);
@@ -1244,18 +1357,19 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 {
        struct ceph_osd *osd = con->private;
-       struct ceph_osd_client *osdc = osd->o_osdc;
+       struct ceph_osd_client *osdc;
        int type = le16_to_cpu(msg->hdr.type);
 
        if (!osd)
                return;
+       osdc = osd->o_osdc;
 
        switch (type) {
        case CEPH_MSG_OSD_MAP:
                ceph_osdc_handle_map(osdc, msg);
                break;
        case CEPH_MSG_OSD_OPREPLY:
-               handle_reply(osdc, msg);
+               handle_reply(osdc, msg, con);
                break;
 
        default:
@@ -1265,19 +1379,88 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
        ceph_msg_put(msg);
 }
 
-static struct ceph_msg *alloc_msg(struct ceph_connection *con,
-                                 struct ceph_msg_header *hdr)
+/*
+ * lookup and return message for incoming reply
+ */
+static struct ceph_msg *get_reply(struct ceph_connection *con,
+                                 struct ceph_msg_header *hdr,
+                                 int *skip)
 {
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
+       struct ceph_msg *m;
+       struct ceph_osd_request *req;
+       int front = le32_to_cpu(hdr->front_len);
+       int data_len = le32_to_cpu(hdr->data_len);
+       u64 tid;
+       int err;
+
+       tid = le64_to_cpu(hdr->tid);
+       mutex_lock(&osdc->request_mutex);
+       req = __lookup_request(osdc, tid);
+       if (!req) {
+               *skip = 1;
+               m = NULL;
+               pr_info("get_reply unknown tid %llu from osd%d\n", tid,
+                       osd->o_osd);
+               goto out;
+       }
+
+       if (req->r_con_filling_msg) {
+               dout("get_reply revoking msg %p from old con %p\n",
+                    req->r_reply, req->r_con_filling_msg);
+               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+               ceph_con_put(req->r_con_filling_msg);
+       }
+
+       if (front > req->r_reply->front.iov_len) {
+               pr_warning("get_reply front %d > preallocated %d\n",
+                          front, (int)req->r_reply->front.iov_len);
+               m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
+               if (IS_ERR(m))
+                       goto out;
+               ceph_msg_put(req->r_reply);
+               req->r_reply = m;
+       }
+       m = ceph_msg_get(req->r_reply);
+
+       *skip = 0;
+       if (data_len > 0) {
+               err = __prepare_pages(con, hdr, req, tid, m);
+               if (err < 0) {
+                       *skip = 1;
+                       ceph_msg_put(m);
+                       m = ERR_PTR(err);
+               }
+       }
+       req->r_con_filling_msg = ceph_con_get(con);
+       dout("get_reply tid %lld %p\n", tid, m);
+
+out:
+       mutex_unlock(&osdc->request_mutex);
+       return m;
+
+}
+
+static struct ceph_msg *alloc_msg(struct ceph_connection *con,
+                                 struct ceph_msg_header *hdr,
+                                 int *skip)
+{
+       struct ceph_osd *osd = con->private;
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
 
        switch (type) {
+       case CEPH_MSG_OSD_MAP:
+               return ceph_msg_new(type, front, 0, 0, NULL);
        case CEPH_MSG_OSD_OPREPLY:
-               return ceph_msgpool_get(&osdc->msgpool_op_reply, front);
+               return get_reply(con, hdr, skip);
+       default:
+               pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
+                       osd->o_osd);
+               *skip = 1;
+               return NULL;
        }
-       return ceph_alloc_msg(con, hdr);
 }
 
 /*
@@ -1297,12 +1480,71 @@ static void put_osd_con(struct ceph_connection *con)
        put_osd(osd);
 }
 
+/*
+ * authentication
+ */
+static int get_authorizer(struct ceph_connection *con,
+                         void **buf, int *len, int *proto,
+                         void **reply_buf, int *reply_len, int force_new)
+{
+       struct ceph_osd *o = con->private;
+       struct ceph_osd_client *osdc = o->o_osdc;
+       struct ceph_auth_client *ac = osdc->client->monc.auth;
+       int ret = 0;
+
+       if (force_new && o->o_authorizer) {
+               ac->ops->destroy_authorizer(ac, o->o_authorizer);
+               o->o_authorizer = NULL;
+       }
+       if (o->o_authorizer == NULL) {
+               ret = ac->ops->create_authorizer(
+                       ac, CEPH_ENTITY_TYPE_OSD,
+                       &o->o_authorizer,
+                       &o->o_authorizer_buf,
+                       &o->o_authorizer_buf_len,
+                       &o->o_authorizer_reply_buf,
+                       &o->o_authorizer_reply_buf_len);
+               if (ret)
+                       return ret;
+       }
+
+       *proto = ac->protocol;
+       *buf = o->o_authorizer_buf;
+       *len = o->o_authorizer_buf_len;
+       *reply_buf = o->o_authorizer_reply_buf;
+       *reply_len = o->o_authorizer_reply_buf_len;
+       return 0;
+}
+
+
+static int verify_authorizer_reply(struct ceph_connection *con, int len)
+{
+       struct ceph_osd *o = con->private;
+       struct ceph_osd_client *osdc = o->o_osdc;
+       struct ceph_auth_client *ac = osdc->client->monc.auth;
+
+       return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
+}
+
+static int invalidate_authorizer(struct ceph_connection *con)
+{
+       struct ceph_osd *o = con->private;
+       struct ceph_osd_client *osdc = o->o_osdc;
+       struct ceph_auth_client *ac = osdc->client->monc.auth;
+
+       if (ac->ops->invalidate_authorizer)
+               ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
+
+       return ceph_monc_validate_auth(&osdc->client->monc);
+}
+
 const static struct ceph_connection_operations osd_con_ops = {
        .get = get_osd_con,
        .put = put_osd_con,
        .dispatch = dispatch,
+       .get_authorizer = get_authorizer,
+       .verify_authorizer_reply = verify_authorizer_reply,
+       .invalidate_authorizer = invalidate_authorizer,
        .alloc_msg = alloc_msg,
        .fault = osd_reset,
-       .alloc_middle = ceph_alloc_middle,
-       .prepare_pages = prepare_pages,
 };