[PATCH] ocfs2: fix orphan recovery deadlock
authorMark Fasheh <mark.fasheh@oracle.com>
Thu, 23 Feb 2006 01:35:08 +0000 (17:35 -0800)
committerMark Fasheh <mark.fasheh@oracle.com>
Wed, 1 Mar 2006 19:32:41 +0000 (11:32 -0800)
Orphan dir recovery can deadlock with another process in
ocfs2_delete_inode() in some corner cases. Fix this by tracking recovery
state more closely and allowing it to handle inode wipes which might
deadlock.

Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
fs/ocfs2/heartbeat.c
fs/ocfs2/inode.c
fs/ocfs2/journal.c
fs/ocfs2/ocfs2.h
fs/ocfs2/super.c

index 0bbd22f..cbfd45a 100644 (file)
@@ -67,6 +67,7 @@ void ocfs2_init_node_maps(struct ocfs2_super *osb)
        ocfs2_node_map_init(&osb->mounted_map);
        ocfs2_node_map_init(&osb->recovery_map);
        ocfs2_node_map_init(&osb->umount_map);
+       ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
 }
 
 static void ocfs2_do_node_down(int node_num,
index 8122489..315472a 100644 (file)
@@ -41,6 +41,7 @@
 #include "dlmglue.h"
 #include "extent_map.h"
 #include "file.h"
+#include "heartbeat.h"
 #include "inode.h"
 #include "journal.h"
 #include "namei.h"
@@ -544,6 +545,42 @@ bail:
        return status;
 }
 
+/* 
+ * Serialize with orphan dir recovery. If the process doing
+ * recovery on this orphan dir does an iget() with the dir
+ * i_mutex held, we'll deadlock here. Instead we detect this
+ * and exit early - recovery will wipe this inode for us.
+ */
+static int ocfs2_check_orphan_recovery_state(struct ocfs2_super *osb,
+                                            int slot)
+{
+       int ret = 0;
+
+       spin_lock(&osb->osb_lock);
+       if (ocfs2_node_map_test_bit(osb, &osb->osb_recovering_orphan_dirs, slot)) {
+               mlog(0, "Recovery is happening on orphan dir %d, will skip "
+                    "this inode\n", slot);
+               ret = -EDEADLK;
+               goto out;
+       }
+       /* This signals to the orphan recovery process that it should
+        * wait for us to handle the wipe. */
+       osb->osb_orphan_wipes[slot]++;
+out:
+       spin_unlock(&osb->osb_lock);
+       return ret;
+}
+
+static void ocfs2_signal_wipe_completion(struct ocfs2_super *osb,
+                                        int slot)
+{
+       spin_lock(&osb->osb_lock);
+       osb->osb_orphan_wipes[slot]--;
+       spin_unlock(&osb->osb_lock);
+
+       wake_up(&osb->osb_wipe_event);
+}
+
 static int ocfs2_wipe_inode(struct inode *inode,
                            struct buffer_head *di_bh)
 {
@@ -555,6 +592,11 @@ static int ocfs2_wipe_inode(struct inode *inode,
        /* We've already voted on this so it should be readonly - no
         * spinlock needed. */
        orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot;
+
+       status = ocfs2_check_orphan_recovery_state(osb, orphaned_slot);
+       if (status)
+               return status;
+
        orphan_dir_inode = ocfs2_get_system_file_inode(osb,
                                                       ORPHAN_DIR_SYSTEM_INODE,
                                                       orphaned_slot);
@@ -597,6 +639,7 @@ bail_unlock_dir:
        brelse(orphan_dir_bh);
 bail:
        iput(orphan_dir_inode);
+       ocfs2_signal_wipe_completion(osb, orphaned_slot);
 
        return status;
 }
@@ -822,7 +865,8 @@ void ocfs2_delete_inode(struct inode *inode)
 
        status = ocfs2_wipe_inode(inode, di_bh);
        if (status < 0) {
-               mlog_errno(status);
+               if (status != -EDEADLK)
+                       mlog_errno(status);
                goto bail_unlock_inode;
        }
 
index d329c9d..4be801f 100644 (file)
@@ -1408,21 +1408,17 @@ bail:
        return status;
 }
 
