/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work.
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
        /* pool we belong to */
        struct btrfs_workers *workers;

        /* list of struct btrfs_work that are waiting for service */
        struct list_head pending;
        struct list_head prio_pending;

        /* list of worker threads from struct btrfs_workers */
        struct list_head worker_list;

        /* kthread */
        struct task_struct *task;

        /* number of things on the pending list */
        atomic_t num_pending;

        /* reference counter for this struct */
        atomic_t refs;

        unsigned long sequence;

        /* protects the pending list. */
        spinlock_t lock;

        /* set to non-zero when this thread is already awake and kicking */
        int working;

        /* are we currently idle */
        int idle;
};

/*
 * btrfs_start_workers uses kthread_run, which can block waiting for memory
 * for a very long time.  It will actually throttle on page writeback,
 * and so it may not make progress until after our btrfs worker threads
 * process all of the pending work structs in their queue.
 *
 * This means we can't use btrfs_start_workers from inside a btrfs worker
 * thread that is used as part of cleaning dirty memory, which pretty much
 * involves all of the worker threads.
 *
 * Instead we have a helper queue that never has more than one thread,
 * where we schedule thread start operations.  This worker_start struct
 * is used to contain the work and hold a pointer to the queue that needs
 * another worker.
 */
struct worker_start {
        struct btrfs_work work;
        struct btrfs_workers *queue;
};

static void start_new_worker_func(struct btrfs_work *work)
{
        struct worker_start *start;
        start = container_of(work, struct worker_start, work);
        btrfs_start_workers(start->queue, 1);
        kfree(start);
}

/*
 * queue a worker_start item on the async helper so a new thread gets
 * created for 'queue' without blocking in this context
 */
static int start_new_worker(struct btrfs_workers *queue)
{
        struct worker_start *start;
        int ret;

        start = kzalloc(sizeof(*start), GFP_NOFS);
        if (!start)
                return -ENOMEM;

        start->work.func = start_new_worker_func;
        start->queue = queue;
        ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
        if (ret)
                kfree(start);
        return ret;
}

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
        if (!worker->idle && atomic_read(&worker->num_pending) <
            worker->workers->idle_thresh / 2) {
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 1;

                /* the list may be empty if the worker is just starting */
                if (!list_empty(&worker->worker_list)) {
                        list_move(&worker->worker_list,
                                 &worker->workers->idle_list);
                }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
        if (worker->idle && atomic_read(&worker->num_pending) >=
            worker->workers->idle_thresh) {
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 0;

                if (!list_empty(&worker->worker_list)) {
                        list_move_tail(&worker->worker_list,
                                      &worker->workers->worker_list);
                }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
}

/*
 * if the pool has flagged that it wants another thread, kick off an async
 * worker start via the helper queue, staying under max_workers
 */
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
        struct btrfs_workers *workers = worker->workers;
        unsigned long flags;

        rmb();
        if (!workers->atomic_start_pending)
                return;

        spin_lock_irqsave(&workers->lock, flags);
        if (!workers->atomic_start_pending)
                goto out;

        workers->atomic_start_pending = 0;
        if (workers->num_workers + workers->num_workers_starting >=
            workers->max_workers)
                goto out;

        workers->num_workers_starting += 1;
        spin_unlock_irqrestore(&workers->lock, flags);
        start_new_worker(workers);
        return;

out:
        spin_unlock_irqrestore(&workers->lock, flags);
}

/*
 * for ordered queues, run the ordered_func/ordered_free callbacks in
 * submission order once each work item's main function has finished
 */
static noinline int run_ordered_completions(struct btrfs_workers *workers,
                                            struct btrfs_work *work)
{
        if (!workers->ordered)
                return 0;

        set_bit(WORK_DONE_BIT, &work->flags);

        spin_lock(&workers->order_lock);

        while (1) {
                if (!list_empty(&workers->prio_order_list)) {
                        work = list_entry(workers->prio_order_list.next,
                                          struct btrfs_work, order_list);
                } else if (!list_empty(&workers->order_list)) {
                        work = list_entry(workers->order_list.next,
                                          struct btrfs_work, order_list);
                } else {
                        break;
                }
                if (!test_bit(WORK_DONE_BIT, &work->flags))
                        break;

                /* we are going to call the ordered done function, but
                 * we leave the work item on the list as a barrier so
                 * that later work items that are done don't have their
                 * functions called before this one returns
                 */
                if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
                        break;

                spin_unlock(&workers->order_lock);

                work->ordered_func(work);

                /* now take the lock again and call the freeing code */
                spin_lock(&workers->order_lock);
                list_del(&work->order_list);
                work->ordered_free(work);
        }

        spin_unlock(&workers->order_lock);
        return 0;
}

/* drop a reference on the worker and free it once the last ref is gone */
static void put_worker(struct btrfs_worker_thread *worker)
{
        if (atomic_dec_and_test(&worker->refs))
                kfree(worker);
}

/*
 * free an idle worker that has nothing pending, as long as it isn't the
 * last thread in the pool.  Returns 1 if the worker was freed.
 */
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
        int freeit = 0;

        spin_lock_irq(&worker->lock);
        spin_lock(&worker->workers->lock);
        if (worker->workers->num_workers > 1 &&
            worker->idle &&
            !worker->working &&
            !list_empty(&worker->worker_list) &&
            list_empty(&worker->prio_pending) &&
            list_empty(&worker->pending) &&
            atomic_read(&worker->num_pending) == 0) {
                freeit = 1;
                list_del_init(&worker->worker_list);
                worker->workers->num_workers--;
        }
        spin_unlock(&worker->workers->lock);
        spin_unlock_irq(&worker->lock);

        if (freeit)
                put_worker(worker);
        return freeit;
}

/*
 * grab the next work item for the worker loop, preferring high priority
 * work.  The local prio_head/head lists are refilled from the worker's
 * pending lists under the lock when they run dry.
 */
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
                                        struct list_head *prio_head,
                                        struct list_head *head)
{
        struct btrfs_work *work = NULL;
        struct list_head *cur = NULL;

        if (!list_empty(prio_head))
                cur = prio_head->next;

        smp_mb();
        if (!list_empty(&worker->prio_pending))
                goto refill;

        if (!list_empty(head))
                cur = head->next;

        if (cur)
                goto out;

refill:
        spin_lock_irq(&worker->lock);
        list_splice_tail_init(&worker->prio_pending, prio_head);
        list_splice_tail_init(&worker->pending, head);

        if (!list_empty(prio_head))
                cur = prio_head->next;
        else if (!list_empty(head))
                cur = head->next;
        spin_unlock_irq(&worker->lock);

        if (!cur)
                goto out_fail;

out:
        work = list_entry(cur, struct btrfs_work, list);

out_fail:
        return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
        struct btrfs_worker_thread *worker = arg;
        struct list_head head;
        struct list_head prio_head;
        struct btrfs_work *work;

        INIT_LIST_HEAD(&head);
        INIT_LIST_HEAD(&prio_head);

        do {
again:
                while (1) {


                        work = get_next_work(worker, &prio_head, &head);
                        if (!work)
                                break;

                        list_del(&work->list);
                        clear_bit(WORK_QUEUED_BIT, &work->flags);

                        work->worker = worker;

                        work->func(work);

                        atomic_dec(&worker->num_pending);
                        /*
                         * unless this is an ordered work queue,
                         * 'work' was probably freed by func above.
                         */
                        run_ordered_completions(worker->workers, work);

                        check_pending_worker_creates(worker);

                }

                spin_lock_irq(&worker->lock);
                check_idle_worker(worker);

                if (freezing(current)) {
                        worker->working = 0;
                        spin_unlock_irq(&worker->lock);
                        refrigerator();
                } else {
                        spin_unlock_irq(&worker->lock);
                        if (!kthread_should_stop()) {
                                cpu_relax();
                                /*
                                 * we've dropped the lock, did someone else
                                 * jump in?
                                 */
                                smp_mb();
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending))
                                        continue;

                                /*
                                 * this short schedule allows more work to
                                 * come in without the queue functions
                                 * needing to go through wake_up_process()
                                 *
                                 * worker->working is still 1, so nobody
                                 * is going to try and wake us up
                                 */
                                schedule_timeout(1);
                                smp_mb();
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending))
                                        continue;

                                if (kthread_should_stop())
                                        break;

                                /* still no more work? sleep for real */
                                spin_lock_irq(&worker->lock);
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (!list_empty(&worker->pending) ||
                                    !list_empty(&worker->prio_pending)) {
                                        spin_unlock_irq(&worker->lock);
                                        goto again;
                                }

                                /*
                                 * this makes sure we get a wakeup when someone
                                 * adds something new to the queue
                                 */
                                worker->working = 0;
                                spin_unlock_irq(&worker->lock);

                                if (!kthread_should_stop()) {
                                        schedule_timeout(HZ * 120);
                                        if (!worker->working &&
                                            try_worker_shutdown(worker)) {
                                                return 0;
                                        }
                                }
                        }
                        __set_current_state(TASK_RUNNING);
                }
        } while (!kthread_should_stop());
        return 0;
}

/*
 * this will wait for all the worker threads to shut down
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
        struct list_head *cur;
        struct btrfs_worker_thread *worker;
        int can_stop;

        spin_lock_irq(&workers->lock);
        list_splice_init(&workers->idle_list, &workers->worker_list);
        while (!list_empty(&workers->worker_list)) {
                cur = workers->worker_list.next;
                worker = list_entry(cur, struct btrfs_worker_thread,
                                    worker_list);

                atomic_inc(&worker->refs);
                workers->num_workers -= 1;
                if (!list_empty(&worker->worker_list)) {
                        list_del_init(&worker->worker_list);
                        put_worker(worker);
                        can_stop = 1;
                } else
                        can_stop = 0;
                spin_unlock_irq(&workers->lock);
                if (can_stop)
                        kthread_stop(worker->task);
                spin_lock_irq(&workers->lock);
                put_worker(worker);
        }
        spin_unlock_irq(&workers->lock);
        return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
                        struct btrfs_workers *async_helper)
{
        workers->num_workers = 0;
        workers->num_workers_starting = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        INIT_LIST_HEAD(&workers->idle_list);
        INIT_LIST_HEAD(&workers->order_list);
        INIT_LIST_HEAD(&workers->prio_order_list);
        spin_lock_init(&workers->lock);
        spin_lock_init(&workers->order_lock);
        workers->max_workers = max;
        workers->idle_thresh = 32;
        workers->name = name;
        workers->ordered = 0;
        workers->atomic_start_pending = 0;
        workers->atomic_worker_start = async_helper;
}

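/*
 * Illustrative sketch (editor's addition, not part of the original file):
 * one way a caller might wire up a pool.  The pool names below are
 * assumptions made for the example; only the functions defined in this
 * file are real.  A small helper pool with a NULL async_helper is set up
 * first and then handed to the real pool, so new threads can be started
 * asynchronously rather than from inside a worker:
 *
 *      struct btrfs_workers helper_pool;
 *      struct btrfs_workers worker_pool;
 *
 *      btrfs_init_workers(&helper_pool, "helper", 1, NULL);
 *      btrfs_init_workers(&worker_pool, "worker", 8, &helper_pool);
 *      btrfs_start_workers(&helper_pool, 1);
 *      btrfs_start_workers(&worker_pool, 1);
 *      ...
 *      btrfs_stop_workers(&worker_pool);
 *      btrfs_stop_workers(&helper_pool);
 */
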
/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
static int __btrfs_start_workers(struct btrfs_workers *workers,
                                 int num_workers)
{
        struct btrfs_worker_thread *worker;
        int ret = 0;
        int i;

        for (i = 0; i < num_workers; i++) {
                worker = kzalloc(sizeof(*worker), GFP_NOFS);
                if (!worker) {
                        ret = -ENOMEM;
                        goto fail;
                }

                INIT_LIST_HEAD(&worker->pending);
                INIT_LIST_HEAD(&worker->prio_pending);
                INIT_LIST_HEAD(&worker->worker_list);
                spin_lock_init(&worker->lock);

                atomic_set(&worker->num_pending, 0);
                atomic_set(&worker->refs, 1);
                worker->workers = workers;
                worker->task = kthread_run(worker_loop, worker,
                                           "btrfs-%s-%d", workers->name,
                                           workers->num_workers + i);
                if (IS_ERR(worker->task)) {
                        ret = PTR_ERR(worker->task);
                        kfree(worker);
                        goto fail;
                }
                spin_lock_irq(&workers->lock);
                list_add_tail(&worker->worker_list, &workers->idle_list);
                worker->idle = 1;
                workers->num_workers++;
                workers->num_workers_starting--;
                WARN_ON(workers->num_workers_starting < 0);
                spin_unlock_irq(&workers->lock);
        }
        return 0;
fail:
        btrfs_stop_workers(workers);
        return ret;
}

int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
        spin_lock_irq(&workers->lock);
        workers->num_workers_starting += num_workers;
        spin_unlock_irq(&workers->lock);
        return __btrfs_start_workers(workers, num_workers);
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return NULL if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        struct list_head *next;
        int enforce_min;

        enforce_min = (workers->num_workers + workers->num_workers_starting) <
                workers->max_workers;

        /*
         * if we find an idle thread, don't move it to the end of the
         * idle list.  This improves the chance that the next submission
         * will reuse the same thread, and maybe catch it while it is still
         * working.
         */
        if (!list_empty(&workers->idle_list)) {
                next = workers->idle_list.next;
                worker = list_entry(next, struct btrfs_worker_thread,
                                    worker_list);
                return worker;
        }
        if (enforce_min || list_empty(&workers->worker_list))
                return NULL;

        /*
         * if we pick a busy task, move the task to the end of the list.
         * hopefully this will keep things somewhat evenly balanced.
         * Do the move in batches based on the sequence number.  This groups
         * requests submitted at roughly the same time onto the same worker.
         */
        next = workers->worker_list.next;
        worker = list_entry(next, struct btrfs_worker_thread, worker_list);
        worker->sequence++;

        if (worker->sequence % workers->idle_thresh == 0)
                list_move_tail(next, &workers->worker_list);
        return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;
        struct list_head *fallback;

again:
        spin_lock_irqsave(&workers->lock, flags);
        worker = next_worker(workers);

        if (!worker) {
                if (workers->num_workers + workers->num_workers_starting >=
                    workers->max_workers) {
                        goto fallback;
                } else if (workers->atomic_worker_start) {
                        workers->atomic_start_pending = 1;
                        goto fallback;
                } else {
                        workers->num_workers_starting++;
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
                        __btrfs_start_workers(workers, 1);
                        goto again;
                }
        }
        goto found;

fallback:
        fallback = NULL;
        /*
         * we have failed to find any workers, just
         * return the first one we can find.
         */
        if (!list_empty(&workers->worker_list))
                fallback = workers->worker_list.next;
        if (!list_empty(&workers->idle_list))
                fallback = workers->idle_list.next;
        BUG_ON(!fallback);
        worker = list_entry(fallback,
                  struct btrfs_worker_thread, worker_list);
found:
        /*
         * this makes sure the worker doesn't exit before it is placed
         * onto a busy/idle list
         */
        atomic_inc(&worker->num_pending);
        spin_unlock_irqrestore(&workers->lock, flags);
        return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker = work->worker;
        unsigned long flags;
        int wake = 0;

        if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;

        spin_lock_irqsave(&worker->lock, flags);
        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
                list_add_tail(&work->list, &worker->prio_pending);
        else
                list_add_tail(&work->list, &worker->pending);
        atomic_inc(&worker->num_pending);

        /* by definition we're busy, take ourselves off the idle
         * list
         */
        if (worker->idle) {
                spin_lock(&worker->workers->lock);
                worker->idle = 0;
                list_move_tail(&worker->worker_list,
                              &worker->workers->worker_list);
                spin_unlock(&worker->workers->lock);
        }
        if (!worker->working) {
                wake = 1;
                worker->working = 1;
        }

        if (wake)
                wake_up_process(worker->task);
        spin_unlock_irqrestore(&worker->lock, flags);
out:

        return 0;
}

/* mark a work item so it is queued onto the high priority lists */
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
        set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
        struct btrfs_worker_thread *worker;
        unsigned long flags;
        int wake = 0;

        /* don't requeue something already on a list */
        if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
                goto out;

        worker = find_worker(workers);
        if (workers->ordered) {
                /*
                 * you're not allowed to do ordered queues from an
                 * interrupt handler
                 */
                spin_lock(&workers->order_lock);
                if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
                        list_add_tail(&work->order_list,
                                      &workers->prio_order_list);
                } else {
                        list_add_tail(&work->order_list, &workers->order_list);
                }
                spin_unlock(&workers->order_lock);
        } else {
                INIT_LIST_HEAD(&work->order_list);
        }

        spin_lock_irqsave(&worker->lock, flags);

        if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
                list_add_tail(&work->list, &worker->prio_pending);
        else
                list_add_tail(&work->list, &worker->pending);
        check_busy_worker(worker);

        /*
         * avoid calling into wake_up_process if this thread has already
         * been kicked
         */
        if (!worker->working)
                wake = 1;
        worker->working = 1;

        if (wake)
                wake_up_process(worker->task);
        spin_unlock_irqrestore(&worker->lock, flags);

out:
        return 0;
}
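
/*
 * Illustrative sketch (editor's addition, not part of the original file):
 * how a caller typically submits work.  The names my_async_job,
 * my_job_func and do_something are made up for the example, and
 * worker_pool refers to the pool sketch above btrfs_start_workers; only
 * the btrfs_work fields and the queueing calls are the real API from
 * this file.  A caller embeds a struct btrfs_work in its own structure
 * and recovers it with container_of(), just like worker_start above:
 *
 *      struct my_async_job {
 *              struct btrfs_work work;
 *              int some_state;
 *      };
 *
 *      static void my_job_func(struct btrfs_work *work)
 *      {
 *              struct my_async_job *job;
 *
 *              job = container_of(work, struct my_async_job, work);
 *              do_something(job->some_state);
 *              kfree(job);
 *      }
 *
 * and then, from the submission path:
 *
 *      job = kzalloc(sizeof(*job), GFP_NOFS);
 *      if (!job)
 *              return -ENOMEM;
 *      job->work.func = my_job_func;
 *      btrfs_queue_worker(&worker_pool, &job->work);
 *
 * Calling btrfs_set_work_high_prio(&job->work) before queueing puts the
 * item on the high priority lists.  For pools with ->ordered set,
 * ordered_func and ordered_free must also be filled in, and the final
 * freeing is done from ordered_free rather than from func.
 */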