Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq
author Linus Torvalds <torvalds@linux-foundation.org>
Sat, 23 Oct 2010 00:13:10 +0000 (17:13 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Sat, 23 Oct 2010 00:13:10 +0000 (17:13 -0700)
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq:
  workqueue: remove in_workqueue_context()
  workqueue: Clarify that schedule_on_each_cpu is synchronous
  memory_hotplug: drop spurious calls to flush_scheduled_work()
  shpchp: update workqueue usage
  pciehp: update workqueue usage
  isdn/eicon: don't call flush_scheduled_work() from diva_os_remove_soft_isr()
  workqueue: add and use WQ_MEM_RECLAIM flag
  workqueue: fix HIGHPRI handling in keep_working()
  workqueue: add queue_work and activate_work trace points
  workqueue: prepare for more tracepoints
  workqueue: implement flush[_delayed]_work_sync()
  workqueue: factor out start_flush_work()
  workqueue: cleanup flush/cancel functions
  workqueue: implement alloc_ordered_workqueue()

Fix up trivial conflict in fs/gfs2/main.c as per Tejun

17 files changed:
Documentation/workqueue.txt
drivers/ata/libata-sff.c
drivers/isdn/hardware/eicon/divasmain.c
drivers/pci/hotplug/pciehp.h
drivers/pci/hotplug/pciehp_core.c
drivers/pci/hotplug/pciehp_ctrl.c
drivers/pci/hotplug/pciehp_hpc.c
drivers/pci/hotplug/shpchp.h
drivers/pci/hotplug/shpchp_core.c
drivers/pci/hotplug/shpchp_ctrl.c
drivers/pci/hotplug/shpchp_hpc.c
fs/gfs2/main.c
fs/xfs/linux-2.6/xfs_buf.c
include/linux/workqueue.h
include/trace/events/workqueue.h
kernel/workqueue.c
mm/memory_hotplug.c

diff --git a/Documentation/workqueue.txt b/Documentation/workqueue.txt
index e4498a2..996a27d 100644
@@ -196,11 +196,11 @@ resources, scheduled and executed.
        suspend operations.  Work items on the wq are drained and no
        new work item starts execution until thawed.
 
-  WQ_RESCUER
+  WQ_MEM_RECLAIM
 
        All wq which might be used in the memory reclaim paths _MUST_
-       have this flag set.  This reserves one worker exclusively for
-       the execution of this wq under memory pressure.
+       have this flag set.  The wq is guaranteed to have at least one
+       execution context regardless of memory pressure.
 
   WQ_HIGHPRI
 
@@ -356,11 +356,11 @@ If q1 has WQ_CPU_INTENSIVE set,
 
 6. Guidelines
 
-* Do not forget to use WQ_RESCUER if a wq may process work items which
-  are used during memory reclaim.  Each wq with WQ_RESCUER set has one
-  rescuer thread reserved for it.  If there is dependency among
-  multiple work items used during memory reclaim, they should be
-  queued to separate wq each with WQ_RESCUER.
+* Do not forget to use WQ_MEM_RECLAIM if a wq may process work items
+  which are used during memory reclaim.  Each wq with WQ_MEM_RECLAIM
+  set has an execution context reserved for it.  If there is
+  dependency among multiple work items used during memory reclaim,
+  they should be queued to separate wq each with WQ_MEM_RECLAIM.
 
 * Unless strict ordering is required, there is no need to use ST wq.
 
@@ -368,12 +368,13 @@ If q1 has WQ_CPU_INTENSIVE set,
   recommended.  In most use cases, concurrency level usually stays
   well under the default limit.
 
-* A wq serves as a domain for forward progress guarantee (WQ_RESCUER),
-  flush and work item attributes.  Work items which are not involved
-  in memory reclaim and don't need to be flushed as a part of a group
-  of work items, and don't require any special attribute, can use one
-  of the system wq.  There is no difference in execution
-  characteristics between using a dedicated wq and a system wq.
+* A wq serves as a domain for forward progress guarantee
+  (WQ_MEM_RECLAIM), flush and work item attributes.  Work items which
+  are not involved in memory reclaim and don't need to be flushed as a
+  part of a group of work items, and don't require any special
+  attribute, can use one of the system wq.  There is no difference in
+  execution characteristics between using a dedicated wq and a system
+  wq.
 
 * Unless work items are expected to consume a huge amount of CPU
   cycles, using a bound wq is usually beneficial due to the increased
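
The rewritten guideline above maps directly onto alloc_workqueue() usage.
A minimal sketch (hypothetical "foo" driver names, not part of this
series) of two interdependent reclaim-path work items, each given its own
WQ_MEM_RECLAIM workqueue as the guideline requires:

#include <linux/workqueue.h>
#include <linux/init.h>
#include <linux/errno.h>

/* hypothetical: two reclaim-path work items with a dependency */
static struct workqueue_struct *foo_io_wq;
static struct workqueue_struct *foo_commit_wq;

static int __init foo_init(void)
{
        /* each reclaim-path wq gets its own forward-progress guarantee */
        foo_io_wq = alloc_workqueue("foo_io", WQ_MEM_RECLAIM, 0);
        if (!foo_io_wq)
                return -ENOMEM;

        foo_commit_wq = alloc_workqueue("foo_commit", WQ_MEM_RECLAIM, 0);
        if (!foo_commit_wq) {
                destroy_workqueue(foo_io_wq);
                return -ENOMEM;
        }
        return 0;
}

Queueing both items to one WQ_MEM_RECLAIM wq could stall its single
guaranteed execution context on the dependency under memory pressure,
which is exactly what the separate-wq guideline guards against.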
diff --git a/drivers/ata/libata-sff.c b/drivers/ata/libata-sff.c
index 14d18bf..d05387d 100644
@@ -3335,7 +3335,7 @@ void ata_sff_port_init(struct ata_port *ap)
 
 int __init ata_sff_init(void)
 {
-       ata_sff_wq = alloc_workqueue("ata_sff", WQ_RESCUER, WQ_MAX_ACTIVE);
+       ata_sff_wq = alloc_workqueue("ata_sff", WQ_MEM_RECLAIM, WQ_MAX_ACTIVE);
        if (!ata_sff_wq)
                return -ENOMEM;
 
diff --git a/drivers/isdn/hardware/eicon/divasmain.c b/drivers/isdn/hardware/eicon/divasmain.c
index ed9c555..f332b60 100644
@@ -15,7 +15,6 @@
 #include <asm/uaccess.h>
 #include <asm/io.h>
 #include <linux/ioport.h>
-#include <linux/workqueue.h>
 #include <linux/pci.h>
 #include <linux/interrupt.h>
 #include <linux/list.h>
@@ -546,7 +545,6 @@ void diva_os_remove_soft_isr(diva_os_soft_isr_t * psoft_isr)
                void *mem;
 
                tasklet_kill(&pdpc->divas_task);
-               flush_scheduled_work();
                mem = psoft_isr->object;
                psoft_isr->object = NULL;
                diva_os_free(0, mem);
diff --git a/drivers/pci/hotplug/pciehp.h b/drivers/pci/hotplug/pciehp.h
index 73d5139..838f571 100644
@@ -36,6 +36,7 @@
 #include <linux/sched.h>               /* signal_pending() */
 #include <linux/pcieport_if.h>
 #include <linux/mutex.h>
+#include <linux/workqueue.h>
 
 #define MY_NAME        "pciehp"
 
@@ -44,6 +45,7 @@ extern int pciehp_poll_time;
 extern int pciehp_debug;
 extern int pciehp_force;
 extern struct workqueue_struct *pciehp_wq;
+extern struct workqueue_struct *pciehp_ordered_wq;
 
 #define dbg(format, arg...)                                            \
 do {                                                                   \
diff --git a/drivers/pci/hotplug/pciehp_core.c b/drivers/pci/hotplug/pciehp_core.c
index aa5f3ff..7ac8358 100644
@@ -43,6 +43,7 @@ int pciehp_poll_mode;
 int pciehp_poll_time;
 int pciehp_force;
 struct workqueue_struct *pciehp_wq;
+struct workqueue_struct *pciehp_ordered_wq;
 
 #define DRIVER_VERSION "0.4"
 #define DRIVER_AUTHOR  "Dan Zink <dan.zink@compaq.com>, Greg Kroah-Hartman <greg@kroah.com>, Dely Sy <dely.l.sy@intel.com>"
@@ -340,18 +341,33 @@ static int __init pcied_init(void)
 {
        int retval = 0;
 
+       pciehp_wq = alloc_workqueue("pciehp", 0, 0);
+       if (!pciehp_wq)
+               return -ENOMEM;
+
+       pciehp_ordered_wq = alloc_ordered_workqueue("pciehp_ordered", 0);
+       if (!pciehp_ordered_wq) {
+               destroy_workqueue(pciehp_wq);
+               return -ENOMEM;
+       }
+
        pciehp_firmware_init();
        retval = pcie_port_service_register(&hpdriver_portdrv);
        dbg("pcie_port_service_register = %d\n", retval);
        info(DRIVER_DESC " version: " DRIVER_VERSION "\n");
-       if (retval)
+       if (retval) {
+               destroy_workqueue(pciehp_ordered_wq);
+               destroy_workqueue(pciehp_wq);
                dbg("Failure to register service\n");
+       }
        return retval;
 }
 
 static void __exit pcied_cleanup(void)
 {
        dbg("unload_pciehpd()\n");
+       destroy_workqueue(pciehp_ordered_wq);
+       destroy_workqueue(pciehp_wq);
        pcie_port_service_unregister(&hpdriver_portdrv);
        info(DRIVER_DESC " version: " DRIVER_VERSION " unloaded\n");
 }
diff --git a/drivers/pci/hotplug/pciehp_ctrl.c b/drivers/pci/hotplug/pciehp_ctrl.c
index 8f58148..085dbb5 100644
@@ -32,7 +32,6 @@
 #include <linux/types.h>
 #include <linux/slab.h>
 #include <linux/pci.h>
-#include <linux/workqueue.h>
 #include "../pci.h"
 #include "pciehp.h"
 
@@ -50,7 +49,7 @@ static int queue_interrupt_event(struct slot *p_slot, u32 event_type)
        info->p_slot = p_slot;
        INIT_WORK(&info->work, interrupt_event_handler);
 
-       schedule_work(&info->work);
+       queue_work(pciehp_wq, &info->work);
 
        return 0;
 }
@@ -345,7 +344,7 @@ void pciehp_queue_pushbutton_work(struct work_struct *work)
                kfree(info);
                goto out;
        }
-       queue_work(pciehp_wq, &info->work);
+       queue_work(pciehp_ordered_wq, &info->work);
  out:
        mutex_unlock(&p_slot->lock);
 }
@@ -378,7 +377,7 @@ static void handle_button_press_event(struct slot *p_slot)
                if (ATTN_LED(ctrl))
                        pciehp_set_attention_status(p_slot, 0);
 
-               schedule_delayed_work(&p_slot->work, 5*HZ);
+               queue_delayed_work(pciehp_wq, &p_slot->work, 5*HZ);
                break;
        case BLINKINGOFF_STATE:
        case BLINKINGON_STATE:
@@ -440,7 +439,7 @@ static void handle_surprise_event(struct slot *p_slot)
        else
                p_slot->state = POWERON_STATE;
 
-       queue_work(pciehp_wq, &info->work);
+       queue_work(pciehp_ordered_wq, &info->work);
 }
 
 static void interrupt_event_handler(struct work_struct *work)
diff --git a/drivers/pci/hotplug/pciehp_hpc.c b/drivers/pci/hotplug/pciehp_hpc.c
index 0cd4204..50a23da 100644
@@ -41,8 +41,6 @@
 #include "../pci.h"
 #include "pciehp.h"
 
-static atomic_t pciehp_num_controllers = ATOMIC_INIT(0);
-
 static inline int pciehp_readw(struct controller *ctrl, int reg, u16 *value)
 {
        struct pci_dev *dev = ctrl->pcie->port;
@@ -805,8 +803,8 @@ static void pcie_cleanup_slot(struct controller *ctrl)
 {
        struct slot *slot = ctrl->slot;
        cancel_delayed_work(&slot->work);
-       flush_scheduled_work();
        flush_workqueue(pciehp_wq);
+       flush_workqueue(pciehp_ordered_wq);
        kfree(slot);
 }
 
@@ -912,16 +910,6 @@ struct controller *pcie_init(struct pcie_device *dev)
        /* Disable sotfware notification */
        pcie_disable_notification(ctrl);
 
-       /*
-        * If this is the first controller to be initialized,
-        * initialize the pciehp work queue
-        */
-       if (atomic_add_return(1, &pciehp_num_controllers) == 1) {
-               pciehp_wq = create_singlethread_workqueue("pciehpd");
-               if (!pciehp_wq)
-                       goto abort_ctrl;
-       }
-
        ctrl_info(ctrl, "HPC vendor_id %x device_id %x ss_vid %x ss_did %x\n",
                  pdev->vendor, pdev->device, pdev->subsystem_vendor,
                  pdev->subsystem_device);
@@ -941,11 +929,5 @@ void pciehp_release_ctrl(struct controller *ctrl)
 {
        pcie_shutdown_notification(ctrl);
        pcie_cleanup_slot(ctrl);
-       /*
-        * If this is the last controller to be released, destroy the
-        * pciehp work queue
-        */
-       if (atomic_dec_and_test(&pciehp_num_controllers))
-               destroy_workqueue(pciehp_wq);
        kfree(ctrl);
 }
diff --git a/drivers/pci/hotplug/shpchp.h b/drivers/pci/hotplug/shpchp.h
index d2627e1..e0c90e6 100644
@@ -35,6 +35,7 @@
 #include <linux/delay.h>
 #include <linux/sched.h>       /* signal_pending(), struct timer_list */
 #include <linux/mutex.h>
+#include <linux/workqueue.h>
 
 #if !defined(MODULE)
        #define MY_NAME "shpchp"
@@ -46,6 +47,7 @@ extern int shpchp_poll_mode;
 extern int shpchp_poll_time;
 extern int shpchp_debug;
 extern struct workqueue_struct *shpchp_wq;
+extern struct workqueue_struct *shpchp_ordered_wq;
 
 #define dbg(format, arg...)                                            \
 do {                                                                   \
diff --git a/drivers/pci/hotplug/shpchp_core.c b/drivers/pci/hotplug/shpchp_core.c
index a7bd504..aca972b 100644
@@ -33,7 +33,6 @@
 #include <linux/types.h>
 #include <linux/slab.h>
 #include <linux/pci.h>
-#include <linux/workqueue.h>
 #include "shpchp.h"
 
 /* Global variables */
@@ -41,6 +40,7 @@ int shpchp_debug;
 int shpchp_poll_mode;
 int shpchp_poll_time;
 struct workqueue_struct *shpchp_wq;
+struct workqueue_struct *shpchp_ordered_wq;
 
 #define DRIVER_VERSION "0.4"
 #define DRIVER_AUTHOR  "Dan Zink <dan.zink@compaq.com>, Greg Kroah-Hartman <greg@kroah.com>, Dely Sy <dely.l.sy@intel.com>"
@@ -174,8 +174,8 @@ void cleanup_slots(struct controller *ctrl)
                slot = list_entry(tmp, struct slot, slot_list);
                list_del(&slot->slot_list);
                cancel_delayed_work(&slot->work);
-               flush_scheduled_work();
                flush_workqueue(shpchp_wq);
+               flush_workqueue(shpchp_ordered_wq);
                pci_hp_deregister(slot->hotplug_slot);
        }
 }
@@ -360,9 +360,23 @@ static int __init shpcd_init(void)
 {
        int retval = 0;
 
+       shpchp_wq = alloc_ordered_workqueue("shpchp", 0);
+       if (!shpchp_wq)
+               return -ENOMEM;
+
+       shpchp_ordered_wq = alloc_ordered_workqueue("shpchp_ordered", 0);
+       if (!shpchp_ordered_wq) {
+               destroy_workqueue(shpchp_wq);
+               return -ENOMEM;
+       }
+
        retval = pci_register_driver(&shpc_driver);
        dbg("%s: pci_register_driver = %d\n", __func__, retval);
        info(DRIVER_DESC " version: " DRIVER_VERSION "\n");
+       if (retval) {
+               destroy_workqueue(shpchp_ordered_wq);
+               destroy_workqueue(shpchp_wq);
+       }
        return retval;
 }
 
@@ -370,6 +384,8 @@ static void __exit shpcd_cleanup(void)
 {
        dbg("unload_shpchpd()\n");
        pci_unregister_driver(&shpc_driver);
+       destroy_workqueue(shpchp_ordered_wq);
+       destroy_workqueue(shpchp_wq);
        info(DRIVER_DESC " version: " DRIVER_VERSION " unloaded\n");
 }
 
diff --git a/drivers/pci/hotplug/shpchp_ctrl.c b/drivers/pci/hotplug/shpchp_ctrl.c
index 3387fbf..b00b09b 100644
@@ -32,7 +32,6 @@
 #include <linux/types.h>
 #include <linux/slab.h>
 #include <linux/pci.h>
-#include <linux/workqueue.h>
 #include "../pci.h"
 #include "shpchp.h"
 
@@ -52,7 +51,7 @@ static int queue_interrupt_event(struct slot *p_slot, u32 event_type)
        info->p_slot = p_slot;
        INIT_WORK(&info->work, interrupt_event_handler);
 
-       schedule_work(&info->work);
+       queue_work(shpchp_wq, &info->work);
 
        return 0;
 }
@@ -457,7 +456,7 @@ void shpchp_queue_pushbutton_work(struct work_struct *work)
                kfree(info);
                goto out;
        }
-       queue_work(shpchp_wq, &info->work);
+       queue_work(shpchp_ordered_wq, &info->work);
  out:
        mutex_unlock(&p_slot->lock);
 }
@@ -505,7 +504,7 @@ static void handle_button_press_event(struct slot *p_slot)
                p_slot->hpc_ops->green_led_blink(p_slot);
                p_slot->hpc_ops->set_attention_status(p_slot, 0);
 
-               schedule_delayed_work(&p_slot->work, 5*HZ);
+               queue_delayed_work(shpchp_wq, &p_slot->work, 5*HZ);
                break;
        case BLINKINGOFF_STATE:
        case BLINKINGON_STATE:
diff --git a/drivers/pci/hotplug/shpchp_hpc.c b/drivers/pci/hotplug/shpchp_hpc.c
index d3985e7..36547f0 100644
 #define SLOT_EVENT_LATCH       0x2
 #define SLOT_SERR_INT_MASK     0x3
 
-static atomic_t shpchp_num_controllers = ATOMIC_INIT(0);
-
 static irqreturn_t shpc_isr(int irq, void *dev_id);
 static void start_int_poll_timer(struct controller *ctrl, int sec);
 static int hpc_check_cmd_status(struct controller *ctrl);
@@ -614,13 +612,6 @@ static void hpc_release_ctlr(struct controller *ctrl)
 
        iounmap(ctrl->creg);
        release_mem_region(ctrl->mmio_base, ctrl->mmio_size);
-
-       /*
-        * If this is the last controller to be released, destroy the
-        * shpchpd work queue
-        */
-       if (atomic_dec_and_test(&shpchp_num_controllers))
-               destroy_workqueue(shpchp_wq);
 }
 
 static int hpc_power_on_slot(struct slot * slot)
@@ -1077,9 +1068,8 @@ int shpc_init(struct controller *ctrl, struct pci_dev *pdev)
 
                rc = request_irq(ctrl->pci_dev->irq, shpc_isr, IRQF_SHARED,
                                 MY_NAME, (void *)ctrl);
-               ctrl_dbg(ctrl, "request_irq %d for hpc%d (returns %d)\n",
-                        ctrl->pci_dev->irq,
-                   atomic_read(&shpchp_num_controllers), rc);
+               ctrl_dbg(ctrl, "request_irq %d (returns %d)\n",
+                        ctrl->pci_dev->irq, rc);
                if (rc) {
                        ctrl_err(ctrl, "Can't get irq %d for the hotplug "
                                 "controller\n", ctrl->pci_dev->irq);
@@ -1091,18 +1081,6 @@ int shpc_init(struct controller *ctrl, struct pci_dev *pdev)
        shpc_get_max_bus_speed(ctrl);
        shpc_get_cur_bus_speed(ctrl);
 
-       /*
-        * If this is the first controller to be initialized,
-        * initialize the shpchpd work queue
-        */
-       if (atomic_add_return(1, &shpchp_num_controllers) == 1) {
-               shpchp_wq = create_singlethread_workqueue("shpchpd");
-               if (!shpchp_wq) {
-                       rc = -ENOMEM;
-                       goto abort_iounmap;
-               }
-       }
-
        /*
         * Unmask all event interrupts of all slots
         */
diff --git a/fs/gfs2/main.c b/fs/gfs2/main.c
index d7eb1e2..ebef7ab 100644
@@ -144,7 +144,7 @@ static int __init init_gfs2_fs(void)
 
        error = -ENOMEM;
        gfs_recovery_wq = alloc_workqueue("gfs_recovery",
-                                         WQ_RESCUER | WQ_FREEZEABLE, 0);
+                                         WQ_MEM_RECLAIM | WQ_FREEZEABLE, 0);
        if (!gfs_recovery_wq)
                goto fail_wq;
 
diff --git a/fs/xfs/linux-2.6/xfs_buf.c b/fs/xfs/linux-2.6/xfs_buf.c
index 1846a0d..f3ccaec 100644
@@ -1921,7 +1921,7 @@ xfs_buf_init(void)
                goto out;
 
        xfslogd_workqueue = alloc_workqueue("xfslogd",
-                                       WQ_RESCUER | WQ_HIGHPRI, 1);
+                                       WQ_MEM_RECLAIM | WQ_HIGHPRI, 1);
        if (!xfslogd_workqueue)
                goto out_free_buf_zone;
 
diff --git a/include/linux/workqueue.h b/include/linux/workqueue.h
index 25e02c9..070bb7a 100644
@@ -243,11 +243,12 @@ enum {
        WQ_NON_REENTRANT        = 1 << 0, /* guarantee non-reentrance */
        WQ_UNBOUND              = 1 << 1, /* not bound to any cpu */
        WQ_FREEZEABLE           = 1 << 2, /* freeze during suspend */
-       WQ_RESCUER              = 1 << 3, /* has an rescue worker */
+       WQ_MEM_RECLAIM          = 1 << 3, /* may be used for memory reclaim */
        WQ_HIGHPRI              = 1 << 4, /* high priority */
        WQ_CPU_INTENSIVE        = 1 << 5, /* cpu instensive workqueue */
 
        WQ_DYING                = 1 << 6, /* internal: workqueue is dying */
+       WQ_RESCUER              = 1 << 7, /* internal: workqueue has rescuer */
 
        WQ_MAX_ACTIVE           = 512,    /* I like 512, better ideas? */
        WQ_MAX_UNBOUND_PER_CPU  = 4,      /* 4 * #cpus for unbound wq */
@@ -306,12 +307,30 @@ __alloc_workqueue_key(const char *name, unsigned int flags, int max_active,
        __alloc_workqueue_key((name), (flags), (max_active), NULL, NULL)
 #endif
 
+/**
+ * alloc_ordered_workqueue - allocate an ordered workqueue
+ * @name: name of the workqueue
+ * @flags: WQ_* flags (only WQ_FREEZEABLE and WQ_MEM_RECLAIM are meaningful)
+ *
+ * Allocate an ordered workqueue.  An ordered workqueue executes at
+ * most one work item at any given time in the queued order.  They are
+ * implemented as unbound workqueues with @max_active of one.
+ *
+ * RETURNS:
+ * Pointer to the allocated workqueue on success, %NULL on failure.
+ */
+static inline struct workqueue_struct *
+alloc_ordered_workqueue(const char *name, unsigned int flags)
+{
+       return alloc_workqueue(name, WQ_UNBOUND | flags, 1);
+}
+
 #define create_workqueue(name)                                 \
-       alloc_workqueue((name), WQ_RESCUER, 1)
+       alloc_workqueue((name), WQ_MEM_RECLAIM, 1)
 #define create_freezeable_workqueue(name)                      \
-       alloc_workqueue((name), WQ_FREEZEABLE | WQ_UNBOUND | WQ_RESCUER, 1)
+       alloc_workqueue((name), WQ_FREEZEABLE | WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
 #define create_singlethread_workqueue(name)                    \
-       alloc_workqueue((name), WQ_UNBOUND | WQ_RESCUER, 1)
+       alloc_workqueue((name), WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
 
 extern void destroy_workqueue(struct workqueue_struct *wq);
 
@@ -325,7 +344,6 @@ extern int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 
 extern void flush_workqueue(struct workqueue_struct *wq);
 extern void flush_scheduled_work(void);
-extern void flush_delayed_work(struct delayed_work *work);
 
 extern int schedule_work(struct work_struct *work);
 extern int schedule_work_on(int cpu, struct work_struct *work);
@@ -337,8 +355,13 @@ extern int keventd_up(void);
 
 int execute_in_process_context(work_func_t fn, struct execute_work *);
 
-extern int flush_work(struct work_struct *work);
-extern int cancel_work_sync(struct work_struct *work);
+extern bool flush_work(struct work_struct *work);
+extern bool flush_work_sync(struct work_struct *work);
+extern bool cancel_work_sync(struct work_struct *work);
+
+extern bool flush_delayed_work(struct delayed_work *dwork);
+extern bool flush_delayed_work_sync(struct delayed_work *work);
+extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
 
 extern void workqueue_set_max_active(struct workqueue_struct *wq,
                                     int max_active);
@@ -352,9 +375,9 @@ extern unsigned int work_busy(struct work_struct *work);
  * it returns 1 and the work doesn't re-arm itself. Run flush_workqueue() or
  * cancel_work_sync() to wait on it.
  */
-static inline int cancel_delayed_work(struct delayed_work *work)
+static inline bool cancel_delayed_work(struct delayed_work *work)
 {
-       int ret;
+       bool ret;
 
        ret = del_timer_sync(&work->timer);
        if (ret)
@@ -367,9 +390,9 @@ static inline int cancel_delayed_work(struct delayed_work *work)
  * if it returns 0 the timer function may be running and the queueing is in
  * progress.
  */
-static inline int __cancel_delayed_work(struct delayed_work *work)
+static inline bool __cancel_delayed_work(struct delayed_work *work)
 {
-       int ret;
+       bool ret;
 
        ret = del_timer(&work->timer);
        if (ret)
@@ -377,8 +400,6 @@ static inline int __cancel_delayed_work(struct delayed_work *work)
        return ret;
 }
 
-extern int cancel_delayed_work_sync(struct delayed_work *work);
-
 /* Obsolete. use cancel_delayed_work_sync() */
 static inline
 void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
@@ -409,8 +430,4 @@ extern bool freeze_workqueues_busy(void);
 extern void thaw_workqueues(void);
 #endif /* CONFIG_FREEZER */
 
-#ifdef CONFIG_LOCKDEP
-int in_workqueue_context(struct workqueue_struct *wq);
-#endif
-
 #endif
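
Taken together, the header changes above add alloc_ordered_workqueue()
and switch the flush/cancel family to bool returns.  A hedged sketch of
a caller, using hypothetical "bar" names that are not part of this patch:

#include <linux/workqueue.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/errno.h>

static struct workqueue_struct *bar_wq;         /* hypothetical */
static struct delayed_work bar_poll_work;       /* hypothetical */

static void bar_poll_fn(struct work_struct *work)
{
        /* ... poll hardware, possibly re-arm bar_poll_work ... */
}

static int bar_setup(void)
{
        /* at most one work item runs at a time, in queueing order */
        bar_wq = alloc_ordered_workqueue("bar", 0);
        if (!bar_wq)
                return -ENOMEM;

        INIT_DELAYED_WORK(&bar_poll_work, bar_poll_fn);
        queue_delayed_work(bar_wq, &bar_poll_work, HZ);
        return 0;
}

static void bar_teardown(void)
{
        /* bool return: true if the delayed work was still pending */
        if (cancel_delayed_work_sync(&bar_poll_work))
                pr_debug("bar: poll work was pending at teardown\n");
        destroy_workqueue(bar_wq);
}

cancel_delayed_work_sync() also copes with a self-requeueing work, so
the teardown above remains safe even if bar_poll_fn() re-arms itself.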
diff --git a/include/trace/events/workqueue.h b/include/trace/events/workqueue.h
index 49682d7..7d49729 100644
@@ -7,38 +7,83 @@
 #include <linux/tracepoint.h>
 #include <linux/workqueue.h>
 
+DECLARE_EVENT_CLASS(workqueue_work,
+
+       TP_PROTO(struct work_struct *work),
+
+       TP_ARGS(work),
+
+       TP_STRUCT__entry(
+               __field( void *,        work    )
+       ),
+
+       TP_fast_assign(
+               __entry->work           = work;
+       ),
+
+       TP_printk("work struct %p", __entry->work)
+);
+
 /**
- * workqueue_execute_start - called immediately before the workqueue callback
+ * workqueue_queue_work - called when a work gets queued
+ * @req_cpu:   the requested cpu
+ * @cwq:       pointer to struct cpu_workqueue_struct
  * @work:      pointer to struct work_struct
  *
- * Allows to track workqueue execution.
+ * This event occurs when a work is queued immediately or once a
+ * delayed work is actually queued on a workqueue (ie: once the delay
+ * has been reached).
  */
-TRACE_EVENT(workqueue_execute_start,
+TRACE_EVENT(workqueue_queue_work,
 
-       TP_PROTO(struct work_struct *work),
+       TP_PROTO(unsigned int req_cpu, struct cpu_workqueue_struct *cwq,
+                struct work_struct *work),
 
-       TP_ARGS(work),
+       TP_ARGS(req_cpu, cwq, work),
 
        TP_STRUCT__entry(
                __field( void *,        work    )
                __field( void *,        function)
+               __field( void *,        workqueue)
+               __field( unsigned int,  req_cpu )
+               __field( unsigned int,  cpu     )
        ),
 
        TP_fast_assign(
                __entry->work           = work;
                __entry->function       = work->func;
+               __entry->workqueue      = cwq->wq;
+               __entry->req_cpu        = req_cpu;
+               __entry->cpu            = cwq->gcwq->cpu;
        ),
 
-       TP_printk("work struct %p: function %pf", __entry->work, __entry->function)
+       TP_printk("work struct=%p function=%pf workqueue=%p req_cpu=%u cpu=%u",
+                 __entry->work, __entry->function, __entry->workqueue,
+                 __entry->req_cpu, __entry->cpu)
 );
 
 /**
- * workqueue_execute_end - called immediately before the workqueue callback
+ * workqueue_activate_work - called when a work gets activated
+ * @work:      pointer to struct work_struct
+ *
+ * This event occurs when a queued work is put on the active queue,
+ * which happens immediately after queueing unless @max_active limit
+ * is reached.
+ */
+DEFINE_EVENT(workqueue_work, workqueue_activate_work,
+
+       TP_PROTO(struct work_struct *work),
+
+       TP_ARGS(work)
+);
+
+/**
+ * workqueue_execute_start - called immediately before the workqueue callback
  * @work:      pointer to struct work_struct
  *
  * Allows to track workqueue execution.
  */
-TRACE_EVENT(workqueue_execute_end,
+TRACE_EVENT(workqueue_execute_start,
 
        TP_PROTO(struct work_struct *work),
 
@@ -46,15 +91,29 @@ TRACE_EVENT(workqueue_execute_end,
 
        TP_STRUCT__entry(
                __field( void *,        work    )
+               __field( void *,        function)
        ),
 
        TP_fast_assign(
                __entry->work           = work;
+               __entry->function       = work->func;
        ),
 
-       TP_printk("work struct %p", __entry->work)
+       TP_printk("work struct %p: function %pf", __entry->work, __entry->function)
 );
 
+/**
+ * workqueue_execute_end - called immediately after the workqueue callback
+ * @work:      pointer to struct work_struct
+ *
+ * Allows to track workqueue execution.
+ */
+DEFINE_EVENT(workqueue_work, workqueue_execute_end,
+
+       TP_PROTO(struct work_struct *work),
+
+       TP_ARGS(work)
+);
 
 #endif /*  _TRACE_WORKQUEUE_H */
 
diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index f77afd9..30acdb7 100644
@@ -42,9 +42,6 @@
 #include <linux/lockdep.h>
 #include <linux/idr.h>
 
-#define CREATE_TRACE_POINTS
-#include <trace/events/workqueue.h>
-
 #include "workqueue_sched.h"
 
 enum {
@@ -257,6 +254,9 @@ EXPORT_SYMBOL_GPL(system_long_wq);
 EXPORT_SYMBOL_GPL(system_nrt_wq);
 EXPORT_SYMBOL_GPL(system_unbound_wq);
 
+#define CREATE_TRACE_POINTS
+#include <trace/events/workqueue.h>
+
 #define for_each_busy_worker(worker, i, pos, gcwq)                     \
        for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)                     \
                hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
@@ -310,21 +310,6 @@ static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
             (cpu) < WORK_CPU_NONE;                                     \
             (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
 
-#ifdef CONFIG_LOCKDEP
-/**
- * in_workqueue_context() - in context of specified workqueue?
- * @wq: the workqueue of interest
- *
- * Checks lockdep state to see if the current task is executing from
- * within a workqueue item.  This function exists only if lockdep is
- * enabled.
- */
-int in_workqueue_context(struct workqueue_struct *wq)
-{
-       return lock_is_held(&wq->lockdep_map);
-}
-#endif
-
 #ifdef CONFIG_DEBUG_OBJECTS_WORK
 
 static struct debug_obj_descr work_debug_descr;
@@ -604,7 +589,9 @@ static bool keep_working(struct global_cwq *gcwq)
 {
        atomic_t *nr_running = get_gcwq_nr_running(gcwq->cpu);
 
-       return !list_empty(&gcwq->worklist) && atomic_read(nr_running) <= 1;
+       return !list_empty(&gcwq->worklist) &&
+               (atomic_read(nr_running) <= 1 ||
+                gcwq->flags & GCWQ_HIGHPRI_PENDING);
 }
 
 /* Do we need a new worker?  Called from manager. */
@@ -997,6 +984,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
 
        /* gcwq determined, get cwq and queue */
        cwq = get_cwq(gcwq->cpu, wq);
+       trace_workqueue_queue_work(cpu, cwq, work);
 
        BUG_ON(!list_empty(&work->entry));
 
@@ -1004,6 +992,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
        work_flags = work_color_to_flags(cwq->work_color);
 
        if (likely(cwq->nr_active < cwq->max_active)) {
+               trace_workqueue_activate_work(work);
                cwq->nr_active++;
                worklist = gcwq_determine_ins_pos(gcwq, cwq);
        } else {
@@ -1679,6 +1668,7 @@ static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
                                                    struct work_struct, entry);
        struct list_head *pos = gcwq_determine_ins_pos(cwq->gcwq, cwq);
 
+       trace_workqueue_activate_work(work);
        move_linked_works(work, pos, NULL);
        __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
        cwq->nr_active++;
@@ -2326,27 +2316,17 @@ out_unlock:
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);
 
-/**
- * flush_work - block until a work_struct's callback has terminated
- * @work: the work which is to be flushed
- *
- * Returns false if @work has already terminated.
- *
- * It is expected that, prior to calling flush_work(), the caller has
- * arranged for the work to not be requeued, otherwise it doesn't make
- * sense to use this function.
- */
-int flush_work(struct work_struct *work)
+static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
+                            bool wait_executing)
 {
        struct worker *worker = NULL;
        struct global_cwq *gcwq;
        struct cpu_workqueue_struct *cwq;
-       struct wq_barrier barr;
 
        might_sleep();
        gcwq = get_work_gcwq(work);
        if (!gcwq)
-               return 0;
+               return false;
 
        spin_lock_irq(&gcwq->lock);
        if (!list_empty(&work->entry)) {
@@ -2359,28 +2339,127 @@ int flush_work(struct work_struct *work)
                cwq = get_work_cwq(work);
                if (unlikely(!cwq || gcwq != cwq->gcwq))
                        goto already_gone;
-       } else {
+       } else if (wait_executing) {
                worker = find_worker_executing_work(gcwq, work);
                if (!worker)
                        goto already_gone;
                cwq = worker->current_cwq;
-       }
+       } else
+               goto already_gone;
 
-       insert_wq_barrier(cwq, &barr, work, worker);
+       insert_wq_barrier(cwq, barr, work, worker);
        spin_unlock_irq(&gcwq->lock);
 
        lock_map_acquire(&cwq->wq->lockdep_map);
        lock_map_release(&cwq->wq->lockdep_map);
-
-       wait_for_completion(&barr.done);
-       destroy_work_on_stack(&barr.work);
-       return 1;
+       return true;
 already_gone:
        spin_unlock_irq(&gcwq->lock);
-       return 0;
+       return false;
+}
+
+/**
+ * flush_work - wait for a work to finish executing the last queueing instance
+ * @work: the work to flush
+ *
+ * Wait until @work has finished execution.  This function considers
+ * only the last queueing instance of @work.  If @work has been
+ * enqueued across different CPUs on a non-reentrant workqueue or on
+ * multiple workqueues, @work might still be executing on return on
+ * some of the CPUs from earlier queueing.
+ *
+ * If @work was queued only on a non-reentrant, ordered or unbound
+ * workqueue, @work is guaranteed to be idle on return if it hasn't
+ * been requeued since flush started.
+ *
+ * RETURNS:
+ * %true if flush_work() waited for the work to finish execution,
+ * %false if it was already idle.
+ */
+bool flush_work(struct work_struct *work)
+{
+       struct wq_barrier barr;
+
+       if (start_flush_work(work, &barr, true)) {
+               wait_for_completion(&barr.done);
+               destroy_work_on_stack(&barr.work);
+               return true;
+       } else
+               return false;
 }
 EXPORT_SYMBOL_GPL(flush_work);
 
+static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
+{
+       struct wq_barrier barr;
+       struct worker *worker;
+
+       spin_lock_irq(&gcwq->lock);
+
+       worker = find_worker_executing_work(gcwq, work);
+       if (unlikely(worker))
+               insert_wq_barrier(worker->current_cwq, &barr, work, worker);
+
+       spin_unlock_irq(&gcwq->lock);
+
+       if (unlikely(worker)) {
+               wait_for_completion(&barr.done);
+               destroy_work_on_stack(&barr.work);
+               return true;
+       } else
+               return false;
+}
+
+static bool wait_on_work(struct work_struct *work)
+{
+       bool ret = false;
+       int cpu;
+
+       might_sleep();
+
+       lock_map_acquire(&work->lockdep_map);
+       lock_map_release(&work->lockdep_map);
+
+       for_each_gcwq_cpu(cpu)
+               ret |= wait_on_cpu_work(get_gcwq(cpu), work);
+       return ret;
+}
+
+/**
+ * flush_work_sync - wait until a work has finished execution
+ * @work: the work to flush
+ *
+ * Wait until @work has finished execution.  On return, it's
+ * guaranteed that all queueing instances of @work which happened
+ * before this function is called are finished.  In other words, if
+ * @work hasn't been requeued since this function was called, @work is
+ * guaranteed to be idle on return.
+ *
+ * RETURNS:
+ * %true if flush_work_sync() waited for the work to finish execution,
+ * %false if it was already idle.
+ */
+bool flush_work_sync(struct work_struct *work)
+{
+       struct wq_barrier barr;
+       bool pending, waited;
+
+       /* we'll wait for executions separately, queue barr only if pending */
+       pending = start_flush_work(work, &barr, false);
+
+       /* wait for executions to finish */
+       waited = wait_on_work(work);
+
+       /* wait for the pending one */
+       if (pending) {
+               wait_for_completion(&barr.done);
+               destroy_work_on_stack(&barr.work);
+       }
+
+       return pending || waited;
+}
+EXPORT_SYMBOL_GPL(flush_work_sync);
+
 /*
  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
  * so this work can't be re-armed in any way.
@@ -2423,39 +2502,7 @@ static int try_to_grab_pending(struct work_struct *work)
        return ret;
 }
 
-static void wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
-{
-       struct wq_barrier barr;
-       struct worker *worker;
-
-       spin_lock_irq(&gcwq->lock);
-
-       worker = find_worker_executing_work(gcwq, work);
-       if (unlikely(worker))
-               insert_wq_barrier(worker->current_cwq, &barr, work, worker);
-
-       spin_unlock_irq(&gcwq->lock);
-
-       if (unlikely(worker)) {
-               wait_for_completion(&barr.done);
-               destroy_work_on_stack(&barr.work);
-       }
-}
-
-static void wait_on_work(struct work_struct *work)
-{
-       int cpu;
-
-       might_sleep();
-
-       lock_map_acquire(&work->lockdep_map);
-       lock_map_release(&work->lockdep_map);
-
-       for_each_gcwq_cpu(cpu)
-               wait_on_cpu_work(get_gcwq(cpu), work);
-}
-
-static int __cancel_work_timer(struct work_struct *work,
+static bool __cancel_work_timer(struct work_struct *work,
                                struct timer_list* timer)
 {
        int ret;
@@ -2472,42 +2519,81 @@ static int __cancel_work_timer(struct work_struct *work,
 }
 
 /**
- * cancel_work_sync - block until a work_struct's callback has terminated
- * @work: the work which is to be flushed
- *
- * Returns true if @work was pending.
+ * cancel_work_sync - cancel a work and wait for it to finish
+ * @work: the work to cancel
  *
- * cancel_work_sync() will cancel the work if it is queued. If the work's
- * callback appears to be running, cancel_work_sync() will block until it
- * has completed.
- *
- * It is possible to use this function if the work re-queues itself. It can
- * cancel the work even if it migrates to another workqueue, however in that
- * case it only guarantees that work->func() has completed on the last queued
- * workqueue.
+ * Cancel @work and wait for its execution to finish.  This function
+ * can be used even if the work re-queues itself or migrates to
+ * another workqueue.  On return from this function, @work is
+ * guaranteed to be not pending or executing on any CPU.
  *
- * cancel_work_sync(&delayed_work->work) should be used only if ->timer is not
- * pending, otherwise it goes into a busy-wait loop until the timer expires.
+ * cancel_work_sync(&delayed_work->work) must not be used for
+ * delayed_work's.  Use cancel_delayed_work_sync() instead.
  *
- * The caller must ensure that workqueue_struct on which this work was last
+ * The caller must ensure that the workqueue on which @work was last
  * queued can't be destroyed before this function returns.
+ *
+ * RETURNS:
+ * %true if @work was pending, %false otherwise.
  */
-int cancel_work_sync(struct work_struct *work)
+bool cancel_work_sync(struct work_struct *work)
 {
        return __cancel_work_timer(work, NULL);
 }
 EXPORT_SYMBOL_GPL(cancel_work_sync);
 
 /**
- * cancel_delayed_work_sync - reliably kill off a delayed work.
- * @dwork: the delayed work struct
+ * flush_delayed_work - wait for a dwork to finish executing the last queueing
+ * @dwork: the delayed work to flush
+ *
+ * Delayed timer is cancelled and the pending work is queued for
+ * immediate execution.  Like flush_work(), this function only
+ * considers the last queueing instance of @dwork.
+ *
+ * RETURNS:
+ * %true if flush_work() waited for the work to finish execution,
+ * %false if it was already idle.
+ */
+bool flush_delayed_work(struct delayed_work *dwork)
+{
+       if (del_timer_sync(&dwork->timer))
+               __queue_work(raw_smp_processor_id(),
+                            get_work_cwq(&dwork->work)->wq, &dwork->work);
+       return flush_work(&dwork->work);
+}
+EXPORT_SYMBOL(flush_delayed_work);
+
+/**
+ * flush_delayed_work_sync - wait for a dwork to finish
+ * @dwork: the delayed work to flush
  *
- * Returns true if @dwork was pending.
+ * Delayed timer is cancelled and the pending work is queued for
+ * execution immediately.  Other than timer handling, its behavior
+ * is identical to flush_work_sync().
  *
- * It is possible to use this function if @dwork rearms itself via queue_work()
- * or queue_delayed_work(). See also the comment for cancel_work_sync().
+ * RETURNS:
+ * %true if flush_work_sync() waited for the work to finish execution,
+ * %false if it was already idle.
  */
-int cancel_delayed_work_sync(struct delayed_work *dwork)
+bool flush_delayed_work_sync(struct delayed_work *dwork)
+{
+       if (del_timer_sync(&dwork->timer))
+               __queue_work(raw_smp_processor_id(),
+                            get_work_cwq(&dwork->work)->wq, &dwork->work);
+       return flush_work_sync(&dwork->work);
+}
+EXPORT_SYMBOL(flush_delayed_work_sync);
+
+/**
+ * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
+ * @dwork: the delayed work to cancel
+ *
+ * This is cancel_work_sync() for delayed works.
+ *
+ * RETURNS:
+ * %true if @dwork was pending, %false otherwise.
+ */
+bool cancel_delayed_work_sync(struct delayed_work *dwork)
 {
        return __cancel_work_timer(&dwork->work, &dwork->timer);
 }
@@ -2558,23 +2644,6 @@ int schedule_delayed_work(struct delayed_work *dwork,
 }
 EXPORT_SYMBOL(schedule_delayed_work);
 
-/**
- * flush_delayed_work - block until a dwork_struct's callback has terminated
- * @dwork: the delayed work which is to be flushed
- *
- * Any timeout is cancelled, and any pending work is run immediately.
- */
-void flush_delayed_work(struct delayed_work *dwork)
-{
-       if (del_timer_sync(&dwork->timer)) {
-               __queue_work(get_cpu(), get_work_cwq(&dwork->work)->wq,
-                            &dwork->work);
-               put_cpu();
-       }
-       flush_work(&dwork->work);
-}
-EXPORT_SYMBOL(flush_delayed_work);
-
 /**
  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
  * @cpu: cpu to use
@@ -2592,13 +2661,15 @@ int schedule_delayed_work_on(int cpu,
 EXPORT_SYMBOL(schedule_delayed_work_on);
 
 /**
- * schedule_on_each_cpu - call a function on each online CPU from keventd
+ * schedule_on_each_cpu - execute a function synchronously on each online CPU
  * @func: the function to call
  *
- * Returns zero on success.
- * Returns -ve errno on failure.
- *
+ * schedule_on_each_cpu() executes @func on each online CPU using the
+ * system workqueue and blocks until all CPUs have completed.
  * schedule_on_each_cpu() is very slow.
+ *
+ * RETURNS:
+ * 0 on success, -errno on failure.
  */
 int schedule_on_each_cpu(work_func_t func)
 {
@@ -2763,6 +2834,13 @@ struct workqueue_struct *__alloc_workqueue_key(const char *name,
        struct workqueue_struct *wq;
        unsigned int cpu;
 
+       /*
+        * Workqueues which may be used during memory reclaim should
+        * have a rescuer to guarantee forward progress.
+        */
+       if (flags & WQ_MEM_RECLAIM)
+               flags |= WQ_RESCUER;
+
        /*
         * Unbound workqueues aren't concurrency managed and should be
         * dispatched to workers immediately.
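
The distinction between the plain and _sync flush variants introduced
above matters mostly on teardown paths.  A minimal sketch under assumed
names (hypothetical "baz" device, not from this series) of draining a
delayed work before freeing its container:

#include <linux/workqueue.h>
#include <linux/kernel.h>

struct baz_device {                     /* hypothetical */
        struct delayed_work stats_work;
};

static void baz_shutdown(struct baz_device *baz)
{
        /*
         * Cancel the timer if it is still pending, queue the work for
         * immediate execution, then wait for all earlier queueing
         * instances to finish, not just the last one (which is all
         * flush_delayed_work() guarantees).
         */
        if (flush_delayed_work_sync(&baz->stats_work))
                pr_debug("baz: stats work was pending or executing\n");
}

If the work must not run at all once shutdown starts,
cancel_delayed_work_sync() is the right call instead; flushing lets the
final instance complete.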
diff --git a/mm/memory_hotplug.c b/mm/memory_hotplug.c
index dd186c1..d4e940a 100644
@@ -840,7 +840,6 @@ repeat:
        ret = 0;
        if (drain) {
                lru_add_drain_all();
-               flush_scheduled_work();
                cond_resched();
                drain_all_pages();
        }
@@ -862,7 +861,6 @@ repeat:
        }
        /* drain all zone's lru pagevec, this is asyncronous... */
        lru_add_drain_all();
-       flush_scheduled_work();
        yield();
        /* drain pcp pages , this is synchrouns. */
        drain_all_pages();