int idx);
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
- unsigned int namelen, void *nodemap,
- u32 flags);
+static int dlm_do_assert_master(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ void *nodemap, u32 flags);
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle,
struct dlm_master_list_entry **mle,
char *name, unsigned int namelen);
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle, int to);
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
INIT_LIST_HEAD(&res->purge);
atomic_set(&res->asts_reserved, 0);
res->migration_pending = 0;
+ res->inflight_locks = 0;
kref_init(&res->refs);
res->last_used = 0;
memset(res->lvb, 0, DLM_LVB_LEN);
+ memset(res->refmap, 0, sizeof(res->refmap));
}
struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
return res;
}
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ int new_lockres,
+ const char *file,
+ int line)
+{
+ if (!new_lockres)
+ assert_spin_locked(&res->spinlock);
+
+ if (!test_bit(dlm->node_num, res->refmap)) {
+ BUG_ON(res->inflight_locks != 0);
+ dlm_lockres_set_refmap_bit(dlm->node_num, res);
+ }
+ res->inflight_locks++;
+ mlog(0, "%s:%.*s: inflight++: now %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->inflight_locks);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ const char *file,
+ int line)
+{
+ assert_spin_locked(&res->spinlock);
+
+ BUG_ON(res->inflight_locks == 0);
+ res->inflight_locks--;
+ mlog(0, "%s:%.*s: inflight--: now %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ res->inflight_locks);
+ if (res->inflight_locks == 0)
+ dlm_lockres_clear_refmap_bit(dlm->node_num, res);
+ wake_up(&res->wq);
+}
+
/*
* lookup a lock resource by name.
* may already exist in the hashtable.
unsigned int hash;
int tries = 0;
int bit, wait_on_recovery = 0;
+ int drop_inflight_if_nonlocal = 0;
BUG_ON(!lockid);
lookup:
spin_lock(&dlm->spinlock);
- tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
+ tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
if (tmpres) {
+ int dropping_ref = 0;
+
+ spin_lock(&tmpres->spinlock);
+ if (tmpres->owner == dlm->node_num) {
+ BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
+ dlm_lockres_grab_inflight_ref(dlm, tmpres);
+ } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
+ dropping_ref = 1;
+ spin_unlock(&tmpres->spinlock);
spin_unlock(&dlm->spinlock);
+
+ /* wait until done messaging the master, drop our ref to allow
+ * the lockres to be purged, start over. */
+ if (dropping_ref) {
+ spin_lock(&tmpres->spinlock);
+ __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
+ spin_unlock(&tmpres->spinlock);
+ dlm_lockres_put(tmpres);
+ tmpres = NULL;
+ goto lookup;
+ }
+
mlog(0, "found in hash!\n");
if (res)
dlm_lockres_put(res);
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, dlm->node_num);
__dlm_insert_lockres(dlm, res);
+ dlm_lockres_grab_inflight_ref(dlm, res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
/* lockres still marked IN_PROGRESS */
/* if we found a block, wait for lock to be mastered by another node */
blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
if (blocked) {
+ int mig;
if (mle->type == DLM_MLE_MASTER) {
mlog(ML_ERROR, "master entry for nonexistent lock!\n");
BUG();
- } else if (mle->type == DLM_MLE_MIGRATION) {
- /* migration is in progress! */
- /* the good news is that we now know the
- * "current" master (mle->master). */
-
+ }
+ mig = (mle->type == DLM_MLE_MIGRATION);
+ /* if there is a migration in progress, let the migration
+ * finish before continuing. we can wait for the absence
+ * of the MIGRATION mle: either the migrate finished or
+ * one of the nodes died and the mle was cleaned up.
+ * if there is a BLOCK here, but it already has a master
+ * set, we are too late. the master does not have a ref
+ * for us in the refmap. detach the mle and drop it.
+ * either way, go back to the top and start over. */
+ if (mig || mle->master != O2NM_MAX_NODES) {
+ BUG_ON(mig && mle->master == dlm->node_num);
+ /* we arrived too late. the master does not
+ * have a ref for us. retry. */
+ mlog(0, "%s:%.*s: late on %s\n",
+ dlm->name, namelen, lockid,
+ mig ? "MIGRATION" : "BLOCK");
spin_unlock(&dlm->master_lock);
- assert_spin_locked(&dlm->spinlock);
-
- /* set the lockres owner and hash it */
- spin_lock(&res->spinlock);
- dlm_set_lockres_owner(dlm, res, mle->master);
- __dlm_insert_lockres(dlm, res);
- spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
/* master is known, detach */
- dlm_mle_detach_hb_events(dlm, mle);
+ if (!mig)
+ dlm_mle_detach_hb_events(dlm, mle);
dlm_put_mle(mle);
mle = NULL;
- goto wake_waiters;
+ /* this is lame, but we cant wait on either
+ * the mle or lockres waitqueue here */
+ if (mig)
+ msleep(100);
+ goto lookup;
}
} else {
/* go ahead and try to master lock on this node */
/* finally add the lockres to its hash bucket */
__dlm_insert_lockres(dlm, res);
+ /* since this lockres is new it doesnt not require the spinlock */
+ dlm_lockres_grab_inflight_ref_new(dlm, res);
+
+ /* if this node does not become the master make sure to drop
+ * this inflight reference below */
+ drop_inflight_if_nonlocal = 1;
+
/* get an extra ref on the mle in case this is a BLOCK
* if so, the creator of the BLOCK may try to put the last
* ref at this time in the assert master handler, so we
ret = -EINVAL;
dlm_node_iter_init(mle->vote_map, &iter);
while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
- ret = dlm_do_master_request(mle, nodenum);
+ ret = dlm_do_master_request(res, mle, nodenum);
if (ret < 0)
mlog_errno(ret);
if (mle->master != O2NM_MAX_NODES) {
wake_waiters:
spin_lock(&res->spinlock);
+ if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
+ dlm_lockres_drop_inflight_ref(dlm, res);
res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
spin_unlock(&res->spinlock);
wake_up(&res->wq);
/* this will cause the master to re-assert across
* the whole cluster, freeing up mles */
if (res->owner != dlm->node_num) {
- ret = dlm_do_master_request(mle, res->owner);
+ ret = dlm_do_master_request(res, mle, res->owner);
if (ret < 0) {
/* give recovery a chance to run */
mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
* now tell other nodes that I am
* mastering this. */
mle->master = dlm->node_num;
+ /* ref was grabbed in get_lock_resource
+ * will be dropped in dlmlock_master */
assert = 1;
sleep = 0;
}
(atomic_read(&mle->woken) == 1),
timeo);
if (res->owner == O2NM_MAX_NODES) {
- mlog(0, "waiting again\n");
+ mlog(0, "%s:%.*s: waiting again\n", dlm->name,
+ res->lockname.len, res->lockname.name);
goto recheck;
}
mlog(0, "done waiting, master is %u\n", res->owner);
m = dlm->node_num;
mlog(0, "about to master %.*s here, this=%u\n",
res->lockname.len, res->lockname.name, m);
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len, mle->vote_map, 0);
+ ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
if (ret) {
/* This is a failure in the network path,
* not in the response to the assert_master
/* set the lockres owner */
spin_lock(&res->spinlock);
+ /* mastery reference obtained either during
+ * assert_master_handler or in get_lock_resource */
dlm_change_lockres_owner(dlm, res, m);
spin_unlock(&res->spinlock);
*
*/
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+ struct dlm_master_list_entry *mle, int to)
{
struct dlm_ctxt *dlm = mle->dlm;
struct dlm_master_request request;
case DLM_MASTER_RESP_YES:
set_bit(to, mle->response_map);
mlog(0, "node %u is the master, response=YES\n", to);
+ mlog(0, "%s:%.*s: master node %u now knows I have a "
+ "reference\n", dlm->name, res->lockname.len,
+ res->lockname.name, to);
mle->master = to;
break;
case DLM_MASTER_RESP_NO:
*
* if possible, TRIM THIS DOWN!!!
*/
-int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
u8 response = DLM_MASTER_RESP_MAYBE;
struct dlm_ctxt *dlm = data;
/* take care of the easy cases up front */
spin_lock(&res->spinlock);
- if (res->state & DLM_LOCK_RES_RECOVERING) {
+ if (res->state & (DLM_LOCK_RES_RECOVERING|
+ DLM_LOCK_RES_MIGRATING)) {
spin_unlock(&res->spinlock);
mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
- "being recovered\n");
+ "being recovered/migrated\n");
response = DLM_MASTER_RESP_ERROR;
if (mle)
kmem_cache_free(dlm_mle_cache, mle);
}
if (res->owner == dlm->node_num) {
+ mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+ dlm->name, namelen, name, request->node_idx);
+ dlm_lockres_set_refmap_bit(request->node_idx, res);
spin_unlock(&res->spinlock);
- // mlog(0, "this node is the master\n");
response = DLM_MASTER_RESP_YES;
if (mle)
kmem_cache_free(dlm_mle_cache, mle);
mlog(0, "node %u is master, but trying to migrate to "
"node %u.\n", tmpmle->master, tmpmle->new_master);
if (tmpmle->master == dlm->node_num) {
- response = DLM_MASTER_RESP_YES;
mlog(ML_ERROR, "no owner on lockres, but this "
"node is trying to migrate it to %u?!\n",
tmpmle->new_master);
* go back and clean the mles on any
* other nodes */
dispatch_assert = 1;
+ dlm_lockres_set_refmap_bit(request->node_idx, res);
+ mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+ dlm->name, namelen, name,
+ request->node_idx);
} else
response = DLM_MASTER_RESP_NO;
} else {
* can periodically run all locks owned by this node
* and re-assert across the cluster...
*/
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
- unsigned int namelen, void *nodemap,
- u32 flags)
+int dlm_do_assert_master(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ void *nodemap, u32 flags)
{
struct dlm_assert_master assert;
int to, tmpret;
struct dlm_node_iter iter;
int ret = 0;
int reassert;
+ const char *lockname = res->lockname.name;
+ unsigned int namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+
again:
reassert = 0;
mlog(0, "link to %d went down!\n", to);
/* any nonzero status return will do */
ret = tmpret;
+ r = 0;
} else if (r < 0) {
/* ok, something horribly messed. kill thyself. */
mlog(ML_ERROR,"during assert master of %.*s to %u, "
spin_unlock(&dlm->master_lock);
spin_unlock(&dlm->spinlock);
BUG();
- } else if (r == EAGAIN) {
+ }
+
+ if (r & DLM_ASSERT_RESPONSE_REASSERT &&
+ !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
+ mlog(ML_ERROR, "%.*s: very strange, "
+ "master MLE but no lockres on %u\n",
+ namelen, lockname, to);
+ }
+
+ if (r & DLM_ASSERT_RESPONSE_REASSERT) {
mlog(0, "%.*s: node %u create mles on other "
"nodes and requests a re-assert\n",
namelen, lockname, to);
reassert = 1;
}
+ if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
+ mlog(0, "%.*s: node %u has a reference to this "
+ "lockres, set the bit in the refmap\n",
+ namelen, lockname, to);
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(to, res);
+ spin_unlock(&res->spinlock);
+ }
}
if (reassert)
goto again;
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+
return ret;
}
*
* if possible, TRIM THIS DOWN!!!
*/
-int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_master_list_entry *mle = NULL;
char *name;
unsigned int namelen, hash;
u32 flags;
- int master_request = 0;
+ int master_request = 0, have_lockres_ref = 0;
int ret = 0;
if (!dlm_grab(dlm))
spin_unlock(&mle->spinlock);
if (res) {
+ int wake = 0;
spin_lock(&res->spinlock);
if (mle->type == DLM_MLE_MIGRATION) {
mlog(0, "finishing off migration of lockres %.*s, "
res->lockname.len, res->lockname.name,
dlm->node_num, mle->new_master);
res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
dlm_change_lockres_owner(dlm, res, mle->new_master);
BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
} else {
dlm_change_lockres_owner(dlm, res, mle->master);
}
spin_unlock(&res->spinlock);
+ have_lockres_ref = 1;
+ if (wake)
+ wake_up(&res->wq);
}
/* master is known, detach if not already detached.
done:
ret = 0;
- if (res)
- dlm_lockres_put(res);
+ if (res) {
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ *ret_data = (void *)res;
+ }
dlm_put(dlm);
if (master_request) {
mlog(0, "need to tell master to reassert\n");
- ret = EAGAIN; // positive. negative would shoot down the node.
+ /* positive. negative would shoot down the node. */
+ ret |= DLM_ASSERT_RESPONSE_REASSERT;
+ if (!have_lockres_ref) {
+ mlog(ML_ERROR, "strange, got assert from %u, MASTER "
+ "mle present here for %s:%.*s, but no lockres!\n",
+ assert->node_idx, dlm->name, namelen, name);
+ }
+ }
+ if (have_lockres_ref) {
+ /* let the master know we have a reference to the lockres */
+ ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
+ mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
+ dlm->name, namelen, name, assert->node_idx);
}
return ret;
__dlm_print_one_lock_resource(res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
- dlm_lockres_put(res);
+ *ret_data = (void *)res;
dlm_put(dlm);
return -EINVAL;
}
+void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
+{
+ struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
+
+ if (ret_data) {
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+ dlm_lockres_put(res);
+ }
+ return;
+}
+
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int ignore_higher, u8 request_from, u32 flags)
{
struct dlm_work_item *item;
- item = kcalloc(1, sizeof(*item), GFP_NOFS);
+ item = kzalloc(sizeof(*item), GFP_NOFS);
if (!item)
return -ENOMEM;
* even if one or more nodes die */
mlog(0, "worker about to master %.*s here, this=%u\n",
res->lockname.len, res->lockname.name, dlm->node_num);
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len,
- nodemap, flags);
+ ret = dlm_do_assert_master(dlm, res, nodemap, flags);
if (ret < 0) {
/* no need to restart, we are done */
if (!dlm_is_host_down(ret))
return ret;
}
-
/*
- * DLM_MIGRATE_LOCKRES
+ * DLM_DEREF_LOCKRES_MSG
*/
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+ struct dlm_deref_lockres deref;
+ int ret = 0, r;
+ const char *lockname;
+ unsigned int namelen;
+
+ lockname = res->lockname.name;
+ namelen = res->lockname.len;
+ BUG_ON(namelen > O2NM_MAX_NAME_LEN);
-int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
- u8 target)
+ mlog(0, "%s:%.*s: sending deref to %d\n",
+ dlm->name, namelen, lockname, res->owner);
+ memset(&deref, 0, sizeof(deref));
+ deref.node_idx = dlm->node_num;
+ deref.namelen = namelen;
+ memcpy(deref.name, lockname, namelen);
+
+ ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
+ &deref, sizeof(deref), res->owner, &r);
+ if (ret < 0)
+ mlog_errno(ret);
+ else if (r < 0) {
+ /* BAD. other node says I did not have a ref. */
+ mlog(ML_ERROR,"while dropping ref on %s:%.*s "
+ "(master=%u) got %d.\n", dlm->name, namelen,
+ lockname, res->owner, r);
+ dlm_print_one_lock_resource(res);
+ BUG();
+ }
+ return ret;
+}
+
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
- struct dlm_master_list_entry *mle = NULL;
- struct dlm_master_list_entry *oldmle = NULL;
- struct dlm_migratable_lockres *mres = NULL;
- int ret = -EINVAL;
- const char *name;
+ struct dlm_ctxt *dlm = data;
+ struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
+ struct dlm_lock_resource *res = NULL;
+ char *name;
unsigned int namelen;
- int mle_added = 0;
- struct list_head *queue, *iter;
- int i;
- struct dlm_lock *lock;
- int empty = 1;
+ int ret = -EINVAL;
+ u8 node;
+ unsigned int hash;
+ struct dlm_work_item *item;
+ int cleared = 0;
+ int dispatch = 0;
if (!dlm_grab(dlm))
- return -EINVAL;
+ return 0;
- name = res->lockname.name;
- namelen = res->lockname.len;
+ name = deref->name;
+ namelen = deref->namelen;
+ node = deref->node_idx;
- mlog(0, "migrating %.*s to %u\n", namelen, name, target);
+ if (namelen > DLM_LOCKID_NAME_MAX) {
+ mlog(ML_ERROR, "Invalid name length!");
+ goto done;
+ }
+ if (deref->node_idx >= O2NM_MAX_NODES) {
+ mlog(ML_ERROR, "Invalid node number: %u\n", node);
+ goto done;
+ }
+
+ hash = dlm_lockid_hash(name, namelen);
+
+ spin_lock(&dlm->spinlock);
+ res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
+ if (!res) {
+ spin_unlock(&dlm->spinlock);
+ mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
+ dlm->name, namelen, name);
+ goto done;
+ }
+ spin_unlock(&dlm->spinlock);
+
+ spin_lock(&res->spinlock);
+ if (res->state & DLM_LOCK_RES_SETREF_INPROG)
+ dispatch = 1;
+ else {
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
+ }
+ spin_unlock(&res->spinlock);
+
+ if (!dispatch) {
+ if (cleared)
+ dlm_lockres_calc_usage(dlm, res);
+ else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+ ret = 0;
+ goto done;
+ }
+
+ item = kzalloc(sizeof(*item), GFP_NOFS);
+ if (!item) {
+ ret = -ENOMEM;
+ mlog_errno(ret);
+ goto done;
+ }
+
+ dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
+ item->u.dl.deref_res = res;
+ item->u.dl.deref_node = node;
+
+ spin_lock(&dlm->work_lock);
+ list_add_tail(&item->list, &dlm->work_list);
+ spin_unlock(&dlm->work_lock);
+
+ queue_work(dlm->dlm_worker, &dlm->dispatched_work);
+ return 0;
+
+done:
+ if (res)
+ dlm_lockres_put(res);
+ dlm_put(dlm);
+
+ return ret;
+}
+
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
+{
+ struct dlm_ctxt *dlm;
+ struct dlm_lock_resource *res;
+ u8 node;
+ u8 cleared = 0;
+
+ dlm = item->dlm;
+ res = item->u.dl.deref_res;
+ node = item->u.dl.deref_node;
- /*
- * ensure this lockres is a proper candidate for migration
- */
spin_lock(&res->spinlock);
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
+ spin_unlock(&res->spinlock);
+
+ if (cleared) {
+ mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
+ dlm->name, res->lockname.len, res->lockname.name, node);
+ dlm_lockres_calc_usage(dlm, res);
+ } else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+
+ dlm_lockres_put(res);
+}
+
+/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
+ * if not. If 0, numlocks is set to the number of locks in the lockres.
+ */
+static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ int *numlocks)
+{
+ int ret;
+ int i;
+ int count = 0;
+ struct list_head *queue, *iter;
+ struct dlm_lock *lock;
+
+ assert_spin_locked(&res->spinlock);
+
+ ret = -EINVAL;
if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "cannot migrate lockres with unknown owner!\n");
- spin_unlock(&res->spinlock);
goto leave;
}
+
if (res->owner != dlm->node_num) {
mlog(0, "cannot migrate lockres this node doesn't own!\n");
- spin_unlock(&res->spinlock);
goto leave;
}
- mlog(0, "checking queues...\n");
+
+ ret = 0;
queue = &res->granted;
- for (i=0; i<3; i++) {
+ for (i = 0; i < 3; i++) {
list_for_each(iter, queue) {
- lock = list_entry (iter, struct dlm_lock, list);
- empty = 0;
+ lock = list_entry(iter, struct dlm_lock, list);
+ ++count;
if (lock->ml.node == dlm->node_num) {
- mlog(0, "found a lock owned by this node "
- "still on the %s queue! will not "
- "migrate this lockres\n",
- i==0 ? "granted" :
- (i==1 ? "converting" : "blocked"));
- spin_unlock(&res->spinlock);
+ mlog(0, "found a lock owned by this node still "
+ "on the %s queue! will not migrate this "
+ "lockres\n", (i == 0 ? "granted" :
+ (i == 1 ? "converting" :
+ "blocked")));
ret = -ENOTEMPTY;
goto leave;
}
}
queue++;
}
- mlog(0, "all locks on this lockres are nonlocal. continuing\n");
+
+ *numlocks = count;
+ mlog(0, "migrateable lockres having %d locks\n", *numlocks);
+
+leave:
+ return ret;
+}
+
+/*
+ * DLM_MIGRATE_LOCKRES
+ */
+
+
+static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ u8 target)
+{
+ struct dlm_master_list_entry *mle = NULL;
+ struct dlm_master_list_entry *oldmle = NULL;
+ struct dlm_migratable_lockres *mres = NULL;
+ int ret = 0;
+ const char *name;
+ unsigned int namelen;
+ int mle_added = 0;
+ int numlocks;
+ int wake = 0;
+
+ if (!dlm_grab(dlm))
+ return -EINVAL;
+
+ name = res->lockname.name;
+ namelen = res->lockname.len;
+
+ mlog(0, "migrating %.*s to %u\n", namelen, name, target);
+
+ /*
+ * ensure this lockres is a proper candidate for migration
+ */
+ spin_lock(&res->spinlock);
+ ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
+ if (ret < 0) {
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
spin_unlock(&res->spinlock);
/* no work to do */
- if (empty) {
+ if (numlocks == 0) {
mlog(0, "no locks were found on this lockres! done!\n");
- ret = 0;
goto leave;
}
res->lockname.name, target);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
spin_unlock(&res->spinlock);
ret = -EINVAL;
}
* the lockres
*/
+ /* now that remote nodes are spinning on the MIGRATING flag,
+ * ensure that all assert_master work is flushed. */
+ flush_workqueue(dlm->dlm_worker);
/* get an extra reference on the mle.
* otherwise the assert_master from the new
dlm_put_mle_inuse(mle);
spin_lock(&res->spinlock);
res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
spin_unlock(&res->spinlock);
goto leave;
}
res->owner == target)
break;
- mlog(0, "timed out during migration\n");
+ mlog(0, "%s:%.*s: timed out during migration\n",
+ dlm->name, res->lockname.len, res->lockname.name);
/* avoid hang during shutdown when migrating lockres
* to a node which also goes down */
if (dlm_is_node_dead(dlm, target)) {
"target %u is no longer up, restarting\n",
dlm->name, res->lockname.len,
res->lockname.name, target);
- ret = -ERESTARTSYS;
+ ret = -EINVAL;
+ /* migration failed, detach and clean up mle */
+ dlm_mle_detach_hb_events(dlm, mle);
+ dlm_put_mle(mle);
+ dlm_put_mle_inuse(mle);
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_MIGRATING;
+ wake = 1;
+ spin_unlock(&res->spinlock);
+ goto leave;
}
- }
- if (ret == -ERESTARTSYS) {
- /* migration failed, detach and clean up mle */
- dlm_mle_detach_hb_events(dlm, mle);
- dlm_put_mle(mle);
- dlm_put_mle_inuse(mle);
- spin_lock(&res->spinlock);
- res->state &= ~DLM_LOCK_RES_MIGRATING;
- spin_unlock(&res->spinlock);
- goto leave;
- }
- /* TODO: if node died: stop, clean up, return error */
+ } else
+ mlog(0, "%s:%.*s: caught signal during migration\n",
+ dlm->name, res->lockname.len, res->lockname.name);
}
/* all done, set the owner, clear the flag */
if (ret < 0)
dlm_kick_thread(dlm, res);
+ /* wake up waiters if the MIGRATING flag got set
+ * but migration failed */
+ if (wake)
+ wake_up(&res->wq);
+
/* TODO: cleanup */
if (mres)
free_page((unsigned long)mres);
return ret;
}
+#define DLM_MIGRATION_RETRY_MS 100
+
+/* Should be called only after beginning the domain leave process.
+ * There should not be any remaining locks on nonlocal lock resources,
+ * and there should be no local locks left on locally mastered resources.
+ *
+ * Called with the dlm spinlock held, may drop it to do migration, but
+ * will re-acquire before exit.
+ *
+ * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+ int ret;
+ int lock_dropped = 0;
+ int numlocks;
+
+ spin_lock(&res->spinlock);
+ if (res->owner != dlm->node_num) {
+ if (!__dlm_lockres_unused(res)) {
+ mlog(ML_ERROR, "%s:%.*s: this node is not master, "
+ "trying to free this but locks remain\n",
+ dlm->name, res->lockname.len, res->lockname.name);
+ }
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+
+ /* No need to migrate a lockres having no locks */
+ ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
+ if (ret >= 0 && numlocks == 0) {
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ spin_unlock(&res->spinlock);
+
+ /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
+ spin_unlock(&dlm->spinlock);
+ lock_dropped = 1;
+ while (1) {
+ ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
+ if (ret >= 0)
+ break;
+ if (ret == -ENOTEMPTY) {
+ mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
+ res->lockname.len, res->lockname.name);
+ BUG();
+ }
+
+ mlog(0, "lockres %.*s: migrate failed, "
+ "retrying\n", res->lockname.len,
+ res->lockname.name);
+ msleep(DLM_MIGRATION_RETRY_MS);
+ }
+ spin_lock(&dlm->spinlock);
+leave:
+ return lock_dropped;
+}
+
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
int ret;
return can_proceed;
}
-int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res)
{
int ret;
spin_lock(&res->spinlock);
__dlm_lockres_reserve_ast(res);
spin_unlock(&res->spinlock);
- /* now flush all the pending asts.. hang out for a bit */
+ /* now flush all the pending asts */
dlm_kick_thread(dlm, res);
+ /* before waiting on DIRTY, block processes which may
+ * try to dirty the lockres before MIGRATING is set */
+ spin_lock(&res->spinlock);
+ BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
+ res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
+ spin_unlock(&res->spinlock);
+ /* now wait on any pending asts and the DIRTY state */
wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
dlm_lockres_release_ast(dlm, res);
mlog(0, "trying again...\n");
goto again;
}
+ /* now that we are sure the MIGRATING state is there, drop
+ * the unneded state which blocked threads trying to DIRTY */
+ spin_lock(&res->spinlock);
+ BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
+ BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
+ res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
+ spin_unlock(&res->spinlock);
/* did the target go down or die? */
spin_lock(&dlm->spinlock);
{
struct list_head *iter, *iter2;
struct list_head *queue = &res->granted;
- int i;
+ int i, bit;
struct dlm_lock *lock;
assert_spin_locked(&res->spinlock);
BUG_ON(!list_empty(&lock->bast_list));
BUG_ON(lock->ast_pending);
BUG_ON(lock->bast_pending);
+ dlm_lockres_clear_refmap_bit(lock->ml.node, res);
list_del_init(&lock->list);
dlm_lock_put(lock);
}
}
queue++;
}
+ bit = 0;
+ while (1) {
+ bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
+ if (bit >= O2NM_MAX_NODES)
+ break;
+ /* do not clear the local node reference, if there is a
+ * process holding this, let it drop the ref itself */
+ if (bit != dlm->node_num) {
+ mlog(0, "%s:%.*s: node %u had a ref to this "
+ "migrating lockres, clearing\n", dlm->name,
+ res->lockname.len, res->lockname.name, bit);
+ dlm_lockres_clear_refmap_bit(bit, res);
+ }
+ bit++;
+ }
}
/* for now this is not too intelligent. we will
mlog(0, "migrate request (node %u) returned %d!\n",
nodenum, status);
ret = status;
+ } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
+ /* during the migration request we short-circuited
+ * the mastery of the lockres. make sure we have
+ * a mastery ref for nodenum */
+ mlog(0, "%s:%.*s: need ref for node %u\n",
+ dlm->name, res->lockname.len, res->lockname.name,
+ nodenum);
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(nodenum, res);
+ spin_unlock(&res->spinlock);
}
}
* we will have no mle in the list to start with. now we can add an mle for
* the migration and this should be the only one found for those scanning the
* list. */
-int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
+int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
+ void **ret_data)
{
struct dlm_ctxt *dlm = data;
struct dlm_lock_resource *res = NULL;
/* remove it from the list so that only one
* mle will be found */
list_del_init(&tmp->list);
- __dlm_mle_detach_hb_events(dlm, mle);
+ /* this was obviously WRONG. mle is uninited here. should be tmp. */
+ __dlm_mle_detach_hb_events(dlm, tmp);
+ ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
+ mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
+ "telling master to get ref for cleared out mle "
+ "during migration\n", dlm->name, namelen, name,
+ master, new_master);
}
spin_unlock(&tmp->spinlock);
}
/* now add a migration mle to the tail of the list */
dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
mle->new_master = new_master;
+ /* the new master will be sending an assert master for this.
+ * at that point we will get the refmap reference */
mle->master = master;
/* do this for consistency with other mle types */
set_bit(new_master, mle->maybe_map);
clear_bit(dlm->node_num, iter.node_map);
spin_unlock(&dlm->spinlock);
+ /* ownership of the lockres is changing. account for the
+ * mastery reference here since old_master will briefly have
+ * a reference after the migration completes */
+ spin_lock(&res->spinlock);
+ dlm_lockres_set_refmap_bit(old_master, res);
+ spin_unlock(&res->spinlock);
+
mlog(0, "now time to do a migrate request to other nodes\n");
ret = dlm_do_migrate_request(dlm, res, old_master,
dlm->node_num, &iter);
res->lockname.len, res->lockname.name);
/* this call now finishes out the nodemap
* even if one or more nodes die */
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len, iter.node_map,
+ ret = dlm_do_assert_master(dlm, res, iter.node_map,
DLM_ASSERT_MASTER_FINISH_MIGRATION);
if (ret < 0) {
/* no longer need to retry. all living nodes contacted. */
set_bit(old_master, iter.node_map);
mlog(0, "doing assert master of %.*s back to %u\n",
res->lockname.len, res->lockname.name, old_master);
- ret = dlm_do_assert_master(dlm, res->lockname.name,
- res->lockname.len, iter.node_map,
+ ret = dlm_do_assert_master(dlm, res, iter.node_map,
DLM_ASSERT_MASTER_FINISH_MIGRATION);
if (ret < 0) {
mlog(0, "assert master to original master failed "