Linux 3.2.87
[pandora-kernel.git] / fs / ocfs2 / dlm / dlmrecovery.c
index 41e74f0..171db3f 100644 (file)
@@ -362,40 +362,38 @@ static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node)
 }
 
 
-int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
+void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
 {
-       if (timeout) {
-               mlog(ML_NOTICE, "%s: waiting %dms for notification of "
-                    "death of node %u\n", dlm->name, timeout, node);
+       if (dlm_is_node_dead(dlm, node))
+               return;
+
+       printk(KERN_NOTICE "o2dlm: Waiting on the death of node %u in "
+              "domain %s\n", node, dlm->name);
+
+       if (timeout)
                wait_event_timeout(dlm->dlm_reco_thread_wq,
-                          dlm_is_node_dead(dlm, node),
-                          msecs_to_jiffies(timeout));
-       } else {
-               mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
-                    "of death of node %u\n", dlm->name, node);
+                                  dlm_is_node_dead(dlm, node),
+                                  msecs_to_jiffies(timeout));
+       else
                wait_event(dlm->dlm_reco_thread_wq,
                           dlm_is_node_dead(dlm, node));
-       }
-       /* for now, return 0 */
-       return 0;
 }
 
-int dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
+void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout)
 {
-       if (timeout) {
-               mlog(0, "%s: waiting %dms for notification of "
-                    "recovery of node %u\n", dlm->name, timeout, node);
+       if (dlm_is_node_recovered(dlm, node))
+               return;
+
+       printk(KERN_NOTICE "o2dlm: Waiting on the recovery of node %u in "
+              "domain %s\n", node, dlm->name);
+
+       if (timeout)
                wait_event_timeout(dlm->dlm_reco_thread_wq,
-                          dlm_is_node_recovered(dlm, node),
-                          msecs_to_jiffies(timeout));
-       } else {
-               mlog(0, "%s: waiting indefinitely for notification "
-                    "of recovery of node %u\n", dlm->name, node);
+                                  dlm_is_node_recovered(dlm, node),
+                                  msecs_to_jiffies(timeout));
+       else
                wait_event(dlm->dlm_reco_thread_wq,
                           dlm_is_node_recovered(dlm, node));
-       }
-       /* for now, return 0 */
-       return 0;
 }
 
 /* callers of the top-level api calls (dlmlock/dlmunlock) should
@@ -542,7 +540,10 @@ master_here:
                /* success!  see if any other nodes need recovery */
                mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",
                     dlm->name, dlm->reco.dead_node, dlm->node_num);
-               dlm_reset_recovery(dlm);
+               spin_lock(&dlm->spinlock);
+               __dlm_reset_recovery(dlm);
+               dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
+               spin_unlock(&dlm->spinlock);
        }
        dlm_end_recovery(dlm);
 
@@ -700,6 +701,14 @@ static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node)
                if (all_nodes_done) {
                        int ret;
 
+                       /* Set this flag on recovery master to avoid
+                        * a new recovery for another dead node start
+                        * before the recovery is not done. That may
+                        * cause recovery hung.*/
+                       spin_lock(&dlm->spinlock);
+                       dlm->reco.state |= DLM_RECO_STATE_FINALIZE;
+                       spin_unlock(&dlm->spinlock);
+
                        /* all nodes are now in DLM_RECO_NODE_DATA_DONE state
                         * just send a finalize message to everyone and
                         * clean up */
@@ -1680,6 +1689,7 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
        unsigned int hash;
        int master = DLM_LOCK_RES_OWNER_UNKNOWN;
        u32 flags = DLM_ASSERT_MASTER_REQUERY;
+       int dispatched = 0;
 
        if (!dlm_grab(dlm)) {
                /* since the domain has gone away on this
@@ -1701,6 +1711,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
                                mlog_errno(-ENOMEM);
                                /* retry!? */
                                BUG();
+                       } else {
+                               dispatched = 1;
                        }
                } else /* put.. incase we are not the master */
                        dlm_lockres_put(res);
@@ -1708,7 +1720,8 @@ int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data,
        }
        spin_unlock(&dlm->spinlock);
 
