Merge branch 'for-linus/2640/i2c' of git://git.fluff.org/bjdooks/linux
[pandora-kernel.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25 #include "requestqueue.h"
26 #include "user.h"
27
/* Count of lockspaces, adjusted by dlm_new_lockspace()/dlm_release_lockspace() */
static int                      ls_count;
/* Held by dlm_new_lockspace()/dlm_release_lockspace() around ls_count updates */
static struct mutex             ls_lock;
/* List of lockspaces, linked through dlm_ls.ls_list */
static struct list_head         lslist;
/* Taken around every walk or change of lslist in this file */
static spinlock_t               lslist_lock;
/* Task returned by kthread_run() for the dlm_scand thread */
static struct task_struct *     scand_task;
33
34
35 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
36 {
37         ssize_t ret = len;
38         int n = simple_strtol(buf, NULL, 0);
39
40         ls = dlm_find_lockspace_local(ls->ls_local_handle);
41         if (!ls)
42                 return -EINVAL;
43
44         switch (n) {
45         case 0:
46                 dlm_ls_stop(ls);
47                 break;
48         case 1:
49                 dlm_ls_start(ls);
50                 break;
51         default:
52                 ret = -EINVAL;
53         }
54         dlm_put_lockspace(ls);
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
77 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
78 {
79         uint32_t status = dlm_recover_status(ls);
80         return snprintf(buf, PAGE_SIZE, "%x\n", status);
81 }
82
83 static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
84 {
85         return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
86 }
87
/*
 * A lockspace sysfs attribute plus the per-attribute handlers that
 * dlm_attr_show()/dlm_attr_store() dispatch to.  Either handler may be
 * left NULL; the dispatchers check for that before calling.
 */
struct dlm_attr {
        /* embedded attribute; container_of() maps it back to the dlm_attr */
        struct attribute attr;
        /* formats the value into the supplied buffer */
        ssize_t (*show)(struct dlm_ls *, char *);
        /* consumes the written buffer of the given length */
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
93
/* "control": owner-writable only; writes go to dlm_control_store() */
static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

/* "event_done": owner-writable only; writes go to dlm_event_store() */
static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

/* "id": world-readable, owner-writable; exposes ls_global_id */
static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

/* "recover_status": read-only; reads go to dlm_recover_status_show() */
static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

/* "recover_nodeid": read-only; reads go to dlm_recover_nodeid_show() */
static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};
119
/* NULL-terminated attribute list installed as dlm_ktype.default_attrs */
static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};
128
129 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
130                              char *buf)
131 {
132         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
133         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
134         return a->show ? a->show(ls, buf) : 0;
135 }
136
137 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
138                               const char *buf, size_t len)
139 {
140         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
141         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
142         return a->store ? a->store(ls, buf, len) : len;
143 }
144
145 static void lockspace_kobj_release(struct kobject *k)
146 {
147         struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
148         kfree(ls);
149 }
150
/* Generic show/store dispatchers shared by every lockspace attribute */
static const struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

/*
 * kobject type of a lockspace: its default attributes, the dispatchers
 * above, and the release callback that frees the dlm_ls.
 */
static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

/* The "dlm" kset created in dlm_lockspace_init(); lockspace kobjects join it */
static struct kset *dlm_kset;
163
164 static int do_uevent(struct dlm_ls *ls, int in)
165 {
166         int error;
167
168         if (in)
169                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
170         else
171                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
172
173         log_debug(ls, "%s the lockspace group...", in ? "joining" : "leaving");
174
175         /* dlm_controld will see the uevent, do the necessary group management
176            and then write to sysfs to wake us */
177
178         error = wait_event_interruptible(ls->ls_uevent_wait,
179                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
180
181         log_debug(ls, "group event done %d %d", error, ls->ls_uevent_result);
182
183         if (error)
184                 goto out;
185
186         error = ls->ls_uevent_result;
187  out:
188         if (error)
189                 log_error(ls, "group %s failed %d %d", in ? "join" : "leave",
190                           error, ls->ls_uevent_result);
191         return error;
192 }
193
194 static int dlm_uevent(struct kset *kset, struct kobject *kobj,
195                       struct kobj_uevent_env *env)
196 {
197         struct dlm_ls *ls = container_of(kobj, struct dlm_ls, ls_kobj);
198
199         add_uevent_var(env, "LOCKSPACE=%s", ls->ls_name);
200         return 0;
201 }
202
/* uevent hooks handed to kset_create_and_add() in dlm_lockspace_init() */
static struct kset_uevent_ops dlm_uevent_ops = {
        .uevent = dlm_uevent,
};
206
207 int __init dlm_lockspace_init(void)
208 {
209         ls_count = 0;
210         mutex_init(&ls_lock);
211         INIT_LIST_HEAD(&lslist);
212         spin_lock_init(&lslist_lock);
213
214         dlm_kset = kset_create_and_add("dlm", &dlm_uevent_ops, kernel_kobj);
215         if (!dlm_kset) {
216                 printk(KERN_WARNING "%s: can not create kset\n", __func__);
217                 return -ENOMEM;
218         }
219         return 0;
220 }
221
/* Undo dlm_lockspace_init(): unregister the "dlm" kset. */
void dlm_lockspace_exit(void)
{
        kset_unregister(dlm_kset);
}
226
227 static struct dlm_ls *find_ls_to_scan(void)
228 {
229         struct dlm_ls *ls;
230
231         spin_lock(&lslist_lock);
232         list_for_each_entry(ls, &lslist, ls_list) {
233                 if (time_after_eq(jiffies, ls->ls_scan_time +
234                                             dlm_config.ci_scan_secs * HZ)) {
235                         spin_unlock(&lslist_lock);
236                         return ls;
237                 }
238         }
239         spin_unlock(&lslist_lock);
240         return NULL;
241 }
242
243 static int dlm_scand(void *data)
244 {
245         struct dlm_ls *ls;
246
247         while (!kthread_should_stop()) {
248                 ls = find_ls_to_scan();
249                 if (ls) {
250                         if (dlm_lock_recovery_try(ls)) {
251                                 ls->ls_scan_time = jiffies;
252                                 dlm_scan_rsbs(ls);
253                                 dlm_scan_timeout(ls);
254                                 dlm_scan_waiters(ls);
255                                 dlm_unlock_recovery(ls);
256                         } else {
257                                 ls->ls_scan_time += HZ;
258                         }
259                         continue;
260                 }
261                 schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
262         }
263         return 0;
264 }
265
266 static int dlm_scand_start(void)
267 {
268         struct task_struct *p;
269         int error = 0;
270
271         p = kthread_run(dlm_scand, NULL, "dlm_scand");
272         if (IS_ERR(p))
273                 error = PTR_ERR(p);
274         else
275                 scand_task = p;
276         return error;
277 }
278
/* Stop the thread recorded in scand_task by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}
283
284 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
285 {
286         struct dlm_ls *ls;
287
288         spin_lock(&lslist_lock);
289
290         list_for_each_entry(ls, &lslist, ls_list) {
291                 if (ls->ls_global_id == id) {
292                         ls->ls_count++;
293                         goto out;
294                 }
295         }
296         ls = NULL;
297  out:
298         spin_unlock(&lslist_lock);
299         return ls;
300 }
301
302 struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
303 {
304         struct dlm_ls *ls;
305
306         spin_lock(&lslist_lock);
307         list_for_each_entry(ls, &lslist, ls_list) {
308                 if (ls->ls_local_handle == lockspace) {
309                         ls->ls_count++;
310                         goto out;
311                 }
312         }
313         ls = NULL;
314  out:
315         spin_unlock(&lslist_lock);
316         return ls;
317 }
318
319 struct dlm_ls *dlm_find_lockspace_device(int minor)
320 {
321         struct dlm_ls *ls;
322
323         spin_lock(&lslist_lock);
324         list_for_each_entry(ls, &lslist, ls_list) {
325                 if (ls->ls_device.minor == minor) {
326                         ls->ls_count++;
327                         goto out;
328                 }
329         }
330         ls = NULL;
331  out:
332         spin_unlock(&lslist_lock);
333         return ls;
334 }
335
336 void dlm_put_lockspace(struct dlm_ls *ls)
337 {
338         spin_lock(&lslist_lock);
339         ls->ls_count--;
340         spin_unlock(&lslist_lock);
341 }
342
343 static void remove_lockspace(struct dlm_ls *ls)
344 {
345         for (;;) {
346                 spin_lock(&lslist_lock);
347                 if (ls->ls_count == 0) {
348                         WARN_ON(ls->ls_create_count != 0);
349                         list_del(&ls->ls_list);
350                         spin_unlock(&lslist_lock);
351                         return;
352                 }
353                 spin_unlock(&lslist_lock);
354                 ssleep(1);
355         }
356 }
357
/*
 * Start the services shared by all lockspaces: dlm_astd, dlm_scand and
 * lowcomms, in that order.  If a later step fails, the steps that already
 * succeeded are stopped again in reverse order.  Returns 0 or the error
 * of the step that failed.
 */
static int threads_start(void)
{
        int error;

        /* Thread which process lock requests for all lockspace's */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                return error;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                dlm_astd_stop();
                return error;
        }

        /* Thread for sending/receiving messages for all lockspace's */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                dlm_scand_stop();
                dlm_astd_stop();
                return error;
        }

        return 0;
}
391
/*
 * Stop everything threads_start() started: dlm_scand first, then
 * lowcomms, then dlm_astd.
 */
