workqueue: introduce worker
[pandora-kernel.git] kernel/workqueue.c
index 77dabbf..600db10 100644
 #include <linux/kallsyms.h>
 #include <linux/debug_locks.h>
 #include <linux/lockdep.h>
-#define CREATE_TRACE_POINTS
-#include <trace/events/workqueue.h>
+#include <linux/idr.h>
+
+/*
+ * Structure fields follow one of the following exclusion rules.
+ *
+ * I: Set during initialization and read-only afterwards.
+ *
+ * L: cwq->lock protected.  Access with cwq->lock held.
+ *
+ * F: wq->flush_mutex protected.
+ *
+ * W: workqueue_lock protected.
+ */
+
+struct cpu_workqueue_struct;
+
+struct worker {
+       struct work_struct      *current_work;  /* L: work being processed */
+       struct task_struct      *task;          /* I: worker task */
+       struct cpu_workqueue_struct *cwq;       /* I: the associated cwq */
+       int                     id;             /* I: worker id */
+};
 
 /*
  * The per-CPU workqueue (if single thread, we always use the first
- * possible cpu).
+ * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
+ * work_struct->data are used for flags, so cwqs need to be aligned
+ * to 1 << WORK_STRUCT_FLAG_BITS.
  */
 struct cpu_workqueue_struct {
 
@@ -46,25 +68,45 @@ struct cpu_workqueue_struct {
 
        struct list_head worklist;
        wait_queue_head_t more_work;
-       struct work_struct *current_work;
+       unsigned int            cpu;
+       struct worker           *worker;
+
+       struct workqueue_struct *wq;            /* I: the owning workqueue */
+       int                     work_color;     /* L: current color */
+       int                     flush_color;    /* L: flushing color */
+       int                     nr_in_flight[WORK_NR_COLORS];
+                                               /* L: nr of in_flight works */
+};
 
-       struct workqueue_struct *wq;
-       struct task_struct *thread;
-} ____cacheline_aligned;
+/*
+ * Structure used to wait for workqueue flush.
+ */
+struct wq_flusher {
+       struct list_head        list;           /* F: list of flushers */
+       int                     flush_color;    /* F: flush color waiting for */
+       struct completion       done;           /* flush completion */
+};
 
 /*
  * The externally visible workqueue abstraction is an array of
  * per-CPU workqueues:
  */
 struct workqueue_struct {
-       struct cpu_workqueue_struct *cpu_wq;
-       struct list_head list;
-       const char *name;
-       int singlethread;
-       int freezeable;         /* Freeze threads during suspend */
-       int rt;
+       unsigned int            flags;          /* I: WQ_* flags */
+       struct cpu_workqueue_struct *cpu_wq;    /* I: cwq's */
+       struct list_head        list;           /* W: list of all workqueues */
+
+       struct mutex            flush_mutex;    /* protects wq flushing */
+       int                     work_color;     /* F: current work color */
+       int                     flush_color;    /* F: current flush color */
+       atomic_t                nr_cwqs_to_flush; /* flush in progress */
+       struct wq_flusher       *first_flusher; /* F: first flusher */
+       struct list_head        flusher_queue;  /* F: flush waiters */
+       struct list_head        flusher_overflow; /* F: flush overflow list */
+
+       const char              *name;          /* I: workqueue name */
 #ifdef CONFIG_LOCKDEP
-       struct lockdep_map lockdep_map;
+       struct lockdep_map      lockdep_map;
 #endif
 };
 
@@ -107,7 +149,7 @@ static int work_fixup_activate(void *addr, enum debug_obj_state state)
                 * statically initialized. We just make sure that it
                 * is tracked in the object tracker.
                 */
-               if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) {
+               if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
                        debug_object_init(work, &work_debug_descr);
                        debug_object_activate(work, &work_debug_descr);
                        return 0;
@@ -181,36 +223,40 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
 /* Serializes the accesses to the list of workqueues. */
 static DEFINE_SPINLOCK(workqueue_lock);
 static LIST_HEAD(workqueues);
+static DEFINE_PER_CPU(struct ida, worker_ida);
+
+static int worker_thread(void *__worker);
 
 static int singlethread_cpu __read_mostly;
-static const struct cpumask *cpu_singlethread_map __read_mostly;
-/*
- * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
- * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
- * which comes in between can't use for_each_online_cpu(). We could
- * use cpu_possible_map, the cpumask below is more a documentation
- * than optimization.
- */
-static cpumask_var_t cpu_populated_map __read_mostly;
 
-/* If it's single threaded, it isn't in the list of workqueues. */
-static inline int is_wq_single_threaded(struct workqueue_struct *wq)
+static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
+                                           struct workqueue_struct *wq)
 {
-       return wq->singlethread;
+       return per_cpu_ptr(wq->cpu_wq, cpu);
 }
 
-static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
+static struct cpu_workqueue_struct *target_cwq(unsigned int cpu,
+                                              struct workqueue_struct *wq)
 {
-       return is_wq_single_threaded(wq)
-               ? cpu_singlethread_map : cpu_populated_map;
+       if (unlikely(wq->flags & WQ_SINGLE_THREAD))
+               cpu = singlethread_cpu;
+       return get_cwq(cpu, wq);
 }
 
-static
-struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
+static unsigned int work_color_to_flags(int color)
 {
-       if (unlikely(is_wq_single_threaded(wq)))
-               cpu = singlethread_cpu;
-       return per_cpu_ptr(wq->cpu_wq, cpu);
+       return color << WORK_STRUCT_COLOR_SHIFT;
+}
+
+static int get_work_color(struct work_struct *work)
+{
+       return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
+               ((1 << WORK_STRUCT_COLOR_BITS) - 1);
+}
+
+static int work_next_color(int color)
+{
+       return (color + 1) % WORK_NR_COLORS;
 }
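
The three helpers above pack a work color into the flag bits of work->data and step it through the color space. A stand-alone sketch of the same arithmetic in plain user-space C, with assumed values for the WORK_STRUCT_COLOR_* constants (the real ones come from the matching workqueue.h changes):

        /*
         * Illustrative sketch only; the constants are assumptions standing
         * in for the real workqueue.h values.
         */
        #include <assert.h>
        #include <stdio.h>

        #define WORK_STRUCT_COLOR_SHIFT 4       /* assumed position of the color field */
        #define WORK_STRUCT_COLOR_BITS  4
        #define WORK_NR_COLORS          ((1 << WORK_STRUCT_COLOR_BITS) - 1)
        #define WORK_NO_COLOR           WORK_NR_COLORS  /* reserved for barriers */

        static unsigned long work_color_to_flags(int color)
        {
                return (unsigned long)color << WORK_STRUCT_COLOR_SHIFT;
        }

        static int data_to_color(unsigned long data)
        {
                return (data >> WORK_STRUCT_COLOR_SHIFT) &
                        ((1 << WORK_STRUCT_COLOR_BITS) - 1);
        }

        static int work_next_color(int color)
        {
                return (color + 1) % WORK_NR_COLORS;
        }

        int main(void)
        {
                int color;

                /* every usable color survives the trip through the flag word */
                for (color = 0; color < WORK_NR_COLORS; color++)
                        assert(data_to_color(work_color_to_flags(color)) == color);

                /* and cycling never produces the reserved WORK_NO_COLOR value */
                color = 0;
                for (int i = 0; i < 2 * WORK_NR_COLORS; i++) {
                        color = work_next_color(color);
                        assert(color != WORK_NO_COLOR);
                }
                printf("%d usable colors, round trip OK\n", WORK_NR_COLORS);
                return 0;
        }

The color recovered by get_work_color() is what the callers of cwq_dec_nr_in_flight() use to tell which flush generation a queued work belonged to.
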
 
 /*
@@ -218,15 +264,13 @@ struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
  * - Must *only* be called if the pending flag is set
  */
 static inline void set_wq_data(struct work_struct *work,
-                               struct cpu_workqueue_struct *cwq)
+                              struct cpu_workqueue_struct *cwq,
+                              unsigned long extra_flags)
 {
-       unsigned long new;
-
        BUG_ON(!work_pending(work));
 
-       new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
-       new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
-       atomic_long_set(&work->data, new);
+       atomic_long_set(&work->data, (unsigned long)cwq | work_static(work) |
+                       WORK_STRUCT_PENDING | extra_flags);
 }
 
 /*
@@ -234,40 +278,56 @@ static inline void set_wq_data(struct work_struct *work,
  */
 static inline void clear_wq_data(struct work_struct *work)
 {
-       unsigned long flags = *work_data_bits(work) &
-                               (1UL << WORK_STRUCT_STATIC);
-       atomic_long_set(&work->data, flags);
+       atomic_long_set(&work->data, work_static(work));
 }
 
-static inline
-struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
+static inline struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
 {
-       return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
+       return (void *)(atomic_long_read(&work->data) &
+                       WORK_STRUCT_WQ_DATA_MASK);
 }
 
+/**
+ * insert_work - insert a work into cwq
+ * @cwq: cwq @work belongs to
+ * @work: work to insert
+ * @head: insertion point
+ * @extra_flags: extra WORK_STRUCT_* flags to set
+ *
+ * Insert @work into @cwq after @head.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
 static void insert_work(struct cpu_workqueue_struct *cwq,
-                       struct work_struct *work, struct list_head *head)
+                       struct work_struct *work, struct list_head *head,
+                       unsigned int extra_flags)
 {
-       trace_workqueue_insertion(cwq->thread, work);
+       /* we own @work, set data and link */
+       set_wq_data(work, cwq, extra_flags);
 
-       set_wq_data(work, cwq);
        /*
         * Ensure that we get the right work->data if we see the
         * result of list_add() below, see try_to_grab_pending().
         */
        smp_wmb();
+
        list_add_tail(&work->entry, head);
        wake_up(&cwq->more_work);
 }
 
-static void __queue_work(struct cpu_workqueue_struct *cwq,
+static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
                         struct work_struct *work)
 {
+       struct cpu_workqueue_struct *cwq = target_cwq(cpu, wq);
        unsigned long flags;
 
        debug_work_activate(work);
        spin_lock_irqsave(&cwq->lock, flags);
-       insert_work(cwq, work, &cwq->worklist);
+       BUG_ON(!list_empty(&work->entry));
+       cwq->nr_in_flight[cwq->work_color]++;
+       insert_work(cwq, work, &cwq->worklist,
+                   work_color_to_flags(cwq->work_color));
        spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
@@ -308,9 +368,8 @@ queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
 {
        int ret = 0;
 
-       if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
-               BUG_ON(!list_empty(&work->entry));
-               __queue_work(wq_per_cpu(wq, cpu), work);
+       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+               __queue_work(cpu, wq, work);
                ret = 1;
        }
        return ret;
@@ -321,9 +380,8 @@ static void delayed_work_timer_fn(unsigned long __data)
 {
        struct delayed_work *dwork = (struct delayed_work *)__data;
        struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
-       struct workqueue_struct *wq = cwq->wq;
 
-       __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
+       __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
 }
 
 /**
@@ -360,14 +418,14 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;
 
-       if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
+       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));
 
                timer_stats_timer_set_start_info(&dwork->timer);
 
                /* This stores cwq for the moment, for the timer_fn */
-               set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
+               set_wq_data(work, target_cwq(raw_smp_processor_id(), wq), 0);
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;
@@ -382,61 +440,231 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 }
 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
 
-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+static struct worker *alloc_worker(void)
 {
-       spin_lock_irq(&cwq->lock);
-       while (!list_empty(&cwq->worklist)) {
-               struct work_struct *work = list_entry(cwq->worklist.next,
-                                               struct work_struct, entry);
-               work_func_t f = work->func;
+       struct worker *worker;
+
+       worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+       return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @cwq: cwq the new worker will belong to
+ * @bind: whether to set affinity to @cpu or not
+ *
+ * Create a new worker which is bound to @cwq.  The returned worker
+ * can be started by calling start_worker() or destroyed using
+ * destroy_worker().
+ *
+ * CONTEXT:
+ * Might sleep.  Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker.
+ */
+static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+{
+       int id = -1;
+       struct worker *worker = NULL;
+
+       spin_lock(&workqueue_lock);
+       while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
+               spin_unlock(&workqueue_lock);
+               if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
+                       goto fail;
+               spin_lock(&workqueue_lock);
+       }
+       spin_unlock(&workqueue_lock);
+
+       worker = alloc_worker();
+       if (!worker)
+               goto fail;
+
+       worker->cwq = cwq;
+       worker->id = id;
+
+       worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
+                                     cwq->cpu, id);
+       if (IS_ERR(worker->task))
+               goto fail;
+
+       if (bind)
+               kthread_bind(worker->task, cwq->cpu);
+
+       return worker;
+fail:
+       if (id >= 0) {
+               spin_lock(&workqueue_lock);
+               ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
+               spin_unlock(&workqueue_lock);
+       }
+       kfree(worker);
+       return NULL;
+}
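
create_worker() above has to loop between ida_get_new() under workqueue_lock and ida_pre_get() outside of it, because the id allocator may need memory that cannot be allocated while the spinlock is held. A stand-alone sketch of that retry shape, using a trivial stand-in allocator (the pool_* names are hypothetical, not kernel APIs):

        #include <pthread.h>
        #include <stdbool.h>
        #include <stdio.h>

        static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
        static int pool_next;           /* next id to hand out */
        static int pool_prepared;       /* slots preallocated outside the lock */

        /* returns nonzero when the caller must preallocate and retry */
        static int pool_get_new(int *id)
        {
                if (!pool_prepared)
                        return -1;
                pool_prepared--;
                *id = pool_next++;
                return 0;
        }

        /* stands in for the preallocation step; always succeeds in this demo */
        static bool pool_pre_get(void)
        {
                pool_prepared++;
                return true;
        }

        static int alloc_id(void)
        {
                int id = -1;

                pthread_mutex_lock(&pool_lock);
                while (pool_get_new(&id)) {
                        pthread_mutex_unlock(&pool_lock);
                        if (!pool_pre_get())
                                return -1;      /* out of memory */
                        pthread_mutex_lock(&pool_lock);
                }
                pthread_mutex_unlock(&pool_lock);
                return id;
        }

        int main(void)
        {
                for (int i = 0; i < 3; i++)
                        printf("got id %d\n", alloc_id());
                return 0;
        }
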
+
+/**
+ * start_worker - start a newly created worker
+ * @worker: worker to start
+ *
+ * Start @worker.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void start_worker(struct worker *worker)
+{
+       wake_up_process(worker->task);
+}
+
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker.
+ */
+static void destroy_worker(struct worker *worker)
+{
+       int cpu = worker->cwq->cpu;
+       int id = worker->id;
+
+       /* sanity check frenzy */
+       BUG_ON(worker->current_work);
+
+       kthread_stop(worker->task);
+       kfree(worker);
+
+       spin_lock(&workqueue_lock);
+       ida_remove(&per_cpu(worker_ida, cpu), id);
+       spin_unlock(&workqueue_lock);
+}
+
+/**
+ * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
+ * @cwq: cwq of interest
+ * @color: color of work which left the queue
+ *
+ * A work has either completed or been removed from the pending
+ * queue; decrement nr_in_flight of its cwq and handle workqueue
+ * flushing.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
+{
+       /* ignore uncolored works */
+       if (color == WORK_NO_COLOR)
+               return;
+
+       cwq->nr_in_flight[color]--;
+
+       /* is flush in progress and are we at the flushing tip? */
+       if (likely(cwq->flush_color != color))
+               return;
+
+       /* are there still in-flight works? */
+       if (cwq->nr_in_flight[color])
+               return;
+
+       /* this cwq is done, clear flush_color */
+       cwq->flush_color = -1;
+
+       /*
+        * If this was the last cwq, wake up the first flusher.  It
+        * will handle the rest.
+        */
+       if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
+               complete(&cwq->wq->first_flusher->done);
+}
+
+/**
+ * process_one_work - process single work
+ * @worker: self
+ * @work: work to process
+ *
+ * Process @work.  This function contains all the logic necessary to
+ * process a single work, including synchronization against and
+ * interaction with other workers on the same cpu, queueing and
+ * flushing.  As long as the context requirement is met, any worker
+ * can call this function to process a work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock) which is released and regrabbed.
+ */
+static void process_one_work(struct worker *worker, struct work_struct *work)
+{
+       struct cpu_workqueue_struct *cwq = worker->cwq;
+       work_func_t f = work->func;
+       int work_color;
 #ifdef CONFIG_LOCKDEP
-               /*
-                * It is permissible to free the struct work_struct
-                * from inside the function that is called from it,
-                * this we need to take into account for lockdep too.
-                * To avoid bogus "held lock freed" warnings as well
-                * as problems when looking into work->lockdep_map,
-                * make a copy and use that here.
-                */
-               struct lockdep_map lockdep_map = work->lockdep_map;
+       /*
+        * It is permissible to free the struct work_struct from
+        * inside the function that is called from it, this we need to
+        * take into account for lockdep too.  To avoid bogus "held
+        * lock freed" warnings as well as problems when looking into
+        * work->lockdep_map, make a copy and use that here.
+        */
+       struct lockdep_map lockdep_map = work->lockdep_map;
 #endif
-               trace_workqueue_execution(cwq->thread, work);
-               debug_work_deactivate(work);
-               cwq->current_work = work;
-               list_del_init(cwq->worklist.next);
-               spin_unlock_irq(&cwq->lock);
+       /* claim and process */
+       debug_work_deactivate(work);
+       worker->current_work = work;
+       work_color = get_work_color(work);
+       list_del_init(&work->entry);
 
-               BUG_ON(get_wq_data(work) != cwq);
-               work_clear_pending(work);
-               lock_map_acquire(&cwq->wq->lockdep_map);
-               lock_map_acquire(&lockdep_map);
-               f(work);
-               lock_map_release(&lockdep_map);
-               lock_map_release(&cwq->wq->lockdep_map);
-
-               if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
-                       printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
-                                       "%s/0x%08x/%d\n",
-                                       current->comm, preempt_count(),
-                                       task_pid_nr(current));
-                       printk(KERN_ERR "    last function: ");
-                       print_symbol("%s\n", (unsigned long)f);
-                       debug_show_held_locks(current);
-                       dump_stack();
-               }
+       spin_unlock_irq(&cwq->lock);
 
-               spin_lock_irq(&cwq->lock);
-               cwq->current_work = NULL;
+       BUG_ON(get_wq_data(work) != cwq);
+       work_clear_pending(work);
+       lock_map_acquire(&cwq->wq->lockdep_map);
+       lock_map_acquire(&lockdep_map);
+       f(work);
+       lock_map_release(&lockdep_map);
+       lock_map_release(&cwq->wq->lockdep_map);
+
+       if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
+               printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
+                      "%s/0x%08x/%d\n",
+                      current->comm, preempt_count(), task_pid_nr(current));
+               printk(KERN_ERR "    last function: ");
+               print_symbol("%s\n", (unsigned long)f);
+               debug_show_held_locks(current);
+               dump_stack();
+       }
+
+       spin_lock_irq(&cwq->lock);
+
+       /* we're done with it, release */
+       worker->current_work = NULL;
+       cwq_dec_nr_in_flight(cwq, work_color);
+}
+
+static void run_workqueue(struct worker *worker)
+{
+       struct cpu_workqueue_struct *cwq = worker->cwq;
+
+       spin_lock_irq(&cwq->lock);
+       while (!list_empty(&cwq->worklist)) {
+               struct work_struct *work = list_entry(cwq->worklist.next,
+                                               struct work_struct, entry);
+               process_one_work(worker, work);
        }
        spin_unlock_irq(&cwq->lock);
 }
 
-static int worker_thread(void *__cwq)
+/**
+ * worker_thread - the worker thread function
+ * @__worker: self
+ *
+ * The cwq worker thread function.
+ */
+static int worker_thread(void *__worker)
 {
-       struct cpu_workqueue_struct *cwq = __cwq;
+       struct worker *worker = __worker;
+       struct cpu_workqueue_struct *cwq = worker->cwq;
        DEFINE_WAIT(wait);
 
-       if (cwq->wq->freezeable)
+       if (cwq->wq->flags & WQ_FREEZEABLE)
                set_freezable();
 
        for (;;) {
@@ -452,7 +680,11 @@ static int worker_thread(void *__cwq)
                if (kthread_should_stop())
                        break;
 
-               run_workqueue(cwq);
+               if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
+                                           get_cpu_mask(cwq->cpu))))
+                       set_cpus_allowed_ptr(worker->task,
+                                            get_cpu_mask(cwq->cpu));
+               run_workqueue(worker);
        }
 
        return 0;
@@ -469,6 +701,17 @@ static void wq_barrier_func(struct work_struct *work)
        complete(&barr->done);
 }
 
+/**
+ * insert_wq_barrier - insert a barrier work
+ * @cwq: cwq to insert barrier into
+ * @barr: wq_barrier to insert
+ * @head: insertion point
+ *
+ * Insert barrier @barr into @cwq before @head.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
                        struct wq_barrier *barr, struct list_head *head)
 {
@@ -479,34 +722,82 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
         * might deadlock.
         */
        INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
-       __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
-
+       __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
        init_completion(&barr->done);
 
        debug_work_activate(&barr->work);
-       insert_work(cwq, &barr->work, head);
+       insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR));
 }
 
-static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
+/**
+ * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
+ * @wq: workqueue being flushed
+ * @flush_color: new flush color, < 0 for no-op
+ * @work_color: new work color, < 0 for no-op
+ *
+ * Prepare cwqs for workqueue flushing.
+ *
+ * If @flush_color is non-negative, flush_color on all cwqs should be
+ * -1.  If no cwq has in-flight works of the specified color, all
+ * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
+ * has in-flight works, its cwq->flush_color is set to
+ * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
+ * wakeup logic is armed and %true is returned.
+ *
+ * The caller should have initialized @wq->first_flusher prior to
+ * calling this function with non-negative @flush_color.  If
+ * @flush_color is negative, no flush color update is done and %false
+ * is returned.
+ *
+ * If @work_color is non-negative, all cwqs should have the same
+ * work_color which is previous to @work_color and all will be
+ * advanced to @work_color.
+ *
+ * CONTEXT:
+ * mutex_lock(wq->flush_mutex).
+ *
+ * RETURNS:
+ * %true if @flush_color >= 0 and there's something to flush.  %false
+ * otherwise.
+ */
+static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
+                                     int flush_color, int work_color)
 {
-       int active = 0;
-       struct wq_barrier barr;
-
-       WARN_ON(cwq->thread == current);
+       bool wait = false;
+       unsigned int cpu;
 
-       spin_lock_irq(&cwq->lock);
-       if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
-               insert_wq_barrier(cwq, &barr, &cwq->worklist);
-               active = 1;
+       if (flush_color >= 0) {
+               BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
+               atomic_set(&wq->nr_cwqs_to_flush, 1);
        }
-       spin_unlock_irq(&cwq->lock);
 
-       if (active) {
-               wait_for_completion(&barr.done);
-               destroy_work_on_stack(&barr.work);
+       for_each_possible_cpu(cpu) {
+               struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+               spin_lock_irq(&cwq->lock);
+
+               if (flush_color >= 0) {
+                       BUG_ON(cwq->flush_color != -1);
+
+                       if (cwq->nr_in_flight[flush_color]) {
+                               cwq->flush_color = flush_color;
+                               atomic_inc(&wq->nr_cwqs_to_flush);
+                               wait = true;
+                       }
+               }
+
+               if (work_color >= 0) {
+                       BUG_ON(work_color != work_next_color(cwq->work_color));
+                       cwq->work_color = work_color;
+               }
+
+               spin_unlock_irq(&cwq->lock);
        }
 
-       return active;
+       if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
+               complete(&wq->first_flusher->done);
+
+       return wait;
 }
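
flush_workqueue_prep_cwqs() seeds nr_cwqs_to_flush with 1 before it looks at any cwq and only drops that extra reference after the loop, so the first flusher cannot be woken while cwqs are still being armed. A stand-alone C11 sketch of that "+1 bias" pattern (not kernel code; the names below are illustrative):

        #include <stdatomic.h>
        #include <stdbool.h>
        #include <stdio.h>

        static atomic_int nr_to_flush;

        static void flush_done(void)
        {
                printf("last reference dropped - wake the first flusher\n");
        }

        /* one reference per cwq that still has works of the flushed color */
        static void arm_one_cwq(void)
        {
                atomic_fetch_add(&nr_to_flush, 1);
        }

        /* called when a cwq drains its flushed color */
        static void one_cwq_done(void)
        {
                if (atomic_fetch_sub(&nr_to_flush, 1) == 1)
                        flush_done();
        }

        static bool prep_flush(int busy_cwqs)
        {
                atomic_store(&nr_to_flush, 1);          /* the bias */

                for (int i = 0; i < busy_cwqs; i++)
                        arm_one_cwq();

                /* drop the bias; fires here only if nothing was armed */
                if (atomic_fetch_sub(&nr_to_flush, 1) == 1)
                        flush_done();

                return busy_cwqs > 0;
        }

        int main(void)
        {
                prep_flush(0);          /* nothing in flight, completes immediately */
                if (prep_flush(2)) {    /* two busy cwqs */
                        one_cwq_done();
                        one_cwq_done(); /* the second wakes the flusher */
                }
                return 0;
        }
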
 
 /**
@@ -518,20 +809,146 @@ static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
  *
  * We sleep until all works which were queued on entry have been handled,
  * but we are not livelocked by new incoming ones.
- *
- * This function used to run the workqueues itself.  Now we just wait for the
- * helper threads to do it.
  */
 void flush_workqueue(struct workqueue_struct *wq)
 {
-       const struct cpumask *cpu_map = wq_cpu_map(wq);
-       int cpu;
+       struct wq_flusher this_flusher = {
+               .list = LIST_HEAD_INIT(this_flusher.list),
+               .flush_color = -1,
+               .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
+       };
+       int next_color;
 
-       might_sleep();
        lock_map_acquire(&wq->lockdep_map);
        lock_map_release(&wq->lockdep_map);
-       for_each_cpu(cpu, cpu_map)
-               flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
+
+       mutex_lock(&wq->flush_mutex);
+
+       /*
+        * Start-to-wait phase
+        */
+       next_color = work_next_color(wq->work_color);
+
+       if (next_color != wq->flush_color) {
+               /*
+                * Color space is not full.  The current work_color
+                * becomes our flush_color and work_color is advanced
+                * by one.
+                */
+               BUG_ON(!list_empty(&wq->flusher_overflow));
+               this_flusher.flush_color = wq->work_color;
+               wq->work_color = next_color;
+
+               if (!wq->first_flusher) {
+                       /* no flush in progress, become the first flusher */
+                       BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+                       wq->first_flusher = &this_flusher;
+
+                       if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
+                                                      wq->work_color)) {
+                               /* nothing to flush, done */
+                               wq->flush_color = next_color;
+                               wq->first_flusher = NULL;
+                               goto out_unlock;
+                       }
+               } else {
+                       /* wait in queue */
+                       BUG_ON(wq->flush_color == this_flusher.flush_color);
+                       list_add_tail(&this_flusher.list, &wq->flusher_queue);
+                       flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+               }
+       } else {
+               /*
+                * Oops, color space is full, wait on overflow queue.
+                * The next flush completion will assign us
+                * flush_color and transfer to flusher_queue.
+                */
+               list_add_tail(&this_flusher.list, &wq->flusher_overflow);
+       }
+
+       mutex_unlock(&wq->flush_mutex);
+
+       wait_for_completion(&this_flusher.done);
+
+       /*
+        * Wake-up-and-cascade phase
+        *
+        * First flushers are responsible for cascading flushes and
+        * handling overflow.  Non-first flushers can simply return.
+        */
+       if (wq->first_flusher != &this_flusher)
+               return;
+
+       mutex_lock(&wq->flush_mutex);
+
+       wq->first_flusher = NULL;
+
+       BUG_ON(!list_empty(&this_flusher.list));
+       BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+       while (true) {
+               struct wq_flusher *next, *tmp;
+
+               /* complete all the flushers sharing the current flush color */
+               list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
+                       if (next->flush_color != wq->flush_color)
+                               break;
+                       list_del_init(&next->list);
+                       complete(&next->done);
+               }
+
+               BUG_ON(!list_empty(&wq->flusher_overflow) &&
+                      wq->flush_color != work_next_color(wq->work_color));
+
+               /* this flush_color is finished, advance by one */
+               wq->flush_color = work_next_color(wq->flush_color);
+
+               /* one color has been freed, handle overflow queue */
+               if (!list_empty(&wq->flusher_overflow)) {
+                       /*
+                        * Assign the same color to all overflowed
+                        * flushers, advance work_color and append to
+                        * flusher_queue.  This is the start-to-wait
+                        * phase for these overflowed flushers.
+                        */
+                       list_for_each_entry(tmp, &wq->flusher_overflow, list)
+                               tmp->flush_color = wq->work_color;
+
+                       wq->work_color = work_next_color(wq->work_color);
+
+                       list_splice_tail_init(&wq->flusher_overflow,
+                                             &wq->flusher_queue);
+                       flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+               }
+
+               if (list_empty(&wq->flusher_queue)) {
+                       BUG_ON(wq->flush_color != wq->work_color);
+                       break;
+               }
+
+               /*
+                * Need to flush more colors.  Make the next flusher
+                * the new first flusher and arm cwqs.
+                */
+               BUG_ON(wq->flush_color == wq->work_color);
+               BUG_ON(wq->flush_color != next->flush_color);
+
+               list_del_init(&next->list);
+               wq->first_flusher = next;
+
+               if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
+                       break;
+
+               /*
+                * Meh... this color is already done, clear first
+                * flusher and repeat cascading.
+                */
+               wq->first_flusher = NULL;
+       }
+
+out_unlock:
+       mutex_unlock(&wq->flush_mutex);
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);
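
In the start-to-wait phase above a flusher takes one of three paths: it becomes the first flusher, it waits on flusher_queue behind the current flush, or, once the color space is exhausted, it waits on flusher_overflow until a color is freed. A stand-alone sketch of just that decision, assuming fifteen usable colors with one value reserved for barriers (mirroring WORK_NO_COLOR):

        #include <stdbool.h>
        #include <stdio.h>

        #define WORK_NR_COLORS  15      /* assumed; one flag value reserved for barriers */

        static int work_next_color(int color)
        {
                return (color + 1) % WORK_NR_COLORS;
        }

        struct wq_state {
                int     work_color;     /* color new works are queued with */
                int     flush_color;    /* oldest color still being flushed */
                bool    have_first_flusher;
        };

        static const char *classify_new_flusher(struct wq_state *wq)
        {
                int next_color = work_next_color(wq->work_color);

                if (next_color == wq->flush_color)
                        return "overflow";              /* color space full */

                wq->work_color = next_color;
                if (!wq->have_first_flusher) {
                        wq->have_first_flusher = true;
                        return "first flusher";
                }
                return "queued";
        }

        int main(void)
        {
                struct wq_state wq = { .work_color = 0, .flush_color = 0 };

                for (int i = 0; i < WORK_NR_COLORS; i++) {
                        const char *what = classify_new_flusher(&wq);

                        printf("flusher %2d: %-13s (flush %d, work %d)\n",
                               i, what, wq.flush_color, wq.work_color);
                }
                return 0;
        }

With no flush completing in between, the first flusher claims the current work color, the next thirteen queue behind it, and the fifteenth lands on the overflow list; the wake-up-and-cascade phase is what later drains flusher_queue and re-colors the overflowed waiters.
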
 
@@ -559,7 +976,6 @@ int flush_work(struct work_struct *work)
        lock_map_acquire(&cwq->wq->lockdep_map);
        lock_map_release(&cwq->wq->lockdep_map);
 
-       prev = NULL;
        spin_lock_irq(&cwq->lock);
        if (!list_empty(&work->entry)) {
                /*
@@ -568,22 +984,22 @@ int flush_work(struct work_struct *work)
                 */
                smp_rmb();
                if (unlikely(cwq != get_wq_data(work)))
-                       goto out;
+                       goto already_gone;
                prev = &work->entry;
        } else {
-               if (cwq->current_work != work)
-                       goto out;
+               if (!cwq->worker || cwq->worker->current_work != work)
+                       goto already_gone;
                prev = &cwq->worklist;
        }
        insert_wq_barrier(cwq, &barr, prev->next);
-out:
-       spin_unlock_irq(&cwq->lock);
-       if (!prev)
-               return 0;
 
+       spin_unlock_irq(&cwq->lock);
        wait_for_completion(&barr.done);
        destroy_work_on_stack(&barr.work);
        return 1;
+already_gone:
+       spin_unlock_irq(&cwq->lock);
+       return 0;
 }
 EXPORT_SYMBOL_GPL(flush_work);
 
@@ -596,7 +1012,7 @@ static int try_to_grab_pending(struct work_struct *work)
        struct cpu_workqueue_struct *cwq;
        int ret = -1;
 
-       if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
+       if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
                return 0;
 
        /*
@@ -619,6 +1035,7 @@ static int try_to_grab_pending(struct work_struct *work)
                if (cwq == get_wq_data(work)) {
                        debug_work_deactivate(work);
                        list_del_init(&work->entry);
+                       cwq_dec_nr_in_flight(cwq, get_work_color(work));
                        ret = 1;
                }
        }
@@ -634,7 +1051,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
        int running = 0;
 
        spin_lock_irq(&cwq->lock);
-       if (unlikely(cwq->current_work == work)) {
+       if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
                insert_wq_barrier(cwq, &barr, cwq->worklist.next);
                running = 1;
        }
@@ -650,7 +1067,6 @@ static void wait_on_work(struct work_struct *work)
 {
        struct cpu_workqueue_struct *cwq;
        struct workqueue_struct *wq;
-       const struct cpumask *cpu_map;
        int cpu;
 
        might_sleep();
@@ -663,10 +1079,9 @@ static void wait_on_work(struct work_struct *work)
                return;
 
        wq = cwq->wq;
-       cpu_map = wq_cpu_map(wq);
 
-       for_each_cpu(cpu, cpu_map)
-               wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
+       for_each_possible_cpu(cpu)
+               wait_on_cpu_work(get_cwq(cpu, wq), work);
 }
 
 static int __cancel_work_timer(struct work_struct *work,
@@ -783,9 +1198,8 @@ EXPORT_SYMBOL(schedule_delayed_work);
 void flush_delayed_work(struct delayed_work *dwork)
 {
        if (del_timer_sync(&dwork->timer)) {
-               struct cpu_workqueue_struct *cwq;
-               cwq = wq_per_cpu(get_wq_data(&dwork->work)->wq, get_cpu());
-               __queue_work(cwq, &dwork->work);
+               __queue_work(get_cpu(), get_wq_data(&dwork->work)->wq,
+                            &dwork->work);
                put_cpu();
        }
        flush_work(&dwork->work);
@@ -924,160 +1338,136 @@ int current_is_keventd(void)
 
        BUG_ON(!keventd_wq);
 
-       cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
-       if (current == cwq->thread)
+       cwq = get_cwq(cpu, keventd_wq);
+       if (current == cwq->worker->task)
                ret = 1;
 
        return ret;
 
 }
 
-static struct cpu_workqueue_struct *
-init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
-{
-       struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
-       cwq->wq = wq;
-       spin_lock_init(&cwq->lock);
-       INIT_LIST_HEAD(&cwq->worklist);
-       init_waitqueue_head(&cwq->more_work);
-
-       return cwq;
-}
-
-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+static struct cpu_workqueue_struct *alloc_cwqs(void)
 {
-       struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
-       struct workqueue_struct *wq = cwq->wq;
-       const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
-       struct task_struct *p;
-
-       p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
        /*
-        * Nobody can add the work_struct to this cwq,
-        *      if (caller is __create_workqueue)
-        *              nobody should see this wq
-        *      else // caller is CPU_UP_PREPARE
-        *              cpu is not on cpu_online_map
-        * so we can abort safely.
+        * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
+        * Make sure that the alignment isn't lower than that of
+        * unsigned long long.
         */
-       if (IS_ERR(p))
-               return PTR_ERR(p);
-       if (cwq->wq->rt)
-               sched_setscheduler_nocheck(p, SCHED_FIFO, &param);
-       cwq->thread = p;
+       const size_t size = sizeof(struct cpu_workqueue_struct);
+       const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
+                                  __alignof__(unsigned long long));
+       struct cpu_workqueue_struct *cwqs;
+#ifndef CONFIG_SMP
+       void *ptr;
 
-       trace_workqueue_creation(cwq->thread, cpu);
-
-       return 0;
+       /*
+        * On UP, percpu allocator doesn't honor alignment parameter
+        * and simply uses arch-dependent default.  Allocate enough
+        * room to align cwq and put an extra pointer at the end
+        * pointing back to the originally allocated pointer which
+        * will be used for free.
+        *
+        * FIXME: This really belongs to UP percpu code.  Update UP
+        * percpu code to honor alignment and remove this ugliness.
+        */
+       ptr = __alloc_percpu(size + align + sizeof(void *), 1);
+       cwqs = PTR_ALIGN(ptr, align);
+       *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
+#else
+       /* On SMP, percpu allocator can do it itself */
+       cwqs = __alloc_percpu(size, align);
+#endif
+       /* just in case, make sure it's actually aligned */
+       BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
+       return cwqs;
 }
 
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+static void free_cwqs(struct cpu_workqueue_struct *cwqs)
 {
-       struct task_struct *p = cwq->thread;
-
-       if (p != NULL) {
-               if (cpu >= 0)
-                       kthread_bind(p, cpu);
-               wake_up_process(p);
-       }
+#ifndef CONFIG_SMP
+       /* on UP, the pointer to free is stored right after the cwq */
+       if (cwqs)
+               free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
+#else
+       free_percpu(cwqs);
+#endif
 }
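
alloc_cwqs() and free_cwqs() above work around the UP percpu allocator ignoring the alignment argument: over-allocate, align the pointer by hand, and stash the original pointer just past the aligned area so it can be handed back for freeing. A stand-alone sketch of the same trick on top of plain malloc() (PTR_ALIGN is redefined locally for the demo):

        #include <assert.h>
        #include <stdint.h>
        #include <stdio.h>
        #include <stdlib.h>
        #include <string.h>

        #define PTR_ALIGN(p, a) \
                ((void *)(((uintptr_t)(p) + (a) - 1) & ~((uintptr_t)(a) - 1)))

        static void *alloc_aligned(size_t size, size_t align)
        {
                void *ptr, *aligned;

                /* room for the object, alignment slack and the backlink */
                ptr = malloc(size + align + sizeof(void *));
                if (!ptr)
                        return NULL;

                aligned = PTR_ALIGN(ptr, align);
                /* remember the original pointer right after the object */
                memcpy((char *)aligned + size, &ptr, sizeof(void *));
                return aligned;
        }

        static void free_aligned(void *aligned, size_t size)
        {
                void *ptr;

                if (!aligned)
                        return;
                memcpy(&ptr, (char *)aligned + size, sizeof(void *));
                free(ptr);
        }

        int main(void)
        {
                const size_t align = 256;       /* stands in for 1 << WORK_STRUCT_FLAG_BITS */
                void *obj = alloc_aligned(128, align);

                assert(obj && ((uintptr_t)obj & (align - 1)) == 0);
                printf("aligned object at %p\n", obj);
                free_aligned(obj, 128);
                return 0;
        }
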
 
 struct workqueue_struct *__create_workqueue_key(const char *name,
-                                               int singlethread,
-                                               int freezeable,
-                                               int rt,
+                                               unsigned int flags,
                                                struct lock_class_key *key,
                                                const char *lock_name)
 {
+       bool singlethread = flags & WQ_SINGLE_THREAD;
        struct workqueue_struct *wq;
-       struct cpu_workqueue_struct *cwq;
-       int err = 0, cpu;
+       bool failed = false;
+       unsigned int cpu;
 
        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
-               return NULL;
+               goto err;
 
-       wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
-       if (!wq->cpu_wq) {
-               kfree(wq);
-               return NULL;
-       }
+       wq->cpu_wq = alloc_cwqs();
+       if (!wq->cpu_wq)
+               goto err;
 
+       wq->flags = flags;
+       mutex_init(&wq->flush_mutex);
+       atomic_set(&wq->nr_cwqs_to_flush, 0);
+       INIT_LIST_HEAD(&wq->flusher_queue);
+       INIT_LIST_HEAD(&wq->flusher_overflow);
        wq->name = name;
        lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
-       wq->singlethread = singlethread;
-       wq->freezeable = freezeable;
-       wq->rt = rt;
        INIT_LIST_HEAD(&wq->list);
 
-       if (singlethread) {
-               cwq = init_cpu_workqueue(wq, singlethread_cpu);
-               err = create_workqueue_thread(cwq, singlethread_cpu);
-               start_workqueue_thread(cwq, -1);
-       } else {
-               cpu_maps_update_begin();
-               /*
-                * We must place this wq on list even if the code below fails.
-                * cpu_down(cpu) can remove cpu from cpu_populated_map before
-                * destroy_workqueue() takes the lock, in that case we leak
-                * cwq[cpu]->thread.
-                */
-               spin_lock(&workqueue_lock);
-               list_add(&wq->list, &workqueues);
-               spin_unlock(&workqueue_lock);
-               /*
-                * We must initialize cwqs for each possible cpu even if we
-                * are going to call destroy_workqueue() finally. Otherwise
-                * cpu_up() can hit the uninitialized cwq once we drop the
-                * lock.
-                */
-               for_each_possible_cpu(cpu) {
-                       cwq = init_cpu_workqueue(wq, cpu);
-                       if (err || !cpu_online(cpu))
-                               continue;
-                       err = create_workqueue_thread(cwq, cpu);
-                       start_workqueue_thread(cwq, cpu);
-               }
-               cpu_maps_update_done();
+       cpu_maps_update_begin();
+       /*
+        * We must initialize cwqs for each possible cpu even if we
+        * are going to call destroy_workqueue() finally. Otherwise
+        * cpu_up() can hit the uninitialized cwq once we drop the
+        * lock.
+        */
+       for_each_possible_cpu(cpu) {
+               struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+               BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
+               cwq->cpu = cpu;
+               cwq->wq = wq;
+               cwq->flush_color = -1;
+               spin_lock_init(&cwq->lock);
+               INIT_LIST_HEAD(&cwq->worklist);
+               init_waitqueue_head(&cwq->more_work);
+
+               if (failed)
+                       continue;
+               cwq->worker = create_worker(cwq,
+                                           cpu_online(cpu) && !singlethread);
+               if (cwq->worker)
+                       start_worker(cwq->worker);
+               else
+                       failed = true;
        }
 
-       if (err) {
+       spin_lock(&workqueue_lock);
+       list_add(&wq->list, &workqueues);
+       spin_unlock(&workqueue_lock);
+
+       cpu_maps_update_done();
+
+       if (failed) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
+err:
+       if (wq) {
+               free_cwqs(wq->cpu_wq);
+               kfree(wq);
+       }
+       return NULL;
 }
 EXPORT_SYMBOL_GPL(__create_workqueue_key);
 
-static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
-{
-       /*
-        * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
-        * cpu_add_remove_lock protects cwq->thread.
-        */
-       if (cwq->thread == NULL)
-               return;
-
-       lock_map_acquire(&cwq->wq->lockdep_map);
-       lock_map_release(&cwq->wq->lockdep_map);
-
-       flush_cpu_workqueue(cwq);
-       /*
-        * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
-        * a concurrent flush_workqueue() can insert a barrier after us.
-        * However, in that case run_workqueue() won't return and check
-        * kthread_should_stop() until it flushes all work_struct's.
-        * When ->worklist becomes empty it is safe to exit because no
-        * more work_structs can be queued on this cwq: flush_workqueue
-        * checks list_empty(), and a "normal" queue_work() can't use
-        * a dead CPU.
-        */
-       trace_workqueue_destruction(cwq->thread);
-       kthread_stop(cwq->thread);
-       cwq->thread = NULL;
-}
-
 /**
  * destroy_workqueue - safely terminate a workqueue
  * @wq: target workqueue
@@ -1086,19 +1476,30 @@ static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
  */
 void destroy_workqueue(struct workqueue_struct *wq)
 {
-       const struct cpumask *cpu_map = wq_cpu_map(wq);
        int cpu;
 
        cpu_maps_update_begin();
        spin_lock(&workqueue_lock);
        list_del(&wq->list);
        spin_unlock(&workqueue_lock);
+       cpu_maps_update_done();
 
-       for_each_cpu(cpu, cpu_map)
-               cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
-       cpu_maps_update_done();
+       flush_workqueue(wq);
 
-       free_percpu(wq->cpu_wq);
+       for_each_possible_cpu(cpu) {
+               struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+               int i;
+
+               if (cwq->worker) {
+                       destroy_worker(cwq->worker);
+                       cwq->worker = NULL;
+               }
+
+               for (i = 0; i < WORK_NR_COLORS; i++)
+                       BUG_ON(cwq->nr_in_flight[i]);
+       }
+
+       free_cwqs(wq->cpu_wq);
        kfree(wq);
 }
 EXPORT_SYMBOL_GPL(destroy_workqueue);
@@ -1110,47 +1511,23 @@ static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
        unsigned int cpu = (unsigned long)hcpu;
        struct cpu_workqueue_struct *cwq;
        struct workqueue_struct *wq;
-       int ret = NOTIFY_OK;
 
        action &= ~CPU_TASKS_FROZEN;
 
-       switch (action) {
-       case CPU_UP_PREPARE:
-               cpumask_set_cpu(cpu, cpu_populated_map);
-       }
-undo:
        list_for_each_entry(wq, &workqueues, list) {
-               cwq = per_cpu_ptr(wq->cpu_wq, cpu);
+               if (wq->flags & WQ_SINGLE_THREAD)
+                       continue;
 
-               switch (action) {
-               case CPU_UP_PREPARE:
-                       if (!create_workqueue_thread(cwq, cpu))
-                               break;
-                       printk(KERN_ERR "workqueue [%s] for %i failed\n",
-                               wq->name, cpu);
-                       action = CPU_UP_CANCELED;
-                       ret = NOTIFY_BAD;
-                       goto undo;
-
-               case CPU_ONLINE:
-                       start_workqueue_thread(cwq, cpu);
-                       break;
+               cwq = get_cwq(cpu, wq);
 
-               case CPU_UP_CANCELED:
-                       start_workqueue_thread(cwq, -1);
+               switch (action) {
                case CPU_POST_DEAD:
-                       cleanup_workqueue_thread(cwq);
+                       flush_workqueue(wq);
                        break;
                }
        }
 
-       switch (action) {
-       case CPU_UP_CANCELED:
-       case CPU_POST_DEAD:
-               cpumask_clear_cpu(cpu, cpu_populated_map);
-       }
-
-       return ret;
+       return notifier_from_errno(0);
 }
 
 #ifdef CONFIG_SMP
@@ -1202,11 +1579,12 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
 
 void __init init_workqueues(void)
 {
-       alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
+       unsigned int cpu;
+
+       for_each_possible_cpu(cpu)
+               ida_init(&per_cpu(worker_ida, cpu));
 
-       cpumask_copy(cpu_populated_map, cpu_online_mask);
        singlethread_cpu = cpumask_first(cpu_possible_mask);
-       cpu_singlethread_map = cpumask_of(singlethread_cpu);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);