dm kcopyd: preallocate sub jobs to avoid deadlock
[pandora-kernel.git] / drivers / md / dm-kcopyd.c
1 /*
2  * Copyright (C) 2002 Sistina Software (UK) Limited.
3  * Copyright (C) 2006 Red Hat GmbH
4  *
5  * This file is released under the GPL.
6  *
7  * Kcopyd provides a simple interface for copying an area of one
8  * block-device to one or more other block-devices, with an asynchronous
9  * completion notification.
10  */
11
12 #include <linux/types.h>
13 #include <asm/atomic.h>
14 #include <linux/blkdev.h>
15 #include <linux/fs.h>
16 #include <linux/init.h>
17 #include <linux/list.h>
18 #include <linux/mempool.h>
19 #include <linux/module.h>
20 #include <linux/pagemap.h>
21 #include <linux/slab.h>
22 #include <linux/vmalloc.h>
23 #include <linux/workqueue.h>
24 #include <linux/mutex.h>
25 #include <linux/device-mapper.h>
26 #include <linux/dm-kcopyd.h>
27
28 #include "dm.h"
29
30 #define SUB_JOB_SIZE    128
31 #define SPLIT_COUNT     8
32 #define MIN_JOBS        8
33
34 /*-----------------------------------------------------------------
35  * Each kcopyd client has its own little pool of preallocated
36  * pages for kcopyd io.
37  *---------------------------------------------------------------*/
38 struct dm_kcopyd_client {
39         spinlock_t lock;
40         struct page_list *pages;
41         unsigned int nr_pages;
42         unsigned int nr_free_pages;
43
44         struct dm_io_client *io_client;
45
46         wait_queue_head_t destroyq;
47         atomic_t nr_jobs;
48
49         mempool_t *job_pool;
50
51         struct workqueue_struct *kcopyd_wq;
52         struct work_struct kcopyd_work;
53
54 /*
55  * We maintain three lists of jobs:
56  *
57  * i)   jobs waiting for pages
58  * ii)  jobs that have pages, and are waiting for the io to be issued.
59  * iii) jobs that have completed.
60  *
61  * All three of these are protected by job_lock.
62  */
63         spinlock_t job_lock;
64         struct list_head complete_jobs;
65         struct list_head io_jobs;
66         struct list_head pages_jobs;
67 };
68
69 static void wake(struct dm_kcopyd_client *kc)
70 {
71         queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
72 }
73
74 static struct page_list *alloc_pl(void)
75 {
76         struct page_list *pl;
77
78         pl = kmalloc(sizeof(*pl), GFP_KERNEL);
79         if (!pl)
80                 return NULL;
81
82         pl->page = alloc_page(GFP_KERNEL);
83         if (!pl->page) {
84                 kfree(pl);
85                 return NULL;
86         }
87
88         return pl;
89 }
90
91 static void free_pl(struct page_list *pl)
92 {
93         __free_page(pl->page);
94         kfree(pl);
95 }
96
97 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
98                             unsigned int nr, struct page_list **pages)
99 {
100         struct page_list *pl;
101
102         spin_lock(&kc->lock);
103         if (kc->nr_free_pages < nr) {
104                 spin_unlock(&kc->lock);
105                 return -ENOMEM;
106         }
107
108         kc->nr_free_pages -= nr;
109         for (*pages = pl = kc->pages; --nr; pl = pl->next)
110                 ;
111
112         kc->pages = pl->next;
113         pl->next = NULL;
114
115         spin_unlock(&kc->lock);
116
117         return 0;
118 }
119
120 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
121 {
122         struct page_list *cursor;
123
124         spin_lock(&kc->lock);
125         for (cursor = pl; cursor->next; cursor = cursor->next)
126                 kc->nr_free_pages++;
127
128         kc->nr_free_pages++;
129         cursor->next = kc->pages;
130         kc->pages = pl;
131         spin_unlock(&kc->lock);
132 }
133
134 /*
135  * These three functions resize the page pool.
136  */
137 static void drop_pages(struct page_list *pl)
138 {
139         struct page_list *next;
140
141         while (pl) {
142                 next = pl->next;
143                 free_pl(pl);
144                 pl = next;
145         }
146 }
147
148 static int client_alloc_pages(struct dm_kcopyd_client *kc, unsigned int nr)
149 {
150         unsigned int i;
151         struct page_list *pl = NULL, *next;
152
153         for (i = 0; i < nr; i++) {
154                 next = alloc_pl();
155                 if (!next) {
156                         if (pl)
157                                 drop_pages(pl);
158                         return -ENOMEM;
159                 }
160                 next->next = pl;
161                 pl = next;
162         }
163
164         kcopyd_put_pages(kc, pl);
165         kc->nr_pages += nr;
166         return 0;
167 }
168
169 static void client_free_pages(struct dm_kcopyd_client *kc)
170 {
171         BUG_ON(kc->nr_free_pages != kc->nr_pages);
172         drop_pages(kc->pages);
173         kc->pages = NULL;
174         kc->nr_free_pages = kc->nr_pages = 0;
175 }
176
177 /*-----------------------------------------------------------------
178  * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
179  * for this reason we use a mempool to prevent the client from
180  * ever having to do io (which could cause a deadlock).
181  *---------------------------------------------------------------*/
182 struct kcopyd_job {
183         struct dm_kcopyd_client *kc;
184         struct list_head list;
185         unsigned long flags;
186
187         /*
188          * Error state of the job.
189          */
190         int read_err;
191         unsigned long write_err;
192
193         /*
194          * Either READ or WRITE
195          */
196         int rw;
197         struct dm_io_region source;
198
199         /*
200          * The destinations for the transfer.
201          */
202         unsigned int num_dests;
203         struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
204
205         sector_t offset;
206         unsigned int nr_pages;
207         struct page_list *pages;
208
209         /*
210          * Set this to ensure you are notified when the job has
211          * completed.  'context' is for callback to use.
212          */
213         dm_kcopyd_notify_fn fn;
214         void *context;
215
216         /*
217          * These fields are only used if the job has been split
218          * into more manageable parts.
219          */
220         struct mutex lock;
221         atomic_t sub_jobs;
222         sector_t progress;
223
224         struct kcopyd_job *master_job;
225 };
226
227 static struct kmem_cache *_job_cache;
228
229 int __init dm_kcopyd_init(void)
230 {
231         _job_cache = kmem_cache_create("kcopyd_job",
232                                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
233                                 __alignof__(struct kcopyd_job), 0, NULL);
234         if (!_job_cache)
235                 return -ENOMEM;
236
237         return 0;
238 }
239
240 void dm_kcopyd_exit(void)
241 {
242         kmem_cache_destroy(_job_cache);
243         _job_cache = NULL;
244 }
245
246 /*
247  * Functions to push and pop a job onto the head of a given job
248  * list.
249  */
250 static struct kcopyd_job *pop(struct list_head *jobs,
251                               struct dm_kcopyd_client *kc)
252 {
253         struct kcopyd_job *job = NULL;
254         unsigned long flags;
255
256         spin_lock_irqsave(&kc->job_lock, flags);
257
258         if (!list_empty(jobs)) {
259                 job = list_entry(jobs->next, struct kcopyd_job, list);
260                 list_del(&job->list);
261         }
262         spin_unlock_irqrestore(&kc->job_lock, flags);
263
264         return job;
265 }
266
267 static void push(struct list_head *jobs, struct kcopyd_job *job)
268 {
269         unsigned long flags;
270         struct dm_kcopyd_client *kc = job->kc;
271
272         spin_lock_irqsave(&kc->job_lock, flags);
273         list_add_tail(&job->list, jobs);
274         spin_unlock_irqrestore(&kc->job_lock, flags);
275 }
276
277
278 static void push_head(struct list_head *jobs, struct kcopyd_job *job)
279 {
280         unsigned long flags;
281         struct dm_kcopyd_client *kc = job->kc;
282
283         spin_lock_irqsave(&kc->job_lock, flags);
284         list_add(&job->list, jobs);
285         spin_unlock_irqrestore(&kc->job_lock, flags);
286 }
287
288 /*
289  * These three functions process 1 item from the corresponding
290  * job list.
291  *
292  * They return:
293  * < 0: error
294  *   0: success
295  * > 0: can't process yet.
296  */
297 static int run_complete_job(struct kcopyd_job *job)
298 {
299         void *context = job->context;
300         int read_err = job->read_err;
301         unsigned long write_err = job->write_err;
302         dm_kcopyd_notify_fn fn = job->fn;
303         struct dm_kcopyd_client *kc = job->kc;
304
305         if (job->pages)
306                 kcopyd_put_pages(kc, job->pages);
307         /*
308          * If this is the master job, the sub jobs have already
309          * completed so we can free everything.
310          */
311         if (job->master_job == job)
312                 mempool_free(job, kc->job_pool);
313         fn(read_err, write_err, context);
314
315         if (atomic_dec_and_test(&kc->nr_jobs))
316                 wake_up(&kc->destroyq);
317
318         return 0;
319 }
320
321 static void complete_io(unsigned long error, void *context)
322 {
323         struct kcopyd_job *job = (struct kcopyd_job *) context;
324         struct dm_kcopyd_client *kc = job->kc;
325
326         if (error) {
327                 if (job->rw == WRITE)
328                         job->write_err |= error;
329                 else
330                         job->read_err = 1;
331
332                 if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
333                         push(&kc->complete_jobs, job);
334                         wake(kc);
335                         return;
336                 }
337         }
338
339         if (job->rw == WRITE)
340                 push(&kc->complete_jobs, job);
341
342         else {
343                 job->rw = WRITE;
344                 push(&kc->io_jobs, job);
345         }
346
347         wake(kc);
348 }
349
350 /*
351  * Request io on as many buffer heads as we can currently get for
352  * a particular job.
353  */
354 static int run_io_job(struct kcopyd_job *job)
355 {
356         int r;
357         struct dm_io_request io_req = {
358                 .bi_rw = job->rw,
359                 .mem.type = DM_IO_PAGE_LIST,
360                 .mem.ptr.pl = job->pages,
361                 .mem.offset = job->offset,
362                 .notify.fn = complete_io,
363                 .notify.context = job,
364                 .client = job->kc->io_client,
365         };
366
367         if (job->rw == READ)
368                 r = dm_io(&io_req, 1, &job->source, NULL);
369         else
370                 r = dm_io(&io_req, job->num_dests, job->dests, NULL);
371
372         return r;
373 }
374
375 static int run_pages_job(struct kcopyd_job *job)
376 {
377         int r;
378
379         job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
380                                   PAGE_SIZE >> 9);
381         r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
382         if (!r) {
383                 /* this job is ready for io */
384                 push(&job->kc->io_jobs, job);
385                 return 0;
386         }
387
388         if (r == -ENOMEM)
389                 /* can't complete now */
390                 return 1;
391
392         return r;
393 }
394
395 /*
396  * Run through a list for as long as possible.  Returns the count
397  * of successful jobs.
398  */
399 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
400                         int (*fn) (struct kcopyd_job *))
401 {
402         struct kcopyd_job *job;
403         int r, count = 0;
404
405         while ((job = pop(jobs, kc))) {
406
407                 r = fn(job);
408
409                 if (r < 0) {
410                         /* error this rogue job */
411                         if (job->rw == WRITE)
412                                 job->write_err = (unsigned long) -1L;
413                         else
414                                 job->read_err = 1;
415                         push(&kc->complete_jobs, job);
416                         break;
417                 }
418
419                 if (r > 0) {
420                         /*
421                          * We couldn't service this job ATM, so
422                          * push this job back onto the list.
423                          */
424                         push_head(jobs, job);
425                         break;
426                 }
427
428                 count++;
429         }
430
431         return count;
432 }
433
434 /*
435  * kcopyd does this every time it's woken up.
436  */
437 static void do_work(struct work_struct *work)
438 {
439         struct dm_kcopyd_client *kc = container_of(work,
440                                         struct dm_kcopyd_client, kcopyd_work);
441         struct blk_plug plug;
442
443         /*
444          * The order that these are called is *very* important.
445          * complete jobs can free some pages for pages jobs.
446          * Pages jobs when successful will jump onto the io jobs
447          * list.  io jobs call wake when they complete and it all
448          * starts again.
449          */
450         blk_start_plug(&plug);
451         process_jobs(&kc->complete_jobs, kc, run_complete_job);
452         process_jobs(&kc->pages_jobs, kc, run_pages_job);
453         process_jobs(&kc->io_jobs, kc, run_io_job);
454         blk_finish_plug(&plug);
455 }
456
457 /*
458  * If we are copying a small region we just dispatch a single job
459  * to do the copy, otherwise the io has to be split up into many
460  * jobs.
461  */
462 static void dispatch_job(struct kcopyd_job *job)
463 {
464         struct dm_kcopyd_client *kc = job->kc;
465         atomic_inc(&kc->nr_jobs);
466         if (unlikely(!job->source.count))
467                 push(&kc->complete_jobs, job);
468         else
469                 push(&kc->pages_jobs, job);
470         wake(kc);
471 }
472
473 static void segment_complete(int read_err, unsigned long write_err,
474                              void *context)
475 {
476         /* FIXME: tidy this function */
477         sector_t progress = 0;
478         sector_t count = 0;
479         struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
480         struct kcopyd_job *job = sub_job->master_job;
481         struct dm_kcopyd_client *kc = job->kc;
482
483         mutex_lock(&job->lock);
484
485         /* update the error */
486         if (read_err)
487                 job->read_err = 1;
488
489         if (write_err)
490                 job->write_err |= write_err;
491
492         /*
493          * Only dispatch more work if there hasn't been an error.
494          */
495         if ((!job->read_err && !job->write_err) ||
496             test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
497                 /* get the next chunk of work */
498                 progress = job->progress;
499                 count = job->source.count - progress;
500                 if (count) {
501                         if (count > SUB_JOB_SIZE)
502                                 count = SUB_JOB_SIZE;
503
504                         job->progress += count;
505                 }
506         }
507         mutex_unlock(&job->lock);
508
509         if (count) {
510                 int i;
511
512                 *sub_job = *job;
513                 sub_job->source.sector += progress;
514                 sub_job->source.count = count;
515
516                 for (i = 0; i < job->num_dests; i++) {
517                         sub_job->dests[i].sector += progress;
518                         sub_job->dests[i].count = count;
519                 }
520
521                 sub_job->fn = segment_complete;
522                 sub_job->context = sub_job;
523                 dispatch_job(sub_job);
524
525         } else if (atomic_dec_and_test(&job->sub_jobs)) {
526
527                 /*
528                  * Queue the completion callback to the kcopyd thread.
529                  *
530                  * Some callers assume that all the completions are called
531                  * from a single thread and don't race with each other.
532                  *
533                  * We must not call the callback directly here because this
534                  * code may not be executing in the thread.
535                  */
536                 push(&kc->complete_jobs, job);
537                 wake(kc);
538         }
539 }
540
541 /*
542  * Create some sub jobs to share the work between them.
543  */
544 static void split_job(struct kcopyd_job *master_job)
545 {
546         int i;
547
548         atomic_inc(&master_job->kc->nr_jobs);
549
550         atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
551         for (i = 0; i < SPLIT_COUNT; i++) {
552                 master_job[i + 1].master_job = master_job;
553                 segment_complete(0, 0u, &master_job[i + 1]);
554         }
555 }
556
557 int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
558                    unsigned int num_dests, struct dm_io_region *dests,
559                    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
560 {
561         struct kcopyd_job *job;
562
563         /*
564          * Allocate an array of jobs consisting of one master job
565          * followed by SPLIT_COUNT sub jobs.
566          */
567         job = mempool_alloc(kc->job_pool, GFP_NOIO);
568
569         /*
570          * set up for the read.
571          */
572         job->kc = kc;
573         job->flags = flags;
574         job->read_err = 0;
575         job->write_err = 0;
576         job->rw = READ;
577
578         job->source = *from;
579
580         job->num_dests = num_dests;
581         memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
582
583         job->offset = 0;
584         job->nr_pages = 0;
585         job->pages = NULL;
586
587         job->fn = fn;
588         job->context = context;
589         job->master_job = job;
590
591         if (job->source.count <= SUB_JOB_SIZE)
592                 dispatch_job(job);
593         else {
594                 mutex_init(&job->lock);
595                 job->progress = 0;
596                 split_job(job);
597         }
598
599         return 0;
600 }
601 EXPORT_SYMBOL(dm_kcopyd_copy);
602
603 /*
604  * Cancels a kcopyd job, eg. someone might be deactivating a
605  * mirror.
606  */
607 #if 0
608 int kcopyd_cancel(struct kcopyd_job *job, int block)
609 {
610         /* FIXME: finish */
611         return -1;
612 }
613 #endif  /*  0  */
614
615 /*-----------------------------------------------------------------
616  * Client setup
617  *---------------------------------------------------------------*/
618 int dm_kcopyd_client_create(unsigned int nr_pages,
619                             struct dm_kcopyd_client **result)
620 {
621         int r = -ENOMEM;
622         struct dm_kcopyd_client *kc;
623
624         kc = kmalloc(sizeof(*kc), GFP_KERNEL);
625         if (!kc)
626                 return -ENOMEM;
627
628         spin_lock_init(&kc->lock);
629         spin_lock_init(&kc->job_lock);
630         INIT_LIST_HEAD(&kc->complete_jobs);
631         INIT_LIST_HEAD(&kc->io_jobs);
632         INIT_LIST_HEAD(&kc->pages_jobs);
633
634         kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
635         if (!kc->job_pool)
636                 goto bad_slab;
637
638         INIT_WORK(&kc->kcopyd_work, do_work);
639         kc->kcopyd_wq = alloc_workqueue("kcopyd",
640                                         WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0);
641         if (!kc->kcopyd_wq)
642                 goto bad_workqueue;
643
644         kc->pages = NULL;
645         kc->nr_pages = kc->nr_free_pages = 0;
646         r = client_alloc_pages(kc, nr_pages);
647         if (r)
648                 goto bad_client_pages;
649
650         kc->io_client = dm_io_client_create(nr_pages);
651         if (IS_ERR(kc->io_client)) {
652                 r = PTR_ERR(kc->io_client);
653                 goto bad_io_client;
654         }
655
656         init_waitqueue_head(&kc->destroyq);
657         atomic_set(&kc->nr_jobs, 0);
658
659         *result = kc;
660         return 0;
661
662 bad_io_client:
663         client_free_pages(kc);
664 bad_client_pages:
665         destroy_workqueue(kc->kcopyd_wq);
666 bad_workqueue:
667         mempool_destroy(kc->job_pool);
668 bad_slab:
669         kfree(kc);
670
671         return r;
672 }
673 EXPORT_SYMBOL(dm_kcopyd_client_create);
674
675 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
676 {
677         /* Wait for completion of all jobs submitted by this client. */
678         wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
679
680         BUG_ON(!list_empty(&kc->complete_jobs));
681         BUG_ON(!list_empty(&kc->io_jobs));
682         BUG_ON(!list_empty(&kc->pages_jobs));
683         destroy_workqueue(kc->kcopyd_wq);
684         dm_io_client_destroy(kc->io_client);
685         client_free_pages(kc);
686         mempool_destroy(kc->job_pool);
687         kfree(kc);
688 }
689 EXPORT_SYMBOL(dm_kcopyd_client_destroy);