static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}
398
399 static int new_lockspace(const char *name, int namelen, void **lockspace,
400                          uint32_t flags, int lvblen)
401 {
402         struct dlm_ls *ls;
403         int i, size, error;
404         int do_unreg = 0;
405
406         if (namelen > DLM_LOCKSPACE_LEN)
407                 return -EINVAL;
408
409         if (!lvblen || (lvblen % 8))
410                 return -EINVAL;
411
412         if (!try_module_get(THIS_MODULE))
413                 return -EINVAL;
414
415         if (!dlm_user_daemon_available()) {
416                 module_put(THIS_MODULE);
417                 return -EUNATCH;
418         }
419
420         error = 0;
421
422         spin_lock(&lslist_lock);
423         list_for_each_entry(ls, &lslist, ls_list) {
424                 WARN_ON(ls->ls_create_count <= 0);
425                 if (ls->ls_namelen != namelen)
426                         continue;
427                 if (memcmp(ls->ls_name, name, namelen))
428                         continue;
429                 if (flags & DLM_LSFL_NEWEXCL) {
430                         error = -EEXIST;
431                         break;
432                 }
433                 ls->ls_create_count++;
434                 *lockspace = ls;
435                 error = 1;
436                 break;
437         }
438         spin_unlock(&lslist_lock);
439
440         if (error)
441                 goto out;
442
443         error = -ENOMEM;
444
445         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_NOFS);
446         if (!ls)
447                 goto out;
448         memcpy(ls->ls_name, name, namelen);
449         ls->ls_namelen = namelen;
450         ls->ls_lvblen = lvblen;
451         ls->ls_count = 0;
452         ls->ls_flags = 0;
453         ls->ls_scan_time = jiffies;
454
455         if (flags & DLM_LSFL_TIMEWARN)
456                 set_bit(LSFL_TIMEWARN, &ls->ls_flags);
457
458         /* ls_exflags are forced to match among nodes, and we don't
459            need to require all nodes to have some flags set */
460         ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS |
461                                     DLM_LSFL_NEWEXCL));
462
463         size = dlm_config.ci_rsbtbl_size;
464         ls->ls_rsbtbl_size = size;
465
466         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_NOFS);
467         if (!ls->ls_rsbtbl)
468                 goto out_lsfree;
469         for (i = 0; i < size; i++) {
470                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
471                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
472                 spin_lock_init(&ls->ls_rsbtbl[i].lock);
473         }
474
475         size = dlm_config.ci_lkbtbl_size;
476         ls->ls_lkbtbl_size = size;
477
478         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_NOFS);
479         if (!ls->ls_lkbtbl)
480                 goto out_rsbfree;
481         for (i = 0; i < size; i++) {
482                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
483                 rwlock_init(&ls->ls_lkbtbl[i].lock);
484                 ls->ls_lkbtbl[i].counter = 1;
485         }
486
487         size = dlm_config.ci_dirtbl_size;
488         ls->ls_dirtbl_size = size;
489
490         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_NOFS);
491         if (!ls->ls_dirtbl)
492                 goto out_lkbfree;
493         for (i = 0; i < size; i++) {
494                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
495                 spin_lock_init(&ls->ls_dirtbl[i].lock);
496         }
497
498         INIT_LIST_HEAD(&ls->ls_waiters);
499         mutex_init(&ls->ls_waiters_mutex);
500         INIT_LIST_HEAD(&ls->ls_orphans);
501         mutex_init(&ls->ls_orphans_mutex);
502         INIT_LIST_HEAD(&ls->ls_timeout);
503         mutex_init(&ls->ls_timeout_mutex);
504
505         INIT_LIST_HEAD(&ls->ls_nodes);
506         INIT_LIST_HEAD(&ls->ls_nodes_gone);
507         ls->ls_num_nodes = 0;
508         ls->ls_low_nodeid = 0;
509         ls->ls_total_weight = 0;
510         ls->ls_node_array = NULL;
511
512         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
513         ls->ls_stub_rsb.res_ls = ls;
514
515         ls->ls_debug_rsb_dentry = NULL;
516         ls->ls_debug_waiters_dentry = NULL;
517
518         init_waitqueue_head(&ls->ls_uevent_wait);
519         ls->ls_uevent_result = 0;
520         init_completion(&ls->ls_members_done);
521         ls->ls_members_result = -1;
522
523         ls->ls_recoverd_task = NULL;
524         mutex_init(&ls->ls_recoverd_active);
525         spin_lock_init(&ls->ls_recover_lock);
526         spin_lock_init(&ls->ls_rcom_spin);
527         get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
528         ls->ls_recover_status = 0;
529         ls->ls_recover_seq = 0;
530         ls->ls_recover_args = NULL;
531         init_rwsem(&ls->ls_in_recovery);
532         init_rwsem(&ls->ls_recv_active);
533         INIT_LIST_HEAD(&ls->ls_requestqueue);
534         mutex_init(&ls->ls_requestqueue_mutex);
535         mutex_init(&ls->ls_clear_proc_locks);
536
537         ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
538         if (!ls->ls_recover_buf)
539                 goto out_dirfree;
540
541         INIT_LIST_HEAD(&ls->ls_recover_list);
542         spin_lock_init(&ls->ls_recover_list_lock);
543         ls->ls_recover_list_count = 0;
544         ls->ls_local_handle = ls;
545         init_waitqueue_head(&ls->ls_wait_general);
546         INIT_LIST_HEAD(&ls->ls_root_list);
547         init_rwsem(&ls->ls_root_sem);
548
549         down_write(&ls->ls_in_recovery);
550
551         spin_lock(&lslist_lock);
552         ls->ls_create_count = 1;
553         list_add(&ls->ls_list, &lslist);
554         spin_unlock(&lslist_lock);
555
556         /* needs to find ls in lslist */
557         error = dlm_recoverd_start(ls);
558         if (error) {
559                 log_error(ls, "can't start dlm_recoverd %d", error);
560                 goto out_delist;
561         }
562
563         ls->ls_kobj.kset = dlm_kset;
564         error = kobject_init_and_add(&ls->ls_kobj, &dlm_ktype, NULL,
565                                      "%s", ls->ls_name);
566         if (error)
567                 goto out_stop;
568         kobject_uevent(&ls->ls_kobj, KOBJ_ADD);
569
570         /* let kobject handle freeing of ls if there's an error */
571         do_unreg = 1;
572
573         /* This uevent triggers dlm_controld in userspace to add us to the
574            group of nodes that are members of this lockspace (managed by the
575            cluster infrastructure.)  Once it's done that, it tells us who the
576            current lockspace members are (via configfs) and then tells the
577            lockspace to start running (via sysfs) in dlm_ls_start(). */
578
579         error = do_uevent(ls, 1);
580         if (error)
581                 goto out_stop;
582
583         wait_for_completion(&ls->ls_members_done);
584         error = ls->ls_members_result;
585         if (error)
586                 goto out_members;
587
588         dlm_create_debug_file(ls);
589
590         log_debug(ls, "join complete");
591         *lockspace = ls;
592         return 0;
593
594  out_members:
595         do_uevent(ls, 0);
596         dlm_clear_members(ls);
597         kfree(ls->ls_node_array);
598  out_stop:
599         dlm_recoverd_stop(ls);
600  out_delist:
601         spin_lock(&lslist_lock);
602         list_del(&ls->ls_list);
603         spin_unlock(&lslist_lock);
604         kfree(ls->ls_recover_buf);
605  out_dirfree:
606         kfree(ls->ls_dirtbl);
607  out_lkbfree:
608         kfree(ls->ls_lkbtbl);
609  out_rsbfree:
610         kfree(ls->ls_rsbtbl);
611  out_lsfree:
612         if (do_unreg)
613                 kobject_put(&ls->ls_kobj);
614         else
615                 kfree(ls);
616  out:
617         module_put(THIS_MODULE);
618         return error;
619 }
620
621 int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
622                       uint32_t flags, int lvblen)
623 {
624         int error = 0;
625
626         mutex_lock(&ls_lock);
627         if (!ls_count)
628                 error = threads_start();
629         if (error)
630                 goto out;
631
632         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
633         if (!error)
634                 ls_count++;
635         if (error > 0)
636                 error = 0;
637         if (!ls_count)
638                 threads_stop();
639  out:
640         mutex_unlock(&ls_lock);
641         return error;
642 }
643
644 /* Return 1 if the lockspace still has active remote locks,
645  *        2 if the lockspace still has active local locks.
646  */
647 static int lockspace_busy(struct dlm_ls *ls)
648 {
649         int i, lkb_found = 0;
650         struct dlm_lkb *lkb;
651
652         /* NOTE: We check the lockidtbl here rather than the resource table.
653            This is because there may be LKBs queued as ASTs that have been
654            unlinked from their RSBs and are pending deletion once the AST has
655            been delivered */
656
657         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
658                 read_lock(&ls->ls_lkbtbl[i].lock);
659                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
660                         lkb_found = 1;
661                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
662                                             lkb_idtbl_list) {
663                                 if (!lkb->lkb_nodeid) {
664                                         read_unlock(&ls->ls_lkbtbl[i].lock);
665                                         return 2;
666                                 }
667                         }
668                 }
669                 read_unlock(&ls->ls_lkbtbl[i].lock);
670         }
671         return lkb_found;
672 }
673
/*
 * Drop one creation reference on the lockspace and, if it was the last
 * one, tear the lockspace down and free everything it owns.
 *
 * Returns 0 when the lockspace was destroyed, a positive number (the
 * remaining ls_create_count) when other creators remain, -EBUSY when the
 * lockspace is busier than 'force' permits, and -EINVAL when
 * ls_create_count was not positive.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i, busy, rv;

        /* 0, 1 or 2; compared against force below */
        busy = lockspace_busy(ls);

        spin_lock(&lslist_lock);
        if (ls->ls_create_count == 1) {
                if (busy > force)
                        rv = -EBUSY;
                else {
                        /* remove_lockspace takes ls off lslist */
                        ls->ls_create_count = 0;
                        rv = 0;
                }
        } else if (ls->ls_create_count > 1) {
                rv = --ls->ls_create_count;
        } else {
                rv = -EINVAL;
        }
        spin_unlock(&lslist_lock);

        /* anything but 0 means the lockspace stays */
        if (rv) {
                log_debug(ls, "release_lockspace no remove %d", rv);
                return rv;
        }

        dlm_device_deregister(ls);

        /* send the "leave" uevent unless forced (3) or no daemon to answer */
        if (force < 3 && dlm_user_daemon_available())
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        /* waits for ls_count to reach zero, then unlinks ls from lslist */
        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        /* NOTE(review): presumably keeps dlm_astd off the lkbs freed below */
        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        /* the lvb is freed here only for DLM_IFL_MSTCPY lkbs */
                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                dlm_free_lvb(lkb->lkb_lvbptr);

                        dlm_free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                /* rsbs on the bucket's main list */
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }

                /* rsbs on the bucket's toss list */
                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        dlm_free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        log_debug(ls, "release_lockspace final free");
        kobject_put(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with */

        /* pairs with the try_module_get() done when the lockspace was created */
        module_put(THIS_MODULE);
        return 0;
}
792
793 /*
794  * Called when a system has released all its locks and is not going to use the
795  * lockspace any longer.  We free everything we're managing for this lockspace.
796  * Remaining nodes will go through the recovery process as if we'd died.  The
797  * lockspace must continue to function as usual, participating in recoveries,
798  * until this returns.
799  *
800  * Force has 4 possible values:
801  * 0 - don't destroy locksapce if it has any LKBs
802  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
803  * 2 - destroy lockspace regardless of LKBs
804  * 3 - destroy lockspace as part of a forced shutdown
805  */
806
807 int dlm_release_lockspace(void *lockspace, int force)
808 {
809         struct dlm_ls *ls;
810         int error;
811
812         ls = dlm_find_lockspace_local(lockspace);
813         if (!ls)
814                 return -EINVAL;
815         dlm_put_lockspace(ls);
816
817         mutex_lock(&ls_lock);
818         error = release_lockspace(ls, force);
819         if (!error)
820                 ls_count--;
821         if (!ls_count)
822                 threads_stop();
823         mutex_unlock(&ls_lock);
824
825         return error;
826 }
827
828 void dlm_stop_lockspaces(void)
829 {
830         struct dlm_ls *ls;
831
832  restart:
833         spin_lock(&lslist_lock);
834         list_for_each_entry(ls, &lslist, ls_list) {
835                 if (!test_bit(LSFL_RUNNING, &ls->ls_flags))
836                         continue;
837                 spin_unlock(&lslist_lock);
838                 log_error(ls, "no userland control daemon, stopping lockspace");
839                 dlm_ls_stop(ls);
840                 goto restart;
841         }
842         spin_unlock(&lslist_lock);
843 }
844