Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/dlm
[pandora-kernel.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/slab.h>
60 #include "dlm_internal.h"
61 #include <linux/dlm_device.h>
62 #include "memory.h"
63 #include "lowcomms.h"
64 #include "requestqueue.h"
65 #include "util.h"
66 #include "dir.h"
67 #include "member.h"
68 #include "lockspace.h"
69 #include "ast.h"
70 #include "lock.h"
71 #include "rcom.h"
72 #include "recover.h"
73 #include "lvb_table.h"
74 #include "user.h"
75 #include "config.h"
76
77 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int send_remove(struct dlm_rsb *r);
85 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88                                     struct dlm_message *ms);
89 static int receive_extralen(struct dlm_message *ms);
90 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 static void del_timeout(struct dlm_lkb *lkb);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163                "     status %d rqmode %d grmode %d wait_type %d\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type);
167 }
168
169 static void dlm_print_rsb(struct dlm_rsb *r)
170 {
171         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172                r->res_nodeid, r->res_flags, r->res_first_lkid,
173                r->res_recover_locks_count, r->res_name);
174 }
175
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178         struct dlm_lkb *lkb;
179
180         dlm_print_rsb(r);
181
182         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184         printk(KERN_ERR "rsb lookup list\n");
185         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb grant queue:\n");
188         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb convert queue:\n");
191         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb wait queue:\n");
194         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196 }
197
198 /* Threads cannot use the lockspace while it's being recovered */
199
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202         down_read(&ls->ls_in_recovery);
203 }
204
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207         up_read(&ls->ls_in_recovery);
208 }
209
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212         return down_read_trylock(&ls->ls_in_recovery);
213 }
214
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243         return !!r->res_nodeid;
244 }
245
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291
292         del_timeout(lkb);
293
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307
308         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 }
310
311 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 {
313         queue_cast(r, lkb,
314                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 }
316
317 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318 {
319         if (is_master_copy(lkb)) {
320                 send_bast(r, lkb, rqmode);
321         } else {
322                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
323         }
324 }
325
326 /*
327  * Basic operations on rsb's and lkb's
328  */
329
330 static int pre_rsb_struct(struct dlm_ls *ls)
331 {
332         struct dlm_rsb *r1, *r2;
333         int count = 0;
334
335         spin_lock(&ls->ls_new_rsb_spin);
336         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
337                 spin_unlock(&ls->ls_new_rsb_spin);
338                 return 0;
339         }
340         spin_unlock(&ls->ls_new_rsb_spin);
341
342         r1 = dlm_allocate_rsb(ls);
343         r2 = dlm_allocate_rsb(ls);
344
345         spin_lock(&ls->ls_new_rsb_spin);
346         if (r1) {
347                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
348                 ls->ls_new_rsb_count++;
349         }
350         if (r2) {
351                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
352                 ls->ls_new_rsb_count++;
353         }
354         count = ls->ls_new_rsb_count;
355         spin_unlock(&ls->ls_new_rsb_spin);
356
357         if (!count)
358                 return -ENOMEM;
359         return 0;
360 }
361
362 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
363    unlock any spinlocks, go back and call pre_rsb_struct again.
364    Otherwise, take an rsb off the list and return it. */
365
366 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
367                           struct dlm_rsb **r_ret)
368 {
369         struct dlm_rsb *r;
370         int count;
371
372         spin_lock(&ls->ls_new_rsb_spin);
373         if (list_empty(&ls->ls_new_rsb)) {
374                 count = ls->ls_new_rsb_count;
375                 spin_unlock(&ls->ls_new_rsb_spin);
376                 log_debug(ls, "find_rsb retry %d %d %s",
377                           count, dlm_config.ci_new_rsb_count, name);
378                 return -EAGAIN;
379         }
380
381         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
382         list_del(&r->res_hashchain);
383         ls->ls_new_rsb_count--;
384         spin_unlock(&ls->ls_new_rsb_spin);
385
386         r->res_ls = ls;
387         r->res_length = len;
388         memcpy(r->res_name, name, len);
389         mutex_init(&r->res_mutex);
390
391         INIT_LIST_HEAD(&r->res_hashchain);
392         INIT_LIST_HEAD(&r->res_lookup);
393         INIT_LIST_HEAD(&r->res_grantqueue);
394         INIT_LIST_HEAD(&r->res_convertqueue);
395         INIT_LIST_HEAD(&r->res_waitqueue);
396         INIT_LIST_HEAD(&r->res_root_list);
397         INIT_LIST_HEAD(&r->res_recover_list);
398
399         *r_ret = r;
400         return 0;
401 }
402
403 static int search_rsb_list(struct list_head *head, char *name, int len,
404                            unsigned int flags, struct dlm_rsb **r_ret)
405 {
406         struct dlm_rsb *r;
407         int error = 0;
408
409         list_for_each_entry(r, head, res_hashchain) {
410                 if (len == r->res_length && !memcmp(name, r->res_name, len))
411                         goto found;
412         }
413         *r_ret = NULL;
414         return -EBADR;
415
416  found:
417         if (r->res_nodeid && (flags & R_MASTER))
418                 error = -ENOTBLK;
419         *r_ret = r;
420         return error;
421 }
422
423 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
424                        unsigned int flags, struct dlm_rsb **r_ret)
425 {
426         struct dlm_rsb *r;
427         int error;
428
429         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
430         if (!error) {
431                 kref_get(&r->res_ref);
432                 goto out;
433         }
434         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
435         if (error)
436                 goto out;
437
438         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
439
440         if (dlm_no_directory(ls))
441                 goto out;
442
443         if (r->res_nodeid == -1) {
444                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
445                 r->res_first_lkid = 0;
446         } else if (r->res_nodeid > 0) {
447                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
448                 r->res_first_lkid = 0;
449         } else {
450                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
451                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
452         }
453  out:
454         *r_ret = r;
455         return error;
456 }
457
458 /*
459  * Find rsb in rsbtbl and potentially create/add one
460  *
461  * Delaying the release of rsb's has a similar benefit to applications keeping
462  * NL locks on an rsb, but without the guarantee that the cached master value
463  * will still be valid when the rsb is reused.  Apps aren't always smart enough
464  * to keep NL locks on an rsb that they may lock again shortly; this can lead
465  * to excessive master lookups and removals if we don't delay the release.
466  *
467  * Searching for an rsb means looking through both the normal list and toss
468  * list.  When found on the toss list the rsb is moved to the normal list with
469  * ref count of 1; when found on normal list the ref count is incremented.
470  */
471
472 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
473                     unsigned int flags, struct dlm_rsb **r_ret)
474 {
475         struct dlm_rsb *r = NULL;
476         uint32_t hash, bucket;
477         int error;
478
479         if (namelen > DLM_RESNAME_MAXLEN) {
480                 error = -EINVAL;
481                 goto out;
482         }
483
484         if (dlm_no_directory(ls))
485                 flags |= R_CREATE;
486
487         hash = jhash(name, namelen, 0);
488         bucket = hash & (ls->ls_rsbtbl_size - 1);
489
490  retry:
491         if (flags & R_CREATE) {
492                 error = pre_rsb_struct(ls);
493                 if (error < 0)
494                         goto out;
495         }
496
497         spin_lock(&ls->ls_rsbtbl[bucket].lock);
498
499         error = _search_rsb(ls, name, namelen, bucket, flags, &r);
500         if (!error)
501                 goto out_unlock;
502
503         if (error == -EBADR && !(flags & R_CREATE))
504                 goto out_unlock;
505
506         /* the rsb was found but wasn't a master copy */
507         if (error == -ENOTBLK)
508                 goto out_unlock;
509
510         error = get_rsb_struct(ls, name, namelen, &r);
511         if (error == -EAGAIN) {
512                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
513                 goto retry;
514         }
515         if (error)
516                 goto out_unlock;
517
518         r->res_hash = hash;
519         r->res_bucket = bucket;
520         r->res_nodeid = -1;
521         kref_init(&r->res_ref);
522
523         /* With no directory, the master can be set immediately */
524         if (dlm_no_directory(ls)) {
525                 int nodeid = dlm_dir_nodeid(r);
526                 if (nodeid == dlm_our_nodeid())
527                         nodeid = 0;
528                 r->res_nodeid = nodeid;
529         }
530         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
531         error = 0;
532  out_unlock:
533         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
534  out:
535         *r_ret = r;
536         return error;
537 }
538
539 /* This is only called to add a reference when the code already holds
540    a valid reference to the rsb, so there's no need for locking. */
541
542 static inline void hold_rsb(struct dlm_rsb *r)
543 {
544         kref_get(&r->res_ref);
545 }
546
547 void dlm_hold_rsb(struct dlm_rsb *r)
548 {
549         hold_rsb(r);
550 }
551
552 static void toss_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555         struct dlm_ls *ls = r->res_ls;
556
557         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
558         kref_init(&r->res_ref);
559         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
560         r->res_toss_time = jiffies;
561         if (r->res_lvbptr) {
562                 dlm_free_lvb(r->res_lvbptr);
563                 r->res_lvbptr = NULL;
564         }
565 }
566
567 /* When all references to the rsb are gone it's transferred to
568    the tossed list for later disposal. */
569
570 static void put_rsb(struct dlm_rsb *r)
571 {
572         struct dlm_ls *ls = r->res_ls;
573         uint32_t bucket = r->res_bucket;
574
575         spin_lock(&ls->ls_rsbtbl[bucket].lock);
576         kref_put(&r->res_ref, toss_rsb);
577         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
578 }
579
580 void dlm_put_rsb(struct dlm_rsb *r)
581 {
582         put_rsb(r);
583 }
584
585 /* See comment for unhold_lkb */
586
587 static void unhold_rsb(struct dlm_rsb *r)
588 {
589         int rv;
590         rv = kref_put(&r->res_ref, toss_rsb);
591         DLM_ASSERT(!rv, dlm_dump_rsb(r););
592 }
593
594 static void kill_rsb(struct kref *kref)
595 {
596         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
597
598         /* All work is done after the return from kref_put() so we
599            can release the write_lock before the remove and free. */
600
601         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
602         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
603         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
604         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
605         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
606         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
607 }
608
609 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
610    The rsb must exist as long as any lkb's for it do. */
611
612 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
613 {
614         hold_rsb(r);
615         lkb->lkb_resource = r;
616 }
617
618 static void detach_lkb(struct dlm_lkb *lkb)
619 {
620         if (lkb->lkb_resource) {
621                 put_rsb(lkb->lkb_resource);
622                 lkb->lkb_resource = NULL;
623         }
624 }
625
626 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
627 {
628         struct dlm_lkb *lkb;
629         int rv, id;
630
631         lkb = dlm_allocate_lkb(ls);
632         if (!lkb)
633                 return -ENOMEM;
634
635         lkb->lkb_nodeid = -1;
636         lkb->lkb_grmode = DLM_LOCK_IV;
637         kref_init(&lkb->lkb_ref);
638         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
639         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
640         INIT_LIST_HEAD(&lkb->lkb_time_list);
641         INIT_LIST_HEAD(&lkb->lkb_cb_list);
642         mutex_init(&lkb->lkb_cb_mutex);
643         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
644
645  retry:
646         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
647         if (!rv)
648                 return -ENOMEM;
649
650         spin_lock(&ls->ls_lkbidr_spin);
651         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
652         if (!rv)
653                 lkb->lkb_id = id;
654         spin_unlock(&ls->ls_lkbidr_spin);
655
656         if (rv == -EAGAIN)
657                 goto retry;
658
659         if (rv < 0) {
660                 log_error(ls, "create_lkb idr error %d", rv);
661                 return rv;
662         }
663
664         *lkb_ret = lkb;
665         return 0;
666 }
667
668 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
669 {
670         struct dlm_lkb *lkb;
671
672         spin_lock(&ls->ls_lkbidr_spin);
673         lkb = idr_find(&ls->ls_lkbidr, lkid);
674         if (lkb)
675                 kref_get(&lkb->lkb_ref);
676         spin_unlock(&ls->ls_lkbidr_spin);
677
678         *lkb_ret = lkb;
679         return lkb ? 0 : -ENOENT;
680 }
681
682 static void kill_lkb(struct kref *kref)
683 {
684         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
685
686         /* All work is done after the return from kref_put() so we
687            can release the write_lock before the detach_lkb */
688
689         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
690 }
691
692 /* __put_lkb() is used when an lkb may not have an rsb attached to
693    it so we need to provide the lockspace explicitly */
694
695 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
696 {
697         uint32_t lkid = lkb->lkb_id;
698
699         spin_lock(&ls->ls_lkbidr_spin);
700         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
701                 idr_remove(&ls->ls_lkbidr, lkid);
702                 spin_unlock(&ls->ls_lkbidr_spin);
703
704                 detach_lkb(lkb);
705
706                 /* for local/process lkbs, lvbptr points to caller's lksb */
707                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
708                         dlm_free_lvb(lkb->lkb_lvbptr);
709                 dlm_free_lkb(lkb);
710                 return 1;
711         } else {
712                 spin_unlock(&ls->ls_lkbidr_spin);
713                 return 0;
714         }
715 }
716
717 int dlm_put_lkb(struct dlm_lkb *lkb)
718 {
719         struct dlm_ls *ls;
720
721         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
722         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
723
724         ls = lkb->lkb_resource->res_ls;
725         return __put_lkb(ls, lkb);
726 }
727
728 /* This is only called to add a reference when the code already holds
729    a valid reference to the lkb, so there's no need for locking. */
730
731 static inline void hold_lkb(struct dlm_lkb *lkb)
732 {
733         kref_get(&lkb->lkb_ref);
734 }
735
736 /* This is called when we need to remove a reference and are certain
737    it's not the last ref.  e.g. del_lkb is always called between a
738    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
739    put_lkb would work fine, but would involve unnecessary locking */
740
741 static inline void unhold_lkb(struct dlm_lkb *lkb)
742 {
743         int rv;
744         rv = kref_put(&lkb->lkb_ref, kill_lkb);
745         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
746 }
747
748 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
749                             int mode)
750 {
751         struct dlm_lkb *lkb = NULL;
752
753         list_for_each_entry(lkb, head, lkb_statequeue)
754                 if (lkb->lkb_rqmode < mode)
755                         break;
756
757         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
758 }
759
760 /* add/remove lkb to rsb's grant/convert/wait queue */
761
762 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
763 {
764         kref_get(&lkb->lkb_ref);
765
766         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
767
768         lkb->lkb_timestamp = ktime_get();
769
770         lkb->lkb_status = status;
771
772         switch (status) {
773         case DLM_LKSTS_WAITING:
774                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
775                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
776                 else
777                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
778                 break;
779         case DLM_LKSTS_GRANTED:
780                 /* convention says granted locks kept in order of grmode */
781                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
782                                 lkb->lkb_grmode);
783                 break;
784         case DLM_LKSTS_CONVERT:
785                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
786                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
787                 else
788                         list_add_tail(&lkb->lkb_statequeue,
789                                       &r->res_convertqueue);
790                 break;
791         default:
792                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
793         }
794 }
795
796 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
797 {
798         lkb->lkb_status = 0;
799         list_del(&lkb->lkb_statequeue);
800         unhold_lkb(lkb);
801 }
802
803 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
804 {
805         hold_lkb(lkb);
806         del_lkb(r, lkb);
807         add_lkb(r, lkb, sts);
808         unhold_lkb(lkb);
809 }
810
811 static int msg_reply_type(int mstype)
812 {
813         switch (mstype) {
814         case DLM_MSG_REQUEST:
815                 return DLM_MSG_REQUEST_REPLY;
816         case DLM_MSG_CONVERT:
817                 return DLM_MSG_CONVERT_REPLY;
818         case DLM_MSG_UNLOCK:
819                 return DLM_MSG_UNLOCK_REPLY;
820         case DLM_MSG_CANCEL:
821                 return DLM_MSG_CANCEL_REPLY;
822         case DLM_MSG_LOOKUP:
823                 return DLM_MSG_LOOKUP_REPLY;
824         }
825         return -1;
826 }
827
828 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
829 {
830         int i;
831
832         for (i = 0; i < num_nodes; i++) {
833                 if (!warned[i]) {
834                         warned[i] = nodeid;
835                         return 0;
836                 }
837                 if (warned[i] == nodeid)
838                         return 1;
839         }
840         return 0;
841 }
842
843 void dlm_scan_waiters(struct dlm_ls *ls)
844 {
845         struct dlm_lkb *lkb;
846         ktime_t zero = ktime_set(0, 0);
847         s64 us;
848         s64 debug_maxus = 0;
849         u32 debug_scanned = 0;
850         u32 debug_expired = 0;
851         int num_nodes = 0;
852         int *warned = NULL;
853
854         if (!dlm_config.ci_waitwarn_us)
855                 return;
856
857         mutex_lock(&ls->ls_waiters_mutex);
858
859         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
860                 if (ktime_equal(lkb->lkb_wait_time, zero))
861                         continue;
862
863                 debug_scanned++;
864
865                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
866
867                 if (us < dlm_config.ci_waitwarn_us)
868                         continue;
869
870                 lkb->lkb_wait_time = zero;
871
872                 debug_expired++;
873                 if (us > debug_maxus)
874                         debug_maxus = us;
875
876                 if (!num_nodes) {
877                         num_nodes = ls->ls_num_nodes;
878                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
879                 }
880                 if (!warned)
881                         continue;
882                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
883                         continue;
884
885                 log_error(ls, "waitwarn %x %lld %d us check connection to "
886                           "node %d", lkb->lkb_id, (long long)us,
887                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
888         }
889         mutex_unlock(&ls->ls_waiters_mutex);
890         kfree(warned);
891
892         if (debug_expired)
893                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
894                           debug_scanned, debug_expired,
895                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
896 }
897
898 /* add/remove lkb from global waiters list of lkb's waiting for
899    a reply from a remote node */
900
901 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
902 {
903         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
904         int error = 0;
905
906         mutex_lock(&ls->ls_waiters_mutex);
907
908         if (is_overlap_unlock(lkb) ||
909             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
910                 error = -EINVAL;
911                 goto out;
912         }
913
914         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
915                 switch (mstype) {
916                 case DLM_MSG_UNLOCK:
917                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
918                         break;
919                 case DLM_MSG_CANCEL:
920                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
921                         break;
922                 default:
923                         error = -EBUSY;
924                         goto out;
925                 }
926                 lkb->lkb_wait_count++;
927                 hold_lkb(lkb);
928
929                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
930                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
931                           lkb->lkb_wait_count, lkb->lkb_flags);
932                 goto out;
933         }
934
935         DLM_ASSERT(!lkb->lkb_wait_count,
936                    dlm_print_lkb(lkb);
937                    printk("wait_count %d\n", lkb->lkb_wait_count););
938
939         lkb->lkb_wait_count++;
940         lkb->lkb_wait_type = mstype;
941         lkb->lkb_wait_time = ktime_get();
942         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
943         hold_lkb(lkb);
944         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
945  out:
946         if (error)
947                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
948                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
949                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
950         mutex_unlock(&ls->ls_waiters_mutex);
951         return error;
952 }
953
954 /* We clear the RESEND flag because we might be taking an lkb off the waiters
955    list as part of process_requestqueue (e.g. a lookup that has an optimized
956    request reply on the requestqueue) between dlm_recover_waiters_pre() which
957    set RESEND and dlm_recover_waiters_post() */
958
959 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
960                                 struct dlm_message *ms)
961 {
962         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
963         int overlap_done = 0;
964
965         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
966                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
967                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
968                 overlap_done = 1;
969                 goto out_del;
970         }
971
972         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
973                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
974                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
975                 overlap_done = 1;
976                 goto out_del;
977         }
978
979         /* Cancel state was preemptively cleared by a successful convert,
980            see next comment, nothing to do. */
981
982         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
983             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
984                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
985                           lkb->lkb_id, lkb->lkb_wait_type);
986                 return -1;
987         }
988
989         /* Remove for the convert reply, and premptively remove for the
990            cancel reply.  A convert has been granted while there's still
991            an outstanding cancel on it (the cancel is moot and the result
992            in the cancel reply should be 0).  We preempt the cancel reply
993            because the app gets the convert result and then can follow up
994            with another op, like convert.  This subsequent op would see the
995            lingering state of the cancel and fail with -EBUSY. */
996
997         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
998             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
999             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1000                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1001                           lkb->lkb_id);
1002                 lkb->lkb_wait_type = 0;
1003                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1004                 lkb->lkb_wait_count--;
1005                 goto out_del;
1006         }
1007
1008         /* N.B. type of reply may not always correspond to type of original
1009            msg due to lookup->request optimization, verify others? */
1010
1011         if (lkb->lkb_wait_type) {
1012                 lkb->lkb_wait_type = 0;
1013                 goto out_del;
1014         }
1015
1016         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
1017                   lkb->lkb_id, mstype, lkb->lkb_flags);
1018         return -1;
1019
1020  out_del:
1021         /* the force-unlock/cancel has completed and we haven't recvd a reply
1022            to the op that was in progress prior to the unlock/cancel; we
1023            give up on any reply to the earlier op.  FIXME: not sure when/how
1024            this would happen */
1025
1026         if (overlap_done && lkb->lkb_wait_type) {
1027                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1028                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1029                 lkb->lkb_wait_count--;
1030                 lkb->lkb_wait_type = 0;
1031         }
1032
1033         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1034
1035         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1036         lkb->lkb_wait_count--;
1037         if (!lkb->lkb_wait_count)
1038                 list_del_init(&lkb->lkb_wait_reply);
1039         unhold_lkb(lkb);
1040         return 0;
1041 }
1042
1043 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1044 {
1045         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1046         int error;
1047
1048         mutex_lock(&ls->ls_waiters_mutex);
1049         error = _remove_from_waiters(lkb, mstype, NULL);
1050         mutex_unlock(&ls->ls_waiters_mutex);
1051         return error;
1052 }
1053
1054 /* Handles situations where we might be processing a "fake" or "stub" reply in
1055    which we can't try to take waiters_mutex again. */
1056
1057 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1058 {
1059         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1060         int error;
1061
1062         if (ms->m_flags != DLM_IFL_STUB_MS)
1063                 mutex_lock(&ls->ls_waiters_mutex);
1064         error = _remove_from_waiters(lkb, ms->m_type, ms);
1065         if (ms->m_flags != DLM_IFL_STUB_MS)
1066                 mutex_unlock(&ls->ls_waiters_mutex);
1067         return error;
1068 }
1069
1070 static void dir_remove(struct dlm_rsb *r)
1071 {
1072         int to_nodeid;
1073
1074         if (dlm_no_directory(r->res_ls))
1075                 return;
1076
1077         to_nodeid = dlm_dir_nodeid(r);
1078         if (to_nodeid != dlm_our_nodeid())
1079                 send_remove(r);
1080         else
1081                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1082                                      r->res_name, r->res_length);
1083 }
1084
1085 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
1086    found since they are in order of newest to oldest? */
1087
1088 static int shrink_bucket(struct dlm_ls *ls, int b)
1089 {
1090         struct dlm_rsb *r;
1091         int count = 0, found;
1092
1093         for (;;) {
1094                 found = 0;
1095                 spin_lock(&ls->ls_rsbtbl[b].lock);
1096                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1097                                             res_hashchain) {
1098                         if (!time_after_eq(jiffies, r->res_toss_time +
1099                                            dlm_config.ci_toss_secs * HZ))
1100                                 continue;
1101                         found = 1;
1102                         break;
1103                 }
1104
1105                 if (!found) {
1106                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1107                         break;
1108                 }
1109
1110                 if (kref_put(&r->res_ref, kill_rsb)) {
1111                         list_del(&r->res_hashchain);
1112                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1113
1114                         if (is_master(r))
1115                                 dir_remove(r);
1116                         dlm_free_rsb(r);
1117                         count++;
1118                 } else {
1119                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1120                         log_error(ls, "tossed rsb in use %s", r->res_name);
1121                 }
1122         }
1123
1124         return count;
1125 }
1126
1127 void dlm_scan_rsbs(struct dlm_ls *ls)
1128 {
1129         int i;
1130
1131         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1132                 shrink_bucket(ls, i);
1133                 if (dlm_locking_stopped(ls))
1134                         break;
1135                 cond_resched();
1136         }
1137 }
1138
1139 static void add_timeout(struct dlm_lkb *lkb)
1140 {
1141         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1142
1143         if (is_master_copy(lkb))
1144                 return;
1145
1146         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1147             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1148                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1149                 goto add_it;
1150         }
1151         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1152                 goto add_it;
1153         return;
1154
1155  add_it:
1156         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1157         mutex_lock(&ls->ls_timeout_mutex);
1158         hold_lkb(lkb);
1159         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1160         mutex_unlock(&ls->ls_timeout_mutex);
1161 }
1162
1163 static void del_timeout(struct dlm_lkb *lkb)
1164 {
1165         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1166
1167         mutex_lock(&ls->ls_timeout_mutex);
1168         if (!list_empty(&lkb->lkb_time_list)) {
1169                 list_del_init(&lkb->lkb_time_list);
1170                 unhold_lkb(lkb);
1171         }
1172         mutex_unlock(&ls->ls_timeout_mutex);
1173 }
1174
1175 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1176    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1177    and then lock rsb because of lock ordering in add_timeout.  We may need
1178    to specify some special timeout-related bits in the lkb that are just to
1179    be accessed under the timeout_mutex. */
1180
1181 void dlm_scan_timeout(struct dlm_ls *ls)
1182 {
1183         struct dlm_rsb *r;
1184         struct dlm_lkb *lkb;
1185         int do_cancel, do_warn;
1186         s64 wait_us;
1187
1188         for (;;) {
1189                 if (dlm_locking_stopped(ls))
1190                         break;
1191
1192                 do_cancel = 0;
1193                 do_warn = 0;
1194                 mutex_lock(&ls->ls_timeout_mutex);
1195                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1196
1197                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1198                                                         lkb->lkb_timestamp));
1199
1200                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1201                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1202                                 do_cancel = 1;
1203
1204                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1205                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1206                                 do_warn = 1;
1207
1208                         if (!do_cancel && !do_warn)
1209                                 continue;
1210                         hold_lkb(lkb);
1211                         break;
1212                 }
1213                 mutex_unlock(&ls->ls_timeout_mutex);
1214
1215                 if (!do_cancel && !do_warn)
1216                         break;
1217
1218                 r = lkb->lkb_resource;
1219                 hold_rsb(r);
1220                 lock_rsb(r);
1221
1222                 if (do_warn) {
1223                         /* clear flag so we only warn once */
1224                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1225                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1226                                 del_timeout(lkb);
1227                         dlm_timeout_warn(lkb);
1228                 }
1229
1230                 if (do_cancel) {
1231                         log_debug(ls, "timeout cancel %x node %d %s",
1232                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1233                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1234                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1235                         del_timeout(lkb);
1236                         _cancel_lock(r, lkb);
1237                 }
1238
1239                 unlock_rsb(r);
1240                 unhold_rsb(r);
1241                 dlm_put_lkb(lkb);
1242         }
1243 }
1244
1245 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1246    dlm_recoverd before checking/setting ls_recover_begin. */
1247
1248 void dlm_adjust_timeouts(struct dlm_ls *ls)
1249 {
1250         struct dlm_lkb *lkb;
1251         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1252
1253         ls->ls_recover_begin = 0;
1254         mutex_lock(&ls->ls_timeout_mutex);
1255         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1256                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1257         mutex_unlock(&ls->ls_timeout_mutex);
1258
1259         if (!dlm_config.ci_waitwarn_us)
1260                 return;
1261
1262         mutex_lock(&ls->ls_waiters_mutex);
1263         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1264                 if (ktime_to_us(lkb->lkb_wait_time))
1265                         lkb->lkb_wait_time = ktime_get();
1266         }
1267         mutex_unlock(&ls->ls_waiters_mutex);
1268 }
1269
1270 /* lkb is master or local copy */
1271
1272 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1273 {
1274         int b, len = r->res_ls->ls_lvblen;
1275
1276         /* b=1 lvb returned to caller
1277            b=0 lvb written to rsb or invalidated
1278            b=-1 do nothing */
1279
1280         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1281
1282         if (b == 1) {
1283                 if (!lkb->lkb_lvbptr)
1284                         return;
1285
1286                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1287                         return;
1288
1289                 if (!r->res_lvbptr)
1290                         return;
1291
1292                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1293                 lkb->lkb_lvbseq = r->res_lvbseq;
1294
1295         } else if (b == 0) {
1296                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1297                         rsb_set_flag(r, RSB_VALNOTVALID);
1298                         return;
1299                 }
1300
1301                 if (!lkb->lkb_lvbptr)
1302                         return;
1303
1304                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1305                         return;
1306
1307                 if (!r->res_lvbptr)
1308                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1309
1310                 if (!r->res_lvbptr)
1311                         return;
1312
1313                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1314                 r->res_lvbseq++;
1315                 lkb->lkb_lvbseq = r->res_lvbseq;
1316                 rsb_clear_flag(r, RSB_VALNOTVALID);
1317         }
1318
1319         if (rsb_flag(r, RSB_VALNOTVALID))
1320                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1321 }
1322
1323 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1324 {
1325         if (lkb->lkb_grmode < DLM_LOCK_PW)
1326                 return;
1327
1328         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1329                 rsb_set_flag(r, RSB_VALNOTVALID);
1330                 return;
1331         }
1332
1333         if (!lkb->lkb_lvbptr)
1334                 return;
1335
1336         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1337                 return;
1338
1339         if (!r->res_lvbptr)
1340                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1341
1342         if (!r->res_lvbptr)
1343                 return;
1344
1345         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1346         r->res_lvbseq++;
1347         rsb_clear_flag(r, RSB_VALNOTVALID);
1348 }
1349
1350 /* lkb is process copy (pc) */
1351
1352 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1353                             struct dlm_message *ms)
1354 {
1355         int b;
1356
1357         if (!lkb->lkb_lvbptr)
1358                 return;
1359
1360         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1361                 return;
1362
1363         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1364         if (b == 1) {
1365                 int len = receive_extralen(ms);
1366                 if (len > DLM_RESNAME_MAXLEN)
1367                         len = DLM_RESNAME_MAXLEN;
1368                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1369                 lkb->lkb_lvbseq = ms->m_lvbseq;
1370         }
1371 }
1372
1373 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1374    remove_lock -- used for unlock, removes lkb from granted
1375    revert_lock -- used for cancel, moves lkb from convert to granted
1376    grant_lock  -- used for request and convert, adds lkb to granted or
1377                   moves lkb from convert or waiting to granted
1378
1379    Each of these is used for master or local copy lkb's.  There is
1380    also a _pc() variation used to make the corresponding change on
1381    a process copy (pc) lkb. */
1382
1383 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1384 {
1385         del_lkb(r, lkb);
1386         lkb->lkb_grmode = DLM_LOCK_IV;
1387         /* this unhold undoes the original ref from create_lkb()
1388            so this leads to the lkb being freed */
1389         unhold_lkb(lkb);
1390 }
1391
1392 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1393 {
1394         set_lvb_unlock(r, lkb);
1395         _remove_lock(r, lkb);
1396 }
1397
1398 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1399 {
1400         _remove_lock(r, lkb);
1401 }
1402
1403 /* returns: 0 did nothing
1404             1 moved lock to granted
1405            -1 removed lock */
1406
1407 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1408 {
1409         int rv = 0;
1410
1411         lkb->lkb_rqmode = DLM_LOCK_IV;
1412
1413         switch (lkb->lkb_status) {
1414         case DLM_LKSTS_GRANTED:
1415                 break;
1416         case DLM_LKSTS_CONVERT:
1417                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1418                 rv = 1;
1419                 break;
1420         case DLM_LKSTS_WAITING:
1421                 del_lkb(r, lkb);
1422                 lkb->lkb_grmode = DLM_LOCK_IV;
1423                 /* this unhold undoes the original ref from create_lkb()
1424                    so this leads to the lkb being freed */
1425                 unhold_lkb(lkb);
1426                 rv = -1;
1427                 break;
1428         default:
1429                 log_print("invalid status for revert %d", lkb->lkb_status);
1430         }
1431         return rv;
1432 }
1433
1434 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1435 {
1436         return revert_lock(r, lkb);
1437 }
1438
1439 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1440 {
1441         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1442                 lkb->lkb_grmode = lkb->lkb_rqmode;
1443                 if (lkb->lkb_status)
1444                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1445                 else
1446                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1447         }
1448
1449         lkb->lkb_rqmode = DLM_LOCK_IV;
1450 }
1451
1452 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1453 {
1454         set_lvb_lock(r, lkb);
1455         _grant_lock(r, lkb);
1456         lkb->lkb_highbast = 0;
1457 }
1458
1459 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1460                           struct dlm_message *ms)
1461 {
1462         set_lvb_lock_pc(r, lkb, ms);
1463         _grant_lock(r, lkb);
1464 }
1465
1466 /* called by grant_pending_locks() which means an async grant message must
1467    be sent to the requesting node in addition to granting the lock if the
1468    lkb belongs to a remote node. */
1469
1470 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1471 {
1472         grant_lock(r, lkb);
1473         if (is_master_copy(lkb))
1474                 send_grant(r, lkb);
1475         else
1476                 queue_cast(r, lkb, 0);
1477 }
1478
1479 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1480    change the granted/requested modes.  We're munging things accordingly in
1481    the process copy.
1482    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1483    conversion deadlock
1484    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1485    compatible with other granted locks */
1486
1487 static void munge_demoted(struct dlm_lkb *lkb)
1488 {
1489         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1490                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1491                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1492                 return;
1493         }
1494
1495         lkb->lkb_grmode = DLM_LOCK_NL;
1496 }
1497
1498 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1499 {
1500         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1501             ms->m_type != DLM_MSG_GRANT) {
1502                 log_print("munge_altmode %x invalid reply type %d",
1503                           lkb->lkb_id, ms->m_type);
1504                 return;
1505         }
1506
1507         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1508                 lkb->lkb_rqmode = DLM_LOCK_PR;
1509         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1510                 lkb->lkb_rqmode = DLM_LOCK_CW;
1511         else {
1512                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1513                 dlm_print_lkb(lkb);
1514         }
1515 }
1516
1517 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1518 {
1519         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1520                                            lkb_statequeue);
1521         if (lkb->lkb_id == first->lkb_id)
1522                 return 1;
1523
1524         return 0;
1525 }
1526
1527 /* Check if the given lkb conflicts with another lkb on the queue. */
1528
1529 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1530 {
1531         struct dlm_lkb *this;
1532
1533         list_for_each_entry(this, head, lkb_statequeue) {
1534                 if (this == lkb)
1535                         continue;
1536                 if (!modes_compat(this, lkb))
1537                         return 1;
1538         }
1539         return 0;
1540 }
1541
1542 /*
1543  * "A conversion deadlock arises with a pair of lock requests in the converting
1544  * queue for one resource.  The granted mode of each lock blocks the requested
1545  * mode of the other lock."
1546  *
1547  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1548  * convert queue from being granted, then deadlk/demote lkb.
1549  *
1550  * Example:
1551  * Granted Queue: empty
1552  * Convert Queue: NL->EX (first lock)
1553  *                PR->EX (second lock)
1554  *
1555  * The first lock can't be granted because of the granted mode of the second
1556  * lock and the second lock can't be granted because it's not first in the
1557  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1558  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1559  * flag set and return DEMOTED in the lksb flags.
1560  *
1561  * Originally, this function detected conv-deadlk in a more limited scope:
1562  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1563  * - if lkb1 was the first entry in the queue (not just earlier), and was
1564  *   blocked by the granted mode of lkb2, and there was nothing on the
1565  *   granted queue preventing lkb1 from being granted immediately, i.e.
1566  *   lkb2 was the only thing preventing lkb1 from being granted.
1567  *
1568  * That second condition meant we'd only say there was conv-deadlk if
1569  * resolving it (by demotion) would lead to the first lock on the convert
1570  * queue being granted right away.  It allowed conversion deadlocks to exist
1571  * between locks on the convert queue while they couldn't be granted anyway.
1572  *
1573  * Now, we detect and take action on conversion deadlocks immediately when
1574  * they're created, even if they may not be immediately consequential.  If
1575  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1576  * mode that would prevent lkb1's conversion from being granted, we do a
1577  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1578  * I think this means that the lkb_is_ahead condition below should always
1579  * be zero, i.e. there will never be conv-deadlk between two locks that are
1580  * both already on the convert queue.
1581  */
1582
1583 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1584 {
1585         struct dlm_lkb *lkb1;
1586         int lkb_is_ahead = 0;
1587
1588         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1589                 if (lkb1 == lkb2) {
1590                         lkb_is_ahead = 1;
1591                         continue;
1592                 }
1593
1594                 if (!lkb_is_ahead) {
1595                         if (!modes_compat(lkb2, lkb1))
1596                                 return 1;
1597                 } else {
1598                         if (!modes_compat(lkb2, lkb1) &&
1599                             !modes_compat(lkb1, lkb2))
1600                                 return 1;
1601                 }
1602         }
1603         return 0;
1604 }
1605
1606 /*
1607  * Return 1 if the lock can be granted, 0 otherwise.
1608  * Also detect and resolve conversion deadlocks.
1609  *
1610  * lkb is the lock to be granted
1611  *
1612  * now is 1 if the function is being called in the context of the
1613  * immediate request, it is 0 if called later, after the lock has been
1614  * queued.
1615  *
1616  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1617  */
1618
1619 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1620 {
1621         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1622
1623         /*
1624          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1625          * a new request for a NL mode lock being blocked.
1626          *
1627          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1628          * request, then it would be granted.  In essence, the use of this flag
1629          * tells the Lock Manager to expedite theis request by not considering
1630          * what may be in the CONVERTING or WAITING queues...  As of this
1631          * writing, the EXPEDITE flag can be used only with new requests for NL
1632          * mode locks.  This flag is not valid for conversion requests.
1633          *
1634          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1635          * conversion or used with a non-NL requested mode.  We also know an
1636          * EXPEDITE request is always granted immediately, so now must always
1637          * be 1.  The full condition to grant an expedite request: (now &&
1638          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1639          * therefore be shortened to just checking the flag.
1640          */
1641
1642         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1643                 return 1;
1644
1645         /*
1646          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1647          * added to the remaining conditions.
1648          */
1649
1650         if (queue_conflict(&r->res_grantqueue, lkb))
1651                 goto out;
1652
1653         /*
1654          * 6-3: By default, a conversion request is immediately granted if the
1655          * requested mode is compatible with the modes of all other granted
1656          * locks
1657          */
1658
1659         if (queue_conflict(&r->res_convertqueue, lkb))
1660                 goto out;
1661
1662         /*
1663          * 6-5: But the default algorithm for deciding whether to grant or
1664          * queue conversion requests does not by itself guarantee that such
1665          * requests are serviced on a "first come first serve" basis.  This, in
1666          * turn, can lead to a phenomenon known as "indefinate postponement".
1667          *
1668          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1669          * the system service employed to request a lock conversion.  This flag
1670          * forces certain conversion requests to be queued, even if they are
1671          * compatible with the granted modes of other locks on the same
1672          * resource.  Thus, the use of this flag results in conversion requests
1673          * being ordered on a "first come first servce" basis.
1674          *
1675          * DCT: This condition is all about new conversions being able to occur
1676          * "in place" while the lock remains on the granted queue (assuming
1677          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1678          * doesn't _have_ to go onto the convert queue where it's processed in
1679          * order.  The "now" variable is necessary to distinguish converts
1680          * being received and processed for the first time now, because once a
1681          * convert is moved to the conversion queue the condition below applies
1682          * requiring fifo granting.
1683          */
1684
1685         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1686                 return 1;
1687
1688         /*
1689          * The NOORDER flag is set to avoid the standard vms rules on grant
1690          * order.
1691          */
1692
1693         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1694                 return 1;
1695
1696         /*
1697          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1698          * granted until all other conversion requests ahead of it are granted
1699          * and/or canceled.
1700          */
1701
1702         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1703                 return 1;
1704
1705         /*
1706          * 6-4: By default, a new request is immediately granted only if all
1707          * three of the following conditions are satisfied when the request is
1708          * issued:
1709          * - The queue of ungranted conversion requests for the resource is
1710          *   empty.
1711          * - The queue of ungranted new requests for the resource is empty.
1712          * - The mode of the new request is compatible with the most
1713          *   restrictive mode of all granted locks on the resource.
1714          */
1715
1716         if (now && !conv && list_empty(&r->res_convertqueue) &&
1717             list_empty(&r->res_waitqueue))
1718                 return 1;
1719
1720         /*
1721          * 6-4: Once a lock request is in the queue of ungranted new requests,
1722          * it cannot be granted until the queue of ungranted conversion
1723          * requests is empty, all ungranted new requests ahead of it are
1724          * granted and/or canceled, and it is compatible with the granted mode
1725          * of the most restrictive lock granted on the resource.
1726          */
1727
1728         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1729             first_in_list(lkb, &r->res_waitqueue))
1730                 return 1;
1731  out:
1732         return 0;
1733 }
1734
1735 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1736                           int *err)
1737 {
1738         int rv;
1739         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1740         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1741
1742         if (err)
1743                 *err = 0;
1744
1745         rv = _can_be_granted(r, lkb, now);
1746         if (rv)
1747                 goto out;
1748
1749         /*
1750          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1751          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1752          * cancels one of the locks.
1753          */
1754
1755         if (is_convert && can_be_queued(lkb) &&
1756             conversion_deadlock_detect(r, lkb)) {
1757                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1758                         lkb->lkb_grmode = DLM_LOCK_NL;
1759                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1760                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1761                         if (err)
1762                                 *err = -EDEADLK;
1763                         else {
1764                                 log_print("can_be_granted deadlock %x now %d",
1765                                           lkb->lkb_id, now);
1766                                 dlm_dump_rsb(r);
1767                         }
1768                 }
1769                 goto out;
1770         }
1771
1772         /*
1773          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1774          * to grant a request in a mode other than the normal rqmode.  It's a
1775          * simple way to provide a big optimization to applications that can
1776          * use them.
1777          */
1778
1779         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1780                 alt = DLM_LOCK_PR;
1781         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1782                 alt = DLM_LOCK_CW;
1783
1784         if (alt) {
1785                 lkb->lkb_rqmode = alt;
1786                 rv = _can_be_granted(r, lkb, now);
1787                 if (rv)
1788                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1789                 else
1790                         lkb->lkb_rqmode = rqmode;
1791         }
1792  out:
1793         return rv;
1794 }
1795
1796 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1797    for locks pending on the convert list.  Once verified (watch for these
1798    log_prints), we should be able to just call _can_be_granted() and not
1799    bother with the demote/deadlk cases here (and there's no easy way to deal
1800    with a deadlk here, we'd have to generate something like grant_lock with
1801    the deadlk error.) */
1802
1803 /* Returns the highest requested mode of all blocked conversions; sets
1804    cw if there's a blocked conversion to DLM_LOCK_CW. */
1805
1806 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1807 {
1808         struct dlm_lkb *lkb, *s;
1809         int hi, demoted, quit, grant_restart, demote_restart;
1810         int deadlk;
1811
1812         quit = 0;
1813  restart:
1814         grant_restart = 0;
1815         demote_restart = 0;
1816         hi = DLM_LOCK_IV;
1817
1818         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1819                 demoted = is_demoted(lkb);
1820                 deadlk = 0;
1821
1822                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1823                         grant_lock_pending(r, lkb);
1824                         grant_restart = 1;
1825                         continue;
1826                 }
1827
1828                 if (!demoted && is_demoted(lkb)) {
1829                         log_print("WARN: pending demoted %x node %d %s",
1830                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1831                         demote_restart = 1;
1832                         continue;
1833                 }
1834
1835                 if (deadlk) {
1836                         log_print("WARN: pending deadlock %x node %d %s",
1837                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1838                         dlm_dump_rsb(r);
1839                         continue;
1840                 }
1841
1842                 hi = max_t(int, lkb->lkb_rqmode, hi);
1843
1844                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1845                         *cw = 1;
1846         }
1847
1848         if (grant_restart)
1849                 goto restart;
1850         if (demote_restart && !quit) {
1851                 quit = 1;
1852                 goto restart;
1853         }
1854
1855         return max_t(int, high, hi);
1856 }
1857
1858 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1859 {
1860         struct dlm_lkb *lkb, *s;
1861
1862         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1863                 if (can_be_granted(r, lkb, 0, NULL))
1864                         grant_lock_pending(r, lkb);
1865                 else {
1866                         high = max_t(int, lkb->lkb_rqmode, high);
1867                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1868                                 *cw = 1;
1869                 }
1870         }
1871
1872         return high;
1873 }
1874
1875 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1876    on either the convert or waiting queue.
1877    high is the largest rqmode of all locks blocked on the convert or
1878    waiting queue. */
1879
1880 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1881 {
1882         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1883                 if (gr->lkb_highbast < DLM_LOCK_EX)
1884                         return 1;
1885                 return 0;
1886         }
1887
1888         if (gr->lkb_highbast < high &&
1889             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1890                 return 1;
1891         return 0;
1892 }
1893
1894 static void grant_pending_locks(struct dlm_rsb *r)
1895 {
1896         struct dlm_lkb *lkb, *s;
1897         int high = DLM_LOCK_IV;
1898         int cw = 0;
1899
1900         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1901
1902         high = grant_pending_convert(r, high, &cw);
1903         high = grant_pending_wait(r, high, &cw);
1904
1905         if (high == DLM_LOCK_IV)
1906                 return;
1907
1908         /*
1909          * If there are locks left on the wait/convert queue then send blocking
1910          * ASTs to granted locks based on the largest requested mode (high)
1911          * found above.
1912          */
1913
1914         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1915                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1916                         if (cw && high == DLM_LOCK_PR &&
1917                             lkb->lkb_grmode == DLM_LOCK_PR)
1918                                 queue_bast(r, lkb, DLM_LOCK_CW);
1919                         else
1920                                 queue_bast(r, lkb, high);
1921                         lkb->lkb_highbast = high;
1922                 }
1923         }
1924 }
1925
1926 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1927 {
1928         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1929             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1930                 if (gr->lkb_highbast < DLM_LOCK_EX)
1931                         return 1;
1932                 return 0;
1933         }
1934
1935         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1936                 return 1;
1937         return 0;
1938 }
1939
1940 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1941                             struct dlm_lkb *lkb)
1942 {
1943         struct dlm_lkb *gr;
1944
1945         list_for_each_entry(gr, head, lkb_statequeue) {
1946                 /* skip self when sending basts to convertqueue */
1947                 if (gr == lkb)
1948                         continue;
1949                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1950                         queue_bast(r, gr, lkb->lkb_rqmode);
1951                         gr->lkb_highbast = lkb->lkb_rqmode;
1952                 }
1953         }
1954 }
1955
1956 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1957 {
1958         send_bast_queue(r, &r->res_grantqueue, lkb);
1959 }
1960
1961 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1962 {
1963         send_bast_queue(r, &r->res_grantqueue, lkb);
1964         send_bast_queue(r, &r->res_convertqueue, lkb);
1965 }
1966
1967 /* set_master(r, lkb) -- set the master nodeid of a resource
1968
1969    The purpose of this function is to set the nodeid field in the given
1970    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1971    known, it can just be copied to the lkb and the function will return
1972    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1973    before it can be copied to the lkb.
1974
1975    When the rsb nodeid is being looked up remotely, the initial lkb
1976    causing the lookup is kept on the ls_waiters list waiting for the
1977    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1978    on the rsb's res_lookup list until the master is verified.
1979
1980    Return values:
1981    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1982    1: the rsb master is not available and the lkb has been placed on
1983       a wait queue
1984 */
1985
1986 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1987 {
1988         struct dlm_ls *ls = r->res_ls;
1989         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1990
1991         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1992                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1993                 r->res_first_lkid = lkb->lkb_id;
1994                 lkb->lkb_nodeid = r->res_nodeid;
1995                 return 0;
1996         }
1997
1998         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1999                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2000                 return 1;
2001         }
2002
2003         if (r->res_nodeid == 0) {
2004                 lkb->lkb_nodeid = 0;
2005                 return 0;
2006         }
2007
2008         if (r->res_nodeid > 0) {
2009                 lkb->lkb_nodeid = r->res_nodeid;
2010                 return 0;
2011         }
2012
2013         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2014
2015         dir_nodeid = dlm_dir_nodeid(r);
2016
2017         if (dir_nodeid != our_nodeid) {
2018                 r->res_first_lkid = lkb->lkb_id;
2019                 send_lookup(r, lkb);
2020                 return 1;
2021         }
2022
2023         for (i = 0; i < 2; i++) {
2024                 /* It's possible for dlm_scand to remove an old rsb for
2025                    this same resource from the toss list, us to create
2026                    a new one, look up the master locally, and find it
2027                    already exists just before dlm_scand does the
2028                    dir_remove() on the previous rsb. */
2029
2030                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2031                                        r->res_length, &ret_nodeid);
2032                 if (!error)
2033                         break;
2034                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2035                 schedule();
2036         }
2037         if (error && error != -EEXIST)
2038                 return error;
2039
2040         if (ret_nodeid == our_nodeid) {
2041                 r->res_first_lkid = 0;
2042                 r->res_nodeid = 0;
2043                 lkb->lkb_nodeid = 0;
2044         } else {
2045                 r->res_first_lkid = lkb->lkb_id;
2046                 r->res_nodeid = ret_nodeid;
2047                 lkb->lkb_nodeid = ret_nodeid;
2048         }
2049         return 0;
2050 }
2051
2052 static void process_lookup_list(struct dlm_rsb *r)
2053 {
2054         struct dlm_lkb *lkb, *safe;
2055
2056         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2057                 list_del_init(&lkb->lkb_rsb_lookup);
2058                 _request_lock(r, lkb);
2059                 schedule();
2060         }
2061 }
2062
2063 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2064
2065 static void confirm_master(struct dlm_rsb *r, int error)
2066 {
2067         struct dlm_lkb *lkb;
2068
2069         if (!r->res_first_lkid)
2070                 return;
2071
2072         switch (error) {
2073         case 0:
2074         case -EINPROGRESS:
2075                 r->res_first_lkid = 0;
2076                 process_lookup_list(r);
2077                 break;
2078
2079         case -EAGAIN:
2080         case -EBADR:
2081         case -ENOTBLK:
2082                 /* the remote request failed and won't be retried (it was
2083                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2084                    lkb the first_lkid */
2085
2086                 r->res_first_lkid = 0;
2087
2088                 if (!list_empty(&r->res_lookup)) {
2089                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2090                                          lkb_rsb_lookup);
2091                         list_del_init(&lkb->lkb_rsb_lookup);
2092                         r->res_first_lkid = lkb->lkb_id;
2093                         _request_lock(r, lkb);
2094                 }
2095                 break;
2096
2097         default:
2098                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2099         }
2100 }
2101
2102 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2103                          int namelen, unsigned long timeout_cs,
2104                          void (*ast) (void *astparam),
2105                          void *astparam,
2106                          void (*bast) (void *astparam, int mode),
2107                          struct dlm_args *args)
2108 {
2109         int rv = -EINVAL;
2110
2111         /* check for invalid arg usage */
2112
2113         if (mode < 0 || mode > DLM_LOCK_EX)
2114                 goto out;
2115
2116         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2117                 goto out;
2118
2119         if (flags & DLM_LKF_CANCEL)
2120                 goto out;
2121
2122         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2123                 goto out;
2124
2125         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2126                 goto out;
2127
2128         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2129                 goto out;
2130
2131         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2132                 goto out;
2133
2134         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2135                 goto out;
2136
2137         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2138                 goto out;
2139
2140         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2141                 goto out;
2142
2143         if (!ast || !lksb)
2144                 goto out;
2145
2146         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2147                 goto out;
2148
2149         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2150                 goto out;
2151
2152         /* these args will be copied to the lkb in validate_lock_args,
2153            it cannot be done now because when converting locks, fields in
2154            an active lkb cannot be modified before locking the rsb */
2155
2156         args->flags = flags;
2157         args->astfn = ast;
2158         args->astparam = astparam;
2159         args->bastfn = bast;
2160         args->timeout = timeout_cs;
2161         args->mode = mode;
2162         args->lksb = lksb;
2163         rv = 0;
2164  out:
2165         return rv;
2166 }
2167
2168 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2169 {
2170         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2171                       DLM_LKF_FORCEUNLOCK))
2172                 return -EINVAL;
2173
2174         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2175                 return -EINVAL;
2176
2177         args->flags = flags;
2178         args->astparam = astarg;
2179         return 0;
2180 }
2181
2182 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2183                               struct dlm_args *args)
2184 {
2185         int rv = -EINVAL;
2186
2187         if (args->flags & DLM_LKF_CONVERT) {
2188                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2189                         goto out;
2190
2191                 if (args->flags & DLM_LKF_QUECVT &&
2192                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2193                         goto out;
2194
2195                 rv = -EBUSY;
2196                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2197                         goto out;
2198
2199                 if (lkb->lkb_wait_type)
2200                         goto out;
2201
2202                 if (is_overlap(lkb))
2203                         goto out;
2204         }
2205
2206         lkb->lkb_exflags = args->flags;
2207         lkb->lkb_sbflags = 0;
2208         lkb->lkb_astfn = args->astfn;
2209         lkb->lkb_astparam = args->astparam;
2210         lkb->lkb_bastfn = args->bastfn;
2211         lkb->lkb_rqmode = args->mode;
2212         lkb->lkb_lksb = args->lksb;
2213         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2214         lkb->lkb_ownpid = (int) current->pid;
2215         lkb->lkb_timeout_cs = args->timeout;
2216         rv = 0;
2217  out:
2218         if (rv)
2219                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2220                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2221                           lkb->lkb_status, lkb->lkb_wait_type,
2222                           lkb->lkb_resource->res_name);
2223         return rv;
2224 }
2225
2226 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2227    for success */
2228
2229 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2230    because there may be a lookup in progress and it's valid to do
2231    cancel/unlockf on it */
2232
2233 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2234 {
2235         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2236         int rv = -EINVAL;
2237
2238         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2239                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2240                 dlm_print_lkb(lkb);
2241                 goto out;
2242         }
2243
2244         /* an lkb may still exist even though the lock is EOL'ed due to a
2245            cancel, unlock or failed noqueue request; an app can't use these
2246            locks; return same error as if the lkid had not been found at all */
2247
2248         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2249                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2250                 rv = -ENOENT;
2251                 goto out;
2252         }
2253
2254         /* an lkb may be waiting for an rsb lookup to complete where the
2255            lookup was initiated by another lock */
2256
2257         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2258                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2259                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2260                         list_del_init(&lkb->lkb_rsb_lookup);
2261                         queue_cast(lkb->lkb_resource, lkb,
2262                                    args->flags & DLM_LKF_CANCEL ?
2263                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2264                         unhold_lkb(lkb); /* undoes create_lkb() */
2265                 }
2266                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2267                 rv = -EBUSY;
2268                 goto out;
2269         }
2270
2271         /* cancel not allowed with another cancel/unlock in progress */
2272
2273         if (args->flags & DLM_LKF_CANCEL) {
2274                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2275                         goto out;
2276
2277                 if (is_overlap(lkb))
2278                         goto out;
2279
2280                 /* don't let scand try to do a cancel */
2281                 del_timeout(lkb);
2282
2283                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2284                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2285                         rv = -EBUSY;
2286                         goto out;
2287                 }
2288
2289                 /* there's nothing to cancel */
2290                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2291                     !lkb->lkb_wait_type) {
2292                         rv = -EBUSY;
2293                         goto out;
2294                 }
2295
2296                 switch (lkb->lkb_wait_type) {
2297                 case DLM_MSG_LOOKUP:
2298                 case DLM_MSG_REQUEST:
2299                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2300                         rv = -EBUSY;
2301                         goto out;
2302                 case DLM_MSG_UNLOCK:
2303                 case DLM_MSG_CANCEL:
2304                         goto out;
2305                 }
2306                 /* add_to_waiters() will set OVERLAP_CANCEL */
2307                 goto out_ok;
2308         }
2309
2310         /* do we need to allow a force-unlock if there's a normal unlock
2311            already in progress?  in what conditions could the normal unlock
2312            fail such that we'd want to send a force-unlock to be sure? */
2313
2314         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2315                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2316                         goto out;
2317
2318                 if (is_overlap_unlock(lkb))
2319                         goto out;
2320
2321                 /* don't let scand try to do a cancel */
2322                 del_timeout(lkb);
2323
2324                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2325                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2326                         rv = -EBUSY;
2327                         goto out;
2328                 }
2329
2330                 switch (lkb->lkb_wait_type) {
2331                 case DLM_MSG_LOOKUP:
2332                 case DLM_MSG_REQUEST:
2333                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2334                         rv = -EBUSY;
2335                         goto out;
2336                 case DLM_MSG_UNLOCK:
2337                         goto out;
2338                 }
2339                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2340                 goto out_ok;
2341         }
2342
2343         /* normal unlock not allowed if there's any op in progress */
2344         rv = -EBUSY;
2345         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2346                 goto out;
2347
2348  out_ok:
2349         /* an overlapping op shouldn't blow away exflags from other op */
2350         lkb->lkb_exflags |= args->flags;
2351         lkb->lkb_sbflags = 0;
2352         lkb->lkb_astparam = args->astparam;
2353         rv = 0;
2354  out:
2355         if (rv)
2356                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2357                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2358                           args->flags, lkb->lkb_wait_type,
2359                           lkb->lkb_resource->res_name);
2360         return rv;
2361 }
2362
2363 /*
2364  * Four stage 4 varieties:
2365  * do_request(), do_convert(), do_unlock(), do_cancel()
2366  * These are called on the master node for the given lock and
2367  * from the central locking logic.
2368  */
2369
2370 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2371 {
2372         int error = 0;
2373
2374         if (can_be_granted(r, lkb, 1, NULL)) {
2375                 grant_lock(r, lkb);
2376                 queue_cast(r, lkb, 0);
2377                 goto out;
2378         }
2379
2380         if (can_be_queued(lkb)) {
2381                 error = -EINPROGRESS;
2382                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2383                 add_timeout(lkb);
2384                 goto out;
2385         }
2386
2387         error = -EAGAIN;
2388         queue_cast(r, lkb, -EAGAIN);
2389  out:
2390         return error;
2391 }
2392
2393 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2394                                int error)
2395 {
2396         switch (error) {
2397         case -EAGAIN:
2398                 if (force_blocking_asts(lkb))
2399                         send_blocking_asts_all(r, lkb);
2400                 break;
2401         case -EINPROGRESS:
2402                 send_blocking_asts(r, lkb);
2403                 break;
2404         }
2405 }
2406
2407 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2408 {
2409         int error = 0;
2410         int deadlk = 0;
2411
2412         /* changing an existing lock may allow others to be granted */
2413
2414         if (can_be_granted(r, lkb, 1, &deadlk)) {
2415                 grant_lock(r, lkb);
2416                 queue_cast(r, lkb, 0);
2417                 goto out;
2418         }
2419
2420         /* can_be_granted() detected that this lock would block in a conversion
2421            deadlock, so we leave it on the granted queue and return EDEADLK in
2422            the ast for the convert. */
2423
2424         if (deadlk) {
2425                 /* it's left on the granted queue */
2426                 revert_lock(r, lkb);
2427                 queue_cast(r, lkb, -EDEADLK);
2428                 error = -EDEADLK;
2429                 goto out;
2430         }
2431
2432         /* is_demoted() means the can_be_granted() above set the grmode
2433            to NL, and left us on the granted queue.  This auto-demotion
2434            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2435            now grantable.  We have to try to grant other converting locks
2436            before we try again to grant this one. */
2437
2438         if (is_demoted(lkb)) {
2439                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2440                 if (_can_be_granted(r, lkb, 1)) {
2441                         grant_lock(r, lkb);
2442                         queue_cast(r, lkb, 0);
2443                         goto out;
2444                 }
2445                 /* else fall through and move to convert queue */
2446         }
2447
2448         if (can_be_queued(lkb)) {
2449                 error = -EINPROGRESS;
2450                 del_lkb(r, lkb);
2451                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2452                 add_timeout(lkb);
2453                 goto out;
2454         }
2455
2456         error = -EAGAIN;
2457         queue_cast(r, lkb, -EAGAIN);
2458  out:
2459         return error;
2460 }
2461
2462 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2463                                int error)
2464 {
2465         switch (error) {
2466         case 0:
2467                 grant_pending_locks(r);
2468                 /* grant_pending_locks also sends basts */
2469                 break;
2470         case -EAGAIN:
2471                 if (force_blocking_asts(lkb))
2472                         send_blocking_asts_all(r, lkb);
2473                 break;
2474         case -EINPROGRESS:
2475                 send_blocking_asts(r, lkb);
2476                 break;
2477         }
2478 }
2479
2480 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2481 {
2482         remove_lock(r, lkb);
2483         queue_cast(r, lkb, -DLM_EUNLOCK);
2484         return -DLM_EUNLOCK;
2485 }
2486
2487 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2488                               int error)
2489 {
2490         grant_pending_locks(r);
2491 }
2492
2493 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2494  
2495 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2496 {
2497         int error;
2498
2499         error = revert_lock(r, lkb);
2500         if (error) {
2501                 queue_cast(r, lkb, -DLM_ECANCEL);
2502                 return -DLM_ECANCEL;
2503         }
2504         return 0;
2505 }
2506
2507 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2508                               int error)
2509 {
2510         if (error)
2511                 grant_pending_locks(r);
2512 }
2513
2514 /*
2515  * Four stage 3 varieties:
2516  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2517  */
2518
2519 /* add a new lkb to a possibly new rsb, called by requesting process */
2520
2521 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2522 {
2523         int error;
2524
2525         /* set_master: sets lkb nodeid from r */
2526
2527         error = set_master(r, lkb);
2528         if (error < 0)
2529                 goto out;
2530         if (error) {
2531                 error = 0;
2532                 goto out;
2533         }
2534
2535         if (is_remote(r)) {
2536                 /* receive_request() calls do_request() on remote node */
2537                 error = send_request(r, lkb);
2538         } else {
2539                 error = do_request(r, lkb);
2540                 /* for remote locks the request_reply is sent
2541                    between do_request and do_request_effects */
2542                 do_request_effects(r, lkb, error);
2543         }
2544  out:
2545         return error;
2546 }
2547
2548 /* change some property of an existing lkb, e.g. mode */
2549
2550 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2551 {
2552         int error;
2553
2554         if (is_remote(r)) {
2555                 /* receive_convert() calls do_convert() on remote node */
2556                 error = send_convert(r, lkb);
2557         } else {
2558                 error = do_convert(r, lkb);
2559                 /* for remote locks the convert_reply is sent
2560                    between do_convert and do_convert_effects */
2561                 do_convert_effects(r, lkb, error);
2562         }
2563
2564         return error;
2565 }
2566
2567 /* remove an existing lkb from the granted queue */
2568
2569 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2570 {
2571         int error;
2572
2573         if (is_remote(r)) {
2574                 /* receive_unlock() calls do_unlock() on remote node */
2575                 error = send_unlock(r, lkb);
2576         } else {
2577                 error = do_unlock(r, lkb);
2578                 /* for remote locks the unlock_reply is sent
2579                    between do_unlock and do_unlock_effects */
2580                 do_unlock_effects(r, lkb, error);
2581         }
2582
2583         return error;
2584 }
2585
2586 /* remove an existing lkb from the convert or wait queue */
2587
2588 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2589 {
2590         int error;
2591
2592         if (is_remote(r)) {
2593                 /* receive_cancel() calls do_cancel() on remote node */
2594                 error = send_cancel(r, lkb);
2595         } else {
2596                 error = do_cancel(r, lkb);
2597                 /* for remote locks the cancel_reply is sent
2598                    between do_cancel and do_cancel_effects */
2599                 do_cancel_effects(r, lkb, error);
2600         }
2601
2602         return error;
2603 }
2604
2605 /*
2606  * Four stage 2 varieties:
2607  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2608  */
2609
2610 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2611                         int len, struct dlm_args *args)
2612 {
2613         struct dlm_rsb *r;
2614         int error;
2615
2616         error = validate_lock_args(ls, lkb, args);
2617         if (error)
2618                 goto out;
2619
2620         error = find_rsb(ls, name, len, R_CREATE, &r);
2621         if (error)
2622                 goto out;
2623
2624         lock_rsb(r);
2625
2626         attach_lkb(r, lkb);
2627         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2628
2629         error = _request_lock(r, lkb);
2630
2631         unlock_rsb(r);
2632         put_rsb(r);
2633
2634  out:
2635         return error;
2636 }
2637
2638 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2639                         struct dlm_args *args)
2640 {
2641         struct dlm_rsb *r;
2642         int error;
2643
2644         r = lkb->lkb_resource;
2645
2646         hold_rsb(r);
2647         lock_rsb(r);
2648
2649         error = validate_lock_args(ls, lkb, args);
2650         if (error)
2651                 goto out;
2652
2653         error = _convert_lock(r, lkb);
2654  out:
2655         unlock_rsb(r);
2656         put_rsb(r);
2657         return error;
2658 }
2659
2660 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2661                        struct dlm_args *args)
2662 {
2663         struct dlm_rsb *r;
2664         int error;
2665
2666         r = lkb->lkb_resource;
2667
2668         hold_rsb(r);
2669         lock_rsb(r);
2670
2671         error = validate_unlock_args(lkb, args);
2672         if (error)
2673                 goto out;
2674
2675         error = _unlock_lock(r, lkb);
2676  out:
2677         unlock_rsb(r);
2678         put_rsb(r);
2679         return error;
2680 }
2681
2682 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2683                        struct dlm_args *args)
2684 {
2685         struct dlm_rsb *r;
2686         int error;
2687
2688         r = lkb->lkb_resource;
2689
2690         hold_rsb(r);
2691         lock_rsb(r);
2692
2693         error = validate_unlock_args(lkb, args);
2694         if (error)
2695                 goto out;
2696
2697         error = _cancel_lock(r, lkb);
2698  out:
2699         unlock_rsb(r);
2700         put_rsb(r);
2701         return error;
2702 }
2703
2704 /*
2705  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2706  */
2707
2708 int dlm_lock(dlm_lockspace_t *lockspace,
2709              int mode,
2710              struct dlm_lksb *lksb,
2711              uint32_t flags,
2712              void *name,
2713              unsigned int namelen,
2714              uint32_t parent_lkid,
2715              void (*ast) (void *astarg),
2716              void *astarg,
2717              void (*bast) (void *astarg, int mode))
2718 {
2719         struct dlm_ls *ls;
2720         struct dlm_lkb *lkb;
2721         struct dlm_args args;
2722         int error, convert = flags & DLM_LKF_CONVERT;
2723
2724         ls = dlm_find_lockspace_local(lockspace);
2725         if (!ls)
2726                 return -EINVAL;
2727
2728         dlm_lock_recovery(ls);
2729
2730         if (convert)
2731                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2732         else
2733                 error = create_lkb(ls, &lkb);
2734
2735         if (error)
2736                 goto out;
2737
2738         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2739                               astarg, bast, &args);
2740         if (error)
2741                 goto out_put;
2742
2743         if (convert)
2744                 error = convert_lock(ls, lkb, &args);
2745         else
2746                 error = request_lock(ls, lkb, name, namelen, &args);
2747
2748         if (error == -EINPROGRESS)
2749                 error = 0;
2750  out_put:
2751         if (convert || error)
2752                 __put_lkb(ls, lkb);
2753         if (error == -EAGAIN || error == -EDEADLK)
2754                 error = 0;
2755  out:
2756         dlm_unlock_recovery(ls);
2757         dlm_put_lockspace(ls);
2758         return error;
2759 }
2760
2761 int dlm_unlock(dlm_lockspace_t *lockspace,
2762                uint32_t lkid,
2763                uint32_t flags,
2764                struct dlm_lksb *lksb,
2765                void *astarg)
2766 {
2767         struct dlm_ls *ls;
2768         struct dlm_lkb *lkb;
2769         struct dlm_args args;
2770         int error;
2771
2772         ls = dlm_find_lockspace_local(lockspace);
2773         if (!ls)
2774                 return -EINVAL;
2775
2776         dlm_lock_recovery(ls);
2777
2778         error = find_lkb(ls, lkid, &lkb);
2779         if (error)
2780                 goto out;
2781
2782         error = set_unlock_args(flags, astarg, &args);
2783         if (error)
2784                 goto out_put;
2785
2786         if (flags & DLM_LKF_CANCEL)
2787                 error = cancel_lock(ls, lkb, &args);
2788         else
2789                 error = unlock_lock(ls, lkb, &args);
2790
2791         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2792                 error = 0;
2793         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2794                 error = 0;
2795  out_put:
2796         dlm_put_lkb(lkb);
2797  out:
2798         dlm_unlock_recovery(ls);
2799         dlm_put_lockspace(ls);
2800         return error;
2801 }
2802
2803 /*
2804  * send/receive routines for remote operations and replies
2805  *
2806  * send_args
2807  * send_common
2808  * send_request                 receive_request
2809  * send_convert                 receive_convert
2810  * send_unlock                  receive_unlock
2811  * send_cancel                  receive_cancel
2812  * send_grant                   receive_grant
2813  * send_bast                    receive_bast
2814  * send_lookup                  receive_lookup
2815  * send_remove                  receive_remove
2816  *
2817  *                              send_common_reply
2818  * receive_request_reply        send_request_reply
2819  * receive_convert_reply        send_convert_reply
2820  * receive_unlock_reply         send_unlock_reply
2821  * receive_cancel_reply         send_cancel_reply
2822  * receive_lookup_reply         send_lookup_reply
2823  */
2824
2825 static int _create_message(struct dlm_ls *ls, int mb_len,
2826                            int to_nodeid, int mstype,
2827                            struct dlm_message **ms_ret,
2828                            struct dlm_mhandle **mh_ret)
2829 {
2830         struct dlm_message *ms;
2831         struct dlm_mhandle *mh;
2832         char *mb;
2833
2834         /* get_buffer gives us a message handle (mh) that we need to
2835            pass into lowcomms_commit and a message buffer (mb) that we
2836            write our data into */
2837
2838         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2839         if (!mh)
2840                 return -ENOBUFS;
2841
2842         memset(mb, 0, mb_len);
2843
2844         ms = (struct dlm_message *) mb;
2845
2846         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2847         ms->m_header.h_lockspace = ls->ls_global_id;
2848         ms->m_header.h_nodeid = dlm_our_nodeid();
2849         ms->m_header.h_length = mb_len;
2850         ms->m_header.h_cmd = DLM_MSG;
2851
2852         ms->m_type = mstype;
2853
2854         *mh_ret = mh;
2855         *ms_ret = ms;
2856         return 0;
2857 }
2858
2859 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2860                           int to_nodeid, int mstype,
2861                           struct dlm_message **ms_ret,
2862                           struct dlm_mhandle **mh_ret)
2863 {
2864         int mb_len = sizeof(struct dlm_message);
2865
2866         switch (mstype) {
2867         case DLM_MSG_REQUEST:
2868         case DLM_MSG_LOOKUP:
2869         case DLM_MSG_REMOVE:
2870                 mb_len += r->res_length;
2871                 break;
2872         case DLM_MSG_CONVERT:
2873         case DLM_MSG_UNLOCK:
2874         case DLM_MSG_REQUEST_REPLY:
2875         case DLM_MSG_CONVERT_REPLY:
2876         case DLM_MSG_GRANT:
2877                 if (lkb && lkb->lkb_lvbptr)
2878                         mb_len += r->res_ls->ls_lvblen;
2879                 break;
2880         }
2881
2882         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2883                                ms_ret, mh_ret);
2884 }
2885
2886 /* further lowcomms enhancements or alternate implementations may make
2887    the return value from this function useful at some point */
2888
2889 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2890 {
2891         dlm_message_out(ms);
2892         dlm_lowcomms_commit_buffer(mh);
2893         return 0;
2894 }
2895
2896 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2897                       struct dlm_message *ms)
2898 {
2899         ms->m_nodeid   = lkb->lkb_nodeid;
2900         ms->m_pid      = lkb->lkb_ownpid;
2901         ms->m_lkid     = lkb->lkb_id;
2902         ms->m_remid    = lkb->lkb_remid;
2903         ms->m_exflags  = lkb->lkb_exflags;
2904         ms->m_sbflags  = lkb->lkb_sbflags;
2905         ms->m_flags    = lkb->lkb_flags;
2906         ms->m_lvbseq   = lkb->lkb_lvbseq;
2907         ms->m_status   = lkb->lkb_status;
2908         ms->m_grmode   = lkb->lkb_grmode;
2909         ms->m_rqmode   = lkb->lkb_rqmode;
2910         ms->m_hash     = r->res_hash;
2911
2912         /* m_result and m_bastmode are set from function args,
2913            not from lkb fields */
2914
2915         if (lkb->lkb_bastfn)
2916                 ms->m_asts |= DLM_CB_BAST;
2917         if (lkb->lkb_astfn)
2918                 ms->m_asts |= DLM_CB_CAST;
2919
2920         /* compare with switch in create_message; send_remove() doesn't
2921            use send_args() */
2922
2923         switch (ms->m_type) {
2924         case DLM_MSG_REQUEST:
2925         case DLM_MSG_LOOKUP:
2926                 memcpy(ms->m_extra, r->res_name, r->res_length);
2927                 break;
2928         case DLM_MSG_CONVERT:
2929         case DLM_MSG_UNLOCK:
2930         case DLM_MSG_REQUEST_REPLY:
2931         case DLM_MSG_CONVERT_REPLY:
2932         case DLM_MSG_GRANT:
2933                 if (!lkb->lkb_lvbptr)
2934                         break;
2935                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2936                 break;
2937         }
2938 }
2939
2940 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2941 {
2942         struct dlm_message *ms;
2943         struct dlm_mhandle *mh;
2944         int to_nodeid, error;
2945
2946         to_nodeid = r->res_nodeid;
2947
2948         error = add_to_waiters(lkb, mstype, to_nodeid);
2949         if (error)
2950                 return error;
2951
2952         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2953         if (error)
2954                 goto fail;
2955
2956         send_args(r, lkb, ms);
2957
2958         error = send_message(mh, ms);
2959         if (error)
2960                 goto fail;
2961         return 0;
2962
2963  fail:
2964         remove_from_waiters(lkb, msg_reply_type(mstype));
2965         return error;
2966 }
2967
2968 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2969 {
2970         return send_common(r, lkb, DLM_MSG_REQUEST);
2971 }
2972
2973 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2974 {
2975         int error;
2976
2977         error = send_common(r, lkb, DLM_MSG_CONVERT);
2978
2979         /* down conversions go without a reply from the master */
2980         if (!error && down_conversion(lkb)) {
2981                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2982                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2983                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2984                 r->res_ls->ls_stub_ms.m_result = 0;
2985                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2986         }
2987
2988         return error;
2989 }
2990
2991 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2992    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2993    that the master is still correct. */
2994
2995 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2996 {
2997         return send_common(r, lkb, DLM_MSG_UNLOCK);
2998 }
2999
3000 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3001 {
3002         return send_common(r, lkb, DLM_MSG_CANCEL);
3003 }
3004
3005 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3006 {
3007         struct dlm_message *ms;
3008         struct dlm_mhandle *mh;
3009         int to_nodeid, error;
3010
3011         to_nodeid = lkb->lkb_nodeid;
3012
3013         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3014         if (error)
3015                 goto out;
3016
3017         send_args(r, lkb, ms);
3018
3019         ms->m_result = 0;
3020
3021         error = send_message(mh, ms);
3022  out:
3023         return error;
3024 }
3025
3026 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3027 {
3028         struct dlm_message *ms;
3029         struct dlm_mhandle *mh;
3030         int to_nodeid, error;
3031
3032         to_nodeid = lkb->lkb_nodeid;
3033
3034         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3035         if (error)
3036                 goto out;
3037
3038         send_args(r, lkb, ms);
3039
3040         ms->m_bastmode = mode;
3041
3042         error = send_message(mh, ms);
3043  out:
3044         return error;
3045 }
3046
3047 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048 {
3049         struct dlm_message *ms;
3050         struct dlm_mhandle *mh;
3051         int to_nodeid, error;
3052
3053         to_nodeid = dlm_dir_nodeid(r);
3054
3055         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3056         if (error)
3057                 return error;
3058
3059         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3060         if (error)
3061                 goto fail;
3062
3063         send_args(r, lkb, ms);
3064
3065         error = send_message(mh, ms);
3066         if (error)
3067                 goto fail;
3068         return 0;
3069
3070  fail:
3071         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3072         return error;
3073 }
3074
3075 static int send_remove(struct dlm_rsb *r)
3076 {
3077         struct dlm_message *ms;
3078         struct dlm_mhandle *mh;
3079         int to_nodeid, error;
3080
3081         to_nodeid = dlm_dir_nodeid(r);
3082
3083         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3084         if (error)
3085                 goto out;
3086
3087         memcpy(ms->m_extra, r->res_name, r->res_length);
3088         ms->m_hash = r->res_hash;
3089
3090         error = send_message(mh, ms);
3091  out:
3092         return error;
3093 }
3094
3095 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3096                              int mstype, int rv)
3097 {
3098         struct dlm_message *ms;
3099         struct dlm_mhandle *mh;
3100         int to_nodeid, error;
3101
3102         to_nodeid = lkb->lkb_nodeid;
3103
3104         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3105         if (error)
3106                 goto out;
3107
3108         send_args(r, lkb, ms);
3109
3110         ms->m_result = rv;
3111
3112         error = send_message(mh, ms);
3113  out:
3114         return error;
3115 }
3116
3117 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3118 {
3119         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3120 }
3121
3122 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3123 {
3124         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3125 }
3126
3127 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3128 {
3129         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3130 }
3131
3132 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3133 {
3134         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3135 }
3136
3137 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3138                              int ret_nodeid, int rv)
3139 {
3140         struct dlm_rsb *r = &ls->ls_stub_rsb;
3141         struct dlm_message *ms;
3142         struct dlm_mhandle *mh;
3143         int error, nodeid = ms_in->m_header.h_nodeid;
3144
3145         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3146         if (error)
3147                 goto out;
3148
3149         ms->m_lkid = ms_in->m_lkid;
3150         ms->m_result = rv;
3151         ms->m_nodeid = ret_nodeid;
3152
3153         error = send_message(mh, ms);
3154  out:
3155         return error;
3156 }
3157
3158 /* which args we save from a received message depends heavily on the type
3159    of message, unlike the send side where we can safely send everything about
3160    the lkb for any type of message */
3161
3162 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3163 {
3164         lkb->lkb_exflags = ms->m_exflags;
3165         lkb->lkb_sbflags = ms->m_sbflags;
3166         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3167                          (ms->m_flags & 0x0000FFFF);
3168 }
3169
3170 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3171 {
3172         if (ms->m_flags == DLM_IFL_STUB_MS)
3173                 return;
3174
3175         lkb->lkb_sbflags = ms->m_sbflags;
3176         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3177                          (ms->m_flags & 0x0000FFFF);
3178 }
3179
3180 static int receive_extralen(struct dlm_message *ms)
3181 {
3182         return (ms->m_header.h_length - sizeof(struct dlm_message));
3183 }
3184
3185 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3186                        struct dlm_message *ms)
3187 {
3188         int len;
3189
3190         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3191                 if (!lkb->lkb_lvbptr)
3192                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3193                 if (!lkb->lkb_lvbptr)
3194                         return -ENOMEM;
3195                 len = receive_extralen(ms);
3196                 if (len > DLM_RESNAME_MAXLEN)
3197                         len = DLM_RESNAME_MAXLEN;
3198                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3199         }
3200         return 0;
3201 }
3202
3203 static void fake_bastfn(void *astparam, int mode)
3204 {
3205         log_print("fake_bastfn should not be called");
3206 }
3207
3208 static void fake_astfn(void *astparam)
3209 {
3210         log_print("fake_astfn should not be called");
3211 }
3212
3213 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3214                                 struct dlm_message *ms)
3215 {
3216         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3217         lkb->lkb_ownpid = ms->m_pid;
3218         lkb->lkb_remid = ms->m_lkid;
3219         lkb->lkb_grmode = DLM_LOCK_IV;
3220         lkb->lkb_rqmode = ms->m_rqmode;
3221
3222         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3223         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3224
3225         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3226                 /* lkb was just created so there won't be an lvb yet */
3227                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3228                 if (!lkb->lkb_lvbptr)
3229                         return -ENOMEM;
3230         }
3231
3232         return 0;
3233 }
3234
3235 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3236                                 struct dlm_message *ms)
3237 {
3238         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3239                 return -EBUSY;
3240
3241         if (receive_lvb(ls, lkb, ms))
3242                 return -ENOMEM;
3243
3244         lkb->lkb_rqmode = ms->m_rqmode;
3245         lkb->lkb_lvbseq = ms->m_lvbseq;
3246
3247         return 0;
3248 }
3249
3250 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3251                                struct dlm_message *ms)
3252 {
3253         if (receive_lvb(ls, lkb, ms))
3254                 return -ENOMEM;
3255         return 0;
3256 }
3257
3258 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3259    uses to send a reply and that the remote end uses to process the reply. */
3260
3261 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3262 {
3263         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3264         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3265         lkb->lkb_remid = ms->m_lkid;
3266 }
3267
3268 /* This is called after the rsb is locked so that we can safely inspect
3269    fields in the lkb. */
3270
3271 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3272 {
3273         int from = ms->m_header.h_nodeid;
3274         int error = 0;
3275
3276         switch (ms->m_type) {
3277         case DLM_MSG_CONVERT:
3278         case DLM_MSG_UNLOCK:
3279         case DLM_MSG_CANCEL:
3280                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3281                         error = -EINVAL;
3282                 break;
3283
3284         case DLM_MSG_CONVERT_REPLY:
3285         case DLM_MSG_UNLOCK_REPLY:
3286         case DLM_MSG_CANCEL_REPLY:
3287         case DLM_MSG_GRANT:
3288         case DLM_MSG_BAST:
3289                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3290                         error = -EINVAL;
3291                 break;
3292
3293         case DLM_MSG_REQUEST_REPLY:
3294                 if (!is_process_copy(lkb))
3295                         error = -EINVAL;
3296                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3297                         error = -EINVAL;
3298                 break;
3299
3300         default:
3301                 error = -EINVAL;
3302         }
3303
3304         if (error)
3305                 log_error(lkb->lkb_resource->res_ls,
3306                           "ignore invalid message %d from %d %x %x %x %d",
3307                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3308                           lkb->lkb_flags, lkb->lkb_nodeid);
3309         return error;
3310 }
3311
3312 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3313 {
3314         struct dlm_lkb *lkb;
3315         struct dlm_rsb *r;
3316         int error, namelen;
3317
3318         error = create_lkb(ls, &lkb);
3319         if (error)
3320                 goto fail;
3321
3322         receive_flags(lkb, ms);
3323         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3324         error = receive_request_args(ls, lkb, ms);
3325         if (error) {
3326                 __put_lkb(ls, lkb);
3327                 goto fail;
3328         }
3329
3330         namelen = receive_extralen(ms);
3331
3332         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3333         if (error) {
3334                 __put_lkb(ls, lkb);
3335                 goto fail;
3336         }
3337
3338         lock_rsb(r);
3339
3340         attach_lkb(r, lkb);
3341         error = do_request(r, lkb);
3342         send_request_reply(r, lkb, error);
3343         do_request_effects(r, lkb, error);
3344
3345         unlock_rsb(r);
3346         put_rsb(r);
3347
3348         if (error == -EINPROGRESS)
3349                 error = 0;
3350         if (error)
3351                 dlm_put_lkb(lkb);
3352         return;
3353
3354  fail:
3355         setup_stub_lkb(ls, ms);
3356         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3357 }
3358
3359 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3360 {
3361         struct dlm_lkb *lkb;
3362         struct dlm_rsb *r;
3363         int error, reply = 1;
3364
3365         error = find_lkb(ls, ms->m_remid, &lkb);
3366         if (error)
3367                 goto fail;
3368
3369         r = lkb->lkb_resource;
3370
3371         hold_rsb(r);
3372         lock_rsb(r);
3373
3374         error = validate_message(lkb, ms);
3375         if (error)
3376                 goto out;
3377
3378         receive_flags(lkb, ms);
3379
3380         error = receive_convert_args(ls, lkb, ms);
3381         if (error) {
3382                 send_convert_reply(r, lkb, error);
3383                 goto out;
3384         }
3385
3386         reply = !down_conversion(lkb);
3387
3388         error = do_convert(r, lkb);
3389         if (reply)
3390                 send_convert_reply(r, lkb, error);