Merge git://git.kernel.org/pub/scm/linux/kernel/git/rusty/linux-2.6-for-linus
[pandora-kernel.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/slab.h>
60 #include "dlm_internal.h"
61 #include <linux/dlm_device.h>
62 #include "memory.h"
63 #include "lowcomms.h"
64 #include "requestqueue.h"
65 #include "util.h"
66 #include "dir.h"
67 #include "member.h"
68 #include "lockspace.h"
69 #include "ast.h"
70 #include "lock.h"
71 #include "rcom.h"
72 #include "recover.h"
73 #include "lvb_table.h"
74 #include "user.h"
75 #include "config.h"
76
77 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int send_remove(struct dlm_rsb *r);
85 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88                                     struct dlm_message *ms);
89 static int receive_extralen(struct dlm_message *ms);
90 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 static void del_timeout(struct dlm_lkb *lkb);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163                "     status %d rqmode %d grmode %d wait_type %d\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type);
167 }
168
169 static void dlm_print_rsb(struct dlm_rsb *r)
170 {
171         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172                r->res_nodeid, r->res_flags, r->res_first_lkid,
173                r->res_recover_locks_count, r->res_name);
174 }
175
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178         struct dlm_lkb *lkb;
179
180         dlm_print_rsb(r);
181
182         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184         printk(KERN_ERR "rsb lookup list\n");
185         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb grant queue:\n");
188         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb convert queue:\n");
191         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb wait queue:\n");
194         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196 }
197
198 /* Threads cannot use the lockspace while it's being recovered */
199
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202         down_read(&ls->ls_in_recovery);
203 }
204
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207         up_read(&ls->ls_in_recovery);
208 }
209
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212         return down_read_trylock(&ls->ls_in_recovery);
213 }
214
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243         return !!r->res_nodeid;
244 }
245
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291
292         del_timeout(lkb);
293
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307
308         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 }
310
311 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 {
313         queue_cast(r, lkb,
314                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 }
316
317 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318 {
319         if (is_master_copy(lkb)) {
320                 send_bast(r, lkb, rqmode);
321         } else {
322                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
323         }
324 }
325
326 /*
327  * Basic operations on rsb's and lkb's
328  */
329
330 static int pre_rsb_struct(struct dlm_ls *ls)
331 {
332         struct dlm_rsb *r1, *r2;
333         int count = 0;
334
335         spin_lock(&ls->ls_new_rsb_spin);
336         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
337                 spin_unlock(&ls->ls_new_rsb_spin);
338                 return 0;
339         }
340         spin_unlock(&ls->ls_new_rsb_spin);
341
342         r1 = dlm_allocate_rsb(ls);
343         r2 = dlm_allocate_rsb(ls);
344
345         spin_lock(&ls->ls_new_rsb_spin);
346         if (r1) {
347                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
348                 ls->ls_new_rsb_count++;
349         }
350         if (r2) {
351                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
352                 ls->ls_new_rsb_count++;
353         }
354         count = ls->ls_new_rsb_count;
355         spin_unlock(&ls->ls_new_rsb_spin);
356
357         if (!count)
358                 return -ENOMEM;
359         return 0;
360 }
361
362 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
363    unlock any spinlocks, go back and call pre_rsb_struct again.
364    Otherwise, take an rsb off the list and return it. */
365
366 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
367                           struct dlm_rsb **r_ret)
368 {
369         struct dlm_rsb *r;
370         int count;
371
372         spin_lock(&ls->ls_new_rsb_spin);
373         if (list_empty(&ls->ls_new_rsb)) {
374                 count = ls->ls_new_rsb_count;
375                 spin_unlock(&ls->ls_new_rsb_spin);
376                 log_debug(ls, "find_rsb retry %d %d %s",
377                           count, dlm_config.ci_new_rsb_count, name);
378                 return -EAGAIN;
379         }
380
381         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
382         list_del(&r->res_hashchain);
383         ls->ls_new_rsb_count--;
384         spin_unlock(&ls->ls_new_rsb_spin);
385
386         r->res_ls = ls;
387         r->res_length = len;
388         memcpy(r->res_name, name, len);
389         mutex_init(&r->res_mutex);
390
391         INIT_LIST_HEAD(&r->res_hashchain);
392         INIT_LIST_HEAD(&r->res_lookup);
393         INIT_LIST_HEAD(&r->res_grantqueue);
394         INIT_LIST_HEAD(&r->res_convertqueue);
395         INIT_LIST_HEAD(&r->res_waitqueue);
396         INIT_LIST_HEAD(&r->res_root_list);
397         INIT_LIST_HEAD(&r->res_recover_list);
398
399         *r_ret = r;
400         return 0;
401 }
402
403 static int search_rsb_list(struct list_head *head, char *name, int len,
404                            unsigned int flags, struct dlm_rsb **r_ret)
405 {
406         struct dlm_rsb *r;
407         int error = 0;
408
409         list_for_each_entry(r, head, res_hashchain) {
410                 if (len == r->res_length && !memcmp(name, r->res_name, len))
411                         goto found;
412         }
413         *r_ret = NULL;
414         return -EBADR;
415
416  found:
417         if (r->res_nodeid && (flags & R_MASTER))
418                 error = -ENOTBLK;
419         *r_ret = r;
420         return error;
421 }
422
423 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
424                        unsigned int flags, struct dlm_rsb **r_ret)
425 {
426         struct dlm_rsb *r;
427         int error;
428
429         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
430         if (!error) {
431                 kref_get(&r->res_ref);
432                 goto out;
433         }
434         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
435         if (error)
436                 goto out;
437
438         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
439
440         if (dlm_no_directory(ls))
441                 goto out;
442
443         if (r->res_nodeid == -1) {
444                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
445                 r->res_first_lkid = 0;
446         } else if (r->res_nodeid > 0) {
447                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
448                 r->res_first_lkid = 0;
449         } else {
450                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
451                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
452         }
453  out:
454         *r_ret = r;
455         return error;
456 }
457
458 /*
459  * Find rsb in rsbtbl and potentially create/add one
460  *
461  * Delaying the release of rsb's has a similar benefit to applications keeping
462  * NL locks on an rsb, but without the guarantee that the cached master value
463  * will still be valid when the rsb is reused.  Apps aren't always smart enough
464  * to keep NL locks on an rsb that they may lock again shortly; this can lead
465  * to excessive master lookups and removals if we don't delay the release.
466  *
467  * Searching for an rsb means looking through both the normal list and toss
468  * list.  When found on the toss list the rsb is moved to the normal list with
469  * ref count of 1; when found on normal list the ref count is incremented.
470  */
471
472 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
473                     unsigned int flags, struct dlm_rsb **r_ret)
474 {
475         struct dlm_rsb *r = NULL;
476         uint32_t hash, bucket;
477         int error;
478
479         if (namelen > DLM_RESNAME_MAXLEN) {
480                 error = -EINVAL;
481                 goto out;
482         }
483
484         if (dlm_no_directory(ls))
485                 flags |= R_CREATE;
486
487         hash = jhash(name, namelen, 0);
488         bucket = hash & (ls->ls_rsbtbl_size - 1);
489
490  retry:
491         if (flags & R_CREATE) {
492                 error = pre_rsb_struct(ls);
493                 if (error < 0)
494                         goto out;
495         }
496
497         spin_lock(&ls->ls_rsbtbl[bucket].lock);
498
499         error = _search_rsb(ls, name, namelen, bucket, flags, &r);
500         if (!error)
501                 goto out_unlock;
502
503         if (error == -EBADR && !(flags & R_CREATE))
504                 goto out_unlock;
505
506         /* the rsb was found but wasn't a master copy */
507         if (error == -ENOTBLK)
508                 goto out_unlock;
509
510         error = get_rsb_struct(ls, name, namelen, &r);
511         if (error == -EAGAIN) {
512                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
513                 goto retry;
514         }
515         if (error)
516                 goto out_unlock;
517
518         r->res_hash = hash;
519         r->res_bucket = bucket;
520         r->res_nodeid = -1;
521         kref_init(&r->res_ref);
522
523         /* With no directory, the master can be set immediately */
524         if (dlm_no_directory(ls)) {
525                 int nodeid = dlm_dir_nodeid(r);
526                 if (nodeid == dlm_our_nodeid())
527                         nodeid = 0;
528                 r->res_nodeid = nodeid;
529         }
530         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
531         error = 0;
532  out_unlock:
533         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
534  out:
535         *r_ret = r;
536         return error;
537 }
538
539 /* This is only called to add a reference when the code already holds
540    a valid reference to the rsb, so there's no need for locking. */
541
542 static inline void hold_rsb(struct dlm_rsb *r)
543 {
544         kref_get(&r->res_ref);
545 }
546
547 void dlm_hold_rsb(struct dlm_rsb *r)
548 {
549         hold_rsb(r);
550 }
551
552 static void toss_rsb(struct kref *kref)
553 {
554         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555         struct dlm_ls *ls = r->res_ls;
556
557         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
558         kref_init(&r->res_ref);
559         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
560         r->res_toss_time = jiffies;
561         if (r->res_lvbptr) {
562                 dlm_free_lvb(r->res_lvbptr);
563                 r->res_lvbptr = NULL;
564         }
565 }
566
567 /* When all references to the rsb are gone it's transferred to
568    the tossed list for later disposal. */
569
570 static void put_rsb(struct dlm_rsb *r)
571 {
572         struct dlm_ls *ls = r->res_ls;
573         uint32_t bucket = r->res_bucket;
574
575         spin_lock(&ls->ls_rsbtbl[bucket].lock);
576         kref_put(&r->res_ref, toss_rsb);
577         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
578 }
579
580 void dlm_put_rsb(struct dlm_rsb *r)
581 {
582         put_rsb(r);
583 }
584
585 /* See comment for unhold_lkb */
586
587 static void unhold_rsb(struct dlm_rsb *r)
588 {
589         int rv;
590         rv = kref_put(&r->res_ref, toss_rsb);
591         DLM_ASSERT(!rv, dlm_dump_rsb(r););
592 }
593
594 static void kill_rsb(struct kref *kref)
595 {
596         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
597
598         /* All work is done after the return from kref_put() so we
599            can release the write_lock before the remove and free. */
600
601         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
602         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
603         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
604         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
605         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
606         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
607 }
608
609 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
610    The rsb must exist as long as any lkb's for it do. */
611
612 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
613 {
614         hold_rsb(r);
615         lkb->lkb_resource = r;
616 }
617
618 static void detach_lkb(struct dlm_lkb *lkb)
619 {
620         if (lkb->lkb_resource) {
621                 put_rsb(lkb->lkb_resource);
622                 lkb->lkb_resource = NULL;
623         }
624 }
625
626 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
627 {
628         struct dlm_lkb *lkb;
629         int rv, id;
630
631         lkb = dlm_allocate_lkb(ls);
632         if (!lkb)
633                 return -ENOMEM;
634
635         lkb->lkb_nodeid = -1;
636         lkb->lkb_grmode = DLM_LOCK_IV;
637         kref_init(&lkb->lkb_ref);
638         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
639         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
640         INIT_LIST_HEAD(&lkb->lkb_time_list);
641         INIT_LIST_HEAD(&lkb->lkb_cb_list);
642         mutex_init(&lkb->lkb_cb_mutex);
643         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
644
645  retry:
646         rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
647         if (!rv)
648                 return -ENOMEM;
649
650         spin_lock(&ls->ls_lkbidr_spin);
651         rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
652         if (!rv)
653                 lkb->lkb_id = id;
654         spin_unlock(&ls->ls_lkbidr_spin);
655
656         if (rv == -EAGAIN)
657                 goto retry;
658
659         if (rv < 0) {
660                 log_error(ls, "create_lkb idr error %d", rv);
661                 return rv;
662         }
663
664         *lkb_ret = lkb;
665         return 0;
666 }
667
668 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
669 {
670         struct dlm_lkb *lkb;
671
672         spin_lock(&ls->ls_lkbidr_spin);
673         lkb = idr_find(&ls->ls_lkbidr, lkid);
674         if (lkb)
675                 kref_get(&lkb->lkb_ref);
676         spin_unlock(&ls->ls_lkbidr_spin);
677
678         *lkb_ret = lkb;
679         return lkb ? 0 : -ENOENT;
680 }
681
682 static void kill_lkb(struct kref *kref)
683 {
684         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
685
686         /* All work is done after the return from kref_put() so we
687            can release the write_lock before the detach_lkb */
688
689         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
690 }
691
692 /* __put_lkb() is used when an lkb may not have an rsb attached to
693    it so we need to provide the lockspace explicitly */
694
695 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
696 {
697         uint32_t lkid = lkb->lkb_id;
698
699         spin_lock(&ls->ls_lkbidr_spin);
700         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
701                 idr_remove(&ls->ls_lkbidr, lkid);
702                 spin_unlock(&ls->ls_lkbidr_spin);
703
704                 detach_lkb(lkb);
705
706                 /* for local/process lkbs, lvbptr points to caller's lksb */
707                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
708                         dlm_free_lvb(lkb->lkb_lvbptr);
709                 dlm_free_lkb(lkb);
710                 return 1;
711         } else {
712                 spin_unlock(&ls->ls_lkbidr_spin);
713                 return 0;
714         }
715 }
716
717 int dlm_put_lkb(struct dlm_lkb *lkb)
718 {
719         struct dlm_ls *ls;
720
721         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
722         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
723
724         ls = lkb->lkb_resource->res_ls;
725         return __put_lkb(ls, lkb);
726 }
727
728 /* This is only called to add a reference when the code already holds
729    a valid reference to the lkb, so there's no need for locking. */
730
731 static inline void hold_lkb(struct dlm_lkb *lkb)
732 {
733         kref_get(&lkb->lkb_ref);
734 }
735
736 /* This is called when we need to remove a reference and are certain
737    it's not the last ref.  e.g. del_lkb is always called between a
738    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
739    put_lkb would work fine, but would involve unnecessary locking */
740
741 static inline void unhold_lkb(struct dlm_lkb *lkb)
742 {
743         int rv;
744         rv = kref_put(&lkb->lkb_ref, kill_lkb);
745         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
746 }
747
748 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
749                             int mode)
750 {
751         struct dlm_lkb *lkb = NULL;
752
753         list_for_each_entry(lkb, head, lkb_statequeue)
754                 if (lkb->lkb_rqmode < mode)
755                         break;
756
757         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
758 }
759
760 /* add/remove lkb to rsb's grant/convert/wait queue */
761
762 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
763 {
764         kref_get(&lkb->lkb_ref);
765
766         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
767
768         lkb->lkb_timestamp = ktime_get();
769
770         lkb->lkb_status = status;
771
772         switch (status) {
773         case DLM_LKSTS_WAITING:
774                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
775                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
776                 else
777                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
778                 break;
779         case DLM_LKSTS_GRANTED:
780                 /* convention says granted locks kept in order of grmode */
781                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
782                                 lkb->lkb_grmode);
783                 break;
784         case DLM_LKSTS_CONVERT:
785                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
786                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
787                 else
788                         list_add_tail(&lkb->lkb_statequeue,
789                                       &r->res_convertqueue);
790                 break;
791         default:
792                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
793         }
794 }
795
796 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
797 {
798         lkb->lkb_status = 0;
799         list_del(&lkb->lkb_statequeue);
800         unhold_lkb(lkb);
801 }
802
803 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
804 {
805         hold_lkb(lkb);
806         del_lkb(r, lkb);
807         add_lkb(r, lkb, sts);
808         unhold_lkb(lkb);
809 }
810
811 static int msg_reply_type(int mstype)
812 {
813         switch (mstype) {
814         case DLM_MSG_REQUEST:
815                 return DLM_MSG_REQUEST_REPLY;
816         case DLM_MSG_CONVERT:
817                 return DLM_MSG_CONVERT_REPLY;
818         case DLM_MSG_UNLOCK:
819                 return DLM_MSG_UNLOCK_REPLY;
820         case DLM_MSG_CANCEL:
821                 return DLM_MSG_CANCEL_REPLY;
822         case DLM_MSG_LOOKUP:
823                 return DLM_MSG_LOOKUP_REPLY;
824         }
825         return -1;
826 }
827
828 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
829 {
830         int i;
831
832         for (i = 0; i < num_nodes; i++) {
833                 if (!warned[i]) {
834                         warned[i] = nodeid;
835                         return 0;
836                 }
837                 if (warned[i] == nodeid)
838                         return 1;
839         }
840         return 0;
841 }
842
843 void dlm_scan_waiters(struct dlm_ls *ls)
844 {
845         struct dlm_lkb *lkb;
846         ktime_t zero = ktime_set(0, 0);
847         s64 us;
848         s64 debug_maxus = 0;
849         u32 debug_scanned = 0;
850         u32 debug_expired = 0;
851         int num_nodes = 0;
852         int *warned = NULL;
853
854         if (!dlm_config.ci_waitwarn_us)
855                 return;
856
857         mutex_lock(&ls->ls_waiters_mutex);
858
859         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
860                 if (ktime_equal(lkb->lkb_wait_time, zero))
861                         continue;
862
863                 debug_scanned++;
864
865                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
866
867                 if (us < dlm_config.ci_waitwarn_us)
868                         continue;
869
870                 lkb->lkb_wait_time = zero;
871
872                 debug_expired++;
873                 if (us > debug_maxus)
874                         debug_maxus = us;
875
876                 if (!num_nodes) {
877                         num_nodes = ls->ls_num_nodes;
878                         warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
879                 }
880                 if (!warned)
881                         continue;
882                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
883                         continue;
884
885                 log_error(ls, "waitwarn %x %lld %d us check connection to "
886                           "node %d", lkb->lkb_id, (long long)us,
887                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
888         }
889         mutex_unlock(&ls->ls_waiters_mutex);
890         kfree(warned);
891
892         if (debug_expired)
893                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
894                           debug_scanned, debug_expired,
895                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
896 }
897
898 /* add/remove lkb from global waiters list of lkb's waiting for
899    a reply from a remote node */
900
901 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
902 {
903         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
904         int error = 0;
905
906         mutex_lock(&ls->ls_waiters_mutex);
907
908         if (is_overlap_unlock(lkb) ||
909             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
910                 error = -EINVAL;
911                 goto out;
912         }
913
914         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
915                 switch (mstype) {
916                 case DLM_MSG_UNLOCK:
917                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
918                         break;
919                 case DLM_MSG_CANCEL:
920                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
921                         break;
922                 default:
923                         error = -EBUSY;
924                         goto out;
925                 }
926                 lkb->lkb_wait_count++;
927                 hold_lkb(lkb);
928
929                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
930                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
931                           lkb->lkb_wait_count, lkb->lkb_flags);
932                 goto out;
933         }
934
935         DLM_ASSERT(!lkb->lkb_wait_count,
936                    dlm_print_lkb(lkb);
937                    printk("wait_count %d\n", lkb->lkb_wait_count););
938
939         lkb->lkb_wait_count++;
940         lkb->lkb_wait_type = mstype;
941         lkb->lkb_wait_time = ktime_get();
942         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
943         hold_lkb(lkb);
944         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
945  out:
946         if (error)
947                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
948                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
949                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
950         mutex_unlock(&ls->ls_waiters_mutex);
951         return error;
952 }
953
954 /* We clear the RESEND flag because we might be taking an lkb off the waiters
955    list as part of process_requestqueue (e.g. a lookup that has an optimized
956    request reply on the requestqueue) between dlm_recover_waiters_pre() which
957    set RESEND and dlm_recover_waiters_post() */
958
959 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
960                                 struct dlm_message *ms)
961 {
962         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
963         int overlap_done = 0;
964
965         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
966                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
967                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
968                 overlap_done = 1;
969                 goto out_del;
970         }
971
972         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
973                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
974                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
975                 overlap_done = 1;
976                 goto out_del;
977         }
978
979         /* Cancel state was preemptively cleared by a successful convert,
980            see next comment, nothing to do. */
981
982         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
983             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
984                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
985                           lkb->lkb_id, lkb->lkb_wait_type);
986                 return -1;
987         }
988
989         /* Remove for the convert reply, and premptively remove for the
990            cancel reply.  A convert has been granted while there's still
991            an outstanding cancel on it (the cancel is moot and the result
992            in the cancel reply should be 0).  We preempt the cancel reply
993            because the app gets the convert result and then can follow up
994            with another op, like convert.  This subsequent op would see the
995            lingering state of the cancel and fail with -EBUSY. */
996
997         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
998             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
999             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1000                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1001                           lkb->lkb_id);
1002                 lkb->lkb_wait_type = 0;
1003                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1004                 lkb->lkb_wait_count--;
1005                 goto out_del;
1006         }
1007
1008         /* N.B. type of reply may not always correspond to type of original
1009            msg due to lookup->request optimization, verify others? */
1010
1011         if (lkb->lkb_wait_type) {
1012                 lkb->lkb_wait_type = 0;
1013                 goto out_del;
1014         }
1015
1016         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
1017                   lkb->lkb_id, mstype, lkb->lkb_flags);
1018         return -1;
1019
1020  out_del:
1021         /* the force-unlock/cancel has completed and we haven't recvd a reply
1022            to the op that was in progress prior to the unlock/cancel; we
1023            give up on any reply to the earlier op.  FIXME: not sure when/how
1024            this would happen */
1025
1026         if (overlap_done && lkb->lkb_wait_type) {
1027                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1028                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1029                 lkb->lkb_wait_count--;
1030                 lkb->lkb_wait_type = 0;
1031         }
1032
1033         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1034
1035         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1036         lkb->lkb_wait_count--;
1037         if (!lkb->lkb_wait_count)
1038                 list_del_init(&lkb->lkb_wait_reply);
1039         unhold_lkb(lkb);
1040         return 0;
1041 }
1042
1043 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1044 {
1045         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1046         int error;
1047
1048         mutex_lock(&ls->ls_waiters_mutex);
1049         error = _remove_from_waiters(lkb, mstype, NULL);
1050         mutex_unlock(&ls->ls_waiters_mutex);
1051         return error;
1052 }
1053
1054 /* Handles situations where we might be processing a "fake" or "stub" reply in
1055    which we can't try to take waiters_mutex again. */
1056
1057 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1058 {
1059         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1060         int error;
1061
1062         if (ms->m_flags != DLM_IFL_STUB_MS)
1063                 mutex_lock(&ls->ls_waiters_mutex);
1064         error = _remove_from_waiters(lkb, ms->m_type, ms);
1065         if (ms->m_flags != DLM_IFL_STUB_MS)
1066                 mutex_unlock(&ls->ls_waiters_mutex);
1067         return error;
1068 }
1069
1070 static void dir_remove(struct dlm_rsb *r)
1071 {
1072         int to_nodeid;
1073
1074         if (dlm_no_directory(r->res_ls))
1075                 return;
1076
1077         to_nodeid = dlm_dir_nodeid(r);
1078         if (to_nodeid != dlm_our_nodeid())
1079                 send_remove(r);
1080         else
1081                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
1082                                      r->res_name, r->res_length);
1083 }
1084
1085 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
1086    found since they are in order of newest to oldest? */
1087
1088 static int shrink_bucket(struct dlm_ls *ls, int b)
1089 {
1090         struct dlm_rsb *r;
1091         int count = 0, found;
1092
1093         for (;;) {
1094                 found = 0;
1095                 spin_lock(&ls->ls_rsbtbl[b].lock);
1096                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1097                                             res_hashchain) {
1098                         if (!time_after_eq(jiffies, r->res_toss_time +
1099                                            dlm_config.ci_toss_secs * HZ))
1100                                 continue;
1101                         found = 1;
1102                         break;
1103                 }
1104
1105                 if (!found) {
1106                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1107                         break;
1108                 }
1109
1110                 if (kref_put(&r->res_ref, kill_rsb)) {
1111                         list_del(&r->res_hashchain);
1112                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1113
1114                         if (is_master(r))
1115                                 dir_remove(r);
1116                         dlm_free_rsb(r);
1117                         count++;
1118                 } else {
1119                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1120                         log_error(ls, "tossed rsb in use %s", r->res_name);
1121                 }
1122         }
1123
1124         return count;
1125 }
1126
1127 void dlm_scan_rsbs(struct dlm_ls *ls)
1128 {
1129         int i;
1130
1131         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1132                 shrink_bucket(ls, i);
1133                 if (dlm_locking_stopped(ls))
1134                         break;
1135                 cond_resched();
1136         }
1137 }
1138
1139 static void add_timeout(struct dlm_lkb *lkb)
1140 {
1141         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1142
1143         if (is_master_copy(lkb))
1144                 return;
1145
1146         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1147             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1148                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1149                 goto add_it;
1150         }
1151         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1152                 goto add_it;
1153         return;
1154
1155  add_it:
1156         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1157         mutex_lock(&ls->ls_timeout_mutex);
1158         hold_lkb(lkb);
1159         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1160         mutex_unlock(&ls->ls_timeout_mutex);
1161 }
1162
1163 static void del_timeout(struct dlm_lkb *lkb)
1164 {
1165         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1166
1167         mutex_lock(&ls->ls_timeout_mutex);
1168         if (!list_empty(&lkb->lkb_time_list)) {
1169                 list_del_init(&lkb->lkb_time_list);
1170                 unhold_lkb(lkb);
1171         }
1172         mutex_unlock(&ls->ls_timeout_mutex);
1173 }
1174
1175 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1176    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1177    and then lock rsb because of lock ordering in add_timeout.  We may need
1178    to specify some special timeout-related bits in the lkb that are just to
1179    be accessed under the timeout_mutex. */
1180
1181 void dlm_scan_timeout(struct dlm_ls *ls)
1182 {
1183         struct dlm_rsb *r;
1184         struct dlm_lkb *lkb;
1185         int do_cancel, do_warn;
1186         s64 wait_us;
1187
1188         for (;;) {
1189                 if (dlm_locking_stopped(ls))
1190                         break;
1191
1192                 do_cancel = 0;
1193                 do_warn = 0;
1194                 mutex_lock(&ls->ls_timeout_mutex);
1195                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1196
1197                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1198                                                         lkb->lkb_timestamp));
1199
1200                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1201                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1202                                 do_cancel = 1;
1203
1204                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1205                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1206                                 do_warn = 1;
1207
1208                         if (!do_cancel && !do_warn)
1209                                 continue;
1210                         hold_lkb(lkb);
1211                         break;
1212                 }
1213                 mutex_unlock(&ls->ls_timeout_mutex);
1214
1215                 if (!do_cancel && !do_warn)
1216                         break;
1217
1218                 r = lkb->lkb_resource;
1219                 hold_rsb(r);
1220                 lock_rsb(r);
1221
1222                 if (do_warn) {
1223                         /* clear flag so we only warn once */
1224                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1225                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1226                                 del_timeout(lkb);
1227                         dlm_timeout_warn(lkb);
1228                 }
1229
1230                 if (do_cancel) {
1231                         log_debug(ls, "timeout cancel %x node %d %s",
1232                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1233                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1234                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1235                         del_timeout(lkb);
1236                         _cancel_lock(r, lkb);
1237                 }
1238
1239                 unlock_rsb(r);
1240                 unhold_rsb(r);
1241                 dlm_put_lkb(lkb);
1242         }
1243 }
1244
1245 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1246    dlm_recoverd before checking/setting ls_recover_begin. */
1247
1248 void dlm_adjust_timeouts(struct dlm_ls *ls)
1249 {
1250         struct dlm_lkb *lkb;
1251         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1252
1253         ls->ls_recover_begin = 0;
1254         mutex_lock(&ls->ls_timeout_mutex);
1255         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1256                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1257         mutex_unlock(&ls->ls_timeout_mutex);
1258
1259         if (!dlm_config.ci_waitwarn_us)
1260                 return;
1261
1262         mutex_lock(&ls->ls_waiters_mutex);
1263         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1264                 if (ktime_to_us(lkb->lkb_wait_time))
1265                         lkb->lkb_wait_time = ktime_get();
1266         }
1267         mutex_unlock(&ls->ls_waiters_mutex);
1268 }
1269
1270 /* lkb is master or local copy */
1271
1272 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1273 {
1274         int b, len = r->res_ls->ls_lvblen;
1275
1276         /* b=1 lvb returned to caller
1277            b=0 lvb written to rsb or invalidated
1278            b=-1 do nothing */
1279
1280         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1281
1282         if (b == 1) {
1283                 if (!lkb->lkb_lvbptr)
1284                         return;
1285
1286                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1287                         return;
1288
1289                 if (!r->res_lvbptr)
1290                         return;
1291
1292                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1293                 lkb->lkb_lvbseq = r->res_lvbseq;
1294
1295         } else if (b == 0) {
1296                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1297                         rsb_set_flag(r, RSB_VALNOTVALID);
1298                         return;
1299                 }
1300
1301                 if (!lkb->lkb_lvbptr)
1302                         return;
1303
1304                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1305                         return;
1306
1307                 if (!r->res_lvbptr)
1308                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1309
1310                 if (!r->res_lvbptr)
1311                         return;
1312
1313                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1314                 r->res_lvbseq++;
1315                 lkb->lkb_lvbseq = r->res_lvbseq;
1316                 rsb_clear_flag(r, RSB_VALNOTVALID);
1317         }
1318
1319         if (rsb_flag(r, RSB_VALNOTVALID))
1320                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1321 }
1322
1323 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1324 {
1325         if (lkb->lkb_grmode < DLM_LOCK_PW)
1326                 return;
1327
1328         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1329                 rsb_set_flag(r, RSB_VALNOTVALID);
1330                 return;
1331         }
1332
1333         if (!lkb->lkb_lvbptr)
1334                 return;
1335
1336         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1337                 return;
1338
1339         if (!r->res_lvbptr)
1340                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1341
1342         if (!r->res_lvbptr)
1343                 return;
1344
1345         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1346         r->res_lvbseq++;
1347         rsb_clear_flag(r, RSB_VALNOTVALID);
1348 }
1349
1350 /* lkb is process copy (pc) */
1351
1352 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1353                             struct dlm_message *ms)
1354 {
1355         int b;
1356
1357         if (!lkb->lkb_lvbptr)
1358                 return;
1359
1360         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1361                 return;
1362
1363         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1364         if (b == 1) {
1365                 int len = receive_extralen(ms);
1366                 if (len > DLM_RESNAME_MAXLEN)
1367                         len = DLM_RESNAME_MAXLEN;
1368                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1369                 lkb->lkb_lvbseq = ms->m_lvbseq;
1370         }
1371 }
1372
1373 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1374    remove_lock -- used for unlock, removes lkb from granted
1375    revert_lock -- used for cancel, moves lkb from convert to granted
1376    grant_lock  -- used for request and convert, adds lkb to granted or
1377                   moves lkb from convert or waiting to granted
1378
1379    Each of these is used for master or local copy lkb's.  There is
1380    also a _pc() variation used to make the corresponding change on
1381    a process copy (pc) lkb. */
1382
1383 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1384 {
1385         del_lkb(r, lkb);
1386         lkb->lkb_grmode = DLM_LOCK_IV;
1387         /* this unhold undoes the original ref from create_lkb()
1388            so this leads to the lkb being freed */
1389         unhold_lkb(lkb);
1390 }
1391
1392 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1393 {
1394         set_lvb_unlock(r, lkb);
1395         _remove_lock(r, lkb);
1396 }
1397
1398 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1399 {
1400         _remove_lock(r, lkb);
1401 }
1402
1403 /* returns: 0 did nothing
1404             1 moved lock to granted
1405            -1 removed lock */
1406
1407 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1408 {
1409         int rv = 0;
1410
1411         lkb->lkb_rqmode = DLM_LOCK_IV;
1412
1413         switch (lkb->lkb_status) {
1414         case DLM_LKSTS_GRANTED:
1415                 break;
1416         case DLM_LKSTS_CONVERT:
1417                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1418                 rv = 1;
1419                 break;
1420         case DLM_LKSTS_WAITING:
1421                 del_lkb(r, lkb);
1422                 lkb->lkb_grmode = DLM_LOCK_IV;
1423                 /* this unhold undoes the original ref from create_lkb()
1424                    so this leads to the lkb being freed */
1425                 unhold_lkb(lkb);
1426                 rv = -1;
1427                 break;
1428         default:
1429                 log_print("invalid status for revert %d", lkb->lkb_status);
1430         }
1431         return rv;
1432 }
1433
1434 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1435 {
1436         return revert_lock(r, lkb);
1437 }
1438
1439 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1440 {
1441         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1442                 lkb->lkb_grmode = lkb->lkb_rqmode;
1443                 if (lkb->lkb_status)
1444                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1445                 else
1446                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1447         }
1448
1449         lkb->lkb_rqmode = DLM_LOCK_IV;
1450 }
1451
1452 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1453 {
1454         set_lvb_lock(r, lkb);
1455         _grant_lock(r, lkb);
1456         lkb->lkb_highbast = 0;
1457 }
1458
1459 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1460                           struct dlm_message *ms)
1461 {
1462         set_lvb_lock_pc(r, lkb, ms);
1463         _grant_lock(r, lkb);
1464 }
1465
1466 /* called by grant_pending_locks() which means an async grant message must
1467    be sent to the requesting node in addition to granting the lock if the
1468    lkb belongs to a remote node. */
1469
1470 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1471 {
1472         grant_lock(r, lkb);
1473         if (is_master_copy(lkb))
1474                 send_grant(r, lkb);
1475         else
1476                 queue_cast(r, lkb, 0);
1477 }
1478
1479 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1480    change the granted/requested modes.  We're munging things accordingly in
1481    the process copy.
1482    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1483    conversion deadlock
1484    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1485    compatible with other granted locks */
1486
1487 static void munge_demoted(struct dlm_lkb *lkb)
1488 {
1489         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1490                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1491                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1492                 return;
1493         }
1494
1495         lkb->lkb_grmode = DLM_LOCK_NL;
1496 }
1497
1498 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1499 {
1500         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1501             ms->m_type != DLM_MSG_GRANT) {
1502                 log_print("munge_altmode %x invalid reply type %d",
1503                           lkb->lkb_id, ms->m_type);
1504                 return;
1505         }
1506
1507         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1508                 lkb->lkb_rqmode = DLM_LOCK_PR;
1509         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1510                 lkb->lkb_rqmode = DLM_LOCK_CW;
1511         else {
1512                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1513                 dlm_print_lkb(lkb);
1514         }
1515 }
1516
1517 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1518 {
1519         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1520                                            lkb_statequeue);
1521         if (lkb->lkb_id == first->lkb_id)
1522                 return 1;
1523
1524         return 0;
1525 }
1526
1527 /* Check if the given lkb conflicts with another lkb on the queue. */
1528
1529 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1530 {
1531         struct dlm_lkb *this;
1532
1533         list_for_each_entry(this, head, lkb_statequeue) {
1534                 if (this == lkb)
1535                         continue;
1536                 if (!modes_compat(this, lkb))
1537                         return 1;
1538         }
1539         return 0;
1540 }
1541
1542 /*
1543  * "A conversion deadlock arises with a pair of lock requests in the converting
1544  * queue for one resource.  The granted mode of each lock blocks the requested
1545  * mode of the other lock."
1546  *
1547  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1548  * convert queue from being granted, then deadlk/demote lkb.
1549  *
1550  * Example:
1551  * Granted Queue: empty
1552  * Convert Queue: NL->EX (first lock)
1553  *                PR->EX (second lock)
1554  *
1555  * The first lock can't be granted because of the granted mode of the second
1556  * lock and the second lock can't be granted because it's not first in the
1557  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1558  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1559  * flag set and return DEMOTED in the lksb flags.
1560  *
1561  * Originally, this function detected conv-deadlk in a more limited scope:
1562  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1563  * - if lkb1 was the first entry in the queue (not just earlier), and was
1564  *   blocked by the granted mode of lkb2, and there was nothing on the
1565  *   granted queue preventing lkb1 from being granted immediately, i.e.
1566  *   lkb2 was the only thing preventing lkb1 from being granted.
1567  *
1568  * That second condition meant we'd only say there was conv-deadlk if
1569  * resolving it (by demotion) would lead to the first lock on the convert
1570  * queue being granted right away.  It allowed conversion deadlocks to exist
1571  * between locks on the convert queue while they couldn't be granted anyway.
1572  *
1573  * Now, we detect and take action on conversion deadlocks immediately when
1574  * they're created, even if they may not be immediately consequential.  If
1575  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1576  * mode that would prevent lkb1's conversion from being granted, we do a
1577  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1578  * I think this means that the lkb_is_ahead condition below should always
1579  * be zero, i.e. there will never be conv-deadlk between two locks that are
1580  * both already on the convert queue.
1581  */
1582
1583 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1584 {
1585         struct dlm_lkb *lkb1;
1586         int lkb_is_ahead = 0;
1587
1588         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1589                 if (lkb1 == lkb2) {
1590                         lkb_is_ahead = 1;
1591                         continue;
1592                 }
1593
1594                 if (!lkb_is_ahead) {
1595                         if (!modes_compat(lkb2, lkb1))
1596                                 return 1;
1597                 } else {
1598                         if (!modes_compat(lkb2, lkb1) &&
1599                             !modes_compat(lkb1, lkb2))
1600                                 return 1;
1601                 }
1602         }
1603         return 0;
1604 }
1605
1606 /*
1607  * Return 1 if the lock can be granted, 0 otherwise.
1608  * Also detect and resolve conversion deadlocks.
1609  *
1610  * lkb is the lock to be granted
1611  *
1612  * now is 1 if the function is being called in the context of the
1613  * immediate request, it is 0 if called later, after the lock has been
1614  * queued.
1615  *
1616  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1617  */
1618
1619 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1620 {
1621         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1622
1623         /*
1624          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1625          * a new request for a NL mode lock being blocked.
1626          *
1627          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1628          * request, then it would be granted.  In essence, the use of this flag
1629          * tells the Lock Manager to expedite theis request by not considering
1630          * what may be in the CONVERTING or WAITING queues...  As of this
1631          * writing, the EXPEDITE flag can be used only with new requests for NL
1632          * mode locks.  This flag is not valid for conversion requests.
1633          *
1634          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1635          * conversion or used with a non-NL requested mode.  We also know an
1636          * EXPEDITE request is always granted immediately, so now must always
1637          * be 1.  The full condition to grant an expedite request: (now &&
1638          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1639          * therefore be shortened to just checking the flag.
1640          */
1641
1642         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1643                 return 1;
1644
1645         /*
1646          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1647          * added to the remaining conditions.
1648          */
1649
1650         if (queue_conflict(&r->res_grantqueue, lkb))
1651                 goto out;
1652
1653         /*
1654          * 6-3: By default, a conversion request is immediately granted if the
1655          * requested mode is compatible with the modes of all other granted
1656          * locks
1657          */
1658
1659         if (queue_conflict(&r->res_convertqueue, lkb))
1660                 goto out;
1661
1662         /*
1663          * 6-5: But the default algorithm for deciding whether to grant or
1664          * queue conversion requests does not by itself guarantee that such
1665          * requests are serviced on a "first come first serve" basis.  This, in
1666          * turn, can lead to a phenomenon known as "indefinate postponement".
1667          *
1668          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1669          * the system service employed to request a lock conversion.  This flag
1670          * forces certain conversion requests to be queued, even if they are
1671          * compatible with the granted modes of other locks on the same
1672          * resource.  Thus, the use of this flag results in conversion requests
1673          * being ordered on a "first come first servce" basis.
1674          *
1675          * DCT: This condition is all about new conversions being able to occur
1676          * "in place" while the lock remains on the granted queue (assuming
1677          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1678          * doesn't _have_ to go onto the convert queue where it's processed in
1679          * order.  The "now" variable is necessary to distinguish converts
1680          * being received and processed for the first time now, because once a
1681          * convert is moved to the conversion queue the condition below applies
1682          * requiring fifo granting.
1683          */
1684
1685         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1686                 return 1;
1687
1688         /*
1689          * The NOORDER flag is set to avoid the standard vms rules on grant
1690          * order.
1691          */
1692
1693         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1694                 return 1;
1695
1696         /*
1697          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1698          * granted until all other conversion requests ahead of it are granted
1699          * and/or canceled.
1700          */
1701
1702         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1703                 return 1;
1704
1705         /*
1706          * 6-4: By default, a new request is immediately granted only if all
1707          * three of the following conditions are satisfied when the request is
1708          * issued:
1709          * - The queue of ungranted conversion requests for the resource is
1710          *   empty.
1711          * - The queue of ungranted new requests for the resource is empty.
1712          * - The mode of the new request is compatible with the most
1713          *   restrictive mode of all granted locks on the resource.
1714          */
1715
1716         if (now && !conv && list_empty(&r->res_convertqueue) &&
1717             list_empty(&r->res_waitqueue))
1718                 return 1;
1719
1720         /*
1721          * 6-4: Once a lock request is in the queue of ungranted new requests,
1722          * it cannot be granted until the queue of ungranted conversion
1723          * requests is empty, all ungranted new requests ahead of it are
1724          * granted and/or canceled, and it is compatible with the granted mode
1725          * of the most restrictive lock granted on the resource.
1726          */
1727
1728         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1729             first_in_list(lkb, &r->res_waitqueue))
1730                 return 1;
1731  out:
1732         return 0;
1733 }
1734
1735 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1736                           int *err)
1737 {
1738         int rv;
1739         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1740         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1741
1742         if (err)
1743                 *err = 0;
1744
1745         rv = _can_be_granted(r, lkb, now);
1746         if (rv)
1747                 goto out;
1748
1749         /*
1750          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1751          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1752          * cancels one of the locks.
1753          */
1754
1755         if (is_convert && can_be_queued(lkb) &&
1756             conversion_deadlock_detect(r, lkb)) {
1757                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1758                         lkb->lkb_grmode = DLM_LOCK_NL;
1759                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1760                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1761                         if (err)
1762                                 *err = -EDEADLK;
1763                         else {
1764                                 log_print("can_be_granted deadlock %x now %d",
1765                                           lkb->lkb_id, now);
1766                                 dlm_dump_rsb(r);
1767                         }
1768                 }
1769                 goto out;
1770         }
1771
1772         /*
1773          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1774          * to grant a request in a mode other than the normal rqmode.  It's a
1775          * simple way to provide a big optimization to applications that can
1776          * use them.
1777          */
1778
1779         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1780                 alt = DLM_LOCK_PR;
1781         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1782                 alt = DLM_LOCK_CW;
1783
1784         if (alt) {
1785                 lkb->lkb_rqmode = alt;
1786                 rv = _can_be_granted(r, lkb, now);
1787                 if (rv)
1788                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1789                 else
1790                         lkb->lkb_rqmode = rqmode;
1791         }
1792  out:
1793         return rv;
1794 }
1795
1796 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1797    for locks pending on the convert list.  Once verified (watch for these
1798    log_prints), we should be able to just call _can_be_granted() and not
1799    bother with the demote/deadlk cases here (and there's no easy way to deal
1800    with a deadlk here, we'd have to generate something like grant_lock with
1801    the deadlk error.) */
1802
1803 /* Returns the highest requested mode of all blocked conversions; sets
1804    cw if there's a blocked conversion to DLM_LOCK_CW. */
1805
1806 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1807 {
1808         struct dlm_lkb *lkb, *s;
1809         int hi, demoted, quit, grant_restart, demote_restart;
1810         int deadlk;
1811
1812         quit = 0;
1813  restart:
1814         grant_restart = 0;
1815         demote_restart = 0;
1816         hi = DLM_LOCK_IV;
1817
1818         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1819                 demoted = is_demoted(lkb);
1820                 deadlk = 0;
1821
1822                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1823                         grant_lock_pending(r, lkb);
1824                         grant_restart = 1;
1825                         continue;
1826                 }
1827
1828                 if (!demoted && is_demoted(lkb)) {
1829                         log_print("WARN: pending demoted %x node %d %s",
1830                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1831                         demote_restart = 1;
1832                         continue;
1833                 }
1834
1835                 if (deadlk) {
1836                         log_print("WARN: pending deadlock %x node %d %s",
1837                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1838                         dlm_dump_rsb(r);
1839                         continue;
1840                 }
1841
1842                 hi = max_t(int, lkb->lkb_rqmode, hi);
1843
1844                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1845                         *cw = 1;
1846         }
1847
1848         if (grant_restart)
1849                 goto restart;
1850         if (demote_restart && !quit) {
1851                 quit = 1;
1852                 goto restart;
1853         }
1854
1855         return max_t(int, high, hi);
1856 }
1857
1858 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1859 {
1860         struct dlm_lkb *lkb, *s;
1861
1862         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1863                 if (can_be_granted(r, lkb, 0, NULL))
1864                         grant_lock_pending(r, lkb);
1865                 else {
1866                         high = max_t(int, lkb->lkb_rqmode, high);
1867                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1868                                 *cw = 1;
1869                 }
1870         }
1871
1872         return high;
1873 }
1874
1875 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1876    on either the convert or waiting queue.
1877    high is the largest rqmode of all locks blocked on the convert or
1878    waiting queue. */
1879
1880 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1881 {
1882         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1883                 if (gr->lkb_highbast < DLM_LOCK_EX)
1884                         return 1;
1885                 return 0;
1886         }
1887
1888         if (gr->lkb_highbast < high &&
1889             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1890                 return 1;
1891         return 0;
1892 }
1893
1894 static void grant_pending_locks(struct dlm_rsb *r)
1895 {
1896         struct dlm_lkb *lkb, *s;
1897         int high = DLM_LOCK_IV;
1898         int cw = 0;
1899
1900         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1901
1902         high = grant_pending_convert(r, high, &cw);
1903         high = grant_pending_wait(r, high, &cw);
1904
1905         if (high == DLM_LOCK_IV)
1906                 return;
1907
1908         /*
1909          * If there are locks left on the wait/convert queue then send blocking
1910          * ASTs to granted locks based on the largest requested mode (high)
1911          * found above.
1912          */
1913
1914         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1915                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1916                         if (cw && high == DLM_LOCK_PR &&
1917                             lkb->lkb_grmode == DLM_LOCK_PR)
1918                                 queue_bast(r, lkb, DLM_LOCK_CW);
1919                         else
1920                                 queue_bast(r, lkb, high);
1921                         lkb->lkb_highbast = high;
1922                 }
1923         }
1924 }
1925
1926 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1927 {
1928         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1929             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1930                 if (gr->lkb_highbast < DLM_LOCK_EX)
1931                         return 1;
1932                 return 0;
1933         }
1934
1935         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1936                 return 1;
1937         return 0;
1938 }
1939
1940 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1941                             struct dlm_lkb *lkb)
1942 {
1943         struct dlm_lkb *gr;
1944
1945         list_for_each_entry(gr, head, lkb_statequeue) {
1946                 /* skip self when sending basts to convertqueue */
1947                 if (gr == lkb)
1948                         continue;
1949                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1950                         queue_bast(r, gr, lkb->lkb_rqmode);
1951                         gr->lkb_highbast = lkb->lkb_rqmode;
1952                 }
1953         }
1954 }
1955
1956 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1957 {
1958         send_bast_queue(r, &r->res_grantqueue, lkb);
1959 }
1960
1961 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1962 {
1963         send_bast_queue(r, &r->res_grantqueue, lkb);
1964         send_bast_queue(r, &r->res_convertqueue, lkb);
1965 }
1966
1967 /* set_master(r, lkb) -- set the master nodeid of a resource
1968
1969    The purpose of this function is to set the nodeid field in the given
1970    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1971    known, it can just be copied to the lkb and the function will return
1972    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1973    before it can be copied to the lkb.
1974
1975    When the rsb nodeid is being looked up remotely, the initial lkb
1976    causing the lookup is kept on the ls_waiters list waiting for the
1977    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1978    on the rsb's res_lookup list until the master is verified.
1979
1980    Return values:
1981    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1982    1: the rsb master is not available and the lkb has been placed on
1983       a wait queue
1984 */
1985
1986 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1987 {
1988         struct dlm_ls *ls = r->res_ls;
1989         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1990
1991         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1992                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1993                 r->res_first_lkid = lkb->lkb_id;
1994                 lkb->lkb_nodeid = r->res_nodeid;
1995                 return 0;
1996         }
1997
1998         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1999                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2000                 return 1;
2001         }
2002
2003         if (r->res_nodeid == 0) {
2004                 lkb->lkb_nodeid = 0;
2005                 return 0;
2006         }
2007
2008         if (r->res_nodeid > 0) {
2009                 lkb->lkb_nodeid = r->res_nodeid;
2010                 return 0;
2011         }
2012
2013         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2014
2015         dir_nodeid = dlm_dir_nodeid(r);
2016
2017         if (dir_nodeid != our_nodeid) {
2018                 r->res_first_lkid = lkb->lkb_id;
2019                 send_lookup(r, lkb);
2020                 return 1;
2021         }
2022
2023         for (i = 0; i < 2; i++) {
2024                 /* It's possible for dlm_scand to remove an old rsb for
2025                    this same resource from the toss list, us to create
2026                    a new one, look up the master locally, and find it
2027                    already exists just before dlm_scand does the
2028                    dir_remove() on the previous rsb. */
2029
2030                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2031                                        r->res_length, &ret_nodeid);
2032                 if (!error)
2033                         break;
2034                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2035                 schedule();
2036         }
2037         if (error && error != -EEXIST)
2038                 return error;
2039
2040         if (ret_nodeid == our_nodeid) {
2041                 r->res_first_lkid = 0;
2042                 r->res_nodeid = 0;
2043                 lkb->lkb_nodeid = 0;
2044         } else {
2045                 r->res_first_lkid = lkb->lkb_id;
2046                 r->res_nodeid = ret_nodeid;
2047                 lkb->lkb_nodeid = ret_nodeid;
2048         }
2049         return 0;
2050 }
2051
2052 static void process_lookup_list(struct dlm_rsb *r)
2053 {
2054         struct dlm_lkb *lkb, *safe;
2055
2056         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2057                 list_del_init(&lkb->lkb_rsb_lookup);
2058                 _request_lock(r, lkb);
2059                 schedule();
2060         }
2061 }
2062
2063 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2064
2065 static void confirm_master(struct dlm_rsb *r, int error)
2066 {
2067         struct dlm_lkb *lkb;
2068
2069         if (!r->res_first_lkid)
2070                 return;
2071
2072         switch (error) {
2073         case 0:
2074         case -EINPROGRESS:
2075                 r->res_first_lkid = 0;
2076                 process_lookup_list(r);
2077                 break;
2078
2079         case -EAGAIN:
2080         case -EBADR:
2081         case -ENOTBLK:
2082                 /* the remote request failed and won't be retried (it was
2083                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2084                    lkb the first_lkid */
2085
2086                 r->res_first_lkid = 0;
2087
2088                 if (!list_empty(&r->res_lookup)) {
2089                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2090                                          lkb_rsb_lookup);
2091                         list_del_init(&lkb->lkb_rsb_lookup);
2092                         r->res_first_lkid = lkb->lkb_id;
2093                         _request_lock(r, lkb);
2094                 }
2095                 break;
2096
2097         default:
2098                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2099         }
2100 }
2101
2102 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2103                          int namelen, unsigned long timeout_cs,
2104                          void (*ast) (void *astparam),
2105                          void *astparam,
2106                          void (*bast) (void *astparam, int mode),
2107                          struct dlm_args *args)
2108 {
2109         int rv = -EINVAL;
2110
2111         /* check for invalid arg usage */
2112
2113         if (mode < 0 || mode > DLM_LOCK_EX)
2114                 goto out;
2115
2116         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2117                 goto out;
2118
2119         if (flags & DLM_LKF_CANCEL)
2120                 goto out;
2121
2122         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2123                 goto out;
2124
2125         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2126                 goto out;
2127
2128         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2129                 goto out;
2130
2131         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2132                 goto out;
2133
2134         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2135                 goto out;
2136
2137         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2138                 goto out;
2139
2140         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2141                 goto out;
2142
2143         if (!ast || !lksb)
2144                 goto out;
2145
2146         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2147                 goto out;
2148
2149         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2150                 goto out;
2151
2152         /* these args will be copied to the lkb in validate_lock_args,
2153            it cannot be done now because when converting locks, fields in
2154            an active lkb cannot be modified before locking the rsb */
2155
2156         args->flags = flags;
2157         args->astfn = ast;
2158         args->astparam = astparam;
2159         args->bastfn = bast;
2160         args->timeout = timeout_cs;
2161         args->mode = mode;
2162         args->lksb = lksb;
2163         rv = 0;
2164  out:
2165         return rv;
2166 }
2167
2168 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2169 {
2170         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2171                       DLM_LKF_FORCEUNLOCK))
2172                 return -EINVAL;
2173
2174         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2175                 return -EINVAL;
2176
2177         args->flags = flags;
2178         args->astparam = astarg;
2179         return 0;
2180 }
2181
2182 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2183                               struct dlm_args *args)
2184 {
2185         int rv = -EINVAL;
2186
2187         if (args->flags & DLM_LKF_CONVERT) {
2188                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2189                         goto out;
2190
2191                 if (args->flags & DLM_LKF_QUECVT &&
2192                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2193                         goto out;
2194
2195                 rv = -EBUSY;
2196                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2197                         goto out;
2198
2199                 if (lkb->lkb_wait_type)
2200                         goto out;
2201
2202                 if (is_overlap(lkb))
2203                         goto out;
2204         }
2205
2206         lkb->lkb_exflags = args->flags;
2207         lkb->lkb_sbflags = 0;
2208         lkb->lkb_astfn = args->astfn;
2209         lkb->lkb_astparam = args->astparam;
2210         lkb->lkb_bastfn = args->bastfn;
2211         lkb->lkb_rqmode = args->mode;
2212         lkb->lkb_lksb = args->lksb;
2213         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2214         lkb->lkb_ownpid = (int) current->pid;
2215         lkb->lkb_timeout_cs = args->timeout;
2216         rv = 0;
2217  out:
2218         if (rv)
2219                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2220                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2221                           lkb->lkb_status, lkb->lkb_wait_type,
2222                           lkb->lkb_resource->res_name);
2223         return rv;
2224 }
2225
2226 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2227    for success */
2228
2229 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2230    because there may be a lookup in progress and it's valid to do
2231    cancel/unlockf on it */
2232
2233 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2234 {
2235         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2236         int rv = -EINVAL;
2237
2238         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2239                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2240                 dlm_print_lkb(lkb);
2241                 goto out;
2242         }
2243
2244         /* an lkb may still exist even though the lock is EOL'ed due to a
2245            cancel, unlock or failed noqueue request; an app can't use these
2246            locks; return same error as if the lkid had not been found at all */
2247
2248         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2249                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2250                 rv = -ENOENT;
2251                 goto out;
2252         }
2253
2254         /* an lkb may be waiting for an rsb lookup to complete where the
2255            lookup was initiated by another lock */
2256
2257         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2258                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2259                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2260                         list_del_init(&lkb->lkb_rsb_lookup);
2261                         queue_cast(lkb->lkb_resource, lkb,
2262                                    args->flags & DLM_LKF_CANCEL ?
2263                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2264                         unhold_lkb(lkb); /* undoes create_lkb() */
2265                 }
2266                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2267                 rv = -EBUSY;
2268                 goto out;
2269         }
2270
2271         /* cancel not allowed with another cancel/unlock in progress */
2272
2273         if (args->flags & DLM_LKF_CANCEL) {
2274                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2275                         goto out;
2276
2277                 if (is_overlap(lkb))
2278                         goto out;
2279
2280                 /* don't let scand try to do a cancel */
2281                 del_timeout(lkb);
2282
2283                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2284                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2285                         rv = -EBUSY;
2286                         goto out;
2287                 }
2288
2289                 /* there's nothing to cancel */
2290                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2291                     !lkb->lkb_wait_type) {
2292                         rv = -EBUSY;
2293                         goto out;
2294                 }
2295
2296                 switch (lkb->lkb_wait_type) {
2297                 case DLM_MSG_LOOKUP:
2298                 case DLM_MSG_REQUEST:
2299                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2300                         rv = -EBUSY;
2301                         goto out;
2302                 case DLM_MSG_UNLOCK:
2303                 case DLM_MSG_CANCEL:
2304                         goto out;
2305                 }
2306                 /* add_to_waiters() will set OVERLAP_CANCEL */
2307                 goto out_ok;
2308         }
2309
2310         /* do we need to allow a force-unlock if there's a normal unlock
2311            already in progress?  in what conditions could the normal unlock
2312            fail such that we'd want to send a force-unlock to be sure? */
2313
2314         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2315                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2316                         goto out;
2317
2318                 if (is_overlap_unlock(lkb))
2319                         goto out;
2320
2321                 /* don't let scand try to do a cancel */
2322                 del_timeout(lkb);
2323
2324                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2325                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2326                         rv = -EBUSY;
2327                         goto out;
2328                 }
2329
2330                 switch (lkb->lkb_wait_type) {
2331                 case DLM_MSG_LOOKUP:
2332                 case DLM_MSG_REQUEST:
2333                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2334                         rv = -EBUSY;
2335                         goto out;
2336                 case DLM_MSG_UNLOCK:
2337                         goto out;
2338                 }
2339                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2340                 goto out_ok;
2341         }
2342
2343         /* normal unlock not allowed if there's any op in progress */
2344         rv = -EBUSY;
2345         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2346                 goto out;
2347
2348  out_ok:
2349         /* an overlapping op shouldn't blow away exflags from other op */
2350         lkb->lkb_exflags |= args->flags;
2351         lkb->lkb_sbflags = 0;
2352         lkb->lkb_astparam = args->astparam;
2353         rv = 0;
2354  out:
2355         if (rv)
2356                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2357                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2358                           args->flags, lkb->lkb_wait_type,
2359                           lkb->lkb_resource->res_name);
2360         return rv;
2361 }
2362
2363 /*
2364  * Four stage 4 varieties:
2365  * do_request(), do_convert(), do_unlock(), do_cancel()
2366  * These are called on the master node for the given lock and
2367  * from the central locking logic.
2368  */
2369
2370 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2371 {
2372         int error = 0;
2373
2374         if (can_be_granted(r, lkb, 1, NULL)) {
2375                 grant_lock(r, lkb);
2376                 queue_cast(r, lkb, 0);
2377                 goto out;
2378         }
2379
2380         if (can_be_queued(lkb)) {
2381                 error = -EINPROGRESS;
2382                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2383                 add_timeout(lkb);
2384                 goto out;
2385         }
2386
2387         error = -EAGAIN;
2388         queue_cast(r, lkb, -EAGAIN);
2389  out:
2390         return error;
2391 }
2392
2393 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2394                                int error)
2395 {
2396         switch (error) {
2397         case -EAGAIN:
2398                 if (force_blocking_asts(lkb))
2399                         send_blocking_asts_all(r, lkb);
2400                 break;
2401         case -EINPROGRESS:
2402                 send_blocking_asts(r, lkb);
2403                 break;
2404         }
2405 }
2406
2407 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2408 {
2409         int error = 0;
2410         int deadlk = 0;
2411
2412         /* changing an existing lock may allow others to be granted */
2413
2414         if (can_be_granted(r, lkb, 1, &deadlk)) {
2415                 grant_lock(r, lkb);
2416                 queue_cast(r, lkb, 0);
2417                 goto out;
2418         }
2419
2420         /* can_be_granted() detected that this lock would block in a conversion
2421            deadlock, so we leave it on the granted queue and return EDEADLK in
2422            the ast for the convert. */
2423
2424         if (deadlk) {
2425                 /* it's left on the granted queue */
2426                 revert_lock(r, lkb);
2427                 queue_cast(r, lkb, -EDEADLK);
2428                 error = -EDEADLK;
2429                 goto out;
2430         }
2431
2432         /* is_demoted() means the can_be_granted() above set the grmode
2433            to NL, and left us on the granted queue.  This auto-demotion
2434            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2435            now grantable.  We have to try to grant other converting locks
2436            before we try again to grant this one. */
2437
2438         if (is_demoted(lkb)) {
2439                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2440                 if (_can_be_granted(r, lkb, 1)) {
2441                         grant_lock(r, lkb);
2442                         queue_cast(r, lkb, 0);
2443                         goto out;
2444                 }
2445                 /* else fall through and move to convert queue */
2446         }
2447
2448         if (can_be_queued(lkb)) {
2449                 error = -EINPROGRESS;
2450                 del_lkb(r, lkb);
2451                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2452                 add_timeout(lkb);
2453                 goto out;
2454         }
2455
2456         error = -EAGAIN;
2457         queue_cast(r, lkb, -EAGAIN);
2458  out:
2459         return error;
2460 }
2461
2462 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2463                                int error)
2464 {
2465         switch (error) {
2466         case 0:
2467                 grant_pending_locks(r);
2468                 /* grant_pending_locks also sends basts */
2469                 break;
2470         case -EAGAIN:
2471                 if (force_blocking_asts(lkb))
2472                         send_blocking_asts_all(r, lkb);
2473                 break;
2474         case -EINPROGRESS:
2475                 send_blocking_asts(r, lkb);
2476                 break;
2477         }
2478 }
2479
2480 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2481 {
2482         remove_lock(r, lkb);
2483         queue_cast(r, lkb, -DLM_EUNLOCK);
2484         return -DLM_EUNLOCK;
2485 }
2486
2487 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2488                               int error)
2489 {
2490         grant_pending_locks(r);
2491 }
2492
2493 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2494  
2495 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2496 {
2497         int error;
2498
2499         error = revert_lock(r, lkb);
2500         if (error) {
2501                 queue_cast(r, lkb, -DLM_ECANCEL);
2502                 return -DLM_ECANCEL;
2503         }
2504         return 0;
2505 }
2506
2507 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2508                               int error)
2509 {
2510         if (error)
2511                 grant_pending_locks(r);
2512 }
2513
2514 /*
2515  * Four stage 3 varieties:
2516  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2517  */
2518
2519 /* add a new lkb to a possibly new rsb, called by requesting process */
2520
2521 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2522 {
2523         int error;
2524
2525         /* set_master: sets lkb nodeid from r */
2526
2527         error = set_master(r, lkb);
2528         if (error < 0)
2529                 goto out;
2530         if (error) {
2531                 error = 0;
2532                 goto out;
2533         }
2534
2535         if (is_remote(r)) {
2536                 /* receive_request() calls do_request() on remote node */
2537                 error = send_request(r, lkb);
2538         } else {
2539                 error = do_request(r, lkb);
2540                 /* for remote locks the request_reply is sent
2541                    between do_request and do_request_effects */
2542                 do_request_effects(r, lkb, error);
2543         }
2544  out:
2545         return error;
2546 }
2547
2548 /* change some property of an existing lkb, e.g. mode */
2549
2550 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2551 {
2552         int error;
2553
2554         if (is_remote(r)) {
2555                 /* receive_convert() calls do_convert() on remote node */
2556                 error = send_convert(r, lkb);
2557         } else {
2558                 error = do_convert(r, lkb);
2559                 /* for remote locks the convert_reply is sent
2560                    between do_convert and do_convert_effects */
2561                 do_convert_effects(r, lkb, error);
2562         }
2563
2564         return error;
2565 }
2566
2567 /* remove an existing lkb from the granted queue */
2568
2569 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2570 {
2571         int error;
2572
2573         if (is_remote(r)) {
2574                 /* receive_unlock() calls do_unlock() on remote node */
2575                 error = send_unlock(r, lkb);
2576         } else {
2577                 error = do_unlock(r, lkb);
2578                 /* for remote locks the unlock_reply is sent
2579                    between do_unlock and do_unlock_effects */
2580                 do_unlock_effects(r, lkb, error);
2581         }
2582
2583         return error;
2584 }
2585
2586 /* remove an existing lkb from the convert or wait queue */
2587
2588 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2589 {
2590         int error;
2591
2592         if (is_remote(r)) {
2593                 /* receive_cancel() calls do_cancel() on remote node */
2594                 error = send_cancel(r, lkb);
2595         } else {
2596                 error = do_cancel(r, lkb);
2597                 /* for remote locks the cancel_reply is sent
2598                    between do_cancel and do_cancel_effects */
2599                 do_cancel_effects(r, lkb, error);
2600         }
2601
2602         return error;
2603 }
2604
2605 /*
2606  * Four stage 2 varieties:
2607  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2608  */
2609
2610 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2611                         int len, struct dlm_args *args)
2612 {
2613         struct dlm_rsb *r;
2614         int error;
2615
2616         error = validate_lock_args(ls, lkb, args);
2617         if (error)
2618                 goto out;
2619
2620         error = find_rsb(ls, name, len, R_CREATE, &r);
2621         if (error)
2622                 goto out;
2623
2624         lock_rsb(r);
2625
2626         attach_lkb(r, lkb);
2627         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2628
2629         error = _request_lock(r, lkb);
2630
2631         unlock_rsb(r);
2632         put_rsb(r);
2633
2634  out:
2635         return error;
2636 }
2637
2638 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2639                         struct dlm_args *args)
2640 {
2641         struct dlm_rsb *r;
2642         int error;
2643
2644         r = lkb->lkb_resource;
2645
2646         hold_rsb(r);
2647         lock_rsb(r);
2648
2649         error = validate_lock_args(ls, lkb, args);
2650         if (error)
2651                 goto out;
2652
2653         error = _convert_lock(r, lkb);
2654  out:
2655         unlock_rsb(r);
2656         put_rsb(r);
2657         return error;
2658 }
2659
2660 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2661                        struct dlm_args *args)
2662 {
2663         struct dlm_rsb *r;
2664         int error;
2665
2666         r = lkb->lkb_resource;
2667
2668         hold_rsb(r);
2669         lock_rsb(r);
2670
2671         error = validate_unlock_args(lkb, args);
2672         if (error)
2673                 goto out;
2674
2675         error = _unlock_lock(r, lkb);
2676  out:
2677         unlock_rsb(r);
2678         put_rsb(r);
2679         return error;
2680 }
2681
2682 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2683                        struct dlm_args *args)
2684 {
2685         struct dlm_rsb *r;
2686         int error;
2687
2688         r = lkb->lkb_resource;
2689
2690         hold_rsb(r);
2691         lock_rsb(r);
2692
2693         error = validate_unlock_args(lkb, args);
2694         if (error)
2695                 goto out;
2696
2697         error = _cancel_lock(r, lkb);
2698  out:
2699         unlock_rsb(r);
2700         put_rsb(r);
2701         return error;
2702 }
2703
2704 /*
2705  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2706  */
2707
2708 int dlm_lock(dlm_lockspace_t *lockspace,
2709              int mode,
2710              struct dlm_lksb *lksb,
2711              uint32_t flags,
2712              void *name,
2713              unsigned int namelen,
2714              uint32_t parent_lkid,
2715              void (*ast) (void *astarg),
2716              void *astarg,
2717              void (*bast) (void *astarg, int mode))
2718 {
2719         struct dlm_ls *ls;
2720         struct dlm_lkb *lkb;
2721         struct dlm_args args;
2722         int error, convert = flags & DLM_LKF_CONVERT;
2723
2724         ls = dlm_find_lockspace_local(lockspace);
2725         if (!ls)
2726                 return -EINVAL;
2727
2728         dlm_lock_recovery(ls);
2729
2730         if (convert)
2731                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2732         else
2733                 error = create_lkb(ls, &lkb);
2734
2735         if (error)
2736                 goto out;
2737
2738         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2739                               astarg, bast, &args);
2740         if (error)
2741                 goto out_put;
2742
2743         if (convert)
2744                 error = convert_lock(ls, lkb, &args);
2745         else
2746                 error = request_lock(ls, lkb, name, namelen, &args);
2747
2748         if (error == -EINPROGRESS)
2749                 error = 0;
2750  out_put:
2751         if (convert || error)
2752                 __put_lkb(ls, lkb);
2753         if (error == -EAGAIN || error == -EDEADLK)
2754                 error = 0;
2755  out:
2756         dlm_unlock_recovery(ls);
2757         dlm_put_lockspace(ls);
2758         return error;
2759 }
2760
2761 int dlm_unlock(dlm_lockspace_t *lockspace,
2762                uint32_t lkid,
2763                uint32_t flags,
2764                struct dlm_lksb *lksb,
2765                void *astarg)
2766 {
2767         struct dlm_ls *ls;
2768         struct dlm_lkb *lkb;
2769         struct dlm_args args;
2770         int error;
2771
2772         ls = dlm_find_lockspace_local(lockspace);
2773         if (!ls)
2774                 return -EINVAL;
2775
2776         dlm_lock_recovery(ls);
2777
2778         error = find_lkb(ls, lkid, &lkb);
2779         if (error)
2780                 goto out;
2781
2782         error = set_unlock_args(flags, astarg, &args);
2783         if (error)
2784                 goto out_put;
2785
2786         if (flags & DLM_LKF_CANCEL)
2787                 error = cancel_lock(ls, lkb, &args);
2788         else
2789                 error = unlock_lock(ls, lkb, &args);
2790
2791         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2792                 error = 0;
2793         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2794                 error = 0;
2795  out_put:
2796         dlm_put_lkb(lkb);
2797  out:
2798         dlm_unlock_recovery(ls);
2799         dlm_put_lockspace(ls);
2800         return error;
2801 }
2802
2803 /*
2804  * send/receive routines for remote operations and replies
2805  *
2806  * send_args
2807  * send_common
2808  * send_request                 receive_request
2809  * send_convert                 receive_convert
2810  * send_unlock                  receive_unlock
2811  * send_cancel                  receive_cancel
2812  * send_grant                   receive_grant
2813  * send_bast                    receive_bast
2814  * send_lookup                  receive_lookup
2815  * send_remove                  receive_remove
2816  *
2817  *                              send_common_reply
2818  * receive_request_reply        send_request_reply
2819  * receive_convert_reply        send_convert_reply
2820  * receive_unlock_reply         send_unlock_reply
2821  * receive_cancel_reply         send_cancel_reply
2822  * receive_lookup_reply         send_lookup_reply
2823  */
2824
2825 static int _create_message(struct dlm_ls *ls, int mb_len,
2826                            int to_nodeid, int mstype,
2827                            struct dlm_message **ms_ret,
2828                            struct dlm_mhandle **mh_ret)
2829 {
2830         struct dlm_message *ms;
2831         struct dlm_mhandle *mh;
2832         char *mb;
2833
2834         /* get_buffer gives us a message handle (mh) that we need to
2835            pass into lowcomms_commit and a message buffer (mb) that we
2836            write our data into */
2837
2838         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2839         if (!mh)
2840                 return -ENOBUFS;
2841
2842         memset(mb, 0, mb_len);
2843
2844         ms = (struct dlm_message *) mb;
2845
2846         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2847         ms->m_header.h_lockspace = ls->ls_global_id;
2848         ms->m_header.h_nodeid = dlm_our_nodeid();
2849         ms->m_header.h_length = mb_len;
2850         ms->m_header.h_cmd = DLM_MSG;
2851
2852         ms->m_type = mstype;
2853
2854         *mh_ret = mh;
2855         *ms_ret = ms;
2856         return 0;
2857 }
2858
2859 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2860                           int to_nodeid, int mstype,
2861                           struct dlm_message **ms_ret,
2862                           struct dlm_mhandle **mh_ret)
2863 {
2864         int mb_len = sizeof(struct dlm_message);
2865
2866         switch (mstype) {
2867         case DLM_MSG_REQUEST:
2868         case DLM_MSG_LOOKUP:
2869         case DLM_MSG_REMOVE:
2870                 mb_len += r->res_length;
2871                 break;
2872         case DLM_MSG_CONVERT:
2873         case DLM_MSG_UNLOCK:
2874         case DLM_MSG_REQUEST_REPLY:
2875         case DLM_MSG_CONVERT_REPLY:
2876         case DLM_MSG_GRANT:
2877                 if (lkb && lkb->lkb_lvbptr)
2878                         mb_len += r->res_ls->ls_lvblen;
2879                 break;
2880         }
2881
2882         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2883                                ms_ret, mh_ret);
2884 }
2885
2886 /* further lowcomms enhancements or alternate implementations may make
2887    the return value from this function useful at some point */
2888
2889 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2890 {
2891         dlm_message_out(ms);
2892         dlm_lowcomms_commit_buffer(mh);
2893         return 0;
2894 }
2895
2896 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2897                       struct dlm_message *ms)
2898 {
2899         ms->m_nodeid   = lkb->lkb_nodeid;
2900         ms->m_pid      = lkb->lkb_ownpid;
2901         ms->m_lkid     = lkb->lkb_id;
2902         ms->m_remid    = lkb->lkb_remid;
2903         ms->m_exflags  = lkb->lkb_exflags;
2904         ms->m_sbflags  = lkb->lkb_sbflags;
2905         ms->m_flags    = lkb->lkb_flags;
2906         ms->m_lvbseq   = lkb->lkb_lvbseq;
2907         ms->m_status   = lkb->lkb_status;
2908         ms->m_grmode   = lkb->lkb_grmode;
2909         ms->m_rqmode   = lkb->lkb_rqmode;
2910         ms->m_hash     = r->res_hash;
2911
2912         /* m_result and m_bastmode are set from function args,
2913            not from lkb fields */
2914
2915         if (lkb->lkb_bastfn)
2916                 ms->m_asts |= DLM_CB_BAST;
2917         if (lkb->lkb_astfn)
2918                 ms->m_asts |= DLM_CB_CAST;
2919
2920         /* compare with switch in create_message; send_remove() doesn't
2921            use send_args() */
2922
2923         switch (ms->m_type) {
2924         case DLM_MSG_REQUEST:
2925         case DLM_MSG_LOOKUP:
2926                 memcpy(ms->m_extra, r->res_name, r->res_length);
2927                 break;
2928         case DLM_MSG_CONVERT:
2929         case DLM_MSG_UNLOCK:
2930         case DLM_MSG_REQUEST_REPLY:
2931         case DLM_MSG_CONVERT_REPLY:
2932         case DLM_MSG_GRANT:
2933                 if (!lkb->lkb_lvbptr)
2934                         break;
2935                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2936                 break;
2937         }
2938 }
2939
2940 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2941 {
2942         struct dlm_message *ms;
2943         struct dlm_mhandle *mh;
2944         int to_nodeid, error;
2945
2946         to_nodeid = r->res_nodeid;
2947
2948         error = add_to_waiters(lkb, mstype, to_nodeid);
2949         if (error)
2950                 return error;
2951
2952         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2953         if (error)
2954                 goto fail;
2955
2956         send_args(r, lkb, ms);
2957
2958         error = send_message(mh, ms);
2959         if (error)
2960                 goto fail;
2961         return 0;
2962
2963  fail:
2964         remove_from_waiters(lkb, msg_reply_type(mstype));
2965         return error;
2966 }
2967
2968 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2969 {
2970         return send_common(r, lkb, DLM_MSG_REQUEST);
2971 }
2972
2973 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2974 {
2975         int error;
2976
2977         error = send_common(r, lkb, DLM_MSG_CONVERT);
2978
2979         /* down conversions go without a reply from the master */
2980         if (!error && down_conversion(lkb)) {
2981                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2982                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2983                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2984                 r->res_ls->ls_stub_ms.m_result = 0;
2985                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2986         }
2987
2988         return error;
2989 }
2990
2991 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2992    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2993    that the master is still correct. */
2994
2995 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2996 {
2997         return send_common(r, lkb, DLM_MSG_UNLOCK);
2998 }
2999
3000 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3001 {
3002         return send_common(r, lkb, DLM_MSG_CANCEL);
3003 }
3004
3005 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3006 {
3007         struct dlm_message *ms;
3008         struct dlm_mhandle *mh;
3009         int to_nodeid, error;
3010
3011         to_nodeid = lkb->lkb_nodeid;
3012
3013         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3014         if (error)
3015                 goto out;
3016
3017         send_args(r, lkb, ms);
3018
3019         ms->m_result = 0;
3020
3021         error = send_message(mh, ms);
3022  out:
3023         return error;
3024 }
3025
3026 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3027 {
3028         struct dlm_message *ms;
3029         struct dlm_mhandle *mh;
3030         int to_nodeid, error;
3031
3032         to_nodeid = lkb->lkb_nodeid;
3033
3034         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3035         if (error)
3036                 goto out;
3037
3038         send_args(r, lkb, ms);
3039
3040         ms->m_bastmode = mode;
3041
3042         error = send_message(mh, ms);
3043  out:
3044         return error;
3045 }
3046
3047 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048 {
3049         struct dlm_message *ms;
3050         struct dlm_mhandle *mh;
3051         int to_nodeid, error;
3052
3053         to_nodeid = dlm_dir_nodeid(r);
3054
3055         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3056         if (error)
3057                 return error;
3058
3059         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3060         if (error)
3061                 goto fail;
3062
3063         send_args(r, lkb, ms);
3064
3065         error = send_message(mh, ms);
3066         if (error)
3067                 goto fail;
3068         return 0;
3069
3070  fail:
3071         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3072         return error;
3073 }
3074
3075 static int send_remove(struct dlm_rsb *r)
3076 {
3077         struct dlm_message *ms;
3078         struct dlm_mhandle *mh;
3079         int to_nodeid, error;
3080
3081         to_nodeid = dlm_dir_nodeid(r);
3082
3083         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3084         if (error)
3085                 goto out;
3086
3087         memcpy(ms->m_extra, r->res_name, r->res_length);
3088         ms->m_hash = r->res_hash;
3089
3090         error = send_message(mh, ms);
3091  out:
3092         return error;
3093 }
3094
3095 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3096                              int mstype, int rv)
3097 {
3098         struct dlm_message *ms;
3099         struct dlm_mhandle *mh;
3100         int to_nodeid, error;
3101
3102         to_nodeid = lkb->lkb_nodeid;
3103
3104         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3105         if (error)
3106                 goto out;
3107
3108         send_args(r, lkb, ms);
3109
3110         ms->m_result = rv;
3111
3112         error = send_message(mh, ms);
3113  out:
3114         return error;
3115 }
3116
3117 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3118 {
3119         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3120 }
3121
3122 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3123 {
3124         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3125 }
3126
3127 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3128 {
3129         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3130 }
3131
3132 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3133 {
3134         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3135 }
3136
3137 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3138                              int ret_nodeid, int rv)
3139 {
3140         struct dlm_rsb *r = &ls->ls_stub_rsb;
3141         struct dlm_message *ms;
3142         struct dlm_mhandle *mh;
3143         int error, nodeid = ms_in->m_header.h_nodeid;
3144
3145         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3146         if (error)
3147                 goto out;
3148
3149         ms->m_lkid = ms_in->m_lkid;
3150         ms->m_result = rv;
3151         ms->m_nodeid = ret_nodeid;
3152
3153         error = send_message(mh, ms);
3154  out:
3155         return error;
3156 }
3157
3158 /* which args we save from a received message depends heavily on the type
3159    of message, unlike the send side where we can safely send everything about
3160    the lkb for any type of message */
3161
3162 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3163 {
3164         lkb->lkb_exflags = ms->m_exflags;
3165         lkb->lkb_sbflags = ms->m_sbflags;
3166         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3167                          (ms->m_flags & 0x0000FFFF);
3168 }
3169
3170 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3171 {
3172         if (ms->m_flags == DLM_IFL_STUB_MS)
3173                 return;
3174
3175         lkb->lkb_sbflags = ms->m_sbflags;
3176         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3177                          (ms->m_flags & 0x0000FFFF);
3178 }
3179
3180 static int receive_extralen(struct dlm_message *ms)
3181 {
3182         return (ms->m_header.h_length - sizeof(struct dlm_message));
3183 }
3184
3185 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3186                        struct dlm_message *ms)
3187 {
3188         int len;
3189
3190         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3191                 if (!lkb->lkb_lvbptr)
3192                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3193                 if (!lkb->lkb_lvbptr)
3194                         return -ENOMEM;
3195                 len = receive_extralen(ms);
3196                 if (len > DLM_RESNAME_MAXLEN)
3197                         len = DLM_RESNAME_MAXLEN;
3198                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3199         }
3200         return 0;
3201 }
3202
3203 static void fake_bastfn(void *astparam, int mode)
3204 {
3205         log_print("fake_bastfn should not be called");
3206 }
3207
3208 static void fake_astfn(void *astparam)
3209 {
3210         log_print("fake_astfn should not be called");
3211 }
3212
3213 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3214                                 struct dlm_message *ms)
3215 {
3216         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3217         lkb->lkb_ownpid = ms->m_pid;
3218         lkb->lkb_remid = ms->m_lkid;
3219         lkb->lkb_grmode = DLM_LOCK_IV;
3220         lkb->lkb_rqmode = ms->m_rqmode;
3221
3222         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3223         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3224
3225         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3226                 /* lkb was just created so there won't be an lvb yet */
3227                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3228                 if (!lkb->lkb_lvbptr)
3229                         return -ENOMEM;
3230         }
3231
3232         return 0;
3233 }
3234
3235 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3236                                 struct dlm_message *ms)
3237 {
3238         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3239                 return -EBUSY;
3240
3241         if (receive_lvb(ls, lkb, ms))
3242                 return -ENOMEM;
3243
3244         lkb->lkb_rqmode = ms->m_rqmode;
3245         lkb->lkb_lvbseq = ms->m_lvbseq;
3246
3247         return 0;
3248 }
3249
3250 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3251                                struct dlm_message *ms)
3252 {
3253         if (receive_lvb(ls, lkb, ms))
3254                 return -ENOMEM;
3255         return 0;
3256 }
3257
3258 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3259    uses to send a reply and that the remote end uses to process the reply. */
3260
3261 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3262 {
3263         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3264         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3265         lkb->lkb_remid = ms->m_lkid;
3266 }
3267
3268 /* This is called after the rsb is locked so that we can safely inspect
3269    fields in the lkb. */
3270
3271 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3272 {
3273         int from = ms->m_header.h_nodeid;
3274         int error = 0;
3275
3276         switch (ms->m_type) {
3277         case DLM_MSG_CONVERT:
3278         case DLM_MSG_UNLOCK:
3279         case DLM_MSG_CANCEL:
3280                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3281                         error = -EINVAL;
3282                 break;
3283
3284         case DLM_MSG_CONVERT_REPLY:
3285         case DLM_MSG_UNLOCK_REPLY:
3286         case DLM_MSG_CANCEL_REPLY:
3287         case DLM_MSG_GRANT:
3288         case DLM_MSG_BAST:
3289                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3290                         error = -EINVAL;
3291                 break;
3292
3293         case DLM_MSG_REQUEST_REPLY:
3294                 if (!is_process_copy(lkb))
3295                         error = -EINVAL;
3296                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3297                         error = -EINVAL;
3298                 break;
3299
3300         default:
3301                 error = -EINVAL;
3302         }
3303
3304         if (error)
3305                 log_error(lkb->lkb_resource->res_ls,
3306                           "ignore invalid message %d from %d %x %x %x %d",
3307                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3308                           lkb->lkb_flags, lkb->lkb_nodeid);
3309         return error;
3310 }
3311
3312 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3313 {
3314         struct dlm_lkb *lkb;
3315         struct dlm_rsb *r;
3316         int error, namelen;
3317
3318         error = create_lkb(ls, &lkb);
3319         if (error)
3320                 goto fail;
3321
3322         receive_flags(lkb, ms);
3323         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3324         error = receive_request_args(ls, lkb, ms);
3325         if (error) {
3326                 __put_lkb(ls, lkb);
3327                 goto fail;
3328         }
3329
3330         namelen = receive_extralen(ms);
3331
3332         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3333         if (error) {
3334                 __put_lkb(ls, lkb);
3335                 goto fail;
3336         }
3337
3338         lock_rsb(r);
3339
3340         attach_lkb(r, lkb);
3341         error = do_request(r, lkb);
3342         send_request_reply(r, lkb, error);
3343         do_request_effects(r, lkb, error);
3344
3345         unlock_rsb(r);
3346         put_rsb(r);
3347
3348         if (error == -EINPROGRESS)
3349                 error = 0;
3350         if (error)
3351                 dlm_put_lkb(lkb);
3352         return;
3353
3354  fail:
3355         setup_stub_lkb(ls, ms);
3356         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3357 }
3358
3359 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3360 {
3361         struct dlm_lkb *lkb;
3362         struct dlm_rsb *r;
3363         int error, reply = 1;
3364
3365         error = find_lkb(ls, ms->m_remid, &lkb);
3366         if (error)
3367                 goto fail;
3368
3369         r = lkb->lkb_resource;
3370
3371         hold_rsb(r);
3372         lock_rsb(r);
3373
3374         error = validate_message(lkb, ms);
3375         if (error)
3376                 goto out;
3377
3378         receive_flags(lkb, ms);
3379
3380         error = receive_convert_args(ls, lkb, ms);
3381         if (error) {
3382                 send_convert_reply(r, lkb, error);
3383                 goto out;
3384         }
3385
3386         reply = !down_conversion(lkb);
3387
3388         error = do_convert(r, lkb);
3389         if (reply)
3390                 send_convert_reply(r, lkb, error);
3391         do_convert_effects(r, lkb, error);
3392  out:
3393         unlock_rsb(r);
3394         put_rsb(r);
3395         dlm_put_lkb(lkb);
3396         return;
3397
3398  fail:
3399         setup_stub_lkb(ls, ms);
3400         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3401 }
3402
3403 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3404 {
3405         struct dlm_lkb *lkb;
3406         struct dlm_rsb *r;
3407         int error;
3408
3409         error = find_lkb(ls, ms->m_remid, &lkb);
3410         if (error)
3411                 goto fail;
3412
3413         r = lkb->lkb_resource;
3414
3415         hold_rsb(r);
3416         lock_rsb(r);
3417
3418         error = validate_message(lkb, ms);
3419         if (error)
3420                 goto out;
3421
3422         receive_flags(lkb, ms);
3423
3424         error = receive_unlock_args(ls, lkb, ms);
3425         if (error) {
3426                 send_unlock_reply(r, lkb, error);
3427                 goto out;
3428         }
3429
3430         error = do_unlock(r, lkb);
3431         send_unlock_reply(r, lkb, error);
3432         do_unlock_effects(r, lkb, error);
3433  out:
3434         unlock_rsb(r);
3435         put_rsb(r);
3436         dlm_put_lkb(lkb);
3437         return;
3438
3439  fail:
3440         setup_stub_lkb(ls, ms);
3441         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3442 }
3443
3444 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3445 {
3446         struct dlm_lkb *lkb;
3447         struct dlm_rsb *r;
3448         int error;
3449
3450         error = find_lkb(ls, ms->m_remid, &lkb);
3451         if (error)
3452                 goto fail;
3453
3454         receive_flags(lkb, ms);
3455
3456         r = lkb->lkb_resource;
3457
3458         hold_rsb(r);
3459         lock_rsb(r);
3460
3461         error = validate_message(lkb, ms);
3462         if (error)
3463                 goto out;
3464
3465         error = do_cancel(r, lkb);
3466         send_cancel_reply(r, lkb, error);
3467         do_cancel_effects(r, lkb, error);
3468  out:
3469         unlock_rsb(r);
3470         put_rsb(r);
3471         dlm_put_lkb(lkb);
3472         return;
3473
3474  fail:
3475         setup_stub_lkb(ls, ms);
3476         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3477 }
3478
3479 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3480 {
3481         struct dlm_lkb *lkb;
3482         struct dlm_rsb *r;
3483         int error;
3484
3485         error = find_lkb(ls, ms->m_remid, &lkb);
3486         if (error) {
3487                 log_debug(ls, "receive_grant from %d no lkb %x",
3488                           ms->m_header.h_nodeid, ms->m_remid);
3489                 return;
3490         }
3491
3492         r = lkb->lkb_resource;
3493
3494         hold_rsb(r);
3495         lock_rsb(r);
3496
3497         error = validate_message(lkb, ms);
3498         if (error)
3499                 goto out;
3500
3501         receive_flags_reply(lkb, ms);
3502         if (is_altmode(lkb))
3503                 munge_altmode(lkb, ms);
3504         grant_lock_pc(r, lkb, ms);
3505         queue_cast(r, lkb, 0);
3506  out:
3507         unlock_rsb(r);
3508         put_rsb(r);
3509         dlm_put_lkb(lkb);
3510 }
3511
3512 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3513 {
3514         struct dlm_lkb *lkb;
3515         struct dlm_rsb *r;
3516         int error;
3517
3518         error = find_lkb(ls, ms->m_remid, &lkb);
3519         if (error) {
3520                 log_debug(ls, "receive_bast from %d no lkb %x",
3521                           ms->m_header.h_nodeid, ms->m_remid);
3522                 return;
3523         }
3524
3525         r = lkb->lkb_resource;
3526
3527         hold_rsb(r);
3528         lock_rsb(r);
3529
3530         error = validate_message(lkb, ms);
3531         if (error)
3532                 goto out;
3533
3534         queue_bast(r, lkb, ms->m_bastmode);
3535  out:
3536         unlock_rsb(r);
3537         put_rsb(r);
3538         dlm_put_lkb(lkb);
3539 }
3540
3541 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3542 {
3543         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3544
3545         from_nodeid = ms->m_header.h_nodeid;
3546         our_nodeid = dlm_our_nodeid();
3547
3548         len = receive_extralen(ms);
3549
3550         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3551         if (dir_nodeid != our_nodeid) {
3552                 log_error(ls, "lookup dir_nodeid %d from %d",
3553                           dir_nodeid, from_nodeid);
3554                 error = -EINVAL;
3555                 ret_nodeid = -1;
3556                 goto out;
3557         }
3558
3559         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3560
3561         /* Optimization: we're master so treat lookup as a request */
3562         if (!error && ret_nodeid == our_nodeid) {
3563                 receive_request(ls, ms);
3564                 return;
3565         }
3566  out:
3567         send_lookup_reply(ls, ms, ret_nodeid, error);
3568 }
3569
3570 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3571 {
3572         int len, dir_nodeid, from_nodeid;
3573
3574         from_nodeid = ms->m_header.h_nodeid;
3575
3576         len = receive_extralen(ms);
3577
3578         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3579         if (dir_nodeid != dlm_our_nodeid()) {
3580                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3581                           dir_nodeid, from_nodeid);
3582                 return;
3583         }
3584
3585         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3586 }
3587
3588 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3589 {
3590         do_purge(ls, ms->m_nodeid, ms->m_pid);
3591 }
3592
3593 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3594 {
3595         struct dlm_lkb *lkb;
3596         struct dlm_rsb *r;
3597         int error, mstype, result;
3598
3599         error = find_lkb(ls, ms->m_remid, &lkb);
3600         if (error) {
3601                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3602                           ms->m_header.h_nodeid, ms->m_remid);
3603                 return;
3604         }
3605
3606         r = lkb->lkb_resource;
3607         hold_rsb(r);
3608         lock_rsb(r);
3609
3610         error = validate_message(lkb, ms);
3611         if (error)
3612                 goto out;
3613
3614         mstype = lkb->lkb_wait_type;
3615         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3616         if (error)
3617                 goto out;
3618
3619         /* Optimization: the dir node was also the master, so it took our
3620            lookup as a request and sent request reply instead of lookup reply */
3621         if (mstype == DLM_MSG_LOOKUP) {
3622                 r->res_nodeid = ms->m_header.h_nodeid;
3623                 lkb->lkb_nodeid = r->res_nodeid;
3624         }
3625
3626         /* this is the value returned from do_request() on the master */
3627         result = ms->m_result;
3628
3629         switch (result) {
3630         case -EAGAIN:
3631                 /* request would block (be queued) on remote master */
3632                 queue_cast(r, lkb, -EAGAIN);
3633                 confirm_master(r, -EAGAIN);
3634                 unhold_lkb(lkb); /* undoes create_lkb() */
3635                 break;
3636
3637         case -EINPROGRESS:
3638         case 0:
3639                 /* request was queued or granted on remote master */
3640                 receive_flags_reply(lkb, ms);
3641                 lkb->lkb_remid = ms->m_lkid;
3642                 if (is_altmode(lkb))
3643                         munge_altmode(lkb, ms);
3644                 if (result) {
3645                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3646                         add_timeout(lkb);
3647                 } else {
3648                         grant_lock_pc(r, lkb, ms);
3649                         queue_cast(r, lkb, 0);
3650                 }
3651                 confirm_master(r, result);
3652                 break;
3653
3654         case -EBADR:
3655         case -ENOTBLK:
3656                 /* find_rsb failed to find rsb or rsb wasn't master */
3657                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3658                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3659                 r->res_nodeid = -1;
3660                 lkb->lkb_nodeid = -1;
3661
3662                 if (is_overlap(lkb)) {
3663                         /* we'll ignore error in cancel/unlock reply */
3664                         queue_cast_overlap(r, lkb);
3665                         confirm_master(r, result);
3666                         unhold_lkb(lkb); /* undoes create_lkb() */
3667                 } else
3668                         _request_lock(r, lkb);
3669                 break;
3670
3671         default:
3672                 log_error(ls, "receive_request_reply %x error %d",
3673                           lkb->lkb_id, result);
3674         }
3675
3676         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3677                 log_debug(ls, "receive_request_reply %x result %d unlock",
3678                           lkb->lkb_id, result);
3679                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3680                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3681                 send_unlock(r, lkb);
3682         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3683                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3684                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3685                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3686                 send_cancel(r, lkb);
3687         } else {
3688                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3689                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3690         }
3691  out:
3692         unlock_rsb(r);
3693         put_rsb(r);
3694         dlm_put_lkb(lkb);
3695 }
3696
3697 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3698                                     struct dlm_message *ms)
3699 {
3700         /* this is the value returned from do_convert() on the master */
3701         switch (ms->m_result) {
3702         case -EAGAIN:
3703                 /* convert would block (be queued) on remote master */
3704                 queue_cast(r, lkb, -EAGAIN);
3705                 break;
3706
3707         case -EDEADLK:
3708                 receive_flags_reply(lkb, ms);
3709                 revert_lock_pc(r, lkb);
3710                 queue_cast(r, lkb, -EDEADLK);
3711                 break;
3712
3713         case -EINPROGRESS:
3714                 /* convert was queued on remote master */
3715                 receive_flags_reply(lkb, ms);
3716                 if (is_demoted(lkb))
3717                         munge_demoted(lkb);
3718                 del_lkb(r, lkb);
3719                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3720                 add_timeout(lkb);
3721                 break;
3722
3723         case 0:
3724                 /* convert was granted on remote master */
3725                 receive_flags_reply(lkb, ms);
3726                 if (is_demoted(lkb))
3727                         munge_demoted(lkb);
3728                 grant_lock_pc(r, lkb, ms);
3729                 queue_cast(r, lkb, 0);
3730                 break;
3731
3732         default:
3733                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3734                           lkb->lkb_id, ms->m_result);
3735         }
3736 }
3737
3738 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3739 {
3740         struct dlm_rsb *r = lkb->lkb_resource;
3741         int error;
3742
3743         hold_rsb(r);
3744         lock_rsb(r);
3745
3746         error = validate_message(lkb, ms);
3747         if (error)
3748                 goto out;
3749
3750         /* stub reply can happen with waiters_mutex held */
3751         error = remove_from_waiters_ms(lkb, ms);
3752         if (error)
3753                 goto out;
3754
3755         __receive_convert_reply(r, lkb, ms);
3756  out:
3757         unlock_rsb(r);
3758         put_rsb(r);
3759 }
3760
3761 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3762 {
3763         struct dlm_lkb *lkb;
3764         int error;
3765
3766         error = find_lkb(ls, ms->m_remid, &lkb);
3767         if (error) {
3768                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3769                           ms->m_header.h_nodeid, ms->m_remid);
3770                 return;
3771         }
3772
3773         _receive_convert_reply(lkb, ms);
3774         dlm_put_lkb(lkb);
3775 }
3776
3777 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3778 {
3779         struct dlm_rsb *r = lkb->lkb_resource;
3780         int error;
3781
3782         hold_rsb(r);
3783         lock_rsb(r);
3784
3785         error = validate_message(lkb, ms);
3786         if (error)
3787                 goto out;
3788
3789         /* stub reply can happen with waiters_mutex held */
3790         error = remove_from_waiters_ms(lkb, ms);
3791         if (error)
3792                 goto out;
3793
3794         /* this is the value returned from do_unlock() on the master */
3795
3796         switch (ms->m_result) {
3797         case -DLM_EUNLOCK:
3798                 receive_flags_reply(lkb, ms);
3799                 remove_lock_pc(r, lkb);
3800                 queue_cast(r, lkb, -DLM_EUNLOCK);
3801                 break;
3802         case -ENOENT:
3803                 break;
3804         default:
3805                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3806                           lkb->lkb_id, ms->m_result);
3807         }
3808  out:
3809         unlock_rsb(r);
3810         put_rsb(r);
3811 }
3812
3813 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3814 {
3815         struct dlm_lkb *lkb;
3816         int error;
3817
3818         error = find_lkb(ls, ms->m_remid, &lkb);
3819         if (error) {
3820                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3821                           ms->m_header.h_nodeid, ms->m_remid);
3822                 return;
3823         }
3824
3825         _receive_unlock_reply(lkb, ms);
3826         dlm_put_lkb(lkb);
3827 }
3828
3829 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3830 {
3831         struct dlm_rsb *r = lkb->lkb_resource;
3832         int error;
3833
3834         hold_rsb(r);
3835         lock_rsb(r);
3836
3837         error = validate_message(lkb, ms);
3838         if (error)
3839                 goto out;
3840
3841         /* stub reply can happen with waiters_mutex held */
3842         error = remove_from_waiters_ms(lkb, ms);
3843         if (error)
3844                 goto out;
3845
3846         /* this is the value returned from do_cancel() on the master */
3847
3848         switch (ms->m_result) {
3849         case -DLM_ECANCEL:
3850                 receive_flags_reply(lkb, ms);
3851                 revert_lock_pc(r, lkb);
3852                 queue_cast(r, lkb, -DLM_ECANCEL);
3853                 break;
3854         case 0:
3855                 break;
3856         default:
3857                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3858                           lkb->lkb_id, ms->m_result);
3859         }
3860  out:
3861         unlock_rsb(r);
3862         put_rsb(r);
3863 }
3864
3865 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3866 {
3867         struct dlm_lkb *lkb;
3868         int error;
3869
3870         error = find_lkb(ls, ms->m_remid, &lkb);
3871         if (error) {
3872                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3873                           ms->m_header.h_nodeid, ms->m_remid);
3874                 return;
3875         }
3876
3877         _receive_cancel_reply(lkb, ms);
3878         dlm_put_lkb(lkb);
3879 }
3880
3881 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3882 {
3883         struct dlm_lkb *lkb;
3884         struct dlm_rsb *r;
3885         int error, ret_nodeid;
3886
3887         error = find_lkb(ls, ms->m_lkid, &lkb);
3888         if (error) {
3889                 log_error(ls, "receive_lookup_reply no lkb");
3890                 return;
3891         }
3892
3893         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3894            FIXME: will a non-zero error ever be returned? */
3895
3896         r = lkb->lkb_resource;
3897         hold_rsb(r);
3898         lock_rsb(r);
3899
3900         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3901         if (error)
3902                 goto out;
3903
3904         ret_nodeid = ms->m_nodeid;
3905         if (ret_nodeid == dlm_our_nodeid()) {
3906                 r->res_nodeid = 0;
3907                 ret_nodeid = 0;
3908                 r->res_first_lkid = 0;
3909         } else {
3910                 /* set_master() will copy res_nodeid to lkb_nodeid */
3911                 r->res_nodeid = ret_nodeid;
3912         }
3913
3914         if (is_overlap(lkb)) {
3915                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3916                           lkb->lkb_id, lkb->lkb_flags);
3917                 queue_cast_overlap(r, lkb);
3918                 unhold_lkb(lkb); /* undoes create_lkb() */
3919                 goto out_list;
3920         }
3921
3922         _request_lock(r, lkb);
3923
3924  out_list:
3925         if (!ret_nodeid)
3926                 process_lookup_list(r);
3927  out:
3928         unlock_rsb(r);
3929         put_rsb(r);
3930         dlm_put_lkb(lkb);
3931 }
3932
3933 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3934 {
3935         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3936                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3937                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3938                           ms->m_remid, ms->m_result);
3939                 return;
3940         }
3941
3942         switch (ms->m_type) {
3943
3944         /* messages sent to a master node */
3945
3946         case DLM_MSG_REQUEST:
3947                 receive_request(ls, ms);
3948                 break;
3949
3950         case DLM_MSG_CONVERT:
3951                 receive_convert(ls, ms);
3952                 break;
3953
3954         case DLM_MSG_UNLOCK:
3955                 receive_unlock(ls, ms);
3956                 break;
3957
3958         case DLM_MSG_CANCEL:
3959                 receive_cancel(ls, ms);
3960                 break;
3961
3962         /* messages sent from a master node (replies to above) */
3963
3964         case DLM_MSG_REQUEST_REPLY:
3965                 receive_request_reply(ls, ms);
3966                 break;
3967
3968         case DLM_MSG_CONVERT_REPLY:
3969                 receive_convert_reply(ls, ms);
3970                 break;
3971
3972         case DLM_MSG_UNLOCK_REPLY:
3973                 receive_unlock_reply(ls, ms);
3974                 break;
3975
3976         case DLM_MSG_CANCEL_REPLY:
3977                 receive_cancel_reply(ls, ms);
3978                 break;
3979
3980         /* messages sent from a master node (only two types of async msg) */
3981
3982         case DLM_MSG_GRANT:
3983                 receive_grant(ls, ms);
3984                 break;
3985
3986         case DLM_MSG_BAST:
3987                 receive_bast(ls, ms);
3988                 break;
3989
3990         /* messages sent to a dir node */
3991
3992         case DLM_MSG_LOOKUP:
3993                 receive_lookup(ls, ms);
3994                 break;
3995
3996         case DLM_MSG_REMOVE:
3997                 receive_remove(ls, ms);
3998                 break;
3999
4000         /* messages sent from a dir node (remove has no reply) */
4001
4002         case DLM_MSG_LOOKUP_REPLY:
4003                 receive_lookup_reply(ls, ms);
4004                 break;
4005
4006         /* other messages */
4007
4008         case DLM_MSG_PURGE:
4009                 receive_purge(ls, ms);
4010                 break;
4011
4012         default:
4013                 log_error(ls, "unknown message type %d", ms->m_type);
4014         }
4015 }
4016
4017 /* If the lockspace is in recovery mode (locking stopped), then normal
4018    messages are saved on the requestqueue for processing after recovery is
4019    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4020    messages off the requestqueue before we process new ones. This occurs right
4021    after recovery completes when we transition from saving all messages on
4022    requestqueue, to processing all the saved messages, to processing new
4023    messages as they arrive. */
4024
4025 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4026                                 int nodeid)
4027 {
4028         if (dlm_locking_stopped(ls)) {
4029                 dlm_add_requestqueue(ls, nodeid, ms);
4030         } else {
4031                 dlm_wait_requestqueue(ls);
4032                 _receive_message(ls, ms);
4033         }
4034 }
4035
4036 /* This is called by dlm_recoverd to process messages that were saved on
4037    the requestqueue. */
4038
4039 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4040 {
4041         _receive_message(ls, ms);
4042 }
4043
4044 /* This is called by the midcomms layer when something is received for
4045    the lockspace.  It could be either a MSG (normal message sent as part of
4046    standard locking activity) or an RCOM (recovery message sent as part of
4047    lockspace recovery). */
4048
4049 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4050 {
4051         struct dlm_header *hd = &p->header;
4052         struct dlm_ls *ls;
4053         int type = 0;
4054
4055         switch (hd->h_cmd) {
4056         case DLM_MSG:
4057                 dlm_message_in(&p->message);
4058                 type = p->message.m_type;
4059                 break;
4060         case DLM_RCOM:
4061                 dlm_rcom_in(&p->rcom);
4062                 type = p->rcom.rc_type;
4063                 break;
4064         default:
4065                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4066                 return;
4067         }
4068
4069         if (hd->h_nodeid != nodeid) {
4070                 log_print("invalid h_nodeid %d from %d lockspace %x",
4071                           hd->h_nodeid, nodeid, hd->h_lockspace);
4072                 return;
4073         }
4074
4075         ls = dlm_find_lockspace_global(hd->h_lockspace);
4076         if (!ls) {
4077                 if (dlm_config.ci_log_debug)
4078                         log_print("invalid lockspace %x from %d cmd %d type %d",
4079                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
4080
4081                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4082                         dlm_send_ls_not_ready(nodeid, &p->rcom);
4083                 return;
4084         }
4085
4086         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4087            be inactive (in this ls) before transitioning to recovery mode */
4088
4089         down_read(&ls->ls_recv_active);
4090         if (hd->h_cmd == DLM_MSG)
4091                 dlm_receive_message(ls, &p->message, nodeid);
4092         else
4093                 dlm_receive_rcom(ls, &p->rcom, nodeid);
4094         up_read(&ls->ls_recv_active);
4095
4096         dlm_put_lockspace(ls);
4097 }
4098
4099 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4100                                    struct dlm_message *ms_stub)
4101 {
4102         if (middle_conversion(lkb)) {
4103                 hold_lkb(lkb);
4104                 memset(ms_stub, 0, sizeof(struct dlm_message));
4105                 ms_stub->m_flags = DLM_IFL_STUB_MS;
4106                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4107                 ms_stub->m_result = -EINPROGRESS;
4108                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4109                 _receive_convert_reply(lkb, ms_stub);
4110
4111                 /* Same special case as in receive_rcom_lock_args() */
4112                 lkb->lkb_grmode = DLM_LOCK_IV;
4113                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4114                 unhold_lkb(lkb);
4115
4116         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4117                 lkb->lkb_flags |= DLM_IFL_RESEND;
4118         }
4119
4120         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4121            conversions are async; there's no reply from the remote master */
4122 }
4123
4124 /* A waiting lkb needs recovery if the master node has failed, or
4125    the master node is changing (only when no directory is used) */
4126
4127 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4128 {
4129         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4130                 return 1;
4131
4132         if (!dlm_no_directory(ls))
4133                 return 0;
4134
4135         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4136                 return 1;
4137
4138         return 0;
4139 }
4140
4141 /* Recovery for locks that are waiting for replies from nodes that are now
4142    gone.  We can just complete unlocks and cancels by faking a reply from the
4143    dead node.  Requests and up-conversions we flag to be resent after
4144    recovery.  Down-conversions can just be completed with a fake reply like
4145    unlocks.  Conversions between PR and CW need special attention. */
4146
4147 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4148 {
4149         struct dlm_lkb *lkb, *safe;
4150         struct dlm_message *ms_stub;
4151         int wait_type, stub_unlock_result, stub_cancel_result;
4152
4153         ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4154         if (!ms_stub) {
4155                 log_error(ls, "dlm_recover_waiters_pre no mem");
4156                 return;
4157         }
4158
4159         mutex_lock(&ls->ls_waiters_mutex);
4160
4161         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4162
4163                 /* exclude debug messages about unlocks because there can be so
4164                    many and they aren't very interesting */
4165
4166                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4167                         log_debug(ls, "recover_waiter %x nodeid %d "
4168                                   "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4169                                   lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4170                 }
4171
4172                 /* all outstanding lookups, regardless of destination  will be
4173                    resent after recovery is done */
4174
4175                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4176                         lkb->lkb_flags |= DLM_IFL_RESEND;
4177                         continue;
4178                 }
4179
4180                 if (!waiter_needs_recovery(ls, lkb))
4181                         continue;
4182
4183                 wait_type = lkb->lkb_wait_type;
4184                 stub_unlock_result = -DLM_EUNLOCK;
4185                 stub_cancel_result = -DLM_ECANCEL;
4186
4187                 /* Main reply may have been received leaving a zero wait_type,
4188                    but a reply for the overlapping op may not have been
4189                    received.  In that case we need to fake the appropriate
4190                    reply for the overlap op. */
4191
4192                 if (!wait_type) {
4193                         if (is_overlap_cancel(lkb)) {
4194                                 wait_type = DLM_MSG_CANCEL;
4195                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4196                                         stub_cancel_result = 0;
4197                         }
4198                         if (is_overlap_unlock(lkb)) {
4199                                 wait_type = DLM_MSG_UNLOCK;
4200                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4201                                         stub_unlock_result = -ENOENT;
4202                         }
4203
4204                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4205                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4206                                   stub_cancel_result, stub_unlock_result);
4207                 }
4208
4209                 switch (wait_type) {
4210
4211                 case DLM_MSG_REQUEST:
4212                         lkb->lkb_flags |= DLM_IFL_RESEND;
4213                         break;
4214
4215                 case DLM_MSG_CONVERT:
4216                         recover_convert_waiter(ls, lkb, ms_stub);
4217                         break;
4218
4219                 case DLM_MSG_UNLOCK:
4220                         hold_lkb(lkb);
4221                         memset(ms_stub, 0, sizeof(struct dlm_message));
4222                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4223                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4224                         ms_stub->m_result = stub_unlock_result;
4225                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4226                         _receive_unlock_reply(lkb, ms_stub);
4227                         dlm_put_lkb(lkb);
4228                         break;
4229
4230                 case DLM_MSG_CANCEL:
4231                         hold_lkb(lkb);
4232                         memset(ms_stub, 0, sizeof(struct dlm_message));
4233                         ms_stub->m_flags = DLM_IFL_STUB_MS;
4234                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4235                         ms_stub->m_result = stub_cancel_result;
4236                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4237                         _receive_cancel_reply(lkb, ms_stub);
4238                         dlm_put_lkb(lkb);
4239                         break;
4240
4241                 default:
4242                         log_error(ls, "invalid lkb wait_type %d %d",
4243                                   lkb->lkb_wait_type, wait_type);
4244                 }
4245                 schedule();
4246         }
4247         mutex_unlock(&ls->ls_waiters_mutex);
4248         kfree(ms_stub);
4249 }
4250
4251 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4252 {
4253         struct dlm_lkb *lkb;
4254         int found = 0;
4255
4256         mutex_lock(&ls->ls_waiters_mutex);
4257         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4258                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4259                         hold_lkb(lkb);
4260                         found = 1;
4261                         break;
4262                 }
4263         }
4264         mutex_unlock(&ls->ls_waiters_mutex);
4265
4266         if (!found)
4267                 lkb = NULL;
4268         return lkb;
4269 }
4270
4271 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4272    master or dir-node for r.  Processing the lkb may result in it being placed
4273    back on waiters. */
4274
4275 /* We do this after normal locking has been enabled and any saved messages
4276    (in requestqueue) have been processed.  We should be confident that at
4277    this point we won't get or process a reply to any of these waiting
4278    operations.  But, new ops may be coming in on the rsbs/locks here from
4279    userspace or remotely. */
4280
4281 /* there may have been an overlap unlock/cancel prior to recovery or after
4282    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4283    overlap flag would just have been set and nothing new sent.  we can be
4284    confident here than any replies to either the initial op or overlap ops
4285    prior to recovery have been received. */
4286
4287 int dlm_recover_waiters_post(struct dlm_ls *ls)
4288 {
4289         struct dlm_lkb *lkb;
4290         struct dlm_rsb *r;
4291         int error = 0, mstype, err, oc, ou;
4292
4293         while (1) {
4294                 if (dlm_locking_stopped(ls)) {
4295                         log_debug(ls, "recover_waiters_post aborted");
4296                         error = -EINTR;
4297                         break;
4298                 }
4299
4300                 lkb = find_resend_waiter(ls);
4301                 if (!lkb)
4302                         break;
4303
4304                 r = lkb->lkb_resource;
4305                 hold_rsb(r);
4306                 lock_rsb(r);
4307
4308                 mstype = lkb->lkb_wait_type;
4309                 oc = is_overlap_cancel(lkb);
4310                 ou = is_overlap_unlock(lkb);
4311                 err = 0;
4312
4313                 log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4314                           lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4315
4316                 /* At this point we assume that we won't get a reply to any
4317                    previous op or overlap op on this lock.  First, do a big
4318                    remove_from_waiters() for all previous ops. */
4319
4320                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4321                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4322                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4323                 lkb->lkb_wait_type = 0;
4324                 lkb->lkb_wait_count = 0;
4325                 mutex_lock(&ls->ls_waiters_mutex);
4326                 list_del_init(&lkb->lkb_wait_reply);
4327                 mutex_unlock(&ls->ls_waiters_mutex);
4328                 unhold_lkb(lkb); /* for waiters list */
4329
4330                 if (oc || ou) {
4331                         /* do an unlock or cancel instead of resending */
4332                         switch (mstype) {
4333                         case DLM_MSG_LOOKUP:
4334                         case DLM_MSG_REQUEST:
4335                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4336                                                         -DLM_ECANCEL);
4337                                 unhold_lkb(lkb); /* undoes create_lkb() */
4338                                 break;
4339                         case DLM_MSG_CONVERT:
4340                                 if (oc) {
4341                                         queue_cast(r, lkb, -DLM_ECANCEL);
4342                                 } else {
4343                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4344                                         _unlock_lock(r, lkb);
4345                                 }
4346                                 break;
4347                         default:
4348                                 err = 1;
4349                         }
4350                 } else {
4351                         switch (mstype) {
4352                         case DLM_MSG_LOOKUP:
4353                         case DLM_MSG_REQUEST:
4354                                 _request_lock(r, lkb);
4355                                 if (is_master(r))
4356                                         confirm_master(r, 0);
4357                                 break;
4358                         case DLM_MSG_CONVERT:
4359                                 _convert_lock(r, lkb);
4360                                 break;
4361                         default:
4362                                 err = 1;
4363                         }
4364                 }
4365
4366                 if (err)
4367                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4368                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4369                 unlock_rsb(r);
4370                 put_rsb(r);
4371                 dlm_put_lkb(lkb);
4372         }
4373
4374         return error;
4375 }
4376
4377 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4378                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4379 {
4380         struct dlm_ls *ls = r->res_ls;
4381         struct dlm_lkb *lkb, *safe;
4382
4383         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4384                 if (test(ls, lkb)) {
4385                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4386                         del_lkb(r, lkb);
4387                         /* this put should free the lkb */
4388                         if (!dlm_put_lkb(lkb))
4389                                 log_error(ls, "purged lkb not released");
4390                 }
4391         }
4392 }
4393
4394 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4395 {
4396         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4397 }
4398
4399 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4400 {
4401         return is_master_copy(lkb);
4402 }
4403
4404 static void purge_dead_locks(struct dlm_rsb *r)
4405 {
4406         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4407         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4408         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4409 }
4410
4411 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4412 {
4413         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4414         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4415         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4416 }
4417
4418 /* Get rid of locks held by nodes that are gone. */
4419
4420 int dlm_purge_locks(struct dlm_ls *ls)
4421 {
4422         struct dlm_rsb *r;
4423
4424         log_debug(ls, "dlm_purge_locks");
4425
4426         down_write(&ls->ls_root_sem);
4427         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4428                 hold_rsb(r);
4429                 lock_rsb(r);
4430                 if (is_master(r))
4431                         purge_dead_locks(r);
4432                 unlock_rsb(r);
4433                 unhold_rsb(r);
4434
4435                 schedule();
4436         }
4437         up_write(&ls->ls_root_sem);
4438
4439         return 0;
4440 }
4441
4442 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4443 {
4444         struct dlm_rsb *r, *r_ret = NULL;
4445
4446         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4447         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4448                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4449                         continue;
4450                 hold_rsb(r);
4451                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4452                 r_ret = r;
4453                 break;
4454         }
4455         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4456         return r_ret;
4457 }
4458
4459 void dlm_grant_after_purge(struct dlm_ls *ls)
4460 {
4461         struct dlm_rsb *r;
4462         int bucket = 0;
4463
4464         while (1) {
4465                 r = find_purged_rsb(ls, bucket);
4466                 if (!r) {
4467                         if (bucket == ls->ls_rsbtbl_size - 1)
4468                                 break;
4469                         bucket++;
4470                         continue;
4471                 }
4472                 lock_rsb(r);
4473                 if (is_master(r)) {
4474                         grant_pending_locks(r);
4475                         confirm_master(r, 0);
4476                 }
4477                 unlock_rsb(r);
4478                 put_rsb(r);
4479                 schedule();
4480         }
4481 }
4482
4483 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4484                                          uint32_t remid)
4485 {
4486         struct dlm_lkb *lkb;
4487
4488         list_for_each_entry(lkb, head, lkb_statequeue) {
4489                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4490                         return lkb;
4491         }
4492         return NULL;
4493 }
4494
4495 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4496                                     uint32_t remid)
4497 {
4498         struct dlm_lkb *lkb;
4499
4500         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4501         if (lkb)
4502                 return lkb;
4503         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4504         if (lkb)
4505                 return lkb;
4506         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4507         if (lkb)
4508                 return lkb;
4509         return NULL;
4510 }
4511
4512 /* needs at least dlm_rcom + rcom_lock */
4513 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4514                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4515 {
4516         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4517
4518         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4519         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4520         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4521         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4522         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4523         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4524         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4525         lkb->lkb_rqmode = rl->rl_rqmode;
4526         lkb->lkb_grmode = rl->rl_grmode;
4527         /* don't set lkb_status because add_lkb wants to itself */
4528
4529         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4530         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4531
4532         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4533                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4534                          sizeof(struct rcom_lock);
4535                 if (lvblen > ls->ls_lvblen)
4536                         return -EINVAL;
4537                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4538                 if (!lkb->lkb_lvbptr)
4539                         return -ENOMEM;
4540                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4541         }
4542
4543         /* Conversions between PR and CW (middle modes) need special handling.
4544            The real granted mode of these converting locks cannot be determined
4545            until all locks have been rebuilt on the rsb (recover_conversion) */
4546
4547         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4548             middle_conversion(lkb)) {
4549                 rl->rl_status = DLM_LKSTS_CONVERT;
4550                 lkb->lkb_grmode = DLM_LOCK_IV;
4551                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4552         }
4553
4554         return 0;
4555 }
4556
4557 /* This lkb may have been recovered in a previous aborted recovery so we need
4558    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4559    If so we just send back a standard reply.  If not, we create a new lkb with
4560    the given values and send back our lkid.  We send back our lkid by sending
4561    back the rcom_lock struct we got but with the remid field filled in. */
4562
4563 /* needs at least dlm_rcom + rcom_lock */
4564 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4565 {
4566         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4567         struct dlm_rsb *r;
4568         struct dlm_lkb *lkb;
4569         int error;
4570
4571         if (rl->rl_parent_lkid) {
4572                 error = -EOPNOTSUPP;
4573                 goto out;
4574         }
4575
4576         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4577                          R_MASTER, &r);
4578         if (error)
4579                 goto out;
4580
4581         lock_rsb(r);
4582
4583         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4584         if (lkb) {
4585                 error = -EEXIST;
4586                 goto out_remid;
4587         }
4588
4589         error = create_lkb(ls, &lkb);
4590         if (error)
4591                 goto out_unlock;
4592
4593         error = receive_rcom_lock_args(ls, lkb, r, rc);
4594         if (error) {
4595                 __put_lkb(ls, lkb);
4596                 goto out_unlock;
4597         }
4598
4599         attach_lkb(r, lkb);
4600         add_lkb(r, lkb, rl->rl_status);
4601         error = 0;
4602
4603  out_remid:
4604         /* this is the new value returned to the lock holder for
4605            saving in its process-copy lkb */
4606         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4607
4608  out_unlock:
4609         unlock_rsb(r);
4610         put_rsb(r);
4611  out:
4612         if (error)
4613                 log_debug(ls, "recover_master_copy %d %x", error,
4614                           le32_to_cpu(rl->rl_lkid));
4615         rl->rl_result = cpu_to_le32(error);
4616         return error;
4617 }
4618
4619 /* needs at least dlm_rcom + rcom_lock */
4620 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4621 {
4622         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4623         struct dlm_rsb *r;
4624         struct dlm_lkb *lkb;
4625         int error;
4626
4627         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4628         if (error) {
4629                 log_error(ls, "recover_process_copy no lkid %x",
4630                                 le32_to_cpu(rl->rl_lkid));
4631                 return error;
4632         }
4633
4634         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4635
4636         error = le32_to_cpu(rl->rl_result);
4637
4638         r = lkb->lkb_resource;
4639         hold_rsb(r);
4640         lock_rsb(r);
4641
4642         switch (error) {
4643         case -EBADR:
4644                 /* There's a chance the new master received our lock before
4645                    dlm_recover_master_reply(), this wouldn't happen if we did
4646                    a barrier between recover_masters and recover_locks. */
4647                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4648                           (unsigned long)r, r->res_name);
4649                 dlm_send_rcom_lock(r, lkb);
4650                 goto out;
4651         case -EEXIST:
4652                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4653                 /* fall through */
4654         case 0:
4655                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4656                 break;
4657         default:
4658                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4659                           error, lkb->lkb_id);
4660         }
4661
4662         /* an ack for dlm_recover_locks() which waits for replies from
4663            all the locks it sends to new masters */
4664         dlm_recovered_lock(r);
4665  out:
4666         unlock_rsb(r);
4667         put_rsb(r);
4668         dlm_put_lkb(lkb);
4669
4670         return 0;
4671 }
4672
4673 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4674                      int mode, uint32_t flags, void *name, unsigned int namelen,
4675                      unsigned long timeout_cs)
4676 {
4677         struct dlm_lkb *lkb;
4678         struct dlm_args args;
4679         int error;
4680
4681         dlm_lock_recovery(ls);
4682
4683         error = create_lkb(ls, &lkb);
4684         if (error) {
4685                 kfree(ua);
4686                 goto out;
4687         }
4688
4689         if (flags & DLM_LKF_VALBLK) {
4690                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4691                 if (!ua->lksb.sb_lvbptr) {
4692                         kfree(ua);
4693                         __put_lkb(ls, lkb);
4694                         error = -ENOMEM;
4695                         goto out;
4696                 }
4697         }
4698
4699         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4700            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4701            lock and that lkb_astparam is the dlm_user_args structure. */
4702
4703         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4704                               fake_astfn, ua, fake_bastfn, &args);
4705         lkb->lkb_flags |= DLM_IFL_USER;
4706
4707         if (error) {
4708                 __put_lkb(ls, lkb);
4709                 goto out;
4710         }
4711
4712         error = request_lock(ls, lkb, name, namelen, &args);
4713
4714         switch (error) {
4715         case 0:
4716                 break;
4717         case -EINPROGRESS:
4718                 error = 0;
4719                 break;
4720         case -EAGAIN:
4721                 error = 0;
4722                 /* fall through */
4723         default:
4724                 __put_lkb(ls, lkb);
4725                 goto out;
4726         }
4727
4728         /* add this new lkb to the per-process list of locks */
4729         spin_lock(&ua->proc->locks_spin);
4730         hold_lkb(lkb);
4731         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4732         spin_unlock(&ua->proc->locks_spin);
4733  out:
4734         dlm_unlock_recovery(ls);
4735         return error;
4736 }
4737
4738 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4739                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4740                      unsigned long timeout_cs)
4741 {
4742         struct dlm_lkb *lkb;
4743         struct dlm_args args;
4744         struct dlm_user_args *ua;
4745         int error;
4746
4747         dlm_lock_recovery(ls);
4748
4749         error = find_lkb(ls, lkid, &lkb);
4750         if (error)
4751                 goto out;
4752
4753         /* user can change the params on its lock when it converts it, or
4754            add an lvb that didn't exist before */
4755
4756         ua = lkb->lkb_ua;
4757
4758         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4759                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4760                 if (!ua->lksb.sb_lvbptr) {
4761                         error = -ENOMEM;
4762                         goto out_put;
4763                 }
4764         }
4765         if (lvb_in && ua->lksb.sb_lvbptr)
4766                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4767
4768         ua->xid = ua_tmp->xid;
4769         ua->castparam = ua_tmp->castparam;
4770         ua->castaddr = ua_tmp->castaddr;
4771         ua->bastparam = ua_tmp->bastparam;
4772         ua->bastaddr = ua_tmp->bastaddr;
4773         ua->user_lksb = ua_tmp->user_lksb;
4774
4775         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4776                               fake_astfn, ua, fake_bastfn, &args);
4777         if (error)
4778                 goto out_put;
4779
4780         error = convert_lock(ls, lkb, &args);
4781
4782         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4783                 error = 0;
4784  out_put:
4785         dlm_put_lkb(lkb);
4786  out:
4787         dlm_unlock_recovery(ls);
4788         kfree(ua_tmp);
4789         return error;
4790 }
4791
4792 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4793                     uint32_t flags, uint32_t lkid, char *lvb_in)
4794 {
4795         struct dlm_lkb *lkb;
4796         struct dlm_args args;
4797         struct dlm_user_args *ua;
4798         int error;
4799
4800         dlm_lock_recovery(ls);
4801
4802         error = find_lkb(ls, lkid, &lkb);
4803         if (error)
4804                 goto out;
4805
4806         ua = lkb->lkb_ua;
4807
4808         if (lvb_in && ua->lksb.sb_lvbptr)
4809                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4810         if (ua_tmp->castparam)
4811                 ua->castparam = ua_tmp->castparam;
4812         ua->user_lksb = ua_tmp->user_lksb;
4813
4814         error = set_unlock_args(flags, ua, &args);
4815         if (error)
4816                 goto out_put;
4817
4818         error = unlock_lock(ls, lkb, &args);
4819
4820         if (error == -DLM_EUNLOCK)
4821                 error = 0;
4822         /* from validate_unlock_args() */
4823         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4824                 error = 0;
4825         if (error)
4826                 goto out_put;
4827
4828         spin_lock(&ua->proc->locks_spin);
4829         /* dlm_user_add_cb() may have already taken lkb off the proc list */
4830         if (!list_empty(&lkb->lkb_ownqueue))
4831                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4832         spin_unlock(&ua->proc->locks_spin);
4833  out_put:
4834         dlm_put_lkb(lkb);
4835  out:
4836         dlm_unlock_recovery(ls);
4837         kfree(ua_tmp);
4838         return error;
4839 }
4840
4841 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4842                     uint32_t flags, uint32_t lkid)
4843 {
4844         struct dlm_lkb *lkb;
4845         struct dlm_args args;
4846         struct dlm_user_args *ua;
4847         int error;
4848
4849         dlm_lock_recovery(ls);
4850
4851         error = find_lkb(ls, lkid, &lkb);
4852         if (error)
4853                 goto out;
4854
4855         ua = lkb->lkb_ua;
4856         if (ua_tmp->castparam)
4857                 ua->castparam = ua_tmp->castparam;
4858         ua->user_lksb = ua_tmp->user_lksb;
4859
4860         error = set_unlock_args(flags, ua, &args);
4861         if (error)
4862                 goto out_put;
4863
4864         error = cancel_lock(ls, lkb, &args);
4865
4866         if (error == -DLM_ECANCEL)
4867                 error = 0;
4868         /* from validate_unlock_args() */
4869         if (error == -EBUSY)
4870                 error = 0;
4871  out_put:
4872         dlm_put_lkb(lkb);
4873  out:
4874         dlm_unlock_recovery(ls);
4875         kfree(ua_tmp);
4876         return error;
4877 }
4878
4879 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4880 {
4881         struct dlm_lkb *lkb;
4882         struct dlm_args args;
4883         struct dlm_user_args *ua;
4884         struct dlm_rsb *r;
4885         int error;
4886
4887         dlm_lock_recovery(ls);
4888
4889         error = find_lkb(ls, lkid, &lkb);
4890         if (error)
4891                 goto out;
4892
4893         ua = lkb->lkb_ua;
4894
4895         error = set_unlock_args(flags, ua, &args);
4896         if (error)
4897                 goto out_put;
4898
4899         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4900
4901         r = lkb->lkb_resource;
4902         hold_rsb(r);
4903         lock_rsb(r);
4904
4905         error = validate_unlock_args(lkb, &args);
4906         if (error)
4907                 goto out_r;
4908         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4909
4910         error = _cancel_lock(r, lkb);
4911  out_r:
4912         unlock_rsb(r);
4913         put_rsb(r);
4914
4915         if (error == -DLM_ECANCEL)
4916                 error = 0;
4917         /* from validate_unlock_args() */
4918         if (error == -EBUSY)
4919                 error = 0;
4920  out_put:
4921         dlm_put_lkb(lkb);
4922  out:
4923         dlm_unlock_recovery(ls);
4924         return error;
4925 }
4926
4927 /* lkb's that are removed from the waiters list by revert are just left on the
4928    orphans list with the granted orphan locks, to be freed by purge */
4929
4930 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4931 {
4932         struct dlm_args args;
4933         int error;
4934
4935         hold_lkb(lkb);
4936         mutex_lock(&ls->ls_orphans_mutex);
4937         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4938         mutex_unlock(&ls->ls_orphans_mutex);
4939
4940         set_unlock_args(0, lkb->lkb_ua, &args);
4941
4942         error = cancel_lock(ls, lkb, &args);
4943         if (error == -DLM_ECANCEL)
4944                 error = 0;
4945         return error;
4946 }
4947
4948 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4949    Regardless of what rsb queue the lock is on, it's removed and freed. */
4950
4951 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4952 {
4953         struct dlm_args args;
4954         int error;
4955
4956         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4957
4958         error = unlock_lock(ls, lkb, &args);
4959         if (error == -DLM_EUNLOCK)
4960                 error = 0;
4961         return error;
4962 }
4963
4964 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4965    (which does lock_rsb) due to deadlock with receiving a message that does
4966    lock_rsb followed by dlm_user_add_cb() */
4967
4968 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4969                                      struct dlm_user_proc *proc)
4970 {
4971         struct dlm_lkb *lkb = NULL;
4972
4973         mutex_lock(&ls->ls_clear_proc_locks);
4974         if (list_empty(&proc->locks))
4975                 goto out;
4976
4977         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4978         list_del_init(&lkb->lkb_ownqueue);
4979
4980         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4981                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4982         else
4983                 lkb->lkb_flags |= DLM_IFL_DEAD;
4984  out:
4985         mutex_unlock(&ls->ls_clear_proc_locks);
4986         return lkb;
4987 }
4988
4989 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
4990    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4991    which we clear here. */
4992
4993 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4994    list, and no more device_writes should add lkb's to proc->locks list; so we
4995    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4996    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4997    them ourself. */
4998
4999 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5000 {
5001         struct dlm_lkb *lkb, *safe;
5002
5003         dlm_lock_recovery(ls);
5004
5005         while (1) {
5006                 lkb = del_proc_lock(ls, proc);
5007                 if (!lkb)
5008                         break;
5009                 del_timeout(lkb);
5010                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5011                         orphan_proc_lock(ls, lkb);
5012                 else
5013                         unlock_proc_lock(ls, lkb);
5014
5015                 /* this removes the reference for the proc->locks list
5016                    added by dlm_user_request, it may result in the lkb
5017                    being freed */
5018
5019                 dlm_put_lkb(lkb);
5020         }
5021
5022         mutex_lock(&ls->ls_clear_proc_locks);
5023
5024         /* in-progress unlocks */
5025         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5026                 list_del_init(&lkb->lkb_ownqueue);
5027                 lkb->lkb_flags |= DLM_IFL_DEAD;
5028                 dlm_put_lkb(lkb);
5029         }
5030
5031         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5032                 memset(&lkb->lkb_callbacks, 0,
5033                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5034                 list_del_init(&lkb->lkb_cb_list);
5035                 dlm_put_lkb(lkb);
5036         }
5037
5038         mutex_unlock(&ls->ls_clear_proc_locks);
5039         dlm_unlock_recovery(ls);
5040 }
5041
5042 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5043 {
5044         struct dlm_lkb *lkb, *safe;
5045
5046         while (1) {
5047                 lkb = NULL;
5048                 spin_lock(&proc->locks_spin);
5049                 if (!list_empty(&proc->locks)) {
5050                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
5051                                          lkb_ownqueue);
5052                         list_del_init(&lkb->lkb_ownqueue);
5053                 }
5054                 spin_unlock(&proc->locks_spin);
5055
5056                 if (!lkb)
5057                         break;
5058
5059                 lkb->lkb_flags |= DLM_IFL_DEAD;
5060                 unlock_proc_lock(ls, lkb);
5061                 dlm_put_lkb(lkb); /* ref from proc->locks list */
5062         }
5063
5064         spin_lock(&proc->locks_spin);
5065         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5066                 list_del_init(&lkb->lkb_ownqueue);
5067                 lkb->lkb_flags |= DLM_IFL_DEAD;
5068                 dlm_put_lkb(lkb);
5069         }
5070         spin_unlock(&proc->locks_spin);
5071
5072         spin_lock(&proc->asts_spin);
5073         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5074                 memset(&lkb->lkb_callbacks, 0,
5075                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5076                 list_del_init(&lkb->lkb_cb_list);
5077                 dlm_put_lkb(lkb);
5078         }
5079         spin_unlock(&proc->asts_spin);
5080 }
5081
5082 /* pid of 0 means purge all orphans */
5083
5084 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5085 {
5086         struct dlm_lkb *lkb, *safe;
5087
5088         mutex_lock(&ls->ls_orphans_mutex);
5089         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5090                 if (pid && lkb->lkb_ownpid != pid)
5091                         continue;
5092                 unlock_proc_lock(ls, lkb);
5093                 list_del_init(&lkb->lkb_ownqueue);
5094                 dlm_put_lkb(lkb);
5095         }
5096         mutex_unlock(&ls->ls_orphans_mutex);
5097 }
5098
5099 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5100 {
5101         struct dlm_message *ms;
5102         struct dlm_mhandle *mh;
5103         int error;
5104
5105         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5106                                 DLM_MSG_PURGE, &ms, &mh);
5107         if (error)
5108                 return error;
5109         ms->m_nodeid = nodeid;
5110         ms->m_pid = pid;
5111
5112         return send_message(mh, ms);
5113 }
5114
5115 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5116                    int nodeid, int pid)
5117 {
5118         int error = 0;
5119
5120         if (nodeid != dlm_our_nodeid()) {
5121                 error = send_purge(ls, nodeid, pid);
5122         } else {
5123                 dlm_lock_recovery(ls);
5124                 if (pid == current->pid)
5125                         purge_proc_locks(ls, proc);
5126                 else
5127                         do_purge(ls, nodeid, pid);
5128                 dlm_unlock_recovery(ls);
5129         }
5130         return error;
5131 }
5132