static int dlm_do_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
void *nodemap, u32 flags);
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
struct dlm_master_list_entry *mle,
unsigned int namelen = res->lockname.len;
BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+
again:
reassert = 0;
if (reassert)
goto again;
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+
return ret;
}
done:
ret = 0;
- if (res)
- dlm_lockres_put(res);
+ if (res) {
+ spin_lock(&res->spinlock);
+ res->state |= DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ *ret_data = (void *)res;
+ }
dlm_put(dlm);
if (master_request) {
mlog(0, "need to tell master to reassert\n");
__dlm_print_one_lock_resource(res);
spin_unlock(&res->spinlock);
spin_unlock(&dlm->spinlock);
- dlm_lockres_put(res);
+ *ret_data = (void *)res;
dlm_put(dlm);
return -EINVAL;
}
+void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
+{
+ struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
+
+ if (ret_data) {
+ spin_lock(&res->spinlock);
+ res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
+ spin_unlock(&res->spinlock);
+ wake_up(&res->wq);
+ dlm_lockres_put(res);
+ }
+ return;
+}
+
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
int ignore_higher, u8 request_from, u32 flags)
int ret = -EINVAL;
u8 node;
unsigned int hash;
+ struct dlm_work_item *item;
+ int cleared = 0;
+ int dispatch = 0;
if (!dlm_grab(dlm))
return 0;
spin_unlock(&dlm->spinlock);
spin_lock(&res->spinlock);
- BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
- if (test_bit(node, res->refmap)) {
- ret = 0;
- dlm_lockres_clear_refmap_bit(node, res);
- } else {
- mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
- "but it is already dropped!\n", dlm->name, namelen,
- name, node);
- __dlm_print_one_lock_resource(res);
+ if (res->state & DLM_LOCK_RES_SETREF_INPROG)
+ dispatch = 1;
+ else {
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
}
spin_unlock(&res->spinlock);
- if (!ret)
- dlm_lockres_calc_usage(dlm, res);
+ if (!dispatch) {
+ if (cleared)
+ dlm_lockres_calc_usage(dlm, res);
+ else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+ ret = 0;
+ goto done;
+ }
+
+ item = kzalloc(sizeof(*item), GFP_NOFS);
+ if (!item) {
+ ret = -ENOMEM;
+ mlog_errno(ret);
+ goto done;
+ }
+
+ dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
+ item->u.dl.deref_res = res;
+ item->u.dl.deref_node = node;
+
+ spin_lock(&dlm->work_lock);
+ list_add_tail(&item->list, &dlm->work_list);
+ spin_unlock(&dlm->work_lock);
+
+ queue_work(dlm->dlm_worker, &dlm->dispatched_work);
+ return 0;
+
done:
if (res)
dlm_lockres_put(res);
dlm_put(dlm);
+
return ret;
}
+static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
+{
+ struct dlm_ctxt *dlm;
+ struct dlm_lock_resource *res;
+ u8 node;
+ u8 cleared = 0;
+
+ dlm = item->dlm;
+ res = item->u.dl.deref_res;
+ node = item->u.dl.deref_node;
+
+ spin_lock(&res->spinlock);
+ BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+ if (test_bit(node, res->refmap)) {
+ __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
+ dlm_lockres_clear_refmap_bit(node, res);
+ cleared = 1;
+ }
+ spin_unlock(&res->spinlock);
+
+ if (cleared) {
+ mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
+ dlm->name, res->lockname.len, res->lockname.name, node);
+ dlm_lockres_calc_usage(dlm, res);
+ } else {
+ mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+ "but it is already dropped!\n", dlm->name,
+ res->lockname.len, res->lockname.name, node);
+ __dlm_print_one_lock_resource(res);
+ }
+
+ dlm_lockres_put(res);
+}
+
+/* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
+ * if not. If 0, numlocks is set to the number of locks in the lockres.
+ */
+static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res,
+ int *numlocks)
+{
+ int ret;
+ int i;
+ int count = 0;
+ struct list_head *queue, *iter;
+ struct dlm_lock *lock;
+
+ assert_spin_locked(&res->spinlock);
+
+ ret = -EINVAL;
+ if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
+ mlog(0, "cannot migrate lockres with unknown owner!\n");
+ goto leave;
+ }
+
+ if (res->owner != dlm->node_num) {
+ mlog(0, "cannot migrate lockres this node doesn't own!\n");
+ goto leave;
+ }
+
+ ret = 0;
+ queue = &res->granted;
+ for (i = 0; i < 3; i++) {
+ list_for_each(iter, queue) {
+ lock = list_entry(iter, struct dlm_lock, list);
+ ++count;
+ if (lock->ml.node == dlm->node_num) {
+ mlog(0, "found a lock owned by this node still "
+ "on the %s queue! will not migrate this "
+ "lockres\n", (i == 0 ? "granted" :
+ (i == 1 ? "converting" :
+ "blocked")));
+ ret = -ENOTEMPTY;
+ goto leave;
+ }
+ }
+ queue++;
+ }
+
+ *numlocks = count;
+ mlog(0, "migrateable lockres having %d locks\n", *numlocks);
+
+leave:
+ return ret;
+}
/*
* DLM_MIGRATE_LOCKRES
struct dlm_master_list_entry *mle = NULL;
struct dlm_master_list_entry *oldmle = NULL;
struct dlm_migratable_lockres *mres = NULL;
- int ret = -EINVAL;
+ int ret = 0;
const char *name;
unsigned int namelen;
int mle_added = 0;
- struct list_head *queue, *iter;
- int i;
- struct dlm_lock *lock;
- int empty = 1, wake = 0;
+ int numlocks;
+ int wake = 0;
if (!dlm_grab(dlm))
return -EINVAL;
* ensure this lockres is a proper candidate for migration
*/
spin_lock(&res->spinlock);
- if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
- mlog(0, "cannot migrate lockres with unknown owner!\n");
- spin_unlock(&res->spinlock);
- goto leave;
- }
- if (res->owner != dlm->node_num) {
- mlog(0, "cannot migrate lockres this node doesn't own!\n");
+ ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
+ if (ret < 0) {
spin_unlock(&res->spinlock);
goto leave;
}
- mlog(0, "checking queues...\n");
- queue = &res->granted;
- for (i=0; i<3; i++) {
- list_for_each(iter, queue) {
- lock = list_entry (iter, struct dlm_lock, list);
- empty = 0;
- if (lock->ml.node == dlm->node_num) {
- mlog(0, "found a lock owned by this node "
- "still on the %s queue! will not "
- "migrate this lockres\n",
- i==0 ? "granted" :
- (i==1 ? "converting" : "blocked"));
- spin_unlock(&res->spinlock);
- ret = -ENOTEMPTY;
- goto leave;
- }
- }
- queue++;
- }
- mlog(0, "all locks on this lockres are nonlocal. continuing\n");
spin_unlock(&res->spinlock);
/* no work to do */
- if (empty) {
+ if (numlocks == 0) {
mlog(0, "no locks were found on this lockres! done!\n");
- ret = 0;
goto leave;
}
{
int ret;
int lock_dropped = 0;
+ int numlocks;
+ spin_lock(&res->spinlock);
if (res->owner != dlm->node_num) {
if (!__dlm_lockres_unused(res)) {
mlog(ML_ERROR, "%s:%.*s: this node is not master, "
"trying to free this but locks remain\n",
dlm->name, res->lockname.len, res->lockname.name);
}
+ spin_unlock(&res->spinlock);
goto leave;
}
+ /* No need to migrate a lockres having no locks */
+ ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
+ if (ret >= 0 && numlocks == 0) {
+ spin_unlock(&res->spinlock);
+ goto leave;
+ }
+ spin_unlock(&res->spinlock);
+
/* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
spin_unlock(&dlm->spinlock);
lock_dropped = 1;