Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/hch/hfsplus
[pandora-kernel.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/slab.h>
60 #include "dlm_internal.h"
61 #include <linux/dlm_device.h>
62 #include "memory.h"
63 #include "lowcomms.h"
64 #include "requestqueue.h"
65 #include "util.h"
66 #include "dir.h"
67 #include "member.h"
68 #include "lockspace.h"
69 #include "ast.h"
70 #include "lock.h"
71 #include "rcom.h"
72 #include "recover.h"
73 #include "lvb_table.h"
74 #include "user.h"
75 #include "config.h"
76
77 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int send_remove(struct dlm_rsb *r);
85 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88                                     struct dlm_message *ms);
89 static int receive_extralen(struct dlm_message *ms);
90 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 static void del_timeout(struct dlm_lkb *lkb);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163                "     status %d rqmode %d grmode %d wait_type %d\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type);
167 }
168
169 static void dlm_print_rsb(struct dlm_rsb *r)
170 {
171         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172                r->res_nodeid, r->res_flags, r->res_first_lkid,
173                r->res_recover_locks_count, r->res_name);
174 }
175
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178         struct dlm_lkb *lkb;
179
180         dlm_print_rsb(r);
181
182         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184         printk(KERN_ERR "rsb lookup list\n");
185         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb grant queue:\n");
188         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb convert queue:\n");
191         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb wait queue:\n");
194         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196 }
197
198 /* Threads cannot use the lockspace while it's being recovered */
199
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202         down_read(&ls->ls_in_recovery);
203 }
204
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207         up_read(&ls->ls_in_recovery);
208 }
209
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212         return down_read_trylock(&ls->ls_in_recovery);
213 }
214
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243         return !!r->res_nodeid;
244 }
245
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291
292         del_timeout(lkb);
293
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307
308         dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 }
310
311 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 {
313         queue_cast(r, lkb,
314                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 }
316
317 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318 {
319         if (is_master_copy(lkb)) {
320                 send_bast(r, lkb, rqmode);
321         } else {
322                 dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
323         }
324 }
325
326 /*
327  * Basic operations on rsb's and lkb's
328  */
329
330 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
331 {
332         struct dlm_rsb *r;
333
334         r = dlm_allocate_rsb(ls, len);
335         if (!r)
336                 return NULL;
337
338         r->res_ls = ls;
339         r->res_length = len;
340         memcpy(r->res_name, name, len);
341         mutex_init(&r->res_mutex);
342
343         INIT_LIST_HEAD(&r->res_lookup);
344         INIT_LIST_HEAD(&r->res_grantqueue);
345         INIT_LIST_HEAD(&r->res_convertqueue);
346         INIT_LIST_HEAD(&r->res_waitqueue);
347         INIT_LIST_HEAD(&r->res_root_list);
348         INIT_LIST_HEAD(&r->res_recover_list);
349
350         return r;
351 }
352
353 static int search_rsb_list(struct list_head *head, char *name, int len,
354                            unsigned int flags, struct dlm_rsb **r_ret)
355 {
356         struct dlm_rsb *r;
357         int error = 0;
358
359         list_for_each_entry(r, head, res_hashchain) {
360                 if (len == r->res_length && !memcmp(name, r->res_name, len))
361                         goto found;
362         }
363         *r_ret = NULL;
364         return -EBADR;
365
366  found:
367         if (r->res_nodeid && (flags & R_MASTER))
368                 error = -ENOTBLK;
369         *r_ret = r;
370         return error;
371 }
372
373 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
374                        unsigned int flags, struct dlm_rsb **r_ret)
375 {
376         struct dlm_rsb *r;
377         int error;
378
379         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
380         if (!error) {
381                 kref_get(&r->res_ref);
382                 goto out;
383         }
384         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
385         if (error)
386                 goto out;
387
388         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
389
390         if (dlm_no_directory(ls))
391                 goto out;
392
393         if (r->res_nodeid == -1) {
394                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
395                 r->res_first_lkid = 0;
396         } else if (r->res_nodeid > 0) {
397                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else {
400                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
401                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
402         }
403  out:
404         *r_ret = r;
405         return error;
406 }
407
408 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
409                       unsigned int flags, struct dlm_rsb **r_ret)
410 {
411         int error;
412         spin_lock(&ls->ls_rsbtbl[b].lock);
413         error = _search_rsb(ls, name, len, b, flags, r_ret);
414         spin_unlock(&ls->ls_rsbtbl[b].lock);
415         return error;
416 }
417
418 /*
419  * Find rsb in rsbtbl and potentially create/add one
420  *
421  * Delaying the release of rsb's has a similar benefit to applications keeping
422  * NL locks on an rsb, but without the guarantee that the cached master value
423  * will still be valid when the rsb is reused.  Apps aren't always smart enough
424  * to keep NL locks on an rsb that they may lock again shortly; this can lead
425  * to excessive master lookups and removals if we don't delay the release.
426  *
427  * Searching for an rsb means looking through both the normal list and toss
428  * list.  When found on the toss list the rsb is moved to the normal list with
429  * ref count of 1; when found on normal list the ref count is incremented.
430  */
431
432 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
433                     unsigned int flags, struct dlm_rsb **r_ret)
434 {
435         struct dlm_rsb *r = NULL, *tmp;
436         uint32_t hash, bucket;
437         int error = -EINVAL;
438
439         if (namelen > DLM_RESNAME_MAXLEN)
440                 goto out;
441
442         if (dlm_no_directory(ls))
443                 flags |= R_CREATE;
444
445         error = 0;
446         hash = jhash(name, namelen, 0);
447         bucket = hash & (ls->ls_rsbtbl_size - 1);
448
449         error = search_rsb(ls, name, namelen, bucket, flags, &r);
450         if (!error)
451                 goto out;
452
453         if (error == -EBADR && !(flags & R_CREATE))
454                 goto out;
455
456         /* the rsb was found but wasn't a master copy */
457         if (error == -ENOTBLK)
458                 goto out;
459
460         error = -ENOMEM;
461         r = create_rsb(ls, name, namelen);
462         if (!r)
463                 goto out;
464
465         r->res_hash = hash;
466         r->res_bucket = bucket;
467         r->res_nodeid = -1;
468         kref_init(&r->res_ref);
469
470         /* With no directory, the master can be set immediately */
471         if (dlm_no_directory(ls)) {
472                 int nodeid = dlm_dir_nodeid(r);
473                 if (nodeid == dlm_our_nodeid())
474                         nodeid = 0;
475                 r->res_nodeid = nodeid;
476         }
477
478         spin_lock(&ls->ls_rsbtbl[bucket].lock);
479         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
480         if (!error) {
481                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
482                 dlm_free_rsb(r);
483                 r = tmp;
484                 goto out;
485         }
486         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
487         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
488         error = 0;
489  out:
490         *r_ret = r;
491         return error;
492 }
493
494 /* This is only called to add a reference when the code already holds
495    a valid reference to the rsb, so there's no need for locking. */
496
497 static inline void hold_rsb(struct dlm_rsb *r)
498 {
499         kref_get(&r->res_ref);
500 }
501
502 void dlm_hold_rsb(struct dlm_rsb *r)
503 {
504         hold_rsb(r);
505 }
506
507 static void toss_rsb(struct kref *kref)
508 {
509         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
510         struct dlm_ls *ls = r->res_ls;
511
512         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
513         kref_init(&r->res_ref);
514         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
515         r->res_toss_time = jiffies;
516         if (r->res_lvbptr) {
517                 dlm_free_lvb(r->res_lvbptr);
518                 r->res_lvbptr = NULL;
519         }
520 }
521
522 /* When all references to the rsb are gone it's transferred to
523    the tossed list for later disposal. */
524
525 static void put_rsb(struct dlm_rsb *r)
526 {
527         struct dlm_ls *ls = r->res_ls;
528         uint32_t bucket = r->res_bucket;
529
530         spin_lock(&ls->ls_rsbtbl[bucket].lock);
531         kref_put(&r->res_ref, toss_rsb);
532         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
533 }
534
535 void dlm_put_rsb(struct dlm_rsb *r)
536 {
537         put_rsb(r);
538 }
539
540 /* See comment for unhold_lkb */
541
542 static void unhold_rsb(struct dlm_rsb *r)
543 {
544         int rv;
545         rv = kref_put(&r->res_ref, toss_rsb);
546         DLM_ASSERT(!rv, dlm_dump_rsb(r););
547 }
548
549 static void kill_rsb(struct kref *kref)
550 {
551         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
552
553         /* All work is done after the return from kref_put() so we
554            can release the write_lock before the remove and free. */
555
556         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
557         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
558         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
559         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
562 }
563
564 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565    The rsb must exist as long as any lkb's for it do. */
566
567 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
568 {
569         hold_rsb(r);
570         lkb->lkb_resource = r;
571 }
572
573 static void detach_lkb(struct dlm_lkb *lkb)
574 {
575         if (lkb->lkb_resource) {
576                 put_rsb(lkb->lkb_resource);
577                 lkb->lkb_resource = NULL;
578         }
579 }
580
581 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
582 {
583         struct dlm_lkb *lkb, *tmp;
584         uint32_t lkid = 0;
585         uint16_t bucket;
586
587         lkb = dlm_allocate_lkb(ls);
588         if (!lkb)
589                 return -ENOMEM;
590
591         lkb->lkb_nodeid = -1;
592         lkb->lkb_grmode = DLM_LOCK_IV;
593         kref_init(&lkb->lkb_ref);
594         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
595         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
596         INIT_LIST_HEAD(&lkb->lkb_time_list);
597         INIT_LIST_HEAD(&lkb->lkb_astqueue);
598
599         get_random_bytes(&bucket, sizeof(bucket));
600         bucket &= (ls->ls_lkbtbl_size - 1);
601
602         write_lock(&ls->ls_lkbtbl[bucket].lock);
603
604         /* counter can roll over so we must verify lkid is not in use */
605
606         while (lkid == 0) {
607                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
608
609                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
610                                     lkb_idtbl_list) {
611                         if (tmp->lkb_id != lkid)
612                                 continue;
613                         lkid = 0;
614                         break;
615                 }
616         }
617
618         lkb->lkb_id = lkid;
619         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
620         write_unlock(&ls->ls_lkbtbl[bucket].lock);
621
622         *lkb_ret = lkb;
623         return 0;
624 }
625
626 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
627 {
628         struct dlm_lkb *lkb;
629         uint16_t bucket = (lkid >> 16);
630
631         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
632                 if (lkb->lkb_id == lkid)
633                         return lkb;
634         }
635         return NULL;
636 }
637
638 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
639 {
640         struct dlm_lkb *lkb;
641         uint16_t bucket = (lkid >> 16);
642
643         if (bucket >= ls->ls_lkbtbl_size)
644                 return -EBADSLT;
645
646         read_lock(&ls->ls_lkbtbl[bucket].lock);
647         lkb = __find_lkb(ls, lkid);
648         if (lkb)
649                 kref_get(&lkb->lkb_ref);
650         read_unlock(&ls->ls_lkbtbl[bucket].lock);
651
652         *lkb_ret = lkb;
653         return lkb ? 0 : -ENOENT;
654 }
655
656 static void kill_lkb(struct kref *kref)
657 {
658         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
659
660         /* All work is done after the return from kref_put() so we
661            can release the write_lock before the detach_lkb */
662
663         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
664 }
665
666 /* __put_lkb() is used when an lkb may not have an rsb attached to
667    it so we need to provide the lockspace explicitly */
668
669 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
670 {
671         uint16_t bucket = (lkb->lkb_id >> 16);
672
673         write_lock(&ls->ls_lkbtbl[bucket].lock);
674         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
675                 list_del(&lkb->lkb_idtbl_list);
676                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
677
678                 detach_lkb(lkb);
679
680                 /* for local/process lkbs, lvbptr points to caller's lksb */
681                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
682                         dlm_free_lvb(lkb->lkb_lvbptr);
683                 dlm_free_lkb(lkb);
684                 return 1;
685         } else {
686                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
687                 return 0;
688         }
689 }
690
691 int dlm_put_lkb(struct dlm_lkb *lkb)
692 {
693         struct dlm_ls *ls;
694
695         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
696         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
697
698         ls = lkb->lkb_resource->res_ls;
699         return __put_lkb(ls, lkb);
700 }
701
702 /* This is only called to add a reference when the code already holds
703    a valid reference to the lkb, so there's no need for locking. */
704
705 static inline void hold_lkb(struct dlm_lkb *lkb)
706 {
707         kref_get(&lkb->lkb_ref);
708 }
709
710 /* This is called when we need to remove a reference and are certain
711    it's not the last ref.  e.g. del_lkb is always called between a
712    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
713    put_lkb would work fine, but would involve unnecessary locking */
714
715 static inline void unhold_lkb(struct dlm_lkb *lkb)
716 {
717         int rv;
718         rv = kref_put(&lkb->lkb_ref, kill_lkb);
719         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
720 }
721
722 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
723                             int mode)
724 {
725         struct dlm_lkb *lkb = NULL;
726
727         list_for_each_entry(lkb, head, lkb_statequeue)
728                 if (lkb->lkb_rqmode < mode)
729                         break;
730
731         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
732 }
733
734 /* add/remove lkb to rsb's grant/convert/wait queue */
735
736 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
737 {
738         kref_get(&lkb->lkb_ref);
739
740         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
741
742         lkb->lkb_timestamp = ktime_get();
743
744         lkb->lkb_status = status;
745
746         switch (status) {
747         case DLM_LKSTS_WAITING:
748                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
749                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
750                 else
751                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
752                 break;
753         case DLM_LKSTS_GRANTED:
754                 /* convention says granted locks kept in order of grmode */
755                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
756                                 lkb->lkb_grmode);
757                 break;
758         case DLM_LKSTS_CONVERT:
759                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
760                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
761                 else
762                         list_add_tail(&lkb->lkb_statequeue,
763                                       &r->res_convertqueue);
764                 break;
765         default:
766                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
767         }
768 }
769
770 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
771 {
772         lkb->lkb_status = 0;
773         list_del(&lkb->lkb_statequeue);
774         unhold_lkb(lkb);
775 }
776
777 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
778 {
779         hold_lkb(lkb);
780         del_lkb(r, lkb);
781         add_lkb(r, lkb, sts);
782         unhold_lkb(lkb);
783 }
784
785 static int msg_reply_type(int mstype)
786 {
787         switch (mstype) {
788         case DLM_MSG_REQUEST:
789                 return DLM_MSG_REQUEST_REPLY;
790         case DLM_MSG_CONVERT:
791                 return DLM_MSG_CONVERT_REPLY;
792         case DLM_MSG_UNLOCK:
793                 return DLM_MSG_UNLOCK_REPLY;
794         case DLM_MSG_CANCEL:
795                 return DLM_MSG_CANCEL_REPLY;
796         case DLM_MSG_LOOKUP:
797                 return DLM_MSG_LOOKUP_REPLY;
798         }
799         return -1;
800 }
801
802 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
803 {
804         int i;
805
806         for (i = 0; i < num_nodes; i++) {
807                 if (!warned[i]) {
808                         warned[i] = nodeid;
809                         return 0;
810                 }
811                 if (warned[i] == nodeid)
812                         return 1;
813         }
814         return 0;
815 }
816
817 void dlm_scan_waiters(struct dlm_ls *ls)
818 {
819         struct dlm_lkb *lkb;
820         ktime_t zero = ktime_set(0, 0);
821         s64 us;
822         s64 debug_maxus = 0;
823         u32 debug_scanned = 0;
824         u32 debug_expired = 0;
825         int num_nodes = 0;
826         int *warned = NULL;
827
828         if (!dlm_config.ci_waitwarn_us)
829                 return;
830
831         mutex_lock(&ls->ls_waiters_mutex);
832
833         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
834                 if (ktime_equal(lkb->lkb_wait_time, zero))
835                         continue;
836
837                 debug_scanned++;
838
839                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
840
841                 if (us < dlm_config.ci_waitwarn_us)
842                         continue;
843
844                 lkb->lkb_wait_time = zero;
845
846                 debug_expired++;
847                 if (us > debug_maxus)
848                         debug_maxus = us;
849
850                 if (!num_nodes) {
851                         num_nodes = ls->ls_num_nodes;
852                         warned = kmalloc(GFP_KERNEL, num_nodes * sizeof(int));
853                         if (warned)
854                                 memset(warned, 0, num_nodes * sizeof(int));
855                 }
856                 if (!warned)
857                         continue;
858                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
859                         continue;
860
861                 log_error(ls, "waitwarn %x %lld %d us check connection to "
862                           "node %d", lkb->lkb_id, (long long)us,
863                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
864         }
865         mutex_unlock(&ls->ls_waiters_mutex);
866
867         if (warned)
868                 kfree(warned);
869
870         if (debug_expired)
871                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
872                           debug_scanned, debug_expired,
873                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
874 }
875
876 /* add/remove lkb from global waiters list of lkb's waiting for
877    a reply from a remote node */
878
879 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
880 {
881         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
882         int error = 0;
883
884         mutex_lock(&ls->ls_waiters_mutex);
885
886         if (is_overlap_unlock(lkb) ||
887             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
888                 error = -EINVAL;
889                 goto out;
890         }
891
892         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
893                 switch (mstype) {
894                 case DLM_MSG_UNLOCK:
895                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
896                         break;
897                 case DLM_MSG_CANCEL:
898                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
899                         break;
900                 default:
901                         error = -EBUSY;
902                         goto out;
903                 }
904                 lkb->lkb_wait_count++;
905                 hold_lkb(lkb);
906
907                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
908                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
909                           lkb->lkb_wait_count, lkb->lkb_flags);
910                 goto out;
911         }
912
913         DLM_ASSERT(!lkb->lkb_wait_count,
914                    dlm_print_lkb(lkb);
915                    printk("wait_count %d\n", lkb->lkb_wait_count););
916
917         lkb->lkb_wait_count++;
918         lkb->lkb_wait_type = mstype;
919         lkb->lkb_wait_time = ktime_get();
920         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
921         hold_lkb(lkb);
922         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
923  out:
924         if (error)
925                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
926                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
927                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
928         mutex_unlock(&ls->ls_waiters_mutex);
929         return error;
930 }
931
932 /* We clear the RESEND flag because we might be taking an lkb off the waiters
933    list as part of process_requestqueue (e.g. a lookup that has an optimized
934    request reply on the requestqueue) between dlm_recover_waiters_pre() which
935    set RESEND and dlm_recover_waiters_post() */
936
937 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
938                                 struct dlm_message *ms)
939 {
940         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
941         int overlap_done = 0;
942
943         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
944                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
945                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
946                 overlap_done = 1;
947                 goto out_del;
948         }
949
950         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
951                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
952                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
953                 overlap_done = 1;
954                 goto out_del;
955         }
956
957         /* Cancel state was preemptively cleared by a successful convert,
958            see next comment, nothing to do. */
959
960         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
961             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
962                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
963                           lkb->lkb_id, lkb->lkb_wait_type);
964                 return -1;
965         }
966
967         /* Remove for the convert reply, and premptively remove for the
968            cancel reply.  A convert has been granted while there's still
969            an outstanding cancel on it (the cancel is moot and the result
970            in the cancel reply should be 0).  We preempt the cancel reply
971            because the app gets the convert result and then can follow up
972            with another op, like convert.  This subsequent op would see the
973            lingering state of the cancel and fail with -EBUSY. */
974
975         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
976             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
977             is_overlap_cancel(lkb) && ms && !ms->m_result) {
978                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
979                           lkb->lkb_id);
980                 lkb->lkb_wait_type = 0;
981                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
982                 lkb->lkb_wait_count--;
983                 goto out_del;
984         }
985
986         /* N.B. type of reply may not always correspond to type of original
987            msg due to lookup->request optimization, verify others? */
988
989         if (lkb->lkb_wait_type) {
990                 lkb->lkb_wait_type = 0;
991                 goto out_del;
992         }
993
994         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
995                   lkb->lkb_id, mstype, lkb->lkb_flags);
996         return -1;
997
998  out_del:
999         /* the force-unlock/cancel has completed and we haven't recvd a reply
1000            to the op that was in progress prior to the unlock/cancel; we
1001            give up on any reply to the earlier op.  FIXME: not sure when/how
1002            this would happen */
1003
1004         if (overlap_done && lkb->lkb_wait_type) {
1005                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1006                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1007                 lkb->lkb_wait_count--;
1008                 lkb->lkb_wait_type = 0;
1009         }
1010
1011         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1012
1013         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1014         lkb->lkb_wait_count--;
1015         if (!lkb->lkb_wait_count)
1016                 list_del_init(&lkb->lkb_wait_reply);
1017         unhold_lkb(lkb);
1018         return 0;
1019 }
1020
1021 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1022 {
1023         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1024         int error;
1025
1026         mutex_lock(&ls->ls_waiters_mutex);
1027         error = _remove_from_waiters(lkb, mstype, NULL);
1028         mutex_unlock(&ls->ls_waiters_mutex);
1029         return error;
1030 }
1031
1032 /* Handles situations where we might be processing a "fake" or "stub" reply in
1033    which we can't try to take waiters_mutex again. */
1034
1035 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1036 {
1037         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1038         int error;
1039
1040         if (ms->m_flags != DLM_IFL_STUB_MS)
1041                 mutex_lock(&ls->ls_waiters_mutex);
1042         error = _remove_from_waiters(lkb, ms->m_type, ms);
1043         if (ms->m_flags != DLM_IFL_STUB_MS)
1044                 mutex_unlock(&ls->ls_waiters_mutex);
1045         return error;
1046 }
1047
1048 static void dir_remove(struct dlm_rsb *r)
1049 {
1050         int to_nodeid;
1051
1052         if (dlm_no_directory(r->res_ls))
1053                 return;
1054
1055         to_nodeid = dlm_dir_nodeid(r);
1056         if (to_nodeid != dlm_our_nodeid())
1057                 send_remove(r);
1058         else
1059                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1060                                      r->res_name, r->res_length);
1061 }
1062
1063 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
1064    found since they are in order of newest to oldest? */
1065
1066 static int shrink_bucket(struct dlm_ls *ls, int b)
1067 {
1068         struct dlm_rsb *r;
1069         int count = 0, found;
1070
1071         for (;;) {
1072                 found = 0;
1073                 spin_lock(&ls->ls_rsbtbl[b].lock);
1074                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1075                                             res_hashchain) {
1076                         if (!time_after_eq(jiffies, r->res_toss_time +
1077                                            dlm_config.ci_toss_secs * HZ))
1078                                 continue;
1079                         found = 1;
1080                         break;
1081                 }
1082
1083                 if (!found) {
1084                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1085                         break;
1086                 }
1087
1088                 if (kref_put(&r->res_ref, kill_rsb)) {
1089                         list_del(&r->res_hashchain);
1090                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1091
1092                         if (is_master(r))
1093                                 dir_remove(r);
1094                         dlm_free_rsb(r);
1095                         count++;
1096                 } else {
1097                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1098                         log_error(ls, "tossed rsb in use %s", r->res_name);
1099                 }
1100         }
1101
1102         return count;
1103 }
1104
1105 void dlm_scan_rsbs(struct dlm_ls *ls)
1106 {
1107         int i;
1108
1109         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1110                 shrink_bucket(ls, i);
1111                 if (dlm_locking_stopped(ls))
1112                         break;
1113                 cond_resched();
1114         }
1115 }
1116
1117 static void add_timeout(struct dlm_lkb *lkb)
1118 {
1119         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1120
1121         if (is_master_copy(lkb))
1122                 return;
1123
1124         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1125             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1126                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1127                 goto add_it;
1128         }
1129         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1130                 goto add_it;
1131         return;
1132
1133  add_it:
1134         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1135         mutex_lock(&ls->ls_timeout_mutex);
1136         hold_lkb(lkb);
1137         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1138         mutex_unlock(&ls->ls_timeout_mutex);
1139 }
1140
1141 static void del_timeout(struct dlm_lkb *lkb)
1142 {
1143         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1144
1145         mutex_lock(&ls->ls_timeout_mutex);
1146         if (!list_empty(&lkb->lkb_time_list)) {
1147                 list_del_init(&lkb->lkb_time_list);
1148                 unhold_lkb(lkb);
1149         }
1150         mutex_unlock(&ls->ls_timeout_mutex);
1151 }
1152
1153 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1154    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1155    and then lock rsb because of lock ordering in add_timeout.  We may need
1156    to specify some special timeout-related bits in the lkb that are just to
1157    be accessed under the timeout_mutex. */
1158
1159 void dlm_scan_timeout(struct dlm_ls *ls)
1160 {
1161         struct dlm_rsb *r;
1162         struct dlm_lkb *lkb;
1163         int do_cancel, do_warn;
1164         s64 wait_us;
1165
1166         for (;;) {
1167                 if (dlm_locking_stopped(ls))
1168                         break;
1169
1170                 do_cancel = 0;
1171                 do_warn = 0;
1172                 mutex_lock(&ls->ls_timeout_mutex);
1173                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1174
1175                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1176                                                         lkb->lkb_timestamp));
1177
1178                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1179                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1180                                 do_cancel = 1;
1181
1182                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1183                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1184                                 do_warn = 1;
1185
1186                         if (!do_cancel && !do_warn)
1187                                 continue;
1188                         hold_lkb(lkb);
1189                         break;
1190                 }
1191                 mutex_unlock(&ls->ls_timeout_mutex);
1192
1193                 if (!do_cancel && !do_warn)
1194                         break;
1195
1196                 r = lkb->lkb_resource;
1197                 hold_rsb(r);
1198                 lock_rsb(r);
1199
1200                 if (do_warn) {
1201                         /* clear flag so we only warn once */
1202                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1203                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1204                                 del_timeout(lkb);
1205                         dlm_timeout_warn(lkb);
1206                 }
1207
1208                 if (do_cancel) {
1209                         log_debug(ls, "timeout cancel %x node %d %s",
1210                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1211                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1212                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1213                         del_timeout(lkb);
1214                         _cancel_lock(r, lkb);
1215                 }
1216
1217                 unlock_rsb(r);
1218                 unhold_rsb(r);
1219                 dlm_put_lkb(lkb);
1220         }
1221 }
1222
1223 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1224    dlm_recoverd before checking/setting ls_recover_begin. */
1225
1226 void dlm_adjust_timeouts(struct dlm_ls *ls)
1227 {
1228         struct dlm_lkb *lkb;
1229         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1230
1231         ls->ls_recover_begin = 0;
1232         mutex_lock(&ls->ls_timeout_mutex);
1233         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1234                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1235         mutex_unlock(&ls->ls_timeout_mutex);
1236
1237         if (!dlm_config.ci_waitwarn_us)
1238                 return;
1239
1240         mutex_lock(&ls->ls_waiters_mutex);
1241         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1242                 if (ktime_to_us(lkb->lkb_wait_time))
1243                         lkb->lkb_wait_time = ktime_get();
1244         }
1245         mutex_unlock(&ls->ls_waiters_mutex);
1246 }
1247
1248 /* lkb is master or local copy */
1249
1250 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1251 {
1252         int b, len = r->res_ls->ls_lvblen;
1253
1254         /* b=1 lvb returned to caller
1255            b=0 lvb written to rsb or invalidated
1256            b=-1 do nothing */
1257
1258         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1259
1260         if (b == 1) {
1261                 if (!lkb->lkb_lvbptr)
1262                         return;
1263
1264                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1265                         return;
1266
1267                 if (!r->res_lvbptr)
1268                         return;
1269
1270                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1271                 lkb->lkb_lvbseq = r->res_lvbseq;
1272
1273         } else if (b == 0) {
1274                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1275                         rsb_set_flag(r, RSB_VALNOTVALID);
1276                         return;
1277                 }
1278
1279                 if (!lkb->lkb_lvbptr)
1280                         return;
1281
1282                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1283                         return;
1284
1285                 if (!r->res_lvbptr)
1286                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1287
1288                 if (!r->res_lvbptr)
1289                         return;
1290
1291                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1292                 r->res_lvbseq++;
1293                 lkb->lkb_lvbseq = r->res_lvbseq;
1294                 rsb_clear_flag(r, RSB_VALNOTVALID);
1295         }
1296
1297         if (rsb_flag(r, RSB_VALNOTVALID))
1298                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1299 }
1300
1301 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1302 {
1303         if (lkb->lkb_grmode < DLM_LOCK_PW)
1304                 return;
1305
1306         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1307                 rsb_set_flag(r, RSB_VALNOTVALID);
1308                 return;
1309         }
1310
1311         if (!lkb->lkb_lvbptr)
1312                 return;
1313
1314         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1315                 return;
1316
1317         if (!r->res_lvbptr)
1318                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1319
1320         if (!r->res_lvbptr)
1321                 return;
1322
1323         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1324         r->res_lvbseq++;
1325         rsb_clear_flag(r, RSB_VALNOTVALID);
1326 }
1327
1328 /* lkb is process copy (pc) */
1329
1330 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1331                             struct dlm_message *ms)
1332 {
1333         int b;
1334
1335         if (!lkb->lkb_lvbptr)
1336                 return;
1337
1338         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1339                 return;
1340
1341         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1342         if (b == 1) {
1343                 int len = receive_extralen(ms);
1344                 if (len > DLM_RESNAME_MAXLEN)
1345                         len = DLM_RESNAME_MAXLEN;
1346                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1347                 lkb->lkb_lvbseq = ms->m_lvbseq;
1348         }
1349 }
1350
1351 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1352    remove_lock -- used for unlock, removes lkb from granted
1353    revert_lock -- used for cancel, moves lkb from convert to granted
1354    grant_lock  -- used for request and convert, adds lkb to granted or
1355                   moves lkb from convert or waiting to granted
1356
1357    Each of these is used for master or local copy lkb's.  There is
1358    also a _pc() variation used to make the corresponding change on
1359    a process copy (pc) lkb. */
1360
1361 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1362 {
1363         del_lkb(r, lkb);
1364         lkb->lkb_grmode = DLM_LOCK_IV;
1365         /* this unhold undoes the original ref from create_lkb()
1366            so this leads to the lkb being freed */
1367         unhold_lkb(lkb);
1368 }
1369
1370 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1371 {
1372         set_lvb_unlock(r, lkb);
1373         _remove_lock(r, lkb);
1374 }
1375
1376 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1377 {
1378         _remove_lock(r, lkb);
1379 }
1380
1381 /* returns: 0 did nothing
1382             1 moved lock to granted
1383            -1 removed lock */
1384
1385 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1386 {
1387         int rv = 0;
1388
1389         lkb->lkb_rqmode = DLM_LOCK_IV;
1390
1391         switch (lkb->lkb_status) {
1392         case DLM_LKSTS_GRANTED:
1393                 break;
1394         case DLM_LKSTS_CONVERT:
1395                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1396                 rv = 1;
1397                 break;
1398         case DLM_LKSTS_WAITING:
1399                 del_lkb(r, lkb);
1400                 lkb->lkb_grmode = DLM_LOCK_IV;
1401                 /* this unhold undoes the original ref from create_lkb()
1402                    so this leads to the lkb being freed */
1403                 unhold_lkb(lkb);
1404                 rv = -1;
1405                 break;
1406         default:
1407                 log_print("invalid status for revert %d", lkb->lkb_status);
1408         }
1409         return rv;
1410 }
1411
1412 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1413 {
1414         return revert_lock(r, lkb);
1415 }
1416
1417 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1418 {
1419         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1420                 lkb->lkb_grmode = lkb->lkb_rqmode;
1421                 if (lkb->lkb_status)
1422                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1423                 else
1424                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1425         }
1426
1427         lkb->lkb_rqmode = DLM_LOCK_IV;
1428 }
1429
1430 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1431 {
1432         set_lvb_lock(r, lkb);
1433         _grant_lock(r, lkb);
1434         lkb->lkb_highbast = 0;
1435 }
1436
1437 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1438                           struct dlm_message *ms)
1439 {
1440         set_lvb_lock_pc(r, lkb, ms);
1441         _grant_lock(r, lkb);
1442 }
1443
1444 /* called by grant_pending_locks() which means an async grant message must
1445    be sent to the requesting node in addition to granting the lock if the
1446    lkb belongs to a remote node. */
1447
1448 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1449 {
1450         grant_lock(r, lkb);
1451         if (is_master_copy(lkb))
1452                 send_grant(r, lkb);
1453         else
1454                 queue_cast(r, lkb, 0);
1455 }
1456
1457 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1458    change the granted/requested modes.  We're munging things accordingly in
1459    the process copy.
1460    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1461    conversion deadlock
1462    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1463    compatible with other granted locks */
1464
1465 static void munge_demoted(struct dlm_lkb *lkb)
1466 {
1467         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1468                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1469                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1470                 return;
1471         }
1472
1473         lkb->lkb_grmode = DLM_LOCK_NL;
1474 }
1475
1476 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1477 {
1478         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1479             ms->m_type != DLM_MSG_GRANT) {
1480                 log_print("munge_altmode %x invalid reply type %d",
1481                           lkb->lkb_id, ms->m_type);
1482                 return;
1483         }
1484
1485         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1486                 lkb->lkb_rqmode = DLM_LOCK_PR;
1487         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1488                 lkb->lkb_rqmode = DLM_LOCK_CW;
1489         else {
1490                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1491                 dlm_print_lkb(lkb);
1492         }
1493 }
1494
1495 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1496 {
1497         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1498                                            lkb_statequeue);
1499         if (lkb->lkb_id == first->lkb_id)
1500                 return 1;
1501
1502         return 0;
1503 }
1504
1505 /* Check if the given lkb conflicts with another lkb on the queue. */
1506
1507 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1508 {
1509         struct dlm_lkb *this;
1510
1511         list_for_each_entry(this, head, lkb_statequeue) {
1512                 if (this == lkb)
1513                         continue;
1514                 if (!modes_compat(this, lkb))
1515                         return 1;
1516         }
1517         return 0;
1518 }
1519
1520 /*
1521  * "A conversion deadlock arises with a pair of lock requests in the converting
1522  * queue for one resource.  The granted mode of each lock blocks the requested
1523  * mode of the other lock."
1524  *
1525  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1526  * convert queue from being granted, then deadlk/demote lkb.
1527  *
1528  * Example:
1529  * Granted Queue: empty
1530  * Convert Queue: NL->EX (first lock)
1531  *                PR->EX (second lock)
1532  *
1533  * The first lock can't be granted because of the granted mode of the second
1534  * lock and the second lock can't be granted because it's not first in the
1535  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1536  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1537  * flag set and return DEMOTED in the lksb flags.
1538  *
1539  * Originally, this function detected conv-deadlk in a more limited scope:
1540  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1541  * - if lkb1 was the first entry in the queue (not just earlier), and was
1542  *   blocked by the granted mode of lkb2, and there was nothing on the
1543  *   granted queue preventing lkb1 from being granted immediately, i.e.
1544  *   lkb2 was the only thing preventing lkb1 from being granted.
1545  *
1546  * That second condition meant we'd only say there was conv-deadlk if
1547  * resolving it (by demotion) would lead to the first lock on the convert
1548  * queue being granted right away.  It allowed conversion deadlocks to exist
1549  * between locks on the convert queue while they couldn't be granted anyway.
1550  *
1551  * Now, we detect and take action on conversion deadlocks immediately when
1552  * they're created, even if they may not be immediately consequential.  If
1553  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1554  * mode that would prevent lkb1's conversion from being granted, we do a
1555  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1556  * I think this means that the lkb_is_ahead condition below should always
1557  * be zero, i.e. there will never be conv-deadlk between two locks that are
1558  * both already on the convert queue.
1559  */
1560
1561 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1562 {
1563         struct dlm_lkb *lkb1;
1564         int lkb_is_ahead = 0;
1565
1566         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1567                 if (lkb1 == lkb2) {
1568                         lkb_is_ahead = 1;
1569                         continue;
1570                 }
1571
1572                 if (!lkb_is_ahead) {
1573                         if (!modes_compat(lkb2, lkb1))
1574                                 return 1;
1575                 } else {
1576                         if (!modes_compat(lkb2, lkb1) &&
1577                             !modes_compat(lkb1, lkb2))
1578                                 return 1;
1579                 }
1580         }
1581         return 0;
1582 }
1583
1584 /*
1585  * Return 1 if the lock can be granted, 0 otherwise.
1586  * Also detect and resolve conversion deadlocks.
1587  *
1588  * lkb is the lock to be granted
1589  *
1590  * now is 1 if the function is being called in the context of the
1591  * immediate request, it is 0 if called later, after the lock has been
1592  * queued.
1593  *
1594  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1595  */
1596
1597 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1598 {
1599         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1600
1601         /*
1602          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1603          * a new request for a NL mode lock being blocked.
1604          *
1605          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1606          * request, then it would be granted.  In essence, the use of this flag
1607          * tells the Lock Manager to expedite theis request by not considering
1608          * what may be in the CONVERTING or WAITING queues...  As of this
1609          * writing, the EXPEDITE flag can be used only with new requests for NL
1610          * mode locks.  This flag is not valid for conversion requests.
1611          *
1612          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1613          * conversion or used with a non-NL requested mode.  We also know an
1614          * EXPEDITE request is always granted immediately, so now must always
1615          * be 1.  The full condition to grant an expedite request: (now &&
1616          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1617          * therefore be shortened to just checking the flag.
1618          */
1619
1620         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1621                 return 1;
1622
1623         /*
1624          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1625          * added to the remaining conditions.
1626          */
1627
1628         if (queue_conflict(&r->res_grantqueue, lkb))
1629                 goto out;
1630
1631         /*
1632          * 6-3: By default, a conversion request is immediately granted if the
1633          * requested mode is compatible with the modes of all other granted
1634          * locks
1635          */
1636
1637         if (queue_conflict(&r->res_convertqueue, lkb))
1638                 goto out;
1639
1640         /*
1641          * 6-5: But the default algorithm for deciding whether to grant or
1642          * queue conversion requests does not by itself guarantee that such
1643          * requests are serviced on a "first come first serve" basis.  This, in
1644          * turn, can lead to a phenomenon known as "indefinate postponement".
1645          *
1646          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1647          * the system service employed to request a lock conversion.  This flag
1648          * forces certain conversion requests to be queued, even if they are
1649          * compatible with the granted modes of other locks on the same
1650          * resource.  Thus, the use of this flag results in conversion requests
1651          * being ordered on a "first come first servce" basis.
1652          *
1653          * DCT: This condition is all about new conversions being able to occur
1654          * "in place" while the lock remains on the granted queue (assuming
1655          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1656          * doesn't _have_ to go onto the convert queue where it's processed in
1657          * order.  The "now" variable is necessary to distinguish converts
1658          * being received and processed for the first time now, because once a
1659          * convert is moved to the conversion queue the condition below applies
1660          * requiring fifo granting.
1661          */
1662
1663         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1664                 return 1;
1665
1666         /*
1667          * The NOORDER flag is set to avoid the standard vms rules on grant
1668          * order.
1669          */
1670
1671         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1672                 return 1;
1673
1674         /*
1675          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1676          * granted until all other conversion requests ahead of it are granted
1677          * and/or canceled.
1678          */
1679
1680         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1681                 return 1;
1682
1683         /*
1684          * 6-4: By default, a new request is immediately granted only if all
1685          * three of the following conditions are satisfied when the request is
1686          * issued:
1687          * - The queue of ungranted conversion requests for the resource is
1688          *   empty.
1689          * - The queue of ungranted new requests for the resource is empty.
1690          * - The mode of the new request is compatible with the most
1691          *   restrictive mode of all granted locks on the resource.
1692          */
1693
1694         if (now && !conv && list_empty(&r->res_convertqueue) &&
1695             list_empty(&r->res_waitqueue))
1696                 return 1;
1697
1698         /*
1699          * 6-4: Once a lock request is in the queue of ungranted new requests,
1700          * it cannot be granted until the queue of ungranted conversion
1701          * requests is empty, all ungranted new requests ahead of it are
1702          * granted and/or canceled, and it is compatible with the granted mode
1703          * of the most restrictive lock granted on the resource.
1704          */
1705
1706         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1707             first_in_list(lkb, &r->res_waitqueue))
1708                 return 1;
1709  out:
1710         return 0;
1711 }
1712
1713 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1714                           int *err)
1715 {
1716         int rv;
1717         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1718         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1719
1720         if (err)
1721                 *err = 0;
1722
1723         rv = _can_be_granted(r, lkb, now);
1724         if (rv)
1725                 goto out;
1726
1727         /*
1728          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1729          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1730          * cancels one of the locks.
1731          */
1732
1733         if (is_convert && can_be_queued(lkb) &&
1734             conversion_deadlock_detect(r, lkb)) {
1735                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1736                         lkb->lkb_grmode = DLM_LOCK_NL;
1737                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1738                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1739                         if (err)
1740                                 *err = -EDEADLK;
1741                         else {
1742                                 log_print("can_be_granted deadlock %x now %d",
1743                                           lkb->lkb_id, now);
1744                                 dlm_dump_rsb(r);
1745                         }
1746                 }
1747                 goto out;
1748         }
1749
1750         /*
1751          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1752          * to grant a request in a mode other than the normal rqmode.  It's a
1753          * simple way to provide a big optimization to applications that can
1754          * use them.
1755          */
1756
1757         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1758                 alt = DLM_LOCK_PR;
1759         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1760                 alt = DLM_LOCK_CW;
1761
1762         if (alt) {
1763                 lkb->lkb_rqmode = alt;
1764                 rv = _can_be_granted(r, lkb, now);
1765                 if (rv)
1766                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1767                 else
1768                         lkb->lkb_rqmode = rqmode;
1769         }
1770  out:
1771         return rv;
1772 }
1773
1774 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1775    for locks pending on the convert list.  Once verified (watch for these
1776    log_prints), we should be able to just call _can_be_granted() and not
1777    bother with the demote/deadlk cases here (and there's no easy way to deal
1778    with a deadlk here, we'd have to generate something like grant_lock with
1779    the deadlk error.) */
1780
1781 /* Returns the highest requested mode of all blocked conversions; sets
1782    cw if there's a blocked conversion to DLM_LOCK_CW. */
1783
1784 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1785 {
1786         struct dlm_lkb *lkb, *s;
1787         int hi, demoted, quit, grant_restart, demote_restart;
1788         int deadlk;
1789
1790         quit = 0;
1791  restart:
1792         grant_restart = 0;
1793         demote_restart = 0;
1794         hi = DLM_LOCK_IV;
1795
1796         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1797                 demoted = is_demoted(lkb);
1798                 deadlk = 0;
1799
1800                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1801                         grant_lock_pending(r, lkb);
1802                         grant_restart = 1;
1803                         continue;
1804                 }
1805
1806                 if (!demoted && is_demoted(lkb)) {
1807                         log_print("WARN: pending demoted %x node %d %s",
1808                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1809                         demote_restart = 1;
1810                         continue;
1811                 }
1812
1813                 if (deadlk) {
1814                         log_print("WARN: pending deadlock %x node %d %s",
1815                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1816                         dlm_dump_rsb(r);
1817                         continue;
1818                 }
1819
1820                 hi = max_t(int, lkb->lkb_rqmode, hi);
1821
1822                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1823                         *cw = 1;
1824         }
1825
1826         if (grant_restart)
1827                 goto restart;
1828         if (demote_restart && !quit) {
1829                 quit = 1;
1830                 goto restart;
1831         }
1832
1833         return max_t(int, high, hi);
1834 }
1835
1836 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1837 {
1838         struct dlm_lkb *lkb, *s;
1839
1840         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1841                 if (can_be_granted(r, lkb, 0, NULL))
1842                         grant_lock_pending(r, lkb);
1843                 else {
1844                         high = max_t(int, lkb->lkb_rqmode, high);
1845                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1846                                 *cw = 1;
1847                 }
1848         }
1849
1850         return high;
1851 }
1852
1853 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1854    on either the convert or waiting queue.
1855    high is the largest rqmode of all locks blocked on the convert or
1856    waiting queue. */
1857
1858 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1859 {
1860         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1861                 if (gr->lkb_highbast < DLM_LOCK_EX)
1862                         return 1;
1863                 return 0;
1864         }
1865
1866         if (gr->lkb_highbast < high &&
1867             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1868                 return 1;
1869         return 0;
1870 }
1871
1872 static void grant_pending_locks(struct dlm_rsb *r)
1873 {
1874         struct dlm_lkb *lkb, *s;
1875         int high = DLM_LOCK_IV;
1876         int cw = 0;
1877
1878         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1879
1880         high = grant_pending_convert(r, high, &cw);
1881         high = grant_pending_wait(r, high, &cw);
1882
1883         if (high == DLM_LOCK_IV)
1884                 return;
1885
1886         /*
1887          * If there are locks left on the wait/convert queue then send blocking
1888          * ASTs to granted locks based on the largest requested mode (high)
1889          * found above.
1890          */
1891
1892         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1893                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1894                         if (cw && high == DLM_LOCK_PR &&
1895                             lkb->lkb_grmode == DLM_LOCK_PR)
1896                                 queue_bast(r, lkb, DLM_LOCK_CW);
1897                         else
1898                                 queue_bast(r, lkb, high);
1899                         lkb->lkb_highbast = high;
1900                 }
1901         }
1902 }
1903
1904 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1905 {
1906         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1907             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1908                 if (gr->lkb_highbast < DLM_LOCK_EX)
1909                         return 1;
1910                 return 0;
1911         }
1912
1913         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1914                 return 1;
1915         return 0;
1916 }
1917
1918 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1919                             struct dlm_lkb *lkb)
1920 {
1921         struct dlm_lkb *gr;
1922
1923         list_for_each_entry(gr, head, lkb_statequeue) {
1924                 /* skip self when sending basts to convertqueue */
1925                 if (gr == lkb)
1926                         continue;
1927                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1928                         queue_bast(r, gr, lkb->lkb_rqmode);
1929                         gr->lkb_highbast = lkb->lkb_rqmode;
1930                 }
1931         }
1932 }
1933
1934 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1935 {
1936         send_bast_queue(r, &r->res_grantqueue, lkb);
1937 }
1938
1939 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1940 {
1941         send_bast_queue(r, &r->res_grantqueue, lkb);
1942         send_bast_queue(r, &r->res_convertqueue, lkb);
1943 }
1944
1945 /* set_master(r, lkb) -- set the master nodeid of a resource
1946
1947    The purpose of this function is to set the nodeid field in the given
1948    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1949    known, it can just be copied to the lkb and the function will return
1950    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1951    before it can be copied to the lkb.
1952
1953    When the rsb nodeid is being looked up remotely, the initial lkb
1954    causing the lookup is kept on the ls_waiters list waiting for the
1955    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1956    on the rsb's res_lookup list until the master is verified.
1957
1958    Return values:
1959    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1960    1: the rsb master is not available and the lkb has been placed on
1961       a wait queue
1962 */
1963
1964 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1965 {
1966         struct dlm_ls *ls = r->res_ls;
1967         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1968
1969         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1970                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1971                 r->res_first_lkid = lkb->lkb_id;
1972                 lkb->lkb_nodeid = r->res_nodeid;
1973                 return 0;
1974         }
1975
1976         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1977                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1978                 return 1;
1979         }
1980
1981         if (r->res_nodeid == 0) {
1982                 lkb->lkb_nodeid = 0;
1983                 return 0;
1984         }
1985
1986         if (r->res_nodeid > 0) {
1987                 lkb->lkb_nodeid = r->res_nodeid;
1988                 return 0;
1989         }
1990
1991         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1992
1993         dir_nodeid = dlm_dir_nodeid(r);
1994
1995         if (dir_nodeid != our_nodeid) {
1996                 r->res_first_lkid = lkb->lkb_id;
1997                 send_lookup(r, lkb);
1998                 return 1;
1999         }
2000
2001         for (i = 0; i < 2; i++) {
2002                 /* It's possible for dlm_scand to remove an old rsb for
2003                    this same resource from the toss list, us to create
2004                    a new one, look up the master locally, and find it
2005                    already exists just before dlm_scand does the
2006                    dir_remove() on the previous rsb. */
2007
2008                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2009                                        r->res_length, &ret_nodeid);
2010                 if (!error)
2011                         break;
2012                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2013                 schedule();
2014         }
2015         if (error && error != -EEXIST)
2016                 return error;
2017
2018         if (ret_nodeid == our_nodeid) {
2019                 r->res_first_lkid = 0;
2020                 r->res_nodeid = 0;
2021                 lkb->lkb_nodeid = 0;
2022         } else {
2023                 r->res_first_lkid = lkb->lkb_id;
2024                 r->res_nodeid = ret_nodeid;
2025                 lkb->lkb_nodeid = ret_nodeid;
2026         }
2027         return 0;
2028 }
2029
2030 static void process_lookup_list(struct dlm_rsb *r)
2031 {
2032         struct dlm_lkb *lkb, *safe;
2033
2034         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2035                 list_del_init(&lkb->lkb_rsb_lookup);
2036                 _request_lock(r, lkb);
2037                 schedule();
2038         }
2039 }
2040
2041 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2042
2043 static void confirm_master(struct dlm_rsb *r, int error)
2044 {
2045         struct dlm_lkb *lkb;
2046
2047         if (!r->res_first_lkid)
2048                 return;
2049
2050         switch (error) {
2051         case 0:
2052         case -EINPROGRESS:
2053                 r->res_first_lkid = 0;
2054                 process_lookup_list(r);
2055                 break;
2056
2057         case -EAGAIN:
2058         case -EBADR:
2059         case -ENOTBLK:
2060                 /* the remote request failed and won't be retried (it was
2061                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2062                    lkb the first_lkid */
2063
2064                 r->res_first_lkid = 0;
2065
2066                 if (!list_empty(&r->res_lookup)) {
2067                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2068                                          lkb_rsb_lookup);
2069                         list_del_init(&lkb->lkb_rsb_lookup);
2070                         r->res_first_lkid = lkb->lkb_id;
2071                         _request_lock(r, lkb);
2072                 }
2073                 break;
2074
2075         default:
2076                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2077         }
2078 }
2079
2080 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2081                          int namelen, unsigned long timeout_cs,
2082                          void (*ast) (void *astparam),
2083                          void *astparam,
2084                          void (*bast) (void *astparam, int mode),
2085                          struct dlm_args *args)
2086 {
2087         int rv = -EINVAL;
2088
2089         /* check for invalid arg usage */
2090
2091         if (mode < 0 || mode > DLM_LOCK_EX)
2092                 goto out;
2093
2094         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2095                 goto out;
2096
2097         if (flags & DLM_LKF_CANCEL)
2098                 goto out;
2099
2100         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2101                 goto out;
2102
2103         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2104                 goto out;
2105
2106         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2107                 goto out;
2108
2109         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2110                 goto out;
2111
2112         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2113                 goto out;
2114
2115         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2116                 goto out;
2117
2118         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2119                 goto out;
2120
2121         if (!ast || !lksb)
2122                 goto out;
2123
2124         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2125                 goto out;
2126
2127         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2128                 goto out;
2129
2130         /* these args will be copied to the lkb in validate_lock_args,
2131            it cannot be done now because when converting locks, fields in
2132            an active lkb cannot be modified before locking the rsb */
2133
2134         args->flags = flags;
2135         args->astfn = ast;
2136         args->astparam = astparam;
2137         args->bastfn = bast;
2138         args->timeout = timeout_cs;
2139         args->mode = mode;
2140         args->lksb = lksb;
2141         rv = 0;
2142  out:
2143         return rv;
2144 }
2145
2146 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2147 {
2148         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2149                       DLM_LKF_FORCEUNLOCK))
2150                 return -EINVAL;
2151
2152         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2153                 return -EINVAL;
2154
2155         args->flags = flags;
2156         args->astparam = astarg;
2157         return 0;
2158 }
2159
2160 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2161                               struct dlm_args *args)
2162 {
2163         int rv = -EINVAL;
2164
2165         if (args->flags & DLM_LKF_CONVERT) {
2166                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2167                         goto out;
2168
2169                 if (args->flags & DLM_LKF_QUECVT &&
2170                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2171                         goto out;
2172
2173                 rv = -EBUSY;
2174                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2175                         goto out;
2176
2177                 if (lkb->lkb_wait_type)
2178                         goto out;
2179
2180                 if (is_overlap(lkb))
2181                         goto out;
2182         }
2183
2184         lkb->lkb_exflags = args->flags;
2185         lkb->lkb_sbflags = 0;
2186         lkb->lkb_astfn = args->astfn;
2187         lkb->lkb_astparam = args->astparam;
2188         lkb->lkb_bastfn = args->bastfn;
2189         lkb->lkb_rqmode = args->mode;
2190         lkb->lkb_lksb = args->lksb;
2191         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2192         lkb->lkb_ownpid = (int) current->pid;
2193         lkb->lkb_timeout_cs = args->timeout;
2194         rv = 0;
2195  out:
2196         if (rv)
2197                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2198                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2199                           lkb->lkb_status, lkb->lkb_wait_type,
2200                           lkb->lkb_resource->res_name);
2201         return rv;
2202 }
2203
2204 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2205    for success */
2206
2207 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2208    because there may be a lookup in progress and it's valid to do
2209    cancel/unlockf on it */
2210
2211 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2212 {
2213         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2214         int rv = -EINVAL;
2215
2216         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2217                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2218                 dlm_print_lkb(lkb);
2219                 goto out;
2220         }
2221
2222         /* an lkb may still exist even though the lock is EOL'ed due to a
2223            cancel, unlock or failed noqueue request; an app can't use these
2224            locks; return same error as if the lkid had not been found at all */
2225
2226         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2227                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2228                 rv = -ENOENT;
2229                 goto out;
2230         }
2231
2232         /* an lkb may be waiting for an rsb lookup to complete where the
2233            lookup was initiated by another lock */
2234
2235         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2236                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2237                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2238                         list_del_init(&lkb->lkb_rsb_lookup);
2239                         queue_cast(lkb->lkb_resource, lkb,
2240                                    args->flags & DLM_LKF_CANCEL ?
2241                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2242                         unhold_lkb(lkb); /* undoes create_lkb() */
2243                 }
2244                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2245                 rv = -EBUSY;
2246                 goto out;
2247         }
2248
2249         /* cancel not allowed with another cancel/unlock in progress */
2250
2251         if (args->flags & DLM_LKF_CANCEL) {
2252                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2253                         goto out;
2254
2255                 if (is_overlap(lkb))
2256                         goto out;
2257
2258                 /* don't let scand try to do a cancel */
2259                 del_timeout(lkb);
2260
2261                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2262                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2263                         rv = -EBUSY;
2264                         goto out;
2265                 }
2266
2267                 /* there's nothing to cancel */
2268                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2269                     !lkb->lkb_wait_type) {
2270                         rv = -EBUSY;
2271                         goto out;
2272                 }
2273
2274                 switch (lkb->lkb_wait_type) {
2275                 case DLM_MSG_LOOKUP:
2276                 case DLM_MSG_REQUEST:
2277                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2278                         rv = -EBUSY;
2279                         goto out;
2280                 case DLM_MSG_UNLOCK:
2281                 case DLM_MSG_CANCEL:
2282                         goto out;
2283                 }
2284                 /* add_to_waiters() will set OVERLAP_CANCEL */
2285                 goto out_ok;
2286         }
2287
2288         /* do we need to allow a force-unlock if there's a normal unlock
2289            already in progress?  in what conditions could the normal unlock
2290            fail such that we'd want to send a force-unlock to be sure? */
2291
2292         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2293                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2294                         goto out;
2295
2296                 if (is_overlap_unlock(lkb))
2297                         goto out;
2298
2299                 /* don't let scand try to do a cancel */
2300                 del_timeout(lkb);
2301
2302                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2303                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2304                         rv = -EBUSY;
2305                         goto out;
2306                 }
2307
2308                 switch (lkb->lkb_wait_type) {
2309                 case DLM_MSG_LOOKUP:
2310                 case DLM_MSG_REQUEST:
2311                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2312                         rv = -EBUSY;
2313                         goto out;
2314                 case DLM_MSG_UNLOCK:
2315                         goto out;
2316                 }
2317                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2318                 goto out_ok;
2319         }
2320
2321         /* normal unlock not allowed if there's any op in progress */
2322         rv = -EBUSY;
2323         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2324                 goto out;
2325
2326  out_ok:
2327         /* an overlapping op shouldn't blow away exflags from other op */
2328         lkb->lkb_exflags |= args->flags;
2329         lkb->lkb_sbflags = 0;
2330         lkb->lkb_astparam = args->astparam;
2331         rv = 0;
2332  out:
2333         if (rv)
2334                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2335                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2336                           args->flags, lkb->lkb_wait_type,
2337                           lkb->lkb_resource->res_name);
2338         return rv;
2339 }
2340
2341 /*
2342  * Four stage 4 varieties:
2343  * do_request(), do_convert(), do_unlock(), do_cancel()
2344  * These are called on the master node for the given lock and
2345  * from the central locking logic.
2346  */
2347
2348 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2349 {
2350         int error = 0;
2351
2352         if (can_be_granted(r, lkb, 1, NULL)) {
2353                 grant_lock(r, lkb);
2354                 queue_cast(r, lkb, 0);
2355                 goto out;
2356         }
2357
2358         if (can_be_queued(lkb)) {
2359                 error = -EINPROGRESS;
2360                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2361                 add_timeout(lkb);
2362                 goto out;
2363         }
2364
2365         error = -EAGAIN;
2366         queue_cast(r, lkb, -EAGAIN);
2367  out:
2368         return error;
2369 }
2370
2371 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2372                                int error)
2373 {
2374         switch (error) {
2375         case -EAGAIN:
2376                 if (force_blocking_asts(lkb))
2377                         send_blocking_asts_all(r, lkb);
2378                 break;
2379         case -EINPROGRESS:
2380                 send_blocking_asts(r, lkb);
2381                 break;
2382         }
2383 }
2384
2385 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2386 {
2387         int error = 0;
2388         int deadlk = 0;
2389
2390         /* changing an existing lock may allow others to be granted */
2391
2392         if (can_be_granted(r, lkb, 1, &deadlk)) {
2393                 grant_lock(r, lkb);
2394                 queue_cast(r, lkb, 0);
2395                 goto out;
2396         }
2397
2398         /* can_be_granted() detected that this lock would block in a conversion
2399            deadlock, so we leave it on the granted queue and return EDEADLK in
2400            the ast for the convert. */
2401
2402         if (deadlk) {
2403                 /* it's left on the granted queue */
2404                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2405                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2406                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2407                 revert_lock(r, lkb);
2408                 queue_cast(r, lkb, -EDEADLK);
2409                 error = -EDEADLK;
2410                 goto out;
2411         }
2412
2413         /* is_demoted() means the can_be_granted() above set the grmode
2414            to NL, and left us on the granted queue.  This auto-demotion
2415            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2416            now grantable.  We have to try to grant other converting locks
2417            before we try again to grant this one. */
2418
2419         if (is_demoted(lkb)) {
2420                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2421                 if (_can_be_granted(r, lkb, 1)) {
2422                         grant_lock(r, lkb);
2423                         queue_cast(r, lkb, 0);
2424                         goto out;
2425                 }
2426                 /* else fall through and move to convert queue */
2427         }
2428
2429         if (can_be_queued(lkb)) {
2430                 error = -EINPROGRESS;
2431                 del_lkb(r, lkb);
2432                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2433                 add_timeout(lkb);
2434                 goto out;
2435         }
2436
2437         error = -EAGAIN;
2438         queue_cast(r, lkb, -EAGAIN);
2439  out:
2440         return error;
2441 }
2442
2443 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2444                                int error)
2445 {
2446         switch (error) {
2447         case 0:
2448                 grant_pending_locks(r);
2449                 /* grant_pending_locks also sends basts */
2450                 break;
2451         case -EAGAIN:
2452                 if (force_blocking_asts(lkb))
2453                         send_blocking_asts_all(r, lkb);
2454                 break;
2455         case -EINPROGRESS:
2456                 send_blocking_asts(r, lkb);
2457                 break;
2458         }
2459 }
2460
2461 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2462 {
2463         remove_lock(r, lkb);
2464         queue_cast(r, lkb, -DLM_EUNLOCK);
2465         return -DLM_EUNLOCK;
2466 }
2467
2468 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2469                               int error)
2470 {
2471         grant_pending_locks(r);
2472 }
2473
2474 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2475  
2476 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2477 {
2478         int error;
2479
2480         error = revert_lock(r, lkb);
2481         if (error) {
2482                 queue_cast(r, lkb, -DLM_ECANCEL);
2483                 return -DLM_ECANCEL;
2484         }
2485         return 0;
2486 }
2487
2488 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2489                               int error)
2490 {
2491         if (error)
2492                 grant_pending_locks(r);
2493 }
2494
2495 /*
2496  * Four stage 3 varieties:
2497  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2498  */
2499
2500 /* add a new lkb to a possibly new rsb, called by requesting process */
2501
2502 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2503 {
2504         int error;
2505
2506         /* set_master: sets lkb nodeid from r */
2507
2508         error = set_master(r, lkb);
2509         if (error < 0)
2510                 goto out;
2511         if (error) {
2512                 error = 0;
2513                 goto out;
2514         }
2515
2516         if (is_remote(r)) {
2517                 /* receive_request() calls do_request() on remote node */
2518                 error = send_request(r, lkb);
2519         } else {
2520                 error = do_request(r, lkb);
2521                 /* for remote locks the request_reply is sent
2522                    between do_request and do_request_effects */
2523                 do_request_effects(r, lkb, error);
2524         }
2525  out:
2526         return error;
2527 }
2528
2529 /* change some property of an existing lkb, e.g. mode */
2530
2531 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2532 {
2533         int error;
2534
2535         if (is_remote(r)) {
2536                 /* receive_convert() calls do_convert() on remote node */
2537                 error = send_convert(r, lkb);
2538         } else {
2539                 error = do_convert(r, lkb);
2540                 /* for remote locks the convert_reply is sent
2541                    between do_convert and do_convert_effects */
2542                 do_convert_effects(r, lkb, error);
2543         }
2544
2545         return error;
2546 }
2547
2548 /* remove an existing lkb from the granted queue */
2549
2550 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2551 {
2552         int error;
2553
2554         if (is_remote(r)) {
2555                 /* receive_unlock() calls do_unlock() on remote node */
2556                 error = send_unlock(r, lkb);
2557         } else {
2558                 error = do_unlock(r, lkb);
2559                 /* for remote locks the unlock_reply is sent
2560                    between do_unlock and do_unlock_effects */
2561                 do_unlock_effects(r, lkb, error);
2562         }
2563
2564         return error;
2565 }
2566
2567 /* remove an existing lkb from the convert or wait queue */
2568
2569 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2570 {
2571         int error;
2572
2573         if (is_remote(r)) {
2574                 /* receive_cancel() calls do_cancel() on remote node */
2575                 error = send_cancel(r, lkb);
2576         } else {
2577                 error = do_cancel(r, lkb);
2578                 /* for remote locks the cancel_reply is sent
2579                    between do_cancel and do_cancel_effects */
2580                 do_cancel_effects(r, lkb, error);
2581         }
2582
2583         return error;
2584 }
2585
2586 /*
2587  * Four stage 2 varieties:
2588  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2589  */
2590
2591 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2592                         int len, struct dlm_args *args)
2593 {
2594         struct dlm_rsb *r;
2595         int error;
2596
2597         error = validate_lock_args(ls, lkb, args);
2598         if (error)
2599                 goto out;
2600
2601         error = find_rsb(ls, name, len, R_CREATE, &r);
2602         if (error)
2603                 goto out;
2604
2605         lock_rsb(r);
2606
2607         attach_lkb(r, lkb);
2608         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2609
2610         error = _request_lock(r, lkb);
2611
2612         unlock_rsb(r);
2613         put_rsb(r);
2614
2615  out:
2616         return error;
2617 }
2618
2619 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2620                         struct dlm_args *args)
2621 {
2622         struct dlm_rsb *r;
2623         int error;
2624
2625         r = lkb->lkb_resource;
2626
2627         hold_rsb(r);
2628         lock_rsb(r);
2629
2630         error = validate_lock_args(ls, lkb, args);
2631         if (error)
2632                 goto out;
2633
2634         error = _convert_lock(r, lkb);
2635  out:
2636         unlock_rsb(r);
2637         put_rsb(r);
2638         return error;
2639 }
2640
2641 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2642                        struct dlm_args *args)
2643 {
2644         struct dlm_rsb *r;
2645         int error;
2646
2647         r = lkb->lkb_resource;
2648
2649         hold_rsb(r);
2650         lock_rsb(r);
2651
2652         error = validate_unlock_args(lkb, args);
2653         if (error)
2654                 goto out;
2655
2656         error = _unlock_lock(r, lkb);
2657  out:
2658         unlock_rsb(r);
2659         put_rsb(r);
2660         return error;
2661 }
2662
2663 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2664                        struct dlm_args *args)
2665 {
2666         struct dlm_rsb *r;
2667         int error;
2668
2669         r = lkb->lkb_resource;
2670
2671         hold_rsb(r);
2672         lock_rsb(r);
2673
2674         error = validate_unlock_args(lkb, args);
2675         if (error)
2676                 goto out;
2677
2678         error = _cancel_lock(r, lkb);
2679  out:
2680         unlock_rsb(r);
2681         put_rsb(r);
2682         return error;
2683 }
2684
2685 /*
2686  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2687  */
2688
2689 int dlm_lock(dlm_lockspace_t *lockspace,
2690              int mode,
2691              struct dlm_lksb *lksb,
2692              uint32_t flags,
2693              void *name,
2694              unsigned int namelen,
2695              uint32_t parent_lkid,
2696              void (*ast) (void *astarg),
2697              void *astarg,
2698              void (*bast) (void *astarg, int mode))
2699 {
2700         struct dlm_ls *ls;
2701         struct dlm_lkb *lkb;
2702         struct dlm_args args;
2703         int error, convert = flags & DLM_LKF_CONVERT;
2704
2705         ls = dlm_find_lockspace_local(lockspace);
2706         if (!ls)
2707                 return -EINVAL;
2708
2709         dlm_lock_recovery(ls);
2710
2711         if (convert)
2712                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2713         else
2714                 error = create_lkb(ls, &lkb);
2715
2716         if (error)
2717                 goto out;
2718
2719         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2720                               astarg, bast, &args);
2721         if (error)
2722                 goto out_put;
2723
2724         if (convert)
2725                 error = convert_lock(ls, lkb, &args);
2726         else
2727                 error = request_lock(ls, lkb, name, namelen, &args);
2728
2729         if (error == -EINPROGRESS)
2730                 error = 0;
2731  out_put:
2732         if (convert || error)
2733                 __put_lkb(ls, lkb);
2734         if (error == -EAGAIN || error == -EDEADLK)
2735                 error = 0;
2736  out:
2737         dlm_unlock_recovery(ls);
2738         dlm_put_lockspace(ls);
2739         return error;
2740 }
2741
2742 int dlm_unlock(dlm_lockspace_t *lockspace,
2743                uint32_t lkid,
2744                uint32_t flags,
2745                struct dlm_lksb *lksb,
2746                void *astarg)
2747 {
2748         struct dlm_ls *ls;
2749         struct dlm_lkb *lkb;
2750         struct dlm_args args;
2751         int error;
2752
2753         ls = dlm_find_lockspace_local(lockspace);
2754         if (!ls)
2755                 return -EINVAL;
2756
2757         dlm_lock_recovery(ls);
2758
2759         error = find_lkb(ls, lkid, &lkb);
2760         if (error)
2761                 goto out;
2762
2763         error = set_unlock_args(flags, astarg, &args);
2764         if (error)
2765                 goto out_put;
2766
2767         if (flags & DLM_LKF_CANCEL)
2768                 error = cancel_lock(ls, lkb, &args);
2769         else
2770                 error = unlock_lock(ls, lkb, &args);
2771
2772         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2773                 error = 0;
2774         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2775                 error = 0;
2776  out_put:
2777         dlm_put_lkb(lkb);
2778  out:
2779         dlm_unlock_recovery(ls);
2780         dlm_put_lockspace(ls);
2781         return error;
2782 }
2783
2784 /*
2785  * send/receive routines for remote operations and replies
2786  *
2787  * send_args
2788  * send_common
2789  * send_request                 receive_request
2790  * send_convert                 receive_convert
2791  * send_unlock                  receive_unlock
2792  * send_cancel                  receive_cancel
2793  * send_grant                   receive_grant
2794  * send_bast                    receive_bast
2795  * send_lookup                  receive_lookup
2796  * send_remove                  receive_remove
2797  *
2798  *                              send_common_reply
2799  * receive_request_reply        send_request_reply
2800  * receive_convert_reply        send_convert_reply
2801  * receive_unlock_reply         send_unlock_reply
2802  * receive_cancel_reply         send_cancel_reply
2803  * receive_lookup_reply         send_lookup_reply
2804  */
2805
2806 static int _create_message(struct dlm_ls *ls, int mb_len,
2807                            int to_nodeid, int mstype,
2808                            struct dlm_message **ms_ret,
2809                            struct dlm_mhandle **mh_ret)
2810 {
2811         struct dlm_message *ms;
2812         struct dlm_mhandle *mh;
2813         char *mb;
2814
2815         /* get_buffer gives us a message handle (mh) that we need to
2816            pass into lowcomms_commit and a message buffer (mb) that we
2817            write our data into */
2818
2819         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2820         if (!mh)
2821                 return -ENOBUFS;
2822
2823         memset(mb, 0, mb_len);
2824
2825         ms = (struct dlm_message *) mb;
2826
2827         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2828         ms->m_header.h_lockspace = ls->ls_global_id;
2829         ms->m_header.h_nodeid = dlm_our_nodeid();
2830         ms->m_header.h_length = mb_len;
2831         ms->m_header.h_cmd = DLM_MSG;
2832
2833         ms->m_type = mstype;
2834
2835         *mh_ret = mh;
2836         *ms_ret = ms;
2837         return 0;
2838 }
2839
2840 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2841                           int to_nodeid, int mstype,
2842                           struct dlm_message **ms_ret,
2843                           struct dlm_mhandle **mh_ret)
2844 {
2845         int mb_len = sizeof(struct dlm_message);
2846
2847         switch (mstype) {
2848         case DLM_MSG_REQUEST:
2849         case DLM_MSG_LOOKUP:
2850         case DLM_MSG_REMOVE:
2851                 mb_len += r->res_length;
2852                 break;
2853         case DLM_MSG_CONVERT:
2854         case DLM_MSG_UNLOCK:
2855         case DLM_MSG_REQUEST_REPLY:
2856         case DLM_MSG_CONVERT_REPLY:
2857         case DLM_MSG_GRANT:
2858                 if (lkb && lkb->lkb_lvbptr)
2859                         mb_len += r->res_ls->ls_lvblen;
2860                 break;
2861         }
2862
2863         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2864                                ms_ret, mh_ret);
2865 }
2866
2867 /* further lowcomms enhancements or alternate implementations may make
2868    the return value from this function useful at some point */
2869
2870 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2871 {
2872         dlm_message_out(ms);
2873         dlm_lowcomms_commit_buffer(mh);
2874         return 0;
2875 }
2876
2877 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2878                       struct dlm_message *ms)
2879 {
2880         ms->m_nodeid   = lkb->lkb_nodeid;
2881         ms->m_pid      = lkb->lkb_ownpid;
2882         ms->m_lkid     = lkb->lkb_id;
2883         ms->m_remid    = lkb->lkb_remid;
2884         ms->m_exflags  = lkb->lkb_exflags;
2885         ms->m_sbflags  = lkb->lkb_sbflags;
2886         ms->m_flags    = lkb->lkb_flags;
2887         ms->m_lvbseq   = lkb->lkb_lvbseq;
2888         ms->m_status   = lkb->lkb_status;
2889         ms->m_grmode   = lkb->lkb_grmode;
2890         ms->m_rqmode   = lkb->lkb_rqmode;
2891         ms->m_hash     = r->res_hash;
2892
2893         /* m_result and m_bastmode are set from function args,
2894            not from lkb fields */
2895
2896         if (lkb->lkb_bastfn)
2897                 ms->m_asts |= DLM_CB_BAST;
2898         if (lkb->lkb_astfn)
2899                 ms->m_asts |= DLM_CB_CAST;
2900
2901         /* compare with switch in create_message; send_remove() doesn't
2902            use send_args() */
2903
2904         switch (ms->m_type) {
2905         case DLM_MSG_REQUEST:
2906         case DLM_MSG_LOOKUP:
2907                 memcpy(ms->m_extra, r->res_name, r->res_length);
2908                 break;
2909         case DLM_MSG_CONVERT:
2910         case DLM_MSG_UNLOCK:
2911         case DLM_MSG_REQUEST_REPLY:
2912         case DLM_MSG_CONVERT_REPLY:
2913         case DLM_MSG_GRANT:
2914                 if (!lkb->lkb_lvbptr)
2915                         break;
2916                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2917                 break;
2918         }
2919 }
2920
2921 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2922 {
2923         struct dlm_message *ms;
2924         struct dlm_mhandle *mh;
2925         int to_nodeid, error;
2926
2927         to_nodeid = r->res_nodeid;
2928
2929         error = add_to_waiters(lkb, mstype, to_nodeid);
2930         if (error)
2931                 return error;
2932
2933         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2934         if (error)
2935                 goto fail;
2936
2937         send_args(r, lkb, ms);
2938
2939         error = send_message(mh, ms);
2940         if (error)
2941                 goto fail;
2942         return 0;
2943
2944  fail:
2945         remove_from_waiters(lkb, msg_reply_type(mstype));
2946         return error;
2947 }
2948
2949 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2950 {
2951         return send_common(r, lkb, DLM_MSG_REQUEST);
2952 }
2953
2954 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2955 {
2956         int error;
2957
2958         error = send_common(r, lkb, DLM_MSG_CONVERT);
2959
2960         /* down conversions go without a reply from the master */
2961         if (!error && down_conversion(lkb)) {
2962                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2963                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2964                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2965                 r->res_ls->ls_stub_ms.m_result = 0;
2966                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2967         }
2968
2969         return error;
2970 }
2971
2972 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2973    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2974    that the master is still correct. */
2975
2976 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2977 {
2978         return send_common(r, lkb, DLM_MSG_UNLOCK);
2979 }
2980
2981 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2982 {
2983         return send_common(r, lkb, DLM_MSG_CANCEL);
2984 }
2985
2986 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2987 {
2988         struct dlm_message *ms;
2989         struct dlm_mhandle *mh;
2990         int to_nodeid, error;
2991
2992         to_nodeid = lkb->lkb_nodeid;
2993
2994         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2995         if (error)
2996                 goto out;
2997
2998         send_args(r, lkb, ms);
2999
3000         ms->m_result = 0;
3001
3002         error = send_message(mh, ms);
3003  out:
3004         return error;
3005 }
3006
3007 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3008 {
3009         struct dlm_message *ms;
3010         struct dlm_mhandle *mh;
3011         int to_nodeid, error;
3012
3013         to_nodeid = lkb->lkb_nodeid;
3014
3015         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3016         if (error)
3017                 goto out;
3018
3019         send_args(r, lkb, ms);
3020
3021         ms->m_bastmode = mode;
3022
3023         error = send_message(mh, ms);
3024  out:
3025         return error;
3026 }
3027
3028 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3029 {
3030         struct dlm_message *ms;
3031         struct dlm_mhandle *mh;
3032         int to_nodeid, error;
3033
3034         to_nodeid = dlm_dir_nodeid(r);
3035
3036         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3037         if (error)
3038                 return error;
3039
3040         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3041         if (error)
3042                 goto fail;
3043
3044         send_args(r, lkb, ms);
3045
3046         error = send_message(mh, ms);
3047         if (error)
3048                 goto fail;
3049         return 0;
3050
3051  fail:
3052         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3053         return error;
3054 }
3055
3056 static int send_remove(struct dlm_rsb *r)
3057 {
3058         struct dlm_message *ms;
3059         struct dlm_mhandle *mh;
3060         int to_nodeid, error;
3061
3062         to_nodeid = dlm_dir_nodeid(r);
3063
3064         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3065         if (error)
3066                 goto out;
3067
3068         memcpy(ms->m_extra, r->res_name, r->res_length);
3069         ms->m_hash = r->res_hash;
3070
3071         error = send_message(mh, ms);
3072  out:
3073         return error;
3074 }
3075
3076 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3077                              int mstype, int rv)
3078 {
3079         struct dlm_message *ms;
3080         struct dlm_mhandle *mh;
3081         int to_nodeid, error;
3082
3083         to_nodeid = lkb->lkb_nodeid;
3084
3085         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3086         if (error)
3087                 goto out;
3088
3089         send_args(r, lkb, ms);
3090
3091         ms->m_result = rv;
3092
3093         error = send_message(mh, ms);
3094  out:
3095         return error;
3096 }
3097
3098 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3099 {
3100         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3101 }
3102
3103 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3104 {
3105         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3106 }
3107
3108 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3109 {
3110         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3111 }
3112
3113 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3114 {
3115         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3116 }
3117
3118 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3119                              int ret_nodeid, int rv)
3120 {
3121         struct dlm_rsb *r = &ls->ls_stub_rsb;
3122         struct dlm_message *ms;
3123         struct dlm_mhandle *mh;
3124         int error, nodeid = ms_in->m_header.h_nodeid;
3125
3126         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3127         if (error)
3128                 goto out;
3129
3130         ms->m_lkid = ms_in->m_lkid;
3131         ms->m_result = rv;
3132         ms->m_nodeid = ret_nodeid;
3133
3134         error = send_message(mh, ms);
3135  out:
3136         return error;
3137 }
3138
3139 /* which args we save from a received message depends heavily on the type
3140    of message, unlike the send side where we can safely send everything about
3141    the lkb for any type of message */
3142
3143 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3144 {
3145         lkb->lkb_exflags = ms->m_exflags;
3146         lkb->lkb_sbflags = ms->m_sbflags;
3147         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3148                          (ms->m_flags & 0x0000FFFF);
3149 }
3150
3151 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3152 {
3153         if (ms->m_flags == DLM_IFL_STUB_MS)
3154                 return;
3155
3156         lkb->lkb_sbflags = ms->m_sbflags;
3157         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3158                          (ms->m_flags & 0x0000FFFF);
3159 }
3160
3161 static int receive_extralen(struct dlm_message *ms)
3162 {
3163         return (ms->m_header.h_length - sizeof(struct dlm_message));
3164 }
3165
3166 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3167                        struct dlm_message *ms)
3168 {
3169         int len;
3170
3171         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3172                 if (!lkb->lkb_lvbptr)
3173                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3174                 if (!lkb->lkb_lvbptr)
3175                         return -ENOMEM;
3176                 len = receive_extralen(ms);
3177                 if (len > DLM_RESNAME_MAXLEN)
3178                         len = DLM_RESNAME_MAXLEN;
3179                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3180         }
3181         return 0;
3182 }
3183
3184 static void fake_bastfn(void *astparam, int mode)
3185 {
3186         log_print("fake_bastfn should not be called");
3187 }
3188
3189 static void fake_astfn(void *astparam)
3190 {
3191         log_print("fake_astfn should not be called");
3192 }
3193
3194 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3195                                 struct dlm_message *ms)
3196 {
3197         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3198         lkb->lkb_ownpid = ms->m_pid;
3199         lkb->lkb_remid = ms->m_lkid;
3200         lkb->lkb_grmode = DLM_LOCK_IV;
3201         lkb->lkb_rqmode = ms->m_rqmode;
3202
3203         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3204         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3205
3206         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3207                 /* lkb was just created so there won't be an lvb yet */
3208                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3209                 if (!lkb->lkb_lvbptr)
3210                         return -ENOMEM;
3211         }
3212
3213         return 0;
3214 }
3215
3216 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3217                                 struct dlm_message *ms)
3218 {
3219         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3220                 return -EBUSY;
3221
3222         if (receive_lvb(ls, lkb, ms))
3223                 return -ENOMEM;
3224
3225         lkb->lkb_rqmode = ms->m_rqmode;
3226         lkb->lkb_lvbseq = ms->m_lvbseq;
3227
3228         return 0;
3229 }
3230
3231 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3232                                struct dlm_message *ms)
3233 {
3234         if (receive_lvb(ls, lkb, ms))
3235                 return -ENOMEM;
3236         return 0;
3237 }
3238
3239 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3240    uses to send a reply and that the remote end uses to process the reply. */
3241
3242 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3243 {
3244         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3245         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3246         lkb->lkb_remid = ms->m_lkid;
3247 }
3248
3249 /* This is called after the rsb is locked so that we can safely inspect
3250    fields in the lkb. */
3251
3252 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3253 {
3254         int from = ms->m_header.h_nodeid;
3255         int error = 0;
3256
3257         switch (ms->m_type) {
3258         case DLM_MSG_CONVERT:
3259         case DLM_MSG_UNLOCK:
3260         case DLM_MSG_CANCEL:
3261                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3262                         error = -EINVAL;
3263                 break;
3264
3265         case DLM_MSG_CONVERT_REPLY:
3266         case DLM_MSG_UNLOCK_REPLY:
3267         case DLM_MSG_CANCEL_REPLY:
3268         case DLM_MSG_GRANT:
3269         case DLM_MSG_BAST:
3270                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3271                         error = -EINVAL;
3272                 break;
3273
3274         case DLM_MSG_REQUEST_REPLY:
3275                 if (!is_process_copy(lkb))
3276                         error = -EINVAL;
3277                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3278                         error = -EINVAL;
3279                 break;
3280
3281         default:
3282                 error = -EINVAL;
3283         }
3284
3285         if (error)
3286                 log_error(lkb->lkb_resource->res_ls,
3287                           "ignore invalid message %d from %d %x %x %x %d",
3288                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3289                           lkb->lkb_flags, lkb->lkb_nodeid);
3290         return error;
3291 }
3292
3293 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3294 {
3295         struct dlm_lkb *lkb;
3296         struct dlm_rsb *r;
3297         int error, namelen;
3298
3299         error = create_lkb(ls, &lkb);
3300         if (error)
3301                 goto fail;
3302
3303         receive_flags(lkb, ms);
3304         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3305         error = receive_request_args(ls, lkb, ms);
3306         if (error) {
3307                 __put_lkb(ls, lkb);
3308                 goto fail;
3309         }
3310
3311         namelen = receive_extralen(ms);
3312
3313         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3314         if (error) {
3315                 __put_lkb(ls, lkb);
3316                 goto fail;
3317         }
3318
3319         lock_rsb(r);
3320
3321         attach_lkb(r, lkb);
3322         error = do_request(r, lkb);
3323         send_request_reply(r, lkb, error);
3324         do_request_effects(r, lkb, error);
3325
3326         unlock_rsb(r);
3327         put_rsb(r);
3328
3329         if (error == -EINPROGRESS)
3330                 error = 0;
3331         if (error)
3332                 dlm_put_lkb(lkb);
3333         return;
3334
3335  fail:
3336         setup_stub_lkb(ls, ms);
3337         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3338 }
3339
3340 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3341 {
3342         struct dlm_lkb *lkb;
3343         struct dlm_rsb *r;
3344         int error, reply = 1;
3345
3346         error = find_lkb(ls, ms->m_remid, &lkb);
3347         if (error)
3348                 goto fail;
3349
3350         r = lkb->lkb_resource;
3351
3352         hold_rsb(r);
3353         lock_rsb(r);
3354
3355         error = validate_message(lkb, ms);
3356         if (error)
3357                 goto out;
3358
3359         receive_flags(lkb, ms);
3360
3361         error = receive_convert_args(ls, lkb, ms);
3362         if (error) {
3363                 send_convert_reply(r, lkb, error);
3364                 goto out;
3365         }
3366
3367         reply = !down_conversion(lkb);
3368
3369         error = do_convert(r, lkb);
3370         if (reply)
3371                 send_convert_reply(r, lkb, error);
3372         do_convert_effects(r, lkb, error);
3373  out:
3374         unlock_rsb(r);
3375         put_rsb(r);
3376         dlm_put_lkb(lkb);
3377         return;
3378
3379  fail:
3380         setup_stub_lkb(ls, ms);
3381         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3382 }
3383
3384 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3385 {
3386         struct dlm_lkb *lkb;
3387         struct dlm_rsb *r;
3388         int error;
3389