dm snapshot: queue writes to chunks being merged
authorMikulas Patocka <mpatocka@redhat.com>
Thu, 10 Dec 2009 23:52:33 +0000 (23:52 +0000)
committerAlasdair G Kergon <agk@redhat.com>
Thu, 10 Dec 2009 23:52:33 +0000 (23:52 +0000)
While a set of chunks is being merged, any overlapping writes need to be
queued.

Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
Signed-off-by: Mike Snitzer <snitzer@redhat.com>
Signed-off-by: Alasdair G Kergon <agk@redhat.com>
drivers/md/dm-snap.c

index dc2412e..91a47c5 100644 (file)
@@ -109,6 +109,16 @@ struct dm_snapshot {
 
        /* Wait for events based on state_bits */
        unsigned long state_bits;
+
+       /* Range of chunks currently being merged. */
+       chunk_t first_merging_chunk;
+       int num_merging_chunks;
+
+       /*
+        * Incoming bios that overlap with chunks being merged must wait
+        * for them to be committed.
+        */
+       struct bio_list bios_queued_during_merge;
 };
 
 /*
@@ -747,6 +757,14 @@ static void merge_shutdown(struct dm_snapshot *s)
        wake_up_bit(&s->state_bits, RUNNING_MERGE);
 }
 
+static struct bio *__release_queued_bios_after_merge(struct dm_snapshot *s)
+{
+       s->first_merging_chunk = 0;
+       s->num_merging_chunks = 0;
+
+       return bio_list_get(&s->bios_queued_during_merge);
+}
+
 /*
  * Remove one chunk from the index of completed exceptions.
  */
@@ -755,8 +773,6 @@ static int __remove_single_exception_chunk(struct dm_snapshot *s,
 {
        struct dm_exception *e;
 
-       /* FIXME: interlock writes to this chunk */
-
        e = dm_lookup_exception(&s->complete, old_chunk);
        if (!e) {
                DMERR("Corruption detected: exception for block %llu is "
@@ -801,14 +817,32 @@ static int __remove_single_exception_chunk(struct dm_snapshot *s,
        return 0;
 }
 
-static int remove_single_exception_chunk(struct dm_snapshot *s,
-                                        chunk_t old_chunk)
+static void flush_bios(struct bio *bio);
+
+static int remove_single_exception_chunk(struct dm_snapshot *s)
 {
-       int r = 0;
+       struct bio *b = NULL;
+       int r;
+       chunk_t old_chunk = s->first_merging_chunk + s->num_merging_chunks - 1;
 
        down_write(&s->lock);
-       r = __remove_single_exception_chunk(s, old_chunk);
+
+       /*
+        * Process chunks (and associated exceptions) in reverse order
+        * so that dm_consecutive_chunk_count_dec() accounting works.
+        */
+       do {
+               r = __remove_single_exception_chunk(s, old_chunk);
+               if (r)
+                       goto out;
+       } while (old_chunk-- > s->first_merging_chunk);
+
+       b = __release_queued_bios_after_merge(s);
+
+out:
        up_write(&s->lock);
+       if (b)
+               flush_bios(b);
 
        return r;
 }
@@ -844,9 +878,6 @@ static void snapshot_merge_next_chunks(struct dm_snapshot *s)
 
        /* TODO: use larger I/O size once we verify that kcopyd handles it */
 
-       if (remove_single_exception_chunk(s, old_chunk) < 0)
-               goto shut;
-
        dest.bdev = s->origin->bdev;
        dest.sector = chunk_to_sector(s->store, old_chunk);
        dest.count = min((sector_t)s->store->chunk_size,
@@ -856,6 +887,13 @@ static void snapshot_merge_next_chunks(struct dm_snapshot *s)
        src.sector = chunk_to_sector(s->store, new_chunk);
        src.count = dest.count;
 
+       down_write(&s->lock);
+       s->first_merging_chunk = old_chunk;
+       s->num_merging_chunks = 1;
+       up_write(&s->lock);
+
+       /* !!! FIXME: wait until writes to this chunk drain */
+
        dm_kcopyd_copy(s->kcopyd_client, &src, 1, &dest, 0, merge_callback, s);
        return;
 
@@ -863,9 +901,12 @@ shut:
        merge_shutdown(s);
 }
 
+static void error_bios(struct bio *bio);
+
 static void merge_callback(int read_err, unsigned long write_err, void *context)
 {
        struct dm_snapshot *s = context;
+       struct bio *b = NULL;
 
        if (read_err || write_err) {
                if (read_err)
@@ -875,16 +916,25 @@ static void merge_callback(int read_err, unsigned long write_err, void *context)
                goto shut;
        }
 
-       if (s->store->type->commit_merge(s->store, 1) < 0) {
+       if (s->store->type->commit_merge(s->store,
+                                        s->num_merging_chunks) < 0) {
                DMERR("Write error in exception store: shutting down merge");
                goto shut;
        }
 
+       if (remove_single_exception_chunk(s) < 0)
+               goto shut;
+
        snapshot_merge_next_chunks(s);
 
        return;
 
 shut:
+       down_write(&s->lock);
+       b = __release_queued_bios_after_merge(s);
+       up_write(&s->lock);
+       error_bios(b);
+
        merge_shutdown(s);
 }
 
@@ -983,6 +1033,9 @@ static int snapshot_ctr(struct dm_target *ti, unsigned int argc, char **argv)
        INIT_LIST_HEAD(&s->list);
        spin_lock_init(&s->pe_lock);
        s->state_bits = 0;
+       s->first_merging_chunk = 0;
+       s->num_merging_chunks = 0;
+       bio_list_init(&s->bios_queued_during_merge);
 
        /* Allocate hash table for COW data */
        if (init_hash_tables(s)) {
@@ -1539,6 +1592,8 @@ static int snapshot_map(struct dm_target *ti, struct bio *bio,
  * For each chunk, if there is an existing exception, it is used to
  * redirect I/O to the cow device.  Otherwise I/O is sent to the origin,
  * which in turn might generate exceptions in other snapshots.
+ * If merging is currently taking place on the chunk in question, the
+ * I/O is deferred by adding it to s->bios_queued_during_merge.
  */
 static int snapshot_merge_map(struct dm_target *ti, struct bio *bio,
                              union map_info *map_context)
@@ -1559,7 +1614,7 @@ static int snapshot_merge_map(struct dm_target *ti, struct bio *bio,
 
        chunk = sector_to_chunk(s->store, bio->bi_sector);
 
-       down_read(&s->lock);
+       down_write(&s->lock);
 
        /* Full snapshots are not usable */
        if (!s->valid) {
@@ -1570,6 +1625,16 @@ static int snapshot_merge_map(struct dm_target *ti, struct bio *bio,
        /* If the block is already remapped - use that */
        e = dm_lookup_exception(&s->complete, chunk);
        if (e) {
+               /* Queue writes overlapping with chunks being merged */
+               if (bio_rw(bio) == WRITE &&
+                   chunk >= s->first_merging_chunk &&
+                   chunk < (s->first_merging_chunk +
+                            s->num_merging_chunks)) {
+                       bio->bi_bdev = s->origin->bdev;
+                       bio_list_add(&s->bios_queued_during_merge, bio);
+                       r = DM_MAPIO_SUBMITTED;
+                       goto out_unlock;
+               }
                remap_exception(s, e, bio, chunk);
                goto out_unlock;
        }
@@ -1577,12 +1642,12 @@ static int snapshot_merge_map(struct dm_target *ti, struct bio *bio,
        bio->bi_bdev = s->origin->bdev;
 
        if (bio_rw(bio) == WRITE) {
-               up_read(&s->lock);
+               up_write(&s->lock);
                return do_origin(s->origin, bio);
        }
 
 out_unlock:
-       up_read(&s->lock);
+       up_write(&s->lock);
 
        return r;
 }