-       dlm_put(dlm);
+       if (!dispatched)
+               dlm_put(dlm);
        return master;
 }
 
@@ -1754,13 +1767,13 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                                     struct dlm_migratable_lockres *mres)
 {
        struct dlm_migratable_lock *ml;
-       struct list_head *queue;
+       struct list_head *queue, *iter;
        struct list_head *tmpq = NULL;
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        int ret = 0;
        int i, j, bad;
-       struct dlm_lock *lock = NULL;
+       struct dlm_lock *lock;
        u8 from = O2NM_MAX_NODES;
        unsigned int added = 0;
        __be64 c;
@@ -1776,7 +1789,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                             dlm->name, mres->lockname_len, mres->lockname,
                             from);
                        spin_lock(&res->spinlock);
-                       dlm_lockres_set_refmap_bit(from, res);
+                       dlm_lockres_set_refmap_bit(dlm, res, from);
                        spin_unlock(&res->spinlock);
                        added++;
                        break;
@@ -1795,14 +1808,16 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                        /* MIGRATION ONLY! */
                        BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));
 
+                       lock = NULL;
                        spin_lock(&res->spinlock);
                        for (j = DLM_GRANTED_LIST; j <= DLM_BLOCKED_LIST; j++) {
                                tmpq = dlm_list_idx_to_ptr(res, j);
-                               list_for_each_entry(lock, tmpq, list) {
-                                       if (lock->ml.cookie != ml->cookie)
-                                               lock = NULL;
-                                       else
+                               list_for_each(iter, tmpq) {
+                                       lock = list_entry(iter,
+                                                 struct dlm_lock, list);
+                                       if (lock->ml.cookie == ml->cookie)
                                                break;
+                                       lock = NULL;
                                }
                                if (lock)
                                        break;
@@ -1974,7 +1989,7 @@ skip_lvb:
                        mlog(0, "%s:%.*s: added lock for node %u, "
                             "setting refmap bit\n", dlm->name,
                             res->lockname.len, res->lockname.name, ml->node);
-                       dlm_lockres_set_refmap_bit(ml->node, res);
+                       dlm_lockres_set_refmap_bit(dlm, res, ml->node);
                        added++;
                }
                spin_unlock(&res->spinlock);
@@ -2024,7 +2039,6 @@ void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
                        dlm_lock_get(lock);
                        if (lock->convert_pending) {
                                /* move converting lock back to granted */
-                               BUG_ON(i != DLM_CONVERTING_LIST);
                                mlog(0, "node died with convert pending "
                                     "on %.*s. move back to granted list.\n",
                                     res->lockname.len, res->lockname.name);
@@ -2254,12 +2268,12 @@ static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
                             res->lockname.len, res->lockname.name, freed, dead_node);
                        __dlm_print_one_lock_resource(res);
                }
-               dlm_lockres_clear_refmap_bit(dead_node, res);
+               dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
        } else if (test_bit(dead_node, res->refmap)) {
                mlog(0, "%s:%.*s: dead node %u had a ref, but had "
                     "no locks and had not purged before dying\n", dlm->name,
                     res->lockname.len, res->lockname.name, dead_node);
-               dlm_lockres_clear_refmap_bit(dead_node, res);
+               dlm_lockres_clear_refmap_bit(dlm, res, dead_node);
        }
 
        /* do not kick thread yet */
@@ -2318,6 +2332,8 @@ static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
                                                break;
                                        }
                                }
+                               dlm_lockres_clear_refmap_bit(dlm, res,
+                                               dead_node);
                                spin_unlock(&res->spinlock);
                                continue;
                        }
@@ -2872,8 +2888,8 @@ int dlm_finalize_reco_handler(struct o2net_msg *msg, u32 len, void *data,
                                BUG();
                        }
                        dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE;
+                       __dlm_reset_recovery(dlm);
                        spin_unlock(&dlm->spinlock);
-                       dlm_reset_recovery(dlm);
                        dlm_kick_recovery_thread(dlm);
                        break;
                default: