Merge branch 'master'
[pandora-kernel.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24 #include "recover.h"
25
#ifdef CONFIG_DLM_DEBUG
/* Implemented in the dlm debug code: create/remove a per-lockspace file. */
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
/* No-op stubs so callers need no #ifdefs when debugging is compiled out. */
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
33
/* Number of lockspaces in existence; modified under ls_lock. */
static int                      ls_count;
/* Serializes lockspace create/release and shared thread start/stop. */
static struct mutex             ls_lock;
/* All struct dlm_ls, linked via ls_list; guarded by lslist_lock. */
static struct list_head         lslist;
static spinlock_t               lslist_lock;
/* The dlm_scand kthread; valid between dlm_scand_start/stop. */
static struct task_struct *     scand_task;
40
41 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
42 {
43         ssize_t ret = len;
44         int n = simple_strtol(buf, NULL, 0);
45
46         switch (n) {
47         case 0:
48                 dlm_ls_stop(ls);
49                 break;
50         case 1:
51                 dlm_ls_start(ls);
52                 break;
53         default:
54                 ret = -EINVAL;
55         }
56         return ret;
57 }
58
/*
 * sysfs "event_done" attribute: userspace writes the result of a uevent
 * here; record it and wake the thread blocked in do_uevent().
 */
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
	ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
	/* set the flag before waking so the waiter's condition sees it */
	set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
	wake_up(&ls->ls_uevent_wait);
	return len;
}
66
67 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
68 {
69         return sprintf(buf, "%u\n", ls->ls_global_id);
70 }
71
72 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
73 {
74         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
75         return len;
76 }
77
78 static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
79 {
80         uint32_t status = dlm_recover_status(ls);
81         return sprintf(buf, "%x\n", status);
82 }
83
/*
 * A sysfs attribute of a lockspace kobject together with its handlers.
 * Either handler may be NULL for a write-only or read-only attribute.
 */
struct dlm_attr {
	struct attribute attr;
	ssize_t (*show)(struct dlm_ls *, char *);
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
89
/* "control": write-only; 0 stops the lockspace, 1 starts it. */
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* "event_done": write-only; userspace reports uevent results. */
static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* "id": read/write global lockspace id. */
static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

/* "recover_status": read-only recovery status bits. */
static struct dlm_attr dlm_attr_recover_status = {
	.attr  = {.name = "recover_status", .mode = S_IRUGO},
	.show  = dlm_recover_status_show
};

/* NULL-terminated array installed as the kobject's default attributes. */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	&dlm_attr_recover_status.attr,
	NULL,
};
118
119 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
120                              char *buf)
121 {
122         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
123         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
124         return a->show ? a->show(ls, buf) : 0;
125 }
126
127 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
128                               const char *buf, size_t len)
129 {
130         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
131         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
132         return a->store ? a->store(ls, buf, len) : len;
133 }
134
/* Route generic sysfs reads/writes to the dlm_attr handlers above. */
static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
};

/* Parent kset for all lockspace kobjects; shows up as "dlm" in sysfs. */
static struct kset dlm_kset = {
	.subsys = &kernel_subsys,
	.kobj   = {.name = "dlm",},
	.ktype  = &dlm_ktype,
};
150
151 static int kobject_setup(struct dlm_ls *ls)
152 {
153         char lsname[DLM_LOCKSPACE_LEN];
154         int error;
155
156         memset(lsname, 0, DLM_LOCKSPACE_LEN);
157         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
158
159         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
160         if (error)
161                 return error;
162
163         ls->ls_kobj.kset = &dlm_kset;
164         ls->ls_kobj.ktype = &dlm_ktype;
165         return 0;
166 }
167
168 static int do_uevent(struct dlm_ls *ls, int in)
169 {
170         int error;
171
172         if (in)
173                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
174         else
175                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
176
177         error = wait_event_interruptible(ls->ls_uevent_wait,
178                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
179         if (error)
180                 goto out;
181
182         error = ls->ls_uevent_result;
183  out:
184         return error;
185 }
186
187
188 int dlm_lockspace_init(void)
189 {
190         int error;
191
192         ls_count = 0;
193         mutex_init(&ls_lock);
194         INIT_LIST_HEAD(&lslist);
195         spin_lock_init(&lslist_lock);
196
197         error = kset_register(&dlm_kset);
198         if (error)
199                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
200         return error;
201 }
202
/* Module-exit teardown: remove the kset registered by dlm_lockspace_init. */
void dlm_lockspace_exit(void)
{
	kset_unregister(&dlm_kset);
}
207
/*
 * Background kthread: periodically let dlm_scan_rsbs() work over every
 * lockspace, sleeping dlm_config.scan_secs between passes, until
 * kthread_stop() is called.
 *
 * NOTE(review): lslist is traversed here without lslist_lock, while
 * entries are added/removed elsewhere under that lock — confirm this
 * cannot race with a concurrent list_del() in release_lockspace().
 */
static int dlm_scand(void *data)
{
	struct dlm_ls *ls;

	while (!kthread_should_stop()) {
		list_for_each_entry(ls, &lslist, ls_list)
			dlm_scan_rsbs(ls);
		/* interruptible so kthread_stop() wakes us promptly */
		schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
	}
	return 0;
}
219
220 static int dlm_scand_start(void)
221 {
222         struct task_struct *p;
223         int error = 0;
224
225         p = kthread_run(dlm_scand, NULL, "dlm_scand");
226         if (IS_ERR(p))
227                 error = PTR_ERR(p);
228         else
229                 scand_task = p;
230         return error;
231 }
232
/* Synchronously stop the kthread started by dlm_scand_start(). */
static void dlm_scand_stop(void)
{
	kthread_stop(scand_task);
}
237
238 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
239 {
240         struct dlm_ls *ls;
241
242         spin_lock(&lslist_lock);
243
244         list_for_each_entry(ls, &lslist, ls_list) {
245                 if (ls->ls_namelen == namelen &&
246                     memcmp(ls->ls_name, name, namelen) == 0)
247                         goto out;
248         }
249         ls = NULL;
250  out:
251         spin_unlock(&lslist_lock);
252         return ls;
253 }
254
255 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
256 {
257         struct dlm_ls *ls;
258
259         spin_lock(&lslist_lock);
260
261         list_for_each_entry(ls, &lslist, ls_list) {
262                 if (ls->ls_global_id == id) {
263                         ls->ls_count++;
264                         goto out;
265                 }
266         }
267         ls = NULL;
268  out:
269         spin_unlock(&lslist_lock);
270         return ls;
271 }
272
/*
 * The caller already holds the lockspace pointer; take a reference under
 * lslist_lock so remove_lockspace() waits for us before tearing down.
 */
struct dlm_ls *dlm_find_lockspace_local(void *id)
{
	struct dlm_ls *ls = id;

	spin_lock(&lslist_lock);
	ls->ls_count++;
	spin_unlock(&lslist_lock);
	return ls;
}
282
/* Drop a reference taken by one of the dlm_find_lockspace_*() helpers. */
void dlm_put_lockspace(struct dlm_ls *ls)
{
	spin_lock(&lslist_lock);
	ls->ls_count--;
	spin_unlock(&lslist_lock);
}
289
290 static void remove_lockspace(struct dlm_ls *ls)
291 {
292         for (;;) {
293                 spin_lock(&lslist_lock);
294                 if (ls->ls_count == 0) {
295                         list_del(&ls->ls_list);
296                         spin_unlock(&lslist_lock);
297                         return;
298                 }
299                 spin_unlock(&lslist_lock);
300                 ssleep(1);
301         }
302 }
303
/*
 * Start the three daemon threads shared by all lockspaces: ast delivery,
 * the rsb scanner, and the comms layer.  On failure, stop whatever was
 * already started and return the error.
 */
static int threads_start(void)
{
	int error;

	/* thread which processes lock requests for all lockspaces */
	error = dlm_astd_start();
	if (error) {
		log_print("cannot start dlm_astd thread %d", error);
		return error;
	}

	error = dlm_scand_start();
	if (error) {
		log_print("cannot start dlm_scand thread %d", error);
		goto stop_astd;
	}

	/* thread for sending/receiving messages for all lockspaces */
	error = dlm_lowcomms_start();
	if (error) {
		log_print("cannot start dlm lowcomms %d", error);
		goto stop_scand;
	}

	return 0;

 stop_scand:
	dlm_scand_stop();
 stop_astd:
	dlm_astd_stop();
	return error;
}
337
/* Stop the shared daemon threads started by threads_start(). */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
344
/*
 * Allocate and initialize a lockspace, start its recovery daemon,
 * register it in sysfs and wait for userspace to acknowledge the
 * "online" uevent.  On success *lockspace is set and 0 returned.  If a
 * lockspace of this name already exists, *lockspace is set to it and
 * -EEXIST is returned.  Errors unwind via the label ladder at the end.
 */
static int new_lockspace(char *name, int namelen, void **lockspace,
			 uint32_t flags, int lvblen)
{
	struct dlm_ls *ls;
	int i, size, error = -ENOMEM;

	if (namelen > DLM_LOCKSPACE_LEN)
		return -EINVAL;

	/* lvb length must be a non-zero multiple of 8 */
	if (!lvblen || (lvblen % 8))
		return -EINVAL;

	if (!try_module_get(THIS_MODULE))
		return -EINVAL;

	ls = dlm_find_lockspace_name(name, namelen);
	if (ls) {
		*lockspace = ls;
		module_put(THIS_MODULE);
		return -EEXIST;
	}

	/* namelen extra bytes hold the name at the end of the struct */
	ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
	if (!ls)
		goto out;
	memcpy(ls->ls_name, name, namelen);
	ls->ls_namelen = namelen;
	ls->ls_exflags = flags;
	ls->ls_lvblen = lvblen;
	ls->ls_count = 0;
	ls->ls_flags = 0;

	/* resource hash table */
	size = dlm_config.rsbtbl_size;
	ls->ls_rsbtbl_size = size;

	ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
	if (!ls->ls_rsbtbl)
		goto out_lsfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
		rwlock_init(&ls->ls_rsbtbl[i].lock);
	}

	/* lkb (lock) id table */
	size = dlm_config.lkbtbl_size;
	ls->ls_lkbtbl_size = size;

	ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
	if (!ls->ls_lkbtbl)
		goto out_rsbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
		rwlock_init(&ls->ls_lkbtbl[i].lock);
		ls->ls_lkbtbl[i].counter = 1;
	}

	/* resource directory table */
	size = dlm_config.dirtbl_size;
	ls->ls_dirtbl_size = size;

	ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
	if (!ls->ls_dirtbl)
		goto out_lkbfree;
	for (i = 0; i < size; i++) {
		INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
		rwlock_init(&ls->ls_dirtbl[i].lock);
	}

	INIT_LIST_HEAD(&ls->ls_waiters);
	mutex_init(&ls->ls_waiters_mutex);

	/* membership state */
	INIT_LIST_HEAD(&ls->ls_nodes);
	INIT_LIST_HEAD(&ls->ls_nodes_gone);
	ls->ls_num_nodes = 0;
	ls->ls_low_nodeid = 0;
	ls->ls_total_weight = 0;
	ls->ls_node_array = NULL;

	memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
	ls->ls_stub_rsb.res_ls = ls;

	ls->ls_debug_dentry = NULL;

	init_waitqueue_head(&ls->ls_uevent_wait);
	ls->ls_uevent_result = 0;

	/* recovery state */
	ls->ls_recoverd_task = NULL;
	mutex_init(&ls->ls_recoverd_active);
	spin_lock_init(&ls->ls_recover_lock);
	ls->ls_recover_status = 0;
	ls->ls_recover_seq = 0;
	ls->ls_recover_args = NULL;
	init_rwsem(&ls->ls_in_recovery);
	INIT_LIST_HEAD(&ls->ls_requestqueue);
	mutex_init(&ls->ls_requestqueue_mutex);

	ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
	if (!ls->ls_recover_buf)
		goto out_dirfree;

	INIT_LIST_HEAD(&ls->ls_recover_list);
	spin_lock_init(&ls->ls_recover_list_lock);
	ls->ls_recover_list_count = 0;
	init_waitqueue_head(&ls->ls_wait_general);
	INIT_LIST_HEAD(&ls->ls_root_list);
	init_rwsem(&ls->ls_root_sem);

	/* enter the "in recovery" state before recoverd is started */
	down_write(&ls->ls_in_recovery);

	error = dlm_recoverd_start(ls);
	if (error) {
		log_error(ls, "can't start dlm_recoverd %d", error);
		goto out_rcomfree;
	}

	/* make the lockspace visible to the find helpers and dlm_scand */
	spin_lock(&lslist_lock);
	list_add(&ls->ls_list, &lslist);
	spin_unlock(&lslist_lock);

	dlm_create_debug_file(ls);

	error = kobject_setup(ls);
	if (error)
		goto out_del;

	error = kobject_register(&ls->ls_kobj);
	if (error)
		goto out_del;

	/* wait for userspace to acknowledge the "online" uevent */
	error = do_uevent(ls, 1);
	if (error)
		goto out_unreg;

	*lockspace = ls;
	return 0;

 out_unreg:
	kobject_unregister(&ls->ls_kobj);
 out_del:
	dlm_delete_debug_file(ls);
	spin_lock(&lslist_lock);
	list_del(&ls->ls_list);
	spin_unlock(&lslist_lock);
	dlm_recoverd_stop(ls);
 out_rcomfree:
	kfree(ls->ls_recover_buf);
 out_dirfree:
	kfree(ls->ls_dirtbl);
 out_lkbfree:
	kfree(ls->ls_lkbtbl);
 out_rsbfree:
	kfree(ls->ls_rsbtbl);
 out_lsfree:
	kfree(ls);
 out:
	module_put(THIS_MODULE);
	return error;
}
502
503 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
504                       uint32_t flags, int lvblen)
505 {
506         int error = 0;
507
508         mutex_lock(&ls_lock);
509         if (!ls_count)
510                 error = threads_start();
511         if (error)
512                 goto out;
513
514         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
515         if (!error)
516                 ls_count++;
517  out:
518         mutex_unlock(&ls_lock);
519         return error;
520 }
521
522 /* Return 1 if the lockspace still has active remote locks,
523  *        2 if the lockspace still has active local locks.
524  */
525 static int lockspace_busy(struct dlm_ls *ls)
526 {
527         int i, lkb_found = 0;
528         struct dlm_lkb *lkb;
529
530         /* NOTE: We check the lockidtbl here rather than the resource table.
531            This is because there may be LKBs queued as ASTs that have been
532            unlinked from their RSBs and are pending deletion once the AST has
533            been delivered */
534
535         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
536                 read_lock(&ls->ls_lkbtbl[i].lock);
537                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
538                         lkb_found = 1;
539                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
540                                             lkb_idtbl_list) {
541                                 if (!lkb->lkb_nodeid) {
542                                         read_unlock(&ls->ls_lkbtbl[i].lock);
543                                         return 2;
544                                 }
545                         }
546                 }
547                 read_unlock(&ls->ls_lkbtbl[i].lock);
548         }
549         return lkb_found;
550 }
551
/*
 * Tear down a lockspace and free everything it owns.  'force' controls
 * how aggressively we proceed; see the 0-3 meanings documented above
 * dlm_release_lockspace() below.
 */
