fs/dlm/lockspace.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"

#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

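/*
 * Module-wide state: ls_count is the number of lockspaces in existence
 * (protected by ls_lock and used to decide when the shared dlm threads are
 * started and stopped), lslist is the list of all lockspaces (protected by
 * lslist_lock), and scand_task is the dlm_scand kthread.
 */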
static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;


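/*
 * Store handler for the per-lockspace sysfs "control" attribute: writing 0
 * stops the lockspace (dlm_ls_stop), writing 1 starts it (dlm_ls_start);
 * any other value is rejected with -EINVAL.
 */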
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        return ret;
}

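/*
 * Store handler for the "event_done" attribute: userspace writes back the
 * result of the online/offline uevent here, which wakes the waiter in
 * do_uevent() below.
 */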
static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return sprintf(buf, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return sprintf(buf, "%x\n", status);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
};

static struct kset dlm_kset = {
        .subsys = &kernel_subsys,
        .kobj   = {.name = "dlm",},
        .ktype  = &dlm_ktype,
};

static int kobject_setup(struct dlm_ls *ls)
{
        char lsname[DLM_LOCKSPACE_LEN];
        int error;

        memset(lsname, 0, DLM_LOCKSPACE_LEN);
        snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);

        error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
        if (error)
                return error;

        ls->ls_kobj.kset = &dlm_kset;
        ls->ls_kobj.ktype = &dlm_ktype;
        return 0;
}

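/*
 * Emit an online (in=1) or offline (in=0) uevent for the lockspace kobject,
 * then sleep until userspace acknowledges by writing to the "event_done"
 * sysfs file (see dlm_event_store above).  Returns the value userspace
 * wrote, or -ERESTARTSYS if the wait was interrupted.
 */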
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        return error;
}


int dlm_lockspace_init(void)
{
        int error;

        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        error = kset_register(&dlm_kset);
        if (error)
                printk("dlm_lockspace_init: cannot register kset %d\n", error);
        return error;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(&dlm_kset);
}

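/*
 * dlm_scand is a kernel thread shared by all lockspaces.  It periodically
 * walks lslist and calls dlm_scan_rsbs() on each lockspace, sleeping
 * dlm_config.scan_secs seconds between passes.
 */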
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list)
                        dlm_scan_rsbs(ls);
                schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

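/*
 * Lockspace lookups.  dlm_find_lockspace_name() does not take a reference
 * and is only used under ls_lock during lockspace creation; the global,
 * local and device variants bump ls_count, and callers must drop that
 * reference with dlm_put_lockspace().
 */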
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_namelen == namelen &&
                    memcmp(ls->ls_name, name, namelen) == 0)
                        goto out;
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}

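/*
 * Unlink a lockspace from lslist, waiting (one second at a time) until all
 * references taken by the dlm_find_lockspace_*() functions have been dropped.
 */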
static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

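/*
 * The dlm_astd, dlm_scand and lowcomms threads are shared by all lockspaces;
 * they are started here when the first lockspace is created and stopped in
 * threads_stop() when the last one is released.
 */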
static int threads_start(void)
{
        int error;

        /* Thread which processes lock requests for all lockspaces */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}

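/*
 * Create and initialize a new lockspace: allocate the rsb, lkb and directory
 * hash tables, set up recovery state, start dlm_recoverd, add the lockspace
 * to lslist, register its sysfs kobject and wait for userspace to complete
 * the join via do_uevent().  On any failure the partially built lockspace is
 * torn down in reverse order.
 */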
static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_exflags = flags;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        size = dlm_config.rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_rcomfree;
        }

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        dlm_create_debug_file(ls);

        error = kobject_setup(ls);
        if (error)
                goto out_del;

        error = kobject_register(&ls->ls_kobj);
        if (error)
                goto out_del;

        error = do_uevent(ls, 1);
        if (error)
                goto out_unreg;

        *lockspace = ls;
        return 0;

 out_unreg:
        kobject_unregister(&ls->ls_kobj);
 out_del:
        dlm_delete_debug_file(ls);
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        dlm_recoverd_stop(ls);
 out_rcomfree:
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

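/*
 * Exported entry point for creating a lockspace.  The first successful call
 * also starts the shared dlm threads via threads_start().
 *
 * A minimal usage sketch (hypothetical caller, not part of this file; the
 * "example" name and 32-byte LVB length are illustrative only):
 *
 *      void *ls;
 *      int error;
 *
 *      error = dlm_new_lockspace("example", strlen("example"), &ls, 0, 32);
 *      if (error && error != -EEXIST)
 *              return error;
 *      ...
 *      dlm_release_lockspace(ls, 0);
 */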
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
 out:
        mutex_unlock(&ls_lock);
        return error;
}

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

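/*
 * Tear down a lockspace: notify userspace (unless this is a forced shutdown),
 * stop dlm_recoverd, wait for outstanding references, then free the recovery
 * buffer, directory, lkb and rsb tables and finally the lockspace itself.
 * The last lockspace to go also stops the shared dlm threads.
 */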
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_unregister(&ls->ls_kobj);
        kfree(ls);

        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);
        return release_lockspace(ls, force);
}