fs/dlm/lockspace.c
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "recoverd.h"
#include "ast.h"
#include "dir.h"
#include "lowcomms.h"
#include "config.h"
#include "memory.h"
#include "lock.h"
#include "recover.h"

#ifdef CONFIG_DLM_DEBUG
int dlm_create_debug_file(struct dlm_ls *ls);
void dlm_delete_debug_file(struct dlm_ls *ls);
#else
static inline int dlm_create_debug_file(struct dlm_ls *ls) { return 0; }
static inline void dlm_delete_debug_file(struct dlm_ls *ls) { }
#endif

static int                      ls_count;
static struct mutex             ls_lock;
static struct list_head         lslist;
static spinlock_t               lslist_lock;
static struct task_struct *     scand_task;

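/*
 * sysfs attribute handlers for /sys/kernel/dlm/<lockspace>/.  Userspace
 * (typically the dlm control daemon) drives recovery by writing 0 (stop) or
 * 1 (start) to "control", and reports the outcome of a join or leave by
 * writing a result to "event_done" (see dlm_event_store).
 */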
static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ssize_t ret = len;
        int n = simple_strtol(buf, NULL, 0);

        ls = dlm_find_lockspace_local(ls->ls_local_handle);
        if (!ls)
                return -EINVAL;

        switch (n) {
        case 0:
                dlm_ls_stop(ls);
                break;
        case 1:
                dlm_ls_start(ls);
                break;
        default:
                ret = -EINVAL;
        }
        dlm_put_lockspace(ls);
        return ret;
}

static ssize_t dlm_event_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_uevent_result = simple_strtol(buf, NULL, 0);
        set_bit(LSFL_UEVENT_WAIT, &ls->ls_flags);
        wake_up(&ls->ls_uevent_wait);
        return len;
}

static ssize_t dlm_id_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%u\n", ls->ls_global_id);
}

static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
{
        ls->ls_global_id = simple_strtoul(buf, NULL, 0);
        return len;
}

static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{
        uint32_t status = dlm_recover_status(ls);
        return snprintf(buf, PAGE_SIZE, "%x\n", status);
}

static ssize_t dlm_recover_nodeid_show(struct dlm_ls *ls, char *buf)
{
        return snprintf(buf, PAGE_SIZE, "%d\n", ls->ls_recover_nodeid);
}

struct dlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct dlm_ls *, char *);
        ssize_t (*store)(struct dlm_ls *, const char *, size_t);
};

static struct dlm_attr dlm_attr_control = {
        .attr  = {.name = "control", .mode = S_IWUSR},
        .store = dlm_control_store
};

static struct dlm_attr dlm_attr_event = {
        .attr  = {.name = "event_done", .mode = S_IWUSR},
        .store = dlm_event_store
};

static struct dlm_attr dlm_attr_id = {
        .attr  = {.name = "id", .mode = S_IRUGO | S_IWUSR},
        .show  = dlm_id_show,
        .store = dlm_id_store
};

static struct dlm_attr dlm_attr_recover_status = {
        .attr  = {.name = "recover_status", .mode = S_IRUGO},
        .show  = dlm_recover_status_show
};

static struct dlm_attr dlm_attr_recover_nodeid = {
        .attr  = {.name = "recover_nodeid", .mode = S_IRUGO},
        .show  = dlm_recover_nodeid_show
};

static struct attribute *dlm_attrs[] = {
        &dlm_attr_control.attr,
        &dlm_attr_event.attr,
        &dlm_attr_id.attr,
        &dlm_attr_recover_status.attr,
        &dlm_attr_recover_nodeid.attr,
        NULL,
};

static ssize_t dlm_attr_show(struct kobject *kobj, struct attribute *attr,
                             char *buf)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->show ? a->show(ls, buf) : 0;
}

static ssize_t dlm_attr_store(struct kobject *kobj, struct attribute *attr,
                              const char *buf, size_t len)
{
        struct dlm_ls *ls  = container_of(kobj, struct dlm_ls, ls_kobj);
        struct dlm_attr *a = container_of(attr, struct dlm_attr, attr);
        return a->store ? a->store(ls, buf, len) : len;
}

static void lockspace_kobj_release(struct kobject *k)
{
        struct dlm_ls *ls  = container_of(k, struct dlm_ls, ls_kobj);
        kfree(ls);
}

static struct sysfs_ops dlm_attr_ops = {
        .show  = dlm_attr_show,
        .store = dlm_attr_store,
};

static struct kobj_type dlm_ktype = {
        .default_attrs = dlm_attrs,
        .sysfs_ops     = &dlm_attr_ops,
        .release       = lockspace_kobj_release,
};

static struct kset dlm_kset = {
        .subsys = &kernel_subsys,
        .kobj   = {.name = "dlm",},
        .ktype  = &dlm_ktype,
};

static int kobject_setup(struct dlm_ls *ls)
{
        char lsname[DLM_LOCKSPACE_LEN];
        int error;

        memset(lsname, 0, DLM_LOCKSPACE_LEN);
        snprintf(lsname, DLM_LOCKSPACE_LEN, "%s", ls->ls_name);

        error = kobject_set_name(&ls->ls_kobj, "%s", lsname);
        if (error)
                return error;

        ls->ls_kobj.kset = &dlm_kset;
        ls->ls_kobj.ktype = &dlm_ktype;
        return 0;
}

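/*
 * Tell userspace that the lockspace is being joined (in=1, KOBJ_ONLINE) or
 * left (in=0, KOBJ_OFFLINE), then wait until a result is written to the
 * "event_done" sysfs file by dlm_event_store() above.
 */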
static int do_uevent(struct dlm_ls *ls, int in)
{
        int error;

        if (in)
                kobject_uevent(&ls->ls_kobj, KOBJ_ONLINE);
        else
                kobject_uevent(&ls->ls_kobj, KOBJ_OFFLINE);

        error = wait_event_interruptible(ls->ls_uevent_wait,
                        test_and_clear_bit(LSFL_UEVENT_WAIT, &ls->ls_flags));
        if (error)
                goto out;

        error = ls->ls_uevent_result;
 out:
        return error;
}

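/* Set up the global lockspace list and locks and register the "dlm" kset. */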
int dlm_lockspace_init(void)
{
        int error;

        ls_count = 0;
        mutex_init(&ls_lock);
        INIT_LIST_HEAD(&lslist);
        spin_lock_init(&lslist_lock);

        error = kset_register(&dlm_kset);
        if (error)
                printk("dlm_lockspace_init: cannot register kset %d\n", error);
        return error;
}

void dlm_lockspace_exit(void)
{
        kset_unregister(&dlm_kset);
}

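/*
 * Kernel thread that periodically calls dlm_scan_rsbs() on every lockspace,
 * sleeping dlm_config.scan_secs seconds between passes.
 */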
static int dlm_scand(void *data)
{
        struct dlm_ls *ls;

        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list)
                        dlm_scan_rsbs(ls);
                schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
        }
        return 0;
}

static int dlm_scand_start(void)
{
        struct task_struct *p;
        int error = 0;

        p = kthread_run(dlm_scand, NULL, "dlm_scand");
        if (IS_ERR(p))
                error = PTR_ERR(p);
        else
                scand_task = p;
        return error;
}

static void dlm_scand_stop(void)
{
        kthread_stop(scand_task);
}

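/*
 * Lockspace lookup helpers.  The global/local/device variants take a
 * reference (ls_count) under lslist_lock; callers drop it again with
 * dlm_put_lockspace().  The by-name lookup does not take a reference.
 */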
static struct dlm_ls *dlm_find_lockspace_name(char *name, int namelen)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_namelen == namelen &&
                    memcmp(ls->ls_name, name, namelen) == 0)
                        goto out;
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_global(uint32_t id)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);

        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_global_id == id) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_local_handle == lockspace) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

struct dlm_ls *dlm_find_lockspace_device(int minor)
{
        struct dlm_ls *ls;

        spin_lock(&lslist_lock);
        list_for_each_entry(ls, &lslist, ls_list) {
                if (ls->ls_device.minor == minor) {
                        ls->ls_count++;
                        goto out;
                }
        }
        ls = NULL;
 out:
        spin_unlock(&lslist_lock);
        return ls;
}

void dlm_put_lockspace(struct dlm_ls *ls)
{
        spin_lock(&lslist_lock);
        ls->ls_count--;
        spin_unlock(&lslist_lock);
}

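/*
 * Wait for the lockspace reference count to drop to zero, then unlink the
 * lockspace from lslist.  Polls once per second.
 */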
static void remove_lockspace(struct dlm_ls *ls)
{
        for (;;) {
                spin_lock(&lslist_lock);
                if (ls->ls_count == 0) {
                        list_del(&ls->ls_list);
                        spin_unlock(&lslist_lock);
                        return;
                }
                spin_unlock(&lslist_lock);
                ssleep(1);
        }
}

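/*
 * The ast, scand and lowcomms daemons are shared by all lockspaces; they are
 * started when the first lockspace is created and stopped again when the
 * last one is released (see dlm_new_lockspace/release_lockspace).
 */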
static int threads_start(void)
{
        int error;

        /* Thread which processes lock requests for all lockspaces */
        error = dlm_astd_start();
        if (error) {
                log_print("cannot start dlm_astd thread %d", error);
                goto fail;
        }

        error = dlm_scand_start();
        if (error) {
                log_print("cannot start dlm_scand thread %d", error);
                goto astd_fail;
        }

        /* Thread for sending/receiving messages for all lockspaces */
        error = dlm_lowcomms_start();
        if (error) {
                log_print("cannot start dlm lowcomms %d", error);
                goto scand_fail;
        }

        return 0;

 scand_fail:
        dlm_scand_stop();
 astd_fail:
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}

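/*
 * Build a new lockspace: allocate the rsb, lkb and directory hash tables,
 * initialise recovery state, add the lockspace to lslist, start dlm_recoverd,
 * register the sysfs kobject and finally wait in do_uevent() for userspace to
 * complete the join.
 */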
static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_exflags = flags;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        size = dlm_config.rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_rcomfree;
        }

        dlm_create_debug_file(ls);

        error = kobject_setup(ls);
        if (error)
                goto out_del;

        error = kobject_register(&ls->ls_kobj);
        if (error)
                goto out_del;

        error = do_uevent(ls, 1);
        if (error)
                goto out_unreg;

        *lockspace = ls;
        return 0;

 out_unreg:
        kobject_unregister(&ls->ls_kobj);
 out_del:
        dlm_delete_debug_file(ls);
        dlm_recoverd_stop(ls);
 out_rcomfree:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

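/*
 * Public entry point, serialised by ls_lock; the shared daemon threads are
 * started before the first lockspace is created.
 *
 * Illustrative (hypothetical) caller sketch; note that lvblen must be a
 * non-zero multiple of 8:
 *
 *      void *ls;
 *      int error = dlm_new_lockspace("example", 7, &ls, 0, 32);
 *      if (!error)
 *              ... use the lockspace, then dlm_release_lockspace(ls, 0);
 */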
int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
 out:
        mutex_unlock(&ls_lock);
        return error;
}

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

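/*
 * Tear the lockspace down: notify userspace via do_uevent() when force < 3,
 * stop dlm_recoverd, wait for the reference count to drop, then free the
 * recovery buffer, directory, lkb and rsb tables; the dlm_ls itself is freed
 * by the kobject release function.
 */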
static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_unregister(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);
        return release_lockspace(ls, force);
}