static int release_lockspace(struct dlm_ls *ls, int force)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *rsb;
	struct list_head *head;
	int i;
	int busy = lockspace_busy(ls);

	/* busy is 0, 1 (remote locks) or 2 (local locks); refuse unless
	   force is at least as strong */
	if (busy > force)
		return -EBUSY;

	/* a forced shutdown (force == 3) skips the offline uevent */
	if (force < 3)
		do_uevent(ls, 0);

	dlm_recoverd_stop(ls);

	/* waits for ls_count to reach zero, then unlinks from lslist */
	remove_lockspace(ls);

	dlm_delete_debug_file(ls);

	/* suspend ast delivery before freeing lkbs below */
	dlm_astd_suspend();

	kfree(ls->ls_recover_buf);

	/*
	 * Free direntry structs.
	 */

	dlm_dir_clear(ls);
	kfree(ls->ls_dirtbl);

	/*
	 * Free all lkb's on lkbtbl[] lists.
	 */

	for (i = 0; i < ls->ls_lkbtbl_size; i++) {
		head = &ls->ls_lkbtbl[i].list;
		while (!list_empty(head)) {
			lkb = list_entry(head->next, struct dlm_lkb,
					 lkb_idtbl_list);

			list_del(&lkb->lkb_idtbl_list);

			dlm_del_ast(lkb);

			/* the lvb is freed here only for MSTCPY lkbs */
			if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
				free_lvb(lkb->lkb_lvbptr);

			free_lkb(lkb);
		}
	}
	dlm_astd_resume();

	kfree(ls->ls_lkbtbl);

	/*
	 * Free all rsb's on rsbtbl[] lists
	 */

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		head = &ls->ls_rsbtbl[i].list;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);

			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}

		/* the toss list holds unused rsbs awaiting removal */
		head = &ls->ls_rsbtbl[i].toss;
		while (!list_empty(head)) {
			rsb = list_entry(head->next, struct dlm_rsb,
					 res_hashchain);
			list_del(&rsb->res_hashchain);
			free_rsb(rsb);
		}
	}

	kfree(ls->ls_rsbtbl);

	/*
	 * Free structures on any other lists
	 */

	kfree(ls->ls_recover_args);
	dlm_clear_free_entries(ls);
	dlm_clear_members(ls);
	dlm_clear_members_gone(ls);
	kfree(ls->ls_node_array);
	kobject_unregister(&ls->ls_kobj);
	kfree(ls);

	/* last lockspace gone: stop the shared daemon threads */
	mutex_lock(&ls_lock);
	ls_count--;
	if (!ls_count)
		threads_stop();
	mutex_unlock(&ls_lock);

	module_put(THIS_MODULE);
	return 0;
}
653
654 /*
655  * Called when a system has released all its locks and is not going to use the
656  * lockspace any longer.  We free everything we're managing for this lockspace.
657  * Remaining nodes will go through the recovery process as if we'd died.  The
658  * lockspace must continue to function as usual, participating in recoveries,
659  * until this returns.
660  *
661  * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
663  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
664  * 2 - destroy lockspace regardless of LKBs
665  * 3 - destroy lockspace as part of a forced shutdown
666  */
667
668 int dlm_release_lockspace(void *lockspace, int force)
669 {
670         struct dlm_ls *ls;
671
672         ls = dlm_find_lockspace_local(lockspace);
673         if (!ls)
674                 return -EINVAL;
675         dlm_put_lockspace(ls);
676         return release_lockspace(ls, force);
677 }
678