* %WORKER_UNBOUND set and concurrency management disabled, and may
* be executing on any CPU. The pool behaves as an unbound one.
*
- * Note that DISASSOCIATED can be flipped only while holding
- * assoc_mutex to avoid changing binding state while
+ * Note that DISASSOCIATED should be flipped only while holding
+ * manager_mutex to avoid changing binding state while
* create_worker() is in progress.
*/
POOL_MANAGE_WORKERS = 1 << 0, /* need to manage workers */
WORKER_PREP = 1 << 3, /* preparing to run works */
WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
WORKER_UNBOUND = 1 << 7, /* worker is unbound */
+ WORKER_REBOUND = 1 << 8, /* worker was rebound */
- WORKER_NOT_RUNNING = WORKER_PREP | WORKER_UNBOUND |
- WORKER_CPU_INTENSIVE,
+ WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
+ WORKER_UNBOUND | WORKER_REBOUND,
NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
*
* F: wq->flush_mutex protected.
*
- * W: workqueue_lock protected.
+ * MG: pool->manager_mutex and pool->lock protected. Writes require both
+ * locks. Reads can happen under either lock.
*
- * R: workqueue_lock protected for writes. Sched-RCU protected for reads.
+ * PL: wq_pool_mutex protected.
*
- * FR: wq->flush_mutex and workqueue_lock protected for writes. Sched-RCU
+ * PR: wq_pool_mutex protected for writes. Sched-RCU protected for reads.
+ *
+ * PW: pwq_lock protected.
+ *
+ * FR: wq->flush_mutex and pwq_lock protected for writes. Sched-RCU
* protected for reads.
+ *
+ * MD: wq_mayday_lock protected.
*/
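
/*
 * Illustrative reading of the labels above (a sketch, not part of this
 * patch): wq->list further below is a "PL:" field, so writers must hold
 * wq_pool_mutex, e.g.
 *
 *	mutex_lock(&wq_pool_mutex);
 *	list_add(&wq->list, &workqueues);
 *	mutex_unlock(&wq_pool_mutex);
 *
 * "PR:" fields may additionally be read under the sched-RCU read lock.
 */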
/* struct worker is defined in workqueue_internal.h */
struct timer_list idle_timer; /* L: worker idle timeout */
struct timer_list mayday_timer; /* L: SOS timer for workers */
- /* workers are chained either in busy_hash or idle_list */
+ /* a worker is either on busy_hash or idle_list, or the manager */
DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
/* L: hash of busy workers */
+ /* see manage_workers() for details on the two manager mutexes */
struct mutex manager_arb; /* manager arbitration */
- struct mutex assoc_mutex; /* protect POOL_DISASSOCIATED */
- struct ida worker_ida; /* L: for worker IDs */
+ struct mutex manager_mutex; /* manager exclusion */
+ struct idr worker_idr; /* MG: worker IDs and iteration */
struct workqueue_attrs *attrs; /* I: worker attributes */
- struct hlist_node hash_node; /* R: unbound_pool_hash node */
- int refcnt; /* refcnt for unbound pools */
+ struct hlist_node hash_node; /* PL: unbound_pool_hash node */
+ int refcnt; /* PL: refcnt for unbound pools */
/*
* The current concurrency level. As it's likely to be accessed
int max_active; /* L: max active works */
struct list_head delayed_works; /* L: delayed works */
struct list_head pwqs_node; /* FR: node on wq->pwqs */
- struct list_head mayday_node; /* W: node on wq->maydays */
+ struct list_head mayday_node; /* MD: node on wq->maydays */
/*
* Release of unbound pwq is punted to system_wq. See put_pwq()
* and pwq_unbound_release_workfn() for details. pool_workqueue
* itself is also sched-RCU protected so that the first pwq can be
- * determined without grabbing workqueue_lock.
+ * determined without grabbing pwq_lock.
*/
struct work_struct unbound_release_work;
struct rcu_head rcu;
struct wq_device;
/*
- * The externally visible workqueue abstraction is an array of
- * per-CPU workqueues:
+ * The externally visible workqueue. It relays the issued work items to
+ * the appropriate worker_pool through its pool_workqueues.
*/
struct workqueue_struct {
- unsigned int flags; /* W: WQ_* flags */
+ unsigned int flags; /* PL: WQ_* flags */
struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwq's */
struct list_head pwqs; /* FR: all pwqs of this wq */
- struct list_head list; /* W: list of all workqueues */
+ struct list_head list; /* PL: list of all workqueues */
struct mutex flush_mutex; /* protects wq flushing */
int work_color; /* F: current work color */
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */
- struct list_head maydays; /* W: pwqs requesting rescue */
+ struct list_head maydays; /* MD: pwqs requesting rescue */
struct worker *rescuer; /* I: rescue worker */
- int nr_drainers; /* W: drain in progress */
- int saved_max_active; /* W: saved pwq max_active */
+ int nr_drainers; /* PL: drain in progress */
+ int saved_max_active; /* PW: saved pwq max_active */
#ifdef CONFIG_SYSFS
struct wq_device *wq_dev; /* I: for sysfs interface */
static struct kmem_cache *pwq_cache;
-/* hash of all unbound pools keyed by pool->attrs */
+static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
+static DEFINE_SPINLOCK(pwq_lock); /* protects pool_workqueues */
+static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
+
+static LIST_HEAD(workqueues); /* PL: list of all workqueues */
+static bool workqueue_freezing; /* PL: have wqs started freezing? */
+
+/* the per-cpu worker pools */
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
+ cpu_worker_pools);
+
+static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */
+
+/* PL: hash of all unbound pools keyed by pool->attrs */
static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
+/* I: attributes used when instantiating standard unbound pools on demand */
static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
struct workqueue_struct *system_wq __read_mostly;
struct workqueue_struct *system_freezable_wq __read_mostly;
EXPORT_SYMBOL_GPL(system_freezable_wq);
+static int worker_thread(void *__worker);
+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+ const struct workqueue_attrs *from);
+
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
-#define assert_rcu_or_wq_lock() \
+#define assert_rcu_or_pool_mutex() \
+ rcu_lockdep_assert(rcu_read_lock_sched_held() || \
+ lockdep_is_held(&wq_pool_mutex), \
+ "sched RCU or wq_pool_mutex should be held")
+
+#define assert_rcu_or_pwq_lock() \
rcu_lockdep_assert(rcu_read_lock_sched_held() || \
- lockdep_is_held(&workqueue_lock), \
- "sched RCU or workqueue lock should be held")
+ lockdep_is_held(&pwq_lock), \
+ "sched RCU or pwq_lock should be held")
+
+#ifdef CONFIG_LOCKDEP
+#define assert_manager_or_pool_lock(pool) \
+ WARN_ONCE(debug_locks && \
+ !lockdep_is_held(&(pool)->manager_mutex) && \
+ !lockdep_is_held(&(pool)->lock), \
+ "pool->manager_mutex or ->lock should be held")
+#else
+#define assert_manager_or_pool_lock(pool) do { } while (0)
+#endif
#define for_each_cpu_worker_pool(pool, cpu) \
for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \
(pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
(pool)++)
-#define for_each_busy_worker(worker, i, pool) \
- hash_for_each(pool->busy_hash, i, worker, hentry)
-
/**
* for_each_pool - iterate through all worker_pools in the system
* @pool: iteration cursor
- * @id: integer used for iteration
+ * @pi: integer used for iteration
*
- * This must be called either with workqueue_lock held or sched RCU read
+ * This must be called either with wq_pool_mutex held or sched RCU read
* locked. If the pool needs to be used beyond the locking in effect, the
* caller is responsible for guaranteeing that the pool stays online.
*
* The if/else clause exists only for the lockdep assertion and can be
* ignored.
*/
-#define for_each_pool(pool, id) \
- idr_for_each_entry(&worker_pool_idr, pool, id) \
- if (({ assert_rcu_or_wq_lock(); false; })) { } \
+#define for_each_pool(pool, pi) \
+ idr_for_each_entry(&worker_pool_idr, pool, pi) \
+ if (({ assert_rcu_or_pool_mutex(); false; })) { } \
+ else
+
+/**
+ * for_each_pool_worker - iterate through all workers of a worker_pool
+ * @worker: iteration cursor
+ * @wi: integer used for iteration
+ * @pool: worker_pool to iterate workers of
+ *
+ * This must be called with either @pool->manager_mutex or ->lock held.
+ *
+ * The if/else clause exists only for the lockdep assertion and can be
+ * ignored.
+ */
+#define for_each_pool_worker(worker, wi, pool) \
+ idr_for_each_entry(&(pool)->worker_idr, (worker), (wi)) \
+ if (({ assert_manager_or_pool_lock((pool)); false; })) { } \
else
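
/*
 * Illustrative usage sketch (not part of this patch): walking a pool's
 * workers requires pool->manager_mutex or pool->lock; handle_worker() is
 * a placeholder.
 *
 *	struct worker *worker;
 *	int wi;
 *
 *	mutex_lock(&pool->manager_mutex);
 *	for_each_pool_worker(worker, wi, pool)
 *		handle_worker(worker);
 *	mutex_unlock(&pool->manager_mutex);
 */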
/**
* @pwq: iteration cursor
* @wq: the target workqueue
*
- * This must be called either with workqueue_lock held or sched RCU read
- * locked. If the pwq needs to be used beyond the locking in effect, the
- * caller is responsible for guaranteeing that the pwq stays online.
+ * This must be called either with pwq_lock held or sched RCU read locked.
+ * If the pwq needs to be used beyond the locking in effect, the caller is
+ * responsible for guaranteeing that the pwq stays online.
*
* The if/else clause exists only for the lockdep assertion and can be
* ignored.
*/
#define for_each_pwq(pwq, wq) \
list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node) \
- if (({ assert_rcu_or_wq_lock(); false; })) { } \
+ if (({ assert_rcu_or_pwq_lock(); false; })) { } \
else
#ifdef CONFIG_DEBUG_OBJECTS_WORK
static inline void debug_work_deactivate(struct work_struct *work) { }
#endif
-/* Serializes the accesses to the list of workqueues. */
-static DEFINE_SPINLOCK(workqueue_lock);
-static LIST_HEAD(workqueues);
-static bool workqueue_freezing; /* W: have wqs started freezing? */
-
-/*
- * The CPU and unbound standard worker pools. The unbound ones have
- * POOL_DISASSOCIATED set, and their workers have WORKER_UNBOUND set.
- */
-static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS],
- cpu_worker_pools);
-
-/*
- * idr of all pools. Modifications are protected by workqueue_lock. Read
- * accesses are protected by sched-RCU protected.
- */
-static DEFINE_IDR(worker_pool_idr);
-
-static int worker_thread(void *__worker);
-static void copy_workqueue_attrs(struct workqueue_attrs *to,
- const struct workqueue_attrs *from);
-
/* allocate ID and assign it to @pool */
static int worker_pool_assign_id(struct worker_pool *pool)
{
int ret;
+ lockdep_assert_held(&wq_pool_mutex);
+
do {
if (!idr_pre_get(&worker_pool_idr, GFP_KERNEL))
return -ENOMEM;
-
- spin_lock_irq(&workqueue_lock);
ret = idr_get_new(&worker_pool_idr, pool, &pool->id);
- spin_unlock_irq(&workqueue_lock);
} while (ret == -EAGAIN);
return ret;
* first_pwq - return the first pool_workqueue of the specified workqueue
* @wq: the target workqueue
*
- * This must be called either with workqueue_lock held or sched RCU read
- * locked. If the pwq needs to be used beyond the locking in effect, the
- * caller is responsible for guaranteeing that the pwq stays online.
+ * This must be called either with pwq_lock held or sched RCU read locked.
+ * If the pwq needs to be used beyond the locking in effect, the caller is
+ * responsible for guaranteeing that the pwq stays online.
*/
static struct pool_workqueue *first_pwq(struct workqueue_struct *wq)
{
- assert_rcu_or_wq_lock();
+ assert_rcu_or_pwq_lock();
return list_first_or_null_rcu(&wq->pwqs, struct pool_workqueue,
pwqs_node);
}
*
* Return the worker_pool @work was last associated with. %NULL if none.
*
- * Pools are created and destroyed under workqueue_lock, and allows read
+ * Pools are created and destroyed under wq_pool_mutex, and allow read
* access under sched-RCU read lock. As such, this function should be
- * called under workqueue_lock or with preemption disabled.
+ * called under wq_pool_mutex or with preemption disabled.
*
* All fields of the returned pool are accessible as long as the above
* mentioned locking is in effect. If the returned pool needs to be used
unsigned long data = atomic_long_read(&work->data);
int pool_id;
- assert_rcu_or_wq_lock();
+ assert_rcu_or_pool_mutex();
if (data & WORK_STRUCT_PWQ)
return ((struct pool_workqueue *)
* recycled work item as currently executing and make it wait until the
* current execution finishes, introducing an unwanted dependency.
*
- * This function checks the work item address, work function and workqueue
- * to avoid false positives. Note that this isn't complete as one may
- * construct a work function which can introduce dependency onto itself
- * through a recycled work item. Well, if somebody wants to shoot oneself
- * in the foot that badly, there's only so much we can do, and if such
- * deadlock actually occurs, it should be easy to locate the culprit work
- * function.
+ * This function checks the work item address and work function to avoid
+ * false positives. Note that this isn't complete as one may construct a
+ * work function which can introduce dependency onto itself through a
+ * recycled work item. Well, if somebody wants to shoot oneself in the
+ * foot that badly, there's only so much we can do, and if such deadlock
+ * actually occurs, it should be easy to locate the culprit work function.
*
* CONTEXT:
* spin_lock_irq(pool->lock).
get_pwq(pwq);
/*
- * Ensure either worker_sched_deactivated() sees the above
- * list_add_tail() or we see zero nr_running to avoid workers
- * lying around lazily while there are works to be processed.
+ * Ensure either wq_worker_sleeping() sees the above
+ * list_add_tail() or we see zero nr_running to avoid workers lying
+ * around lazily while there are works to be processed.
*/
smp_mb();
}
EXPORT_SYMBOL_GPL(queue_work_on);
-/**
- * queue_work - queue work on a workqueue
- * @wq: workqueue to use
- * @work: work to queue
- *
- * Returns %false if @work was already on a queue, %true otherwise.
- *
- * We queue the work to the CPU on which it was submitted, but if the CPU dies
- * it can be processed by another CPU.
- */
-bool queue_work(struct workqueue_struct *wq, struct work_struct *work)
-{
- return queue_work_on(WORK_CPU_UNBOUND, wq, work);
-}
-EXPORT_SYMBOL_GPL(queue_work);
-
void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);
-/**
- * queue_delayed_work - queue work on a workqueue after delay
- * @wq: workqueue to use
- * @dwork: delayable work to queue
- * @delay: number of jiffies to wait before queueing
- *
- * Equivalent to queue_delayed_work_on() but tries to use the local CPU.
- */
-bool queue_delayed_work(struct workqueue_struct *wq,
- struct delayed_work *dwork, unsigned long delay)
-{
- return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
-}
-EXPORT_SYMBOL_GPL(queue_delayed_work);
-
/**
* mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
* @cpu: CPU number to execute work on
}
EXPORT_SYMBOL_GPL(mod_delayed_work_on);
-/**
- * mod_delayed_work - modify delay of or queue a delayed work
- * @wq: workqueue to use
- * @dwork: work to queue
- * @delay: number of jiffies to wait before queueing
- *
- * mod_delayed_work_on() on local CPU.
- */
-bool mod_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork,
- unsigned long delay)
-{
- return mod_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
-}
-EXPORT_SYMBOL_GPL(mod_delayed_work);
-
/**
* worker_enter_idle - enter idle state
* @worker: worker which is entering idle state
}
}
-/*
- * Rebind an idle @worker to its CPU. worker_thread() will test
- * list_empty(@worker->entry) before leaving idle and call this function.
- */
-static void idle_worker_rebind(struct worker *worker)
-{
- /* CPU may go down again inbetween, clear UNBOUND only on success */
- if (worker_maybe_bind_and_lock(worker->pool))
- worker_clr_flags(worker, WORKER_UNBOUND);
-
- /* rebind complete, become available again */
- list_add(&worker->entry, &worker->pool->idle_list);
- spin_unlock_irq(&worker->pool->lock);
-}
-
-/*
- * Function for @worker->rebind.work used to rebind unbound busy workers to
- * the associated cpu which is coming back online. This is scheduled by
- * cpu up but can race with other cpu hotplug operations and may be
- * executed twice without intervening cpu down.
- */
-static void busy_worker_rebind_fn(struct work_struct *work)
-{
- struct worker *worker = container_of(work, struct worker, rebind_work);
-
- if (worker_maybe_bind_and_lock(worker->pool))
- worker_clr_flags(worker, WORKER_UNBOUND);
-
- spin_unlock_irq(&worker->pool->lock);
-}
-
-/**
- * rebind_workers - rebind all workers of a pool to the associated CPU
- * @pool: pool of interest
- *
- * @pool->cpu is coming online. Rebind all workers to the CPU. Rebinding
- * is different for idle and busy ones.
- *
- * Idle ones will be removed from the idle_list and woken up. They will
- * add themselves back after completing rebind. This ensures that the
- * idle_list doesn't contain any unbound workers when re-bound busy workers
- * try to perform local wake-ups for concurrency management.
- *
- * Busy workers can rebind after they finish their current work items.
- * Queueing the rebind work item at the head of the scheduled list is
- * enough. Note that nr_running will be properly bumped as busy workers
- * rebind.
- *
- * On return, all non-manager workers are scheduled for rebind - see
- * manage_workers() for the manager special case. Any idle worker
- * including the manager will not appear on @idle_list until rebind is
- * complete, making local wake-ups safe.
- */
-static void rebind_workers(struct worker_pool *pool)
-{
- struct worker *worker, *n;
- int i;
-
- lockdep_assert_held(&pool->assoc_mutex);
- lockdep_assert_held(&pool->lock);
-
- /* dequeue and kick idle ones */
- list_for_each_entry_safe(worker, n, &pool->idle_list, entry) {
- /*
- * idle workers should be off @pool->idle_list until rebind
- * is complete to avoid receiving premature local wake-ups.
- */
- list_del_init(&worker->entry);
-
- /*
- * worker_thread() will see the above dequeuing and call
- * idle_worker_rebind().
- */
- wake_up_process(worker->task);
- }
-
- /* rebind busy workers */
- for_each_busy_worker(worker, i, pool) {
- struct work_struct *rebind_work = &worker->rebind_work;
- struct workqueue_struct *wq;
-
- if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
- work_data_bits(rebind_work)))
- continue;
-
- debug_work_activate(rebind_work);
-
- /*
- * wq doesn't really matter but let's keep @worker->pool
- * and @pwq->pool consistent for sanity.
- */
- if (worker->pool->attrs->nice < 0)
- wq = system_highpri_wq;
- else
- wq = system_wq;
-
- insert_work(per_cpu_ptr(wq->cpu_pwqs, pool->cpu), rebind_work,
- worker->scheduled.next,
- work_color_to_flags(WORK_NO_COLOR));
- }
-}
-
static struct worker *alloc_worker(void)
{
struct worker *worker;
if (worker) {
INIT_LIST_HEAD(&worker->entry);
INIT_LIST_HEAD(&worker->scheduled);
- INIT_WORK(&worker->rebind_work, busy_worker_rebind_fn);
/* on creation a worker is in !idle && prep state */
worker->flags = WORKER_PREP;
}
struct worker *worker = NULL;
int id = -1;
+ lockdep_assert_held(&pool->manager_mutex);
+
+ /*
+ * ID is needed to determine kthread name. Allocate ID first
+ * without installing the pointer.
+ */
+ idr_preload(GFP_KERNEL);
spin_lock_irq(&pool->lock);
- while (ida_get_new(&pool->worker_ida, &id)) {
- spin_unlock_irq(&pool->lock);
- if (!ida_pre_get(&pool->worker_ida, GFP_KERNEL))
- goto fail;
- spin_lock_irq(&pool->lock);
- }
+
+ id = idr_alloc(&pool->worker_idr, NULL, 0, 0, GFP_NOWAIT);
+
spin_unlock_irq(&pool->lock);
+ idr_preload_end();
+ if (id < 0)
+ goto fail;
worker = alloc_worker();
if (!worker)
if (IS_ERR(worker->task))
goto fail;
+ /*
+ * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
+ * online CPUs. It'll be re-applied when any of the CPUs come up.
+ */
set_user_nice(worker->task, pool->attrs->nice);
set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
- /*
- * %PF_THREAD_BOUND is used to prevent userland from meddling with
- * cpumask of workqueue workers. This is an abuse. We need
- * %PF_NO_SETAFFINITY.
- */
- worker->task->flags |= PF_THREAD_BOUND;
+ /* prevent userland from meddling with cpumask of workqueue workers */
+ worker->task->flags |= PF_NO_SETAFFINITY;
/*
* The caller is responsible for ensuring %POOL_DISASSOCIATED
if (pool->flags & POOL_DISASSOCIATED)
worker->flags |= WORKER_UNBOUND;
+ /* successful, commit the pointer to idr */
+ spin_lock_irq(&pool->lock);
+ idr_replace(&pool->worker_idr, worker, worker->id);
+ spin_unlock_irq(&pool->lock);
+
return worker;
+
fail:
if (id >= 0) {
spin_lock_irq(&pool->lock);
- ida_remove(&pool->worker_ida, id);
+ idr_remove(&pool->worker_idr, id);
spin_unlock_irq(&pool->lock);
}
kfree(worker);
wake_up_process(worker->task);
}
+/**
+ * create_and_start_worker - create and start a worker for a pool
+ * @pool: the target pool
+ *
+ * Grab the managership of @pool and create and start a new worker for it.
+ */
+static int create_and_start_worker(struct worker_pool *pool)
+{
+ struct worker *worker;
+
+ mutex_lock(&pool->manager_mutex);
+
+ worker = create_worker(pool);
+ if (worker) {
+ spin_lock_irq(&pool->lock);
+ start_worker(worker);
+ spin_unlock_irq(&pool->lock);
+ }
+
+ mutex_unlock(&pool->manager_mutex);
+
+ return worker ? 0 : -ENOMEM;
+}
+
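/*
 * Illustrative sketch (not part of this patch): callers which previously
 * open-coded create_worker() + start_worker() can now simply do
 *
 *	if (create_and_start_worker(pool) < 0)
 *		goto fail;
 *
 * as get_unbound_pool() and the CPU_UP_PREPARE notifier do further below.
 */
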
/**
* destroy_worker - destroy a workqueue worker
* @worker: worker to be destroyed
static void destroy_worker(struct worker *worker)
{
struct worker_pool *pool = worker->pool;
- int id = worker->id;
+
+ lockdep_assert_held(&pool->manager_mutex);
+ lockdep_assert_held(&pool->lock);
/* sanity check frenzy */
if (WARN_ON(worker->current_work) ||
list_del_init(&worker->entry);
worker->flags |= WORKER_DIE;
+ idr_remove(&pool->worker_idr, worker->id);
+
spin_unlock_irq(&pool->lock);
kthread_stop(worker->task);
kfree(worker);
spin_lock_irq(&pool->lock);
- ida_remove(&pool->worker_ida, id);
}
static void idle_worker_timeout(unsigned long __pool)
struct pool_workqueue *pwq = get_work_pwq(work);
struct workqueue_struct *wq = pwq->wq;
- lockdep_assert_held(&workqueue_lock);
+ lockdep_assert_held(&wq_mayday_lock);
if (!wq->rescuer)
return;
struct worker_pool *pool = (void *)__pool;
struct work_struct *work;
- spin_lock_irq(&workqueue_lock); /* for wq->maydays */
+ spin_lock_irq(&wq_mayday_lock); /* for wq->maydays */
spin_lock(&pool->lock);
if (need_to_create_worker(pool)) {
}
spin_unlock(&pool->lock);
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&wq_mayday_lock);
mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
}
* sent to all rescuers with works scheduled on @pool to resolve
* possible allocation deadlock.
*
- * On return, need_to_create_worker() is guaranteed to be false and
- * may_start_working() true.
+ * On return, need_to_create_worker() is guaranteed to be %false and
+ * may_start_working() %true.
*
* LOCKING:
* spin_lock_irq(pool->lock) which may be released and regrabbed
* manager.
*
* RETURNS:
- * false if no action was taken and pool->lock stayed locked, true
+ * %false if no action was taken and pool->lock stayed locked, %true
* otherwise.
*/
static bool maybe_create_worker(struct worker_pool *pool)
* multiple times. Called only from manager.
*
* RETURNS:
- * false if no action was taken and pool->lock stayed locked, true
+ * %false if no action was taken and pool->lock stayed locked, %true
* otherwise.
*/
static bool maybe_destroy_workers(struct worker_pool *pool)
struct worker_pool *pool = worker->pool;
bool ret = false;
+ /*
+ * Managership is governed by two mutexes - manager_arb and
+ * manager_mutex. manager_arb handles arbitration of manager role.
+ * Anyone who successfully grabs manager_arb wins the arbitration
+ * and becomes the manager. mutex_trylock() on pool->manager_arb
+ * failure while holding pool->lock reliably indicates that someone
+ * else is managing the pool and the worker which failed trylock
+ * can proceed to executing work items. This means that anyone
+ * grabbing manager_arb is responsible for actually performing
+ * manager duties. If manager_arb is grabbed and released without
+ * actual management, the pool may stall indefinitely.
+ *
+ * manager_mutex is used for exclusion of actual management
+ * operations. The holder of manager_mutex can be sure that no
+ * management operation, including creation and destruction of
+ * workers, will take place until the mutex is released. Because
+ * manager_mutex doesn't interfere with manager role arbitration,
+ * it is guaranteed that the pool's management, while it may be
+ * delayed, won't be disturbed by someone else grabbing
+ * manager_mutex.
+ */
if (!mutex_trylock(&pool->manager_arb))
return ret;
/*
- * To simplify both worker management and CPU hotplug, hold off
- * management while hotplug is in progress. CPU hotplug path can't
- * grab @pool->manager_arb to achieve this because that can lead to
- * idle worker depletion (all become busy thinking someone else is
- * managing) which in turn can result in deadlock under extreme
- * circumstances. Use @pool->assoc_mutex to synchronize manager
- * against CPU hotplug.
- *
- * assoc_mutex would always be free unless CPU hotplug is in
- * progress. trylock first without dropping @pool->lock.
+ * With manager arbitration won, manager_mutex would be free in
+ * most cases. trylock first without dropping @pool->lock.
*/
- if (unlikely(!mutex_trylock(&pool->assoc_mutex))) {
+ if (unlikely(!mutex_trylock(&pool->manager_mutex))) {
spin_unlock_irq(&pool->lock);
- mutex_lock(&pool->assoc_mutex);
- /*
- * CPU hotplug could have happened while we were waiting
- * for assoc_mutex. Hotplug itself can't handle us
- * because manager isn't either on idle or busy list, and
- * @pool's state and ours could have deviated.
- *
- * As hotplug is now excluded via assoc_mutex, we can
- * simply try to bind. It will succeed or fail depending
- * on @pool's current state. Try it and adjust
- * %WORKER_UNBOUND accordingly.
- */
- if (worker_maybe_bind_and_lock(pool))
- worker->flags &= ~WORKER_UNBOUND;
- else
- worker->flags |= WORKER_UNBOUND;
-
+ mutex_lock(&pool->manager_mutex);
ret = true;
}
ret |= maybe_destroy_workers(pool);
ret |= maybe_create_worker(pool);
- mutex_unlock(&pool->assoc_mutex);
+ mutex_unlock(&pool->manager_mutex);
mutex_unlock(&pool->manager_arb);
return ret;
}
* worker_thread - the worker thread function
* @__worker: self
*
- * The worker thread function. There are NR_CPU_WORKER_POOLS dynamic pools
- * of these per each cpu. These workers process all works regardless of
- * their specific target workqueue. The only exception is works which
- * belong to workqueues with a rescuer which will be explained in
- * rescuer_thread().
+ * The worker thread function. All workers belong to a worker_pool -
+ * either a per-cpu one or dynamic unbound one. These workers process all
+ * work items regardless of their specific target workqueue. The only
+ * exception is work items which belong to workqueues with a rescuer which
+ * will be explained in rescuer_thread().
*/
static int worker_thread(void *__worker)
{
woke_up:
spin_lock_irq(&pool->lock);
- /* we are off idle list if destruction or rebind is requested */
- if (unlikely(list_empty(&worker->entry))) {
+ /* am I supposed to die? */
+ if (unlikely(worker->flags & WORKER_DIE)) {
spin_unlock_irq(&pool->lock);
-
- /* if DIE is set, destruction is requested */
- if (worker->flags & WORKER_DIE) {
- worker->task->flags &= ~PF_WQ_WORKER;
- return 0;
- }
-
- /* otherwise, rebind */
- idle_worker_rebind(worker);
- goto woke_up;
+ WARN_ON_ONCE(!list_empty(&worker->entry));
+ worker->task->flags &= ~PF_WQ_WORKER;
+ return 0;
}
worker_leave_idle(worker);
WARN_ON_ONCE(!list_empty(&worker->scheduled));
/*
- * When control reaches this point, we're guaranteed to have
- * at least one idle worker or that someone else has already
- * assumed the manager role.
+ * Finish PREP stage. We're guaranteed to have at least one idle
+ * worker or that someone else has already assumed the manager
+ * role. This is where @worker starts participating in concurrency
+ * management if applicable and concurrency management is restored
+ * after being rebound. See rebind_workers() for details.
*/
- worker_clr_flags(worker, WORKER_PREP);
+ worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
do {
struct work_struct *work =
}
/* see whether any pwq is asking for help */
- spin_lock_irq(&workqueue_lock);
+ spin_lock_irq(&wq_mayday_lock);
while (!list_empty(&wq->maydays)) {
struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
__set_current_state(TASK_RUNNING);
list_del_init(&pwq->mayday_node);
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&wq_mayday_lock);
/* migrate to the target cpu if possible */
worker_maybe_bind_and_lock(pool);
rescuer->pool = NULL;
spin_unlock(&pool->lock);
- spin_lock(&workqueue_lock);
+ spin_lock(&wq_mayday_lock);
}
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&wq_mayday_lock);
/* rescuers should never participate in concurrency management */
WARN_ON_ONCE(!(rescuer->flags & WORKER_NOT_RUNNING));
* flush_workqueue - ensure that any scheduled work has run to completion.
* @wq: workqueue to flush
*
- * Forces execution of the workqueue and blocks until its completion.
- * This is typically used in driver shutdown handlers.
- *
- * We sleep until all works which were queued on entry have been handled,
- * but we are not livelocked by new incoming ones.
+ * This function sleeps until all work items which were queued on entry
+ * have finished execution, but it is not livelocked by new incoming ones.
*/
void flush_workqueue(struct workqueue_struct *wq)
{
* hotter than drain_workqueue() and already looks at @wq->flags.
* Use __WQ_DRAINING so that queue doesn't have to check nr_drainers.
*/
- spin_lock_irq(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
if (!wq->nr_drainers++)
wq->flags |= __WQ_DRAINING;
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
reflush:
flush_workqueue(wq);
if (++flush_cnt == 10 ||
(flush_cnt % 100 == 0 && flush_cnt <= 1000))
- pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n",
+ pr_warn("workqueue %s: drain_workqueue() isn't complete after %u tries\n",
wq->name, flush_cnt);
local_irq_enable();
goto reflush;
}
- spin_lock(&workqueue_lock);
+ local_irq_enable();
+
+ mutex_lock(&wq_pool_mutex);
if (!--wq->nr_drainers)
wq->flags &= ~__WQ_DRAINING;
- spin_unlock(&workqueue_lock);
-
- local_irq_enable();
+ mutex_unlock(&wq_pool_mutex);
}
EXPORT_SYMBOL_GPL(drain_workqueue);
}
EXPORT_SYMBOL(cancel_delayed_work_sync);
-/**
- * schedule_work_on - put work task on a specific cpu
- * @cpu: cpu to put the work task on
- * @work: job to be done
- *
- * This puts a job on a specific cpu
- */
-bool schedule_work_on(int cpu, struct work_struct *work)
-{
- return queue_work_on(cpu, system_wq, work);
-}
-EXPORT_SYMBOL(schedule_work_on);
-
-/**
- * schedule_work - put work task in global workqueue
- * @work: job to be done
- *
- * Returns %false if @work was already on the kernel-global workqueue and
- * %true otherwise.
- *
- * This puts a job in the kernel-global workqueue if it was not already
- * queued and leaves it in the same position on the kernel-global
- * workqueue otherwise.
- */
-bool schedule_work(struct work_struct *work)
-{
- return queue_work(system_wq, work);
-}
-EXPORT_SYMBOL(schedule_work);
-
-/**
- * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
- * @cpu: cpu to use
- * @dwork: job to be done
- * @delay: number of jiffies to wait
- *
- * After waiting for a given time this puts a job in the kernel-global
- * workqueue on the specified CPU.
- */
-bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork,
- unsigned long delay)
-{
- return queue_delayed_work_on(cpu, system_wq, dwork, delay);
-}
-EXPORT_SYMBOL(schedule_delayed_work_on);
-
-/**
- * schedule_delayed_work - put work task in global workqueue after delay
- * @dwork: job to be done
- * @delay: number of jiffies to wait or 0 for immediate execution
- *
- * After waiting for a given time this puts a job in the kernel-global
- * workqueue.
- */
-bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
-{
- return queue_delayed_work(system_wq, dwork, delay);
-}
-EXPORT_SYMBOL(schedule_delayed_work);
-
/**
* schedule_on_each_cpu - execute a function synchronously on each online CPU
* @func: the function to call
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
-int keventd_up(void)
-{
- return system_wq != NULL;
-}
-
#ifdef CONFIG_SYSFS
/*
 * Workqueues with WQ_SYSFS flag set are visible to userland via
(unsigned long)pool);
mutex_init(&pool->manager_arb);
- mutex_init(&pool->assoc_mutex);
- ida_init(&pool->worker_ida);
+ mutex_init(&pool->manager_mutex);
+ idr_init(&pool->worker_idr);
INIT_HLIST_NODE(&pool->hash_node);
pool->refcnt = 1;
{
struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
- ida_destroy(&pool->worker_ida);
+ idr_destroy(&pool->worker_idr);
free_workqueue_attrs(pool->attrs);
kfree(pool);
}
* @pool: worker_pool to put
*
* Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
- * safe manner.
+ * safe manner. get_unbound_pool() calls this function on its failure path
+ * and this function should be able to release pools which have gone
+ * through init_worker_pool(), whether successfully or not.
*/
static void put_unbound_pool(struct worker_pool *pool)
{
struct worker *worker;
- spin_lock_irq(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
if (--pool->refcnt) {
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
return;
}
/* sanity checks */
if (WARN_ON(!(pool->flags & POOL_DISASSOCIATED)) ||
WARN_ON(!list_empty(&pool->worklist))) {
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
return;
}
idr_remove(&worker_pool_idr, pool->id);
hash_del(&pool->hash_node);
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
- /* lock out manager and destroy all workers */
+ /*
+ * Become the manager and destroy all workers. Grabbing
+ * manager_arb prevents @pool's workers from blocking on
+ * manager_mutex.
+ */
mutex_lock(&pool->manager_arb);
+ mutex_lock(&pool->manager_mutex);
spin_lock_irq(&pool->lock);
while ((worker = first_worker(pool)))
WARN_ON(pool->nr_workers || pool->nr_idle);
spin_unlock_irq(&pool->lock);
+ mutex_unlock(&pool->manager_mutex);
mutex_unlock(&pool->manager_arb);
/* shut down the timers */
*/
static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{
- static DEFINE_MUTEX(create_mutex);
u32 hash = wqattrs_hash(attrs);
struct worker_pool *pool;
- struct worker *worker;
- mutex_lock(&create_mutex);
+ mutex_lock(&wq_pool_mutex);
/* do we already have a matching pool? */
- spin_lock_irq(&workqueue_lock);
hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
if (wqattrs_equal(pool->attrs, attrs)) {
pool->refcnt++;
goto out_unlock;
}
}
- spin_unlock_irq(&workqueue_lock);
/* nope, create a new one */
pool = kzalloc(sizeof(*pool), GFP_KERNEL);
if (!pool || init_worker_pool(pool) < 0)
goto fail;
+ if (workqueue_freezing)
+ pool->flags |= POOL_FREEZING;
+
lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
copy_workqueue_attrs(pool->attrs, attrs);
goto fail;
/* create and start the initial worker */
- worker = create_worker(pool);
- if (!worker)
+ if (create_and_start_worker(pool) < 0)
goto fail;
- spin_lock_irq(&pool->lock);
- start_worker(worker);
- spin_unlock_irq(&pool->lock);
-
/* install */
- spin_lock_irq(&workqueue_lock);
hash_add(unbound_pool_hash, &pool->hash_node, hash);
out_unlock:
- spin_unlock_irq(&workqueue_lock);
- mutex_unlock(&create_mutex);
+ mutex_unlock(&wq_pool_mutex);
return pool;
fail:
- mutex_unlock(&create_mutex);
+ mutex_unlock(&wq_pool_mutex);
if (pool)
put_unbound_pool(pool);
return NULL;
* and consistent with the linking path.
*/
mutex_lock(&wq->flush_mutex);
- spin_lock_irq(&workqueue_lock);
+ spin_lock_irq(&pwq_lock);
list_del_rcu(&pwq->pwqs_node);
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&pwq_lock);
mutex_unlock(&wq->flush_mutex);
put_unbound_pool(pool);
bool freezable = wq->flags & WQ_FREEZABLE;
/* for @wq->saved_max_active */
- lockdep_assert_held(&workqueue_lock);
+ lockdep_assert_held(&pwq_lock);
/* fast exit for non-freezable wqs */
if (!freezable && pwq->max_active == wq->saved_max_active)
while (!list_empty(&pwq->delayed_works) &&
pwq->nr_active < pwq->max_active)
pwq_activate_first_delayed(pwq);
+
+ /*
+ * Need to kick a worker after a thaw or when an unbound wq's
+ * max_active is bumped. It's a slow path. Do it always.
+ */
+ wake_up_worker(pwq->pool);
} else {
pwq->max_active = 0;
}
pwq->wq = wq;
pwq->flush_color = -1;
pwq->refcnt = 1;
- pwq->max_active = wq->saved_max_active;
INIT_LIST_HEAD(&pwq->delayed_works);
INIT_LIST_HEAD(&pwq->mayday_node);
INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
- /*
- * Link @pwq and set the matching work_color. This is synchronized
- * with flush_mutex to avoid confusing flush_workqueue().
- */
mutex_lock(&wq->flush_mutex);
- spin_lock_irq(&workqueue_lock);
+ spin_lock_irq(&pwq_lock);
+ /*
+ * Set the matching work_color. This is synchronized with
+ * flush_mutex to avoid confusing flush_workqueue().
+ */
if (p_last_pwq)
*p_last_pwq = first_pwq(wq);
pwq->work_color = wq->work_color;
+
+ /* sync max_active to the current setting */
+ pwq_adjust_max_active(pwq);
+
+ /* link in @pwq */
list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&pwq_lock);
mutex_unlock(&wq->flush_mutex);
}
}
wq->rescuer = rescuer;
- rescuer->task->flags |= PF_THREAD_BOUND;
+ rescuer->task->flags |= PF_NO_SETAFFINITY;
wake_up_process(rescuer->task);
}
goto err_destroy;
/*
- * workqueue_lock protects global freeze state and workqueues list.
- * Grab it, adjust max_active and add the new workqueue to
- * workqueues list.
+ * wq_pool_mutex protects global freeze state and workqueues list.
+ * Grab it, adjust max_active and add the new @wq to workqueues
+ * list.
*/
- spin_lock_irq(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
+ spin_lock_irq(&pwq_lock);
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
+ spin_unlock_irq(&pwq_lock);
list_add(&wq->list, &workqueues);
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
return wq;
/* drain it before proceeding with destruction */
drain_workqueue(wq);
- spin_lock_irq(&workqueue_lock);
-
/* sanity checks */
+ spin_lock_irq(&pwq_lock);
for_each_pwq(pwq, wq) {
int i;
for (i = 0; i < WORK_NR_COLORS; i++) {
if (WARN_ON(pwq->nr_in_flight[i])) {
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&pwq_lock);
return;
}
}
if (WARN_ON(pwq->refcnt > 1) ||
WARN_ON(pwq->nr_active) ||
WARN_ON(!list_empty(&pwq->delayed_works))) {
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&pwq_lock);
return;
}
}
+ spin_unlock_irq(&pwq_lock);
/*
* wq list is used to freeze wq, remove from list after
* flushing is complete in case freeze races us.
*/
+ mutex_lock(&wq_pool_mutex);
list_del_init(&wq->list);
-
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
workqueue_sysfs_unregister(wq);
max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
- spin_lock_irq(&workqueue_lock);
+ spin_lock_irq(&pwq_lock);
wq->saved_max_active = max_active;
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
- spin_unlock_irq(&workqueue_lock);
+ spin_unlock_irq(&pwq_lock);
}
EXPORT_SYMBOL_GPL(workqueue_set_max_active);
{
struct worker *worker = current_wq_worker();
- return worker && worker == worker->current_pwq->wq->rescuer;
+ return worker && worker->rescue_wq;
}
/**
struct pool_workqueue *pwq;
bool ret;
- preempt_disable();
+ rcu_read_lock_sched();
if (!(wq->flags & WQ_UNBOUND))
pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
pwq = first_pwq(wq);
ret = !list_empty(&pwq->delayed_works);
- preempt_enable();
+ rcu_read_unlock_sched();
return ret;
}
int cpu = smp_processor_id();
struct worker_pool *pool;
struct worker *worker;
- int i;
+ int wi;
for_each_cpu_worker_pool(pool, cpu) {
WARN_ON_ONCE(cpu != smp_processor_id());
- mutex_lock(&pool->assoc_mutex);
+ mutex_lock(&pool->manager_mutex);
spin_lock_irq(&pool->lock);
/*
- * We've claimed all manager positions. Make all workers
+ * We've blocked all manager operations. Make all workers
* unbound and set DISASSOCIATED. Before this, all workers
* except for the ones which are still executing works from
* before the last CPU down must be on the cpu. After
* this, they may become diasporas.
*/
- list_for_each_entry(worker, &pool->idle_list, entry)
- worker->flags |= WORKER_UNBOUND;
-
- for_each_busy_worker(worker, i, pool)
+ for_each_pool_worker(worker, wi, pool)
worker->flags |= WORKER_UNBOUND;
pool->flags |= POOL_DISASSOCIATED;
spin_unlock_irq(&pool->lock);
- mutex_unlock(&pool->assoc_mutex);
+ mutex_unlock(&pool->manager_mutex);
}
/*
atomic_set(&pool->nr_running, 0);
}
+/**
+ * rebind_workers - rebind all workers of a pool to the associated CPU
+ * @pool: pool of interest
+ *
+ * @pool->cpu is coming online. Rebind all workers to the CPU.
+ */
+static void rebind_workers(struct worker_pool *pool)
+{
+ struct worker *worker;
+ int wi;
+
+ lockdep_assert_held(&pool->manager_mutex);
+
+ /*
+ * Restore CPU affinity of all workers. As all idle workers should
+ * be on the run-queue of the associated CPU before any local
+ * wake-ups for concurrency management happen, restore CPU affinity
+ * of all workers first and then clear UNBOUND. As we're called
+ * from CPU_ONLINE, the following shouldn't fail.
+ */
+ for_each_pool_worker(worker, wi, pool)
+ WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
+ pool->attrs->cpumask) < 0);
+
+ spin_lock_irq(&pool->lock);
+
+ for_each_pool_worker(worker, wi, pool) {
+ unsigned int worker_flags = worker->flags;
+
+ /*
+ * A bound idle worker should actually be on the runqueue
+ * of the associated CPU for local wake-ups targeting it to
+ * work. Kick all idle workers so that they migrate to the
+ * associated CPU. Doing this in the same loop as
+ * replacing UNBOUND with REBOUND is safe as no worker will
+ * be bound before @pool->lock is released.
+ */
+ if (worker_flags & WORKER_IDLE)
+ wake_up_process(worker->task);
+
+ /*
+ * We want to clear UNBOUND but can't directly call
+ * worker_clr_flags() or adjust nr_running. Atomically
+ * replace UNBOUND with another NOT_RUNNING flag REBOUND.
+ * @worker will clear REBOUND using worker_clr_flags() when
+ * it initiates the next execution cycle thus restoring
+ * concurrency management. Note that when or whether
+ * @worker clears REBOUND doesn't affect correctness.
+ *
+ * ACCESS_ONCE() is necessary because @worker->flags may be
+ * tested without holding any lock in
+ * wq_worker_waking_up(). Without it, NOT_RUNNING test may
+ * fail incorrectly leading to premature concurrency
+ * management operations.
+ */
+ WARN_ON_ONCE(!(worker_flags & WORKER_UNBOUND));
+ worker_flags |= WORKER_REBOUND;
+ worker_flags &= ~WORKER_UNBOUND;
+ ACCESS_ONCE(worker->flags) = worker_flags;
+ }
+
+ spin_unlock_irq(&pool->lock);
+}
+
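/*
 * Illustrative sketch (not part of this patch, roughly paraphrasing the
 * wq_worker_waking_up() of this era): the lockless NOT_RUNNING test which
 * the ACCESS_ONCE() above protects looks like
 *
 *	if (!(worker->flags & WORKER_NOT_RUNNING))
 *		atomic_inc(&worker->pool->nr_running);
 *
 * so UNBOUND and REBOUND must be swapped with a single store; otherwise
 * the test could transiently see neither flag and bump nr_running early.
 */
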
+/**
+ * restore_unbound_workers_cpumask - restore cpumask of unbound workers
+ * @pool: unbound pool of interest
+ * @cpu: the CPU which is coming up
+ *
+ * An unbound pool may end up with a cpumask which doesn't have any online
+ * CPUs. When a worker of such a pool gets scheduled, the scheduler resets
+ * its cpus_allowed. If @cpu is in @pool's cpumask which didn't have any
+ * online CPU before, cpus_allowed of all its workers should be restored.
+ */
+static void restore_unbound_workers_cpumask(struct worker_pool *pool, int cpu)
+{
+ static cpumask_t cpumask;
+ struct worker *worker;
+ int wi;
+
+ lockdep_assert_held(&pool->manager_mutex);
+
+ /* is @cpu allowed for @pool? */
+ if (!cpumask_test_cpu(cpu, pool->attrs->cpumask))
+ return;
+
+ /* is @cpu the only online CPU? */
+ cpumask_and(&cpumask, pool->attrs->cpumask, cpu_online_mask);
+ if (cpumask_weight(&cpumask) != 1)
+ return;
+
+ /* as we're called from CPU_ONLINE, the following shouldn't fail */
+ for_each_pool_worker(worker, wi, pool)
+ WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
+ pool->attrs->cpumask) < 0);
+}
+
/*
* Workqueues should be brought up before normal priority CPU notifiers.
* This will be registered high priority CPU notifier.
{
int cpu = (unsigned long)hcpu;
struct worker_pool *pool;
+ int pi;
switch (action & ~CPU_TASKS_FROZEN) {
case CPU_UP_PREPARE:
for_each_cpu_worker_pool(pool, cpu) {
- struct worker *worker;
-
if (pool->nr_workers)
continue;
-
- worker = create_worker(pool);
- if (!worker)
+ if (create_and_start_worker(pool) < 0)
return NOTIFY_BAD;
-
- spin_lock_irq(&pool->lock);
- start_worker(worker);
- spin_unlock_irq(&pool->lock);
}
break;
case CPU_DOWN_FAILED:
case CPU_ONLINE:
- for_each_cpu_worker_pool(pool, cpu) {
- mutex_lock(&pool->assoc_mutex);
- spin_lock_irq(&pool->lock);
+ mutex_lock(&wq_pool_mutex);
- pool->flags &= ~POOL_DISASSOCIATED;
- rebind_workers(pool);
+ for_each_pool(pool, pi) {
+ mutex_lock(&pool->manager_mutex);
+
+ if (pool->cpu == cpu) {
+ spin_lock_irq(&pool->lock);
+ pool->flags &= ~POOL_DISASSOCIATED;
+ spin_unlock_irq(&pool->lock);
+
+ rebind_workers(pool);
+ } else if (pool->cpu < 0) {
+ restore_unbound_workers_cpumask(pool, cpu);
+ }
- spin_unlock_irq(&pool->lock);
- mutex_unlock(&pool->assoc_mutex);
+ mutex_unlock(&pool->manager_mutex);
}
+
+ mutex_unlock(&wq_pool_mutex);
break;
}
return NOTIFY_OK;
* freeze_workqueues_begin - begin freezing workqueues
*
* Start freezing workqueues. After this function returns, all freezable
- * workqueues will queue new works to their frozen_works list instead of
+ * workqueues will queue new works to their delayed_works list instead of
* pool->worklist.
*
* CONTEXT:
- * Grabs and releases workqueue_lock and pool->lock's.
+ * Grabs and releases wq_pool_mutex, pwq_lock and pool->lock's.
*/
void freeze_workqueues_begin(void)
{
struct worker_pool *pool;
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
- int id;
+ int pi;
- spin_lock_irq(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
WARN_ON_ONCE(workqueue_freezing);
workqueue_freezing = true;
/* set FREEZING */
- for_each_pool(pool, id) {
- spin_lock(&pool->lock);
+ for_each_pool(pool, pi) {
+ spin_lock_irq(&pool->lock);
WARN_ON_ONCE(pool->flags & POOL_FREEZING);
pool->flags |= POOL_FREEZING;
- spin_unlock(&pool->lock);
+ spin_unlock_irq(&pool->lock);
}
/* suppress further executions by setting max_active to zero */
+ spin_lock_irq(&pwq_lock);
list_for_each_entry(wq, &workqueues, list) {
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
}
+ spin_unlock_irq(&pwq_lock);
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
}
/**
* between freeze_workqueues_begin() and thaw_workqueues().
*
* CONTEXT:
- * Grabs and releases workqueue_lock.
+ * Grabs and releases wq_pool_mutex.
*
* RETURNS:
* %true if some freezable workqueues are still busy. %false if freezing
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
- spin_lock_irq(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
WARN_ON_ONCE(!workqueue_freezing);
* nr_active is monotonically decreasing. It's safe
* to peek without lock.
*/
+ rcu_read_lock_sched();
for_each_pwq(pwq, wq) {
WARN_ON_ONCE(pwq->nr_active < 0);
if (pwq->nr_active) {
busy = true;
+ rcu_read_unlock_sched();
goto out_unlock;
}
}
+ rcu_read_unlock_sched();
}
out_unlock:
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
return busy;
}
* frozen works are transferred to their respective pool worklists.
*
* CONTEXT:
- * Grabs and releases workqueue_lock and pool->lock's.
+ * Grabs and releases wq_pool_mutex, pwq_lock and pool->lock's.
*/
void thaw_workqueues(void)
{
struct workqueue_struct *wq;
struct pool_workqueue *pwq;
struct worker_pool *pool;
- int id;
+ int pi;
- spin_lock_irq(&workqueue_lock);
+ mutex_lock(&wq_pool_mutex);
if (!workqueue_freezing)
goto out_unlock;
/* clear FREEZING */
- for_each_pool(pool, id) {
- spin_lock(&pool->lock);
+ for_each_pool(pool, pi) {
+ spin_lock_irq(&pool->lock);
WARN_ON_ONCE(!(pool->flags & POOL_FREEZING));
pool->flags &= ~POOL_FREEZING;
- spin_unlock(&pool->lock);
+ spin_unlock_irq(&pool->lock);
}
/* restore max_active and repopulate worklist */
+ spin_lock_irq(&pwq_lock);
list_for_each_entry(wq, &workqueues, list) {
for_each_pwq(pwq, wq)
pwq_adjust_max_active(pwq);
}
-
- /* kick workers */
- for_each_pool(pool, id) {
- spin_lock(&pool->lock);
- wake_up_worker(pool);
- spin_unlock(&pool->lock);
- }
+ spin_unlock_irq(&pwq_lock);
workqueue_freezing = false;
out_unlock:
- spin_unlock_irq(&workqueue_lock);
+ mutex_unlock(&wq_pool_mutex);
}
#endif /* CONFIG_FREEZER */
pool->attrs->nice = std_nice[i++];
/* alloc pool ID */
+ mutex_lock(&wq_pool_mutex);
BUG_ON(worker_pool_assign_id(pool));
+ mutex_unlock(&wq_pool_mutex);
}
}
struct worker_pool *pool;
for_each_cpu_worker_pool(pool, cpu) {
- struct worker *worker;
-
pool->flags &= ~POOL_DISASSOCIATED;
-
- worker = create_worker(pool);
- BUG_ON(!worker);
- spin_lock_irq(&pool->lock);
- start_worker(worker);
- spin_unlock_irq(&pool->lock);
+ BUG_ON(create_and_start_worker(pool) < 0);
}
}