-static int ocfs2_recover_orphans(struct ocfs2_super *osb,
-                                int slot)
+static int ocfs2_queue_orphans(struct ocfs2_super *osb,
+                              int slot,
+                              struct inode **head)
 {
-       int status = 0;
-       int have_disk_lock = 0;
-       struct inode *inode = NULL;
-       struct inode *iter;
+       int status;
        struct inode *orphan_dir_inode = NULL;
+       struct inode *iter;
        unsigned long offset, blk, local;
        struct buffer_head *bh = NULL;
        struct ocfs2_dir_entry *de;
        struct super_block *sb = osb->sb;
-       struct ocfs2_inode_info *oi;
-
-       mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
 
        orphan_dir_inode = ocfs2_get_system_file_inode(osb,
                                                       ORPHAN_DIR_SYSTEM_INODE,
@@ -1430,17 +1426,15 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
        if  (!orphan_dir_inode) {
                status = -ENOENT;
                mlog_errno(status);
-               goto out;
-       }
+               return status;
+       }       
 
        mutex_lock(&orphan_dir_inode->i_mutex);
        status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0);
        if (status < 0) {
-               mutex_unlock(&orphan_dir_inode->i_mutex);
                mlog_errno(status);
                goto out;
        }
-       have_disk_lock = 1;
 
        offset = 0;
        iter = NULL;
@@ -1451,11 +1445,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                if (!bh)
                        status = -EINVAL;
                if (status < 0) {
-                       mutex_unlock(&orphan_dir_inode->i_mutex);
                        if (bh)
                                brelse(bh);
                        mlog_errno(status);
-                       goto out;
+                       goto out_unlock;
                }
 
                local = 0;
@@ -1465,11 +1458,10 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
 
                        if (!ocfs2_check_dir_entry(orphan_dir_inode,
                                                  de, bh, local)) {
-                               mutex_unlock(&orphan_dir_inode->i_mutex);
                                status = -EINVAL;
                                mlog_errno(status);
                                brelse(bh);
-                               goto out;
+                               goto out_unlock;
                        }
 
                        local += le16_to_cpu(de->rec_len);
@@ -1504,18 +1496,95 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
 
                        mlog(0, "queue orphan %"MLFu64"\n",
                             OCFS2_I(iter)->ip_blkno);
-                       OCFS2_I(iter)->ip_next_orphan = inode;
-                       inode = iter;
+                       /* No locking is required for the next_orphan
+                        * queue as there is only ever a single
+                        * process doing orphan recovery. */
+                       OCFS2_I(iter)->ip_next_orphan = *head;
+                       *head = iter;
                }
                brelse(bh);
        }
-       mutex_unlock(&orphan_dir_inode->i_mutex);
 
+out_unlock:
        ocfs2_meta_unlock(orphan_dir_inode, 0);
-       have_disk_lock = 0;
-
+out:
+       mutex_unlock(&orphan_dir_inode->i_mutex);
        iput(orphan_dir_inode);
