static void dlm_wait_timer_fn(unsigned long data)
{
struct dlm_ls *ls = (struct dlm_ls *) data;
- mod_timer(&ls->ls_timer, jiffies + (dlm_config.recover_timer * HZ));
+ mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
wake_up(&ls->ls_wait_general);
}
init_timer(&ls->ls_timer);
ls->ls_timer.function = dlm_wait_timer_fn;
ls->ls_timer.data = (long) ls;
- ls->ls_timer.expires = jiffies + (dlm_config.recover_timer * HZ);
+ ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ);
add_timer(&ls->ls_timer);
wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
spin_lock(&ls->ls_recover_list_lock);
list_for_each_entry_safe(r, s, &ls->ls_recover_list, res_recover_list) {
list_del_init(&r->res_recover_list);
+ r->res_recover_locks_count = 0;
dlm_put_rsb(r);
ls->ls_recover_list_count--;
}
/*
* Propogate the new master nodeid to locks
* The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
- * The NEW_MASTER2 flag tells recover_lvb() which rsb's to consider.
+ * The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which
+ * rsb's to consider.
*/
static void set_new_master(struct dlm_rsb *r, int nodeid)
if (dlm_no_directory(ls))
count += recover_master_static(r);
- else if (!is_master(r) && dlm_is_removed(ls, r->res_nodeid)) {
+ else if (!is_master(r) &&
+ (dlm_is_removed(ls, r->res_nodeid) ||
+ rsb_flag(r, RSB_NEW_MASTER))) {
recover_master(r);
count++;
}
return error;
}
-static int all_queues_empty(struct dlm_rsb *r)
-{
- if (!list_empty(&r->res_grantqueue) ||
- !list_empty(&r->res_convertqueue) ||
- !list_empty(&r->res_waitqueue))
- return 0;
- return 1;
-}
-
static int recover_locks(struct dlm_rsb *r)
{
int error = 0;
lock_rsb(r);
- if (all_queues_empty(r))
- goto out;
- DLM_ASSERT(!r->res_recover_locks_count, dlm_print_rsb(r););
+ DLM_ASSERT(!r->res_recover_locks_count, dlm_dump_rsb(r););
error = recover_locks_queue(r, &r->res_grantqueue);
if (error)
void dlm_recovered_lock(struct dlm_rsb *r)
{
- DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_print_rsb(r););
+ DLM_ASSERT(rsb_flag(r, RSB_NEW_MASTER), dlm_dump_rsb(r););
r->res_recover_locks_count--;
if (!r->res_recover_locks_count) {
}
}
+/* We've become the new master for this rsb and waiting/converting locks may
+ need to be granted in dlm_grant_after_purge() due to locks that may have
+ existed from a removed node. */
+
+static void set_locks_purged(struct dlm_rsb *r)
+{
+ if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
+ rsb_set_flag(r, RSB_LOCKS_PURGED);
+}
+
void dlm_recover_rsbs(struct dlm_ls *ls)
{
struct dlm_rsb *r;
if (is_master(r)) {
if (rsb_flag(r, RSB_RECOVER_CONVERT))
recover_conversion(r);
+ if (rsb_flag(r, RSB_NEW_MASTER2))
+ set_locks_purged(r);
recover_lvb(r);
count++;
}
rsb_clear_flag(r, RSB_RECOVER_CONVERT);
+ rsb_clear_flag(r, RSB_NEW_MASTER2);
unlock_rsb(r);
}
up_read(&ls->ls_root_sem);