#include <linux/kallsyms.h>
#include <linux/debug_locks.h>
#include <linux/lockdep.h>
-#define CREATE_TRACE_POINTS
-#include <trace/events/workqueue.h>
+#include <linux/idr.h>
+
+enum {
+ /* global_cwq flags */
+ GCWQ_FREEZING = 1 << 3, /* freeze in progress */
+
+ /* worker flags */
+ WORKER_STARTED = 1 << 0, /* started */
+ WORKER_DIE = 1 << 1, /* die die die */
+ WORKER_IDLE = 1 << 2, /* is idle */
+ WORKER_ROGUE = 1 << 4, /* not bound to any cpu */
+
+ /* gcwq->trustee_state */
+ TRUSTEE_START = 0, /* start */
+ TRUSTEE_IN_CHARGE = 1, /* trustee in charge of gcwq */
+ TRUSTEE_BUTCHER = 2, /* butcher workers */
+ TRUSTEE_RELEASE = 3, /* release workers */
+ TRUSTEE_DONE = 4, /* trustee is done */
+
+ BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
+ BUSY_WORKER_HASH_SIZE = 1 << BUSY_WORKER_HASH_ORDER,
+ BUSY_WORKER_HASH_MASK = BUSY_WORKER_HASH_SIZE - 1,
+
+ TRUSTEE_COOLDOWN = HZ / 10, /* for trustee draining */
+};
/*
- * The per-CPU workqueue (if single thread, we always use the first
- * possible cpu).
+ * Structure fields follow one of the following exclusion rules.
+ *
+ * I: Set during initialization and read-only afterwards.
+ *
+ * L: gcwq->lock protected. Access with gcwq->lock held.
+ *
+ * F: wq->flush_mutex protected.
+ *
+ * W: workqueue_lock protected.
*/
-struct cpu_workqueue_struct {
- spinlock_t lock;
+struct global_cwq;
+struct cpu_workqueue_struct;
+struct worker {
+ /* on idle list while idle, on busy hash table while busy */
+ union {
+ struct list_head entry; /* L: while idle */
+ struct hlist_node hentry; /* L: while busy */
+ };
+
+ struct work_struct *current_work; /* L: work being processed */
+ struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
+ struct list_head scheduled; /* L: scheduled works */
+ struct task_struct *task; /* I: worker task */
+ struct global_cwq *gcwq; /* I: the associated gcwq */
+ struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
+ unsigned int flags; /* L: flags */
+ int id; /* I: worker id */
+};
+
+/*
+ * Global per-cpu workqueue.
+ */
+struct global_cwq {
+ spinlock_t lock; /* the gcwq lock */
+ unsigned int cpu; /* I: the associated cpu */
+ unsigned int flags; /* L: GCWQ_* flags */
+
+ int nr_workers; /* L: total number of workers */
+ int nr_idle; /* L: currently idle ones */
+
+ /* workers are chained either in the idle_list or busy_hash */
+ struct list_head idle_list; /* L: list of idle workers */
+ struct hlist_head busy_hash[BUSY_WORKER_HASH_SIZE];
+ /* L: hash of busy workers */
+
+ struct ida worker_ida; /* L: for worker IDs */
+
+ struct task_struct *trustee; /* L: for gcwq shutdown */
+ unsigned int trustee_state; /* L: trustee state */
+ wait_queue_head_t trustee_wait; /* trustee wait */
+} ____cacheline_aligned_in_smp;
+
+/*
+ * The per-CPU workqueue. The lower WORK_STRUCT_FLAG_BITS of
+ * work_struct->data are used for flags and thus cwqs need to be
+ * aligned at two's power of the number of flag bits.
+ */
+struct cpu_workqueue_struct {
+ struct global_cwq *gcwq; /* I: the associated gcwq */
struct list_head worklist;
- wait_queue_head_t more_work;
- struct work_struct *current_work;
+ struct worker *worker;
+ struct workqueue_struct *wq; /* I: the owning workqueue */
+ int work_color; /* L: current color */
+ int flush_color; /* L: flushing color */
+ int nr_in_flight[WORK_NR_COLORS];
+ /* L: nr of in_flight works */
+ int nr_active; /* L: nr of active works */
+ int max_active; /* L: max active works */
+ struct list_head delayed_works; /* L: delayed works */
+};
- struct workqueue_struct *wq;
- struct task_struct *thread;
-} ____cacheline_aligned;
+/*
+ * Structure used to wait for workqueue flush.
+ */
+struct wq_flusher {
+ struct list_head list; /* F: list of flushers */
+ int flush_color; /* F: flush color waiting for */
+ struct completion done; /* flush completion */
+};
/*
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
*/
struct workqueue_struct {
- struct cpu_workqueue_struct *cpu_wq;
- struct list_head list;
- const char *name;
- int singlethread;
- int freezeable; /* Freeze threads during suspend */
- int rt;
+ unsigned int flags; /* I: WQ_* flags */
+ struct cpu_workqueue_struct *cpu_wq; /* I: cwq's */
+ struct list_head list; /* W: list of all workqueues */
+
+ struct mutex flush_mutex; /* protects wq flushing */
+ int work_color; /* F: current work color */
+ int flush_color; /* F: current flush color */
+ atomic_t nr_cwqs_to_flush; /* flush in progress */
+ struct wq_flusher *first_flusher; /* F: first flusher */
+ struct list_head flusher_queue; /* F: flush waiters */
+ struct list_head flusher_overflow; /* F: flush overflow list */
+
+ unsigned long single_cpu; /* cpu for single cpu wq */
+
+ int saved_max_active; /* I: saved cwq max_active */
+ const char *name; /* I: workqueue name */
#ifdef CONFIG_LOCKDEP
- struct lockdep_map lockdep_map;
+ struct lockdep_map lockdep_map;
#endif
};
+#define for_each_busy_worker(worker, i, pos, gcwq) \
+ for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++) \
+ hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
+
#ifdef CONFIG_DEBUG_OBJECTS_WORK
static struct debug_obj_descr work_debug_descr;
* statically initialized. We just make sure that it
* is tracked in the object tracker.
*/
- if (test_bit(WORK_STRUCT_STATIC, work_data_bits(work))) {
+ if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
debug_object_init(work, &work_debug_descr);
debug_object_activate(work, &work_debug_descr);
return 0;
/* Serializes the accesses to the list of workqueues. */
static DEFINE_SPINLOCK(workqueue_lock);
static LIST_HEAD(workqueues);
+static bool workqueue_freezing; /* W: have wqs started freezing? */
+
+static DEFINE_PER_CPU(struct global_cwq, global_cwq);
+
+static int worker_thread(void *__worker);
+
+static struct global_cwq *get_gcwq(unsigned int cpu)
+{
+ return &per_cpu(global_cwq, cpu);
+}
+
+static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
+ struct workqueue_struct *wq)
+{
+ return per_cpu_ptr(wq->cpu_wq, cpu);
+}
+
+static unsigned int work_color_to_flags(int color)
+{
+ return color << WORK_STRUCT_COLOR_SHIFT;
+}
+
+static int get_work_color(struct work_struct *work)
+{
+ return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
+ ((1 << WORK_STRUCT_COLOR_BITS) - 1);
+}
+
+static int work_next_color(int color)
+{
+ return (color + 1) % WORK_NR_COLORS;
+}
-static int singlethread_cpu __read_mostly;
-static const struct cpumask *cpu_singlethread_map __read_mostly;
/*
- * _cpu_down() first removes CPU from cpu_online_map, then CPU_DEAD
- * flushes cwq->worklist. This means that flush_workqueue/wait_on_work
- * which comes in between can't use for_each_online_cpu(). We could
- * use cpu_possible_map, the cpumask below is more a documentation
- * than optimization.
+ * Work data points to the cwq while a work is on queue. Once
+ * execution starts, it points to the cpu the work was last on. This
+ * can be distinguished by comparing the data value against
+ * PAGE_OFFSET.
+ *
+ * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
+ * cwq, cpu or clear work->data. These functions should only be
+ * called while the work is owned - ie. while the PENDING bit is set.
+ *
+ * get_work_[g]cwq() can be used to obtain the gcwq or cwq
+ * corresponding to a work. gcwq is available once the work has been
+ * queued anywhere after initialization. cwq is available only from
+ * queueing until execution starts.
*/
-static cpumask_var_t cpu_populated_map __read_mostly;
+static inline void set_work_data(struct work_struct *work, unsigned long data,
+ unsigned long flags)
+{
+ BUG_ON(!work_pending(work));
+ atomic_long_set(&work->data, data | flags | work_static(work));
+}
-/* If it's single threaded, it isn't in the list of workqueues. */
-static inline int is_wq_single_threaded(struct workqueue_struct *wq)
+static void set_work_cwq(struct work_struct *work,
+ struct cpu_workqueue_struct *cwq,
+ unsigned long extra_flags)
{
- return wq->singlethread;
+ set_work_data(work, (unsigned long)cwq,
+ WORK_STRUCT_PENDING | extra_flags);
}
-static const struct cpumask *wq_cpu_map(struct workqueue_struct *wq)
+static void set_work_cpu(struct work_struct *work, unsigned int cpu)
{
- return is_wq_single_threaded(wq)
- ? cpu_singlethread_map : cpu_populated_map;
+ set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
}
-static
-struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
+static void clear_work_data(struct work_struct *work)
{
- if (unlikely(is_wq_single_threaded(wq)))
- cpu = singlethread_cpu;
- return per_cpu_ptr(wq->cpu_wq, cpu);
+ set_work_data(work, WORK_STRUCT_NO_CPU, 0);
}
-/*
- * Set the workqueue on which a work item is to be run
- * - Must *only* be called if the pending flag is set
+static inline unsigned long get_work_data(struct work_struct *work)
+{
+ return atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK;
+}
+
+static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
+{
+ unsigned long data = get_work_data(work);
+
+ return data >= PAGE_OFFSET ? (void *)data : NULL;
+}
+
+static struct global_cwq *get_work_gcwq(struct work_struct *work)
+{
+ unsigned long data = get_work_data(work);
+ unsigned int cpu;
+
+ if (data >= PAGE_OFFSET)
+ return ((struct cpu_workqueue_struct *)data)->gcwq;
+
+ cpu = data >> WORK_STRUCT_FLAG_BITS;
+ if (cpu == NR_CPUS)
+ return NULL;
+
+ BUG_ON(cpu >= num_possible_cpus());
+ return get_gcwq(cpu);
+}
+
+/**
+ * busy_worker_head - return the busy hash head for a work
+ * @gcwq: gcwq of interest
+ * @work: work to be hashed
+ *
+ * Return hash head of @gcwq for @work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to the hash head.
*/
-static inline void set_wq_data(struct work_struct *work,
- struct cpu_workqueue_struct *cwq)
+static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
+ struct work_struct *work)
{
- unsigned long new;
+ const int base_shift = ilog2(sizeof(struct work_struct));
+ unsigned long v = (unsigned long)work;
- BUG_ON(!work_pending(work));
+ /* simple shift and fold hash, do we need something better? */
+ v >>= base_shift;
+ v += v >> BUSY_WORKER_HASH_ORDER;
+ v &= BUSY_WORKER_HASH_MASK;
- new = (unsigned long) cwq | (1UL << WORK_STRUCT_PENDING);
- new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
- atomic_long_set(&work->data, new);
+ return &gcwq->busy_hash[v];
}
-/*
- * Clear WORK_STRUCT_PENDING and the workqueue on which it was queued.
+/**
+ * __find_worker_executing_work - find worker which is executing a work
+ * @gcwq: gcwq of interest
+ * @bwh: hash head as returned by busy_worker_head()
+ * @work: work to find worker for
+ *
+ * Find a worker which is executing @work on @gcwq. @bwh should be
+ * the hash head obtained by calling busy_worker_head() with the same
+ * work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to worker which is executing @work if found, NULL
+ * otherwise.
*/
-static inline void clear_wq_data(struct work_struct *work)
+static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
+ struct hlist_head *bwh,
+ struct work_struct *work)
{
- unsigned long flags = *work_data_bits(work) &
- (1UL << WORK_STRUCT_STATIC);
- atomic_long_set(&work->data, flags);
+ struct worker *worker;
+ struct hlist_node *tmp;
+
+ hlist_for_each_entry(worker, tmp, bwh, hentry)
+ if (worker->current_work == work)
+ return worker;
+ return NULL;
}
-static inline
-struct cpu_workqueue_struct *get_wq_data(struct work_struct *work)
+/**
+ * find_worker_executing_work - find worker which is executing a work
+ * @gcwq: gcwq of interest
+ * @work: work to find worker for
+ *
+ * Find a worker which is executing @work on @gcwq. This function is
+ * identical to __find_worker_executing_work() except that this
+ * function calculates @bwh itself.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ *
+ * RETURNS:
+ * Pointer to worker which is executing @work if found, NULL
+ * otherwise.
+ */
+static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
+ struct work_struct *work)
{
- return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
+ return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
+ work);
}
+/**
+ * insert_work - insert a work into cwq
+ * @cwq: cwq @work belongs to
+ * @work: work to insert
+ * @head: insertion point
+ * @extra_flags: extra WORK_STRUCT_* flags to set
+ *
+ * Insert @work into @cwq after @head.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
static void insert_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work, struct list_head *head)
+ struct work_struct *work, struct list_head *head,
+ unsigned int extra_flags)
{
- trace_workqueue_insertion(cwq->thread, work);
+ /* we own @work, set data and link */
+ set_work_cwq(work, cwq, extra_flags);
- set_wq_data(work, cwq);
/*
* Ensure that we get the right work->data if we see the
* result of list_add() below, see try_to_grab_pending().
*/
smp_wmb();
+
list_add_tail(&work->entry, head);
- wake_up(&cwq->more_work);
+ wake_up_process(cwq->worker->task);
+}
+
+/**
+ * cwq_unbind_single_cpu - unbind cwq from single cpu workqueue processing
+ * @cwq: cwq to unbind
+ *
+ * Try to unbind @cwq from single cpu workqueue processing. If
+ * @cwq->wq is frozen, unbind is delayed till the workqueue is thawed.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_unbind_single_cpu(struct cpu_workqueue_struct *cwq)
+{
+ struct workqueue_struct *wq = cwq->wq;
+ struct global_cwq *gcwq = cwq->gcwq;
+
+ BUG_ON(wq->single_cpu != gcwq->cpu);
+ /*
+ * Unbind from workqueue if @cwq is not frozen. If frozen,
+ * thaw_workqueues() will either restart processing on this
+ * cpu or unbind if empty. This keeps works queued while
+ * frozen fully ordered and flushable.
+ */
+ if (likely(!(gcwq->flags & GCWQ_FREEZING))) {
+ smp_wmb(); /* paired with cmpxchg() in __queue_work() */
+ wq->single_cpu = NR_CPUS;
+ }
}
-static void __queue_work(struct cpu_workqueue_struct *cwq,
+static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
struct work_struct *work)
{
+ struct global_cwq *gcwq;
+ struct cpu_workqueue_struct *cwq;
+ struct list_head *worklist;
unsigned long flags;
+ bool arbitrate;
debug_work_activate(work);
- spin_lock_irqsave(&cwq->lock, flags);
- insert_work(cwq, work, &cwq->worklist);
- spin_unlock_irqrestore(&cwq->lock, flags);
+
+ /*
+ * Determine gcwq to use. SINGLE_CPU is inherently
+ * NON_REENTRANT, so test it first.
+ */
+ if (!(wq->flags & WQ_SINGLE_CPU)) {
+ struct global_cwq *last_gcwq;
+
+ /*
+ * It's multi cpu. If @wq is non-reentrant and @work
+ * was previously on a different cpu, it might still
+ * be running there, in which case the work needs to
+ * be queued on that cpu to guarantee non-reentrance.
+ */
+ gcwq = get_gcwq(cpu);
+ if (wq->flags & WQ_NON_REENTRANT &&
+ (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
+ struct worker *worker;
+
+ spin_lock_irqsave(&last_gcwq->lock, flags);
+
+ worker = find_worker_executing_work(last_gcwq, work);
+
+ if (worker && worker->current_cwq->wq == wq)
+ gcwq = last_gcwq;
+ else {
+ /* meh... not running there, queue here */
+ spin_unlock_irqrestore(&last_gcwq->lock, flags);
+ spin_lock_irqsave(&gcwq->lock, flags);
+ }
+ } else
+ spin_lock_irqsave(&gcwq->lock, flags);
+ } else {
+ unsigned int req_cpu = cpu;
+
+ /*
+ * It's a bit more complex for single cpu workqueues.
+ * We first need to determine which cpu is going to be
+ * used. If no cpu is currently serving this
+ * workqueue, arbitrate using atomic accesses to
+ * wq->single_cpu; otherwise, use the current one.
+ */
+ retry:
+ cpu = wq->single_cpu;
+ arbitrate = cpu == NR_CPUS;
+ if (arbitrate)
+ cpu = req_cpu;
+
+ gcwq = get_gcwq(cpu);
+ spin_lock_irqsave(&gcwq->lock, flags);
+
+ /*
+ * The following cmpxchg() is a full barrier paired
+ * with smp_wmb() in cwq_unbind_single_cpu() and
+ * guarantees that all changes to wq->st_* fields are
+ * visible on the new cpu after this point.
+ */
+ if (arbitrate)
+ cmpxchg(&wq->single_cpu, NR_CPUS, cpu);
+
+ if (unlikely(wq->single_cpu != cpu)) {
+ spin_unlock_irqrestore(&gcwq->lock, flags);
+ goto retry;
+ }
+ }
+
+ /* gcwq determined, get cwq and queue */
+ cwq = get_cwq(gcwq->cpu, wq);
+
+ BUG_ON(!list_empty(&work->entry));
+
+ cwq->nr_in_flight[cwq->work_color]++;
+
+ if (likely(cwq->nr_active < cwq->max_active)) {
+ cwq->nr_active++;
+ worklist = &cwq->worklist;
+ } else
+ worklist = &cwq->delayed_works;
+
+ insert_work(cwq, work, worklist, work_color_to_flags(cwq->work_color));
+
+ spin_unlock_irqrestore(&gcwq->lock, flags);
}
/**
{
int ret = 0;
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
- BUG_ON(!list_empty(&work->entry));
- __queue_work(wq_per_cpu(wq, cpu), work);
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+ __queue_work(cpu, wq, work);
ret = 1;
}
return ret;
static void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
- struct cpu_workqueue_struct *cwq = get_wq_data(&dwork->work);
- struct workqueue_struct *wq = cwq->wq;
+ struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
- __queue_work(wq_per_cpu(wq, smp_processor_id()), &dwork->work);
+ __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
}
/**
struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+ struct global_cwq *gcwq = get_work_gcwq(work);
+ unsigned int lcpu = gcwq ? gcwq->cpu : raw_smp_processor_id();
+
BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry));
timer_stats_timer_set_start_info(&dwork->timer);
-
- /* This stores cwq for the moment, for the timer_fn */
- set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
+ /*
+ * This stores cwq for the moment, for the timer_fn.
+ * Note that the work's gcwq is preserved to allow
+ * reentrance detection for delayed works.
+ */
+ set_work_cwq(work, get_cwq(lcpu, wq), 0);
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
-static void run_workqueue(struct cpu_workqueue_struct *cwq)
+/**
+ * worker_enter_idle - enter idle state
+ * @worker: worker which is entering idle state
+ *
+ * @worker is entering idle state. Update stats and idle timer if
+ * necessary.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_enter_idle(struct worker *worker)
{
- spin_lock_irq(&cwq->lock);
- while (!list_empty(&cwq->worklist)) {
- struct work_struct *work = list_entry(cwq->worklist.next,
- struct work_struct, entry);
- work_func_t f = work->func;
+ struct global_cwq *gcwq = worker->gcwq;
+
+ BUG_ON(worker->flags & WORKER_IDLE);
+ BUG_ON(!list_empty(&worker->entry) &&
+ (worker->hentry.next || worker->hentry.pprev));
+
+ worker->flags |= WORKER_IDLE;
+ gcwq->nr_idle++;
+
+ /* idle_list is LIFO */
+ list_add(&worker->entry, &gcwq->idle_list);
+
+ if (unlikely(worker->flags & WORKER_ROGUE))
+ wake_up_all(&gcwq->trustee_wait);
+}
+
+/**
+ * worker_leave_idle - leave idle state
+ * @worker: worker which is leaving idle state
+ *
+ * @worker is leaving idle state. Update stats.
+ *
+ * LOCKING:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void worker_leave_idle(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+
+ BUG_ON(!(worker->flags & WORKER_IDLE));
+ worker->flags &= ~WORKER_IDLE;
+ gcwq->nr_idle--;
+ list_del_init(&worker->entry);
+}
+
+static struct worker *alloc_worker(void)
+{
+ struct worker *worker;
+
+ worker = kzalloc(sizeof(*worker), GFP_KERNEL);
+ if (worker) {
+ INIT_LIST_HEAD(&worker->entry);
+ INIT_LIST_HEAD(&worker->scheduled);
+ }
+ return worker;
+}
+
+/**
+ * create_worker - create a new workqueue worker
+ * @cwq: cwq the new worker will belong to
+ * @bind: whether to set affinity to @cpu or not
+ *
+ * Create a new worker which is bound to @cwq. The returned worker
+ * can be started by calling start_worker() or destroyed using
+ * destroy_worker().
+ *
+ * CONTEXT:
+ * Might sleep. Does GFP_KERNEL allocations.
+ *
+ * RETURNS:
+ * Pointer to the newly created worker.
+ */
+static struct worker *create_worker(struct cpu_workqueue_struct *cwq, bool bind)
+{
+ struct global_cwq *gcwq = cwq->gcwq;
+ int id = -1;
+ struct worker *worker = NULL;
+
+ spin_lock_irq(&gcwq->lock);
+ while (ida_get_new(&gcwq->worker_ida, &id)) {
+ spin_unlock_irq(&gcwq->lock);
+ if (!ida_pre_get(&gcwq->worker_ida, GFP_KERNEL))
+ goto fail;
+ spin_lock_irq(&gcwq->lock);
+ }
+ spin_unlock_irq(&gcwq->lock);
+
+ worker = alloc_worker();
+ if (!worker)
+ goto fail;
+
+ worker->gcwq = gcwq;
+ worker->cwq = cwq;
+ worker->id = id;
+
+ worker->task = kthread_create(worker_thread, worker, "kworker/%u:%d",
+ gcwq->cpu, id);
+ if (IS_ERR(worker->task))
+ goto fail;
+
+ /*
+ * A rogue worker will become a regular one if CPU comes
+ * online later on. Make sure every worker has
+ * PF_THREAD_BOUND set.
+ */
+ if (bind)
+ kthread_bind(worker->task, gcwq->cpu);
+ else
+ worker->task->flags |= PF_THREAD_BOUND;
+
+ return worker;
+fail:
+ if (id >= 0) {
+ spin_lock_irq(&gcwq->lock);
+ ida_remove(&gcwq->worker_ida, id);
+ spin_unlock_irq(&gcwq->lock);
+ }
+ kfree(worker);
+ return NULL;
+}
+
+/**
+ * start_worker - start a newly created worker
+ * @worker: worker to start
+ *
+ * Make the gcwq aware of @worker and start it.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void start_worker(struct worker *worker)
+{
+ worker->flags |= WORKER_STARTED;
+ worker->gcwq->nr_workers++;
+ worker_enter_idle(worker);
+ wake_up_process(worker->task);
+}
+
+/**
+ * destroy_worker - destroy a workqueue worker
+ * @worker: worker to be destroyed
+ *
+ * Destroy @worker and adjust @gcwq stats accordingly.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.
+ */
+static void destroy_worker(struct worker *worker)
+{
+ struct global_cwq *gcwq = worker->gcwq;
+ int id = worker->id;
+
+ /* sanity check frenzy */
+ BUG_ON(worker->current_work);
+ BUG_ON(!list_empty(&worker->scheduled));
+
+ if (worker->flags & WORKER_STARTED)
+ gcwq->nr_workers--;
+ if (worker->flags & WORKER_IDLE)
+ gcwq->nr_idle--;
+
+ list_del_init(&worker->entry);
+ worker->flags |= WORKER_DIE;
+
+ spin_unlock_irq(&gcwq->lock);
+
+ kthread_stop(worker->task);
+ kfree(worker);
+
+ spin_lock_irq(&gcwq->lock);
+ ida_remove(&gcwq->worker_ida, id);
+}
+
+/**
+ * move_linked_works - move linked works to a list
+ * @work: start of series of works to be scheduled
+ * @head: target list to append @work to
+ * @nextp: out paramter for nested worklist walking
+ *
+ * Schedule linked works starting from @work to @head. Work series to
+ * be scheduled starts at @work and includes any consecutive work with
+ * WORK_STRUCT_LINKED set in its predecessor.
+ *
+ * If @nextp is not NULL, it's updated to point to the next work of
+ * the last scheduled work. This allows move_linked_works() to be
+ * nested inside outer list_for_each_entry_safe().
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void move_linked_works(struct work_struct *work, struct list_head *head,
+ struct work_struct **nextp)
+{
+ struct work_struct *n;
+
+ /*
+ * Linked worklist will always end before the end of the list,
+ * use NULL for list head.
+ */
+ list_for_each_entry_safe_from(work, n, NULL, entry) {
+ list_move_tail(&work->entry, head);
+ if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
+ break;
+ }
+
+ /*
+ * If we're already inside safe list traversal and have moved
+ * multiple works to the scheduled queue, the next position
+ * needs to be updated.
+ */
+ if (nextp)
+ *nextp = n;
+}
+
+static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
+{
+ struct work_struct *work = list_first_entry(&cwq->delayed_works,
+ struct work_struct, entry);
+
+ move_linked_works(work, &cwq->worklist, NULL);
+ cwq->nr_active++;
+}
+
+/**
+ * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
+ * @cwq: cwq of interest
+ * @color: color of work which left the queue
+ *
+ * A work either has completed or is removed from pending queue,
+ * decrement nr_in_flight of its cwq and handle workqueue flushing.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
+static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color)
+{
+ /* ignore uncolored works */
+ if (color == WORK_NO_COLOR)
+ return;
+
+ cwq->nr_in_flight[color]--;
+ cwq->nr_active--;
+
+ if (!list_empty(&cwq->delayed_works)) {
+ /* one down, submit a delayed one */
+ if (cwq->nr_active < cwq->max_active)
+ cwq_activate_first_delayed(cwq);
+ } else if (!cwq->nr_active && cwq->wq->flags & WQ_SINGLE_CPU) {
+ /* this was the last work, unbind from single cpu */
+ cwq_unbind_single_cpu(cwq);
+ }
+
+ /* is flush in progress and are we at the flushing tip? */
+ if (likely(cwq->flush_color != color))
+ return;
+
+ /* are there still in-flight works? */
+ if (cwq->nr_in_flight[color])
+ return;
+
+ /* this cwq is done, clear flush_color */
+ cwq->flush_color = -1;
+
+ /*
+ * If this was the last cwq, wake up the first flusher. It
+ * will handle the rest.
+ */
+ if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
+ complete(&cwq->wq->first_flusher->done);
+}
+
+/**
+ * process_one_work - process single work
+ * @worker: self
+ * @work: work to process
+ *
+ * Process @work. This function contains all the logics necessary to
+ * process a single work including synchronization against and
+ * interaction with other workers on the same cpu, queueing and
+ * flushing. As long as context requirement is met, any worker can
+ * call this function to process a work.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which is released and regrabbed.
+ */
+static void process_one_work(struct worker *worker, struct work_struct *work)
+{
+ struct cpu_workqueue_struct *cwq = worker->cwq;
+ struct global_cwq *gcwq = cwq->gcwq;
+ struct hlist_head *bwh = busy_worker_head(gcwq, work);
+ work_func_t f = work->func;
+ int work_color;
#ifdef CONFIG_LOCKDEP
- /*
- * It is permissible to free the struct work_struct
- * from inside the function that is called from it,
- * this we need to take into account for lockdep too.
- * To avoid bogus "held lock freed" warnings as well
- * as problems when looking into work->lockdep_map,
- * make a copy and use that here.
- */
- struct lockdep_map lockdep_map = work->lockdep_map;
+ /*
+ * It is permissible to free the struct work_struct from
+ * inside the function that is called from it, this we need to
+ * take into account for lockdep too. To avoid bogus "held
+ * lock freed" warnings as well as problems when looking into
+ * work->lockdep_map, make a copy and use that here.
+ */
+ struct lockdep_map lockdep_map = work->lockdep_map;
#endif
- trace_workqueue_execution(cwq->thread, work);
- debug_work_deactivate(work);
- cwq->current_work = work;
- list_del_init(cwq->worklist.next);
- spin_unlock_irq(&cwq->lock);
-
- BUG_ON(get_wq_data(work) != cwq);
- work_clear_pending(work);
- lock_map_acquire(&cwq->wq->lockdep_map);
- lock_map_acquire(&lockdep_map);
- f(work);
- lock_map_release(&lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
-
- if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
- printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
- "%s/0x%08x/%d\n",
- current->comm, preempt_count(),
- task_pid_nr(current));
- printk(KERN_ERR " last function: ");
- print_symbol("%s\n", (unsigned long)f);
- debug_show_held_locks(current);
- dump_stack();
- }
+ /* claim and process */
+ debug_work_deactivate(work);
+ hlist_add_head(&worker->hentry, bwh);
+ worker->current_work = work;
+ worker->current_cwq = cwq;
+ work_color = get_work_color(work);
+
+ BUG_ON(get_work_cwq(work) != cwq);
+ /* record the current cpu number in the work data and dequeue */
+ set_work_cpu(work, gcwq->cpu);
+ list_del_init(&work->entry);
+
+ spin_unlock_irq(&gcwq->lock);
+
+ work_clear_pending(work);
+ lock_map_acquire(&cwq->wq->lockdep_map);
+ lock_map_acquire(&lockdep_map);
+ f(work);
+ lock_map_release(&lockdep_map);
+ lock_map_release(&cwq->wq->lockdep_map);
+
+ if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
+ printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
+ "%s/0x%08x/%d\n",
+ current->comm, preempt_count(), task_pid_nr(current));
+ printk(KERN_ERR " last function: ");
+ print_symbol("%s\n", (unsigned long)f);
+ debug_show_held_locks(current);
+ dump_stack();
+ }
+
+ spin_lock_irq(&gcwq->lock);
+
+ /* we're done with it, release */
+ hlist_del_init(&worker->hentry);
+ worker->current_work = NULL;
+ worker->current_cwq = NULL;
+ cwq_dec_nr_in_flight(cwq, work_color);
+}
- spin_lock_irq(&cwq->lock);
- cwq->current_work = NULL;
+/**
+ * process_scheduled_works - process scheduled works
+ * @worker: self
+ *
+ * Process all scheduled works. Please note that the scheduled list
+ * may change while processing a work, so this function repeatedly
+ * fetches a work from the top and executes it.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times.
+ */
+static void process_scheduled_works(struct worker *worker)
+{
+ while (!list_empty(&worker->scheduled)) {
+ struct work_struct *work = list_first_entry(&worker->scheduled,
+ struct work_struct, entry);
+ process_one_work(worker, work);
}
- spin_unlock_irq(&cwq->lock);
}
-static int worker_thread(void *__cwq)
+/**
+ * worker_thread - the worker thread function
+ * @__worker: self
+ *
+ * The cwq worker thread function.
+ */
+static int worker_thread(void *__worker)
{
- struct cpu_workqueue_struct *cwq = __cwq;
- DEFINE_WAIT(wait);
+ struct worker *worker = __worker;
+ struct global_cwq *gcwq = worker->gcwq;
+ struct cpu_workqueue_struct *cwq = worker->cwq;
- if (cwq->wq->freezeable)
- set_freezable();
+woke_up:
+ spin_lock_irq(&gcwq->lock);
- for (;;) {
- prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
- if (!freezing(current) &&
- !kthread_should_stop() &&
- list_empty(&cwq->worklist))
- schedule();
- finish_wait(&cwq->more_work, &wait);
+ /* DIE can be set only while we're idle, checking here is enough */
+ if (worker->flags & WORKER_DIE) {
+ spin_unlock_irq(&gcwq->lock);
+ return 0;
+ }
- try_to_freeze();
+ worker_leave_idle(worker);
+recheck:
+ /*
+ * ->scheduled list can only be filled while a worker is
+ * preparing to process a work or actually processing it.
+ * Make sure nobody diddled with it while I was sleeping.
+ */
+ BUG_ON(!list_empty(&worker->scheduled));
- if (kthread_should_stop())
- break;
+ while (!list_empty(&cwq->worklist)) {
+ struct work_struct *work =
+ list_first_entry(&cwq->worklist,
+ struct work_struct, entry);
+
+ /*
+ * The following is a rather inefficient way to close
+ * race window against cpu hotplug operations. Will
+ * be replaced soon.
+ */
+ if (unlikely(!(worker->flags & WORKER_ROGUE) &&
+ !cpumask_equal(&worker->task->cpus_allowed,
+ get_cpu_mask(gcwq->cpu)))) {
+ spin_unlock_irq(&gcwq->lock);
+ set_cpus_allowed_ptr(worker->task,
+ get_cpu_mask(gcwq->cpu));
+ cpu_relax();
+ spin_lock_irq(&gcwq->lock);
+ goto recheck;
+ }
- run_workqueue(cwq);
+ if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
+ /* optimization path, not strictly necessary */
+ process_one_work(worker, work);
+ if (unlikely(!list_empty(&worker->scheduled)))
+ process_scheduled_works(worker);
+ } else {
+ move_linked_works(work, &worker->scheduled, NULL);
+ process_scheduled_works(worker);
+ }
}
- return 0;
+ /*
+ * gcwq->lock is held and there's no work to process, sleep.
+ * Workers are woken up only while holding gcwq->lock, so
+ * setting the current state before releasing gcwq->lock is
+ * enough to prevent losing any event.
+ */
+ worker_enter_idle(worker);
+ __set_current_state(TASK_INTERRUPTIBLE);
+ spin_unlock_irq(&gcwq->lock);
+ schedule();
+ goto woke_up;
}
struct wq_barrier {
complete(&barr->done);
}
+/**
+ * insert_wq_barrier - insert a barrier work
+ * @cwq: cwq to insert barrier into
+ * @barr: wq_barrier to insert
+ * @target: target work to attach @barr to
+ * @worker: worker currently executing @target, NULL if @target is not executing
+ *
+ * @barr is linked to @target such that @barr is completed only after
+ * @target finishes execution. Please note that the ordering
+ * guarantee is observed only with respect to @target and on the local
+ * cpu.
+ *
+ * Currently, a queued barrier can't be canceled. This is because
+ * try_to_grab_pending() can't determine whether the work to be
+ * grabbed is at the head of the queue and thus can't clear LINKED
+ * flag of the previous work while there must be a valid next work
+ * after a work with LINKED flag set.
+ *
+ * Note that when @worker is non-NULL, @target may be modified
+ * underneath us, so we can't reliably determine cwq from @target.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock).
+ */
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
- struct wq_barrier *barr, struct list_head *head)
+ struct wq_barrier *barr,
+ struct work_struct *target, struct worker *worker)
{
+ struct list_head *head;
+ unsigned int linked = 0;
+
/*
- * debugobject calls are safe here even with cwq->lock locked
+ * debugobject calls are safe here even with gcwq->lock locked
* as we know for sure that this will not trigger any of the
* checks and call back into the fixup functions where we
* might deadlock.
*/
INIT_WORK_ON_STACK(&barr->work, wq_barrier_func);
- __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
-
+ __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);
+ /*
+ * If @target is currently being executed, schedule the
+ * barrier to the worker; otherwise, put it after @target.
+ */
+ if (worker)
+ head = worker->scheduled.next;
+ else {
+ unsigned long *bits = work_data_bits(target);
+
+ head = target->entry.next;
+ /* there can already be other linked works, inherit and set */
+ linked = *bits & WORK_STRUCT_LINKED;
+ __set_bit(WORK_STRUCT_LINKED_BIT, bits);
+ }
+
debug_work_activate(&barr->work);
- insert_work(cwq, &barr->work, head);
+ insert_work(cwq, &barr->work, head,
+ work_color_to_flags(WORK_NO_COLOR) | linked);
}
-static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
+/**
+ * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
+ * @wq: workqueue being flushed
+ * @flush_color: new flush color, < 0 for no-op
+ * @work_color: new work color, < 0 for no-op
+ *
+ * Prepare cwqs for workqueue flushing.
+ *
+ * If @flush_color is non-negative, flush_color on all cwqs should be
+ * -1. If no cwq has in-flight commands at the specified color, all
+ * cwq->flush_color's stay at -1 and %false is returned. If any cwq
+ * has in flight commands, its cwq->flush_color is set to
+ * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
+ * wakeup logic is armed and %true is returned.
+ *
+ * The caller should have initialized @wq->first_flusher prior to
+ * calling this function with non-negative @flush_color. If
+ * @flush_color is negative, no flush color update is done and %false
+ * is returned.
+ *
+ * If @work_color is non-negative, all cwqs should have the same
+ * work_color which is previous to @work_color and all will be
+ * advanced to @work_color.
+ *
+ * CONTEXT:
+ * mutex_lock(wq->flush_mutex).
+ *
+ * RETURNS:
+ * %true if @flush_color >= 0 and there's something to flush. %false
+ * otherwise.
+ */
+static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
+ int flush_color, int work_color)
{
- int active = 0;
- struct wq_barrier barr;
+ bool wait = false;
+ unsigned int cpu;
+
+ if (flush_color >= 0) {
+ BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
+ atomic_set(&wq->nr_cwqs_to_flush, 1);
+ }
+
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ struct global_cwq *gcwq = cwq->gcwq;
+
+ spin_lock_irq(&gcwq->lock);
+
+ if (flush_color >= 0) {
+ BUG_ON(cwq->flush_color != -1);
+
+ if (cwq->nr_in_flight[flush_color]) {
+ cwq->flush_color = flush_color;
+ atomic_inc(&wq->nr_cwqs_to_flush);
+ wait = true;
+ }
+ }
+
+ if (work_color >= 0) {
+ BUG_ON(work_color != work_next_color(cwq->work_color));
+ cwq->work_color = work_color;
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+ }
+
+ if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
+ complete(&wq->first_flusher->done);
+
+ return wait;
+}
- WARN_ON(cwq->thread == current);
+/**
+ * flush_workqueue - ensure that any scheduled work has run to completion.
+ * @wq: workqueue to flush
+ *
+ * Forces execution of the workqueue and blocks until its completion.
+ * This is typically used in driver shutdown handlers.
+ *
+ * We sleep until all works which were queued on entry have been handled,
+ * but we are not livelocked by new incoming ones.
+ */
+void flush_workqueue(struct workqueue_struct *wq)
+{
+ struct wq_flusher this_flusher = {
+ .list = LIST_HEAD_INIT(this_flusher.list),
+ .flush_color = -1,
+ .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
+ };
+ int next_color;
+
+ lock_map_acquire(&wq->lockdep_map);
+ lock_map_release(&wq->lockdep_map);
+
+ mutex_lock(&wq->flush_mutex);
+
+ /*
+ * Start-to-wait phase
+ */
+ next_color = work_next_color(wq->work_color);
+
+ if (next_color != wq->flush_color) {
+ /*
+ * Color space is not full. The current work_color
+ * becomes our flush_color and work_color is advanced
+ * by one.
+ */
+ BUG_ON(!list_empty(&wq->flusher_overflow));
+ this_flusher.flush_color = wq->work_color;
+ wq->work_color = next_color;
+
+ if (!wq->first_flusher) {
+ /* no flush in progress, become the first flusher */
+ BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+ wq->first_flusher = &this_flusher;
+
+ if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
+ wq->work_color)) {
+ /* nothing to flush, done */
+ wq->flush_color = next_color;
+ wq->first_flusher = NULL;
+ goto out_unlock;
+ }
+ } else {
+ /* wait in queue */
+ BUG_ON(wq->flush_color == this_flusher.flush_color);
+ list_add_tail(&this_flusher.list, &wq->flusher_queue);
+ flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+ }
+ } else {
+ /*
+ * Oops, color space is full, wait on overflow queue.
+ * The next flush completion will assign us
+ * flush_color and transfer to flusher_queue.
+ */
+ list_add_tail(&this_flusher.list, &wq->flusher_overflow);
+ }
+
+ mutex_unlock(&wq->flush_mutex);
+
+ wait_for_completion(&this_flusher.done);
+
+ /*
+ * Wake-up-and-cascade phase
+ *
+ * First flushers are responsible for cascading flushes and
+ * handling overflow. Non-first flushers can simply return.
+ */
+ if (wq->first_flusher != &this_flusher)
+ return;
+
+ mutex_lock(&wq->flush_mutex);
+
+ wq->first_flusher = NULL;
+
+ BUG_ON(!list_empty(&this_flusher.list));
+ BUG_ON(wq->flush_color != this_flusher.flush_color);
+
+ while (true) {
+ struct wq_flusher *next, *tmp;
+
+ /* complete all the flushers sharing the current flush color */
+ list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
+ if (next->flush_color != wq->flush_color)
+ break;
+ list_del_init(&next->list);
+ complete(&next->done);
+ }
+
+ BUG_ON(!list_empty(&wq->flusher_overflow) &&
+ wq->flush_color != work_next_color(wq->work_color));
+
+ /* this flush_color is finished, advance by one */
+ wq->flush_color = work_next_color(wq->flush_color);
+
+ /* one color has been freed, handle overflow queue */
+ if (!list_empty(&wq->flusher_overflow)) {
+ /*
+ * Assign the same color to all overflowed
+ * flushers, advance work_color and append to
+ * flusher_queue. This is the start-to-wait
+ * phase for these overflowed flushers.
+ */
+ list_for_each_entry(tmp, &wq->flusher_overflow, list)
+ tmp->flush_color = wq->work_color;
+
+ wq->work_color = work_next_color(wq->work_color);
+
+ list_splice_tail_init(&wq->flusher_overflow,
+ &wq->flusher_queue);
+ flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
+ }
+
+ if (list_empty(&wq->flusher_queue)) {
+ BUG_ON(wq->flush_color != wq->work_color);
+ break;
+ }
- spin_lock_irq(&cwq->lock);
- if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
- insert_wq_barrier(cwq, &barr, &cwq->worklist);
- active = 1;
- }
- spin_unlock_irq(&cwq->lock);
+ /*
+ * Need to flush more colors. Make the next flusher
+ * the new first flusher and arm cwqs.
+ */
+ BUG_ON(wq->flush_color == wq->work_color);
+ BUG_ON(wq->flush_color != next->flush_color);
- if (active) {
- wait_for_completion(&barr.done);
- destroy_work_on_stack(&barr.work);
- }
+ list_del_init(&next->list);
+ wq->first_flusher = next;
- return active;
-}
+ if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
+ break;
-/**
- * flush_workqueue - ensure that any scheduled work has run to completion.
- * @wq: workqueue to flush
- *
- * Forces execution of the workqueue and blocks until its completion.
- * This is typically used in driver shutdown handlers.
- *
- * We sleep until all works which were queued on entry have been handled,
- * but we are not livelocked by new incoming ones.
- *
- * This function used to run the workqueues itself. Now we just wait for the
- * helper threads to do it.
- */
-void flush_workqueue(struct workqueue_struct *wq)
-{
- const struct cpumask *cpu_map = wq_cpu_map(wq);
- int cpu;
+ /*
+ * Meh... this color is already done, clear first
+ * flusher and repeat cascading.
+ */
+ wq->first_flusher = NULL;
+ }
- might_sleep();
- lock_map_acquire(&wq->lockdep_map);
- lock_map_release(&wq->lockdep_map);
- for_each_cpu(cpu, cpu_map)
- flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
+out_unlock:
+ mutex_unlock(&wq->flush_mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);
*/
int flush_work(struct work_struct *work)
{
+ struct worker *worker = NULL;
+ struct global_cwq *gcwq;
struct cpu_workqueue_struct *cwq;
- struct list_head *prev;
struct wq_barrier barr;
might_sleep();
- cwq = get_wq_data(work);
- if (!cwq)
+ gcwq = get_work_gcwq(work);
+ if (!gcwq)
return 0;
- lock_map_acquire(&cwq->wq->lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
-
- prev = NULL;
- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);
if (!list_empty(&work->entry)) {
/*
* See the comment near try_to_grab_pending()->smp_rmb().
- * If it was re-queued under us we are not going to wait.
+ * If it was re-queued to a different gcwq under us, we
+ * are not going to wait.
*/
smp_rmb();
- if (unlikely(cwq != get_wq_data(work)))
- goto out;
- prev = &work->entry;
+ cwq = get_work_cwq(work);
+ if (unlikely(!cwq || gcwq != cwq->gcwq))
+ goto already_gone;
} else {
- if (cwq->current_work != work)
- goto out;
- prev = &cwq->worklist;
+ worker = find_worker_executing_work(gcwq, work);
+ if (!worker)
+ goto already_gone;
+ cwq = worker->current_cwq;
}
- insert_wq_barrier(cwq, &barr, prev->next);
-out:
- spin_unlock_irq(&cwq->lock);
- if (!prev)
- return 0;
+
+ insert_wq_barrier(cwq, &barr, work, worker);
+ spin_unlock_irq(&gcwq->lock);
+
+ lock_map_acquire(&cwq->wq->lockdep_map);
+ lock_map_release(&cwq->wq->lockdep_map);
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
return 1;
+already_gone:
+ spin_unlock_irq(&gcwq->lock);
+ return 0;
}
EXPORT_SYMBOL_GPL(flush_work);
*/
static int try_to_grab_pending(struct work_struct *work)
{
- struct cpu_workqueue_struct *cwq;
+ struct global_cwq *gcwq;
int ret = -1;
- if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work)))
+ if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
return 0;
/*
* The queueing is in progress, or it is already queued. Try to
* steal it from ->worklist without clearing WORK_STRUCT_PENDING.
*/
-
- cwq = get_wq_data(work);
- if (!cwq)
+ gcwq = get_work_gcwq(work);
+ if (!gcwq)
return ret;
- spin_lock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);
if (!list_empty(&work->entry)) {
/*
- * This work is queued, but perhaps we locked the wrong cwq.
+ * This work is queued, but perhaps we locked the wrong gcwq.
* In that case we must see the new value after rmb(), see
* insert_work()->wmb().
*/
smp_rmb();
- if (cwq == get_wq_data(work)) {
+ if (gcwq == get_work_gcwq(work)) {
debug_work_deactivate(work);
list_del_init(&work->entry);
+ cwq_dec_nr_in_flight(get_work_cwq(work),
+ get_work_color(work));
ret = 1;
}
}
- spin_unlock_irq(&cwq->lock);
+ spin_unlock_irq(&gcwq->lock);
return ret;
}
-static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
- struct work_struct *work)
+static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
{
struct wq_barrier barr;
- int running = 0;
+ struct worker *worker;
- spin_lock_irq(&cwq->lock);
- if (unlikely(cwq->current_work == work)) {
- insert_wq_barrier(cwq, &barr, cwq->worklist.next);
- running = 1;
- }
- spin_unlock_irq(&cwq->lock);
+ spin_lock_irq(&gcwq->lock);
+
+ worker = find_worker_executing_work(gcwq, work);
+ if (unlikely(worker))
+ insert_wq_barrier(worker->current_cwq, &barr, work, worker);
- if (unlikely(running)) {
+ spin_unlock_irq(&gcwq->lock);
+
+ if (unlikely(worker)) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
}
static void wait_on_work(struct work_struct *work)
{
- struct cpu_workqueue_struct *cwq;
- struct workqueue_struct *wq;
- const struct cpumask *cpu_map;
int cpu;
might_sleep();
lock_map_acquire(&work->lockdep_map);
lock_map_release(&work->lockdep_map);
- cwq = get_wq_data(work);
- if (!cwq)
- return;
-
- wq = cwq->wq;
- cpu_map = wq_cpu_map(wq);
-
- for_each_cpu(cpu, cpu_map)
- wait_on_cpu_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
+ for_each_possible_cpu(cpu)
+ wait_on_cpu_work(get_gcwq(cpu), work);
}
static int __cancel_work_timer(struct work_struct *work,
wait_on_work(work);
} while (unlikely(ret < 0));
- clear_wq_data(work);
+ clear_work_data(work);
return ret;
}
void flush_delayed_work(struct delayed_work *dwork)
{
if (del_timer_sync(&dwork->timer)) {
- struct cpu_workqueue_struct *cwq;
- cwq = wq_per_cpu(get_wq_data(&dwork->work)->wq, get_cpu());
- __queue_work(cwq, &dwork->work);
+ __queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq,
+ &dwork->work);
put_cpu();
}
flush_work(&dwork->work);
BUG_ON(!keventd_wq);
- cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
- if (current == cwq->thread)
+ cwq = get_cwq(cpu, keventd_wq);
+ if (current == cwq->worker->task)
ret = 1;
return ret;
}
-static struct cpu_workqueue_struct *
-init_cpu_workqueue(struct workqueue_struct *wq, int cpu)
+static struct cpu_workqueue_struct *alloc_cwqs(void)
{
- struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
- cwq->wq = wq;
- spin_lock_init(&cwq->lock);
- INIT_LIST_HEAD(&cwq->worklist);
- init_waitqueue_head(&cwq->more_work);
-
- return cwq;
-}
-
-static int create_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
-{
- struct sched_param param = { .sched_priority = MAX_RT_PRIO-1 };
- struct workqueue_struct *wq = cwq->wq;
- const char *fmt = is_wq_single_threaded(wq) ? "%s" : "%s/%d";
- struct task_struct *p;
-
- p = kthread_create(worker_thread, cwq, fmt, wq->name, cpu);
/*
- * Nobody can add the work_struct to this cwq,
- * if (caller is __create_workqueue)
- * nobody should see this wq
- * else // caller is CPU_UP_PREPARE
- * cpu is not on cpu_online_map
- * so we can abort safely.
+ * cwqs are forced aligned according to WORK_STRUCT_FLAG_BITS.
+ * Make sure that the alignment isn't lower than that of
+ * unsigned long long.
*/
- if (IS_ERR(p))
- return PTR_ERR(p);
- if (cwq->wq->rt)
- sched_setscheduler_nocheck(p, SCHED_FIFO, ¶m);
- cwq->thread = p;
-
- trace_workqueue_creation(cwq->thread, cpu);
+ const size_t size = sizeof(struct cpu_workqueue_struct);
+ const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
+ __alignof__(unsigned long long));
+ struct cpu_workqueue_struct *cwqs;
+#ifndef CONFIG_SMP
+ void *ptr;
- return 0;
+ /*
+ * On UP, percpu allocator doesn't honor alignment parameter
+ * and simply uses arch-dependent default. Allocate enough
+ * room to align cwq and put an extra pointer at the end
+ * pointing back to the originally allocated pointer which
+ * will be used for free.
+ *
+ * FIXME: This really belongs to UP percpu code. Update UP
+ * percpu code to honor alignment and remove this ugliness.
+ */
+ ptr = __alloc_percpu(size + align + sizeof(void *), 1);
+ cwqs = PTR_ALIGN(ptr, align);
+ *(void **)per_cpu_ptr(cwqs + 1, 0) = ptr;
+#else
+ /* On SMP, percpu allocator can do it itself */
+ cwqs = __alloc_percpu(size, align);
+#endif
+ /* just in case, make sure it's actually aligned */
+ BUG_ON(!IS_ALIGNED((unsigned long)cwqs, align));
+ return cwqs;
}
-static void start_workqueue_thread(struct cpu_workqueue_struct *cwq, int cpu)
+static void free_cwqs(struct cpu_workqueue_struct *cwqs)
{
- struct task_struct *p = cwq->thread;
-
- if (p != NULL) {
- if (cpu >= 0)
- kthread_bind(p, cpu);
- wake_up_process(p);
- }
+#ifndef CONFIG_SMP
+ /* on UP, the pointer to free is stored right after the cwq */
+ if (cwqs)
+ free_percpu(*(void **)per_cpu_ptr(cwqs + 1, 0));
+#else
+ free_percpu(cwqs);
+#endif
}
struct workqueue_struct *__create_workqueue_key(const char *name,
- int singlethread,
- int freezeable,
- int rt,
+ unsigned int flags,
+ int max_active,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
- struct cpu_workqueue_struct *cwq;
- int err = 0, cpu;
+ bool failed = false;
+ unsigned int cpu;
+
+ max_active = clamp_val(max_active, 1, INT_MAX);
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
- return NULL;
+ goto err;
- wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
- if (!wq->cpu_wq) {
- kfree(wq);
- return NULL;
- }
+ wq->cpu_wq = alloc_cwqs();
+ if (!wq->cpu_wq)
+ goto err;
+
+ wq->flags = flags;
+ wq->saved_max_active = max_active;
+ mutex_init(&wq->flush_mutex);
+ atomic_set(&wq->nr_cwqs_to_flush, 0);
+ INIT_LIST_HEAD(&wq->flusher_queue);
+ INIT_LIST_HEAD(&wq->flusher_overflow);
+ wq->single_cpu = NR_CPUS;
wq->name = name;
lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
- wq->singlethread = singlethread;
- wq->freezeable = freezeable;
- wq->rt = rt;
INIT_LIST_HEAD(&wq->list);
- if (singlethread) {
- cwq = init_cpu_workqueue(wq, singlethread_cpu);
- err = create_workqueue_thread(cwq, singlethread_cpu);
- start_workqueue_thread(cwq, -1);
- } else {
- cpu_maps_update_begin();
- /*
- * We must place this wq on list even if the code below fails.
- * cpu_down(cpu) can remove cpu from cpu_populated_map before
- * destroy_workqueue() takes the lock, in that case we leak
- * cwq[cpu]->thread.
- */
- spin_lock(&workqueue_lock);
- list_add(&wq->list, &workqueues);
- spin_unlock(&workqueue_lock);
- /*
- * We must initialize cwqs for each possible cpu even if we
- * are going to call destroy_workqueue() finally. Otherwise
- * cpu_up() can hit the uninitialized cwq once we drop the
- * lock.
- */
- for_each_possible_cpu(cpu) {
- cwq = init_cpu_workqueue(wq, cpu);
- if (err || !cpu_online(cpu))
- continue;
- err = create_workqueue_thread(cwq, cpu);
- start_workqueue_thread(cwq, cpu);
- }
- cpu_maps_update_done();
+ cpu_maps_update_begin();
+ /*
+ * We must initialize cwqs for each possible cpu even if we
+ * are going to call destroy_workqueue() finally. Otherwise
+ * cpu_up() can hit the uninitialized cwq once we drop the
+ * lock.
+ */
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ struct global_cwq *gcwq = get_gcwq(cpu);
+
+ BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
+ cwq->gcwq = gcwq;
+ cwq->wq = wq;
+ cwq->flush_color = -1;
+ cwq->max_active = max_active;
+ INIT_LIST_HEAD(&cwq->worklist);
+ INIT_LIST_HEAD(&cwq->delayed_works);
+
+ if (failed)
+ continue;
+ cwq->worker = create_worker(cwq, cpu_online(cpu));
+ if (cwq->worker)
+ start_worker(cwq->worker);
+ else
+ failed = true;
}
- if (err) {
+ /*
+ * workqueue_lock protects global freeze state and workqueues
+ * list. Grab it, set max_active accordingly and add the new
+ * workqueue to workqueues list.
+ */
+ spin_lock(&workqueue_lock);
+
+ if (workqueue_freezing && wq->flags & WQ_FREEZEABLE)
+ for_each_possible_cpu(cpu)
+ get_cwq(cpu, wq)->max_active = 0;
+
+ list_add(&wq->list, &workqueues);
+
+ spin_unlock(&workqueue_lock);
+
+ cpu_maps_update_done();
+
+ if (failed) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
+err:
+ if (wq) {
+ free_cwqs(wq->cpu_wq);
+ kfree(wq);
+ }
+ return NULL;
}
EXPORT_SYMBOL_GPL(__create_workqueue_key);
-static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
-{
- /*
- * Our caller is either destroy_workqueue() or CPU_POST_DEAD,
- * cpu_add_remove_lock protects cwq->thread.
- */
- if (cwq->thread == NULL)
- return;
-
- lock_map_acquire(&cwq->wq->lockdep_map);
- lock_map_release(&cwq->wq->lockdep_map);
-
- flush_cpu_workqueue(cwq);
- /*
- * If the caller is CPU_POST_DEAD and cwq->worklist was not empty,
- * a concurrent flush_workqueue() can insert a barrier after us.
- * However, in that case run_workqueue() won't return and check
- * kthread_should_stop() until it flushes all work_struct's.
- * When ->worklist becomes empty it is safe to exit because no
- * more work_structs can be queued on this cwq: flush_workqueue
- * checks list_empty(), and a "normal" queue_work() can't use
- * a dead CPU.
- */
- trace_workqueue_destruction(cwq->thread);
- kthread_stop(cwq->thread);
- cwq->thread = NULL;
-}
-
/**
* destroy_workqueue - safely terminate a workqueue
* @wq: target workqueue
*/
void destroy_workqueue(struct workqueue_struct *wq)
{
- const struct cpumask *cpu_map = wq_cpu_map(wq);
- int cpu;
+ unsigned int cpu;
+
+ flush_workqueue(wq);
+ /*
+ * wq list is used to freeze wq, remove from list after
+ * flushing is complete in case freeze races us.
+ */
cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);
+ cpu_maps_update_done();
+
+ for_each_possible_cpu(cpu) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+ int i;
+
+ if (cwq->worker) {
+ spin_lock_irq(&cwq->gcwq->lock);
+ destroy_worker(cwq->worker);
+ cwq->worker = NULL;
+ spin_unlock_irq(&cwq->gcwq->lock);
+ }
- for_each_cpu(cpu, cpu_map)
- cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
- cpu_maps_update_done();
+ for (i = 0; i < WORK_NR_COLORS; i++)
+ BUG_ON(cwq->nr_in_flight[i]);
+ BUG_ON(cwq->nr_active);
+ BUG_ON(!list_empty(&cwq->delayed_works));
+ }
- free_percpu(wq->cpu_wq);
+ free_cwqs(wq->cpu_wq);
kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);
+/*
+ * CPU hotplug.
+ *
+ * CPU hotplug is implemented by allowing cwqs to be detached from
+ * CPU, running with unbound workers and allowing them to be
+ * reattached later if the cpu comes back online. A separate thread
+ * is created to govern cwqs in such state and is called the trustee.
+ *
+ * Trustee states and their descriptions.
+ *
+ * START Command state used on startup. On CPU_DOWN_PREPARE, a
+ * new trustee is started with this state.
+ *
+ * IN_CHARGE Once started, trustee will enter this state after
+ * making all existing workers rogue. DOWN_PREPARE waits
+ * for trustee to enter this state. After reaching
+ * IN_CHARGE, trustee tries to execute the pending
+ * worklist until it's empty and the state is set to
+ * BUTCHER, or the state is set to RELEASE.
+ *
+ * BUTCHER Command state which is set by the cpu callback after
+ * the cpu has went down. Once this state is set trustee
+ * knows that there will be no new works on the worklist
+ * and once the worklist is empty it can proceed to
+ * killing idle workers.
+ *
+ * RELEASE Command state which is set by the cpu callback if the
+ * cpu down has been canceled or it has come online
+ * again. After recognizing this state, trustee stops
+ * trying to drain or butcher and transits to DONE.
+ *
+ * DONE Trustee will enter this state after BUTCHER or RELEASE
+ * is complete.
+ *
+ * trustee CPU draining
+ * took over down complete
+ * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
+ * | | ^
+ * | CPU is back online v return workers |
+ * ----------------> RELEASE --------------
+ */
+
+/**
+ * trustee_wait_event_timeout - timed event wait for trustee
+ * @cond: condition to wait for
+ * @timeout: timeout in jiffies
+ *
+ * wait_event_timeout() for trustee to use. Handles locking and
+ * checks for RELEASE request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. To be used by trustee.
+ *
+ * RETURNS:
+ * Positive indicating left time if @cond is satisfied, 0 if timed
+ * out, -1 if canceled.
+ */
+#define trustee_wait_event_timeout(cond, timeout) ({ \
+ long __ret = (timeout); \
+ while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
+ __ret) { \
+ spin_unlock_irq(&gcwq->lock); \
+ __wait_event_timeout(gcwq->trustee_wait, (cond) || \
+ (gcwq->trustee_state == TRUSTEE_RELEASE), \
+ __ret); \
+ spin_lock_irq(&gcwq->lock); \
+ } \
+ gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret); \
+})
+
+/**
+ * trustee_wait_event - event wait for trustee
+ * @cond: condition to wait for
+ *
+ * wait_event() for trustee to use. Automatically handles locking and
+ * checks for CANCEL request.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. To be used by trustee.
+ *
+ * RETURNS:
+ * 0 if @cond is satisfied, -1 if canceled.
+ */
+#define trustee_wait_event(cond) ({ \
+ long __ret1; \
+ __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
+ __ret1 < 0 ? -1 : 0; \
+})
+
+static int __cpuinit trustee_thread(void *__gcwq)
+{
+ struct global_cwq *gcwq = __gcwq;
+ struct worker *worker;
+ struct hlist_node *pos;
+ int i;
+
+ BUG_ON(gcwq->cpu != smp_processor_id());
+
+ spin_lock_irq(&gcwq->lock);
+ /*
+ * Make all workers rogue. Trustee must be bound to the
+ * target cpu and can't be cancelled.
+ */
+ BUG_ON(gcwq->cpu != smp_processor_id());
+
+ list_for_each_entry(worker, &gcwq->idle_list, entry)
+ worker->flags |= WORKER_ROGUE;
+
+ for_each_busy_worker(worker, i, pos, gcwq)
+ worker->flags |= WORKER_ROGUE;
+
+ /*
+ * We're now in charge. Notify and proceed to drain. We need
+ * to keep the gcwq running during the whole CPU down
+ * procedure as other cpu hotunplug callbacks may need to
+ * flush currently running tasks.
+ */
+ gcwq->trustee_state = TRUSTEE_IN_CHARGE;
+ wake_up_all(&gcwq->trustee_wait);
+
+ /*
+ * The original cpu is in the process of dying and may go away
+ * anytime now. When that happens, we and all workers would
+ * be migrated to other cpus. Try draining any left work.
+ * Note that if the gcwq is frozen, there may be frozen works
+ * in freezeable cwqs. Don't declare completion while frozen.
+ */
+ while (gcwq->nr_workers != gcwq->nr_idle ||
+ gcwq->flags & GCWQ_FREEZING ||
+ gcwq->trustee_state == TRUSTEE_IN_CHARGE) {
+ /* give a breather */
+ if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
+ break;
+ }
+
+ /* notify completion */
+ gcwq->trustee = NULL;
+ gcwq->trustee_state = TRUSTEE_DONE;
+ wake_up_all(&gcwq->trustee_wait);
+ spin_unlock_irq(&gcwq->lock);
+ return 0;
+}
+
+/**
+ * wait_trustee_state - wait for trustee to enter the specified state
+ * @gcwq: gcwq the trustee of interest belongs to
+ * @state: target state to wait for
+ *
+ * Wait for the trustee to reach @state. DONE is already matched.
+ *
+ * CONTEXT:
+ * spin_lock_irq(gcwq->lock) which may be released and regrabbed
+ * multiple times. To be used by cpu_callback.
+ */
+static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
+{
+ if (!(gcwq->trustee_state == state ||
+ gcwq->trustee_state == TRUSTEE_DONE)) {
+ spin_unlock_irq(&gcwq->lock);
+ __wait_event(gcwq->trustee_wait,
+ gcwq->trustee_state == state ||
+ gcwq->trustee_state == TRUSTEE_DONE);
+ spin_lock_irq(&gcwq->lock);
+ }
+}
+
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
unsigned long action,
void *hcpu)
{
unsigned int cpu = (unsigned long)hcpu;
- struct cpu_workqueue_struct *cwq;
- struct workqueue_struct *wq;
- int err = 0;
+ struct global_cwq *gcwq = get_gcwq(cpu);
+ struct task_struct *new_trustee = NULL;
+ struct worker *worker;
+ struct hlist_node *pos;
+ unsigned long flags;
+ int i;
action &= ~CPU_TASKS_FROZEN;
switch (action) {
- case CPU_UP_PREPARE:
- cpumask_set_cpu(cpu, cpu_populated_map);
+ case CPU_DOWN_PREPARE:
+ new_trustee = kthread_create(trustee_thread, gcwq,
+ "workqueue_trustee/%d\n", cpu);
+ if (IS_ERR(new_trustee))
+ return notifier_from_errno(PTR_ERR(new_trustee));
+ kthread_bind(new_trustee, cpu);
}
-undo:
- list_for_each_entry(wq, &workqueues, list) {
- cwq = per_cpu_ptr(wq->cpu_wq, cpu);
-
- switch (action) {
- case CPU_UP_PREPARE:
- err = create_workqueue_thread(cwq, cpu);
- if (!err)
- break;
- printk(KERN_ERR "workqueue [%s] for %i failed\n",
- wq->name, cpu);
- action = CPU_UP_CANCELED;
- err = -ENOMEM;
- goto undo;
-
- case CPU_ONLINE:
- start_workqueue_thread(cwq, cpu);
- break;
- case CPU_UP_CANCELED:
- start_workqueue_thread(cwq, -1);
- case CPU_POST_DEAD:
- cleanup_workqueue_thread(cwq);
- break;
- }
- }
+ /* some are called w/ irq disabled, don't disturb irq status */
+ spin_lock_irqsave(&gcwq->lock, flags);
switch (action) {
- case CPU_UP_CANCELED:
+ case CPU_DOWN_PREPARE:
+ /* initialize trustee and tell it to acquire the gcwq */
+ BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
+ gcwq->trustee = new_trustee;
+ gcwq->trustee_state = TRUSTEE_START;
+ wake_up_process(gcwq->trustee);
+ wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
+ break;
+
case CPU_POST_DEAD:
- cpumask_clear_cpu(cpu, cpu_populated_map);
+ gcwq->trustee_state = TRUSTEE_BUTCHER;
+ break;
+
+ case CPU_DOWN_FAILED:
+ case CPU_ONLINE:
+ if (gcwq->trustee_state != TRUSTEE_DONE) {
+ gcwq->trustee_state = TRUSTEE_RELEASE;
+ wake_up_process(gcwq->trustee);
+ wait_trustee_state(gcwq, TRUSTEE_DONE);
+ }
+
+ /* clear ROGUE from all workers */
+ list_for_each_entry(worker, &gcwq->idle_list, entry)
+ worker->flags &= ~WORKER_ROGUE;
+
+ for_each_busy_worker(worker, i, pos, gcwq)
+ worker->flags &= ~WORKER_ROGUE;
+ break;
}
- return notifier_from_errno(err);
+ spin_unlock_irqrestore(&gcwq->lock, flags);
+
+ return notifier_from_errno(0);
}
#ifdef CONFIG_SMP
EXPORT_SYMBOL_GPL(work_on_cpu);
#endif /* CONFIG_SMP */
+#ifdef CONFIG_FREEZER
+
+/**
+ * freeze_workqueues_begin - begin freezing workqueues
+ *
+ * Start freezing workqueues. After this function returns, all
+ * freezeable workqueues will queue new works to their frozen_works
+ * list instead of the cwq ones.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and gcwq->lock's.
+ */
+void freeze_workqueues_begin(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+
+ spin_lock(&workqueue_lock);
+
+ BUG_ON(workqueue_freezing);
+ workqueue_freezing = true;
+
+ for_each_possible_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+
+ spin_lock_irq(&gcwq->lock);
+
+ BUG_ON(gcwq->flags & GCWQ_FREEZING);
+ gcwq->flags |= GCWQ_FREEZING;
+
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (wq->flags & WQ_FREEZEABLE)
+ cwq->max_active = 0;
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+ }
+
+ spin_unlock(&workqueue_lock);
+}
+
+/**
+ * freeze_workqueues_busy - are freezeable workqueues still busy?
+ *
+ * Check whether freezing is complete. This function must be called
+ * between freeze_workqueues_begin() and thaw_workqueues().
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock.
+ *
+ * RETURNS:
+ * %true if some freezeable workqueues are still busy. %false if
+ * freezing is complete.
+ */
+bool freeze_workqueues_busy(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+ bool busy = false;
+
+ spin_lock(&workqueue_lock);
+
+ BUG_ON(!workqueue_freezing);
+
+ for_each_possible_cpu(cpu) {
+ /*
+ * nr_active is monotonically decreasing. It's safe
+ * to peek without lock.
+ */
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (!(wq->flags & WQ_FREEZEABLE))
+ continue;
+
+ BUG_ON(cwq->nr_active < 0);
+ if (cwq->nr_active) {
+ busy = true;
+ goto out_unlock;
+ }
+ }
+ }
+out_unlock:
+ spin_unlock(&workqueue_lock);
+ return busy;
+}
+
+/**
+ * thaw_workqueues - thaw workqueues
+ *
+ * Thaw workqueues. Normal queueing is restored and all collected
+ * frozen works are transferred to their respective cwq worklists.
+ *
+ * CONTEXT:
+ * Grabs and releases workqueue_lock and gcwq->lock's.
+ */
+void thaw_workqueues(void)
+{
+ struct workqueue_struct *wq;
+ unsigned int cpu;
+
+ spin_lock(&workqueue_lock);
+
+ if (!workqueue_freezing)
+ goto out_unlock;
+
+ for_each_possible_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+
+ spin_lock_irq(&gcwq->lock);
+
+ BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
+ gcwq->flags &= ~GCWQ_FREEZING;
+
+ list_for_each_entry(wq, &workqueues, list) {
+ struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+ if (!(wq->flags & WQ_FREEZEABLE))
+ continue;
+
+ /* restore max_active and repopulate worklist */
+ cwq->max_active = wq->saved_max_active;
+
+ while (!list_empty(&cwq->delayed_works) &&
+ cwq->nr_active < cwq->max_active)
+ cwq_activate_first_delayed(cwq);
+
+ /* perform delayed unbind from single cpu if empty */
+ if (wq->single_cpu == gcwq->cpu &&
+ !cwq->nr_active && list_empty(&cwq->delayed_works))
+ cwq_unbind_single_cpu(cwq);
+
+ wake_up_process(cwq->worker->task);
+ }
+
+ spin_unlock_irq(&gcwq->lock);
+ }
+
+ workqueue_freezing = false;
+out_unlock:
+ spin_unlock(&workqueue_lock);
+}
+#endif /* CONFIG_FREEZER */
+
void __init init_workqueues(void)
{
- alloc_cpumask_var(&cpu_populated_map, GFP_KERNEL);
+ unsigned int cpu;
+ int i;
+
+ /*
+ * The pointer part of work->data is either pointing to the
+ * cwq or contains the cpu number the work ran last on. Make
+ * sure cpu number won't overflow into kernel pointer area so
+ * that they can be distinguished.
+ */
+ BUILD_BUG_ON(NR_CPUS << WORK_STRUCT_FLAG_BITS >= PAGE_OFFSET);
+
+ hotcpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
+
+ /* initialize gcwqs */
+ for_each_possible_cpu(cpu) {
+ struct global_cwq *gcwq = get_gcwq(cpu);
+
+ spin_lock_init(&gcwq->lock);
+ gcwq->cpu = cpu;
+
+ INIT_LIST_HEAD(&gcwq->idle_list);
+ for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
+ INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
+
+ ida_init(&gcwq->worker_ida);
+
+ gcwq->trustee_state = TRUSTEE_DONE;
+ init_waitqueue_head(&gcwq->trustee_wait);
+ }
- cpumask_copy(cpu_populated_map, cpu_online_mask);
- singlethread_cpu = cpumask_first(cpu_possible_mask);
- cpu_singlethread_map = cpumask_of(singlethread_cpu);
- hotcpu_notifier(workqueue_cpu_callback, 0);
keventd_wq = create_workqueue("events");
BUG_ON(!keventd_wq);
}