workqueue: introduce worker
authorTejun Heo <tj@kernel.org>
Tue, 29 Jun 2010 08:07:11 +0000 (10:07 +0200)
committerTejun Heo <tj@kernel.org>
Tue, 29 Jun 2010 08:07:11 +0000 (10:07 +0200)
Separate out worker thread related information to struct worker from
struct cpu_workqueue_struct and implement helper functions to deal
with the new struct worker.  The only change which is visible outside
is that now workqueue worker are all named "kworker/CPUID:WORKERID"
where WORKERID is allocated from per-cpu ida.

This is in preparation of concurrency managed workqueue where shared
multiple workers would be available per cpu.

Signed-off-by: Tejun Heo <tj@kernel.org>
kernel/workqueue.c

index 56e47c5..600db10 100644 (file)
@@ -33,6 +33,7 @@
 #include <linux/kallsyms.h>
 #include <linux/debug_locks.h>
 #include <linux/lockdep.h>
+#include <linux/idr.h>
 
 /*
  * Structure fields follow one of the following exclusion rules.
  * W: workqueue_lock protected.
  */
 
+struct cpu_workqueue_struct;
+
+struct worker {
+       struct work_struct      *current_work;  /* L: work being processed */
+       struct task_struct      *task;          /* I: worker task */
+       struct cpu_workqueue_struct *cwq;       /* I: the associated cwq */
+       int                     id;             /* I: worker id */
+};
+
 /*
  * The per-CPU workqueue (if single thread, we always use the first
  * possible cpu).  The lower WORK_STRUCT_FLAG_BITS of
@@ -58,15 +68,14 @@ struct cpu_workqueue_struct {
 
        struct list_head worklist;
        wait_queue_head_t more_work;
-       struct work_struct *current_work;
        unsigned int            cpu;
+       struct worker           *worker;
 
        struct workqueue_struct *wq;            /* I: the owning workqueue */
        int                     work_color;     /* L: current color */
        int                     flush_color;    /* L: flushing color */
        int                     nr_in_flight[WORK_NR_COLORS];
                                                /* L: nr of in_flight works */
-       struct task_struct      *thread;
 };
 
 /*
@@ -214,6 +223,9 @@ static inline void debug_work_deactivate(struct work_struct *work) { }
 /* Serializes the accesses to the list of workqueues. */
 static DEFINE_SPINLOCK(workqueue_lock);
 static LIST_HEAD(workqueues);
+static DEFINE_PER_CPU(struct ida, worker_ida);
+
+static int worker_thread(void *__worker);
 
 static int singlethread_cpu __read_mostly;
 
@@ -428,6 +440,105 @@ int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 }
 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
 
