Merge branch 'master'
[pandora-kernel.git] / fs / dlm / lockspace.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "recoverd.h"
18 #include "ast.h"
19 #include "dir.h"
20 #include "lowcomms.h"
21 #include "config.h"
22 #include "memory.h"
23 #include "lock.h"
24
/*
 * Debug-file hooks.  With CONFIG_DLM_DEBUG set the two functions are only
 * declared here (their definitions are not in this file).  Without it they
 * collapse to no-op stubs, and the create stub always reports success (0).
 */
#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif
32
/* Number of lockspaces created by dlm_new_lockspace() and not yet released. */
static int                      ls_count;
/* Held while creating a lockspace and while dropping ls_count on release. */
static struct mutex             ls_lock;
/* All lockspaces, linked through dlm_ls.ls_list. */
static struct list_head         lslist;
/* Taken by the find/add/remove helpers below around lslist and ls->ls_count. */
static spinlock_t               lslist_lock;
/* Task returned by kthread_run() for the dlm_scand thread. */
static struct task_struct *     scand_task;
38
39
40 static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
41 {
42         ssize_t ret = len;
43         int n = simple_strtol(buf, NULL, 0);
44
45         switch (n) {
46         case 0:
47                 dlm_ls_stop(ls);
48                 break;
49         case 1:
50                 dlm_ls_start(ls);
51                 break;
52         default:
53                 ret = -EINVAL;
54         }
55         return ret;
56 }
57
58 static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
59 {
60         ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
61         set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
62         wake_up(&ls->ls_uevent_wait);
63         return len;
64 }
65
66 static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
67 {
68         return sprintf(buf, "%u\n", ls->ls_global_id);
69 }
70
71 static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
72 {
73         ls->ls_global_id = simple_strtoul(buf, NULL, 0);
74         return len;
75 }
76
/*
 * A lockspace attribute: the generic attribute plus optional per-lockspace
 * handlers.  dlm_attr_show()/dlm_attr_store() recover this wrapper from the
 * embedded 'attr' and call the handler if it is set.
 */
struct dlm_attr {
	struct attribute attr;
	/* Fill buf for a read; may be NULL. */
	ssize_t (*show)(struct dlm_ls *, char *);
	/* Consume a write of the given length; may be NULL. */
	ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};
82
/* "control": owner-writable only; writes go to dlm_control_store(). */
static struct dlm_attr dlm_attr_control = {
	.attr  = {.name = "control", .mode = S_IWUSR},
	.store = dlm_control_store
};

/* "event_done": owner-writable only; writes go to dlm_event_store(). */
static struct dlm_attr dlm_attr_event = {
	.attr  = {.name = "event_done", .mode = S_IWUSR},
	.store = dlm_event_store
};

/* "id": world-readable, owner-writable; backed by dlm_id_show()/dlm_id_store(). */
static struct dlm_attr dlm_attr_id = {
	.attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
	.show  = dlm_id_show,
	.store = dlm_id_store
};

/* NULL-terminated list of the attributes above, used as dlm_ktype's defaults. */
static struct attribute *dlm_attrs[] = {
	&dlm_attr_control.attr,
	&dlm_attr_event.attr,
	&dlm_attr_id.attr,
	NULL,
};
105
106 static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
107                              char *buf)
108 {
109         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
110         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
111         return a->show ? a->show(ls, buf) : 0;
112 }
113
114 static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
115                               const char *buf, size_t len)
116 {
117         struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
118         struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
119         return a->store ? a->store(ls, buf, len) : len;
120 }
121
/* Route every attribute read/write through the dlm_attr dispatchers above. */
static struct sysfs_ops dlm_attr_ops = {
	.show  = dlm_attr_show,
	.store = dlm_attr_store,
};

/* kobject type shared by all lockspaces: default attributes plus their ops. */
static struct kobj_type dlm_ktype = {
	.default_attrs = dlm_attrs,
	.sysfs_ops     = &dlm_attr_ops,
};

