ocfs2: Periodic quota syncing
authorMark Fasheh <mfasheh@suse.com>
Mon, 20 Oct 2008 13:36:47 +0000 (15:36 +0200)
committerMark Fasheh <mfasheh@suse.com>
Mon, 5 Jan 2009 16:40:24 +0000 (08:40 -0800)
This patch creates a work queue for periodic syncing of locally cached quota
information to the global quota files. We constantly queue a delayed work
item, to get the periodic behavior.

Signed-off-by: Mark Fasheh <mfasheh@suse.com>
Acked-by: Jan Kara <jack@suse.cz>
fs/ocfs2/quota.h
fs/ocfs2/quota_global.c
fs/ocfs2/quota_local.c
fs/ocfs2/super.c

index 1f1c863..e2233d5 100644 (file)
@@ -39,6 +39,7 @@ struct ocfs2_mem_dqinfo {
        unsigned int dqi_chunks;        /* Number of chunks in local quota file */
        unsigned int dqi_blocks;        /* Number of blocks allocated for local quota file */
        unsigned int dqi_syncms;        /* How often should we sync with other nodes */
+       unsigned int dqi_syncjiff;      /* Precomputed dqi_syncms in jiffies */
        struct list_head dqi_chunk;     /* List of chunks */
        struct inode *dqi_gqinode;      /* Global quota file inode */
        struct ocfs2_lock_res dqi_gqlock;       /* Lock protecting quota information structure */
@@ -47,6 +48,7 @@ struct ocfs2_mem_dqinfo {
        struct buffer_head *dqi_lqi_bh; /* Buffer head with local quota file inode */
        struct buffer_head *dqi_ibh;    /* Buffer with information header */
        struct qtree_mem_dqinfo dqi_gi; /* Info about global file */
+       struct delayed_work dqi_sync_work;      /* Work for syncing dquots */
 };
 
 static inline struct ocfs2_dquot *OCFS2_DQUOT(struct dquot *dquot)
@@ -90,4 +92,7 @@ struct buffer_head *ocfs2_read_quota_block(struct inode *inode,
 extern struct dquot_operations ocfs2_quota_operations;
 extern struct quota_format_type ocfs2_quota_format;
 
+int ocfs2_quota_setup(void);
+void ocfs2_quota_shutdown(void);
+
 #endif /* _OCFS2_QUOTA_H */
index af8340c..adf5350 100644 (file)
@@ -1,10 +1,14 @@
 /*
  *  Implementation of operations over global quota file
  */
+#include <linux/spinlock.h>
 #include <linux/fs.h>
 #include <linux/quota.h>
 #include <linux/quotaops.h>
 #include <linux/dqblk_qtree.h>
+#include <linux/jiffies.h>
+#include <linux/writeback.h>
+#include <linux/workqueue.h>
 
 #define MLOG_MASK_PREFIX ML_QUOTA
 #include <cluster/masklog.h>
 #include "uptodate.h"
 #include "quota.h"
 
+static struct workqueue_struct *ocfs2_quota_wq = NULL;
+
+static void qsync_work_fn(struct work_struct *work);
+
 static void ocfs2_global_disk2memdqb(struct dquot *dquot, void *dp)
 {
        struct ocfs2_global_disk_dqblk *d = dp;
@@ -313,6 +321,7 @@ int ocfs2_global_read_info(struct super_block *sb, int type)
        info->dqi_bgrace = le32_to_cpu(dinfo.dqi_bgrace);
        info->dqi_igrace = le32_to_cpu(dinfo.dqi_igrace);
        oinfo->dqi_syncms = le32_to_cpu(dinfo.dqi_syncms);
+       oinfo->dqi_syncjiff = msecs_to_jiffies(oinfo->dqi_syncms);
        oinfo->dqi_gi.dqi_blocks = le32_to_cpu(dinfo.dqi_blocks);
        oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(dinfo.dqi_free_blk);
        oinfo->dqi_gi.dqi_free_entry = le32_to_cpu(dinfo.dqi_free_entry);
@@ -320,6 +329,10 @@ int ocfs2_global_read_info(struct super_block *sb, int type)
        oinfo->dqi_gi.dqi_usable_bs = sb->s_blocksize -
                                                OCFS2_QBLK_RESERVED_SPACE;
        oinfo->dqi_gi.dqi_qtree_depth = qtree_depth(&oinfo->dqi_gi);
+       INIT_DELAYED_WORK(&oinfo->dqi_sync_work, qsync_work_fn);
+       queue_delayed_work(ocfs2_quota_wq, &oinfo->dqi_sync_work,
+                          oinfo->dqi_syncjiff);
+
 out_err:
        mlog_exit(status);
        return status;
@@ -519,6 +532,61 @@ out:
        return err;
 }
 
+/*
+ *  Functions for periodic syncing of dquots with global file
+ */
+static int ocfs2_sync_dquot_helper(struct dquot *dquot, unsigned long type)
+{
+       handle_t *handle;
+       struct super_block *sb = dquot->dq_sb;
+       struct ocfs2_mem_dqinfo *oinfo = sb_dqinfo(sb, type)->dqi_priv;
+       struct ocfs2_super *osb = OCFS2_SB(sb);
+       int status = 0;
+
+       mlog_entry("id=%u qtype=%u type=%lu device=%s\n", dquot->dq_id,
+                  dquot->dq_type, type, sb->s_id);
+       if (type != dquot->dq_type)
+               goto out;
+       status = ocfs2_lock_global_qf(oinfo, 1);
+       if (status < 0)
+               goto out;
+
+       handle = ocfs2_start_trans(osb, OCFS2_QSYNC_CREDITS);
+       if (IS_ERR(handle)) {
+               status = PTR_ERR(handle);
+               mlog_errno(status);
+               goto out_ilock;
+       }
+       mutex_lock(&sb_dqopt(sb)->dqio_mutex);
+       status = ocfs2_sync_dquot(dquot);
+       mutex_unlock(&sb_dqopt(sb)->dqio_mutex);
+       if (status < 0)
+               mlog_errno(status);
+       /* We have to write local structure as well... */
+       dquot_mark_dquot_dirty(dquot);
+       status = dquot_commit(dquot);
+       if (status < 0)
+               mlog_errno(status);
+       ocfs2_commit_trans(osb, handle);
+out_ilock:
+       ocfs2_unlock_global_qf(oinfo, 1);
+out:
+       mlog_exit(status);
+       return status;
+}
+
+static void qsync_work_fn(struct work_struct *work)
+{
+       struct ocfs2_mem_dqinfo *oinfo = container_of(work,
+                                                     struct ocfs2_mem_dqinfo,
+                                                     dqi_sync_work.work);
+       struct super_block *sb = oinfo->dqi_gqinode->i_sb;
+
+       dquot_scan_active(sb, ocfs2_sync_dquot_helper, oinfo->dqi_type);
+       queue_delayed_work(ocfs2_quota_wq, &oinfo->dqi_sync_work,
+                          oinfo->dqi_syncjiff);
+}
+
 /*
  *  Wrappers for generic quota functions
  */
@@ -917,3 +985,20 @@ struct dquot_operations ocfs2_quota_operations = {
        .alloc_dquot    = ocfs2_alloc_dquot,
        .destroy_dquot  = ocfs2_destroy_dquot,
 };
+
+int ocfs2_quota_setup(void)
+{
+       ocfs2_quota_wq = create_workqueue("o2quot");
+       if (!ocfs2_quota_wq)
+               return -ENOMEM;
+       return 0;
+}
+
+void ocfs2_quota_shutdown(void)
+{
+       if (ocfs2_quota_wq) {
+               flush_workqueue(ocfs2_quota_wq);
+               destroy_workqueue(ocfs2_quota_wq);
+               ocfs2_quota_wq = NULL;
+       }
+}
index 55c3f2f..40e82b4 100644 (file)
@@ -368,6 +368,10 @@ static int ocfs2_local_free_info(struct super_block *sb, int type)
        int mark_clean = 1, len;
        int status;
 
+       /* At this point we know there are no more dquots and thus
+        * even if there's some sync in the pdflush queue, it won't
+        * find any dquots and return without doing anything */
+       cancel_delayed_work_sync(&oinfo->dqi_sync_work);
        iput(oinfo->dqi_gqinode);
        ocfs2_simple_drop_lockres(OCFS2_SB(sb), &oinfo->dqi_gqlock);
        ocfs2_lock_res_free(&oinfo->dqi_gqlock);
index 7bb83e4..60f1d29 100644 (file)
@@ -1107,11 +1107,16 @@ static int __init ocfs2_init(void)
                mlog(ML_ERROR, "Unable to create ocfs2 debugfs root.\n");
        }
 
+       status = ocfs2_quota_setup();
+       if (status)
+               goto leave;
+
        ocfs2_set_locking_protocol();
 
        status = register_quota_format(&ocfs2_quota_format);
 leave:
        if (status < 0) {
+               ocfs2_quota_shutdown();
                ocfs2_free_mem_caches();
                exit_ocfs2_uptodate_cache();
        }
@@ -1128,6 +1133,8 @@ static void __exit ocfs2_exit(void)
 {
        mlog_entry_void();
 
+       ocfs2_quota_shutdown();
+
        if (ocfs2_wq) {
                flush_workqueue(ocfs2_wq);
                destroy_workqueue(ocfs2_wq);