+static struct worker *alloc_worker(void)
+{
+       struct worker *worker;
+
+       worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+       return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @cwq: cwq the new worker will belong to
+ * @bind: whether to set affinity to @cpu or not
+ *
+ * Create a new worker which is bound to @cwq.  The returned worker
+ * can be started by calling start_worker() or destroyed using
+ * destroy_worker().
+ *
+ * CONTEXT:
+ * Might sleep.  Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker.
+ */
+static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+{
+       int id = -1;
+       struct worker *worker = NULL;
+
+       spin_lock(&workqueue_lock);
+       while (ida_get_new(&per_cpu(worker_ida, cwq->cpu), &id)) {
+               spin_unlock(&workqueue_lock);
+               if (!ida_pre_get(&per_cpu(worker_ida, cwq->cpu), GFP_KERNEL))
+                       goto fail;
+               spin_lock(&workqueue_lock);
+       }
+       spin_unlock(&workqueue_lock);
+
+       worker = alloc_worker();
+       if (!worker)
+               goto fail;
+
+       worker->cwq = cwq;
+       worker->id = id;
+
+       worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
+                                     cwq->cpu, id);
+       if (IS_ERR(worker->task))
+               goto fail;
+
+       if (bind)
+               kthread_bind(worker->task, cwq->cpu);
+
+       return worker;
+fail:
+       if (id >= 0) {
+               spin_lock(&workqueue_lock);
+               ida_remove(&per_cpu(worker_ida, cwq->cpu), id);
+               spin_unlock(&workqueue_lock);
+       }
+       kfree(worker);
+       return NULL;
+}
+
+/**
+ * start_worker - start a newly created worker
+ * @worker: worker to start
+ *
+ * Start @worker.
+ *
+ * CONTEXT:
+ * spin_lock_irq(cwq->lock).
+ */
+static void start_worker(struct worker *worker)
+{
+       wake_up_process(worker->task);
+}
+
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker.
+ */
+static void destroy_worker(struct worker *worker)
+{
+       int cpu = worker->cwq->cpu;
+       int id = worker->id;
+
+       /* sanity check frenzy */
+       BUG_ON(worker->current_work);
+
+       kthread_stop(worker->task);
+       kfree(worker);
+
+       spin_lock(&workqueue_lock);
+       ida_remove(&per_cpu(worker_ida, cpu), id);
+       spin_unlock(&workqueue_lock);
+}
+
 /**
  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
  * @cwq: cwq of interest
@@ -468,7 +579,7 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
 
 /**
  * process_one_work - process single work
- * @cwq: cwq to process work for
+ * @worker: self
  * @work: work to process
  *
  * Process @work.  This function contains all the logics necessary to
@@ -480,9 +591,9 @@ static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
  * CONTEXT:
  * spin_lock_irq(cwq->lock) which is released and regrabbed.
  */
-static void process_one_work(struct cpu_workqueue_struct *cwq,
-                            struct work_struct *work)
+static void process_one_work(struct worker *worker, struct work_struct *work)
 {
+       struct cpu_workqueue_struct *cwq = worker->cwq;
        work_func_t f = work->func;
        int work_color;
 #ifdef CONFIG_LOCKDEP
@@ -497,7 +608,7 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
 #endif
        /* claim and process */
        debug_work_deactivate(work);
-       cwq->current_work = work;
+       worker->current_work = work;
        work_color = get_work_color(work);
        list_del_init(&work->entry);
 
@@ -524,30 +635,33 @@ static void process_one_work(struct cpu_workqueue_struct *cwq,
        spin_lock_irq(&cwq->lock);
 
        /* we're done with it, release */
-       cwq->current_work = NULL;
+       worker->current_work = NULL;
        cwq_dec_nr_in_flight(cwq, work_color);
 }
 
-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+static void run_workqueue(struct worker *worker)
 {
+       struct cpu_workqueue_struct *cwq = worker->cwq;
+
        spin_lock_irq(&cwq->lock);
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
-               process_one_work(cwq, work);
+               process_one_work(worker, work);
        }
        spin_unlock_irq(&cwq->lock);
 }
 
 /**
  * worker_thread - the worker thread function
- * @__cwq: cwq to serve
+ * @__worker: self
  *
  * The cwq worker thread function.
  */
