Merge branch 'perf-probes-for-linus-2' of git://git.kernel.org/pub/scm/linux/kernel...
[pandora-kernel.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87                                     struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91
92 /*
93  * Lock compatibilty matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
132
133 #define modes_compat(gr, rq) \
134         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
140
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162                "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171                r->res_nodeid, r->res_flags, r->res_first_lkid,
172                r->res_recover_locks_count, r->res_name);
173 }
174
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177         struct dlm_lkb *lkb;
178
179         dlm_print_rsb(r);
180
181         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183         printk(KERN_ERR "rsb lookup list\n");
184         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185                 dlm_print_lkb(lkb);
186         printk(KERN_ERR "rsb grant queue:\n");
187         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188                 dlm_print_lkb(lkb);
189         printk(KERN_ERR "rsb convert queue:\n");
190         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb wait queue:\n");
193         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195 }
196
197 /* Threads cannot use the lockspace while it's being recovered */
198
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201         down_read(&ls->ls_in_recovery);
202 }
203
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206         up_read(&ls->ls_in_recovery);
207 }
208
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211         return down_read_trylock(&ls->ls_in_recovery);
212 }
213
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242         return !!r->res_nodeid;
243 }
244
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261                 return 1;
262         return 0;
263 }
264
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
269
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283                                   DLM_IFL_OVERLAP_CANCEL));
284 }
285
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288         if (is_master_copy(lkb))
289                 return;
290
291         del_timeout(lkb);
292
293         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294
295         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
296            timeout caused the cancel then return -ETIMEDOUT */
297         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299                 rv = -ETIMEDOUT;
300         }
301
302         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304                 rv = -EDEADLK;
305         }
306
307         lkb->lkb_lksb->sb_status = rv;
308         lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309
310         dlm_add_ast(lkb, AST_COMP, lkb->lkb_grmode);
311 }
312
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315         queue_cast(r, lkb,
316                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321         lkb->lkb_time_bast = ktime_get();
322
323         if (is_master_copy(lkb)) {
324                 lkb->lkb_bastmode = rqmode; /* printed by debugfs */
325                 send_bast(r, lkb, rqmode);
326         } else {
327                 dlm_add_ast(lkb, AST_BAST, rqmode);
328         }
329 }
330
331 /*
332  * Basic operations on rsb's and lkb's
333  */
334
335 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
336 {
337         struct dlm_rsb *r;
338
339         r = dlm_allocate_rsb(ls, len);
340         if (!r)
341                 return NULL;
342
343         r->res_ls = ls;
344         r->res_length = len;
345         memcpy(r->res_name, name, len);
346         mutex_init(&r->res_mutex);
347
348         INIT_LIST_HEAD(&r->res_lookup);
349         INIT_LIST_HEAD(&r->res_grantqueue);
350         INIT_LIST_HEAD(&r->res_convertqueue);
351         INIT_LIST_HEAD(&r->res_waitqueue);
352         INIT_LIST_HEAD(&r->res_root_list);
353         INIT_LIST_HEAD(&r->res_recover_list);
354
355         return r;
356 }
357
358 static int search_rsb_list(struct list_head *head, char *name, int len,
359                            unsigned int flags, struct dlm_rsb **r_ret)
360 {
361         struct dlm_rsb *r;
362         int error = 0;
363
364         list_for_each_entry(r, head, res_hashchain) {
365                 if (len == r->res_length && !memcmp(name, r->res_name, len))
366                         goto found;
367         }
368         *r_ret = NULL;
369         return -EBADR;
370
371  found:
372         if (r->res_nodeid && (flags & R_MASTER))
373                 error = -ENOTBLK;
374         *r_ret = r;
375         return error;
376 }
377
378 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
379                        unsigned int flags, struct dlm_rsb **r_ret)
380 {
381         struct dlm_rsb *r;
382         int error;
383
384         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
385         if (!error) {
386                 kref_get(&r->res_ref);
387                 goto out;
388         }
389         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
390         if (error)
391                 goto out;
392
393         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
394
395         if (dlm_no_directory(ls))
396                 goto out;
397
398         if (r->res_nodeid == -1) {
399                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
400                 r->res_first_lkid = 0;
401         } else if (r->res_nodeid > 0) {
402                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
403                 r->res_first_lkid = 0;
404         } else {
405                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
406                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
407         }
408  out:
409         *r_ret = r;
410         return error;
411 }
412
413 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
414                       unsigned int flags, struct dlm_rsb **r_ret)
415 {
416         int error;
417         spin_lock(&ls->ls_rsbtbl[b].lock);
418         error = _search_rsb(ls, name, len, b, flags, r_ret);
419         spin_unlock(&ls->ls_rsbtbl[b].lock);
420         return error;
421 }
422
423 /*
424  * Find rsb in rsbtbl and potentially create/add one
425  *
426  * Delaying the release of rsb's has a similar benefit to applications keeping
427  * NL locks on an rsb, but without the guarantee that the cached master value
428  * will still be valid when the rsb is reused.  Apps aren't always smart enough
429  * to keep NL locks on an rsb that they may lock again shortly; this can lead
430  * to excessive master lookups and removals if we don't delay the release.
431  *
432  * Searching for an rsb means looking through both the normal list and toss
433  * list.  When found on the toss list the rsb is moved to the normal list with
434  * ref count of 1; when found on normal list the ref count is incremented.
435  */
436
437 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
438                     unsigned int flags, struct dlm_rsb **r_ret)
439 {
440         struct dlm_rsb *r = NULL, *tmp;
441         uint32_t hash, bucket;
442         int error = -EINVAL;
443
444         if (namelen > DLM_RESNAME_MAXLEN)
445                 goto out;
446
447         if (dlm_no_directory(ls))
448                 flags |= R_CREATE;
449
450         error = 0;
451         hash = jhash(name, namelen, 0);
452         bucket = hash & (ls->ls_rsbtbl_size - 1);
453
454         error = search_rsb(ls, name, namelen, bucket, flags, &r);
455         if (!error)
456                 goto out;
457
458         if (error == -EBADR && !(flags & R_CREATE))
459                 goto out;
460
461         /* the rsb was found but wasn't a master copy */
462         if (error == -ENOTBLK)
463                 goto out;
464
465         error = -ENOMEM;
466         r = create_rsb(ls, name, namelen);
467         if (!r)
468                 goto out;
469
470         r->res_hash = hash;
471         r->res_bucket = bucket;
472         r->res_nodeid = -1;
473         kref_init(&r->res_ref);
474
475         /* With no directory, the master can be set immediately */
476         if (dlm_no_directory(ls)) {
477                 int nodeid = dlm_dir_nodeid(r);
478                 if (nodeid == dlm_our_nodeid())
479                         nodeid = 0;
480                 r->res_nodeid = nodeid;
481         }
482
483         spin_lock(&ls->ls_rsbtbl[bucket].lock);
484         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
485         if (!error) {
486                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
487                 dlm_free_rsb(r);
488                 r = tmp;
489                 goto out;
490         }
491         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
492         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
493         error = 0;
494  out:
495         *r_ret = r;
496         return error;
497 }
498
499 /* This is only called to add a reference when the code already holds
500    a valid reference to the rsb, so there's no need for locking. */
501
502 static inline void hold_rsb(struct dlm_rsb *r)
503 {
504         kref_get(&r->res_ref);
505 }
506
507 void dlm_hold_rsb(struct dlm_rsb *r)
508 {
509         hold_rsb(r);
510 }
511
512 static void toss_rsb(struct kref *kref)
513 {
514         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
515         struct dlm_ls *ls = r->res_ls;
516
517         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
518         kref_init(&r->res_ref);
519         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
520         r->res_toss_time = jiffies;
521         if (r->res_lvbptr) {
522                 dlm_free_lvb(r->res_lvbptr);
523                 r->res_lvbptr = NULL;
524         }
525 }
526
527 /* When all references to the rsb are gone it's transfered to
528    the tossed list for later disposal. */
529
530 static void put_rsb(struct dlm_rsb *r)
531 {
532         struct dlm_ls *ls = r->res_ls;
533         uint32_t bucket = r->res_bucket;
534
535         spin_lock(&ls->ls_rsbtbl[bucket].lock);
536         kref_put(&r->res_ref, toss_rsb);
537         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
538 }
539
540 void dlm_put_rsb(struct dlm_rsb *r)
541 {
542         put_rsb(r);
543 }
544
545 /* See comment for unhold_lkb */
546
547 static void unhold_rsb(struct dlm_rsb *r)
548 {
549         int rv;
550         rv = kref_put(&r->res_ref, toss_rsb);
551         DLM_ASSERT(!rv, dlm_dump_rsb(r););
552 }
553
554 static void kill_rsb(struct kref *kref)
555 {
556         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
557
558         /* All work is done after the return from kref_put() so we
559            can release the write_lock before the remove and free. */
560
561         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
562         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
563         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
564         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
565         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
566         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
567 }
568
569 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
570    The rsb must exist as long as any lkb's for it do. */
571
572 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
573 {
574         hold_rsb(r);
575         lkb->lkb_resource = r;
576 }
577
578 static void detach_lkb(struct dlm_lkb *lkb)
579 {
580         if (lkb->lkb_resource) {
581                 put_rsb(lkb->lkb_resource);
582                 lkb->lkb_resource = NULL;
583         }
584 }
585
586 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
587 {
588         struct dlm_lkb *lkb, *tmp;
589         uint32_t lkid = 0;
590         uint16_t bucket;
591
592         lkb = dlm_allocate_lkb(ls);
593         if (!lkb)
594                 return -ENOMEM;
595
596         lkb->lkb_nodeid = -1;
597         lkb->lkb_grmode = DLM_LOCK_IV;
598         kref_init(&lkb->lkb_ref);
599         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
600         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
601         INIT_LIST_HEAD(&lkb->lkb_time_list);
602
603         get_random_bytes(&bucket, sizeof(bucket));
604         bucket &= (ls->ls_lkbtbl_size - 1);
605
606         write_lock(&ls->ls_lkbtbl[bucket].lock);
607
608         /* counter can roll over so we must verify lkid is not in use */
609
610         while (lkid == 0) {
611                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
612
613                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
614                                     lkb_idtbl_list) {
615                         if (tmp->lkb_id != lkid)
616                                 continue;
617                         lkid = 0;
618                         break;
619                 }
620         }
621
622         lkb->lkb_id = lkid;
623         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
624         write_unlock(&ls->ls_lkbtbl[bucket].lock);
625
626         *lkb_ret = lkb;
627         return 0;
628 }
629
630 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
631 {
632         struct dlm_lkb *lkb;
633         uint16_t bucket = (lkid >> 16);
634
635         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
636                 if (lkb->lkb_id == lkid)
637                         return lkb;
638         }
639         return NULL;
640 }
641
642 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
643 {
644         struct dlm_lkb *lkb;
645         uint16_t bucket = (lkid >> 16);
646
647         if (bucket >= ls->ls_lkbtbl_size)
648                 return -EBADSLT;
649
650         read_lock(&ls->ls_lkbtbl[bucket].lock);
651         lkb = __find_lkb(ls, lkid);
652         if (lkb)
653                 kref_get(&lkb->lkb_ref);
654         read_unlock(&ls->ls_lkbtbl[bucket].lock);
655
656         *lkb_ret = lkb;
657         return lkb ? 0 : -ENOENT;
658 }
659
660 static void kill_lkb(struct kref *kref)
661 {
662         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
663
664         /* All work is done after the return from kref_put() so we
665            can release the write_lock before the detach_lkb */
666
667         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
668 }
669
670 /* __put_lkb() is used when an lkb may not have an rsb attached to
671    it so we need to provide the lockspace explicitly */
672
673 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
674 {
675         uint16_t bucket = (lkb->lkb_id >> 16);
676
677         write_lock(&ls->ls_lkbtbl[bucket].lock);
678         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
679                 list_del(&lkb->lkb_idtbl_list);
680                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
681
682                 detach_lkb(lkb);
683
684                 /* for local/process lkbs, lvbptr points to caller's lksb */
685                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
686                         dlm_free_lvb(lkb->lkb_lvbptr);
687                 dlm_free_lkb(lkb);
688                 return 1;
689         } else {
690                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
691                 return 0;
692         }
693 }
694
695 int dlm_put_lkb(struct dlm_lkb *lkb)
696 {
697         struct dlm_ls *ls;
698
699         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
700         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
701
702         ls = lkb->lkb_resource->res_ls;
703         return __put_lkb(ls, lkb);
704 }
705
706 /* This is only called to add a reference when the code already holds
707    a valid reference to the lkb, so there's no need for locking. */
708
709 static inline void hold_lkb(struct dlm_lkb *lkb)
710 {
711         kref_get(&lkb->lkb_ref);
712 }
713
714 /* This is called when we need to remove a reference and are certain
715    it's not the last ref.  e.g. del_lkb is always called between a
716    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
717    put_lkb would work fine, but would involve unnecessary locking */
718
719 static inline void unhold_lkb(struct dlm_lkb *lkb)
720 {
721         int rv;
722         rv = kref_put(&lkb->lkb_ref, kill_lkb);
723         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
724 }
725
726 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
727                             int mode)
728 {
729         struct dlm_lkb *lkb = NULL;
730
731         list_for_each_entry(lkb, head, lkb_statequeue)
732                 if (lkb->lkb_rqmode < mode)
733                         break;
734
735         if (!lkb)
736                 list_add_tail(new, head);
737         else
738                 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
739 }
740
741 /* add/remove lkb to rsb's grant/convert/wait queue */
742
743 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
744 {
745         kref_get(&lkb->lkb_ref);
746
747         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
748
749         lkb->lkb_timestamp = ktime_get();
750
751         lkb->lkb_status = status;
752
753         switch (status) {
754         case DLM_LKSTS_WAITING:
755                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
756                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
757                 else
758                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
759                 break;
760         case DLM_LKSTS_GRANTED:
761                 /* convention says granted locks kept in order of grmode */
762                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
763                                 lkb->lkb_grmode);
764                 break;
765         case DLM_LKSTS_CONVERT:
766                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
767                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
768                 else
769                         list_add_tail(&lkb->lkb_statequeue,
770                                       &r->res_convertqueue);
771                 break;
772         default:
773                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
774         }
775 }
776
777 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
778 {
779         lkb->lkb_status = 0;
780         list_del(&lkb->lkb_statequeue);
781         unhold_lkb(lkb);
782 }
783
784 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
785 {
786         hold_lkb(lkb);
787         del_lkb(r, lkb);
788         add_lkb(r, lkb, sts);
789         unhold_lkb(lkb);
790 }
791
792 static int msg_reply_type(int mstype)
793 {
794         switch (mstype) {
795         case DLM_MSG_REQUEST:
796                 return DLM_MSG_REQUEST_REPLY;
797         case DLM_MSG_CONVERT:
798                 return DLM_MSG_CONVERT_REPLY;
799         case DLM_MSG_UNLOCK:
800                 return DLM_MSG_UNLOCK_REPLY;
801         case DLM_MSG_CANCEL:
802                 return DLM_MSG_CANCEL_REPLY;
803         case DLM_MSG_LOOKUP:
804                 return DLM_MSG_LOOKUP_REPLY;
805         }
806         return -1;
807 }
808
809 /* add/remove lkb from global waiters list of lkb's waiting for
810    a reply from a remote node */
811
812 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
813 {
814         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
815         int error = 0;
816
817         mutex_lock(&ls->ls_waiters_mutex);
818
819         if (is_overlap_unlock(lkb) ||
820             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
821                 error = -EINVAL;
822                 goto out;
823         }
824
825         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
826                 switch (mstype) {
827                 case DLM_MSG_UNLOCK:
828                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
829                         break;
830                 case DLM_MSG_CANCEL:
831                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
832                         break;
833                 default:
834                         error = -EBUSY;
835                         goto out;
836                 }
837                 lkb->lkb_wait_count++;
838                 hold_lkb(lkb);
839
840                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
841                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
842                           lkb->lkb_wait_count, lkb->lkb_flags);
843                 goto out;
844         }
845
846         DLM_ASSERT(!lkb->lkb_wait_count,
847                    dlm_print_lkb(lkb);
848                    printk("wait_count %d\n", lkb->lkb_wait_count););
849
850         lkb->lkb_wait_count++;
851         lkb->lkb_wait_type = mstype;
852         hold_lkb(lkb);
853         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
854  out:
855         if (error)
856                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
857                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
858                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
859         mutex_unlock(&ls->ls_waiters_mutex);
860         return error;
861 }
862
863 /* We clear the RESEND flag because we might be taking an lkb off the waiters
864    list as part of process_requestqueue (e.g. a lookup that has an optimized
865    request reply on the requestqueue) between dlm_recover_waiters_pre() which
866    set RESEND and dlm_recover_waiters_post() */
867
868 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
869                                 struct dlm_message *ms)
870 {
871         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
872         int overlap_done = 0;
873
874         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
875                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
876                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
877                 overlap_done = 1;
878                 goto out_del;
879         }
880
881         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
882                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
883                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
884                 overlap_done = 1;
885                 goto out_del;
886         }
887
888         /* Cancel state was preemptively cleared by a successful convert,
889            see next comment, nothing to do. */
890
891         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
892             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
893                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
894                           lkb->lkb_id, lkb->lkb_wait_type);
895                 return -1;
896         }
897
898         /* Remove for the convert reply, and premptively remove for the
899            cancel reply.  A convert has been granted while there's still
900            an outstanding cancel on it (the cancel is moot and the result
901            in the cancel reply should be 0).  We preempt the cancel reply
902            because the app gets the convert result and then can follow up
903            with another op, like convert.  This subsequent op would see the
904            lingering state of the cancel and fail with -EBUSY. */
905
906         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
907             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
908             is_overlap_cancel(lkb) && ms && !ms->m_result) {
909                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
910                           lkb->lkb_id);
911                 lkb->lkb_wait_type = 0;
912                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
913                 lkb->lkb_wait_count--;
914                 goto out_del;
915         }
916
917         /* N.B. type of reply may not always correspond to type of original
918            msg due to lookup->request optimization, verify others? */
919
920         if (lkb->lkb_wait_type) {
921                 lkb->lkb_wait_type = 0;
922                 goto out_del;
923         }
924
925         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
926                   lkb->lkb_id, mstype, lkb->lkb_flags);
927         return -1;
928
929  out_del:
930         /* the force-unlock/cancel has completed and we haven't recvd a reply
931            to the op that was in progress prior to the unlock/cancel; we
932            give up on any reply to the earlier op.  FIXME: not sure when/how
933            this would happen */
934
935         if (overlap_done && lkb->lkb_wait_type) {
936                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
937                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
938                 lkb->lkb_wait_count--;
939                 lkb->lkb_wait_type = 0;
940         }
941
942         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
943
944         lkb->lkb_flags &= ~DLM_IFL_RESEND;
945         lkb->lkb_wait_count--;
946         if (!lkb->lkb_wait_count)
947                 list_del_init(&lkb->lkb_wait_reply);
948         unhold_lkb(lkb);
949         return 0;
950 }
951
952 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
953 {
954         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
955         int error;
956
957         mutex_lock(&ls->ls_waiters_mutex);
958         error = _remove_from_waiters(lkb, mstype, NULL);
959         mutex_unlock(&ls->ls_waiters_mutex);
960         return error;
961 }
962
963 /* Handles situations where we might be processing a "fake" or "stub" reply in
964    which we can't try to take waiters_mutex again. */
965
966 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
967 {
968         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
969         int error;
970
971         if (ms != &ls->ls_stub_ms)
972                 mutex_lock(&ls->ls_waiters_mutex);
973         error = _remove_from_waiters(lkb, ms->m_type, ms);
974         if (ms != &ls->ls_stub_ms)
975                 mutex_unlock(&ls->ls_waiters_mutex);
976         return error;
977 }
978
979 static void dir_remove(struct dlm_rsb *r)
980 {
981         int to_nodeid;
982
983         if (dlm_no_directory(r->res_ls))
984                 return;
985
986         to_nodeid = dlm_dir_nodeid(r);
987         if (to_nodeid != dlm_our_nodeid())
988                 send_remove(r);
989         else
990                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
991                                      r->res_name, r->res_length);
992 }
993
994 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
995    found since they are in order of newest to oldest? */
996
997 static int shrink_bucket(struct dlm_ls *ls, int b)
998 {
999         struct dlm_rsb *r;
1000         int count = 0, found;
1001
1002         for (;;) {
1003                 found = 0;
1004                 spin_lock(&ls->ls_rsbtbl[b].lock);
1005                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1006                                             res_hashchain) {
1007                         if (!time_after_eq(jiffies, r->res_toss_time +
1008                                            dlm_config.ci_toss_secs * HZ))
1009                                 continue;
1010                         found = 1;
1011                         break;
1012                 }
1013
1014                 if (!found) {
1015                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1016                         break;
1017                 }
1018
1019                 if (kref_put(&r->res_ref, kill_rsb)) {
1020                         list_del(&r->res_hashchain);
1021                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1022
1023                         if (is_master(r))
1024                                 dir_remove(r);
1025                         dlm_free_rsb(r);
1026                         count++;
1027                 } else {
1028                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1029                         log_error(ls, "tossed rsb in use %s", r->res_name);
1030                 }
1031         }
1032
1033         return count;
1034 }
1035
1036 void dlm_scan_rsbs(struct dlm_ls *ls)
1037 {
1038         int i;
1039
1040         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1041                 shrink_bucket(ls, i);
1042                 if (dlm_locking_stopped(ls))
1043                         break;
1044                 cond_resched();
1045         }
1046 }
1047
1048 static void add_timeout(struct dlm_lkb *lkb)
1049 {
1050         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1051
1052         if (is_master_copy(lkb))
1053                 return;
1054
1055         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1056             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1057                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1058                 goto add_it;
1059         }
1060         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1061                 goto add_it;
1062         return;
1063
1064  add_it:
1065         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1066         mutex_lock(&ls->ls_timeout_mutex);
1067         hold_lkb(lkb);
1068         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1069         mutex_unlock(&ls->ls_timeout_mutex);
1070 }
1071
1072 static void del_timeout(struct dlm_lkb *lkb)
1073 {
1074         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1075
1076         mutex_lock(&ls->ls_timeout_mutex);
1077         if (!list_empty(&lkb->lkb_time_list)) {
1078                 list_del_init(&lkb->lkb_time_list);
1079                 unhold_lkb(lkb);
1080         }
1081         mutex_unlock(&ls->ls_timeout_mutex);
1082 }
1083
1084 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1085    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1086    and then lock rsb because of lock ordering in add_timeout.  We may need
1087    to specify some special timeout-related bits in the lkb that are just to
1088    be accessed under the timeout_mutex. */
1089
1090 void dlm_scan_timeout(struct dlm_ls *ls)
1091 {
1092         struct dlm_rsb *r;
1093         struct dlm_lkb *lkb;
1094         int do_cancel, do_warn;
1095         s64 wait_us;
1096
1097         for (;;) {
1098                 if (dlm_locking_stopped(ls))
1099                         break;
1100
1101                 do_cancel = 0;
1102                 do_warn = 0;
1103                 mutex_lock(&ls->ls_timeout_mutex);
1104                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1105
1106                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1107                                                         lkb->lkb_timestamp));
1108
1109                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1110                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1111                                 do_cancel = 1;
1112
1113                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1114                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1115                                 do_warn = 1;
1116
1117                         if (!do_cancel && !do_warn)
1118                                 continue;
1119                         hold_lkb(lkb);
1120                         break;
1121                 }
1122                 mutex_unlock(&ls->ls_timeout_mutex);
1123
1124                 if (!do_cancel && !do_warn)
1125                         break;
1126
1127                 r = lkb->lkb_resource;
1128                 hold_rsb(r);
1129                 lock_rsb(r);
1130
1131                 if (do_warn) {
1132                         /* clear flag so we only warn once */
1133                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1134                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1135                                 del_timeout(lkb);
1136                         dlm_timeout_warn(lkb);
1137                 }
1138
1139                 if (do_cancel) {
1140                         log_debug(ls, "timeout cancel %x node %d %s",
1141                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1142                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1143                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1144                         del_timeout(lkb);
1145                         _cancel_lock(r, lkb);
1146                 }
1147
1148                 unlock_rsb(r);
1149                 unhold_rsb(r);
1150                 dlm_put_lkb(lkb);
1151         }
1152 }
1153
1154 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1155    dlm_recoverd before checking/setting ls_recover_begin. */
1156
1157 void dlm_adjust_timeouts(struct dlm_ls *ls)
1158 {
1159         struct dlm_lkb *lkb;
1160         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1161
1162         ls->ls_recover_begin = 0;
1163         mutex_lock(&ls->ls_timeout_mutex);
1164         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1165                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1166         mutex_unlock(&ls->ls_timeout_mutex);
1167 }
1168
1169 /* lkb is master or local copy */
1170
1171 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1172 {
1173         int b, len = r->res_ls->ls_lvblen;
1174
1175         /* b=1 lvb returned to caller
1176            b=0 lvb written to rsb or invalidated
1177            b=-1 do nothing */
1178
1179         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1180
1181         if (b == 1) {
1182                 if (!lkb->lkb_lvbptr)
1183                         return;
1184
1185                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1186                         return;
1187
1188                 if (!r->res_lvbptr)
1189                         return;
1190
1191                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1192                 lkb->lkb_lvbseq = r->res_lvbseq;
1193
1194         } else if (b == 0) {
1195                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1196                         rsb_set_flag(r, RSB_VALNOTVALID);
1197                         return;
1198                 }
1199
1200                 if (!lkb->lkb_lvbptr)
1201                         return;
1202
1203                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1204                         return;
1205
1206                 if (!r->res_lvbptr)
1207                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1208
1209                 if (!r->res_lvbptr)
1210                         return;
1211
1212                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1213                 r->res_lvbseq++;
1214                 lkb->lkb_lvbseq = r->res_lvbseq;
1215                 rsb_clear_flag(r, RSB_VALNOTVALID);
1216         }
1217
1218         if (rsb_flag(r, RSB_VALNOTVALID))
1219                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1220 }
1221
1222 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1223 {
1224         if (lkb->lkb_grmode < DLM_LOCK_PW)
1225                 return;
1226
1227         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1228                 rsb_set_flag(r, RSB_VALNOTVALID);
1229                 return;
1230         }
1231
1232         if (!lkb->lkb_lvbptr)
1233                 return;
1234
1235         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1236                 return;
1237
1238         if (!r->res_lvbptr)
1239                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1240
1241         if (!r->res_lvbptr)
1242                 return;
1243
1244         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1245         r->res_lvbseq++;
1246         rsb_clear_flag(r, RSB_VALNOTVALID);
1247 }
1248
1249 /* lkb is process copy (pc) */
1250
1251 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1252                             struct dlm_message *ms)
1253 {
1254         int b;
1255
1256         if (!lkb->lkb_lvbptr)
1257                 return;
1258
1259         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1260                 return;
1261
1262         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1263         if (b == 1) {
1264                 int len = receive_extralen(ms);
1265                 if (len > DLM_RESNAME_MAXLEN)
1266                         len = DLM_RESNAME_MAXLEN;
1267                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1268                 lkb->lkb_lvbseq = ms->m_lvbseq;
1269         }
1270 }
1271
1272 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1273    remove_lock -- used for unlock, removes lkb from granted
1274    revert_lock -- used for cancel, moves lkb from convert to granted
1275    grant_lock  -- used for request and convert, adds lkb to granted or
1276                   moves lkb from convert or waiting to granted
1277
1278    Each of these is used for master or local copy lkb's.  There is
1279    also a _pc() variation used to make the corresponding change on
1280    a process copy (pc) lkb. */
1281
1282 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1283 {
1284         del_lkb(r, lkb);
1285         lkb->lkb_grmode = DLM_LOCK_IV;
1286         /* this unhold undoes the original ref from create_lkb()
1287            so this leads to the lkb being freed */
1288         unhold_lkb(lkb);
1289 }
1290
1291 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1292 {
1293         set_lvb_unlock(r, lkb);
1294         _remove_lock(r, lkb);
1295 }
1296
1297 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1298 {
1299         _remove_lock(r, lkb);
1300 }
1301
1302 /* returns: 0 did nothing
1303             1 moved lock to granted
1304            -1 removed lock */
1305
1306 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1307 {
1308         int rv = 0;
1309
1310         lkb->lkb_rqmode = DLM_LOCK_IV;
1311
1312         switch (lkb->lkb_status) {
1313         case DLM_LKSTS_GRANTED:
1314                 break;
1315         case DLM_LKSTS_CONVERT:
1316                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1317                 rv = 1;
1318                 break;
1319         case DLM_LKSTS_WAITING:
1320                 del_lkb(r, lkb);
1321                 lkb->lkb_grmode = DLM_LOCK_IV;
1322                 /* this unhold undoes the original ref from create_lkb()
1323                    so this leads to the lkb being freed */
1324                 unhold_lkb(lkb);
1325                 rv = -1;
1326                 break;
1327         default:
1328                 log_print("invalid status for revert %d", lkb->lkb_status);
1329         }
1330         return rv;
1331 }
1332
1333 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1334 {
1335         return revert_lock(r, lkb);
1336 }
1337
1338 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1339 {
1340         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1341                 lkb->lkb_grmode = lkb->lkb_rqmode;
1342                 if (lkb->lkb_status)
1343                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1344                 else
1345                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1346         }
1347
1348         lkb->lkb_rqmode = DLM_LOCK_IV;
1349 }
1350
1351 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1352 {
1353         set_lvb_lock(r, lkb);
1354         _grant_lock(r, lkb);
1355         lkb->lkb_highbast = 0;
1356 }
1357
1358 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1359                           struct dlm_message *ms)
1360 {
1361         set_lvb_lock_pc(r, lkb, ms);
1362         _grant_lock(r, lkb);
1363 }
1364
1365 /* called by grant_pending_locks() which means an async grant message must
1366    be sent to the requesting node in addition to granting the lock if the
1367    lkb belongs to a remote node. */
1368
1369 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1370 {
1371         grant_lock(r, lkb);
1372         if (is_master_copy(lkb))
1373                 send_grant(r, lkb);
1374         else
1375                 queue_cast(r, lkb, 0);
1376 }
1377
1378 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1379    change the granted/requested modes.  We're munging things accordingly in
1380    the process copy.
1381    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1382    conversion deadlock
1383    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1384    compatible with other granted locks */
1385
1386 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1387 {
1388         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1389                 log_print("munge_demoted %x invalid reply type %d",
1390                           lkb->lkb_id, ms->m_type);
1391                 return;
1392         }
1393
1394         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1395                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1396                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1397                 return;
1398         }
1399
1400         lkb->lkb_grmode = DLM_LOCK_NL;
1401 }
1402
1403 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1404 {
1405         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1406             ms->m_type != DLM_MSG_GRANT) {
1407                 log_print("munge_altmode %x invalid reply type %d",
1408                           lkb->lkb_id, ms->m_type);
1409                 return;
1410         }
1411
1412         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1413                 lkb->lkb_rqmode = DLM_LOCK_PR;
1414         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1415                 lkb->lkb_rqmode = DLM_LOCK_CW;
1416         else {
1417                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1418                 dlm_print_lkb(lkb);
1419         }
1420 }
1421
1422 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1423 {
1424         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1425                                            lkb_statequeue);
1426         if (lkb->lkb_id == first->lkb_id)
1427                 return 1;
1428
1429         return 0;
1430 }
1431
1432 /* Check if the given lkb conflicts with another lkb on the queue. */
1433
1434 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1435 {
1436         struct dlm_lkb *this;
1437
1438         list_for_each_entry(this, head, lkb_statequeue) {
1439                 if (this == lkb)
1440                         continue;
1441                 if (!modes_compat(this, lkb))
1442                         return 1;
1443         }
1444         return 0;
1445 }
1446
1447 /*
1448  * "A conversion deadlock arises with a pair of lock requests in the converting
1449  * queue for one resource.  The granted mode of each lock blocks the requested
1450  * mode of the other lock."
1451  *
1452  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1453  * convert queue from being granted, then deadlk/demote lkb.
1454  *
1455  * Example:
1456  * Granted Queue: empty
1457  * Convert Queue: NL->EX (first lock)
1458  *                PR->EX (second lock)
1459  *
1460  * The first lock can't be granted because of the granted mode of the second
1461  * lock and the second lock can't be granted because it's not first in the
1462  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1463  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1464  * flag set and return DEMOTED in the lksb flags.
1465  *
1466  * Originally, this function detected conv-deadlk in a more limited scope:
1467  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1468  * - if lkb1 was the first entry in the queue (not just earlier), and was
1469  *   blocked by the granted mode of lkb2, and there was nothing on the
1470  *   granted queue preventing lkb1 from being granted immediately, i.e.
1471  *   lkb2 was the only thing preventing lkb1 from being granted.
1472  *
1473  * That second condition meant we'd only say there was conv-deadlk if
1474  * resolving it (by demotion) would lead to the first lock on the convert
1475  * queue being granted right away.  It allowed conversion deadlocks to exist
1476  * between locks on the convert queue while they couldn't be granted anyway.
1477  *
1478  * Now, we detect and take action on conversion deadlocks immediately when
1479  * they're created, even if they may not be immediately consequential.  If
1480  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1481  * mode that would prevent lkb1's conversion from being granted, we do a
1482  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1483  * I think this means that the lkb_is_ahead condition below should always
1484  * be zero, i.e. there will never be conv-deadlk between two locks that are
1485  * both already on the convert queue.
1486  */
1487
1488 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1489 {
1490         struct dlm_lkb *lkb1;
1491         int lkb_is_ahead = 0;
1492
1493         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1494                 if (lkb1 == lkb2) {
1495                         lkb_is_ahead = 1;
1496                         continue;
1497                 }
1498
1499                 if (!lkb_is_ahead) {
1500                         if (!modes_compat(lkb2, lkb1))
1501                                 return 1;
1502                 } else {
1503                         if (!modes_compat(lkb2, lkb1) &&
1504                             !modes_compat(lkb1, lkb2))
1505                                 return 1;
1506                 }
1507         }
1508         return 0;
1509 }
1510
1511 /*
1512  * Return 1 if the lock can be granted, 0 otherwise.
1513  * Also detect and resolve conversion deadlocks.
1514  *
1515  * lkb is the lock to be granted
1516  *
1517  * now is 1 if the function is being called in the context of the
1518  * immediate request, it is 0 if called later, after the lock has been
1519  * queued.
1520  *
1521  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1522  */
1523
1524 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1525 {
1526         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1527
1528         /*
1529          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1530          * a new request for a NL mode lock being blocked.
1531          *
1532          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1533          * request, then it would be granted.  In essence, the use of this flag
1534          * tells the Lock Manager to expedite theis request by not considering
1535          * what may be in the CONVERTING or WAITING queues...  As of this
1536          * writing, the EXPEDITE flag can be used only with new requests for NL
1537          * mode locks.  This flag is not valid for conversion requests.
1538          *
1539          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1540          * conversion or used with a non-NL requested mode.  We also know an
1541          * EXPEDITE request is always granted immediately, so now must always
1542          * be 1.  The full condition to grant an expedite request: (now &&
1543          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1544          * therefore be shortened to just checking the flag.
1545          */
1546
1547         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1548                 return 1;
1549
1550         /*
1551          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1552          * added to the remaining conditions.
1553          */
1554
1555         if (queue_conflict(&r->res_grantqueue, lkb))
1556                 goto out;
1557
1558         /*
1559          * 6-3: By default, a conversion request is immediately granted if the
1560          * requested mode is compatible with the modes of all other granted
1561          * locks
1562          */
1563
1564         if (queue_conflict(&r->res_convertqueue, lkb))
1565                 goto out;
1566
1567         /*
1568          * 6-5: But the default algorithm for deciding whether to grant or
1569          * queue conversion requests does not by itself guarantee that such
1570          * requests are serviced on a "first come first serve" basis.  This, in
1571          * turn, can lead to a phenomenon known as "indefinate postponement".
1572          *
1573          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1574          * the system service employed to request a lock conversion.  This flag
1575          * forces certain conversion requests to be queued, even if they are
1576          * compatible with the granted modes of other locks on the same
1577          * resource.  Thus, the use of this flag results in conversion requests
1578          * being ordered on a "first come first servce" basis.
1579          *
1580          * DCT: This condition is all about new conversions being able to occur
1581          * "in place" while the lock remains on the granted queue (assuming
1582          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1583          * doesn't _have_ to go onto the convert queue where it's processed in
1584          * order.  The "now" variable is necessary to distinguish converts
1585          * being received and processed for the first time now, because once a
1586          * convert is moved to the conversion queue the condition below applies
1587          * requiring fifo granting.
1588          */
1589
1590         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1591                 return 1;
1592
1593         /*
1594          * The NOORDER flag is set to avoid the standard vms rules on grant
1595          * order.
1596          */
1597
1598         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1599                 return 1;
1600
1601         /*
1602          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1603          * granted until all other conversion requests ahead of it are granted
1604          * and/or canceled.
1605          */
1606
1607         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1608                 return 1;
1609
1610         /*
1611          * 6-4: By default, a new request is immediately granted only if all
1612          * three of the following conditions are satisfied when the request is
1613          * issued:
1614          * - The queue of ungranted conversion requests for the resource is
1615          *   empty.
1616          * - The queue of ungranted new requests for the resource is empty.
1617          * - The mode of the new request is compatible with the most
1618          *   restrictive mode of all granted locks on the resource.
1619          */
1620
1621         if (now && !conv && list_empty(&r->res_convertqueue) &&
1622             list_empty(&r->res_waitqueue))
1623                 return 1;
1624
1625         /*
1626          * 6-4: Once a lock request is in the queue of ungranted new requests,
1627          * it cannot be granted until the queue of ungranted conversion
1628          * requests is empty, all ungranted new requests ahead of it are
1629          * granted and/or canceled, and it is compatible with the granted mode
1630          * of the most restrictive lock granted on the resource.
1631          */
1632
1633         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1634             first_in_list(lkb, &r->res_waitqueue))
1635                 return 1;
1636  out:
1637         return 0;
1638 }
1639
1640 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1641                           int *err)
1642 {
1643         int rv;
1644         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1645         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1646
1647         if (err)
1648                 *err = 0;
1649
1650         rv = _can_be_granted(r, lkb, now);
1651         if (rv)
1652                 goto out;
1653
1654         /*
1655          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1656          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1657          * cancels one of the locks.
1658          */
1659
1660         if (is_convert && can_be_queued(lkb) &&
1661             conversion_deadlock_detect(r, lkb)) {
1662                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1663                         lkb->lkb_grmode = DLM_LOCK_NL;
1664                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1665                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1666                         if (err)
1667                                 *err = -EDEADLK;
1668                         else {
1669                                 log_print("can_be_granted deadlock %x now %d",
1670                                           lkb->lkb_id, now);
1671                                 dlm_dump_rsb(r);
1672                         }
1673                 }
1674                 goto out;
1675         }
1676
1677         /*
1678          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1679          * to grant a request in a mode other than the normal rqmode.  It's a
1680          * simple way to provide a big optimization to applications that can
1681          * use them.
1682          */
1683
1684         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1685                 alt = DLM_LOCK_PR;
1686         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1687                 alt = DLM_LOCK_CW;
1688
1689         if (alt) {
1690                 lkb->lkb_rqmode = alt;
1691                 rv = _can_be_granted(r, lkb, now);
1692                 if (rv)
1693                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1694                 else
1695                         lkb->lkb_rqmode = rqmode;
1696         }
1697  out:
1698         return rv;
1699 }
1700
1701 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1702    for locks pending on the convert list.  Once verified (watch for these
1703    log_prints), we should be able to just call _can_be_granted() and not
1704    bother with the demote/deadlk cases here (and there's no easy way to deal
1705    with a deadlk here, we'd have to generate something like grant_lock with
1706    the deadlk error.) */
1707
1708 /* Returns the highest requested mode of all blocked conversions; sets
1709    cw if there's a blocked conversion to DLM_LOCK_CW. */
1710
1711 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1712 {
1713         struct dlm_lkb *lkb, *s;
1714         int hi, demoted, quit, grant_restart, demote_restart;
1715         int deadlk;
1716
1717         quit = 0;
1718  restart:
1719         grant_restart = 0;
1720         demote_restart = 0;
1721         hi = DLM_LOCK_IV;
1722
1723         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1724                 demoted = is_demoted(lkb);
1725                 deadlk = 0;
1726
1727                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1728                         grant_lock_pending(r, lkb);
1729                         grant_restart = 1;
1730                         continue;
1731                 }
1732
1733                 if (!demoted && is_demoted(lkb)) {
1734                         log_print("WARN: pending demoted %x node %d %s",
1735                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1736                         demote_restart = 1;
1737                         continue;
1738                 }
1739
1740                 if (deadlk) {
1741                         log_print("WARN: pending deadlock %x node %d %s",
1742                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1743                         dlm_dump_rsb(r);
1744                         continue;
1745                 }
1746
1747                 hi = max_t(int, lkb->lkb_rqmode, hi);
1748
1749                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1750                         *cw = 1;
1751         }
1752
1753         if (grant_restart)
1754                 goto restart;
1755         if (demote_restart && !quit) {
1756                 quit = 1;
1757                 goto restart;
1758         }
1759
1760         return max_t(int, high, hi);
1761 }
1762
1763 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1764 {
1765         struct dlm_lkb *lkb, *s;
1766
1767         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1768                 if (can_be_granted(r, lkb, 0, NULL))
1769                         grant_lock_pending(r, lkb);
1770                 else {
1771                         high = max_t(int, lkb->lkb_rqmode, high);
1772                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1773                                 *cw = 1;
1774                 }
1775         }
1776
1777         return high;
1778 }
1779
1780 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1781    on either the convert or waiting queue.
1782    high is the largest rqmode of all locks blocked on the convert or
1783    waiting queue. */
1784
1785 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1786 {
1787         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1788                 if (gr->lkb_highbast < DLM_LOCK_EX)
1789                         return 1;
1790                 return 0;
1791         }
1792
1793         if (gr->lkb_highbast < high &&
1794             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1795                 return 1;
1796         return 0;
1797 }
1798
1799 static void grant_pending_locks(struct dlm_rsb *r)
1800 {
1801         struct dlm_lkb *lkb, *s;
1802         int high = DLM_LOCK_IV;
1803         int cw = 0;
1804
1805         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1806
1807         high = grant_pending_convert(r, high, &cw);
1808         high = grant_pending_wait(r, high, &cw);
1809
1810         if (high == DLM_LOCK_IV)
1811                 return;
1812
1813         /*
1814          * If there are locks left on the wait/convert queue then send blocking
1815          * ASTs to granted locks based on the largest requested mode (high)
1816          * found above.
1817          */
1818
1819         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1820                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1821                         if (cw && high == DLM_LOCK_PR &&
1822                             lkb->lkb_grmode == DLM_LOCK_PR)
1823                                 queue_bast(r, lkb, DLM_LOCK_CW);
1824                         else
1825                                 queue_bast(r, lkb, high);
1826                         lkb->lkb_highbast = high;
1827                 }
1828         }
1829 }
1830
1831 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1832 {
1833         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1834             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1835                 if (gr->lkb_highbast < DLM_LOCK_EX)
1836                         return 1;
1837                 return 0;
1838         }
1839
1840         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1841                 return 1;
1842         return 0;
1843 }
1844
1845 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1846                             struct dlm_lkb *lkb)
1847 {
1848         struct dlm_lkb *gr;
1849
1850         list_for_each_entry(gr, head, lkb_statequeue) {
1851                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1852                         queue_bast(r, gr, lkb->lkb_rqmode);
1853                         gr->lkb_highbast = lkb->lkb_rqmode;
1854                 }
1855         }
1856 }
1857
1858 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1859 {
1860         send_bast_queue(r, &r->res_grantqueue, lkb);
1861 }
1862
1863 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1864 {
1865         send_bast_queue(r, &r->res_grantqueue, lkb);
1866         send_bast_queue(r, &r->res_convertqueue, lkb);
1867 }
1868
1869 /* set_master(r, lkb) -- set the master nodeid of a resource
1870
1871    The purpose of this function is to set the nodeid field in the given
1872    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1873    known, it can just be copied to the lkb and the function will return
1874    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1875    before it can be copied to the lkb.
1876
1877    When the rsb nodeid is being looked up remotely, the initial lkb
1878    causing the lookup is kept on the ls_waiters list waiting for the
1879    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1880    on the rsb's res_lookup list until the master is verified.
1881
1882    Return values:
1883    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1884    1: the rsb master is not available and the lkb has been placed on
1885       a wait queue
1886 */
1887
1888 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1889 {
1890         struct dlm_ls *ls = r->res_ls;
1891         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1892
1893         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1894                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1895                 r->res_first_lkid = lkb->lkb_id;
1896                 lkb->lkb_nodeid = r->res_nodeid;
1897                 return 0;
1898         }
1899
1900         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1901                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1902                 return 1;
1903         }
1904
1905         if (r->res_nodeid == 0) {
1906                 lkb->lkb_nodeid = 0;
1907                 return 0;
1908         }
1909
1910         if (r->res_nodeid > 0) {
1911                 lkb->lkb_nodeid = r->res_nodeid;
1912                 return 0;
1913         }
1914
1915         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1916
1917         dir_nodeid = dlm_dir_nodeid(r);
1918
1919         if (dir_nodeid != our_nodeid) {
1920                 r->res_first_lkid = lkb->lkb_id;
1921                 send_lookup(r, lkb);
1922                 return 1;
1923         }
1924
1925         for (i = 0; i < 2; i++) {
1926                 /* It's possible for dlm_scand to remove an old rsb for
1927                    this same resource from the toss list, us to create
1928                    a new one, look up the master locally, and find it
1929                    already exists just before dlm_scand does the
1930                    dir_remove() on the previous rsb. */
1931
1932                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1933                                        r->res_length, &ret_nodeid);
1934                 if (!error)
1935                         break;
1936                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1937                 schedule();
1938         }
1939         if (error && error != -EEXIST)
1940                 return error;
1941
1942         if (ret_nodeid == our_nodeid) {
1943                 r->res_first_lkid = 0;
1944                 r->res_nodeid = 0;
1945                 lkb->lkb_nodeid = 0;
1946         } else {
1947                 r->res_first_lkid = lkb->lkb_id;
1948                 r->res_nodeid = ret_nodeid;
1949                 lkb->lkb_nodeid = ret_nodeid;
1950         }
1951         return 0;
1952 }
1953
1954 static void process_lookup_list(struct dlm_rsb *r)
1955 {
1956         struct dlm_lkb *lkb, *safe;
1957
1958         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1959                 list_del_init(&lkb->lkb_rsb_lookup);
1960                 _request_lock(r, lkb);
1961                 schedule();
1962         }
1963 }
1964
1965 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1966
1967 static void confirm_master(struct dlm_rsb *r, int error)
1968 {
1969         struct dlm_lkb *lkb;
1970
1971         if (!r->res_first_lkid)
1972                 return;
1973
1974         switch (error) {
1975         case 0:
1976         case -EINPROGRESS:
1977                 r->res_first_lkid = 0;
1978                 process_lookup_list(r);
1979                 break;
1980
1981         case -EAGAIN:
1982         case -EBADR:
1983         case -ENOTBLK:
1984                 /* the remote request failed and won't be retried (it was
1985                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1986                    lkb the first_lkid */
1987
1988                 r->res_first_lkid = 0;
1989
1990                 if (!list_empty(&r->res_lookup)) {
1991                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1992                                          lkb_rsb_lookup);
1993                         list_del_init(&lkb->lkb_rsb_lookup);
1994                         r->res_first_lkid = lkb->lkb_id;
1995                         _request_lock(r, lkb);
1996                 }
1997                 break;
1998
1999         default:
2000                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2001         }
2002 }
2003
2004 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2005                          int namelen, unsigned long timeout_cs,
2006                          void (*ast) (void *astparam),
2007                          void *astparam,
2008                          void (*bast) (void *astparam, int mode),
2009                          struct dlm_args *args)
2010 {
2011         int rv = -EINVAL;
2012
2013         /* check for invalid arg usage */
2014
2015         if (mode < 0 || mode > DLM_LOCK_EX)
2016                 goto out;
2017
2018         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2019                 goto out;
2020
2021         if (flags & DLM_LKF_CANCEL)
2022                 goto out;
2023
2024         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2025                 goto out;
2026
2027         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2028                 goto out;
2029
2030         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2031                 goto out;
2032
2033         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2034                 goto out;
2035
2036         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2037                 goto out;
2038
2039         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2040                 goto out;
2041
2042         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2043                 goto out;
2044
2045         if (!ast || !lksb)
2046                 goto out;
2047
2048         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2049                 goto out;
2050
2051         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2052                 goto out;
2053
2054         /* these args will be copied to the lkb in validate_lock_args,
2055            it cannot be done now because when converting locks, fields in
2056            an active lkb cannot be modified before locking the rsb */
2057
2058         args->flags = flags;
2059         args->astfn = ast;
2060         args->astparam = astparam;
2061         args->bastfn = bast;
2062         args->timeout = timeout_cs;
2063         args->mode = mode;
2064         args->lksb = lksb;
2065         rv = 0;
2066  out:
2067         return rv;
2068 }
2069
2070 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2071 {
2072         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2073                       DLM_LKF_FORCEUNLOCK))
2074                 return -EINVAL;
2075
2076         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2077                 return -EINVAL;
2078
2079         args->flags = flags;
2080         args->astparam = astarg;
2081         return 0;
2082 }
2083
2084 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2085                               struct dlm_args *args)
2086 {
2087         int rv = -EINVAL;
2088
2089         if (args->flags & DLM_LKF_CONVERT) {
2090                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2091                         goto out;
2092
2093                 if (args->flags & DLM_LKF_QUECVT &&
2094                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2095                         goto out;
2096
2097                 rv = -EBUSY;
2098                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2099                         goto out;
2100
2101                 if (lkb->lkb_wait_type)
2102                         goto out;
2103
2104                 if (is_overlap(lkb))
2105                         goto out;
2106         }
2107
2108         lkb->lkb_exflags = args->flags;
2109         lkb->lkb_sbflags = 0;
2110         lkb->lkb_astfn = args->astfn;
2111         lkb->lkb_astparam = args->astparam;
2112         lkb->lkb_bastfn = args->bastfn;
2113         lkb->lkb_rqmode = args->mode;
2114         lkb->lkb_lksb = args->lksb;
2115         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2116         lkb->lkb_ownpid = (int) current->pid;
2117         lkb->lkb_timeout_cs = args->timeout;
2118         rv = 0;
2119  out:
2120         if (rv)
2121                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2122                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2123                           lkb->lkb_status, lkb->lkb_wait_type,
2124                           lkb->lkb_resource->res_name);
2125         return rv;
2126 }
2127
2128 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2129    for success */
2130
2131 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2132    because there may be a lookup in progress and it's valid to do
2133    cancel/unlockf on it */
2134
2135 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2136 {
2137         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2138         int rv = -EINVAL;
2139
2140         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2141                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2142                 dlm_print_lkb(lkb);
2143                 goto out;
2144         }
2145
2146         /* an lkb may still exist even though the lock is EOL'ed due to a
2147            cancel, unlock or failed noqueue request; an app can't use these
2148            locks; return same error as if the lkid had not been found at all */
2149
2150         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2151                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2152                 rv = -ENOENT;
2153                 goto out;
2154         }
2155
2156         /* an lkb may be waiting for an rsb lookup to complete where the
2157            lookup was initiated by another lock */
2158
2159         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2160                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2161                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2162                         list_del_init(&lkb->lkb_rsb_lookup);
2163                         queue_cast(lkb->lkb_resource, lkb,
2164                                    args->flags & DLM_LKF_CANCEL ?
2165                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2166                         unhold_lkb(lkb); /* undoes create_lkb() */
2167                 }
2168                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2169                 rv = -EBUSY;
2170                 goto out;
2171         }
2172
2173         /* cancel not allowed with another cancel/unlock in progress */
2174
2175         if (args->flags & DLM_LKF_CANCEL) {
2176                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2177                         goto out;
2178
2179                 if (is_overlap(lkb))
2180                         goto out;
2181
2182                 /* don't let scand try to do a cancel */
2183                 del_timeout(lkb);
2184
2185                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2186                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2187                         rv = -EBUSY;
2188                         goto out;
2189                 }
2190
2191                 /* there's nothing to cancel */
2192                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2193                     !lkb->lkb_wait_type) {
2194                         rv = -EBUSY;
2195                         goto out;
2196                 }
2197
2198                 switch (lkb->lkb_wait_type) {
2199                 case DLM_MSG_LOOKUP:
2200                 case DLM_MSG_REQUEST:
2201                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2202                         rv = -EBUSY;
2203                         goto out;
2204                 case DLM_MSG_UNLOCK:
2205                 case DLM_MSG_CANCEL:
2206                         goto out;
2207                 }
2208                 /* add_to_waiters() will set OVERLAP_CANCEL */
2209                 goto out_ok;
2210         }
2211
2212         /* do we need to allow a force-unlock if there's a normal unlock
2213            already in progress?  in what conditions could the normal unlock
2214            fail such that we'd want to send a force-unlock to be sure? */
2215
2216         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2217                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2218                         goto out;
2219
2220                 if (is_overlap_unlock(lkb))
2221                         goto out;
2222
2223                 /* don't let scand try to do a cancel */
2224                 del_timeout(lkb);
2225
2226                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2227                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2228                         rv = -EBUSY;
2229                         goto out;
2230                 }
2231
2232                 switch (lkb->lkb_wait_type) {
2233                 case DLM_MSG_LOOKUP:
2234                 case DLM_MSG_REQUEST:
2235                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2236                         rv = -EBUSY;
2237                         goto out;
2238                 case DLM_MSG_UNLOCK:
2239                         goto out;
2240                 }
2241                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2242                 goto out_ok;
2243         }
2244
2245         /* normal unlock not allowed if there's any op in progress */
2246         rv = -EBUSY;
2247         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2248                 goto out;
2249
2250  out_ok:
2251         /* an overlapping op shouldn't blow away exflags from other op */
2252         lkb->lkb_exflags |= args->flags;
2253         lkb->lkb_sbflags = 0;
2254         lkb->lkb_astparam = args->astparam;
2255         rv = 0;
2256  out:
2257         if (rv)
2258                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2259                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2260                           args->flags, lkb->lkb_wait_type,
2261                           lkb->lkb_resource->res_name);
2262         return rv;
2263 }
2264
2265 /*
2266  * Four stage 4 varieties:
2267  * do_request(), do_convert(), do_unlock(), do_cancel()
2268  * These are called on the master node for the given lock and
2269  * from the central locking logic.
2270  */
2271
2272 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2273 {
2274         int error = 0;
2275
2276         if (can_be_granted(r, lkb, 1, NULL)) {
2277                 grant_lock(r, lkb);
2278                 queue_cast(r, lkb, 0);
2279                 goto out;
2280         }
2281
2282         if (can_be_queued(lkb)) {
2283                 error = -EINPROGRESS;
2284                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2285                 add_timeout(lkb);
2286                 goto out;
2287         }
2288
2289         error = -EAGAIN;
2290         queue_cast(r, lkb, -EAGAIN);
2291  out:
2292         return error;
2293 }
2294
2295 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2296                                int error)
2297 {
2298         switch (error) {
2299         case -EAGAIN:
2300                 if (force_blocking_asts(lkb))
2301                         send_blocking_asts_all(r, lkb);
2302                 break;
2303         case -EINPROGRESS:
2304                 send_blocking_asts(r, lkb);
2305                 break;
2306         }
2307 }
2308
2309 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2310 {
2311         int error = 0;
2312         int deadlk = 0;
2313
2314         /* changing an existing lock may allow others to be granted */
2315
2316         if (can_be_granted(r, lkb, 1, &deadlk)) {
2317                 grant_lock(r, lkb);
2318                 queue_cast(r, lkb, 0);
2319                 goto out;
2320         }
2321
2322         /* can_be_granted() detected that this lock would block in a conversion
2323            deadlock, so we leave it on the granted queue and return EDEADLK in
2324            the ast for the convert. */
2325
2326         if (deadlk) {
2327                 /* it's left on the granted queue */
2328                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2329                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2330                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2331                 revert_lock(r, lkb);
2332                 queue_cast(r, lkb, -EDEADLK);
2333                 error = -EDEADLK;
2334                 goto out;
2335         }
2336
2337         /* is_demoted() means the can_be_granted() above set the grmode
2338            to NL, and left us on the granted queue.  This auto-demotion
2339            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2340            now grantable.  We have to try to grant other converting locks
2341            before we try again to grant this one. */
2342
2343         if (is_demoted(lkb)) {
2344                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2345                 if (_can_be_granted(r, lkb, 1)) {
2346                         grant_lock(r, lkb);
2347                         queue_cast(r, lkb, 0);
2348                         goto out;
2349                 }
2350                 /* else fall through and move to convert queue */
2351         }
2352
2353         if (can_be_queued(lkb)) {
2354                 error = -EINPROGRESS;
2355                 del_lkb(r, lkb);
2356                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2357                 add_timeout(lkb);
2358                 goto out;
2359         }
2360
2361         error = -EAGAIN;
2362         queue_cast(r, lkb, -EAGAIN);
2363  out:
2364         return error;
2365 }
2366
2367 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2368                                int error)
2369 {
2370         switch (error) {
2371         case 0:
2372                 grant_pending_locks(r);
2373                 /* grant_pending_locks also sends basts */
2374                 break;
2375         case -EAGAIN:
2376                 if (force_blocking_asts(lkb))
2377                         send_blocking_asts_all(r, lkb);
2378                 break;
2379         case -EINPROGRESS:
2380                 send_blocking_asts(r, lkb);
2381                 break;
2382         }
2383 }
2384
2385 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2386 {
2387         remove_lock(r, lkb);
2388         queue_cast(r, lkb, -DLM_EUNLOCK);
2389         return -DLM_EUNLOCK;
2390 }
2391
2392 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2393                               int error)
2394 {
2395         grant_pending_locks(r);
2396 }
2397
2398 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2399  
2400 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2401 {
2402         int error;
2403
2404         error = revert_lock(r, lkb);
2405         if (error) {
2406                 queue_cast(r, lkb, -DLM_ECANCEL);
2407                 return -DLM_ECANCEL;
2408         }
2409         return 0;
2410 }
2411
2412 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2413                               int error)
2414 {
2415         if (error)
2416                 grant_pending_locks(r);
2417 }
2418
2419 /*
2420  * Four stage 3 varieties:
2421  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2422  */
2423
2424 /* add a new lkb to a possibly new rsb, called by requesting process */
2425
2426 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2427 {
2428         int error;
2429
2430         /* set_master: sets lkb nodeid from r */
2431
2432         error = set_master(r, lkb);
2433         if (error < 0)
2434                 goto out;
2435         if (error) {
2436                 error = 0;
2437                 goto out;
2438         }
2439
2440         if (is_remote(r)) {
2441                 /* receive_request() calls do_request() on remote node */
2442                 error = send_request(r, lkb);
2443         } else {
2444                 error = do_request(r, lkb);
2445                 /* for remote locks the request_reply is sent
2446                    between do_request and do_request_effects */
2447                 do_request_effects(r, lkb, error);
2448         }
2449  out:
2450         return error;
2451 }
2452
2453 /* change some property of an existing lkb, e.g. mode */
2454
2455 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2456 {
2457         int error;
2458
2459         if (is_remote(r)) {
2460                 /* receive_convert() calls do_convert() on remote node */
2461                 error = send_convert(r, lkb);
2462         } else {
2463                 error = do_convert(r, lkb);
2464                 /* for remote locks the convert_reply is sent
2465                    between do_convert and do_convert_effects */
2466                 do_convert_effects(r, lkb, error);
2467         }
2468
2469         return error;
2470 }
2471
2472 /* remove an existing lkb from the granted queue */
2473
2474 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2475 {
2476         int error;
2477
2478         if (is_remote(r)) {
2479                 /* receive_unlock() calls do_unlock() on remote node */
2480                 error = send_unlock(r, lkb);
2481         } else {
2482                 error = do_unlock(r, lkb);
2483                 /* for remote locks the unlock_reply is sent
2484                    between do_unlock and do_unlock_effects */
2485                 do_unlock_effects(r, lkb, error);
2486         }
2487
2488         return error;
2489 }
2490
2491 /* remove an existing lkb from the convert or wait queue */
2492
2493 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2494 {
2495         int error;
2496
2497         if (is_remote(r)) {
2498                 /* receive_cancel() calls do_cancel() on remote node */
2499                 error = send_cancel(r, lkb);
2500         } else {
2501                 error = do_cancel(r, lkb);
2502                 /* for remote locks the cancel_reply is sent
2503                    between do_cancel and do_cancel_effects */
2504                 do_cancel_effects(r, lkb, error);
2505         }
2506
2507         return error;
2508 }
2509
2510 /*
2511  * Four stage 2 varieties:
2512  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2513  */
2514
2515 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2516                         int len, struct dlm_args *args)
2517 {
2518         struct dlm_rsb *r;
2519         int error;
2520
2521         error = validate_lock_args(ls, lkb, args);
2522         if (error)
2523                 goto out;
2524
2525         error = find_rsb(ls, name, len, R_CREATE, &r);
2526         if (error)
2527                 goto out;
2528
2529         lock_rsb(r);
2530
2531         attach_lkb(r, lkb);
2532         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2533
2534         error = _request_lock(r, lkb);
2535
2536         unlock_rsb(r);
2537         put_rsb(r);
2538
2539  out:
2540         return error;
2541 }
2542
2543 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2544                         struct dlm_args *args)
2545 {
2546         struct dlm_rsb *r;
2547         int error;
2548
2549         r = lkb->lkb_resource;
2550
2551         hold_rsb(r);
2552         lock_rsb(r);
2553
2554         error = validate_lock_args(ls, lkb, args);
2555         if (error)
2556                 goto out;
2557
2558         error = _convert_lock(r, lkb);
2559  out:
2560         unlock_rsb(r);
2561         put_rsb(r);
2562         return error;
2563 }
2564
2565 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2566                        struct dlm_args *args)
2567 {
2568         struct dlm_rsb *r;
2569         int error;
2570
2571         r = lkb->lkb_resource;
2572
2573         hold_rsb(r);
2574         lock_rsb(r);
2575
2576         error = validate_unlock_args(lkb, args);
2577         if (error)
2578                 goto out;
2579
2580         error = _unlock_lock(r, lkb);
2581  out:
2582         unlock_rsb(r);
2583         put_rsb(r);
2584         return error;
2585 }
2586
2587 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2588                        struct dlm_args *args)
2589 {
2590         struct dlm_rsb *r;
2591         int error;
2592
2593         r = lkb->lkb_resource;
2594
2595         hold_rsb(r);
2596         lock_rsb(r);
2597
2598         error = validate_unlock_args(lkb, args);
2599         if (error)
2600                 goto out;
2601
2602         error = _cancel_lock(r, lkb);
2603  out:
2604         unlock_rsb(r);
2605         put_rsb(r);
2606         return error;
2607 }
2608
2609 /*
2610  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2611  */
2612
2613 int dlm_lock(dlm_lockspace_t *lockspace,
2614              int mode,
2615              struct dlm_lksb *lksb,
2616              uint32_t flags,
2617              void *name,
2618              unsigned int namelen,
2619              uint32_t parent_lkid,
2620              void (*ast) (void *astarg),
2621              void *astarg,
2622              void (*bast) (void *astarg, int mode))
2623 {
2624         struct dlm_ls *ls;
2625         struct dlm_lkb *lkb;
2626         struct dlm_args args;
2627         int error, convert = flags & DLM_LKF_CONVERT;
2628
2629         ls = dlm_find_lockspace_local(lockspace);
2630         if (!ls)
2631                 return -EINVAL;
2632
2633         dlm_lock_recovery(ls);
2634
2635         if (convert)
2636                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2637         else
2638                 error = create_lkb(ls, &lkb);
2639
2640         if (error)
2641                 goto out;
2642
2643         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2644                               astarg, bast, &args);
2645         if (error)
2646                 goto out_put;
2647
2648         if (convert)
2649                 error = convert_lock(ls, lkb, &args);
2650         else
2651                 error = request_lock(ls, lkb, name, namelen, &args);
2652
2653         if (error == -EINPROGRESS)
2654                 error = 0;
2655  out_put:
2656         if (convert || error)
2657                 __put_lkb(ls, lkb);
2658         if (error == -EAGAIN || error == -EDEADLK)
2659                 error = 0;
2660  out:
2661         dlm_unlock_recovery(ls);
2662         dlm_put_lockspace(ls);
2663         return error;
2664 }
2665
2666 int dlm_unlock(dlm_lockspace_t *lockspace,
2667                uint32_t lkid,
2668                uint32_t flags,
2669                struct dlm_lksb *lksb,
2670                void *astarg)
2671 {
2672         struct dlm_ls *ls;
2673         struct dlm_lkb *lkb;
2674         struct dlm_args args;
2675         int error;
2676
2677         ls = dlm_find_lockspace_local(lockspace);
2678         if (!ls)
2679                 return -EINVAL;
2680
2681         dlm_lock_recovery(ls);
2682
2683         error = find_lkb(ls, lkid, &lkb);
2684         if (error)
2685                 goto out;
2686
2687         error = set_unlock_args(flags, astarg, &args);
2688         if (error)
2689                 goto out_put;
2690
2691         if (flags & DLM_LKF_CANCEL)
2692                 error = cancel_lock(ls, lkb, &args);
2693         else
2694                 error = unlock_lock(ls, lkb, &args);
2695
2696         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2697                 error = 0;
2698         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2699                 error = 0;
2700  out_put:
2701         dlm_put_lkb(lkb);
2702  out:
2703         dlm_unlock_recovery(ls);
2704         dlm_put_lockspace(ls);
2705         return error;
2706 }
2707
2708 /*
2709  * send/receive routines for remote operations and replies
2710  *
2711  * send_args
2712  * send_common
2713  * send_request                 receive_request
2714  * send_convert                 receive_convert
2715  * send_unlock                  receive_unlock
2716  * send_cancel                  receive_cancel
2717  * send_grant                   receive_grant
2718  * send_bast                    receive_bast
2719  * send_lookup                  receive_lookup
2720  * send_remove                  receive_remove
2721  *
2722  *                              send_common_reply
2723  * receive_request_reply        send_request_reply
2724  * receive_convert_reply        send_convert_reply
2725  * receive_unlock_reply         send_unlock_reply
2726  * receive_cancel_reply         send_cancel_reply
2727  * receive_lookup_reply         send_lookup_reply
2728  */
2729
2730 static int _create_message(struct dlm_ls *ls, int mb_len,
2731                            int to_nodeid, int mstype,
2732                            struct dlm_message **ms_ret,
2733                            struct dlm_mhandle **mh_ret)
2734 {
2735         struct dlm_message *ms;
2736         struct dlm_mhandle *mh;
2737         char *mb;
2738
2739         /* get_buffer gives us a message handle (mh) that we need to
2740            pass into lowcomms_commit and a message buffer (mb) that we
2741            write our data into */
2742
2743         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2744         if (!mh)
2745                 return -ENOBUFS;
2746
2747         memset(mb, 0, mb_len);
2748
2749         ms = (struct dlm_message *) mb;
2750
2751         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2752         ms->m_header.h_lockspace = ls->ls_global_id;
2753         ms->m_header.h_nodeid = dlm_our_nodeid();
2754         ms->m_header.h_length = mb_len;
2755         ms->m_header.h_cmd = DLM_MSG;
2756
2757         ms->m_type = mstype;
2758
2759         *mh_ret = mh;
2760         *ms_ret = ms;
2761         return 0;
2762 }
2763
2764 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2765                           int to_nodeid, int mstype,
2766                           struct dlm_message **ms_ret,
2767                           struct dlm_mhandle **mh_ret)
2768 {
2769         int mb_len = sizeof(struct dlm_message);
2770
2771         switch (mstype) {
2772         case DLM_MSG_REQUEST:
2773         case DLM_MSG_LOOKUP:
2774         case DLM_MSG_REMOVE:
2775                 mb_len += r->res_length;
2776                 break;
2777         case DLM_MSG_CONVERT:
2778         case DLM_MSG_UNLOCK:
2779         case DLM_MSG_REQUEST_REPLY:
2780         case DLM_MSG_CONVERT_REPLY:
2781         case DLM_MSG_GRANT:
2782                 if (lkb && lkb->lkb_lvbptr)
2783                         mb_len += r->res_ls->ls_lvblen;
2784                 break;
2785         }
2786
2787         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2788                                ms_ret, mh_ret);
2789 }
2790
2791 /* further lowcomms enhancements or alternate implementations may make
2792    the return value from this function useful at some point */
2793
2794 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2795 {
2796         dlm_message_out(ms);
2797         dlm_lowcomms_commit_buffer(mh);
2798         return 0;
2799 }
2800
2801 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2802                       struct dlm_message *ms)
2803 {
2804         ms->m_nodeid   = lkb->lkb_nodeid;
2805         ms->m_pid      = lkb->lkb_ownpid;
2806         ms->m_lkid     = lkb->lkb_id;
2807         ms->m_remid    = lkb->lkb_remid;
2808         ms->m_exflags  = lkb->lkb_exflags;
2809         ms->m_sbflags  = lkb->lkb_sbflags;
2810         ms->m_flags    = lkb->lkb_flags;
2811         ms->m_lvbseq   = lkb->lkb_lvbseq;
2812         ms->m_status   = lkb->lkb_status;
2813         ms->m_grmode   = lkb->lkb_grmode;
2814         ms->m_rqmode   = lkb->lkb_rqmode;
2815         ms->m_hash     = r->res_hash;
2816
2817         /* m_result and m_bastmode are set from function args,
2818            not from lkb fields */
2819
2820         if (lkb->lkb_bastfn)
2821                 ms->m_asts |= AST_BAST;
2822         if (lkb->lkb_astfn)
2823                 ms->m_asts |= AST_COMP;
2824
2825         /* compare with switch in create_message; send_remove() doesn't
2826            use send_args() */
2827
2828         switch (ms->m_type) {
2829         case DLM_MSG_REQUEST:
2830         case DLM_MSG_LOOKUP:
2831                 memcpy(ms->m_extra, r->res_name, r->res_length);
2832                 break;
2833         case DLM_MSG_CONVERT:
2834         case DLM_MSG_UNLOCK:
2835         case DLM_MSG_REQUEST_REPLY:
2836         case DLM_MSG_CONVERT_REPLY:
2837         case DLM_MSG_GRANT:
2838                 if (!lkb->lkb_lvbptr)
2839                         break;
2840                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2841                 break;
2842         }
2843 }
2844
2845 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2846 {
2847         struct dlm_message *ms;
2848         struct dlm_mhandle *mh;
2849         int to_nodeid, error;
2850
2851         error = add_to_waiters(lkb, mstype);
2852         if (error)
2853                 return error;
2854
2855         to_nodeid = r->res_nodeid;
2856
2857         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2858         if (error)
2859                 goto fail;
2860
2861         send_args(r, lkb, ms);
2862
2863         error = send_message(mh, ms);
2864         if (error)
2865                 goto fail;
2866         return 0;
2867
2868  fail:
2869         remove_from_waiters(lkb, msg_reply_type(mstype));
2870         return error;
2871 }
2872
2873 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2874 {
2875         return send_common(r, lkb, DLM_MSG_REQUEST);
2876 }
2877
2878 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2879 {
2880         int error;
2881
2882         error = send_common(r, lkb, DLM_MSG_CONVERT);
2883
2884         /* down conversions go without a reply from the master */
2885         if (!error && down_conversion(lkb)) {
2886                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2887                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2888                 r->res_ls->ls_stub_ms.m_result = 0;
2889                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2890                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2891         }
2892
2893         return error;
2894 }
2895
2896 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2897    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2898    that the master is still correct. */
2899
2900 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2901 {
2902         return send_common(r, lkb, DLM_MSG_UNLOCK);
2903 }
2904
2905 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2906 {
2907         return send_common(r, lkb, DLM_MSG_CANCEL);
2908 }
2909
2910 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2911 {
2912         struct dlm_message *ms;
2913         struct dlm_mhandle *mh;
2914         int to_nodeid, error;
2915
2916         to_nodeid = lkb->lkb_nodeid;
2917
2918         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2919         if (error)
2920                 goto out;
2921
2922         send_args(r, lkb, ms);
2923
2924         ms->m_result = 0;
2925
2926         error = send_message(mh, ms);
2927  out:
2928         return error;
2929 }
2930
2931 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2932 {
2933         struct dlm_message *ms;
2934         struct dlm_mhandle *mh;
2935         int to_nodeid, error;
2936
2937         to_nodeid = lkb->lkb_nodeid;
2938
2939         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2940         if (error)
2941                 goto out;
2942
2943         send_args(r, lkb, ms);
2944
2945         ms->m_bastmode = mode;
2946
2947         error = send_message(mh, ms);
2948  out:
2949         return error;
2950 }
2951
2952 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2953 {
2954         struct dlm_message *ms;
2955         struct dlm_mhandle *mh;
2956         int to_nodeid, error;
2957
2958         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2959         if (error)
2960                 return error;
2961
2962         to_nodeid = dlm_dir_nodeid(r);
2963
2964         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2965         if (error)
2966                 goto fail;
2967
2968         send_args(r, lkb, ms);
2969
2970         error = send_message(mh, ms);
2971         if (error)
2972                 goto fail;
2973         return 0;
2974
2975  fail:
2976         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2977         return error;
2978 }
2979
2980 static int send_remove(struct dlm_rsb *r)
2981 {
2982         struct dlm_message *ms;
2983         struct dlm_mhandle *mh;
2984         int to_nodeid, error;
2985
2986         to_nodeid = dlm_dir_nodeid(r);
2987
2988         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2989         if (error)
2990                 goto out;
2991
2992         memcpy(ms->m_extra, r->res_name, r->res_length);
2993         ms->m_hash = r->res_hash;
2994
2995         error = send_message(mh, ms);
2996  out:
2997         return error;
2998 }
2999
3000 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3001                              int mstype, int rv)
3002 {
3003         struct dlm_message *ms;
3004         struct dlm_mhandle *mh;
3005         int to_nodeid, error;
3006
3007         to_nodeid = lkb->lkb_nodeid;
3008
3009         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3010         if (error)
3011                 goto out;
3012
3013         send_args(r, lkb, ms);
3014
3015         ms->m_result = rv;
3016
3017         error = send_message(mh, ms);
3018  out:
3019         return error;
3020 }
3021
3022 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3023 {
3024         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3025 }
3026
3027 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3028 {
3029         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3030 }
3031
3032 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3033 {
3034         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3035 }
3036
3037 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3038 {
3039         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3040 }
3041
3042 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3043                              int ret_nodeid, int rv)
3044 {
3045         struct dlm_rsb *r = &ls->ls_stub_rsb;
3046         struct dlm_message *ms;
3047         struct dlm_mhandle *mh;
3048         int error, nodeid = ms_in->m_header.h_nodeid;
3049
3050         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3051         if (error)
3052                 goto out;
3053
3054         ms->m_lkid = ms_in->m_lkid;
3055         ms->m_result = rv;
3056         ms->m_nodeid = ret_nodeid;
3057
3058         error = send_message(mh, ms);
3059  out:
3060         return error;
3061 }
3062
3063 /* which args we save from a received message depends heavily on the type
3064    of message, unlike the send side where we can safely send everything about
3065    the lkb for any type of message */
3066
3067 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3068 {
3069         lkb->lkb_exflags = ms->m_exflags;
3070         lkb->lkb_sbflags = ms->m_sbflags;
3071         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3072                          (ms->m_flags & 0x0000FFFF);
3073 }
3074
3075 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3076 {
3077         lkb->lkb_sbflags = ms->m_sbflags;
3078         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3079                          (ms->m_flags & 0x0000FFFF);
3080 }
3081
3082 static int receive_extralen(struct dlm_message *ms)
3083 {
3084         return (ms->m_header.h_length - sizeof(struct dlm_message));
3085 }
3086
3087 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3088                        struct dlm_message *ms)
3089 {
3090         int len;
3091
3092         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3093                 if (!lkb->lkb_lvbptr)
3094                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3095                 if (!lkb->lkb_lvbptr)
3096                         return -ENOMEM;
3097                 len = receive_extralen(ms);
3098                 if (len > DLM_RESNAME_MAXLEN)
3099                         len = DLM_RESNAME_MAXLEN;
3100                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3101         }
3102         return 0;
3103 }
3104
3105 static void fake_bastfn(void *astparam, int mode)
3106 {
3107         log_print("fake_bastfn should not be called");
3108 }
3109
3110 static void fake_astfn(void *astparam)
3111 {
3112         log_print("fake_astfn should not be called");
3113 }
3114
3115 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3116                                 struct dlm_message *ms)
3117 {
3118         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3119         lkb->lkb_ownpid = ms->m_pid;
3120         lkb->lkb_remid = ms->m_lkid;
3121         lkb->lkb_grmode = DLM_LOCK_IV;
3122         lkb->lkb_rqmode = ms->m_rqmode;
3123
3124         lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3125         lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3126
3127         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3128                 /* lkb was just created so there won't be an lvb yet */
3129                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3130                 if (!lkb->lkb_lvbptr)
3131                         return -ENOMEM;
3132         }
3133
3134         return 0;
3135 }
3136
3137 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3138                                 struct dlm_message *ms)
3139 {
3140         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3141                 return -EBUSY;
3142
3143         if (receive_lvb(ls, lkb, ms))
3144                 return -ENOMEM;
3145
3146         lkb->lkb_rqmode = ms->m_rqmode;
3147         lkb->lkb_lvbseq = ms->m_lvbseq;
3148
3149         return 0;
3150 }
3151
3152 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3153                                struct dlm_message *ms)
3154 {
3155         if (receive_lvb(ls, lkb, ms))
3156                 return -ENOMEM;
3157         return 0;
3158 }
3159
3160 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3161    uses to send a reply and that the remote end uses to process the reply. */
3162
3163 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3164 {
3165         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3166         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3167         lkb->lkb_remid = ms->m_lkid;
3168 }
3169
3170 /* This is called after the rsb is locked so that we can safely inspect
3171    fields in the lkb. */
3172
3173 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3174 {
3175         int from = ms->m_header.h_nodeid;
3176         int error = 0;
3177
3178         switch (ms->m_type) {
3179         case DLM_MSG_CONVERT:
3180         case DLM_MSG_UNLOCK:
3181         case DLM_MSG_CANCEL:
3182                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3183                         error = -EINVAL;
3184                 break;
3185
3186         case DLM_MSG_CONVERT_REPLY:
3187         case DLM_MSG_UNLOCK_REPLY:
3188         case DLM_MSG_CANCEL_REPLY:
3189         case DLM_MSG_GRANT:
3190         case DLM_MSG_BAST:
3191                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3192                         error = -EINVAL;
3193                 break;
3194
3195         case DLM_MSG_REQUEST_REPLY:
3196                 if (!is_process_copy(lkb))
3197                         error = -EINVAL;
3198                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3199                         error = -EINVAL;
3200                 break;
3201
3202         default:
3203                 error = -EINVAL;
3204         }
3205
3206         if (error)
3207                 log_error(lkb->lkb_resource->res_ls,
3208                           "ignore invalid message %d from %d %x %x %x %d",
3209                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3210                           lkb->lkb_flags, lkb->lkb_nodeid);
3211         return error;
3212 }
3213
3214 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3215 {
3216         struct dlm_lkb *lkb;
3217         struct dlm_rsb *r;
3218         int error, namelen;
3219
3220         error = create_lkb(ls, &lkb);
3221         if (error)
3222                 goto fail;
3223
3224         receive_flags(lkb, ms);
3225         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3226         error = receive_request_args(ls, lkb, ms);
3227         if (error) {
3228                 __put_lkb(ls, lkb);
3229                 goto fail;
3230         }
3231
3232         namelen = receive_extralen(ms);
3233
3234         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3235         if (error) {
3236                 __put_lkb(ls, lkb);
3237                 goto fail;
3238         }
3239
3240         lock_rsb(r);
3241
3242         attach_lkb(r, lkb);
3243         error = do_request(r, lkb);
3244         send_request_reply(r, lkb, error);
3245         do_request_effects(r, lkb, error);
3246
3247         unlock_rsb(r);
3248         put_rsb(r);
3249
3250         if (error == -EINPROGRESS)
3251                 error = 0;
3252         if (error)
3253                 dlm_put_lkb(lkb);
3254         return;
3255
3256  fail:
3257         setup_stub_lkb(ls, ms);
3258         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3259 }
3260
3261 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3262 {
3263         struct dlm_lkb *lkb;
3264         struct dlm_rsb *r;
3265         int error, reply = 1;
3266
3267         error = find_lkb(ls, ms->m_remid, &lkb);
3268         if (error)
3269                 goto fail;
3270
3271         r = lkb->lkb_resource;
3272
3273         hold_rsb(r);
3274         lock_rsb(r);
3275
3276         error = validate_message(lkb, ms);
3277         if (error)
3278                 goto out;
3279
3280         receive_flags(lkb, ms);
3281
3282         error = receive_convert_args(ls, lkb, ms);
3283         if (error) {
3284                 send_convert_reply(r, lkb, error);
3285                 goto out;
3286         }
3287
3288         reply = !down_conversion(lkb);
3289
3290         error = do_convert(r, lkb);
3291         if (reply)
3292                 send_convert_reply(r, lkb, error);
3293         do_convert_effects(r, lkb, error);
3294  out:
3295         unlock_rsb(r);
3296         put_rsb(r);
3297         dlm_put_lkb(lkb);
3298         return;
3299
3300  fail:
3301         setup_stub_lkb(ls, ms);
3302         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3303 }
3304
3305 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3306 {
3307         struct dlm_lkb *lkb;
3308         struct dlm_rsb *r;
3309         int error;
3310
3311         error = find_lkb(ls, ms->m_remid, &lkb);
3312         if (error)
3313                 goto fail;
3314
3315         r = lkb->lkb_resource;
3316
3317         hold_rsb(r);
3318         lock_rsb(r);
3319
3320         error = validate_message(lkb, ms);
3321         if (error)
3322                 goto out;
3323
3324         receive_flags(lkb, ms);
3325
3326         error = receive_unlock_args(ls, lkb, ms);
3327         if (error) {
3328                 send_unlock_reply(r, lkb, error);
3329                 goto out;
3330         }
3331
3332         error = do_unlock(r, lkb);
3333         send_unlock_reply(r, lkb, error);
3334         do_unlock_effects(r, lkb, error);
3335  out:
3336         unlock_rsb(r);
3337         put_rsb(r);
3338         dlm_put_lkb(lkb);
3339         return;
3340
3341  fail:
3342         setup_stub_lkb(ls, ms);
3343         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3344 }
3345
3346 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3347 {
3348         struct dlm_lkb *lkb;
3349         struct dlm_rsb *r;
3350         int error;
3351
3352         error = find_lkb(ls, ms->m_remid, &lkb);
3353         if (error)
3354                 goto fail;
3355
3356         receive_flags(lkb, ms);
3357
3358         r = lkb->lkb_resource;
3359
3360         hold_rsb(r);
3361         lock_rsb(r);
3362
3363         error = validate_message(lkb, ms);
3364         if (error)
3365                 goto out;
3366
3367         error = do_cancel(r, lkb);
3368         send_cancel_reply(r, lkb, error);
3369         do_cancel_effects(r, lkb, error);
3370  out:
3371         unlock_rsb(r);
3372         put_rsb(r);
3373         dlm_put_lkb(lkb);
3374         return;
3375
3376  fail:
3377         setup_stub_lkb(ls, ms);
3378         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3379 }
3380
3381 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3382 {
3383         struct dlm_lkb *lkb;
3384         struct dlm_rsb *r;
3385         int error;
3386
3387         error = find_lkb(ls, ms->m_remid, &lkb);
3388         if (error) {
3389                 log_debug(ls, "receive_grant from %d no lkb %x",
3390                           ms->m_header.h_nodeid, ms->m_remid);
3391                 return;
3392         }
3393
3394         r = lkb->lkb_resource;
3395
3396         hold_rsb(r);
3397         lock_rsb(r);
3398
3399         error = validate_message(lkb, ms);
3400         if (error)
3401                 goto out;
3402
3403         receive_flags_reply(lkb, ms);
3404         if (is_altmode(lkb))
3405                 munge_altmode(lkb, ms);
3406         grant_lock_pc(r, lkb, ms);
3407         queue_cast(r, lkb, 0);
3408  out:
3409         unlock_rsb(r);
3410         put_rsb(r);
3411         dlm_put_lkb(lkb);
3412 }
3413
3414 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3415 {
3416         struct dlm_lkb *lkb;
3417         struct dlm_rsb *r;
3418         int error;
3419
3420         error = find_lkb(ls, ms->m_remid, &lkb);
3421         if (error) {
3422                 log_debug(ls, "receive_bast from %d no lkb %x",
3423                           ms->m_header.h_nodeid, ms->m_remid);
3424                 return;
3425         }
3426
3427         r = lkb->lkb_resource;
3428
3429         hold_rsb(r);
3430         lock_rsb(r);
3431
3432         error = validate_message(lkb, ms);
3433         if (error)
3434                 goto out;
3435
3436         queue_bast(r, lkb, ms->m_bastmode);
3437  out:
3438         unlock_rsb(r);
3439         put_rsb(r);
3440         dlm_put_lkb(lkb);
3441 }
3442
3443 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3444 {
3445         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3446
3447         from_nodeid = ms->m_header.h_nodeid;
3448         our_nodeid = dlm_our_nodeid();
3449
3450         len = receive_extralen(ms);
3451
3452         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3453         if (dir_nodeid != our_nodeid) {
3454                 log_error(ls, "lookup dir_nodeid %d from %d",
3455                           dir_nodeid, from_nodeid);
3456                 error = -EINVAL;
3457                 ret_nodeid = -1;
3458                 goto out;
3459         }
3460
3461         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3462
3463         /* Optimization: we're master so treat lookup as a request */
3464         if (!error && ret_nodeid == our_nodeid) {
3465                 receive_request(ls, ms);
3466                 return;
3467         }
3468  out:
3469         send_lookup_reply(ls, ms, ret_nodeid, error);
3470 }
3471
3472 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3473 {
3474         int len, dir_nodeid, from_nodeid;
3475
3476         from_nodeid = ms->m_header.h_nodeid;
3477
3478         len = receive_extralen(ms);
3479
3480         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3481         if (dir_nodeid != dlm_our_nodeid()) {
3482                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3483                           dir_nodeid, from_nodeid);
3484                 return;
3485         }
3486
3487         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3488 }
3489
3490 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3491 {
3492         do_purge(ls, ms->m_nodeid, ms->m_pid);
3493 }
3494
3495 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3496 {
3497         struct dlm_lkb *lkb;
3498         struct dlm_rsb *r;
3499         int error, mstype, result;
3500
3501         error = find_lkb(ls, ms->m_remid, &lkb);
3502         if (error) {
3503                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3504                           ms->m_header.h_nodeid, ms->m_remid);
3505                 return;
3506         }
3507
3508         r = lkb->lkb_resource;
3509         hold_rsb(r);
3510         lock_rsb(r);
3511
3512         error = validate_message(lkb, ms);
3513         if (error)
3514                 goto out;
3515
3516         mstype = lkb->lkb_wait_type;
3517         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3518         if (error)
3519                 goto out;
3520
3521         /* Optimization: the dir node was also the master, so it took our
3522            lookup as a request and sent request reply instead of lookup reply */
3523         if (mstype == DLM_MSG_LOOKUP) {
3524                 r->res_nodeid = ms->m_header.h_nodeid;
3525                 lkb->lkb_nodeid = r->res_nodeid;
3526         }
3527
3528         /* this is the value returned from do_request() on the master */
3529         result = ms->m_result;
3530
3531         switch (result) {
3532         case -EAGAIN:
3533                 /* request would block (be queued) on remote master */
3534                 queue_cast(r, lkb, -EAGAIN);
3535                 confirm_master(r, -EAGAIN);
3536                 unhold_lkb(lkb); /* undoes create_lkb() */
3537                 break;
3538
3539         case -EINPROGRESS:
3540         case 0:
3541                 /* request was queued or granted on remote master */
3542                 receive_flags_reply(lkb, ms);
3543                 lkb->lkb_remid = ms->m_lkid;
3544                 if (is_altmode(lkb))
3545                         munge_altmode(lkb, ms);
3546                 if (result) {
3547                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3548                         add_timeout(lkb);
3549                 } else {
3550                         grant_lock_pc(r, lkb, ms);
3551                         queue_cast(r, lkb, 0);
3552                 }
3553                 confirm_master(r, result);
3554                 break;
3555
3556         case -EBADR:
3557         case -ENOTBLK:
3558                 /* find_rsb failed to find rsb or rsb wasn't master */
3559                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3560                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3561                 r->res_nodeid = -1;
3562                 lkb->lkb_nodeid = -1;
3563
3564                 if (is_overlap(lkb)) {
3565                         /* we'll ignore error in cancel/unlock reply */
3566                         queue_cast_overlap(r, lkb);
3567                         confirm_master(r, result);
3568                         unhold_lkb(lkb); /* undoes create_lkb() */
3569                 } else
3570                         _request_lock(r, lkb);
3571                 break;
3572
3573         default:
3574                 log_error(ls, "receive_request_reply %x error %d",
3575                           lkb->lkb_id, result);
3576         }
3577
3578         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3579                 log_debug(ls, "receive_request_reply %x result %d unlock",
3580                           lkb->lkb_id, result);
3581                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3582                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3583                 send_unlock(r, lkb);
3584         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3585                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3586                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3587                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3588                 send_cancel(r, lkb);
3589         } else {
3590                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3591                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3592         }
3593  out:
3594         unlock_rsb(r);
3595         put_rsb(r);
3596         dlm_put_lkb(lkb);
3597 }
3598
3599 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3600                                     struct dlm_message *ms)
3601 {
3602         /* this is the value returned from do_convert() on the master */
3603         switch (ms->m_result) {
3604         case -EAGAIN:
3605                 /* convert would block (be queued) on remote master */
3606                 queue_cast(r, lkb, -EAGAIN);
3607                 break;
3608
3609         case -EDEADLK:
3610                 receive_flags_reply(lkb, ms);
3611                 revert_lock_pc(r, lkb);
3612                 queue_cast(r, lkb, -EDEADLK);
3613                 break;
3614
3615         case -EINPROGRESS:
3616                 /* convert was queued on remote master */
3617                 receive_flags_reply(lkb, ms);
3618                 if (is_demoted(lkb))
3619                         munge_demoted(lkb, ms);
3620                 del_lkb(r, lkb);
3621                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3622                 add_timeout(lkb);
3623                 break;
3624
3625         case 0:
3626                 /* convert was granted on remote master */
3627                 receive_flags_reply(lkb, ms);
3628                 if (is_demoted(lkb))
3629                         munge_demoted(lkb, ms);
3630                 grant_lock_pc(r, lkb, ms);
3631                 queue_cast(r, lkb, 0);
3632                 break;
3633
3634         default:
3635                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3636                           lkb->lkb_id, ms->m_result);
3637         }
3638 }
3639
3640 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3641 {
3642         struct dlm_rsb *r = lkb->lkb_resource;
3643         int error;
3644
3645         hold_rsb(r);
3646         lock_rsb(r);
3647
3648         error = validate_message(lkb, ms);
3649         if (error)
3650                 goto out;
3651
3652         /* stub reply can happen with waiters_mutex held */
3653         error = remove_from_waiters_ms(lkb, ms);
3654         if (error)
3655                 goto out;
3656
3657         __receive_convert_reply(r, lkb, ms);
3658  out:
3659         unlock_rsb(r);
3660         put_rsb(r);
3661 }
3662
3663 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3664 {
3665         struct dlm_lkb *lkb;
3666         int error;
3667
3668         error = find_lkb(ls, ms->m_remid, &lkb);
3669         if (error) {
3670                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3671                           ms->m_header.h_nodeid, ms->m_remid);
3672                 return;
3673         }
3674
3675         _receive_convert_reply(lkb, ms);
3676         dlm_put_lkb(lkb);
3677 }
3678
3679 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3680 {
3681         struct dlm_rsb *r = lkb->lkb_resource;
3682         int error;
3683
3684         hold_rsb(r);
3685         lock_rsb(r);
3686
3687         error = validate_message(lkb, ms);
3688         if (error)
3689                 goto out;
3690
3691         /* stub reply can happen with waiters_mutex held */
3692         error = remove_from_waiters_ms(lkb, ms);
3693         if (error)
3694                 goto out;
3695
3696         /* this is the value returned from do_unlock() on the master */
3697
3698         switch (ms->m_result) {
3699         case -DLM_EUNLOCK:
3700                 receive_flags_reply(lkb, ms);
3701                 remove_lock_pc(r, lkb);
3702                 queue_cast(r, lkb, -DLM_EUNLOCK);
3703                 break;
3704         case -ENOENT:
3705                 break;
3706         default:
3707                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3708                           lkb->lkb_id, ms->m_result);
3709         }
3710  out:
3711         unlock_rsb(r);
3712         put_rsb(r);
3713 }
3714
3715 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3716 {
3717         struct dlm_lkb *lkb;
3718         int error;
3719
3720         error = find_lkb(ls, ms->m_remid, &lkb);
3721         if (error) {
3722                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3723                           ms->m_header.h_nodeid, ms->m_remid);
3724                 return;
3725         }
3726
3727         _receive_unlock_reply(lkb, ms);
3728         dlm_put_lkb(lkb);
3729 }
3730
3731 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3732 {
3733         struct dlm_rsb *r = lkb->lkb_resource;
3734         int error;
3735
3736         hold_rsb(r);
3737         lock_rsb(r);
3738
3739         error = validate_message(lkb, ms);
3740         if (error)
3741                 goto out;
3742
3743         /* stub reply can happen with waiters_mutex held */
3744         error = remove_from_waiters_ms(lkb, ms);
3745         if (error)
3746                 goto out;
3747
3748         /* this is the value returned from do_cancel() on the master */
3749
3750         switch (ms->m_result) {
3751         case -DLM_ECANCEL:
3752                 receive_flags_reply(lkb, ms);
3753                 revert_lock_pc(r, lkb);
3754                 queue_cast(r, lkb, -DLM_ECANCEL);
3755                 break;
3756         case 0:
3757                 break;
3758         default:
3759                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3760                           lkb->lkb_id, ms->m_result);
3761         }
3762  out:
3763         unlock_rsb(r);
3764         put_rsb(r);
3765 }
3766
3767 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3768 {
3769         struct dlm_lkb *lkb;
3770         int error;
3771
3772         error = find_lkb(ls, ms->m_remid, &lkb);
3773         if (error) {
3774                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3775                           ms->m_header.h_nodeid, ms->m_remid);
3776                 return;
3777         }
3778
3779         _receive_cancel_reply(lkb, ms);
3780         dlm_put_lkb(lkb);
3781 }
3782
3783 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3784 {
3785         struct dlm_lkb *lkb;
3786         struct dlm_rsb *r;
3787         int error, ret_nodeid;
3788
3789         error = find_lkb(ls, ms->m_lkid, &lkb);
3790         if (error) {
3791                 log_error(ls, "receive_lookup_reply no lkb");
3792                 return;
3793         }
3794
3795         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3796            FIXME: will a non-zero error ever be returned? */
3797
3798         r = lkb->lkb_resource;
3799         hold_rsb(r);
3800         lock_rsb(r);
3801
3802         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3803         if (error)
3804                 goto out;
3805
3806         ret_nodeid = ms->m_nodeid;
3807         if (ret_nodeid == dlm_our_nodeid()) {
3808                 r->res_nodeid = 0;
3809                 ret_nodeid = 0;
3810                 r->res_first_lkid = 0;
3811         } else {
3812                 /* set_master() will copy res_nodeid to lkb_nodeid */
3813                 r->res_nodeid = ret_nodeid;
3814         }
3815
3816         if (is_overlap(lkb)) {
3817                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3818                           lkb->lkb_id, lkb->lkb_flags);
3819                 queue_cast_overlap(r, lkb);
3820                 unhold_lkb(lkb); /* undoes create_lkb() */
3821                 goto out_list;
3822         }
3823
3824         _request_lock(r, lkb);
3825
3826  out_list:
3827         if (!ret_nodeid)
3828                 process_lookup_list(r);
3829  out:
3830         unlock_rsb(r);
3831         put_rsb(r);
3832         dlm_put_lkb(lkb);
3833 }
3834
3835 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3836 {
3837         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3838                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3839                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3840                           ms->m_remid, ms->m_result);
3841                 return;
3842         }
3843
3844         switch (ms->m_type) {
3845
3846         /* messages sent to a master node */
3847
3848         case DLM_MSG_REQUEST:
3849                 receive_request(ls, ms);
3850                 break;
3851
3852         case DLM_MSG_CONVERT:
3853                 receive_convert(ls, ms);
3854                 break;
3855
3856         case DLM_MSG_UNLOCK:
3857                 receive_unlock(ls, ms);
3858                 break;
3859
3860         case DLM_MSG_CANCEL:
3861                 receive_cancel(ls, ms);
3862                 break;
3863
3864         /* messages sent from a master node (replies to above) */
3865
3866         case DLM_MSG_REQUEST_REPLY:
3867                 receive_request_reply(ls, ms);
3868                 break;
3869
3870         case DLM_MSG_CONVERT_REPLY:
3871                 receive_convert_reply(ls, ms);
3872                 break;
3873
3874         case DLM_MSG_UNLOCK_REPLY:
3875                 receive_unlock_reply(ls, ms);
3876                 break;
3877
3878         case DLM_MSG_CANCEL_REPLY:
3879                 receive_cancel_reply(ls, ms);
3880                 break;
3881
3882         /* messages sent from a master node (only two types of async msg) */
3883
3884         case DLM_MSG_GRANT:
3885                 receive_grant(ls, ms);
3886                 break;
3887
3888         case DLM_MSG_BAST:
3889                 receive_bast(ls, ms);
3890                 break;
3891
3892         /* messages sent to a dir node */
3893
3894         case DLM_MSG_LOOKUP:
3895                 receive_lookup(ls, ms);
3896                 break;
3897
3898         case DLM_MSG_REMOVE:
3899                 receive_remove(ls, ms);
3900                 break;
3901
3902         /* messages sent from a dir node (remove has no reply) */
3903
3904         case DLM_MSG_LOOKUP_REPLY:
3905                 receive_lookup_reply(ls, ms);
3906                 break;
3907
3908         /* other messages */
3909
3910         case DLM_MSG_PURGE:
3911                 receive_purge(ls, ms);
3912                 break;
3913
3914         default:
3915                 log_error(ls, "unknown message type %d", ms->m_type);
3916         }
3917
3918         dlm_astd_wake();
3919 }
3920
3921 /* If the lockspace is in recovery mode (locking stopped), then normal
3922    messages are saved on the requestqueue for processing after recovery is
3923    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3924    messages off the requestqueue before we process new ones. This occurs right
3925    after recovery completes when we transition from saving all messages on
3926    requestqueue, to processing all the saved messages, to processing new
3927    messages as they arrive. */
3928
3929 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3930                                 int nodeid)
3931 {
3932         if (dlm_locking_stopped(ls)) {
3933                 dlm_add_requestqueue(ls, nodeid, ms);
3934         } else {
3935                 dlm_wait_requestqueue(ls);
3936                 _receive_message(ls, ms);
3937         }
3938 }
3939
3940 /* This is called by dlm_recoverd to process messages that were saved on
3941    the requestqueue. */
3942
3943 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3944 {
3945         _receive_message(ls, ms);
3946 }
3947
3948 /* This is called by the midcomms layer when something is received for
3949    the lockspace.  It could be either a MSG (normal message sent as part of
3950    standard locking activity) or an RCOM (recovery message sent as part of
3951    lockspace recovery). */
3952
3953 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3954 {
3955         struct dlm_header *hd = &p->header;
3956         struct dlm_ls *ls;
3957         int type = 0;
3958
3959         switch (hd->h_cmd) {
3960         case DLM_MSG:
3961                 dlm_message_in(&p->message);
3962                 type = p->message.m_type;
3963                 break;
3964         case DLM_RCOM:
3965                 dlm_rcom_in(&p->rcom);
3966                 type = p->rcom.rc_type;
3967                 break;
3968         default:
3969                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3970                 return;
3971         }
3972
3973         if (hd->h_nodeid != nodeid) {
3974                 log_print("invalid h_nodeid %d from %d lockspace %x",
3975                           hd->h_nodeid, nodeid, hd->h_lockspace);
3976                 return;
3977         }
3978
3979         ls = dlm_find_lockspace_global(hd->h_lockspace);
3980         if (!ls) {
3981                 if (dlm_config.ci_log_debug)
3982                         log_print("invalid lockspace %x from %d cmd %d type %d",
3983                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3984
3985                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3986                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3987                 return;
3988         }
3989
3990         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3991            be inactive (in this ls) before transitioning to recovery mode */
3992
3993         down_read(&ls->ls_recv_active);
3994         if (hd->h_cmd == DLM_MSG)
3995                 dlm_receive_message(ls, &p->message, nodeid);
3996         else
3997                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3998         up_read(&ls->ls_recv_active);
3999
4000         dlm_put_lockspace(ls);
4001 }
4002
4003 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4004 {
4005         if (middle_conversion(lkb)) {
4006                 hold_lkb(lkb);
4007                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4008                 ls->ls_stub_ms.m_result = -EINPROGRESS;
4009                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4010                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4011                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
4012
4013                 /* Same special case as in receive_rcom_lock_args() */
4014                 lkb->lkb_grmode = DLM_LOCK_IV;
4015                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4016                 unhold_lkb(lkb);
4017
4018         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4019                 lkb->lkb_flags |= DLM_IFL_RESEND;
4020         }
4021
4022         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4023            conversions are async; there's no reply from the remote master */
4024 }
4025
4026 /* A waiting lkb needs recovery if the master node has failed, or
4027    the master node is changing (only when no directory is used) */
4028
4029 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4030 {
4031         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4032                 return 1;
4033
4034         if (!dlm_no_directory(ls))
4035                 return 0;
4036
4037         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4038                 return 1;
4039
4040         return 0;
4041 }
4042
4043 /* Recovery for locks that are waiting for replies from nodes that are now
4044    gone.  We can just complete unlocks and cancels by faking a reply from the
4045    dead node.  Requests and up-conversions we flag to be resent after
4046    recovery.  Down-conversions can just be completed with a fake reply like
4047    unlocks.  Conversions between PR and CW need special attention. */
4048
4049 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4050 {
4051         struct dlm_lkb *lkb, *safe;
4052         int wait_type, stub_unlock_result, stub_cancel_result;
4053
4054         mutex_lock(&ls->ls_waiters_mutex);
4055
4056         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4057                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4058                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4059
4060                 /* all outstanding lookups, regardless of destination  will be
4061                    resent after recovery is done */
4062
4063                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4064                         lkb->lkb_flags |= DLM_IFL_RESEND;
4065                         continue;
4066                 }
4067
4068                 if (!waiter_needs_recovery(ls, lkb))
4069                         continue;
4070
4071                 wait_type = lkb->lkb_wait_type;
4072                 stub_unlock_result = -DLM_EUNLOCK;
4073                 stub_cancel_result = -DLM_ECANCEL;
4074
4075                 /* Main reply may have been received leaving a zero wait_type,
4076                    but a reply for the overlapping op may not have been
4077                    received.  In that case we need to fake the appropriate
4078                    reply for the overlap op. */
4079
4080                 if (!wait_type) {
4081                         if (is_overlap_cancel(lkb)) {
4082                                 wait_type = DLM_MSG_CANCEL;
4083                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4084                                         stub_cancel_result = 0;
4085                         }
4086                         if (is_overlap_unlock(lkb)) {
4087                                 wait_type = DLM_MSG_UNLOCK;
4088                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4089                                         stub_unlock_result = -ENOENT;
4090                         }
4091
4092                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4093                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4094                                   stub_cancel_result, stub_unlock_result);
4095                 }
4096
4097                 switch (wait_type) {
4098
4099                 case DLM_MSG_REQUEST:
4100                         lkb->lkb_flags |= DLM_IFL_RESEND;
4101                         break;
4102
4103                 case DLM_MSG_CONVERT:
4104                         recover_convert_waiter(ls, lkb);
4105                         break;
4106
4107                 case DLM_MSG_UNLOCK:
4108                         hold_lkb(lkb);
4109                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4110                         ls->ls_stub_ms.m_result = stub_unlock_result;
4111                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4112                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4113                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4114                         dlm_put_lkb(lkb);
4115                         break;
4116
4117                 case DLM_MSG_CANCEL:
4118                         hold_lkb(lkb);
4119                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4120                         ls->ls_stub_ms.m_result = stub_cancel_result;
4121                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4122                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4123                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4124                         dlm_put_lkb(lkb);
4125                         break;
4126
4127                 default:
4128                         log_error(ls, "invalid lkb wait_type %d %d",
4129                                   lkb->lkb_wait_type, wait_type);
4130                 }
4131                 schedule();
4132         }
4133         mutex_unlock(&ls->ls_waiters_mutex);
4134 }
4135
4136 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4137 {
4138         struct dlm_lkb *lkb;
4139         int found = 0;
4140
4141         mutex_lock(&ls->ls_waiters_mutex);
4142         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4143                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4144                         hold_lkb(lkb);
4145                         found = 1;
4146                         break;
4147                 }
4148         }
4149         mutex_unlock(&ls->ls_waiters_mutex);
4150
4151         if (!found)
4152                 lkb = NULL;
4153         return lkb;
4154 }
4155
4156 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4157    master or dir-node for r.  Processing the lkb may result in it being placed
4158    back on waiters. */
4159
4160 /* We do this after normal locking has been enabled and any saved messages
4161    (in requestqueue) have been processed.  We should be confident that at
4162    this point we won't get or process a reply to any of these waiting
4163    operations.  But, new ops may be coming in on the rsbs/locks here from
4164    userspace or remotely. */
4165
4166 /* there may have been an overlap unlock/cancel prior to recovery or after
4167    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4168    overlap flag would just have been set and nothing new sent.  we can be
4169    confident here than any replies to either the initial op or overlap ops
4170    prior to recovery have been received. */
4171
4172 int dlm_recover_waiters_post(struct dlm_ls *ls)
4173 {
4174         struct dlm_lkb *lkb;
4175         struct dlm_rsb *r;
4176         int error = 0, mstype, err, oc, ou;
4177
4178         while (1) {
4179                 if (dlm_locking_stopped(ls)) {
4180                         log_debug(ls, "recover_waiters_post aborted");
4181                         error = -EINTR;
4182                         break;
4183                 }
4184
4185                 lkb = find_resend_waiter(ls);
4186                 if (!lkb)
4187                         break;
4188
4189                 r = lkb->lkb_resource;
4190                 hold_rsb(r);
4191                 lock_rsb(r);
4192
4193                 mstype = lkb->lkb_wait_type;
4194                 oc = is_overlap_cancel(lkb);
4195                 ou = is_overlap_unlock(lkb);
4196                 err = 0;
4197
4198                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4199                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4200
4201                 /* At this point we assume that we won't get a reply to any
4202                    previous op or overlap op on this lock.  First, do a big
4203                    remove_from_waiters() for all previous ops. */
4204
4205                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4206                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4207                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4208                 lkb->lkb_wait_type = 0;
4209                 lkb->lkb_wait_count = 0;
4210                 mutex_lock(&ls->ls_waiters_mutex);
4211                 list_del_init(&lkb->lkb_wait_reply);
4212                 mutex_unlock(&ls->ls_waiters_mutex);
4213                 unhold_lkb(lkb); /* for waiters list */
4214
4215                 if (oc || ou) {
4216                         /* do an unlock or cancel instead of resending */
4217                         switch (mstype) {
4218                         case DLM_MSG_LOOKUP:
4219                         case DLM_MSG_REQUEST:
4220                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4221                                                         -DLM_ECANCEL);
4222                                 unhold_lkb(lkb); /* undoes create_lkb() */
4223                                 break;
4224                         case DLM_MSG_CONVERT:
4225                                 if (oc) {
4226                                         queue_cast(r, lkb, -DLM_ECANCEL);
4227                                 } else {
4228                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4229                                         _unlock_lock(r, lkb);
4230                                 }
4231                                 break;
4232                         default:
4233                                 err = 1;
4234                         }
4235                 } else {
4236                         switch (mstype) {
4237                         case DLM_MSG_LOOKUP:
4238                         case DLM_MSG_REQUEST:
4239                                 _request_lock(r, lkb);
4240                                 if (is_master(r))
4241                                         confirm_master(r, 0);
4242                                 break;
4243                         case DLM_MSG_CONVERT:
4244                                 _convert_lock(r, lkb);
4245                                 break;
4246                         default:
4247                                 err = 1;
4248                         }
4249                 }
4250
4251                 if (err)
4252                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4253                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4254                 unlock_rsb(r);
4255                 put_rsb(r);
4256                 dlm_put_lkb(lkb);
4257         }
4258
4259         return error;
4260 }
4261
4262 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4263                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4264 {
4265         struct dlm_ls *ls = r->res_ls;
4266         struct dlm_lkb *lkb, *safe;
4267
4268         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4269                 if (test(ls, lkb)) {
4270                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4271                         del_lkb(r, lkb);
4272                         /* this put should free the lkb */
4273                         if (!dlm_put_lkb(lkb))
4274                                 log_error(ls, "purged lkb not released");
4275                 }
4276         }
4277 }
4278
4279 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4280 {
4281         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4282 }
4283
4284 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4285 {
4286         return is_master_copy(lkb);
4287 }
4288
4289 static void purge_dead_locks(struct dlm_rsb *r)
4290 {
4291         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4292         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4293         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4294 }
4295
4296 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4297 {
4298         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4299         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4300         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4301 }
4302
4303 /* Get rid of locks held by nodes that are gone. */
4304
4305 int dlm_purge_locks(struct dlm_ls *ls)
4306 {
4307         struct dlm_rsb *r;
4308
4309         log_debug(ls, "dlm_purge_locks");
4310
4311         down_write(&ls->ls_root_sem);
4312         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4313                 hold_rsb(r);
4314                 lock_rsb(r);
4315                 if (is_master(r))
4316                         purge_dead_locks(r);
4317                 unlock_rsb(r);
4318                 unhold_rsb(r);
4319
4320                 schedule();
4321         }
4322         up_write(&ls->ls_root_sem);
4323
4324         return 0;
4325 }
4326
4327 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4328 {
4329         struct dlm_rsb *r, *r_ret = NULL;
4330
4331         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4332         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4333                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4334                         continue;
4335                 hold_rsb(r);
4336                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4337                 r_ret = r;
4338                 break;
4339         }
4340         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4341         return r_ret;
4342 }
4343
4344 void dlm_grant_after_purge(struct dlm_ls *ls)
4345 {
4346         struct dlm_rsb *r;
4347         int bucket = 0;
4348
4349         while (1) {
4350                 r = find_purged_rsb(ls, bucket);
4351                 if (!r) {
4352                         if (bucket == ls->ls_rsbtbl_size - 1)
4353                                 break;
4354                         bucket++;
4355                         continue;
4356                 }
4357                 lock_rsb(r);
4358                 if (is_master(r)) {
4359                         grant_pending_locks(r);
4360                         confirm_master(r, 0);
4361                 }
4362                 unlock_rsb(r);
4363                 put_rsb(r);
4364                 schedule();
4365         }
4366 }
4367
4368 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4369                                          uint32_t remid)
4370 {
4371         struct dlm_lkb *lkb;
4372
4373         list_for_each_entry(lkb, head, lkb_statequeue) {
4374                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4375                         return lkb;
4376         }
4377         return NULL;
4378 }
4379
4380 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4381                                     uint32_t remid)
4382 {
4383         struct dlm_lkb *lkb;
4384
4385         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4386         if (lkb)
4387                 return lkb;
4388         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4389         if (lkb)
4390                 return lkb;
4391         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4392         if (lkb)
4393                 return lkb;
4394         return NULL;
4395 }
4396
4397 /* needs at least dlm_rcom + rcom_lock */
4398 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4399                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4400 {
4401         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4402
4403         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4404         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4405         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4406         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4407         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4408         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4409         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4410         lkb->lkb_rqmode = rl->rl_rqmode;
4411         lkb->lkb_grmode = rl->rl_grmode;
4412         /* don't set lkb_status because add_lkb wants to itself */
4413
4414         lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4415         lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4416
4417         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4418                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4419                          sizeof(struct rcom_lock);
4420                 if (lvblen > ls->ls_lvblen)
4421                         return -EINVAL;
4422                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4423                 if (!lkb->lkb_lvbptr)
4424                         return -ENOMEM;
4425                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4426         }
4427
4428         /* Conversions between PR and CW (middle modes) need special handling.
4429            The real granted mode of these converting locks cannot be determined
4430            until all locks have been rebuilt on the rsb (recover_conversion) */
4431
4432         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4433             middle_conversion(lkb)) {
4434                 rl->rl_status = DLM_LKSTS_CONVERT;
4435                 lkb->lkb_grmode = DLM_LOCK_IV;
4436                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4437         }
4438
4439         return 0;
4440 }
4441
4442 /* This lkb may have been recovered in a previous aborted recovery so we need
4443    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4444    If so we just send back a standard reply.  If not, we create a new lkb with
4445    the given values and send back our lkid.  We send back our lkid by sending
4446    back the rcom_lock struct we got but with the remid field filled in. */
4447
4448 /* needs at least dlm_rcom + rcom_lock */
4449 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4450 {
4451         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4452         struct dlm_rsb *r;
4453         struct dlm_lkb *lkb;
4454         int error;
4455
4456         if (rl->rl_parent_lkid) {
4457                 error = -EOPNOTSUPP;
4458                 goto out;
4459         }
4460
4461         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4462                          R_MASTER, &r);
4463         if (error)
4464                 goto out;
4465
4466         lock_rsb(r);
4467
4468         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4469         if (lkb) {
4470                 error = -EEXIST;
4471                 goto out_remid;
4472         }
4473
4474         error = create_lkb(ls, &lkb);
4475         if (error)
4476                 goto out_unlock;
4477
4478         error = receive_rcom_lock_args(ls, lkb, r, rc);
4479         if (error) {
4480                 __put_lkb(ls, lkb);
4481                 goto out_unlock;
4482         }
4483
4484         attach_lkb(r, lkb);
4485         add_lkb(r, lkb, rl->rl_status);
4486         error = 0;
4487
4488  out_remid:
4489         /* this is the new value returned to the lock holder for
4490            saving in its process-copy lkb */
4491         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4492
4493  out_unlock:
4494         unlock_rsb(r);
4495         put_rsb(r);
4496  out:
4497         if (error)
4498                 log_debug(ls, "recover_master_copy %d %x", error,
4499                           le32_to_cpu(rl->rl_lkid));
4500         rl->rl_result = cpu_to_le32(error);
4501         return error;
4502 }
4503
4504 /* needs at least dlm_rcom + rcom_lock */
4505 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4506 {
4507         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4508         struct dlm_rsb *r;
4509         struct dlm_lkb *lkb;
4510         int error;
4511
4512         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4513         if (error) {
4514                 log_error(ls, "recover_process_copy no lkid %x",
4515                                 le32_to_cpu(rl->rl_lkid));
4516                 return error;
4517         }
4518
4519         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4520
4521         error = le32_to_cpu(rl->rl_result);
4522
4523         r = lkb->lkb_resource;
4524         hold_rsb(r);
4525         lock_rsb(r);
4526
4527         switch (error) {
4528         case -EBADR:
4529                 /* There's a chance the new master received our lock before
4530                    dlm_recover_master_reply(), this wouldn't happen if we did
4531                    a barrier between recover_masters and recover_locks. */
4532                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4533                           (unsigned long)r, r->res_name);
4534                 dlm_send_rcom_lock(r, lkb);
4535                 goto out;
4536         case -EEXIST:
4537                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4538                 /* fall through */
4539         case 0:
4540                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4541                 break;
4542         default:
4543                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4544                           error, lkb->lkb_id);
4545         }
4546
4547         /* an ack for dlm_recover_locks() which waits for replies from
4548            all the locks it sends to new masters */
4549         dlm_recovered_lock(r);
4550  out:
4551         unlock_rsb(r);
4552         put_rsb(r);
4553         dlm_put_lkb(lkb);
4554
4555         return 0;
4556 }
4557
4558 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4559                      int mode, uint32_t flags, void *name, unsigned int namelen,
4560                      unsigned long timeout_cs)
4561 {
4562         struct dlm_lkb *lkb;
4563         struct dlm_args args;
4564         int error;
4565
4566         dlm_lock_recovery(ls);
4567
4568         error = create_lkb(ls, &lkb);
4569         if (error) {
4570                 kfree(ua);
4571                 goto out;
4572         }
4573
4574         if (flags & DLM_LKF_VALBLK) {
4575                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4576                 if (!ua->lksb.sb_lvbptr) {
4577                         kfree(ua);
4578                         __put_lkb(ls, lkb);
4579                         error = -ENOMEM;
4580                         goto out;
4581                 }
4582         }
4583
4584         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4585            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4586            lock and that lkb_astparam is the dlm_user_args structure. */
4587
4588         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4589                               fake_astfn, ua, fake_bastfn, &args);
4590         lkb->lkb_flags |= DLM_IFL_USER;
4591         ua->old_mode = DLM_LOCK_IV;
4592
4593         if (error) {
4594                 __put_lkb(ls, lkb);
4595                 goto out;
4596         }
4597
4598         error = request_lock(ls, lkb, name, namelen, &args);
4599
4600         switch (error) {
4601         case 0:
4602                 break;
4603         case -EINPROGRESS:
4604                 error = 0;
4605                 break;
4606         case -EAGAIN:
4607                 error = 0;
4608                 /* fall through */
4609         default:
4610                 __put_lkb(ls, lkb);
4611                 goto out;
4612         }
4613
4614         /* add this new lkb to the per-process list of locks */
4615         spin_lock(&ua->proc->locks_spin);
4616         hold_lkb(lkb);
4617         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4618         spin_unlock(&ua->proc->locks_spin);
4619  out:
4620         dlm_unlock_recovery(ls);
4621         return error;
4622 }
4623
4624 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4625                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4626                      unsigned long timeout_cs)
4627 {
4628         struct dlm_lkb *lkb;
4629         struct dlm_args args;
4630         struct dlm_user_args *ua;
4631         int error;
4632
4633         dlm_lock_recovery(ls);
4634
4635         error = find_lkb(ls, lkid, &lkb);
4636         if (error)
4637                 goto out;
4638
4639         /* user can change the params on its lock when it converts it, or
4640            add an lvb that didn't exist before */
4641
4642         ua = lkb->lkb_ua;
4643
4644         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4645                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4646                 if (!ua->lksb.sb_lvbptr) {
4647                         error = -ENOMEM;
4648                         goto out_put;
4649                 }
4650         }
4651         if (lvb_in && ua->lksb.sb_lvbptr)
4652                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4653
4654         ua->xid = ua_tmp->xid;
4655         ua->castparam = ua_tmp->castparam;
4656         ua->castaddr = ua_tmp->castaddr;
4657         ua->bastparam = ua_tmp->bastparam;
4658         ua->bastaddr = ua_tmp->bastaddr;
4659         ua->user_lksb = ua_tmp->user_lksb;
4660         ua->old_mode = lkb->lkb_grmode;
4661
4662         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4663                               fake_astfn, ua, fake_bastfn, &args);
4664         if (error)
4665                 goto out_put;
4666
4667         error = convert_lock(ls, lkb, &args);
4668
4669         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4670                 error = 0;
4671  out_put:
4672         dlm_put_lkb(lkb);
4673  out:
4674         dlm_unlock_recovery(ls);
4675         kfree(ua_tmp);
4676         return error;
4677 }
4678
4679 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4680                     uint32_t flags, uint32_t lkid, char *lvb_in)
4681 {
4682         struct dlm_lkb *lkb;
4683         struct dlm_args args;
4684         struct dlm_user_args *ua;
4685         int error;
4686
4687         dlm_lock_recovery(ls);
4688
4689         error = find_lkb(ls, lkid, &lkb);
4690         if (error)
4691                 goto out;
4692
4693         ua = lkb->lkb_ua;
4694
4695         if (lvb_in && ua->lksb.sb_lvbptr)
4696                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4697         if (ua_tmp->castparam)
4698                 ua->castparam = ua_tmp->castparam;
4699         ua->user_lksb = ua_tmp->user_lksb;
4700
4701         error = set_unlock_args(flags, ua, &args);
4702         if (error)
4703                 goto out_put;
4704
4705         error = unlock_lock(ls, lkb, &args);
4706
4707         if (error == -DLM_EUNLOCK)
4708                 error = 0;
4709         /* from validate_unlock_args() */
4710         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4711                 error = 0;
4712         if (error)
4713                 goto out_put;
4714
4715         spin_lock(&ua->proc->locks_spin);
4716         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4717         if (!list_empty(&lkb->lkb_ownqueue))
4718                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4719         spin_unlock(&ua->proc->locks_spin);
4720  out_put:
4721         dlm_put_lkb(lkb);
4722  out:
4723         dlm_unlock_recovery(ls);
4724         kfree(ua_tmp);
4725         return error;
4726 }
4727
4728 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4729                     uint32_t flags, uint32_t lkid)
4730 {
4731         struct dlm_lkb *lkb;
4732         struct dlm_args args;
4733         struct dlm_user_args *ua;
4734         int error;
4735
4736         dlm_lock_recovery(ls);
4737
4738         error = find_lkb(ls, lkid, &lkb);
4739         if (error)
4740                 goto out;
4741
4742         ua = lkb->lkb_ua;
4743         if (ua_tmp->castparam)
4744                 ua->castparam = ua_tmp->castparam;
4745         ua->user_lksb = ua_tmp->user_lksb;
4746
4747         error = set_unlock_args(flags, ua, &args);
4748         if (error)
4749                 goto out_put;
4750
4751         error = cancel_lock(ls, lkb, &args);
4752
4753         if (error == -DLM_ECANCEL)
4754                 error = 0;
4755         /* from validate_unlock_args() */
4756         if (error == -EBUSY)
4757                 error = 0;
4758  out_put:
4759         dlm_put_lkb(lkb);
4760  out:
4761         dlm_unlock_recovery(ls);
4762         kfree(ua_tmp);
4763         return error;
4764 }
4765
4766 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4767 {
4768         struct dlm_lkb *lkb;
4769         struct dlm_args args;
4770         struct dlm_user_args *ua;
4771         struct dlm_rsb *r;
4772         int error;
4773
4774         dlm_lock_recovery(ls);
4775
4776         error = find_lkb(ls, lkid, &lkb);
4777         if (error)
4778                 goto out;
4779
4780         ua = lkb->lkb_ua;
4781
4782         error = set_unlock_args(flags, ua, &args);
4783         if (error)
4784                 goto out_put;
4785
4786         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4787
4788         r = lkb->lkb_resource;
4789         hold_rsb(r);
4790         lock_rsb(r);
4791
4792         error = validate_unlock_args(lkb, &args);
4793         if (error)
4794                 goto out_r;
4795         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4796
4797         error = _cancel_lock(r, lkb);
4798  out_r:
4799         unlock_rsb(r);
4800         put_rsb(r);
4801
4802         if (error == -DLM_ECANCEL)
4803                 error = 0;
4804         /* from validate_unlock_args() */
4805         if (error == -EBUSY)
4806                 error = 0;
4807  out_put:
4808         dlm_put_lkb(lkb);
4809  out:
4810         dlm_unlock_recovery(ls);
4811         return error;
4812 }
4813
4814 /* lkb's that are removed from the waiters list by revert are just left on the
4815    orphans list with the granted orphan locks, to be freed by purge */
4816
4817 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4818 {
4819         struct dlm_args args;
4820         int error;
4821
4822         hold_lkb(lkb);
4823         mutex_lock(&ls->ls_orphans_mutex);
4824         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4825         mutex_unlock(&ls->ls_orphans_mutex);
4826
4827         set_unlock_args(0, lkb->lkb_ua, &args);
4828
4829         error = cancel_lock(ls, lkb, &args);
4830         if (error == -DLM_ECANCEL)
4831                 error = 0;
4832         return error;
4833 }
4834
4835 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4836    Regardless of what rsb queue the lock is on, it's removed and freed. */
4837
4838 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4839 {
4840         struct dlm_args args;
4841         int error;
4842
4843         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4844
4845         error = unlock_lock(ls, lkb, &args);
4846         if (error == -DLM_EUNLOCK)
4847                 error = 0;
4848         return error;
4849 }
4850
4851 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4852    (which does lock_rsb) due to deadlock with receiving a message that does
4853    lock_rsb followed by dlm_user_add_ast() */
4854
4855 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4856                                      struct dlm_user_proc *proc)
4857 {
4858         struct dlm_lkb *lkb = NULL;
4859
4860         mutex_lock(&ls->ls_clear_proc_locks);
4861         if (list_empty(&proc->locks))
4862                 goto out;
4863
4864         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4865         list_del_init(&lkb->lkb_ownqueue);
4866
4867         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4868                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4869         else
4870                 lkb->lkb_flags |= DLM_IFL_DEAD;
4871  out:
4872         mutex_unlock(&ls->ls_clear_proc_locks);
4873         return lkb;
4874 }
4875
4876 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4877    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4878    which we clear here. */
4879
4880 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4881    list, and no more device_writes should add lkb's to proc->locks list; so we
4882    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4883    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4884    them ourself. */
4885
4886 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4887 {
4888         struct dlm_lkb *lkb, *safe;
4889
4890         dlm_lock_recovery(ls);
4891
4892         while (1) {
4893                 lkb = del_proc_lock(ls, proc);
4894                 if (!lkb)
4895                         break;
4896                 del_timeout(lkb);
4897                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4898                         orphan_proc_lock(ls, lkb);
4899                 else
4900                         unlock_proc_lock(ls, lkb);
4901
4902                 /* this removes the reference for the proc->locks list
4903                    added by dlm_user_request, it may result in the lkb
4904                    being freed */
4905
4906                 dlm_put_lkb(lkb);
4907         }
4908
4909         mutex_lock(&ls->ls_clear_proc_locks);
4910
4911         /* in-progress unlocks */
4912         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4913                 list_del_init(&lkb->lkb_ownqueue);
4914                 lkb->lkb_flags |= DLM_IFL_DEAD;
4915                 dlm_put_lkb(lkb);
4916         }
4917
4918         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4919                 lkb->lkb_ast_type = 0;
4920                 list_del(&lkb->lkb_astqueue);
4921                 dlm_put_lkb(lkb);
4922         }
4923
4924         mutex_unlock(&ls->ls_clear_proc_locks);
4925         dlm_unlock_recovery(ls);
4926 }
4927
4928 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4929 {
4930         struct dlm_lkb *lkb, *safe;
4931
4932         while (1) {
4933                 lkb = NULL;
4934                 spin_lock(&proc->locks_spin);
4935                 if (!list_empty(&proc->locks)) {
4936                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4937                                          lkb_ownqueue);
4938                         list_del_init(&lkb->lkb_ownqueue);
4939                 }
4940                 spin_unlock(&proc->locks_spin);
4941
4942                 if (!lkb)
4943                         break;
4944
4945                 lkb->lkb_flags |= DLM_IFL_DEAD;
4946                 unlock_proc_lock(ls, lkb);
4947                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4948         }
4949
4950         spin_lock(&proc->locks_spin);
4951         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4952                 list_del_init(&lkb->lkb_ownqueue);
4953                 lkb->lkb_flags |= DLM_IFL_DEAD;
4954                 dlm_put_lkb(lkb);
4955         }
4956         spin_unlock(&proc->locks_spin);
4957
4958         spin_lock(&proc->asts_spin);
4959         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4960                 list_del(&lkb->lkb_astqueue);
4961                 dlm_put_lkb(lkb);
4962         }
4963         spin_unlock(&proc->asts_spin);
4964 }
4965
4966 /* pid of 0 means purge all orphans */
4967
4968 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4969 {
4970         struct dlm_lkb *lkb, *safe;
4971
4972         mutex_lock(&ls->ls_orphans_mutex);
4973         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4974                 if (pid && lkb->lkb_ownpid != pid)
4975                         continue;
4976                 unlock_proc_lock(ls, lkb);
4977                 list_del_init(&lkb->lkb_ownqueue);
4978                 dlm_put_lkb(lkb);
4979         }
4980         mutex_unlock(&ls->ls_orphans_mutex);
4981 }
4982
4983 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4984 {
4985         struct dlm_message *ms;
4986         struct dlm_mhandle *mh;
4987         int error;
4988
4989         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4990                                 DLM_MSG_PURGE, &ms, &mh);
4991         if (error)
4992                 return error;
4993         ms->m_nodeid = nodeid;
4994         ms->m_pid = pid;
4995
4996         return send_message(mh, ms);
4997 }
4998
4999 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5000                    int nodeid, int pid)
5001 {
5002         int error = 0;
5003
5004         if (nodeid != dlm_our_nodeid()) {
5005                 error = send_purge(ls, nodeid, pid);
5006         } else {
5007                 dlm_lock_recovery(ls);
5008                 if (pid == current->pid)
5009                         purge_proc_locks(ls, proc);
5010                 else
5011                         do_purge(ls, nodeid, pid);
5012                 dlm_unlock_recovery(ls);
5013         }
5014         return error;
5015 }
5016