ore: Support for short read/writes
[pandora-kernel.git] / fs / exofs / ore.c
index 25305af..8354fe0 100644
@@ -47,53 +47,101 @@ MODULE_AUTHOR("Boaz Harrosh <bharrosh@panasas.com>");
 MODULE_DESCRIPTION("Objects Raid Engine ore.ko");
 MODULE_LICENSE("GPL");
 
+static void ore_calc_stripe_info(struct ore_layout *layout, u64 file_offset,
+                                struct ore_striping_info *si);
+
 static u8 *_ios_cred(struct ore_io_state *ios, unsigned index)
 {
-       return ios->comps->comps[index & ios->comps->single_comp].cred;
+       return ios->oc->comps[index & ios->oc->single_comp].cred;
 }
 
 static struct osd_obj_id *_ios_obj(struct ore_io_state *ios, unsigned index)
 {
-       return &ios->comps->comps[index & ios->comps->single_comp].obj;
+       return &ios->oc->comps[index & ios->oc->single_comp].obj;
 }
 
 static struct osd_dev *_ios_od(struct ore_io_state *ios, unsigned index)
 {
-       return ios->comps->ods[index];
+       return ore_comp_dev(ios->oc, index);
 }
 
-int  ore_get_rw_state(struct ore_layout *layout, struct ore_components *comps,
-                     bool is_reading, u64 offset, u64 length,
-                     struct ore_io_state **pios)
+static int  _get_io_state(struct ore_layout *layout,
+                         struct ore_components *oc, unsigned numdevs,
+                         struct ore_io_state **pios)
 {
        struct ore_io_state *ios;
 
        /*TODO: Maybe use kmem_cach per sbi of size
         * exofs_io_state_size(layout->s_numdevs)
         */
-       ios = kzalloc(ore_io_state_size(comps->numdevs), GFP_KERNEL);
+       ios = kzalloc(ore_io_state_size(numdevs), GFP_KERNEL);
        if (unlikely(!ios)) {
                ORE_DBGMSG("Failed kzalloc bytes=%d\n",
-                            ore_io_state_size(comps->numdevs));
+                          ore_io_state_size(numdevs));
                *pios = NULL;
                return -ENOMEM;
        }
 
        ios->layout = layout;
-       ios->comps = comps;
-       ios->offset = offset;
-       ios->length = length;
+       ios->oc = oc;
+       *pios = ios;
+       return 0;
+}
+
+/* Allocate an io_state for only a single group of devices
+ *
+ * If a user needs to call ore_read/write() this version must be used, because it
+ * allocates extra stuff for striping and raid.
+ * The ore might decide to IO less than @length bytes due to alignments
+ * and constraints, as follows:
+ * - The IO cannot cross a group boundary.
+ * - In raid5/6 the end of the IO must align at the end of a stripe, e.g.
+ *   (@offset + @length) % strip_size == 0, or the complete range is within a
+ *   single stripe.
+ * - Memory conditions may only permit a shorter IO. (A user can use @length=~0
+ *   and check the returned ios->length for max_io_size.)
+ *
+ * The caller must check the returned ios->length (and/or ios->nr_pages) and
+ * re-issue those pages that fall outside of ios->length.
+ */
+int  ore_get_rw_state(struct ore_layout *layout, struct ore_components *oc,
+                     bool is_reading, u64 offset, u64 length,
+                     struct ore_io_state **pios)
+{
+       struct ore_io_state *ios;
+       unsigned numdevs = layout->group_width * layout->mirrors_p1;
+       int ret;
+
+       ret = _get_io_state(layout, oc, numdevs, pios);
+       if (unlikely(ret))
+               return ret;
+
+       ios = *pios;
        ios->reading = is_reading;
+       ios->offset = offset;
+
+       if (length) {
+               ore_calc_stripe_info(layout, offset, &ios->si);
+               ios->length = (length <= ios->si.group_length) ? length :
+                                                       ios->si.group_length;
+               ios->nr_pages = (ios->length + PAGE_SIZE - 1) / PAGE_SIZE;
+       }
 
-       *pios = ios;
        return 0;
 }
 EXPORT_SYMBOL(ore_get_rw_state);
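
A minimal user-space sketch of the clamping that ore_get_rw_state() now performs; the group_length, requested length and page size below are hypothetical values for illustration, not taken from the patch. The request is cut at the group boundary and nr_pages is derived from the clamped length, so the caller can tell how much it still has to re-issue.

	#include <stdio.h>
	#include <stdint.h>

	#define EX_PAGE_SIZE 4096ULL

	int main(void)
	{
		/* Assumed: si.group_length left in this group is 1M, but the
		 * caller asked for 3M, so the IO crosses the group boundary.
		 */
		uint64_t group_length = 1ULL << 20;
		uint64_t length = 3ULL << 20;

		uint64_t io_len = (length <= group_length) ? length : group_length;
		uint64_t nr_pages = (io_len + EX_PAGE_SIZE - 1) / EX_PAGE_SIZE;

		printf("requested=%llu issued=%llu nr_pages=%llu remainder=%llu\n",
		       (unsigned long long)length, (unsigned long long)io_len,
		       (unsigned long long)nr_pages,
		       (unsigned long long)(length - io_len));
		return 0;
	}
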
 
-int  ore_get_io_state(struct ore_layout *layout, struct ore_components *comps,
-                     struct ore_io_state **ios)
+/* Allocate an io_state for all the devices in the comps array
+ *
+ * This version of io_state allocation is used mostly by create/remove
+ * and trunc where we currently need all the devices. The only wasteful
+ * bit is the read/write_attributes with no IO. Those sites should
+ * be converted to use ore_get_rw_state() with length=0.
+ */
+int  ore_get_io_state(struct ore_layout *layout, struct ore_components *oc,
+                     struct ore_io_state **pios)
 {
-       return ore_get_rw_state(layout, comps, true, 0, 0, ios);
+       return _get_io_state(layout, oc, oc->numdevs, pios);
 }
 EXPORT_SYMBOL(ore_get_io_state);
 
@@ -287,16 +335,8 @@ EXPORT_SYMBOL(ore_check_io);
  *
  *     O = L % stripe_unit + N * stripe_unit + M * group_depth * stripe_unit
  */
-struct _striping_info {
-       u64 obj_offset;
-       u64 group_length;
-       u64 M; /* for truncate */
-       unsigned dev;
-       unsigned unit_off;
-};
-
-static void _calc_stripe_info(struct ore_layout *layout, u64 file_offset,
-                             struct _striping_info *si)
+static void ore_calc_stripe_info(struct ore_layout *layout, u64 file_offset,
+                                struct ore_striping_info *si)
 {
        u32     stripe_unit = layout->stripe_unit;
        u32     group_width = layout->group_width;
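
A hedged worked example of the mapping comment above (O = L % stripe_unit + N * stripe_unit + M * group_depth * stripe_unit), using assumed layout values that are not from the patch: stripe_unit=64K, group_width=4, group_depth=16, and for simplicity group_count=1 and mirrors_p1=1, so M is the group index, N the stripe inside the group, and dev the device column inside the group.

	#include <stdio.h>
	#include <stdint.h>

	int main(void)
	{
		/* Assumed layout, for illustration only. */
		uint64_t U = 64ULL << 10;	/* stripe_unit */
		uint64_t W = 4;			/* group_width */
		uint64_t D = 16;		/* group_depth */
		uint64_t L = 5ULL << 20;	/* file offset */

		uint64_t M = L / (U * W * D);		/* group index */
		uint64_t G = L % (U * W * D);		/* offset inside the group */
		uint64_t N = G / (U * W);		/* stripe number inside the group */
		uint64_t unit_off = G % U;
		uint64_t dev = (G % (U * W)) / U;	/* device column inside the group */

		uint64_t O = unit_off + N * U + M * D * U;

		/* With these numbers: M=1, N=4, dev=0, unit_off=0, O=1310720 */
		printf("dev=%llu obj_offset=%llu\n",
		       (unsigned long long)dev, (unsigned long long)O);
		return 0;
	}
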
@@ -337,8 +377,8 @@ static int _add_stripe_unit(struct ore_io_state *ios,  unsigned *cur_pg,
        unsigned pg = *cur_pg;
        struct request_queue *q =
                        osd_request_queue(_ios_od(ios, per_dev->dev));
-
-       per_dev->length += cur_len;
+       unsigned len = cur_len;
+       int ret;
 
        if (per_dev->bio == NULL) {
                unsigned pages_in_stripe = ios->layout->group_width *
@@ -350,7 +390,8 @@ static int _add_stripe_unit(struct ore_io_state *ios,  unsigned *cur_pg,
                if (unlikely(!per_dev->bio)) {
                        ORE_DBGMSG("Failed to allocate BIO size=%u\n",
                                     bio_size);
-                       return -ENOMEM;
+                       ret = -ENOMEM;
+                       goto out;
                }
        }
 
@@ -363,31 +404,48 @@ static int _add_stripe_unit(struct ore_io_state *ios,  unsigned *cur_pg,
 
                added_len = bio_add_pc_page(q, per_dev->bio, ios->pages[pg],
                                            pglen, pgbase);
-               if (unlikely(pglen != added_len))
-                       return -ENOMEM;
+               if (unlikely(pglen != added_len)) {
+                       ret = -ENOMEM;
+                       goto out;
+               }
                pgbase = 0;
                ++pg;
        }
        BUG_ON(cur_len);
 
+       per_dev->length += len;
        *cur_pg = pg;
-       return 0;
+       ret = 0;
+out:   /* We fail the complete unit on an error, i.e. we don't advance
+        * per_dev->length and cur_pg. This means that we might have a bigger
+        * bio than the CDB requested length (per_dev->length). That's fine,
+        * only the opposite is fatal.
+        */
+       return ret;
 }
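
The reordering above (per_dev->length is now advanced only after every page was added) is a commit-on-success pattern: work on local copies and publish them only once the whole unit succeeded, so a failed unit leaves the caller-visible state untouched. A small self-contained sketch of the same idea, with illustrative names and a toy failure condition that are not from the patch:

	#include <stdio.h>

	struct dev_state {
		unsigned length;	/* what the CDB will ask for */
	};

	/* Stand-in for the fallible bio_add_pc_page() loop: refuses large requests. */
	static int add_pages(unsigned *pg, unsigned pages)
	{
		if (pages > 8)
			return -1;
		*pg += pages;
		return 0;
	}

	static int add_unit(struct dev_state *per_dev, unsigned *cur_pg, unsigned pages)
	{
		unsigned pg = *cur_pg;		/* work on a local copy */
		int ret = add_pages(&pg, pages);

		if (ret)
			return ret;		/* per_dev->length and *cur_pg untouched */

		per_dev->length += pages;	/* commit only after full success */
		*cur_pg = pg;
		return 0;
	}

	int main(void)
	{
		struct dev_state per_dev = { 0 };
		unsigned cur_pg = 0;

		printf("ret=%d length=%u cur_pg=%u\n",
		       add_unit(&per_dev, &cur_pg, 4), per_dev.length, cur_pg);
		printf("ret=%d length=%u cur_pg=%u\n",
		       add_unit(&per_dev, &cur_pg, 16), per_dev.length, cur_pg);
		return 0;
	}
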
 
-static int _prepare_one_group(struct ore_io_state *ios, u64 length,
-                             struct _striping_info *si)
+static int _prepare_for_striping(struct ore_io_state *ios)
 {
+       struct ore_striping_info *si = &ios->si;
        unsigned stripe_unit = ios->layout->stripe_unit;
        unsigned mirrors_p1 = ios->layout->mirrors_p1;
        unsigned devs_in_group = ios->layout->group_width * mirrors_p1;
        unsigned dev = si->dev;
        unsigned first_dev = dev - (dev % devs_in_group);
-       unsigned max_comp = ios->numdevs ? ios->numdevs - mirrors_p1 : 0;
        unsigned cur_pg = ios->pages_consumed;
+       u64 length = ios->length;
        int ret = 0;
 
+       if (!ios->pages) {
+               ios->numdevs = ios->layout->mirrors_p1;
+               return 0;
+       }
+
+       BUG_ON(length > si->group_length);
+
        while (length) {
-               struct ore_per_dev_state *per_dev = &ios->per_dev[dev];
+               unsigned comp = dev - first_dev;
+               struct ore_per_dev_state *per_dev = &ios->per_dev[comp];
                unsigned cur_len, page_off = 0;
 
                if (!per_dev->length) {
@@ -405,9 +463,6 @@ static int _prepare_one_group(struct ore_io_state *ios, u64 length,
                                per_dev->offset = si->obj_offset - si->unit_off;
                                cur_len = stripe_unit;
                        }
-
-                       if (max_comp < dev)
-                               max_comp = dev;
                } else {
                        cur_len = stripe_unit;
                }
@@ -425,58 +480,22 @@ static int _prepare_one_group(struct ore_io_state *ios, u64 length,
                length -= cur_len;
        }
 out:
-       ios->numdevs = max_comp + mirrors_p1;
+       ios->numdevs = devs_in_group;
        ios->pages_consumed = cur_pg;
-       return ret;
-}
-
-static int _prepare_for_striping(struct ore_io_state *ios)
-{
-       u64 length = ios->length;
-       u64 offset = ios->offset;
-       struct _striping_info si;
-       int ret = 0;
-
-       if (!ios->pages) {
-               if (ios->kern_buff) {
-                       struct ore_per_dev_state *per_dev = &ios->per_dev[0];
-
-                       _calc_stripe_info(ios->layout, ios->offset, &si);
-                       per_dev->offset = si.obj_offset;
-                       per_dev->dev = si.dev;
-
-                       /* no cross device without page array */
-                       BUG_ON((ios->layout->group_width > 1) &&
-                              (si.unit_off + ios->length >
-                               ios->layout->stripe_unit));
-               }
-               ios->numdevs = ios->layout->mirrors_p1;
-               return 0;
-       }
-
-       while (length) {
-               _calc_stripe_info(ios->layout, offset, &si);
-
-               if (length < si.group_length)
-                       si.group_length = length;
-
-               ret = _prepare_one_group(ios, si.group_length, &si);
-               if (unlikely(ret))
-                       goto out;
-
-               offset += si.group_length;
-               length -= si.group_length;
+       if (unlikely(ret)) {
+               if (length == ios->length)
+                       return ret;
+               else
+                       ios->length -= length;
        }
-
-out:
-       return ret;
+       return 0;
 }
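
With this change a prepare failure after some data was already mapped is reported as a short IO (a reduced ios->length) rather than an error. A self-contained sketch of the caller-side loop this enables; issue_io() and its "at most 1M per call" behaviour are illustrative assumptions, not code from the patch:

	#include <stdio.h>
	#include <stdint.h>

	/* Toy stand-in for an ore_get_rw_state()/ore_write() round trip that
	 * may accept only part of the request.
	 */
	static uint64_t issue_io(uint64_t offset, uint64_t length)
	{
		uint64_t max_io = 1ULL << 20;

		return length < max_io ? length : max_io;
	}

	int main(void)
	{
		uint64_t offset = 0, total = 5ULL << 20, done = 0;

		while (done < total) {
			uint64_t issued = issue_io(offset + done, total - done);

			printf("issued 0x%llx bytes at 0x%llx\n",
			       (unsigned long long)issued,
			       (unsigned long long)(offset + done));
			done += issued;
		}
		return 0;
	}
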
 
 int ore_create(struct ore_io_state *ios)
 {
        int i, ret;
 
-       for (i = 0; i < ios->comps->numdevs; i++) {
+       for (i = 0; i < ios->oc->numdevs; i++) {
                struct osd_request *or;
 
                or = osd_start_request(_ios_od(ios, i), GFP_KERNEL);
@@ -501,7 +520,7 @@ int ore_remove(struct ore_io_state *ios)
 {
        int i, ret;
 
-       for (i = 0; i < ios->comps->numdevs; i++) {
+       for (i = 0; i < ios->oc->numdevs; i++) {
                struct osd_request *or;
 
                or = osd_start_request(_ios_od(ios, i), GFP_KERNEL);
@@ -543,7 +562,6 @@ static int _write_mirror(struct ore_io_state *ios, int cur_comp)
                        goto out;
                }
                per_dev->or = or;
-               per_dev->offset = master_dev->offset;
 
                if (ios->pages) {
                        struct bio *bio;
@@ -562,6 +580,7 @@ static int _write_mirror(struct ore_io_state *ios, int cur_comp)
                                __bio_clone(bio, master_dev->bio);
                                bio->bi_bdev = NULL;
                                bio->bi_next = NULL;
+                               per_dev->offset = master_dev->offset;
                                per_dev->length = master_dev->length;
                                per_dev->bio =  bio;
                                per_dev->dev = dev;
@@ -579,7 +598,15 @@ static int _write_mirror(struct ore_io_state *ios, int cur_comp)
                                     _LLU(per_dev->offset),
                                     _LLU(per_dev->length), dev);
                } else if (ios->kern_buff) {
-                       ret = osd_req_write_kern(or, _ios_obj(ios, dev),
+                       per_dev->offset = ios->si.obj_offset;
+                       per_dev->dev = ios->si.dev + dev;
+
+                       /* no cross device without page array */
+                       BUG_ON((ios->layout->group_width > 1) &&
+                              (ios->si.unit_off + ios->length >
+                               ios->layout->stripe_unit));
+
+                       ret = osd_req_write_kern(or, _ios_obj(ios, per_dev->dev),
                                                 per_dev->offset,
                                                 ios->kern_buff, ios->length);
                        if (unlikely(ret))
@@ -588,7 +615,7 @@ static int _write_mirror(struct ore_io_state *ios, int cur_comp)
                                      "length=0x%llx dev=%d\n",
                                     _LLU(_ios_obj(ios, dev)->id),
                                     _LLU(per_dev->offset),
-                                    _LLU(ios->length), dev);
+                                    _LLU(ios->length), per_dev->dev);
                } else {
                        osd_req_set_attributes(or, _ios_obj(ios, dev));
                        ORE_DBGMSG2("obj(0x%llx) set_attributes=%d dev=%d\n",
@@ -654,16 +681,9 @@ static int _read_mirror(struct ore_io_state *ios, unsigned cur_comp)
                             " dev=%d\n", _LLU(obj->id),
                             _LLU(per_dev->offset), _LLU(per_dev->length),
                             first_dev);
-       } else if (ios->kern_buff) {
-               int ret = osd_req_read_kern(or, obj, per_dev->offset,
-                                           ios->kern_buff, ios->length);
-               ORE_DBGMSG2("read_kern(0x%llx) offset=0x%llx "
-                             "length=0x%llx dev=%d ret=>%d\n",
-                             _LLU(obj->id), _LLU(per_dev->offset),
-                             _LLU(ios->length), first_dev, ret);
-               if (unlikely(ret))
-                       return ret;
        } else {
+               BUG_ON(ios->kern_buff);
+
                osd_req_get_attributes(or, obj);
                ORE_DBGMSG2("obj(0x%llx) get_attributes=%d dev=%d\n",
                              _LLU(obj->id),
@@ -744,31 +764,29 @@ static int _truncate_mirrors(struct ore_io_state *ios, unsigned cur_comp,
 }
 
 struct _trunc_info {
-       struct _striping_info si;
+       struct ore_striping_info si;
        u64 prev_group_obj_off;
        u64 next_group_obj_off;
 
        unsigned first_group_dev;
        unsigned nex_group_dev;
-       unsigned max_devs;
 };
 
-void _calc_trunk_info(struct ore_layout *layout, u64 file_offset,
-                      struct _trunc_info *ti)
+static void _calc_trunk_info(struct ore_layout *layout, u64 file_offset,
+                            struct _trunc_info *ti)
 {
        unsigned stripe_unit = layout->stripe_unit;
 
-       _calc_stripe_info(layout, file_offset, &ti->si);
+       ore_calc_stripe_info(layout, file_offset, &ti->si);
 
        ti->prev_group_obj_off = ti->si.M * stripe_unit;
        ti->next_group_obj_off = ti->si.M ? (ti->si.M - 1) * stripe_unit : 0;
 
        ti->first_group_dev = ti->si.dev - (ti->si.dev % layout->group_width);
        ti->nex_group_dev = ti->first_group_dev + layout->group_width;
-       ti->max_devs = layout->group_width * layout->group_count;
 }
 
-int ore_truncate(struct ore_layout *layout, struct ore_components *comps,
+int ore_truncate(struct ore_layout *layout, struct ore_components *oc,
                   u64 size)
 {
        struct ore_io_state *ios;
@@ -779,22 +797,22 @@ int ore_truncate(struct ore_layout *layout, struct ore_components *comps,
        struct _trunc_info ti;
        int i, ret;
 
-       ret = ore_get_io_state(layout, comps, &ios);
+       ret = ore_get_io_state(layout, oc, &ios);
        if (unlikely(ret))
                return ret;
 
        _calc_trunk_info(ios->layout, size, &ti);
 
-       size_attrs = kcalloc(ti.max_devs, sizeof(*size_attrs),
+       size_attrs = kcalloc(ios->oc->numdevs, sizeof(*size_attrs),
                             GFP_KERNEL);
        if (unlikely(!size_attrs)) {
                ret = -ENOMEM;
                goto out;
        }
 
-       ios->numdevs = ios->comps->numdevs;
+       ios->numdevs = ios->oc->numdevs;
 
-       for (i = 0; i < ti.max_devs; ++i) {
+       for (i = 0; i < ios->numdevs; ++i) {
                struct exofs_trunc_attr *size_attr = &size_attrs[i];
                u64 obj_size;
 
@@ -815,7 +833,7 @@ int ore_truncate(struct ore_layout *layout, struct ore_components *comps,
                size_attr->attr.val_ptr = &size_attr->newsize;
 
                ORE_DBGMSG("trunc(0x%llx) obj_offset=0x%llx dev=%d\n",
-                            _LLU(comps->comps->obj.id), _LLU(obj_size), i);
+                            _LLU(oc->comps->obj.id), _LLU(obj_size), i);
                ret = _truncate_mirrors(ios, i * ios->layout->mirrors_p1,
                                        &size_attr->attr);
                if (unlikely(ret))