-       orphan_dir_inode = NULL;
+       return status;
+}
+
+static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
+                                             int slot)
+{
+       int ret;
+
+       spin_lock(&osb->osb_lock);
+       ret = !osb->osb_orphan_wipes[slot];
+       spin_unlock(&osb->osb_lock);
+       return ret;
+}
+
+static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
+                                            int slot)
+{
+       spin_lock(&osb->osb_lock);
+       /* Mark ourselves such that new processes in delete_inode()
+        * know to quit early. */
+       ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
+       while (osb->osb_orphan_wipes[slot]) {
+               /* If any processes are already in the middle of an
+                * orphan wipe on this dir, then we need to wait for
+                * them. */
+               spin_unlock(&osb->osb_lock);
+               wait_event_interruptible(osb->osb_wipe_event,
+                                        ocfs2_orphan_recovery_can_continue(osb, slot));
+               spin_lock(&osb->osb_lock);
+       }
+       spin_unlock(&osb->osb_lock);
+}
+
+static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
+                                             int slot)
+{
+       ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
+}
+
+/*
+ * Orphan recovery. Each mounted node has it's own orphan dir which we
+ * must run during recovery. Our strategy here is to build a list of
+ * the inodes in the orphan dir and iget/iput them. The VFS does
+ * (most) of the rest of the work.
+ *
+ * Orphan recovery can happen at any time, not just mount so we have a
+ * couple of extra considerations.
+ *
+ * - We grab as many inodes as we can under the orphan dir lock -
+ *   doing iget() outside the orphan dir risks getting a reference on
+ *   an invalid inode.
+ * - We must be sure not to deadlock with other processes on the
+ *   system wanting to run delete_inode(). This can happen when they go
+ *   to lock the orphan dir and the orphan recovery process attempts to
+ *   iget() inside the orphan dir lock. This can be avoided by
+ *   advertising our state to ocfs2_delete_inode().
+ */
+static int ocfs2_recover_orphans(struct ocfs2_super *osb,
+                                int slot)
+{
+       int ret = 0;
+       struct inode *inode = NULL;
+       struct inode *iter;
+       struct ocfs2_inode_info *oi;
+
+       mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
+
+       ocfs2_mark_recovering_orphan_dir(osb, slot);
+       ret = ocfs2_queue_orphans(osb, slot, &inode);
+       ocfs2_clear_recovering_orphan_dir(osb, slot);
+
+       /* Error here should be noted, but we want to continue with as
+        * many queued inodes as we've got. */
+       if (ret)
+               mlog_errno(ret);
 
        while (inode) {
                oi = OCFS2_I(inode);
@@ -1541,14 +1610,7 @@ static int ocfs2_recover_orphans(struct ocfs2_super *osb,
                inode = iter;
        }
 
-out:
-       if (have_disk_lock)
-               ocfs2_meta_unlock(orphan_dir_inode, 0);
-
-       if (orphan_dir_inode)
-               iput(orphan_dir_inode);
-
-       return status;
+       return ret;
 }
 
 static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
index 19360e3..e89de9b 100644 (file)
@@ -287,6 +287,10 @@ struct ocfs2_super
        struct inode                    *osb_tl_inode;
        struct buffer_head              *osb_tl_bh;
        struct work_struct              osb_truncate_log_wq;
+
+       struct ocfs2_node_map           osb_recovering_orphan_dirs;
+       unsigned int                    *osb_orphan_wipes;
+       wait_queue_head_t               osb_wipe_event;
 };
 
 #define OCFS2_SB(sb)       ((struct ocfs2_super *)(sb)->s_fs_info)
index 046824b..8dd3aaf 100644 (file)
@@ -1325,6 +1325,16 @@ static int ocfs2_initialize_super(struct super_block *sb,
        }
        mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots);
 
+       init_waitqueue_head(&osb->osb_wipe_event);
+       osb->osb_orphan_wipes = kcalloc(osb->max_slots,
+                                       sizeof(*osb->osb_orphan_wipes),
+                                       GFP_KERNEL);
+       if (!osb->osb_orphan_wipes) {
+               status = -ENOMEM;
+               mlog_errno(status);
+               goto bail;
+       }
+
        osb->s_feature_compat =
                le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat);
        osb->s_feature_ro_compat =
@@ -1638,6 +1648,7 @@ static void ocfs2_delete_osb(struct ocfs2_super *osb)
        if (osb->slot_info)
                ocfs2_free_slot_info(osb->slot_info);
 
+       kfree(osb->osb_orphan_wipes);
        /* FIXME
         * This belongs in journal shutdown, but because we have to
         * allocate osb->journal at the start of ocfs2_initalize_osb(),