dlm: keep cached master rsbs during recovery
[pandora-kernel.git] / fs / dlm / dir.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25
26
27 static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
28 {
29         spin_lock(&ls->ls_recover_list_lock);
30         list_add(&de->list, &ls->ls_recover_list);
31         spin_unlock(&ls->ls_recover_list_lock);
32 }
33
34 static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
35 {
36         int found = 0;
37         struct dlm_direntry *de;
38
39         spin_lock(&ls->ls_recover_list_lock);
40         list_for_each_entry(de, &ls->ls_recover_list, list) {
41                 if (de->length == len) {
42                         list_del(&de->list);
43                         de->master_nodeid = 0;
44                         memset(de->name, 0, len);
45                         found = 1;
46                         break;
47                 }
48         }
49         spin_unlock(&ls->ls_recover_list_lock);
50
51         if (!found)
52                 de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_KERNEL);
53         return de;
54 }
55
56 void dlm_clear_free_entries(struct dlm_ls *ls)
57 {
58         struct dlm_direntry *de;
59
60         spin_lock(&ls->ls_recover_list_lock);
61         while (!list_empty(&ls->ls_recover_list)) {
62                 de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
63                                 list);
64                 list_del(&de->list);
65                 kfree(de);
66         }
67         spin_unlock(&ls->ls_recover_list_lock);
68 }
69
70 /*
71  * We use the upper 16 bits of the hash value to select the directory node.
72  * Low bits are used for distribution of rsb's among hash buckets on each node.
73  *
74  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
75  * num_nodes to the hash value.  This value in the desired range is used as an
76  * offset into the sorted list of nodeid's to give the particular nodeid.
77  */
78
79 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
80 {
81         struct list_head *tmp;
82         struct dlm_member *memb = NULL;
83         uint32_t node, n = 0;
84         int nodeid;
85
86         if (ls->ls_num_nodes == 1) {
87                 nodeid = dlm_our_nodeid();
88                 goto out;
89         }
90
91         if (ls->ls_node_array) {
92                 node = (hash >> 16) % ls->ls_total_weight;
93                 nodeid = ls->ls_node_array[node];
94                 goto out;
95         }
96
97         /* make_member_array() failed to kmalloc ls_node_array... */
98
99         node = (hash >> 16) % ls->ls_num_nodes;
100
101         list_for_each(tmp, &ls->ls_nodes) {
102                 if (n++ != node)
103                         continue;
104                 memb = list_entry(tmp, struct dlm_member, list);
105                 break;
106         }
107
108         DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n",
109                                  ls->ls_num_nodes, n, node););
110         nodeid = memb->nodeid;
111  out:
112         return nodeid;
113 }
114
115 int dlm_dir_nodeid(struct dlm_rsb *r)
116 {
117         return dlm_hash2nodeid(r->res_ls, r->res_hash);
118 }
119
120 static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
121 {
122         uint32_t val;
123
124         val = jhash(name, len, 0);
125         val &= (ls->ls_dirtbl_size - 1);
126
127         return val;
128 }
129
130 static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
131 {
132         uint32_t bucket;
133
134         bucket = dir_hash(ls, de->name, de->length);
135         list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
136 }
137
138 static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
139                                           int namelen, uint32_t bucket)
140 {
141         struct dlm_direntry *de;
142
143         list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
144                 if (de->length == namelen && !memcmp(name, de->name, namelen))
145                         goto out;
146         }
147         de = NULL;
148  out:
149         return de;
150 }
151
152 void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
153 {
154         struct dlm_direntry *de;
155         uint32_t bucket;
156
157         bucket = dir_hash(ls, name, namelen);
158
159         write_lock(&ls->ls_dirtbl[bucket].lock);
160
161         de = search_bucket(ls, name, namelen, bucket);
162
163         if (!de) {
164                 log_error(ls, "remove fr %u none", nodeid);
165                 goto out;
166         }
167
168         if (de->master_nodeid != nodeid) {
169                 log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
170                 goto out;
171         }
172
173         list_del(&de->list);
174         kfree(de);
175  out:
176         write_unlock(&ls->ls_dirtbl[bucket].lock);
177 }
178
179 void dlm_dir_clear(struct dlm_ls *ls)
180 {
181         struct list_head *head;
182         struct dlm_direntry *de;
183         int i;
184
185         DLM_ASSERT(list_empty(&ls->ls_recover_list), );
186
187         for (i = 0; i < ls->ls_dirtbl_size; i++) {
188                 write_lock(&ls->ls_dirtbl[i].lock);
189                 head = &ls->ls_dirtbl[i].list;
190                 while (!list_empty(head)) {
191                         de = list_entry(head->next, struct dlm_direntry, list);
192                         list_del(&de->list);
193                         put_free_de(ls, de);
194                 }
195                 write_unlock(&ls->ls_dirtbl[i].lock);
196         }
197 }
198
199 int dlm_recover_directory(struct dlm_ls *ls)
200 {
201         struct dlm_member *memb;
202         struct dlm_direntry *de;
203         char *b, *last_name = NULL;
204         int error = -ENOMEM, last_len, count = 0;
205         uint16_t namelen;
206
207         log_debug(ls, "dlm_recover_directory");
208
209         if (dlm_no_directory(ls))
210                 goto out_status;
211
212         dlm_dir_clear(ls);
213
214         last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
215         if (!last_name)
216                 goto out;
217
218         list_for_each_entry(memb, &ls->ls_nodes, list) {
219                 memset(last_name, 0, DLM_RESNAME_MAXLEN);
220                 last_len = 0;
221
222                 for (;;) {
223                         error = dlm_recovery_stopped(ls);
224                         if (error)
225                                 goto out_free;
226
227                         error = dlm_rcom_names(ls, memb->nodeid,
228                                                last_name, last_len);
229                         if (error)
230                                 goto out_free;
231
232                         schedule();
233
234                         /*
235                          * pick namelen/name pairs out of received buffer
236                          */
237
238                         b = ls->ls_recover_buf + sizeof(struct dlm_rcom);
239
240                         for (;;) {
241                                 memcpy(&namelen, b, sizeof(uint16_t));
242                                 namelen = be16_to_cpu(namelen);
243                                 b += sizeof(uint16_t);
244
245                                 /* namelen of 0xFFFFF marks end of names for
246                                    this node; namelen of 0 marks end of the
247                                    buffer */
248
249                                 if (namelen == 0xFFFF)
250                                         goto done;
251                                 if (!namelen)
252                                         break;
253
254                                 error = -ENOMEM;
255                                 de = get_free_de(ls, namelen);
256                                 if (!de)
257                                         goto out_free;
258
259                                 de->master_nodeid = memb->nodeid;
260                                 de->length = namelen;
261                                 last_len = namelen;
262                                 memcpy(de->name, b, namelen);
263                                 memcpy(last_name, b, namelen);
264                                 b += namelen;
265
266                                 add_entry_to_hash(ls, de);
267                                 count++;
268                         }
269                 }
270          done:
271                 ;
272         }
273
274  out_status:
275         error = 0;
276         dlm_set_recover_status(ls, DLM_RS_DIR);
277         log_debug(ls, "dlm_recover_directory %d entries", count);
278  out_free:
279         kfree(last_name);
280  out:
281         dlm_clear_free_entries(ls);
282         return error;
283 }
284
285 static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
286                      int namelen, int *r_nodeid)
287 {
288         struct dlm_direntry *de, *tmp;
289         uint32_t bucket;
290
291         bucket = dir_hash(ls, name, namelen);
292
293         write_lock(&ls->ls_dirtbl[bucket].lock);
294         de = search_bucket(ls, name, namelen, bucket);
295         if (de) {
296                 *r_nodeid = de->master_nodeid;
297                 write_unlock(&ls->ls_dirtbl[bucket].lock);
298                 if (*r_nodeid == nodeid)
299                         return -EEXIST;
300                 return 0;
301         }
302
303         write_unlock(&ls->ls_dirtbl[bucket].lock);
304
305         de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_KERNEL);
306         if (!de)
307                 return -ENOMEM;
308
309         de->master_nodeid = nodeid;
310         de->length = namelen;
311         memcpy(de->name, name, namelen);
312
313         write_lock(&ls->ls_dirtbl[bucket].lock);
314         tmp = search_bucket(ls, name, namelen, bucket);
315         if (tmp) {
316                 kfree(de);
317                 de = tmp;
318         } else {
319                 list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
320         }
321         *r_nodeid = de->master_nodeid;
322         write_unlock(&ls->ls_dirtbl[bucket].lock);
323         return 0;
324 }
325
/*
 * Public entry point for a directory lookup; forwards all arguments to
 * get_entry() and returns its result unchanged.
 */
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
                   int *r_nodeid)
{
        int rv;

        rv = get_entry(ls, nodeid, name, namelen, r_nodeid);
        return rv;
}
331
332 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
333 {
334         struct dlm_rsb *r;
335
336         down_read(&ls->ls_root_sem);
337         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
338                 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
339                         up_read(&ls->ls_root_sem);
340                         return r;
341                 }
342         }
343         up_read(&ls->ls_root_sem);
344         return NULL;
345 }
346
347 /* Find the rsb where we left off (or start again), then send rsb names
348    for rsb's we're master of and whose directory node matches the requesting
349    node.  inbuf is the rsb name last sent, inlen is the name's length */
350
351 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
352                            char *outbuf, int outlen, int nodeid)
353 {
354         struct list_head *list;
355         struct dlm_rsb *r;
356         int offset = 0, dir_nodeid;
357         uint16_t be_namelen;
358
359         down_read(&ls->ls_root_sem);
360
361         if (inlen > 1) {
362                 r = find_rsb_root(ls, inbuf, inlen);
363                 if (!r) {
364                         inbuf[inlen - 1] = '\0';
365                         log_error(ls, "copy_master_names from %d start %d %s",
366                                   nodeid, inlen, inbuf);
367                         goto out;
368                 }
369                 list = r->res_root_list.next;
370         } else {
371                 list = ls->ls_root_list.next;
372         }
373
374         for (offset = 0; list != &ls->ls_root_list; list = list->next) {
375                 r = list_entry(list, struct dlm_rsb, res_root_list);
376                 if (r->res_nodeid)
377                         continue;
378
379                 dir_nodeid = dlm_dir_nodeid(r);
380                 if (dir_nodeid != nodeid)
381                         continue;
382
383                 /*
384                  * The block ends when we can't fit the following in the
385                  * remaining buffer space:
386                  * namelen (uint16_t) +
387                  * name (r->res_length) +
388                  * end-of-block record 0x0000 (uint16_t)
389                  */
390
391                 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
392                         /* Write end-of-block record */
393                         be_namelen = 0;
394                         memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
395                         offset += sizeof(uint16_t);
396                         goto out;
397                 }
398
399                 be_namelen = cpu_to_be16(r->res_length);
400                 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
401                 offset += sizeof(uint16_t);
402                 memcpy(outbuf + offset, r->res_name, r->res_length);
403                 offset += r->res_length;
404         }
405
406         /*
407          * If we've reached the end of the list (and there's room) write a
408          * terminating record.
409          */
410
411         if ((list == &ls->ls_root_list) &&
412             (offset + sizeof(uint16_t) <= outlen)) {
413                 be_namelen = 0xFFFF;
414                 memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
415                 offset += sizeof(uint16_t);
416         }
417
418  out:
419         up_read(&ls->ls_root_sem);
420 }
421