fs/dlm/lockspace.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"
#include "requestqueue.h"

#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

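/*
 * Sysfs attribute handlers.  dlm_controld drives lockspace state from
 * userspace through these files: writing 0 or 1 to "control" stops or
 * starts the lockspace, and a write to "event_done" reports the result
 * of a join/leave uevent back to do_uevent() below.
 */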
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

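/*
 * Generic show/store dispatchers: recover the dlm_ls and dlm_attr from
 * the embedded kobject/attribute with container_of() and call the
 * attribute's own handler, if it has one.
 */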
static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset dlm_kset = {
        .kobj   = {.name = "dlm",},
        .ktype  = &dlm_ktype,
};

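/* Name the lockspace kobject after the lockspace and point it at the
   "dlm" kset and ktype; the kobject itself is registered later, in
   new_lockspace(). */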
static int kobject_setup(struct dlm_ls *ls)
{
        char lsname[DLM_LOCKSPACE_LEN];
        int error;

        memset(lsname, 0, DLM_LOCKSPACE_LEN);
        snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);

        error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
        if (error)
                return error;

        ls->ls_kobj.kset = &dlm_kset;
        ls->ls_kobj.ktype = &dlm_ktype;
        return 0;
}

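/*
 * Send an online (join) or offline (leave) uevent and wait until
 * dlm_event_store() sets LSFL_UEVENT_WAIT and wakes us, i.e. until
 * dlm_controld has finished the corresponding group operation and
 * reported its result in ls_uevent_result.
 */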
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");

        /* dlm_controld will see the uevent, do the necessary group management
           and then write to sysfs to wake us */

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));

        log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);

        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        if (error)
                log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
                          error, ls->ls_uevent_result);
        return error;
}


int dlm_lockspace_init(void)
{
        int error;

        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        kobj_set_kset_s(&dlm_kset, kernel_subsys);
        error = kset_register(&dlm_kset);
        if (error)
                printk("dlm_lockspace_init: cannot register kset %d\n", error);
        return error;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(&dlm_kset);
}

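/*
 * Scanner thread: every ci_scan_secs seconds, walk the lockspaces and,
 * where the recovery lock can be taken without blocking, run
 * dlm_scan_rsbs() and dlm_scan_timeout() to reclaim unused rsbs and
 * catch timed-out locks.
 */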
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list) {
                        if (dlm_lock_recovery_try(ls)) {
                                dlm_scan_rsbs(ls);
                                dlm_scan_timeout(ls);
                                dlm_unlock_recovery(ls);
                        }
                }
                schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

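/*
 * Lockspace lookup by name, global id, local handle or device minor.
 * All but the name lookup take a reference by bumping ls_count under
 * lslist_lock; callers must drop it again with dlm_put_lockspace().
 */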
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_namelen == namelen &&
                    memcmp(ls->ls_name, name, namelen) == 0)
                        goto out;
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}

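/* Wait for the last reference to be dropped, then unlink the lockspace
   from lslist so no new references can be taken. */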
static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

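/* Start the daemon threads shared by all lockspaces; on any failure,
   stop whatever was already started. */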
static int threads_start(void)
{
        int error;

        /* Thread which processes lock requests for all lockspaces */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}

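/*
 * Create and initialize a lockspace: allocate the ls and its rsb, lkb
 * and dir hash tables, start dlm_recoverd, register the sysfs kobject,
 * then ask dlm_controld (via uevent) to join the cluster group and
 * wait for the initial membership to be reported.
 */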
static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;
        int do_unreg = 0;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have TIMEWARN active */
        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);
        ls->ls_exflags = (flags & ~DLM_LSFL_TIMEWARN);

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.ci_lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* dlm_recoverd needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_delist;
        }

        error = kobject_setup(ls);
        if (error)
                goto out_stop;

        error = kobject_register(&ls->ls_kobj);
        if (error)
                goto out_stop;

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_stop;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");

        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_stop:
        dlm_recoverd_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_unregister(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

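/*
 * Public entry point; ls_count tracks how many lockspaces exist so the
 * shared daemon threads are started with the first lockspace and
 * stopped with the last.
 *
 * A minimal usage sketch from another kernel module (the name and the
 * 32-byte LVB length are illustrative, not taken from this file;
 * lvblen only has to be a nonzero multiple of 8):
 *
 *      dlm_lockspace_t *ls;
 *      int error;
 *
 *      error = dlm_new_lockspace("example", strlen("example"), &ls, 0, 32);
 *      if (error)
 *              return error;
 *      ...
 *      dlm_release_lockspace(ls, 0);
 */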
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
        else if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

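/*
 * Tear down a lockspace.  lockspace_busy() must not report more than
 * "force" allows (see the force levels documented above
 * dlm_release_lockspace below); structures are freed roughly in the
 * reverse order of their creation in new_lockspace().
 */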
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_unregister(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);
        return release_lockspace(ls, force);
}