/*
 * Container for the per-lockspace kobjects, named "dlm" and attached to
 * kernel_subsys.  Registered in dlm_lockspace_init() and unregistered in
 * dlm_lockspace_exit().
 */
static struct kset dlm_kset = {
	.subsys = &kernel_subsys,
	.kobj   = {.name = "dlm",},
	.ktype  = &dlm_ktype,
};
137
138 static int kobject_setup(struct dlm_ls *ls)
139 {
140         char lsname[DLM_LOCKSPACE_LEN];
141         int error;
142
143         memset(lsname, 0, DLM_LOCKSPACE_LEN);
144         snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);
145
146         error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
147         if (error)
148                 return error;
149
150         ls->ls_kobj.kset = &dlm_kset;
151         ls->ls_kobj.ktype = &dlm_ktype;
152         return 0;
153 }
154
155 static int do_uevent(struct dlm_ls *ls, int in)
156 {
157         int error;
158
159         if (in)
160                 kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
161         else
162                 kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);
163
164         error = wait_event_interruptible(ls->ls_uevent_wait,
165                         test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
166         if (error)
167                 goto out;
168
169         error = ls->ls_uevent_result;
170  out:
171         return error;
172 }
173
174
175 int dlm_lockspace_init(void)
176 {
177         int error;
178
179         ls_count = 0;
180         mutex_init(&ls_lock);
181         INIT_LIST_HEAD(&lslist);
182         spin_lock_init(&lslist_lock);
183
184         error = kset_register(&dlm_kset);
185         if (error)
186                 printk("dlm_lockspace_init: cannot register kset %d\n", error);
187         return error;
188 }
189
190 void dlm_lockspace_exit(void)
191 {
192         kset_unregister(&dlm_kset);
193 }
194
195 static int dlm_scand(void *data)
196 {
197         struct dlm_ls *ls;
198
199         while (!kthread_should_stop()) {
200                 list_for_each_entry(ls, &lslist, ls_list)
201                         dlm_scan_rsbs(ls);
202                 schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
203         }
204         return 0;
205 }
206
207 static int dlm_scand_start(void)
208 {
209         struct task_struct *p;
210         int error = 0;
211
212         p = kthread_run(dlm_scand, NULL, "dlm_scand");
213         if (IS_ERR(p))
214                 error = PTR_ERR(p);
215         else
216                 scand_task = p;
217         return error;
218 }
219
220 static void dlm_scand_stop(void)
221 {
222         kthread_stop(scand_task);
223 }
224
225 static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
226 {
227         struct dlm_ls *ls;
228
229         spin_lock(&lslist_lock);
230
231         list_for_each_entry(ls, &lslist, ls_list) {
232                 if (ls->ls_namelen == namelen &&
233                     memcmp(ls->ls_name, name, namelen) == 0)
234                         goto out;
235         }
236         ls = NULL;
237  out:
238         spin_unlock(&lslist_lock);
239         return ls;
240 }
241
242 struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
243 {
244         struct dlm_ls *ls;
245
246         spin_lock(&lslist_lock);
247
248         list_for_each_entry(ls, &lslist, ls_list) {
249                 if (ls->ls_global_id == id) {
250                         ls->ls_count++;
251                         goto out;
252                 }
253         }
254         ls = NULL;
255  out:
256         spin_unlock(&lslist_lock);
257         return ls;
258 }
259
260 struct dlm_ls *dlm_find_lockspace_local(void *id)
261 {
262         struct dlm_ls *ls = id;
263
264         spin_lock(&lslist_lock);
265         ls->ls_count++;
266         spin_unlock(&lslist_lock);
267         return ls;
268 }
269
270 void dlm_put_lockspace(struct dlm_ls *ls)
271 {
272         spin_lock(&lslist_lock);
273         ls->ls_count--;
274         spin_unlock(&lslist_lock);
275 }
276
277 static void remove_lockspace(struct dlm_ls *ls)
278 {
279         for (;;) {
280                 spin_lock(&lslist_lock);
281                 if (ls->ls_count == 0) {
282                         list_del(&ls->ls_list);
283                         spin_unlock(&lslist_lock);
284                         return;
285                 }
286                 spin_unlock(&lslist_lock);
287                 ssleep(1);
288         }
289 }
290
/*
 * Start the threads shared by all lockspaces, in order: dlm_astd (processes
 * lock requests), dlm_scand, and lowcomms (sends/receives messages).
 *
 * If a later step fails, the steps that already succeeded are stopped again
 * in reverse order before returning.
 *
 * Returns 0 on success or the error from the step that failed.
 */
