Merge branch 'upstream-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/jlbec...
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 11eefb8..005261c 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -631,39 +631,54 @@ error:
        return NULL;
 }
 
-void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
-                                  struct dlm_lock_resource *res,
-                                  int new_lockres,
-                                  const char *file,
-                                  int line)
+void dlm_lockres_set_refmap_bit(struct dlm_ctxt *dlm,
+                               struct dlm_lock_resource *res, int bit)
 {
-       if (!new_lockres)
-               assert_spin_locked(&res->spinlock);
+       assert_spin_locked(&res->spinlock);
+
+       mlog(0, "res %.*s, set node %u, %ps()\n", res->lockname.len,
+            res->lockname.name, bit, __builtin_return_address(0));
+
+       set_bit(bit, res->refmap);
+}
+
+void dlm_lockres_clear_refmap_bit(struct dlm_ctxt *dlm,
+                                 struct dlm_lock_resource *res, int bit)
+{
+       assert_spin_locked(&res->spinlock);
+
+       mlog(0, "res %.*s, clr node %u, %ps()\n", res->lockname.len,
+            res->lockname.name, bit, __builtin_return_address(0));
+
+       clear_bit(bit, res->refmap);
+}
+
+
+void dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+                                  struct dlm_lock_resource *res)
+{
+       assert_spin_locked(&res->spinlock);
 
-       if (!test_bit(dlm->node_num, res->refmap)) {
-               BUG_ON(res->inflight_locks != 0);
-               dlm_lockres_set_refmap_bit(dlm->node_num, res);
-       }
        res->inflight_locks++;
-       mlog(0, "%s:%.*s: inflight++: now %u\n",
-            dlm->name, res->lockname.len, res->lockname.name,
-            res->inflight_locks);
+
+       mlog(0, "%s: res %.*s, inflight++: now %u, %ps()\n", dlm->name,
+            res->lockname.len, res->lockname.name, res->inflight_locks,
+            __builtin_return_address(0));
 }
 
-void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
-                                  struct dlm_lock_resource *res,
-                                  const char *file,
-                                  int line)
+void dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+                                  struct dlm_lock_resource *res)
 {
        assert_spin_locked(&res->spinlock);
 
        BUG_ON(res->inflight_locks == 0);
+
        res->inflight_locks--;
-       mlog(0, "%s:%.*s: inflight--: now %u\n",
-            dlm->name, res->lockname.len, res->lockname.name,
-            res->inflight_locks);
-       if (res->inflight_locks == 0)
-               dlm_lockres_clear_refmap_bit(dlm->node_num, res);
+
+       mlog(0, "%s: res %.*s, inflight--: now %u, %ps()\n", dlm->name,
+            res->lockname.len, res->lockname.name, res->inflight_locks,
+            __builtin_return_address(0));
+
        wake_up(&res->wq);
 }
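
The helpers consolidated above share one calling convention: every caller holds res->spinlock (hence the assert_spin_locked), and the old file/line tracing arguments are replaced by logging the caller through %ps and __builtin_return_address(0). Below is a minimal userspace sketch of the same pattern, with a pthread mutex and condition variable standing in for the kernel spinlock and waitqueue; all names are illustrative stand-ins, not o2dlm API:

        #include <assert.h>
        #include <pthread.h>
        #include <stdio.h>

        struct lock_resource {
                pthread_mutex_t lock;           /* models res->spinlock */
                pthread_cond_t  wq;             /* models res->wq */
                unsigned long   refmap;         /* models res->refmap, one bit per node */
                unsigned int    inflight_locks;
        };

        /* Callers of all four helpers must hold res->lock, mirroring
         * assert_spin_locked() in the patch above. */
        static void lockres_set_refmap_bit(struct lock_resource *res, int bit)
        {
                res->refmap |= 1UL << bit;
        }

        static void lockres_clear_refmap_bit(struct lock_resource *res, int bit)
        {
                res->refmap &= ~(1UL << bit);
        }

        static void lockres_grab_inflight_ref(struct lock_resource *res)
        {
                res->inflight_locks++;
        }

        static void lockres_drop_inflight_ref(struct lock_resource *res)
        {
                assert(res->inflight_locks != 0);       /* models BUG_ON() */
                res->inflight_locks--;
                pthread_cond_broadcast(&res->wq);       /* models wake_up(&res->wq) */
        }

        int main(void)
        {
                struct lock_resource res = {
                        .lock = PTHREAD_MUTEX_INITIALIZER,
                        .wq   = PTHREAD_COND_INITIALIZER,
                };

                pthread_mutex_lock(&res.lock);  /* take the lock, then call helpers */
                lockres_set_refmap_bit(&res, 3);
                lockres_grab_inflight_ref(&res);
                printf("refmap=%#lx inflight=%u\n", res.refmap, res.inflight_locks);
                lockres_drop_inflight_ref(&res);
                lockres_clear_refmap_bit(&res, 3);
                pthread_mutex_unlock(&res.lock);
                return 0;
        }

Build with cc -pthread. The point is only the contract: the bitmap and the pin count are touched under one lock, and dropping a pin wakes any thread waiting on the resource.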
 
@@ -697,7 +712,6 @@ struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
        unsigned int hash;
        int tries = 0;
        int bit, wait_on_recovery = 0;
-       int drop_inflight_if_nonlocal = 0;
 
        BUG_ON(!lockid);
 
@@ -709,36 +723,33 @@ lookup:
        spin_lock(&dlm->spinlock);
        tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
        if (tmpres) {
-               int dropping_ref = 0;
-
                spin_unlock(&dlm->spinlock);
-
                spin_lock(&tmpres->spinlock);
-               /* We wait for the other thread that is mastering the resource */
+               /* Wait on the thread that is mastering the resource */
                if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
                        __dlm_wait_on_lockres(tmpres);
                        BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
+                       spin_unlock(&tmpres->spinlock);
+                       dlm_lockres_put(tmpres);
+                       tmpres = NULL;
+                       goto lookup;
                }
 
-               if (tmpres->owner == dlm->node_num) {
-                       BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
-                       dlm_lockres_grab_inflight_ref(dlm, tmpres);
-               } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
-                       dropping_ref = 1;
-               spin_unlock(&tmpres->spinlock);
-
-               /* wait until done messaging the master, drop our ref to allow
-                * the lockres to be purged, start over. */
-               if (dropping_ref) {
-                       spin_lock(&tmpres->spinlock);
-                       __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
+               /* Wait on the resource purge to complete before continuing */
+               if (tmpres->state & DLM_LOCK_RES_DROPPING_REF) {
+                       BUG_ON(tmpres->owner == dlm->node_num);
+                       __dlm_wait_on_lockres_flags(tmpres,
+                                                   DLM_LOCK_RES_DROPPING_REF);
                        spin_unlock(&tmpres->spinlock);
                        dlm_lockres_put(tmpres);
                        tmpres = NULL;
                        goto lookup;
                }
 
-               mlog(0, "found in hash!\n");
+               /* Grab inflight ref to pin the resource */
+               dlm_lockres_grab_inflight_ref(dlm, tmpres);
+
+               spin_unlock(&tmpres->spinlock);
                if (res)
                        dlm_lockres_put(res);
                res = tmpres;
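
The reworked lookup above retries from scratch in both transient cases: if mastery is still in progress it waits and restarts, and if the resource is being purged (DLM_LOCK_RES_DROPPING_REF) it waits for the purge to finish, drops its reference, and restarts; only a stable resource gets pinned via dlm_lockres_grab_inflight_ref, still under res->spinlock. A toy model of the wait-for-flags step, assuming a condition variable plays the role of __dlm_wait_on_lockres_flags() and the resource waitqueue (all identifiers invented):

        #include <pthread.h>
        #include <stdio.h>

        #define RES_DROPPING_REF 0x1    /* stand-in for DLM_LOCK_RES_DROPPING_REF */

        struct toy_res {
                pthread_mutex_t lock;
                pthread_cond_t  wq;
                unsigned int    state;
        };

        /* Caller holds res->lock; sleeps until all of 'flags' are clear. */
        static void wait_on_res_flags(struct toy_res *res, unsigned int flags)
        {
                while (res->state & flags)
                        pthread_cond_wait(&res->wq, &res->lock);
        }

        static void *purge_thread(void *arg)
        {
                struct toy_res *res = arg;

                pthread_mutex_lock(&res->lock);
                res->state &= ~RES_DROPPING_REF;        /* purge complete */
                pthread_cond_broadcast(&res->wq);
                pthread_mutex_unlock(&res->lock);
                return NULL;
        }

        int main(void)
        {
                struct toy_res res = {
                        .lock  = PTHREAD_MUTEX_INITIALIZER,
                        .wq    = PTHREAD_COND_INITIALIZER,
                        .state = RES_DROPPING_REF,
                };
                pthread_t t;

                pthread_create(&t, NULL, purge_thread, &res);
                pthread_mutex_lock(&res.lock);
                wait_on_res_flags(&res, RES_DROPPING_REF);
                printf("purge finished, safe to retry the lookup\n");
                pthread_mutex_unlock(&res.lock);
                pthread_join(t, NULL);
                return 0;
        }

In the real code the waiter additionally drops its reference and jumps back to the lookup label, so it always re-finds (or re-creates) the resource rather than reusing a possibly stale pointer.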
@@ -829,8 +840,8 @@ lookup:
                 * but they might own this lockres.  wait on them. */
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
-                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
-                            "recover before lock mastery can begin\n",
+                       mlog(0, "%s: res %.*s, At least one node (%d) "
+                            "to recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                }
@@ -843,12 +854,11 @@ lookup:
 
        /* finally add the lockres to its hash bucket */
        __dlm_insert_lockres(dlm, res);
-       /* since this lockres is new it doesn't not require the spinlock */
-       dlm_lockres_grab_inflight_ref_new(dlm, res);
 
-       /* if this node does not become the master make sure to drop
-        * this inflight reference below */
-       drop_inflight_if_nonlocal = 1;
+       /* Grab inflight ref to pin the resource */
+       spin_lock(&res->spinlock);
+       dlm_lockres_grab_inflight_ref(dlm, res);
+       spin_unlock(&res->spinlock);
 
        /* get an extra ref on the mle in case this is a BLOCK
         * if so, the creator of the BLOCK may try to put the last
@@ -864,8 +874,8 @@ redo_request:
                 * dlm spinlock would be detectable by a change on the mle,
                 * so we only need to clear out the recovery map once. */
                if (dlm_is_recovery_lock(lockid, namelen)) {
-                       mlog(ML_NOTICE, "%s: recovery map is not empty, but "
-                            "must master $RECOVERY lock now\n", dlm->name);
+                       mlog(0, "%s: Recovery map is not empty, but must "
+                            "master $RECOVERY lock now\n", dlm->name);
                        if (!dlm_pre_master_reco_lockres(dlm, res))
                                wait_on_recovery = 0;
                        else {
@@ -883,8 +893,8 @@ redo_request:
                spin_lock(&dlm->spinlock);
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
-                       mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
-                            "recover before lock mastery can begin\n",
+                       mlog(0, "%s: res %.*s, At least one node (%d) "
+                            "to recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                } else
@@ -913,8 +923,8 @@ redo_request:
                         * yet, keep going until it does.  this is how the
                         * master will know that asserts are needed back to
                         * the lower nodes. */
-                       mlog(0, "%s:%.*s: requests only up to %u but master "
-                            "is %u, keep going\n", dlm->name, namelen,
+                       mlog(0, "%s: res %.*s, Requests only up to %u but "
+                            "master is %u, keep going\n", dlm->name, namelen,
                             lockid, nodenum, mle->master);
                }
        }
@@ -924,13 +934,12 @@ wait:
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
                wait_on_recovery = 1;
-               mlog(0, "%s:%.*s: node map changed, redo the "
-                    "master request now, blocked=%d\n",
-                    dlm->name, res->lockname.len,
+               mlog(0, "%s: res %.*s, Node map changed, redo the master "
+                    "request now, blocked=%d\n", dlm->name, res->lockname.len,
                     res->lockname.name, blocked);
                if (++tries > 20) {
-                       mlog(ML_ERROR, "%s:%.*s: spinning on "
-                            "dlm_wait_for_lock_mastery, blocked=%d\n",
+                       mlog(ML_ERROR, "%s: res %.*s, Spinning on "
+                            "dlm_wait_for_lock_mastery, blocked=%d\n",
                             dlm->name, res->lockname.len,
                             res->lockname.name, blocked);
                        dlm_print_one_lock_resource(res);
@@ -940,7 +949,8 @@ wait:
                goto redo_request;
        }
 
-       mlog(0, "lockres mastered by %u\n", res->owner);
+       mlog(0, "%s: res %.*s, Mastered by %u\n", dlm->name, res->lockname.len,
+            res->lockname.name, res->owner);
        /* make sure we never continue without this */
        BUG_ON(res->owner == O2NM_MAX_NODES);
 
@@ -952,8 +962,6 @@ wait:
 
 wake_waiters:
        spin_lock(&res->spinlock);
-       if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
-               dlm_lockres_drop_inflight_ref(dlm, res);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);
@@ -1426,9 +1434,7 @@ way_up_top:
                }
 
                if (res->owner == dlm->node_num) {
-                       mlog(0, "%s:%.*s: setting bit %u in refmap\n",
-                            dlm->name, namelen, name, request->node_idx);
-                       dlm_lockres_set_refmap_bit(request->node_idx, res);
+                       dlm_lockres_set_refmap_bit(dlm, res, request->node_idx);
                        spin_unlock(&res->spinlock);
                        response = DLM_MASTER_RESP_YES;
                        if (mle)
@@ -1493,10 +1499,8 @@ way_up_top:
                                 * go back and clean the mles on any
                                 * other nodes */
                                dispatch_assert = 1;
-                               dlm_lockres_set_refmap_bit(request->node_idx, res);
-                               mlog(0, "%s:%.*s: setting bit %u in refmap\n",
-                                    dlm->name, namelen, name,
-                                    request->node_idx);
+                               dlm_lockres_set_refmap_bit(dlm, res,
+                                                          request->node_idx);
                        } else
                                response = DLM_MASTER_RESP_NO;
                } else {
@@ -1702,7 +1706,7 @@ again:
                             "lockres, set the bit in the refmap\n",
                             namelen, lockname, to);
                        spin_lock(&res->spinlock);
-                       dlm_lockres_set_refmap_bit(to, res);
+                       dlm_lockres_set_refmap_bit(dlm, res, to);
                        spin_unlock(&res->spinlock);
                }
        }
@@ -2187,8 +2191,6 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
        namelen = res->lockname.len;
        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
 
-       mlog(0, "%s:%.*s: sending deref to %d\n",
-            dlm->name, namelen, lockname, res->owner);
        memset(&deref, 0, sizeof(deref));
        deref.node_idx = dlm->node_num;
        deref.namelen = namelen;
@@ -2197,14 +2199,12 @@ int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
        ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
                                 &deref, sizeof(deref), res->owner, &r);
        if (ret < 0)
-               mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
-                    "node %u\n", ret, DLM_DEREF_LOCKRES_MSG, dlm->key,
-                    res->owner);
+               mlog(ML_ERROR, "%s: res %.*s, Error %d send DEREF to node %u\n",
+                    dlm->name, namelen, lockname, ret, res->owner);
        else if (r < 0) {
                /* BAD.  other node says I did not have a ref. */
-               mlog(ML_ERROR,"while dropping ref on %s:%.*s "
-                   "(master=%u) got %d.\n", dlm->name, namelen,
-                   lockname, res->owner, r);
+               mlog(ML_ERROR, "%s: res %.*s, DEREF to node %u got %d\n",
+                    dlm->name, namelen, lockname, res->owner, r);
                dlm_print_one_lock_resource(res);
                BUG();
        }
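
The deref path above separates two failure levels: a negative ret from o2net_send_message means the DEREF never reached the owner (a transport problem), while a negative status in r means the owner processed the message but denies this node held a reference, which is treated as fatal state corruption via BUG(). A hedged userspace sketch of that convention; send_msg and its arguments are invented stand-ins, not the o2net API:

        #include <stdio.h>
        #include <stdlib.h>

        /* Toy transport: fills *status with the remote handler's verdict and
         * returns 0 on successful delivery, negative on transport failure. */
        static int send_msg(int msg_type, const void *payload, int len,
                            int target_node, int *status)
        {
                (void)msg_type; (void)payload; (void)len; (void)target_node;
                *status = 0;    /* pretend the master accepted the deref */
                return 0;       /* pretend delivery succeeded */
        }

        int main(void)
        {
                int r = 0;
                int ret = send_msg(1 /* "DEREF" */, "lockname", 8, 2, &r);

                if (ret < 0)
                        fprintf(stderr, "transport error %d sending DEREF\n", ret);
                else if (r < 0) {
                        /* Remote says we never held a ref: state is corrupt. */
                        fprintf(stderr, "master disputed our ref, status %d\n", r);
                        abort();        /* models BUG() */
                }
                return 0;
        }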
@@ -2260,7 +2260,7 @@ int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
        else {
                BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
                if (test_bit(node, res->refmap)) {
-                       dlm_lockres_clear_refmap_bit(node, res);
+                       dlm_lockres_clear_refmap_bit(dlm, res, node);
                        cleared = 1;
                }
        }
@@ -2320,7 +2320,7 @@ static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
        BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
        if (test_bit(node, res->refmap)) {
                __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
-               dlm_lockres_clear_refmap_bit(node, res);
+               dlm_lockres_clear_refmap_bit(dlm, res, node);
                cleared = 1;
        }
        spin_unlock(&res->spinlock);
@@ -2802,7 +2802,8 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                BUG_ON(!list_empty(&lock->bast_list));
                                BUG_ON(lock->ast_pending);
                                BUG_ON(lock->bast_pending);
-                               dlm_lockres_clear_refmap_bit(lock->ml.node, res);
+                               dlm_lockres_clear_refmap_bit(dlm, res,
+                                                            lock->ml.node);
                                list_del_init(&lock->list);
                                dlm_lock_put(lock);
                                /* In a normal unlock, we would have added a
@@ -2823,7 +2824,7 @@ static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                        mlog(0, "%s:%.*s: node %u had a ref to this "
                             "migrating lockres, clearing\n", dlm->name,
                             res->lockname.len, res->lockname.name, bit);
-                       dlm_lockres_clear_refmap_bit(bit, res);
+                       dlm_lockres_clear_refmap_bit(dlm, res, bit);
                }
                bit++;
        }
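
Both loops in dlm_remove_nonlocal_locks walk res->refmap with find_next_bit and clear the reference bit of every remote node recorded there. The same scan pattern in plain C, with a minimal stand-in for the kernel bitmap helper:

        #include <stdio.h>

        /* Minimal stand-in for find_next_bit() over one unsigned long. */
        static int next_bit(unsigned long map, int max, int start)
        {
                for (int bit = start; bit < max; bit++)
                        if (map & (1UL << bit))
                                return bit;
                return max;     /* "not found", matching the kernel API */
        }

        int main(void)
        {
                unsigned long refmap = (1UL << 1) | (1UL << 5);  /* nodes 1 and 5 */
                int max_nodes = 32, bit = 0;

                while ((bit = next_bit(refmap, max_nodes, bit)) < max_nodes) {
                        printf("clearing ref for node %d\n", bit);
                        refmap &= ~(1UL << bit);  /* models dlm_lockres_clear_refmap_bit */
                        bit++;
                }
                return 0;
        }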
@@ -2916,9 +2917,9 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
                                         &migrate, sizeof(migrate), nodenum,
                                         &status);
                if (ret < 0) {
-                       mlog(ML_ERROR, "Error %d when sending message %u (key "
-                            "0x%x) to node %u\n", ret, DLM_MIGRATE_REQUEST_MSG,
-                            dlm->key, nodenum);
+                       mlog(ML_ERROR, "%s: res %.*s, Error %d send "
+                            "MIGRATE_REQUEST to node %u\n", dlm->name,
+                            migrate.namelen, migrate.name, ret, nodenum);
                        if (!dlm_is_host_down(ret)) {
                                mlog(ML_ERROR, "unhandled error=%d!\n", ret);
                                BUG();
@@ -2937,7 +2938,7 @@ static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
                             dlm->name, res->lockname.len, res->lockname.name,
                             nodenum);
                        spin_lock(&res->spinlock);
-                       dlm_lockres_set_refmap_bit(nodenum, res);
+                       dlm_lockres_set_refmap_bit(dlm, res, nodenum);
                        spin_unlock(&res->spinlock);
                }
        }
@@ -3271,7 +3272,7 @@ int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
         * mastery reference here since old_master will briefly have
         * a reference after the migration completes */
        spin_lock(&res->spinlock);
-       dlm_lockres_set_refmap_bit(old_master, res);
+       dlm_lockres_set_refmap_bit(dlm, res, old_master);
        spin_unlock(&res->spinlock);
 
        mlog(0, "now time to do a migrate request to other nodes\n");