Merge git://git.kernel.org/pub/scm/linux/kernel/git/steve/linux-dm
[pandora-kernel.git] / drivers / md / dm-kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
11
12 #include <linux/types.h>
13 #include <linux/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/device-mapper.h>
26 #include <linux/dm-kcopyd.h>
27
28 #include "dm.h"
29
30 #define SUB_JOB_SIZE    128
31 #define SPLIT_COUNT     8
32 #define MIN_JOBS        8
33 #define RESERVE_PAGES   (DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))
34
35 /*-----------------------------------------------------------------
36  * Each kcopyd client has its own little pool of preallocated
37  * pages for kcopyd io.
38  *---------------------------------------------------------------*/
39 struct dm_kcopyd_client {
40         struct page_list *pages;
41         unsigned nr_reserved_pages;
42         unsigned nr_free_pages;
43
44         struct dm_io_client *io_client;
45
46         wait_queue_head_t destroyq;
47         atomic_t nr_jobs;
48
49         mempool_t *job_pool;
50
51         struct workqueue_struct *kcopyd_wq;
52         struct work_struct kcopyd_work;
53
54 /*
55  * We maintain three lists of jobs:
56  *
57  * i)   jobs waiting for pages
58  * ii)  jobs that have pages, and are waiting for the io to be issued.
59  * iii) jobs that have completed.
60  *
61  * All three of these are protected by job_lock.
62  */
63         spinlock_t job_lock;
64         struct list_head complete_jobs;
65         struct list_head io_jobs;
66         struct list_head pages_jobs;
67 };
68
69 static struct page_list zero_page_list;
70
71 static void wake(struct dm_kcopyd_client *kc)
72 {
73         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
74 }
75
76 /*
77  * Obtain one page for the use of kcopyd.
78  */
79 static struct page_list *alloc_pl(gfp_t gfp)
80 {
81         struct page_list *pl;
82
83         pl = kmalloc(sizeof(*pl), gfp);
84         if (!pl)
85                 return NULL;
86
87         pl->page = alloc_page(gfp);
88         if (!pl->page) {
89                 kfree(pl);
90                 return NULL;
91         }
92
93         return pl;
94 }
95
96 static void free_pl(struct page_list *pl)
97 {
98         __free_page(pl->page);
99         kfree(pl);
100 }
101
102 /*
103  * Add the provided pages to a client's free page list, releasing
104  * back to the system any beyond the reserved_pages limit.
105  */
106 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
107 {
108         struct page_list *next;
109
110         do {
111                 next = pl->next;
112
113                 if (kc->nr_free_pages >= kc->nr_reserved_pages)
114                         free_pl(pl);
115                 else {
116                         pl->next = kc->pages;
117                         kc->pages = pl;
118                         kc->nr_free_pages++;
119                 }
120
121                 pl = next;
122         } while (pl);
123 }
124
125 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
126                             unsigned int nr, struct page_list **pages)
127 {
128         struct page_list *pl;
129
130         *pages = NULL;
131
132         do {
133                 pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY);
134                 if (unlikely(!pl)) {
135                         /* Use reserved pages */
136                         pl = kc->pages;
137                         if (unlikely(!pl))
138                                 goto out_of_memory;
139                         kc->pages = pl->next;
140                         kc->nr_free_pages--;
141                 }
142                 pl->next = *pages;
143                 *pages = pl;
144         } while (--nr);
145
146         return 0;
147
148 out_of_memory:
149         if (*pages)
150                 kcopyd_put_pages(kc, *pages);
151         return -ENOMEM;
152 }
153
154 /*
155  * These three functions resize the page pool.
156  */
157 static void drop_pages(struct page_list *pl)
158 {
159         struct page_list *next;
160
161         while (pl) {
162                 next = pl->next;
163                 free_pl(pl);
164                 pl = next;
165         }
166 }
167
168 /*
169  * Allocate and reserve nr_pages for the use of a specific client.
170  */
171 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
172 {
173         unsigned i;
174         struct page_list *pl = NULL, *next;
175
176         for (i = 0; i < nr_pages; i++) {
177                 next = alloc_pl(GFP_KERNEL);
178                 if (!next) {
179                         if (pl)
180                                 drop_pages(pl);
181                         return -ENOMEM;
182                 }
183                 next->next = pl;
184                 pl = next;
185         }
186
187         kc->nr_reserved_pages += nr_pages;
188         kcopyd_put_pages(kc, pl);
189
190         return 0;
191 }
192
193 static void client_free_pages(struct dm_kcopyd_client *kc)
194 {
195         BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
196         drop_pages(kc->pages);
197         kc->pages = NULL;
198         kc->nr_free_pages = kc->nr_reserved_pages = 0;
199 }
200
201 /*-----------------------------------------------------------------
202  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
203  * for this reason we use a mempool to prevent the client from
204  * ever having to do io (which could cause a deadlock).
205  *---------------------------------------------------------------*/
206 struct kcopyd_job {
207         struct dm_kcopyd_client *kc;
208         struct list_head list;
209         unsigned long flags;
210
211         /*
212          * Error state of the job.
213          */
214         int read_err;
215         unsigned long write_err;
216
217         /*
218          * Either READ or WRITE
219          */
220         int rw;
221         struct dm_io_region source;
222
223         /*
224          * The destinations for the transfer.
225          */
226         unsigned int num_dests;
227         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
228
229         struct page_list *pages;
230
231         /*
232          * Set this to ensure you are notified when the job has
233          * completed.  'context' is for callback to use.
234          */
235         dm_kcopyd_notify_fn fn;
236         void *context;
237
238         /*
239          * These fields are only used if the job has been split
240          * into more manageable parts.
241          */
242         struct mutex lock;
243         atomic_t sub_jobs;
244         sector_t progress;
245
246         struct kcopyd_job *master_job;
247 };
248
249 static struct kmem_cache *_job_cache;
250
251 int __init dm_kcopyd_init(void)
252 {
253         _job_cache = kmem_cache_create("kcopyd_job",
254                                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
255                                 __alignof__(struct kcopyd_job), 0, NULL);
256         if (!_job_cache)
257                 return -ENOMEM;
258
259         zero_page_list.next = &zero_page_list;
260         zero_page_list.page = ZERO_PAGE(0);
261
262         return 0;
263 }
264
265 void dm_kcopyd_exit(void)
266 {
267         kmem_cache_destroy(_job_cache);
268         _job_cache = NULL;
269 }
270
271 /*
272  * Functions to push and pop a job onto the head of a given job
273  * list.
274  */
275 static struct kcopyd_job *pop(struct list_head *jobs,
276                               struct dm_kcopyd_client *kc)
277 {
278         struct kcopyd_job *job = NULL;
279         unsigned long flags;
280
281         spin_lock_irqsave(&kc->job_lock, flags);
282
283         if (!list_empty(jobs)) {
284                 job = list_entry(jobs->next, struct kcopyd_job, list);
285                 list_del(&job->list);
286         }
287         spin_unlock_irqrestore(&kc->job_lock, flags);
288
289         return job;
290 }
291
292 static void push(struct list_head *jobs, struct kcopyd_job *job)
293 {
294         unsigned long flags;
295         struct dm_kcopyd_client *kc = job->kc;
296
297         spin_lock_irqsave(&kc->job_lock, flags);
298         list_add_tail(&job->list, jobs);
299         spin_unlock_irqrestore(&kc->job_lock, flags);
300 }
301
302
303 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
304 {
305         unsigned long flags;
306         struct dm_kcopyd_client *kc = job->kc;
307
308         spin_lock_irqsave(&kc->job_lock, flags);
309         list_add(&job->list, jobs);
310         spin_unlock_irqrestore(&kc->job_lock, flags);
311 }
312
313 /*
314  * These three functions process 1 item from the corresponding
315  * job list.
316  *
317  * They return:
318  * < 0: error
319  *   0: success
320  * > 0: can't process yet.
321  */
322 static int run_complete_job(struct kcopyd_job *job)
323 {
324         void *context = job->context;
325         int read_err = job->read_err;
326         unsigned long write_err = job->write_err;
327         dm_kcopyd_notify_fn fn = job->fn;
328         struct dm_kcopyd_client *kc = job->kc;
329
330         if (job->pages && job->pages != &zero_page_list)
331                 kcopyd_put_pages(kc, job->pages);
332         /*
333          * If this is the master job, the sub jobs have already
334          * completed so we can free everything.
335          */
336         if (job->master_job == job)
337                 mempool_free(job, kc->job_pool);
338         fn(read_err, write_err, context);
339
340         if (atomic_dec_and_test(&kc->nr_jobs))
341                 wake_up(&kc->destroyq);
342
343         return 0;
344 }
345
346 static void complete_io(unsigned long error, void *context)
347 {
348         struct kcopyd_job *job = (struct kcopyd_job *) context;
349         struct dm_kcopyd_client *kc = job->kc;
350
351         if (error) {
352                 if (job->rw == WRITE)
353                         job->write_err |= error;
354                 else
355                         job->read_err = 1;
356
357                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
358                         push(&kc->complete_jobs, job);
359                         wake(kc);
360                         return;
361                 }
362         }
363
364         if (job->rw == WRITE)
365                 push(&kc->complete_jobs, job);
366
367         else {
368                 job->rw = WRITE;
369                 push(&kc->io_jobs, job);
370         }
371
372         wake(kc);
373 }
374
375 /*
376  * Request io on as many buffer heads as we can currently get for
377  * a particular job.
378  */
379 static int run_io_job(struct kcopyd_job *job)
380 {
381         int r;
382         struct dm_io_request io_req = {
383                 .bi_rw = job->rw,
384                 .mem.type = DM_IO_PAGE_LIST,
385                 .mem.ptr.pl = job->pages,
386                 .mem.offset = 0,
387                 .notify.fn = complete_io,
388                 .notify.context = job,
389                 .client = job->kc->io_client,
390         };
391
392         if (job->rw == READ)
393                 r = dm_io(&io_req, 1, &job->source, NULL);
394         else
395                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
396
397         return r;
398 }
399
400 static int run_pages_job(struct kcopyd_job *job)
401 {
402         int r;
403         unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
404
405         r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
406         if (!r) {
407                 /* this job is ready for io */
408                 push(&job->kc->io_jobs, job);
409                 return 0;
410         }
411
412         if (r == -ENOMEM)
413                 /* can't complete now */
414                 return 1;
415
416         return r;
417 }
418
419 /*
420  * Run through a list for as long as possible.  Returns the count
421  * of successful jobs.
422  */
423 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
424                         int (*fn) (struct kcopyd_job *))
425 {
426         struct kcopyd_job *job;
427         int r, count = 0;
428
429         while ((job = pop(jobs, kc))) {
430
431                 r = fn(job);
432
433                 if (r < 0) {
434                         /* error this rogue job */
435                         if (job->rw == WRITE)
436                                 job->write_err = (unsigned long) -1L;
437                         else
438                                 job->read_err = 1;
439                         push(&kc->complete_jobs, job);
440                         break;
441                 }
442
443                 if (r > 0) {
444                         /*
445                          * We couldn't service this job ATM, so
446                          * push this job back onto the list.
447                          */
448                         push_head(jobs, job);
449                         break;
450                 }
451
452                 count++;
453         }
454
455         return count;
456 }
457
458 /*
459  * kcopyd does this every time it's woken up.
460  */
461 static void do_work(struct work_struct *work)
462 {
463         struct dm_kcopyd_client *kc = container_of(work,
464                                         struct dm_kcopyd_client, kcopyd_work);
465         struct blk_plug plug;
466
467         /*
468          * The order that these are called is *very* important.
469          * complete jobs can free some pages for pages jobs.
470          * Pages jobs when successful will jump onto the io jobs
471          * list.  io jobs call wake when they complete and it all
472          * starts again.
473          */
474         blk_start_plug(&plug);
475         process_jobs(&kc->complete_jobs, kc, run_complete_job);
476         process_jobs(&kc->pages_jobs, kc, run_pages_job);
477         process_jobs(&kc->io_jobs, kc, run_io_job);
478         blk_finish_plug(&plug);
479 }
480
481 /*
482  * If we are copying a small region we just dispatch a single job
483  * to do the copy, otherwise the io has to be split up into many
484  * jobs.
485  */
486 static void dispatch_job(struct kcopyd_job *job)
487 {
488         struct dm_kcopyd_client *kc = job->kc;
489         atomic_inc(&kc->nr_jobs);
490         if (unlikely(!job->source.count))
491                 push(&kc->complete_jobs, job);
492         else if (job->pages == &zero_page_list)
493                 push(&kc->io_jobs, job);
494         else
495                 push(&kc->pages_jobs, job);
496         wake(kc);
497 }
498
499 static void segment_complete(int read_err, unsigned long write_err,
500                              void *context)
501 {
502         /* FIXME: tidy this function */
503         sector_t progress = 0;
504         sector_t count = 0;
505         struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
506         struct kcopyd_job *job = sub_job->master_job;
507         struct dm_kcopyd_client *kc = job->kc;
508
509         mutex_lock(&job->lock);
510
511         /* update the error */
512         if (read_err)
513                 job->read_err = 1;
514
515         if (write_err)
516                 job->write_err |= write_err;
517
518         /*
519          * Only dispatch more work if there hasn't been an error.
520          */
521         if ((!job->read_err && !job->write_err) ||
522             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
523                 /* get the next chunk of work */
524                 progress = job->progress;
525                 count = job->source.count - progress;
526                 if (count) {
527                         if (count > SUB_JOB_SIZE)
528                                 count = SUB_JOB_SIZE;
529
530                         job->progress += count;
531                 }
532         }
533         mutex_unlock(&job->lock);
534
535         if (count) {
536                 int i;
537
538                 *sub_job = *job;
539                 sub_job->source.sector += progress;
540                 sub_job->source.count = count;
541
542                 for (i = 0; i < job->num_dests; i++) {
543                         sub_job->dests[i].sector += progress;
544                         sub_job->dests[i].count = count;
545                 }
546
547                 sub_job->fn = segment_complete;
548                 sub_job->context = sub_job;
549                 dispatch_job(sub_job);
550
551         } else if (atomic_dec_and_test(&job->sub_jobs)) {
552
553                 /*
554                  * Queue the completion callback to the kcopyd thread.
555                  *
556                  * Some callers assume that all the completions are called
557                  * from a single thread and don't race with each other.
558                  *
559                  * We must not call the callback directly here because this
560                  * code may not be executing in the thread.
561                  */
562                 push(&kc->complete_jobs, job);
563                 wake(kc);
564         }
565 }
566
567 /*
568  * Create some sub jobs to share the work between them.
569  */
570 static void split_job(struct kcopyd_job *master_job)
571 {
572         int i;
573
574         atomic_inc(&master_job->kc->nr_jobs);
575
576         atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
577         for (i = 0; i < SPLIT_COUNT; i++) {
578                 master_job[i + 1].master_job = master_job;
579                 segment_complete(0, 0u, &master_job[i + 1]);
580         }
581 }
582
583 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
584                    unsigned int num_dests, struct dm_io_region *dests,
585                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
586 {
587         struct kcopyd_job *job;
588
589         /*
590          * Allocate an array of jobs consisting of one master job
591          * followed by SPLIT_COUNT sub jobs.
592          */
593         job = mempool_alloc(kc->job_pool, GFP_NOIO);
594
595         /*
596          * set up for the read.
597          */
598         job->kc = kc;
599         job->flags = flags;
600         job->read_err = 0;
601         job->write_err = 0;
602
603         job->num_dests = num_dests;
604         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
605
606         if (from) {
607                 job->source = *from;
608                 job->pages = NULL;
609                 job->rw = READ;
610         } else {
611                 memset(&job->source, 0, sizeof job->source);
612                 job->source.count = job->dests[0].count;
613                 job->pages = &zero_page_list;
614                 job->rw = WRITE;
615         }
616
617         job->fn = fn;
618         job->context = context;
619         job->master_job = job;
620
621         if (job->source.count <= SUB_JOB_SIZE)
622                 dispatch_job(job);
623         else {
624                 mutex_init(&job->lock);
625                 job->progress = 0;
626                 split_job(job);
627         }
628
629         return 0;
630 }
631 EXPORT_SYMBOL(dm_kcopyd_copy);
632
633 int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
634                    unsigned num_dests, struct dm_io_region *dests,
635                    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
636 {
637         return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
638 }
639 EXPORT_SYMBOL(dm_kcopyd_zero);
640
641 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
642                                  dm_kcopyd_notify_fn fn, void *context)
643 {
644         struct kcopyd_job *job;
645
646         job = mempool_alloc(kc->job_pool, GFP_NOIO);
647
648         memset(job, 0, sizeof(struct kcopyd_job));
649         job->kc = kc;
650         job->fn = fn;
651         job->context = context;
652         job->master_job = job;
653
654         atomic_inc(&kc->nr_jobs);
655
656         return job;
657 }
658 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
659
660 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
661 {
662         struct kcopyd_job *job = j;
663         struct dm_kcopyd_client *kc = job->kc;
664
665         job->read_err = read_err;
666         job->write_err = write_err;
667
668         push(&kc->complete_jobs, job);
669         wake(kc);
670 }
671 EXPORT_SYMBOL(dm_kcopyd_do_callback);
672
673 /*
674  * Cancels a kcopyd job, eg. someone might be deactivating a
675  * mirror.
676  */
677 #if 0
678 int kcopyd_cancel(struct kcopyd_job *job, int block)
679 {
680         /* FIXME: finish */
681         return -1;
682 }
683 #endif  /*  0  */
684
685 /*-----------------------------------------------------------------
686  * Client setup
687  *---------------------------------------------------------------*/
688 struct dm_kcopyd_client *dm_kcopyd_client_create(void)
689 {
690         int r = -ENOMEM;
691         struct dm_kcopyd_client *kc;
692
693         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
694         if (!kc)
695                 return ERR_PTR(-ENOMEM);
696
697         spin_lock_init(&kc->job_lock);
698         INIT_LIST_HEAD(&kc->complete_jobs);
699         INIT_LIST_HEAD(&kc->io_jobs);
700         INIT_LIST_HEAD(&kc->pages_jobs);
701
702         kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
703         if (!kc->job_pool)
704                 goto bad_slab;
705
706         INIT_WORK(&kc->kcopyd_work, do_work);
707         kc->kcopyd_wq = alloc_workqueue("kcopyd",
708                                         WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
709         if (!kc->kcopyd_wq)
710                 goto bad_workqueue;
711
712         kc->pages = NULL;
713         kc->nr_reserved_pages = kc->nr_free_pages = 0;
714         r = client_reserve_pages(kc, RESERVE_PAGES);
715         if (r)
716                 goto bad_client_pages;
717
718         kc->io_client = dm_io_client_create();
719         if (IS_ERR(kc->io_client)) {
720                 r = PTR_ERR(kc->io_client);
721                 goto bad_io_client;
722         }
723
724         init_waitqueue_head(&kc->destroyq);
725         atomic_set(&kc->nr_jobs, 0);
726
727         return kc;
728
729 bad_io_client:
730         client_free_pages(kc);
731 bad_client_pages:
732         destroy_workqueue(kc->kcopyd_wq);
733 bad_workqueue:
734         mempool_destroy(kc->job_pool);
735 bad_slab:
736         kfree(kc);
737
738         return ERR_PTR(r);
739 }
740 EXPORT_SYMBOL(dm_kcopyd_client_create);
741
742 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
743 {
744         /* Wait for completion of all jobs submitted by this client. */
745         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
746
747         BUG_ON(!list_empty(&kc->complete_jobs));
748         BUG_ON(!list_empty(&kc->io_jobs));
749         BUG_ON(!list_empty(&kc->pages_jobs));
750         destroy_workqueue(kc->kcopyd_wq);
751         dm_io_client_destroy(kc->io_client);
752         client_free_pages(kc);
753         mempool_destroy(kc->job_pool);
754         kfree(kc);
755 }
756 EXPORT_SYMBOL(dm_kcopyd_client_destroy);