-static int worker_thread(void *__cwq)
+static int worker_thread(void *__worker)
 {
-       struct cpu_workqueue_struct *cwq = __cwq;
+       struct worker *worker = __worker;
+       struct cpu_workqueue_struct *cwq = worker->cwq;
        DEFINE_WAIT(wait);
 
        if (cwq->wq->flags & WQ_FREEZEABLE)
@@ -566,11 +680,11 @@ static int worker_thread(void *__cwq)
                if (kthread_should_stop())
                        break;
 
-               if (unlikely(!cpumask_equal(&cwq->thread->cpus_allowed,
+               if (unlikely(!cpumask_equal(&worker->task->cpus_allowed,
                                            get_cpu_mask(cwq->cpu))))
-                       set_cpus_allowed_ptr(cwq->thread,
+                       set_cpus_allowed_ptr(worker->task,
                                             get_cpu_mask(cwq->cpu));
-               run_workqueue(cwq);
+               run_workqueue(worker);
        }
 
        return 0;
@@ -873,7 +987,7 @@ int flush_work(struct work_struct *work)
                        goto already_gone;
                prev = &work->entry;
        } else {
-               if (cwq->current_work != work)
+               if (!cwq->worker || cwq->worker->current_work != work)
                        goto already_gone;
                prev = &cwq->worklist;
        }
@@ -937,7 +1051,7 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
        int running = 0;
 
        spin_lock_irq(&cwq->lock);
-       if (unlikely(cwq->current_work == work)) {
+       if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
                insert_wq_barrier(cwq, &barr, cwq->worklist.next);
                running = 1;
        }
@@ -1225,7 +1339,7 @@ int current_is_keventd(void)
        BUG_ON(!keventd_wq);
 
        cwq = get_cwq(cpu, keventd_wq);
-       if (current == cwq->thread)
+       if (current == cwq->worker->task)
                ret = 1;
 
        return ret;
@@ -1279,38 +1393,6 @@ static void free_cwqs(struct cpu_workqueue_struct *cwqs)
 #endif
 }
 
-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
-       struct workqueue_struct *wq = cwq->wq;
-       struct task_struct *p;
-
-       p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
-       /*
-        * Nobody can add the work_struct to this cwq,
-        *      if (caller is __create_workqueue)
-        *              nobody should see this wq
-        *      else // caller is CPU_UP_PREPARE
-        *              cpu is not on cpu_online_map
-        * so we can abort safely.
-        */
-       if (IS_ERR(p))
-               return PTR_ERR(p);
-       cwq->thread = p;
-
-       return 0;
-}
-
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
-       struct task_struct *p = cwq->thread;
-
-       if (p != NULL) {
-               if (cpu >= 0)
-                       kthread_bind(p, cpu);
-               wake_up_process(p);
-       }
-}
-
 struct workqueue_struct *__create_workqueue_key(const char *name,
                                                unsigned int flags,
                                                struct lock_class_key *key,
@@ -1318,7 +1400,8 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 {
        bool singlethread = flags & WQ_SINGLE_THREAD;
        struct workqueue_struct *wq;
-       int err = 0, cpu;
+       bool failed = false;
+       unsigned int cpu;
 
        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
@@ -1348,20 +1431,21 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
                struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
 
                BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
-               cwq->wq = wq;
                cwq->cpu = cpu;
+               cwq->wq = wq;
                cwq->flush_color = -1;
                spin_lock_init(&cwq->lock);
                INIT_LIST_HEAD(&cwq->worklist);
                init_waitqueue_head(&cwq->more_work);
 
-               if (err)
+               if (failed)
                        continue;
-               err = create_workqueue_thread(cwq, cpu);
-               if (cpu_online(cpu) && !singlethread)
-                       start_workqueue_thread(cwq, cpu);
+               cwq->worker = create_worker(cwq,
+                                           cpu_online(cpu) && !singlethread);
+               if (cwq->worker)
+                       start_worker(cwq->worker);
                else
-                       start_workqueue_thread(cwq, -1);
+                       failed = true;
        }
 
        spin_lock(&workqueue_lock);
@@ -1370,7 +1454,7 @@ struct workqueue_struct *__create_workqueue_key(const char *name,
 
        cpu_maps_update_done();
 
-       if (err) {
+       if (failed) {
                destroy_workqueue(wq);
                wq = NULL;
        }
@@ -1406,9 +1490,9 @@ void destroy_workqueue(struct workqueue_struct *wq)
                struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
                int i;
 
-               if (cwq->thread) {
-                       kthread_stop(cwq->thread);
-                       cwq->thread = NULL;
+               if (cwq->worker) {
+                       destroy_worker(cwq->worker);
+                       cwq->worker = NULL;
                }
 
                for (i = 0; i < WORK_NR_COLORS; i++)
@@ -1495,6 +1579,11 @@ EXPORT_SYMBOL_GPL(work_on_cpu);
 
 void __init init_workqueues(void)
 {
+       unsigned int cpu;
+
+       for_each_possible_cpu(cpu)
+               ida_init(&per_cpu(worker_ida, cpu));
+
        singlethread_cpu = cpumask_first(cpu_possible_mask);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");