static int threads_start(void)
{
	int rv;

	rv = dlm_astd_start();
	if (rv) {
		log_print("cannot start dlm_astd thread %d", rv);
		return rv;
	}

	rv = dlm_scand_start();
	if (rv) {
		log_print("cannot start dlm_scand thread %d", rv);
		dlm_astd_stop();
		return rv;
	}

	rv = dlm_lowcomms_start();
	if (rv) {
		log_print("cannot start dlm lowcomms %d", rv);
		dlm_scand_stop();
		dlm_astd_stop();
		return rv;
	}

	return 0;
}
324
/*
 * Stop everything threads_start() started: dlm_scand first, then lowcomms,
 * then dlm_astd.
 */
static void threads_stop(void)
{
	dlm_scand_stop();
	dlm_lowcomms_stop();
	dlm_astd_stop();
}
331
332 static int new_lockspace(char *name, int namelen, void **lockspace,
333                          uint32_t flags, int lvblen)
334 {
335         struct dlm_ls *ls;
336         int i, size, error = -ENOMEM;
337
338         if (namelen > DLM_LOCKSPACE_LEN)
339                 return -EINVAL;
340
341         if (!lvblen || (lvblen % 8))
342                 return -EINVAL;
343
344         if (!try_module_get(THIS_MODULE))
345                 return -EINVAL;
346
347         ls = dlm_find_lockspace_name(name, namelen);
348         if (ls) {
349                 *lockspace = ls;
350                 module_put(THIS_MODULE);
351                 return -EEXIST;
352         }
353
354         ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
355         if (!ls)
356                 goto out;
357         memcpy(ls->ls_name, name, namelen);
358         ls->ls_namelen = namelen;
359         ls->ls_exflags = flags;
360         ls->ls_lvblen = lvblen;
361         ls->ls_count = 0;
362         ls->ls_flags = 0;
363
364         size = dlm_config.rsbtbl_size;
365         ls->ls_rsbtbl_size = size;
366
367         ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
368         if (!ls->ls_rsbtbl)
369                 goto out_lsfree;
370         for (i = 0; i < size; i++) {
371                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
372                 INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
373                 rwlock_init(&ls->ls_rsbtbl[i].lock);
374         }
375
376         size = dlm_config.lkbtbl_size;
377         ls->ls_lkbtbl_size = size;
378
379         ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
380         if (!ls->ls_lkbtbl)
381                 goto out_rsbfree;
382         for (i = 0; i < size; i++) {
383                 INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
384                 rwlock_init(&ls->ls_lkbtbl[i].lock);
385                 ls->ls_lkbtbl[i].counter = 1;
386         }
387
388         size = dlm_config.dirtbl_size;
389         ls->ls_dirtbl_size = size;
390
391         ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
392         if (!ls->ls_dirtbl)
393                 goto out_lkbfree;
394         for (i = 0; i < size; i++) {
395                 INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
396                 rwlock_init(&ls->ls_dirtbl[i].lock);
397         }
398
399         INIT_LIST_HEAD(&ls->ls_waiters);
400         mutex_init(&ls->ls_waiters_mutex);
401
402         INIT_LIST_HEAD(&ls->ls_nodes);
403         INIT_LIST_HEAD(&ls->ls_nodes_gone);
404         ls->ls_num_nodes = 0;
405         ls->ls_low_nodeid = 0;
406         ls->ls_total_weight = 0;
407         ls->ls_node_array = NULL;
408
409         memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
410         ls->ls_stub_rsb.res_ls = ls;
411
412         ls->ls_debug_dentry = NULL;
413
414         init_waitqueue_head(&ls->ls_uevent_wait);
415         ls->ls_uevent_result = 0;
416
417         ls->ls_recoverd_task = NULL;
418         mutex_init(&ls->ls_recoverd_active);
419         spin_lock_init(&ls->ls_recover_lock);
420         ls->ls_recover_status = 0;
421         ls->ls_recover_seq = 0;
422         ls->ls_recover_args = NULL;
423         init_rwsem(&ls->ls_in_recovery);
424         INIT_LIST_HEAD(&ls->ls_requestqueue);
425         mutex_init(&ls->ls_requestqueue_mutex);
426
427         ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
428         if (!ls->ls_recover_buf)
429                 goto out_dirfree;
430
431         INIT_LIST_HEAD(&ls->ls_recover_list);
432         spin_lock_init(&ls->ls_recover_list_lock);
433         ls->ls_recover_list_count = 0;
434         init_waitqueue_head(&ls->ls_wait_general);
435         INIT_LIST_HEAD(&ls->ls_root_list);
436         init_rwsem(&ls->ls_root_sem);
437
438         down_write(&ls->ls_in_recovery);
439
440         error = dlm_recoverd_start(ls);
441         if (error) {
442                 log_error(ls, "can't start dlm_recoverd %d", error);
443                 goto out_rcomfree;
444         }
445
446         spin_lock(&lslist_lock);
447         list_add(&ls->ls_list, &lslist);
448         spin_unlock(&lslist_lock);
449
450         dlm_create_debug_file(ls);
451
452         error = kobject_setup(ls);
453         if (error)
454                 goto out_del;
455
456         error = kobject_register(&ls->ls_kobj);
457         if (error)
458                 goto out_del;
459
460         error = do_uevent(ls, 1);
461         if (error)
462                 goto out_unreg;
463
464         *lockspace = ls;
465         return 0;
466
467  out_unreg:
468         kobject_unregister(&ls->ls_kobj);
469  out_del:
470         dlm_delete_debug_file(ls);
471         spin_lock(&lslist_lock);
472         list_del(&ls->ls_list);
473         spin_unlock(&lslist_lock);
474         dlm_recoverd_stop(ls);
475  out_rcomfree:
476         kfree(ls->ls_recover_buf);
477  out_dirfree:
478         kfree(ls->ls_dirtbl);
479  out_lkbfree:
480         kfree(ls->ls_lkbtbl);
481  out_rsbfree:
482         kfree(ls->ls_rsbtbl);
483  out_lsfree:
484         kfree(ls);
485  out:
486         module_put(THIS_MODULE);
487         return error;
488 }
489
490 int dlm_new_lockspace(char *name, int namelen, void **lockspace,
491                       uint32_t flags, int lvblen)
492 {
493         int error = 0;
494
495         mutex_lock(&ls_lock);
496         if (!ls_count)
497                 error = threads_start();
498         if (error)
499                 goto out;
500
501         error = new_lockspace(name, namelen, lockspace, flags, lvblen);
502         if (!error)
503                 ls_count++;
504  out:
505         mutex_unlock(&ls_lock);
506         return error;
507 }
508
509 /* Return 1 if the lockspace still has active remote locks,
510  *        2 if the lockspace still has active local locks.
511  */
512 static int lockspace_busy(struct dlm_ls *ls)
513 {
514         int i, lkb_found = 0;
515         struct dlm_lkb *lkb;
516
517         /* NOTE: We check the lockidtbl here rather than the resource table.
518            This is because there may be LKBs queued as ASTs that have been
519            unlinked from their RSBs and are pending deletion once the AST has
520            been delivered */
521
522         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
523                 read_lock(&ls->ls_lkbtbl[i].lock);
524                 if (!list_empty(&ls->ls_lkbtbl[i].list)) {
525                         lkb_found = 1;
526                         list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
527                                             lkb_idtbl_list) {
528                                 if (!lkb->lkb_nodeid) {
529                                         read_unlock(&ls->ls_lkbtbl[i].lock);
530                                         return 2;
531                                 }
532                         }
533                 }
534                 read_unlock(&ls->ls_lkbtbl[i].lock);
535         }
536         return lkb_found;
537 }
538
539 static int release_lockspace(struct dlm_ls *ls, int force)
540 {
541         struct dlm_lkb *lkb;
542         struct dlm_rsb *rsb;
543         struct list_head *head;
544         int i;
545         int busy = lockspace_busy(ls);
546
547         if (busy > force)
548                 return -EBUSY;
549
550         if (force < 3)
551                 do_uevent(ls, 0);
552
553         dlm_recoverd_stop(ls);
554
555         remove_lockspace(ls);
556
557         dlm_delete_debug_file(ls);
558
559         dlm_astd_suspend();
560
561         kfree(ls->ls_recover_buf);
562
563         /*
564          * Free direntry structs.
565          */
566
567         dlm_dir_clear(ls);
568         kfree(ls->ls_dirtbl);
569
570         /*
571          * Free all lkb's on lkbtbl[] lists.
572          */
573
574         for (i = 0; i < ls->ls_lkbtbl_size; i++) {
575                 head = &ls->ls_lkbtbl[i].list;
576                 while (!list_empty(head)) {
577                         lkb = list_entry(head->next, struct dlm_lkb,
578                                          lkb_idtbl_list);
579
580                         list_del(&lkb->lkb_idtbl_list);
581
582                         dlm_del_ast(lkb);
583
584                         if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
585                                 free_lvb(lkb->lkb_lvbptr);
586
587                         free_lkb(lkb);
588                 }
589         }
590         dlm_astd_resume();
591
592         kfree(ls->ls_lkbtbl);
593
594         /*
595          * Free all rsb's on rsbtbl[] lists
596          */
597
598         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
599                 head = &ls->ls_rsbtbl[i].list;
600                 while (!list_empty(head)) {
601                         rsb = list_entry(head->next, struct dlm_rsb,
602                                          res_hashchain);
603
604                         list_del(&rsb->res_hashchain);
605                         free_rsb(rsb);
606                 }
607
608                 head = &ls->ls_rsbtbl[i].toss;
609                 while (!list_empty(head)) {
610                         rsb = list_entry(head->next, struct dlm_rsb,
611                                          res_hashchain);
612                         list_del(&rsb->res_hashchain);
613                         free_rsb(rsb);
614                 }
615         }
616
617         kfree(ls->ls_rsbtbl);
618
619         /*
620          * Free structures on any other lists
621          */
622
623         kfree(ls->ls_recover_args);
624         dlm_clear_free_entries(ls);
625         dlm_clear_members(ls);
626         dlm_clear_members_gone(ls);
627         kfree(ls->ls_node_array);
628         kobject_unregister(&ls->ls_kobj);
629         kfree(ls);
630
631         mutex_lock(&ls_lock);
632         ls_count--;
633         if (!ls_count)
634                 threads_stop();
635         mutex_unlock(&ls_lock);
636
637         module_put(THIS_MODULE);
638         return 0;
639 }
640
641 /*
642  * Called when a system has released all its locks and is not going to use the
643  * lockspace any longer.  We free everything we're managing for this lockspace.
644  * Remaining nodes will go through the recovery process as if we'd died.  The
645  * lockspace must continue to function as usual, participating in recoveries,
646  * until this returns.
647  *
648  * Force has 4 possible values:
649  * 0 - don't destroy locksapce if it has any LKBs
650  * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
651  * 2 - destroy lockspace regardless of LKBs
652  * 3 - destroy lockspace as part of a forced shutdown
653  */
654
655 int dlm_release_lockspace(void *lockspace, int force)
656 {
657         struct dlm_ls *ls;
658
659         ls = dlm_find_lockspace_local(lockspace);
660         if (!ls)
661                 return -EINVAL;
662         dlm_put_lockspace(ls);
663         return release_lockspace(ls, force);
664 }
665