56d6bfcc1e48895f2d9523d70376779349b44cfa
[pandora-kernel.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/slab.h>
60 #include "dlm_internal.h"
61 #include <linux/dlm_device.h>
62 #include "memory.h"
63 #include "lowcomms.h"
64 #include "requestqueue.h"
65 #include "util.h"
66 #include "dir.h"
67 #include "member.h"
68 #include "lockspace.h"
69 #include "ast.h"
70 #include "lock.h"
71 #include "rcom.h"
72 #include "recover.h"
73 #include "lvb_table.h"
74 #include "user.h"
75 #include "config.h"
76
77 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int send_remove(struct dlm_rsb *r);
85 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88                                     struct dlm_message *ms);
89 static int receive_extralen(struct dlm_message *ms);
90 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 static void del_timeout(struct dlm_lkb *lkb);
92
93 /*
94  * Lock compatibilty matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD*/
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
133
134 #define modes_compat(gr, rq) \
135         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
141
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
159
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163                "     status %d rqmode %d grmode %d wait_type %d\n",
164                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166                lkb->lkb_grmode, lkb->lkb_wait_type);
167 }
168
169 static void dlm_print_rsb(struct dlm_rsb *r)
170 {
171         printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172                r->res_nodeid, r->res_flags, r->res_first_lkid,
173                r->res_recover_locks_count, r->res_name);
174 }
175
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178         struct dlm_lkb *lkb;
179
180         dlm_print_rsb(r);
181
182         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184         printk(KERN_ERR "rsb lookup list\n");
185         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186                 dlm_print_lkb(lkb);
187         printk(KERN_ERR "rsb grant queue:\n");
188         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189                 dlm_print_lkb(lkb);
190         printk(KERN_ERR "rsb convert queue:\n");
191         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192                 dlm_print_lkb(lkb);
193         printk(KERN_ERR "rsb wait queue:\n");
194         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195                 dlm_print_lkb(lkb);
196 }
197
198 /* Threads cannot use the lockspace while it's being recovered */
199
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202         down_read(&ls->ls_in_recovery);
203 }
204
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207         up_read(&ls->ls_in_recovery);
208 }
209
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212         return down_read_trylock(&ls->ls_in_recovery);
213 }
214
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243         return !!r->res_nodeid;
244 }
245
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253         if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254                 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262                 return 1;
263         return 0;
264 }
265
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
270
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284                                   DLM_IFL_OVERLAP_CANCEL));
285 }
286
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289         if (is_master_copy(lkb))
290                 return;
291
292         del_timeout(lkb);
293
294         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
296         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297            timeout caused the cancel then return -ETIMEDOUT */
298         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300                 rv = -ETIMEDOUT;
301         }
302
303         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305                 rv = -EDEADLK;
306         }
307
308         dlm_add_ast(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 }
310
311 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 {
313         queue_cast(r, lkb,
314                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 }
316
317 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318 {
319         if (is_master_copy(lkb)) {
320                 send_bast(r, lkb, rqmode);
321         } else {
322                 dlm_add_ast(lkb, DLM_CB_BAST, rqmode, 0, 0);
323         }
324 }
325
326 /*
327  * Basic operations on rsb's and lkb's
328  */
329
330 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
331 {
332         struct dlm_rsb *r;
333
334         r = dlm_allocate_rsb(ls, len);
335         if (!r)
336                 return NULL;
337
338         r->res_ls = ls;
339         r->res_length = len;
340         memcpy(r->res_name, name, len);
341         mutex_init(&r->res_mutex);
342
343         INIT_LIST_HEAD(&r->res_lookup);
344         INIT_LIST_HEAD(&r->res_grantqueue);
345         INIT_LIST_HEAD(&r->res_convertqueue);
346         INIT_LIST_HEAD(&r->res_waitqueue);
347         INIT_LIST_HEAD(&r->res_root_list);
348         INIT_LIST_HEAD(&r->res_recover_list);
349
350         return r;
351 }
352
353 static int search_rsb_list(struct list_head *head, char *name, int len,
354                            unsigned int flags, struct dlm_rsb **r_ret)
355 {
356         struct dlm_rsb *r;
357         int error = 0;
358
359         list_for_each_entry(r, head, res_hashchain) {
360                 if (len == r->res_length && !memcmp(name, r->res_name, len))
361                         goto found;
362         }
363         *r_ret = NULL;
364         return -EBADR;
365
366  found:
367         if (r->res_nodeid && (flags & R_MASTER))
368                 error = -ENOTBLK;
369         *r_ret = r;
370         return error;
371 }
372
373 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
374                        unsigned int flags, struct dlm_rsb **r_ret)
375 {
376         struct dlm_rsb *r;
377         int error;
378
379         error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
380         if (!error) {
381                 kref_get(&r->res_ref);
382                 goto out;
383         }
384         error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
385         if (error)
386                 goto out;
387
388         list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
389
390         if (dlm_no_directory(ls))
391                 goto out;
392
393         if (r->res_nodeid == -1) {
394                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
395                 r->res_first_lkid = 0;
396         } else if (r->res_nodeid > 0) {
397                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
398                 r->res_first_lkid = 0;
399         } else {
400                 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
401                 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
402         }
403  out:
404         *r_ret = r;
405         return error;
406 }
407
408 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
409                       unsigned int flags, struct dlm_rsb **r_ret)
410 {
411         int error;
412         spin_lock(&ls->ls_rsbtbl[b].lock);
413         error = _search_rsb(ls, name, len, b, flags, r_ret);
414         spin_unlock(&ls->ls_rsbtbl[b].lock);
415         return error;
416 }
417
418 /*
419  * Find rsb in rsbtbl and potentially create/add one
420  *
421  * Delaying the release of rsb's has a similar benefit to applications keeping
422  * NL locks on an rsb, but without the guarantee that the cached master value
423  * will still be valid when the rsb is reused.  Apps aren't always smart enough
424  * to keep NL locks on an rsb that they may lock again shortly; this can lead
425  * to excessive master lookups and removals if we don't delay the release.
426  *
427  * Searching for an rsb means looking through both the normal list and toss
428  * list.  When found on the toss list the rsb is moved to the normal list with
429  * ref count of 1; when found on normal list the ref count is incremented.
430  */
431
432 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
433                     unsigned int flags, struct dlm_rsb **r_ret)
434 {
435         struct dlm_rsb *r = NULL, *tmp;
436         uint32_t hash, bucket;
437         int error = -EINVAL;
438
439         if (namelen > DLM_RESNAME_MAXLEN)
440                 goto out;
441
442         if (dlm_no_directory(ls))
443                 flags |= R_CREATE;
444
445         error = 0;
446         hash = jhash(name, namelen, 0);
447         bucket = hash & (ls->ls_rsbtbl_size - 1);
448
449         error = search_rsb(ls, name, namelen, bucket, flags, &r);
450         if (!error)
451                 goto out;
452
453         if (error == -EBADR && !(flags & R_CREATE))
454                 goto out;
455
456         /* the rsb was found but wasn't a master copy */
457         if (error == -ENOTBLK)
458                 goto out;
459
460         error = -ENOMEM;
461         r = create_rsb(ls, name, namelen);
462         if (!r)
463                 goto out;
464
465         r->res_hash = hash;
466         r->res_bucket = bucket;
467         r->res_nodeid = -1;
468         kref_init(&r->res_ref);
469
470         /* With no directory, the master can be set immediately */
471         if (dlm_no_directory(ls)) {
472                 int nodeid = dlm_dir_nodeid(r);
473                 if (nodeid == dlm_our_nodeid())
474                         nodeid = 0;
475                 r->res_nodeid = nodeid;
476         }
477
478         spin_lock(&ls->ls_rsbtbl[bucket].lock);
479         error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
480         if (!error) {
481                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
482                 dlm_free_rsb(r);
483                 r = tmp;
484                 goto out;
485         }
486         list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
487         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
488         error = 0;
489  out:
490         *r_ret = r;
491         return error;
492 }
493
494 /* This is only called to add a reference when the code already holds
495    a valid reference to the rsb, so there's no need for locking. */
496
497 static inline void hold_rsb(struct dlm_rsb *r)
498 {
499         kref_get(&r->res_ref);
500 }
501
502 void dlm_hold_rsb(struct dlm_rsb *r)
503 {
504         hold_rsb(r);
505 }
506
507 static void toss_rsb(struct kref *kref)
508 {
509         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
510         struct dlm_ls *ls = r->res_ls;
511
512         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
513         kref_init(&r->res_ref);
514         list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
515         r->res_toss_time = jiffies;
516         if (r->res_lvbptr) {
517                 dlm_free_lvb(r->res_lvbptr);
518                 r->res_lvbptr = NULL;
519         }
520 }
521
522 /* When all references to the rsb are gone it's transferred to
523    the tossed list for later disposal. */
524
525 static void put_rsb(struct dlm_rsb *r)
526 {
527         struct dlm_ls *ls = r->res_ls;
528         uint32_t bucket = r->res_bucket;
529
530         spin_lock(&ls->ls_rsbtbl[bucket].lock);
531         kref_put(&r->res_ref, toss_rsb);
532         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
533 }
534
535 void dlm_put_rsb(struct dlm_rsb *r)
536 {
537         put_rsb(r);
538 }
539
540 /* See comment for unhold_lkb */
541
542 static void unhold_rsb(struct dlm_rsb *r)
543 {
544         int rv;
545         rv = kref_put(&r->res_ref, toss_rsb);
546         DLM_ASSERT(!rv, dlm_dump_rsb(r););
547 }
548
549 static void kill_rsb(struct kref *kref)
550 {
551         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
552
553         /* All work is done after the return from kref_put() so we
554            can release the write_lock before the remove and free. */
555
556         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
557         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
558         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
559         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
560         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
561         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
562 }
563
564 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565    The rsb must exist as long as any lkb's for it do. */
566
567 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
568 {
569         hold_rsb(r);
570         lkb->lkb_resource = r;
571 }
572
573 static void detach_lkb(struct dlm_lkb *lkb)
574 {
575         if (lkb->lkb_resource) {
576                 put_rsb(lkb->lkb_resource);
577                 lkb->lkb_resource = NULL;
578         }
579 }
580
581 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
582 {
583         struct dlm_lkb *lkb, *tmp;
584         uint32_t lkid = 0;
585         uint16_t bucket;
586
587         lkb = dlm_allocate_lkb(ls);
588         if (!lkb)
589                 return -ENOMEM;
590
591         lkb->lkb_nodeid = -1;
592         lkb->lkb_grmode = DLM_LOCK_IV;
593         kref_init(&lkb->lkb_ref);
594         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
595         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
596         INIT_LIST_HEAD(&lkb->lkb_time_list);
597         INIT_LIST_HEAD(&lkb->lkb_astqueue);
598
599         get_random_bytes(&bucket, sizeof(bucket));
600         bucket &= (ls->ls_lkbtbl_size - 1);
601
602         write_lock(&ls->ls_lkbtbl[bucket].lock);
603
604         /* counter can roll over so we must verify lkid is not in use */
605
606         while (lkid == 0) {
607                 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
608
609                 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
610                                     lkb_idtbl_list) {
611                         if (tmp->lkb_id != lkid)
612                                 continue;
613                         lkid = 0;
614                         break;
615                 }
616         }
617
618         lkb->lkb_id = lkid;
619         list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
620         write_unlock(&ls->ls_lkbtbl[bucket].lock);
621
622         *lkb_ret = lkb;
623         return 0;
624 }
625
626 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
627 {
628         struct dlm_lkb *lkb;
629         uint16_t bucket = (lkid >> 16);
630
631         list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
632                 if (lkb->lkb_id == lkid)
633                         return lkb;
634         }
635         return NULL;
636 }
637
638 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
639 {
640         struct dlm_lkb *lkb;
641         uint16_t bucket = (lkid >> 16);
642
643         if (bucket >= ls->ls_lkbtbl_size)
644                 return -EBADSLT;
645
646         read_lock(&ls->ls_lkbtbl[bucket].lock);
647         lkb = __find_lkb(ls, lkid);
648         if (lkb)
649                 kref_get(&lkb->lkb_ref);
650         read_unlock(&ls->ls_lkbtbl[bucket].lock);
651
652         *lkb_ret = lkb;
653         return lkb ? 0 : -ENOENT;
654 }
655
656 static void kill_lkb(struct kref *kref)
657 {
658         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
659
660         /* All work is done after the return from kref_put() so we
661            can release the write_lock before the detach_lkb */
662
663         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
664 }
665
666 /* __put_lkb() is used when an lkb may not have an rsb attached to
667    it so we need to provide the lockspace explicitly */
668
669 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
670 {
671         uint16_t bucket = (lkb->lkb_id >> 16);
672
673         write_lock(&ls->ls_lkbtbl[bucket].lock);
674         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
675                 list_del(&lkb->lkb_idtbl_list);
676                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
677
678                 detach_lkb(lkb);
679
680                 /* for local/process lkbs, lvbptr points to caller's lksb */
681                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
682                         dlm_free_lvb(lkb->lkb_lvbptr);
683                 dlm_free_lkb(lkb);
684                 return 1;
685         } else {
686                 write_unlock(&ls->ls_lkbtbl[bucket].lock);
687                 return 0;
688         }
689 }
690
691 int dlm_put_lkb(struct dlm_lkb *lkb)
692 {
693         struct dlm_ls *ls;
694
695         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
696         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
697
698         ls = lkb->lkb_resource->res_ls;
699         return __put_lkb(ls, lkb);
700 }
701
702 /* This is only called to add a reference when the code already holds
703    a valid reference to the lkb, so there's no need for locking. */
704
705 static inline void hold_lkb(struct dlm_lkb *lkb)
706 {
707         kref_get(&lkb->lkb_ref);
708 }
709
710 /* This is called when we need to remove a reference and are certain
711    it's not the last ref.  e.g. del_lkb is always called between a
712    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
713    put_lkb would work fine, but would involve unnecessary locking */
714
715 static inline void unhold_lkb(struct dlm_lkb *lkb)
716 {
717         int rv;
718         rv = kref_put(&lkb->lkb_ref, kill_lkb);
719         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
720 }
721
722 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
723                             int mode)
724 {
725         struct dlm_lkb *lkb = NULL;
726
727         list_for_each_entry(lkb, head, lkb_statequeue)
728                 if (lkb->lkb_rqmode < mode)
729                         break;
730
731         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
732 }
733
734 /* add/remove lkb to rsb's grant/convert/wait queue */
735
736 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
737 {
738         kref_get(&lkb->lkb_ref);
739
740         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
741
742         lkb->lkb_timestamp = ktime_get();
743
744         lkb->lkb_status = status;
745
746         switch (status) {
747         case DLM_LKSTS_WAITING:
748                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
749                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
750                 else
751                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
752                 break;
753         case DLM_LKSTS_GRANTED:
754                 /* convention says granted locks kept in order of grmode */
755                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
756                                 lkb->lkb_grmode);
757                 break;
758         case DLM_LKSTS_CONVERT:
759                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
760                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
761                 else
762                         list_add_tail(&lkb->lkb_statequeue,
763                                       &r->res_convertqueue);
764                 break;
765         default:
766                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
767         }
768 }
769
770 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
771 {
772         lkb->lkb_status = 0;
773         list_del(&lkb->lkb_statequeue);
774         unhold_lkb(lkb);
775 }
776
777 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
778 {
779         hold_lkb(lkb);
780         del_lkb(r, lkb);
781         add_lkb(r, lkb, sts);
782         unhold_lkb(lkb);
783 }
784
785 static int msg_reply_type(int mstype)
786 {
787         switch (mstype) {
788         case DLM_MSG_REQUEST:
789                 return DLM_MSG_REQUEST_REPLY;
790         case DLM_MSG_CONVERT:
791                 return DLM_MSG_CONVERT_REPLY;
792         case DLM_MSG_UNLOCK:
793                 return DLM_MSG_UNLOCK_REPLY;
794         case DLM_MSG_CANCEL:
795                 return DLM_MSG_CANCEL_REPLY;
796         case DLM_MSG_LOOKUP:
797                 return DLM_MSG_LOOKUP_REPLY;
798         }
799         return -1;
800 }
801
802 /* add/remove lkb from global waiters list of lkb's waiting for
803    a reply from a remote node */
804
805 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
806 {
807         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
808         int error = 0;
809
810         mutex_lock(&ls->ls_waiters_mutex);
811
812         if (is_overlap_unlock(lkb) ||
813             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
814                 error = -EINVAL;
815                 goto out;
816         }
817
818         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
819                 switch (mstype) {
820                 case DLM_MSG_UNLOCK:
821                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
822                         break;
823                 case DLM_MSG_CANCEL:
824                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
825                         break;
826                 default:
827                         error = -EBUSY;
828                         goto out;
829                 }
830                 lkb->lkb_wait_count++;
831                 hold_lkb(lkb);
832
833                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
834                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
835                           lkb->lkb_wait_count, lkb->lkb_flags);
836                 goto out;
837         }
838
839         DLM_ASSERT(!lkb->lkb_wait_count,
840                    dlm_print_lkb(lkb);
841                    printk("wait_count %d\n", lkb->lkb_wait_count););
842
843         lkb->lkb_wait_count++;
844         lkb->lkb_wait_type = mstype;
845         hold_lkb(lkb);
846         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
847  out:
848         if (error)
849                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
850                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
851                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
852         mutex_unlock(&ls->ls_waiters_mutex);
853         return error;
854 }
855
856 /* We clear the RESEND flag because we might be taking an lkb off the waiters
857    list as part of process_requestqueue (e.g. a lookup that has an optimized
858    request reply on the requestqueue) between dlm_recover_waiters_pre() which
859    set RESEND and dlm_recover_waiters_post() */
860
861 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
862                                 struct dlm_message *ms)
863 {
864         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
865         int overlap_done = 0;
866
867         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
868                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
869                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
870                 overlap_done = 1;
871                 goto out_del;
872         }
873
874         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
875                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
876                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
877                 overlap_done = 1;
878                 goto out_del;
879         }
880
881         /* Cancel state was preemptively cleared by a successful convert,
882            see next comment, nothing to do. */
883
884         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
885             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
886                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
887                           lkb->lkb_id, lkb->lkb_wait_type);
888                 return -1;
889         }
890
891         /* Remove for the convert reply, and premptively remove for the
892            cancel reply.  A convert has been granted while there's still
893            an outstanding cancel on it (the cancel is moot and the result
894            in the cancel reply should be 0).  We preempt the cancel reply
895            because the app gets the convert result and then can follow up
896            with another op, like convert.  This subsequent op would see the
897            lingering state of the cancel and fail with -EBUSY. */
898
899         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
900             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
901             is_overlap_cancel(lkb) && ms && !ms->m_result) {
902                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
903                           lkb->lkb_id);
904                 lkb->lkb_wait_type = 0;
905                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
906                 lkb->lkb_wait_count--;
907                 goto out_del;
908         }
909
910         /* N.B. type of reply may not always correspond to type of original
911            msg due to lookup->request optimization, verify others? */
912
913         if (lkb->lkb_wait_type) {
914                 lkb->lkb_wait_type = 0;
915                 goto out_del;
916         }
917
918         log_error(ls, "remwait error %x reply %d flags %x no wait_type",
919                   lkb->lkb_id, mstype, lkb->lkb_flags);
920         return -1;
921
922  out_del:
923         /* the force-unlock/cancel has completed and we haven't recvd a reply
924            to the op that was in progress prior to the unlock/cancel; we
925            give up on any reply to the earlier op.  FIXME: not sure when/how
926            this would happen */
927
928         if (overlap_done && lkb->lkb_wait_type) {
929                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
930                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
931                 lkb->lkb_wait_count--;
932                 lkb->lkb_wait_type = 0;
933         }
934
935         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
936
937         lkb->lkb_flags &= ~DLM_IFL_RESEND;
938         lkb->lkb_wait_count--;
939         if (!lkb->lkb_wait_count)
940                 list_del_init(&lkb->lkb_wait_reply);
941         unhold_lkb(lkb);
942         return 0;
943 }
944
945 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
946 {
947         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
948         int error;
949
950         mutex_lock(&ls->ls_waiters_mutex);
951         error = _remove_from_waiters(lkb, mstype, NULL);
952         mutex_unlock(&ls->ls_waiters_mutex);
953         return error;
954 }
955
956 /* Handles situations where we might be processing a "fake" or "stub" reply in
957    which we can't try to take waiters_mutex again. */
958
959 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
960 {
961         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
962         int error;
963
964         if (ms != &ls->ls_stub_ms)
965                 mutex_lock(&ls->ls_waiters_mutex);
966         error = _remove_from_waiters(lkb, ms->m_type, ms);
967         if (ms != &ls->ls_stub_ms)
968                 mutex_unlock(&ls->ls_waiters_mutex);
969         return error;
970 }
971
972 static void dir_remove(struct dlm_rsb *r)
973 {
974         int to_nodeid;
975
976         if (dlm_no_directory(r->res_ls))
977                 return;
978
979         to_nodeid = dlm_dir_nodeid(r);
980         if (to_nodeid != dlm_our_nodeid())
981                 send_remove(r);
982         else
983                 dlm_dir_remove_entry(r->res_ls, to_nodeid,
984                                      r->res_name, r->res_length);
985 }
986
987 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
988    found since they are in order of newest to oldest? */
989
990 static int shrink_bucket(struct dlm_ls *ls, int b)
991 {
992         struct dlm_rsb *r;
993         int count = 0, found;
994
995         for (;;) {
996                 found = 0;
997                 spin_lock(&ls->ls_rsbtbl[b].lock);
998                 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
999                                             res_hashchain) {
1000                         if (!time_after_eq(jiffies, r->res_toss_time +
1001                                            dlm_config.ci_toss_secs * HZ))
1002                                 continue;
1003                         found = 1;
1004                         break;
1005                 }
1006
1007                 if (!found) {
1008                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1009                         break;
1010                 }
1011
1012                 if (kref_put(&r->res_ref, kill_rsb)) {
1013                         list_del(&r->res_hashchain);
1014                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1015
1016                         if (is_master(r))
1017                                 dir_remove(r);
1018                         dlm_free_rsb(r);
1019                         count++;
1020                 } else {
1021                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1022                         log_error(ls, "tossed rsb in use %s", r->res_name);
1023                 }
1024         }
1025
1026         return count;
1027 }
1028
1029 void dlm_scan_rsbs(struct dlm_ls *ls)
1030 {
1031         int i;
1032
1033         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1034                 shrink_bucket(ls, i);
1035                 if (dlm_locking_stopped(ls))
1036                         break;
1037                 cond_resched();
1038         }
1039 }
1040
1041 static void add_timeout(struct dlm_lkb *lkb)
1042 {
1043         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1044
1045         if (is_master_copy(lkb))
1046                 return;
1047
1048         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1049             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1050                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1051                 goto add_it;
1052         }
1053         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1054                 goto add_it;
1055         return;
1056
1057  add_it:
1058         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1059         mutex_lock(&ls->ls_timeout_mutex);
1060         hold_lkb(lkb);
1061         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1062         mutex_unlock(&ls->ls_timeout_mutex);
1063 }
1064
1065 static void del_timeout(struct dlm_lkb *lkb)
1066 {
1067         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1068
1069         mutex_lock(&ls->ls_timeout_mutex);
1070         if (!list_empty(&lkb->lkb_time_list)) {
1071                 list_del_init(&lkb->lkb_time_list);
1072                 unhold_lkb(lkb);
1073         }
1074         mutex_unlock(&ls->ls_timeout_mutex);
1075 }
1076
1077 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1078    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1079    and then lock rsb because of lock ordering in add_timeout.  We may need
1080    to specify some special timeout-related bits in the lkb that are just to
1081    be accessed under the timeout_mutex. */
1082
1083 void dlm_scan_timeout(struct dlm_ls *ls)
1084 {
1085         struct dlm_rsb *r;
1086         struct dlm_lkb *lkb;
1087         int do_cancel, do_warn;
1088         s64 wait_us;
1089
1090         for (;;) {
1091                 if (dlm_locking_stopped(ls))
1092                         break;
1093
1094                 do_cancel = 0;
1095                 do_warn = 0;
1096                 mutex_lock(&ls->ls_timeout_mutex);
1097                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1098
1099                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1100                                                         lkb->lkb_timestamp));
1101
1102                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1103                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1104                                 do_cancel = 1;
1105
1106                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1107                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1108                                 do_warn = 1;
1109
1110                         if (!do_cancel && !do_warn)
1111                                 continue;
1112                         hold_lkb(lkb);
1113                         break;
1114                 }
1115                 mutex_unlock(&ls->ls_timeout_mutex);
1116
1117                 if (!do_cancel && !do_warn)
1118                         break;
1119
1120                 r = lkb->lkb_resource;
1121                 hold_rsb(r);
1122                 lock_rsb(r);
1123
1124                 if (do_warn) {
1125                         /* clear flag so we only warn once */
1126                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1127                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1128                                 del_timeout(lkb);
1129                         dlm_timeout_warn(lkb);
1130                 }
1131
1132                 if (do_cancel) {
1133                         log_debug(ls, "timeout cancel %x node %d %s",
1134                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1135                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1136                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1137                         del_timeout(lkb);
1138                         _cancel_lock(r, lkb);
1139                 }
1140
1141                 unlock_rsb(r);
1142                 unhold_rsb(r);
1143                 dlm_put_lkb(lkb);
1144         }
1145 }
1146
1147 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1148    dlm_recoverd before checking/setting ls_recover_begin. */
1149
1150 void dlm_adjust_timeouts(struct dlm_ls *ls)
1151 {
1152         struct dlm_lkb *lkb;
1153         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1154
1155         ls->ls_recover_begin = 0;
1156         mutex_lock(&ls->ls_timeout_mutex);
1157         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1158                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1159         mutex_unlock(&ls->ls_timeout_mutex);
1160 }
1161
1162 /* lkb is master or local copy */
1163
1164 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1165 {
1166         int b, len = r->res_ls->ls_lvblen;
1167
1168         /* b=1 lvb returned to caller
1169            b=0 lvb written to rsb or invalidated
1170            b=-1 do nothing */
1171
1172         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1173
1174         if (b == 1) {
1175                 if (!lkb->lkb_lvbptr)
1176                         return;
1177
1178                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1179                         return;
1180
1181                 if (!r->res_lvbptr)
1182                         return;
1183
1184                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1185                 lkb->lkb_lvbseq = r->res_lvbseq;
1186
1187         } else if (b == 0) {
1188                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1189                         rsb_set_flag(r, RSB_VALNOTVALID);
1190                         return;
1191                 }
1192
1193                 if (!lkb->lkb_lvbptr)
1194                         return;
1195
1196                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1197                         return;
1198
1199                 if (!r->res_lvbptr)
1200                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1201
1202                 if (!r->res_lvbptr)
1203                         return;
1204
1205                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1206                 r->res_lvbseq++;
1207                 lkb->lkb_lvbseq = r->res_lvbseq;
1208                 rsb_clear_flag(r, RSB_VALNOTVALID);
1209         }
1210
1211         if (rsb_flag(r, RSB_VALNOTVALID))
1212                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1213 }
1214
1215 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1216 {
1217         if (lkb->lkb_grmode < DLM_LOCK_PW)
1218                 return;
1219
1220         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1221                 rsb_set_flag(r, RSB_VALNOTVALID);
1222                 return;
1223         }
1224
1225         if (!lkb->lkb_lvbptr)
1226                 return;
1227
1228         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1229                 return;
1230
1231         if (!r->res_lvbptr)
1232                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1233
1234         if (!r->res_lvbptr)
1235                 return;
1236
1237         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1238         r->res_lvbseq++;
1239         rsb_clear_flag(r, RSB_VALNOTVALID);
1240 }
1241
1242 /* lkb is process copy (pc) */
1243
1244 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1245                             struct dlm_message *ms)
1246 {
1247         int b;
1248
1249         if (!lkb->lkb_lvbptr)
1250                 return;
1251
1252         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1253                 return;
1254
1255         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1256         if (b == 1) {
1257                 int len = receive_extralen(ms);
1258                 if (len > DLM_RESNAME_MAXLEN)
1259                         len = DLM_RESNAME_MAXLEN;
1260                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1261                 lkb->lkb_lvbseq = ms->m_lvbseq;
1262         }
1263 }
1264
1265 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1266    remove_lock -- used for unlock, removes lkb from granted
1267    revert_lock -- used for cancel, moves lkb from convert to granted
1268    grant_lock  -- used for request and convert, adds lkb to granted or
1269                   moves lkb from convert or waiting to granted
1270
1271    Each of these is used for master or local copy lkb's.  There is
1272    also a _pc() variation used to make the corresponding change on
1273    a process copy (pc) lkb. */
1274
1275 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1276 {
1277         del_lkb(r, lkb);
1278         lkb->lkb_grmode = DLM_LOCK_IV;
1279         /* this unhold undoes the original ref from create_lkb()
1280            so this leads to the lkb being freed */
1281         unhold_lkb(lkb);
1282 }
1283
1284 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1285 {
1286         set_lvb_unlock(r, lkb);
1287         _remove_lock(r, lkb);
1288 }
1289
1290 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1291 {
1292         _remove_lock(r, lkb);
1293 }
1294
1295 /* returns: 0 did nothing
1296             1 moved lock to granted
1297            -1 removed lock */
1298
1299 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1300 {
1301         int rv = 0;
1302
1303         lkb->lkb_rqmode = DLM_LOCK_IV;
1304
1305         switch (lkb->lkb_status) {
1306         case DLM_LKSTS_GRANTED:
1307                 break;
1308         case DLM_LKSTS_CONVERT:
1309                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1310                 rv = 1;
1311                 break;
1312         case DLM_LKSTS_WAITING:
1313                 del_lkb(r, lkb);
1314                 lkb->lkb_grmode = DLM_LOCK_IV;
1315                 /* this unhold undoes the original ref from create_lkb()
1316                    so this leads to the lkb being freed */
1317                 unhold_lkb(lkb);
1318                 rv = -1;
1319                 break;
1320         default:
1321                 log_print("invalid status for revert %d", lkb->lkb_status);
1322         }
1323         return rv;
1324 }
1325
1326 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1327 {
1328         return revert_lock(r, lkb);
1329 }
1330
1331 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1332 {
1333         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1334                 lkb->lkb_grmode = lkb->lkb_rqmode;
1335                 if (lkb->lkb_status)
1336                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1337                 else
1338                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1339         }
1340
1341         lkb->lkb_rqmode = DLM_LOCK_IV;
1342 }
1343
1344 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1345 {
1346         set_lvb_lock(r, lkb);
1347         _grant_lock(r, lkb);
1348         lkb->lkb_highbast = 0;
1349 }
1350
1351 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1352                           struct dlm_message *ms)
1353 {
1354         set_lvb_lock_pc(r, lkb, ms);
1355         _grant_lock(r, lkb);
1356 }
1357
1358 /* called by grant_pending_locks() which means an async grant message must
1359    be sent to the requesting node in addition to granting the lock if the
1360    lkb belongs to a remote node. */
1361
1362 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1363 {
1364         grant_lock(r, lkb);
1365         if (is_master_copy(lkb))
1366                 send_grant(r, lkb);
1367         else
1368                 queue_cast(r, lkb, 0);
1369 }
1370
1371 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1372    change the granted/requested modes.  We're munging things accordingly in
1373    the process copy.
1374    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1375    conversion deadlock
1376    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1377    compatible with other granted locks */
1378
1379 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1380 {
1381         if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1382                 log_print("munge_demoted %x invalid reply type %d",
1383                           lkb->lkb_id, ms->m_type);
1384                 return;
1385         }
1386
1387         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1388                 log_print("munge_demoted %x invalid modes gr %d rq %d",
1389                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1390                 return;
1391         }
1392
1393         lkb->lkb_grmode = DLM_LOCK_NL;
1394 }
1395
1396 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1397 {
1398         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1399             ms->m_type != DLM_MSG_GRANT) {
1400                 log_print("munge_altmode %x invalid reply type %d",
1401                           lkb->lkb_id, ms->m_type);
1402                 return;
1403         }
1404
1405         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1406                 lkb->lkb_rqmode = DLM_LOCK_PR;
1407         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1408                 lkb->lkb_rqmode = DLM_LOCK_CW;
1409         else {
1410                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1411                 dlm_print_lkb(lkb);
1412         }
1413 }
1414
1415 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1416 {
1417         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1418                                            lkb_statequeue);
1419         if (lkb->lkb_id == first->lkb_id)
1420                 return 1;
1421
1422         return 0;
1423 }
1424
1425 /* Check if the given lkb conflicts with another lkb on the queue. */
1426
1427 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1428 {
1429         struct dlm_lkb *this;
1430
1431         list_for_each_entry(this, head, lkb_statequeue) {
1432                 if (this == lkb)
1433                         continue;
1434                 if (!modes_compat(this, lkb))
1435                         return 1;
1436         }
1437         return 0;
1438 }
1439
1440 /*
1441  * "A conversion deadlock arises with a pair of lock requests in the converting
1442  * queue for one resource.  The granted mode of each lock blocks the requested
1443  * mode of the other lock."
1444  *
1445  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1446  * convert queue from being granted, then deadlk/demote lkb.
1447  *
1448  * Example:
1449  * Granted Queue: empty
1450  * Convert Queue: NL->EX (first lock)
1451  *                PR->EX (second lock)
1452  *
1453  * The first lock can't be granted because of the granted mode of the second
1454  * lock and the second lock can't be granted because it's not first in the
1455  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1456  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1457  * flag set and return DEMOTED in the lksb flags.
1458  *
1459  * Originally, this function detected conv-deadlk in a more limited scope:
1460  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1461  * - if lkb1 was the first entry in the queue (not just earlier), and was
1462  *   blocked by the granted mode of lkb2, and there was nothing on the
1463  *   granted queue preventing lkb1 from being granted immediately, i.e.
1464  *   lkb2 was the only thing preventing lkb1 from being granted.
1465  *
1466  * That second condition meant we'd only say there was conv-deadlk if
1467  * resolving it (by demotion) would lead to the first lock on the convert
1468  * queue being granted right away.  It allowed conversion deadlocks to exist
1469  * between locks on the convert queue while they couldn't be granted anyway.
1470  *
1471  * Now, we detect and take action on conversion deadlocks immediately when
1472  * they're created, even if they may not be immediately consequential.  If
1473  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1474  * mode that would prevent lkb1's conversion from being granted, we do a
1475  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1476  * I think this means that the lkb_is_ahead condition below should always
1477  * be zero, i.e. there will never be conv-deadlk between two locks that are
1478  * both already on the convert queue.
1479  */
1480
1481 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1482 {
1483         struct dlm_lkb *lkb1;
1484         int lkb_is_ahead = 0;
1485
1486         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1487                 if (lkb1 == lkb2) {
1488                         lkb_is_ahead = 1;
1489                         continue;
1490                 }
1491
1492                 if (!lkb_is_ahead) {
1493                         if (!modes_compat(lkb2, lkb1))
1494                                 return 1;
1495                 } else {
1496                         if (!modes_compat(lkb2, lkb1) &&
1497                             !modes_compat(lkb1, lkb2))
1498                                 return 1;
1499                 }
1500         }
1501         return 0;
1502 }
1503
1504 /*
1505  * Return 1 if the lock can be granted, 0 otherwise.
1506  * Also detect and resolve conversion deadlocks.
1507  *
1508  * lkb is the lock to be granted
1509  *
1510  * now is 1 if the function is being called in the context of the
1511  * immediate request, it is 0 if called later, after the lock has been
1512  * queued.
1513  *
1514  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1515  */
1516
1517 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1518 {
1519         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1520
1521         /*
1522          * 6-10: Version 5.4 introduced an option to address the phenomenon of
1523          * a new request for a NL mode lock being blocked.
1524          *
1525          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1526          * request, then it would be granted.  In essence, the use of this flag
1527          * tells the Lock Manager to expedite theis request by not considering
1528          * what may be in the CONVERTING or WAITING queues...  As of this
1529          * writing, the EXPEDITE flag can be used only with new requests for NL
1530          * mode locks.  This flag is not valid for conversion requests.
1531          *
1532          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1533          * conversion or used with a non-NL requested mode.  We also know an
1534          * EXPEDITE request is always granted immediately, so now must always
1535          * be 1.  The full condition to grant an expedite request: (now &&
1536          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1537          * therefore be shortened to just checking the flag.
1538          */
1539
1540         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1541                 return 1;
1542
1543         /*
1544          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1545          * added to the remaining conditions.
1546          */
1547
1548         if (queue_conflict(&r->res_grantqueue, lkb))
1549                 goto out;
1550
1551         /*
1552          * 6-3: By default, a conversion request is immediately granted if the
1553          * requested mode is compatible with the modes of all other granted
1554          * locks
1555          */
1556
1557         if (queue_conflict(&r->res_convertqueue, lkb))
1558                 goto out;
1559
1560         /*
1561          * 6-5: But the default algorithm for deciding whether to grant or
1562          * queue conversion requests does not by itself guarantee that such
1563          * requests are serviced on a "first come first serve" basis.  This, in
1564          * turn, can lead to a phenomenon known as "indefinate postponement".
1565          *
1566          * 6-7: This issue is dealt with by using the optional QUECVT flag with
1567          * the system service employed to request a lock conversion.  This flag
1568          * forces certain conversion requests to be queued, even if they are
1569          * compatible with the granted modes of other locks on the same
1570          * resource.  Thus, the use of this flag results in conversion requests
1571          * being ordered on a "first come first servce" basis.
1572          *
1573          * DCT: This condition is all about new conversions being able to occur
1574          * "in place" while the lock remains on the granted queue (assuming
1575          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1576          * doesn't _have_ to go onto the convert queue where it's processed in
1577          * order.  The "now" variable is necessary to distinguish converts
1578          * being received and processed for the first time now, because once a
1579          * convert is moved to the conversion queue the condition below applies
1580          * requiring fifo granting.
1581          */
1582
1583         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1584                 return 1;
1585
1586         /*
1587          * The NOORDER flag is set to avoid the standard vms rules on grant
1588          * order.
1589          */
1590
1591         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1592                 return 1;
1593
1594         /*
1595          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1596          * granted until all other conversion requests ahead of it are granted
1597          * and/or canceled.
1598          */
1599
1600         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1601                 return 1;
1602
1603         /*
1604          * 6-4: By default, a new request is immediately granted only if all
1605          * three of the following conditions are satisfied when the request is
1606          * issued:
1607          * - The queue of ungranted conversion requests for the resource is
1608          *   empty.
1609          * - The queue of ungranted new requests for the resource is empty.
1610          * - The mode of the new request is compatible with the most
1611          *   restrictive mode of all granted locks on the resource.
1612          */
1613
1614         if (now && !conv && list_empty(&r->res_convertqueue) &&
1615             list_empty(&r->res_waitqueue))
1616                 return 1;
1617
1618         /*
1619          * 6-4: Once a lock request is in the queue of ungranted new requests,
1620          * it cannot be granted until the queue of ungranted conversion
1621          * requests is empty, all ungranted new requests ahead of it are
1622          * granted and/or canceled, and it is compatible with the granted mode
1623          * of the most restrictive lock granted on the resource.
1624          */
1625
1626         if (!now && !conv && list_empty(&r->res_convertqueue) &&
1627             first_in_list(lkb, &r->res_waitqueue))
1628                 return 1;
1629  out:
1630         return 0;
1631 }
1632
1633 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1634                           int *err)
1635 {
1636         int rv;
1637         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1638         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1639
1640         if (err)
1641                 *err = 0;
1642
1643         rv = _can_be_granted(r, lkb, now);
1644         if (rv)
1645                 goto out;
1646
1647         /*
1648          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1649          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1650          * cancels one of the locks.
1651          */
1652
1653         if (is_convert && can_be_queued(lkb) &&
1654             conversion_deadlock_detect(r, lkb)) {
1655                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1656                         lkb->lkb_grmode = DLM_LOCK_NL;
1657                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1658                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1659                         if (err)
1660                                 *err = -EDEADLK;
1661                         else {
1662                                 log_print("can_be_granted deadlock %x now %d",
1663                                           lkb->lkb_id, now);
1664                                 dlm_dump_rsb(r);
1665                         }
1666                 }
1667                 goto out;
1668         }
1669
1670         /*
1671          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1672          * to grant a request in a mode other than the normal rqmode.  It's a
1673          * simple way to provide a big optimization to applications that can
1674          * use them.
1675          */
1676
1677         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1678                 alt = DLM_LOCK_PR;
1679         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1680                 alt = DLM_LOCK_CW;
1681
1682         if (alt) {
1683                 lkb->lkb_rqmode = alt;
1684                 rv = _can_be_granted(r, lkb, now);
1685                 if (rv)
1686                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1687                 else
1688                         lkb->lkb_rqmode = rqmode;
1689         }
1690  out:
1691         return rv;
1692 }
1693
1694 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1695    for locks pending on the convert list.  Once verified (watch for these
1696    log_prints), we should be able to just call _can_be_granted() and not
1697    bother with the demote/deadlk cases here (and there's no easy way to deal
1698    with a deadlk here, we'd have to generate something like grant_lock with
1699    the deadlk error.) */
1700
1701 /* Returns the highest requested mode of all blocked conversions; sets
1702    cw if there's a blocked conversion to DLM_LOCK_CW. */
1703
1704 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1705 {
1706         struct dlm_lkb *lkb, *s;
1707         int hi, demoted, quit, grant_restart, demote_restart;
1708         int deadlk;
1709
1710         quit = 0;
1711  restart:
1712         grant_restart = 0;
1713         demote_restart = 0;
1714         hi = DLM_LOCK_IV;
1715
1716         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1717                 demoted = is_demoted(lkb);
1718                 deadlk = 0;
1719
1720                 if (can_be_granted(r, lkb, 0, &deadlk)) {
1721                         grant_lock_pending(r, lkb);
1722                         grant_restart = 1;
1723                         continue;
1724                 }
1725
1726                 if (!demoted && is_demoted(lkb)) {
1727                         log_print("WARN: pending demoted %x node %d %s",
1728                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1729                         demote_restart = 1;
1730                         continue;
1731                 }
1732
1733                 if (deadlk) {
1734                         log_print("WARN: pending deadlock %x node %d %s",
1735                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1736                         dlm_dump_rsb(r);
1737                         continue;
1738                 }
1739
1740                 hi = max_t(int, lkb->lkb_rqmode, hi);
1741
1742                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1743                         *cw = 1;
1744         }
1745
1746         if (grant_restart)
1747                 goto restart;
1748         if (demote_restart && !quit) {
1749                 quit = 1;
1750                 goto restart;
1751         }
1752
1753         return max_t(int, high, hi);
1754 }
1755
1756 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1757 {
1758         struct dlm_lkb *lkb, *s;
1759
1760         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1761                 if (can_be_granted(r, lkb, 0, NULL))
1762                         grant_lock_pending(r, lkb);
1763                 else {
1764                         high = max_t(int, lkb->lkb_rqmode, high);
1765                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
1766                                 *cw = 1;
1767                 }
1768         }
1769
1770         return high;
1771 }
1772
1773 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1774    on either the convert or waiting queue.
1775    high is the largest rqmode of all locks blocked on the convert or
1776    waiting queue. */
1777
1778 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1779 {
1780         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1781                 if (gr->lkb_highbast < DLM_LOCK_EX)
1782                         return 1;
1783                 return 0;
1784         }
1785
1786         if (gr->lkb_highbast < high &&
1787             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1788                 return 1;
1789         return 0;
1790 }
1791
1792 static void grant_pending_locks(struct dlm_rsb *r)
1793 {
1794         struct dlm_lkb *lkb, *s;
1795         int high = DLM_LOCK_IV;
1796         int cw = 0;
1797
1798         DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1799
1800         high = grant_pending_convert(r, high, &cw);
1801         high = grant_pending_wait(r, high, &cw);
1802
1803         if (high == DLM_LOCK_IV)
1804                 return;
1805
1806         /*
1807          * If there are locks left on the wait/convert queue then send blocking
1808          * ASTs to granted locks based on the largest requested mode (high)
1809          * found above.
1810          */
1811
1812         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1813                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1814                         if (cw && high == DLM_LOCK_PR &&
1815                             lkb->lkb_grmode == DLM_LOCK_PR)
1816                                 queue_bast(r, lkb, DLM_LOCK_CW);
1817                         else
1818                                 queue_bast(r, lkb, high);
1819                         lkb->lkb_highbast = high;
1820                 }
1821         }
1822 }
1823
1824 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1825 {
1826         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1827             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1828                 if (gr->lkb_highbast < DLM_LOCK_EX)
1829                         return 1;
1830                 return 0;
1831         }
1832
1833         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1834                 return 1;
1835         return 0;
1836 }
1837
1838 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1839                             struct dlm_lkb *lkb)
1840 {
1841         struct dlm_lkb *gr;
1842
1843         list_for_each_entry(gr, head, lkb_statequeue) {
1844                 /* skip self when sending basts to convertqueue */
1845                 if (gr == lkb)
1846                         continue;
1847                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1848                         queue_bast(r, gr, lkb->lkb_rqmode);
1849                         gr->lkb_highbast = lkb->lkb_rqmode;
1850                 }
1851         }
1852 }
1853
1854 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1855 {
1856         send_bast_queue(r, &r->res_grantqueue, lkb);
1857 }
1858
1859 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1860 {
1861         send_bast_queue(r, &r->res_grantqueue, lkb);
1862         send_bast_queue(r, &r->res_convertqueue, lkb);
1863 }
1864
1865 /* set_master(r, lkb) -- set the master nodeid of a resource
1866
1867    The purpose of this function is to set the nodeid field in the given
1868    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1869    known, it can just be copied to the lkb and the function will return
1870    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1871    before it can be copied to the lkb.
1872
1873    When the rsb nodeid is being looked up remotely, the initial lkb
1874    causing the lookup is kept on the ls_waiters list waiting for the
1875    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1876    on the rsb's res_lookup list until the master is verified.
1877
1878    Return values:
1879    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1880    1: the rsb master is not available and the lkb has been placed on
1881       a wait queue
1882 */
1883
1884 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1885 {
1886         struct dlm_ls *ls = r->res_ls;
1887         int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1888
1889         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1890                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1891                 r->res_first_lkid = lkb->lkb_id;
1892                 lkb->lkb_nodeid = r->res_nodeid;
1893                 return 0;
1894         }
1895
1896         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1897                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1898                 return 1;
1899         }
1900
1901         if (r->res_nodeid == 0) {
1902                 lkb->lkb_nodeid = 0;
1903                 return 0;
1904         }
1905
1906         if (r->res_nodeid > 0) {
1907                 lkb->lkb_nodeid = r->res_nodeid;
1908                 return 0;
1909         }
1910
1911         DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1912
1913         dir_nodeid = dlm_dir_nodeid(r);
1914
1915         if (dir_nodeid != our_nodeid) {
1916                 r->res_first_lkid = lkb->lkb_id;
1917                 send_lookup(r, lkb);
1918                 return 1;
1919         }
1920
1921         for (i = 0; i < 2; i++) {
1922                 /* It's possible for dlm_scand to remove an old rsb for
1923                    this same resource from the toss list, us to create
1924                    a new one, look up the master locally, and find it
1925                    already exists just before dlm_scand does the
1926                    dir_remove() on the previous rsb. */
1927
1928                 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1929                                        r->res_length, &ret_nodeid);
1930                 if (!error)
1931                         break;
1932                 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1933                 schedule();
1934         }
1935         if (error && error != -EEXIST)
1936                 return error;
1937
1938         if (ret_nodeid == our_nodeid) {
1939                 r->res_first_lkid = 0;
1940                 r->res_nodeid = 0;
1941                 lkb->lkb_nodeid = 0;
1942         } else {
1943                 r->res_first_lkid = lkb->lkb_id;
1944                 r->res_nodeid = ret_nodeid;
1945                 lkb->lkb_nodeid = ret_nodeid;
1946         }
1947         return 0;
1948 }
1949
1950 static void process_lookup_list(struct dlm_rsb *r)
1951 {
1952         struct dlm_lkb *lkb, *safe;
1953
1954         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1955                 list_del_init(&lkb->lkb_rsb_lookup);
1956                 _request_lock(r, lkb);
1957                 schedule();
1958         }
1959 }
1960
1961 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1962
1963 static void confirm_master(struct dlm_rsb *r, int error)
1964 {
1965         struct dlm_lkb *lkb;
1966
1967         if (!r->res_first_lkid)
1968                 return;
1969
1970         switch (error) {
1971         case 0:
1972         case -EINPROGRESS:
1973                 r->res_first_lkid = 0;
1974                 process_lookup_list(r);
1975                 break;
1976
1977         case -EAGAIN:
1978         case -EBADR:
1979         case -ENOTBLK:
1980                 /* the remote request failed and won't be retried (it was
1981                    a NOQUEUE, or has been canceled/unlocked); make a waiting
1982                    lkb the first_lkid */
1983
1984                 r->res_first_lkid = 0;
1985
1986                 if (!list_empty(&r->res_lookup)) {
1987                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1988                                          lkb_rsb_lookup);
1989                         list_del_init(&lkb->lkb_rsb_lookup);
1990                         r->res_first_lkid = lkb->lkb_id;
1991                         _request_lock(r, lkb);
1992                 }
1993                 break;
1994
1995         default:
1996                 log_error(r->res_ls, "confirm_master unknown error %d", error);
1997         }
1998 }
1999
2000 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2001                          int namelen, unsigned long timeout_cs,
2002                          void (*ast) (void *astparam),
2003                          void *astparam,
2004                          void (*bast) (void *astparam, int mode),
2005                          struct dlm_args *args)
2006 {
2007         int rv = -EINVAL;
2008
2009         /* check for invalid arg usage */
2010
2011         if (mode < 0 || mode > DLM_LOCK_EX)
2012                 goto out;
2013
2014         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2015                 goto out;
2016
2017         if (flags & DLM_LKF_CANCEL)
2018                 goto out;
2019
2020         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2021                 goto out;
2022
2023         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2024                 goto out;
2025
2026         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2027                 goto out;
2028
2029         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2030                 goto out;
2031
2032         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2033                 goto out;
2034
2035         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2036                 goto out;
2037
2038         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2039                 goto out;
2040
2041         if (!ast || !lksb)
2042                 goto out;
2043
2044         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2045                 goto out;
2046
2047         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2048                 goto out;
2049
2050         /* these args will be copied to the lkb in validate_lock_args,
2051            it cannot be done now because when converting locks, fields in
2052            an active lkb cannot be modified before locking the rsb */
2053
2054         args->flags = flags;
2055         args->astfn = ast;
2056         args->astparam = astparam;
2057         args->bastfn = bast;
2058         args->timeout = timeout_cs;
2059         args->mode = mode;
2060         args->lksb = lksb;
2061         rv = 0;
2062  out:
2063         return rv;
2064 }
2065
2066 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2067 {
2068         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2069                       DLM_LKF_FORCEUNLOCK))
2070                 return -EINVAL;
2071
2072         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2073                 return -EINVAL;
2074
2075         args->flags = flags;
2076         args->astparam = astarg;
2077         return 0;
2078 }
2079
2080 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2081                               struct dlm_args *args)
2082 {
2083         int rv = -EINVAL;
2084
2085         if (args->flags & DLM_LKF_CONVERT) {
2086                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2087                         goto out;
2088
2089                 if (args->flags & DLM_LKF_QUECVT &&
2090                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2091                         goto out;
2092
2093                 rv = -EBUSY;
2094                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2095                         goto out;
2096
2097                 if (lkb->lkb_wait_type)
2098                         goto out;
2099
2100                 if (is_overlap(lkb))
2101                         goto out;
2102         }
2103
2104         lkb->lkb_exflags = args->flags;
2105         lkb->lkb_sbflags = 0;
2106         lkb->lkb_astfn = args->astfn;
2107         lkb->lkb_astparam = args->astparam;
2108         lkb->lkb_bastfn = args->bastfn;
2109         lkb->lkb_rqmode = args->mode;
2110         lkb->lkb_lksb = args->lksb;
2111         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2112         lkb->lkb_ownpid = (int) current->pid;
2113         lkb->lkb_timeout_cs = args->timeout;
2114         rv = 0;
2115  out:
2116         if (rv)
2117                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2118                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2119                           lkb->lkb_status, lkb->lkb_wait_type,
2120                           lkb->lkb_resource->res_name);
2121         return rv;
2122 }
2123
2124 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2125    for success */
2126
2127 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2128    because there may be a lookup in progress and it's valid to do
2129    cancel/unlockf on it */
2130
2131 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2132 {
2133         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2134         int rv = -EINVAL;
2135
2136         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2137                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2138                 dlm_print_lkb(lkb);
2139                 goto out;
2140         }
2141
2142         /* an lkb may still exist even though the lock is EOL'ed due to a
2143            cancel, unlock or failed noqueue request; an app can't use these
2144            locks; return same error as if the lkid had not been found at all */
2145
2146         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2147                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2148                 rv = -ENOENT;
2149                 goto out;
2150         }
2151
2152         /* an lkb may be waiting for an rsb lookup to complete where the
2153            lookup was initiated by another lock */
2154
2155         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2156                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2157                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2158                         list_del_init(&lkb->lkb_rsb_lookup);
2159                         queue_cast(lkb->lkb_resource, lkb,
2160                                    args->flags & DLM_LKF_CANCEL ?
2161                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2162                         unhold_lkb(lkb); /* undoes create_lkb() */
2163                 }
2164                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2165                 rv = -EBUSY;
2166                 goto out;
2167         }
2168
2169         /* cancel not allowed with another cancel/unlock in progress */
2170
2171         if (args->flags & DLM_LKF_CANCEL) {
2172                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2173                         goto out;
2174
2175                 if (is_overlap(lkb))
2176                         goto out;
2177
2178                 /* don't let scand try to do a cancel */
2179                 del_timeout(lkb);
2180
2181                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2182                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2183                         rv = -EBUSY;
2184                         goto out;
2185                 }
2186
2187                 /* there's nothing to cancel */
2188                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2189                     !lkb->lkb_wait_type) {
2190                         rv = -EBUSY;
2191                         goto out;
2192                 }
2193
2194                 switch (lkb->lkb_wait_type) {
2195                 case DLM_MSG_LOOKUP:
2196                 case DLM_MSG_REQUEST:
2197                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2198                         rv = -EBUSY;
2199                         goto out;
2200                 case DLM_MSG_UNLOCK:
2201                 case DLM_MSG_CANCEL:
2202                         goto out;
2203                 }
2204                 /* add_to_waiters() will set OVERLAP_CANCEL */
2205                 goto out_ok;
2206         }
2207
2208         /* do we need to allow a force-unlock if there's a normal unlock
2209            already in progress?  in what conditions could the normal unlock
2210            fail such that we'd want to send a force-unlock to be sure? */
2211
2212         if (args->flags & DLM_LKF_FORCEUNLOCK) {
2213                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2214                         goto out;
2215
2216                 if (is_overlap_unlock(lkb))
2217                         goto out;
2218
2219                 /* don't let scand try to do a cancel */
2220                 del_timeout(lkb);
2221
2222                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2223                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2224                         rv = -EBUSY;
2225                         goto out;
2226                 }
2227
2228                 switch (lkb->lkb_wait_type) {
2229                 case DLM_MSG_LOOKUP:
2230                 case DLM_MSG_REQUEST:
2231                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2232                         rv = -EBUSY;
2233                         goto out;
2234                 case DLM_MSG_UNLOCK:
2235                         goto out;
2236                 }
2237                 /* add_to_waiters() will set OVERLAP_UNLOCK */
2238                 goto out_ok;
2239         }
2240
2241         /* normal unlock not allowed if there's any op in progress */
2242         rv = -EBUSY;
2243         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2244                 goto out;
2245
2246  out_ok:
2247         /* an overlapping op shouldn't blow away exflags from other op */
2248         lkb->lkb_exflags |= args->flags;
2249         lkb->lkb_sbflags = 0;
2250         lkb->lkb_astparam = args->astparam;
2251         rv = 0;
2252  out:
2253         if (rv)
2254                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2255                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2256                           args->flags, lkb->lkb_wait_type,
2257                           lkb->lkb_resource->res_name);
2258         return rv;
2259 }
2260
2261 /*
2262  * Four stage 4 varieties:
2263  * do_request(), do_convert(), do_unlock(), do_cancel()
2264  * These are called on the master node for the given lock and
2265  * from the central locking logic.
2266  */
2267
2268 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2269 {
2270         int error = 0;
2271
2272         if (can_be_granted(r, lkb, 1, NULL)) {
2273                 grant_lock(r, lkb);
2274                 queue_cast(r, lkb, 0);
2275                 goto out;
2276         }
2277
2278         if (can_be_queued(lkb)) {
2279                 error = -EINPROGRESS;
2280                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2281                 add_timeout(lkb);
2282                 goto out;
2283         }
2284
2285         error = -EAGAIN;
2286         queue_cast(r, lkb, -EAGAIN);
2287  out:
2288         return error;
2289 }
2290
2291 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2292                                int error)
2293 {
2294         switch (error) {
2295         case -EAGAIN:
2296                 if (force_blocking_asts(lkb))
2297                         send_blocking_asts_all(r, lkb);
2298                 break;
2299         case -EINPROGRESS:
2300                 send_blocking_asts(r, lkb);
2301                 break;
2302         }
2303 }
2304
2305 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2306 {
2307         int error = 0;
2308         int deadlk = 0;
2309
2310         /* changing an existing lock may allow others to be granted */
2311
2312         if (can_be_granted(r, lkb, 1, &deadlk)) {
2313                 grant_lock(r, lkb);
2314                 queue_cast(r, lkb, 0);
2315                 goto out;
2316         }
2317
2318         /* can_be_granted() detected that this lock would block in a conversion
2319            deadlock, so we leave it on the granted queue and return EDEADLK in
2320            the ast for the convert. */
2321
2322         if (deadlk) {
2323                 /* it's left on the granted queue */
2324                 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2325                           lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2326                           lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2327                 revert_lock(r, lkb);
2328                 queue_cast(r, lkb, -EDEADLK);
2329                 error = -EDEADLK;
2330                 goto out;
2331         }
2332
2333         /* is_demoted() means the can_be_granted() above set the grmode
2334            to NL, and left us on the granted queue.  This auto-demotion
2335            (due to CONVDEADLK) might mean other locks, and/or this lock, are
2336            now grantable.  We have to try to grant other converting locks
2337            before we try again to grant this one. */
2338
2339         if (is_demoted(lkb)) {
2340                 grant_pending_convert(r, DLM_LOCK_IV, NULL);
2341                 if (_can_be_granted(r, lkb, 1)) {
2342                         grant_lock(r, lkb);
2343                         queue_cast(r, lkb, 0);
2344                         goto out;
2345                 }
2346                 /* else fall through and move to convert queue */
2347         }
2348
2349         if (can_be_queued(lkb)) {
2350                 error = -EINPROGRESS;
2351                 del_lkb(r, lkb);
2352                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2353                 add_timeout(lkb);
2354                 goto out;
2355         }
2356
2357         error = -EAGAIN;
2358         queue_cast(r, lkb, -EAGAIN);
2359  out:
2360         return error;
2361 }
2362
2363 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2364                                int error)
2365 {
2366         switch (error) {
2367         case 0:
2368                 grant_pending_locks(r);
2369                 /* grant_pending_locks also sends basts */
2370                 break;
2371         case -EAGAIN:
2372                 if (force_blocking_asts(lkb))
2373                         send_blocking_asts_all(r, lkb);
2374                 break;
2375         case -EINPROGRESS:
2376                 send_blocking_asts(r, lkb);
2377                 break;
2378         }
2379 }
2380
2381 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2382 {
2383         remove_lock(r, lkb);
2384         queue_cast(r, lkb, -DLM_EUNLOCK);
2385         return -DLM_EUNLOCK;
2386 }
2387
2388 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2389                               int error)
2390 {
2391         grant_pending_locks(r);
2392 }
2393
2394 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2395  
2396 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2397 {
2398         int error;
2399
2400         error = revert_lock(r, lkb);
2401         if (error) {
2402                 queue_cast(r, lkb, -DLM_ECANCEL);
2403                 return -DLM_ECANCEL;
2404         }
2405         return 0;
2406 }
2407
2408 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2409                               int error)
2410 {
2411         if (error)
2412                 grant_pending_locks(r);
2413 }
2414
2415 /*
2416  * Four stage 3 varieties:
2417  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2418  */
2419
2420 /* add a new lkb to a possibly new rsb, called by requesting process */
2421
2422 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2423 {
2424         int error;
2425
2426         /* set_master: sets lkb nodeid from r */
2427
2428         error = set_master(r, lkb);
2429         if (error < 0)
2430                 goto out;
2431         if (error) {
2432                 error = 0;
2433                 goto out;
2434         }
2435
2436         if (is_remote(r)) {
2437                 /* receive_request() calls do_request() on remote node */
2438                 error = send_request(r, lkb);
2439         } else {
2440                 error = do_request(r, lkb);
2441                 /* for remote locks the request_reply is sent
2442                    between do_request and do_request_effects */
2443                 do_request_effects(r, lkb, error);
2444         }
2445  out:
2446         return error;
2447 }
2448
2449 /* change some property of an existing lkb, e.g. mode */
2450
2451 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2452 {
2453         int error;
2454
2455         if (is_remote(r)) {
2456                 /* receive_convert() calls do_convert() on remote node */
2457                 error = send_convert(r, lkb);
2458         } else {
2459                 error = do_convert(r, lkb);
2460                 /* for remote locks the convert_reply is sent
2461                    between do_convert and do_convert_effects */
2462                 do_convert_effects(r, lkb, error);
2463         }
2464
2465         return error;
2466 }
2467
2468 /* remove an existing lkb from the granted queue */
2469
2470 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2471 {
2472         int error;
2473
2474         if (is_remote(r)) {
2475                 /* receive_unlock() calls do_unlock() on remote node */
2476                 error = send_unlock(r, lkb);
2477         } else {
2478                 error = do_unlock(r, lkb);
2479                 /* for remote locks the unlock_reply is sent
2480                    between do_unlock and do_unlock_effects */
2481                 do_unlock_effects(r, lkb, error);
2482         }
2483
2484         return error;
2485 }
2486
2487 /* remove an existing lkb from the convert or wait queue */
2488
2489 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2490 {
2491         int error;
2492
2493         if (is_remote(r)) {
2494                 /* receive_cancel() calls do_cancel() on remote node */
2495                 error = send_cancel(r, lkb);
2496         } else {
2497                 error = do_cancel(r, lkb);
2498                 /* for remote locks the cancel_reply is sent
2499                    between do_cancel and do_cancel_effects */
2500                 do_cancel_effects(r, lkb, error);
2501         }
2502
2503         return error;
2504 }
2505
2506 /*
2507  * Four stage 2 varieties:
2508  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2509  */
2510
2511 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2512                         int len, struct dlm_args *args)
2513 {
2514         struct dlm_rsb *r;
2515         int error;
2516
2517         error = validate_lock_args(ls, lkb, args);
2518         if (error)
2519                 goto out;
2520
2521         error = find_rsb(ls, name, len, R_CREATE, &r);
2522         if (error)
2523                 goto out;
2524
2525         lock_rsb(r);
2526
2527         attach_lkb(r, lkb);
2528         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2529
2530         error = _request_lock(r, lkb);
2531
2532         unlock_rsb(r);
2533         put_rsb(r);
2534
2535  out:
2536         return error;
2537 }
2538
2539 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2540                         struct dlm_args *args)
2541 {
2542         struct dlm_rsb *r;
2543         int error;
2544
2545         r = lkb->lkb_resource;
2546
2547         hold_rsb(r);
2548         lock_rsb(r);
2549
2550         error = validate_lock_args(ls, lkb, args);
2551         if (error)
2552                 goto out;
2553
2554         error = _convert_lock(r, lkb);
2555  out:
2556         unlock_rsb(r);
2557         put_rsb(r);
2558         return error;
2559 }
2560
2561 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2562                        struct dlm_args *args)
2563 {
2564         struct dlm_rsb *r;
2565         int error;
2566
2567         r = lkb->lkb_resource;
2568
2569         hold_rsb(r);
2570         lock_rsb(r);
2571
2572         error = validate_unlock_args(lkb, args);
2573         if (error)
2574                 goto out;
2575
2576         error = _unlock_lock(r, lkb);
2577  out:
2578         unlock_rsb(r);
2579         put_rsb(r);
2580         return error;
2581 }
2582
2583 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2584                        struct dlm_args *args)
2585 {
2586         struct dlm_rsb *r;
2587         int error;
2588
2589         r = lkb->lkb_resource;
2590
2591         hold_rsb(r);
2592         lock_rsb(r);
2593
2594         error = validate_unlock_args(lkb, args);
2595         if (error)
2596                 goto out;
2597
2598         error = _cancel_lock(r, lkb);
2599  out:
2600         unlock_rsb(r);
2601         put_rsb(r);
2602         return error;
2603 }
2604
2605 /*
2606  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2607  */
2608
2609 int dlm_lock(dlm_lockspace_t *lockspace,
2610              int mode,
2611              struct dlm_lksb *lksb,
2612              uint32_t flags,
2613              void *name,
2614              unsigned int namelen,
2615              uint32_t parent_lkid,
2616              void (*ast) (void *astarg),
2617              void *astarg,
2618              void (*bast) (void *astarg, int mode))
2619 {
2620         struct dlm_ls *ls;
2621         struct dlm_lkb *lkb;
2622         struct dlm_args args;
2623         int error, convert = flags & DLM_LKF_CONVERT;
2624
2625         ls = dlm_find_lockspace_local(lockspace);
2626         if (!ls)
2627                 return -EINVAL;
2628
2629         dlm_lock_recovery(ls);
2630
2631         if (convert)
2632                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2633         else
2634                 error = create_lkb(ls, &lkb);
2635
2636         if (error)
2637                 goto out;
2638
2639         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2640                               astarg, bast, &args);
2641         if (error)
2642                 goto out_put;
2643
2644         if (convert)
2645                 error = convert_lock(ls, lkb, &args);
2646         else
2647                 error = request_lock(ls, lkb, name, namelen, &args);
2648
2649         if (error == -EINPROGRESS)
2650                 error = 0;
2651  out_put:
2652         if (convert || error)
2653                 __put_lkb(ls, lkb);
2654         if (error == -EAGAIN || error == -EDEADLK)
2655                 error = 0;
2656  out:
2657         dlm_unlock_recovery(ls);
2658         dlm_put_lockspace(ls);
2659         return error;
2660 }
2661
2662 int dlm_unlock(dlm_lockspace_t *lockspace,
2663                uint32_t lkid,
2664                uint32_t flags,
2665                struct dlm_lksb *lksb,
2666                void *astarg)
2667 {
2668         struct dlm_ls *ls;
2669         struct dlm_lkb *lkb;
2670         struct dlm_args args;
2671         int error;
2672
2673         ls = dlm_find_lockspace_local(lockspace);
2674         if (!ls)
2675                 return -EINVAL;
2676
2677         dlm_lock_recovery(ls);
2678
2679         error = find_lkb(ls, lkid, &lkb);
2680         if (error)
2681                 goto out;
2682
2683         error = set_unlock_args(flags, astarg, &args);
2684         if (error)
2685                 goto out_put;
2686
2687         if (flags & DLM_LKF_CANCEL)
2688                 error = cancel_lock(ls, lkb, &args);
2689         else
2690                 error = unlock_lock(ls, lkb, &args);
2691
2692         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2693                 error = 0;
2694         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2695                 error = 0;
2696  out_put:
2697         dlm_put_lkb(lkb);
2698  out:
2699         dlm_unlock_recovery(ls);
2700         dlm_put_lockspace(ls);
2701         return error;
2702 }
2703
2704 /*
2705  * send/receive routines for remote operations and replies
2706  *
2707  * send_args
2708  * send_common
2709  * send_request                 receive_request
2710  * send_convert                 receive_convert
2711  * send_unlock                  receive_unlock
2712  * send_cancel                  receive_cancel
2713  * send_grant                   receive_grant
2714  * send_bast                    receive_bast
2715  * send_lookup                  receive_lookup
2716  * send_remove                  receive_remove
2717  *
2718  *                              send_common_reply
2719  * receive_request_reply        send_request_reply
2720  * receive_convert_reply        send_convert_reply
2721  * receive_unlock_reply         send_unlock_reply
2722  * receive_cancel_reply         send_cancel_reply
2723  * receive_lookup_reply         send_lookup_reply
2724  */
2725
2726 static int _create_message(struct dlm_ls *ls, int mb_len,
2727                            int to_nodeid, int mstype,
2728                            struct dlm_message **ms_ret,
2729                            struct dlm_mhandle **mh_ret)
2730 {
2731         struct dlm_message *ms;
2732         struct dlm_mhandle *mh;
2733         char *mb;
2734
2735         /* get_buffer gives us a message handle (mh) that we need to
2736            pass into lowcomms_commit and a message buffer (mb) that we
2737            write our data into */
2738
2739         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2740         if (!mh)
2741                 return -ENOBUFS;
2742
2743         memset(mb, 0, mb_len);
2744
2745         ms = (struct dlm_message *) mb;
2746
2747         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2748         ms->m_header.h_lockspace = ls->ls_global_id;
2749         ms->m_header.h_nodeid = dlm_our_nodeid();
2750         ms->m_header.h_length = mb_len;
2751         ms->m_header.h_cmd = DLM_MSG;
2752
2753         ms->m_type = mstype;
2754
2755         *mh_ret = mh;
2756         *ms_ret = ms;
2757         return 0;
2758 }
2759
2760 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2761                           int to_nodeid, int mstype,
2762                           struct dlm_message **ms_ret,
2763                           struct dlm_mhandle **mh_ret)
2764 {
2765         int mb_len = sizeof(struct dlm_message);
2766
2767         switch (mstype) {
2768         case DLM_MSG_REQUEST:
2769         case DLM_MSG_LOOKUP:
2770         case DLM_MSG_REMOVE:
2771                 mb_len += r->res_length;
2772                 break;
2773         case DLM_MSG_CONVERT:
2774         case DLM_MSG_UNLOCK:
2775         case DLM_MSG_REQUEST_REPLY:
2776         case DLM_MSG_CONVERT_REPLY:
2777         case DLM_MSG_GRANT:
2778                 if (lkb && lkb->lkb_lvbptr)
2779                         mb_len += r->res_ls->ls_lvblen;
2780                 break;
2781         }
2782
2783         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2784                                ms_ret, mh_ret);
2785 }
2786
2787 /* further lowcomms enhancements or alternate implementations may make
2788    the return value from this function useful at some point */
2789
2790 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2791 {
2792         dlm_message_out(ms);
2793         dlm_lowcomms_commit_buffer(mh);
2794         return 0;
2795 }
2796
2797 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2798                       struct dlm_message *ms)
2799 {
2800         ms->m_nodeid   = lkb->lkb_nodeid;
2801         ms->m_pid      = lkb->lkb_ownpid;
2802         ms->m_lkid     = lkb->lkb_id;
2803         ms->m_remid    = lkb->lkb_remid;
2804         ms->m_exflags  = lkb->lkb_exflags;
2805         ms->m_sbflags  = lkb->lkb_sbflags;
2806         ms->m_flags    = lkb->lkb_flags;
2807         ms->m_lvbseq   = lkb->lkb_lvbseq;
2808         ms->m_status   = lkb->lkb_status;
2809         ms->m_grmode   = lkb->lkb_grmode;
2810         ms->m_rqmode   = lkb->lkb_rqmode;
2811         ms->m_hash     = r->res_hash;
2812
2813         /* m_result and m_bastmode are set from function args,
2814            not from lkb fields */
2815
2816         if (lkb->lkb_bastfn)
2817                 ms->m_asts |= DLM_CB_BAST;
2818         if (lkb->lkb_astfn)
2819                 ms->m_asts |= DLM_CB_CAST;
2820
2821         /* compare with switch in create_message; send_remove() doesn't
2822            use send_args() */
2823
2824         switch (ms->m_type) {
2825         case DLM_MSG_REQUEST:
2826         case DLM_MSG_LOOKUP:
2827                 memcpy(ms->m_extra, r->res_name, r->res_length);
2828                 break;
2829         case DLM_MSG_CONVERT:
2830         case DLM_MSG_UNLOCK:
2831         case DLM_MSG_REQUEST_REPLY:
2832         case DLM_MSG_CONVERT_REPLY:
2833         case DLM_MSG_GRANT:
2834                 if (!lkb->lkb_lvbptr)
2835                         break;
2836                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2837                 break;
2838         }
2839 }
2840
2841 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2842 {
2843         struct dlm_message *ms;
2844         struct dlm_mhandle *mh;
2845         int to_nodeid, error;
2846
2847         error = add_to_waiters(lkb, mstype);
2848         if (error)
2849                 return error;
2850
2851         to_nodeid = r->res_nodeid;
2852
2853         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2854         if (error)
2855                 goto fail;
2856
2857         send_args(r, lkb, ms);
2858
2859         error = send_message(mh, ms);
2860         if (error)
2861                 goto fail;
2862         return 0;
2863
2864  fail:
2865         remove_from_waiters(lkb, msg_reply_type(mstype));
2866         return error;
2867 }
2868
2869 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2870 {
2871         return send_common(r, lkb, DLM_MSG_REQUEST);
2872 }
2873
2874 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2875 {
2876         int error;
2877
2878         error = send_common(r, lkb, DLM_MSG_CONVERT);
2879
2880         /* down conversions go without a reply from the master */
2881         if (!error && down_conversion(lkb)) {
2882                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2883                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2884                 r->res_ls->ls_stub_ms.m_result = 0;
2885                 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2886                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2887         }
2888
2889         return error;
2890 }
2891
2892 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2893    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2894    that the master is still correct. */
2895
2896 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2897 {
2898         return send_common(r, lkb, DLM_MSG_UNLOCK);
2899 }
2900
2901 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2902 {
2903         return send_common(r, lkb, DLM_MSG_CANCEL);
2904 }
2905
2906 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2907 {
2908         struct dlm_message *ms;
2909         struct dlm_mhandle *mh;
2910         int to_nodeid, error;
2911
2912         to_nodeid = lkb->lkb_nodeid;
2913
2914         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2915         if (error)
2916                 goto out;
2917
2918         send_args(r, lkb, ms);
2919
2920         ms->m_result = 0;
2921
2922         error = send_message(mh, ms);
2923  out:
2924         return error;
2925 }
2926
2927 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2928 {
2929         struct dlm_message *ms;
2930         struct dlm_mhandle *mh;
2931         int to_nodeid, error;
2932
2933         to_nodeid = lkb->lkb_nodeid;
2934
2935         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2936         if (error)
2937                 goto out;
2938
2939         send_args(r, lkb, ms);
2940
2941         ms->m_bastmode = mode;
2942
2943         error = send_message(mh, ms);
2944  out:
2945         return error;
2946 }
2947
2948 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2949 {
2950         struct dlm_message *ms;
2951         struct dlm_mhandle *mh;
2952         int to_nodeid, error;
2953
2954         error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2955         if (error)
2956                 return error;
2957
2958         to_nodeid = dlm_dir_nodeid(r);
2959
2960         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2961         if (error)
2962                 goto fail;
2963
2964         send_args(r, lkb, ms);
2965
2966         error = send_message(mh, ms);
2967         if (error)
2968                 goto fail;
2969         return 0;
2970
2971  fail:
2972         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2973         return error;
2974 }
2975
2976 static int send_remove(struct dlm_rsb *r)
2977 {
2978         struct dlm_message *ms;
2979         struct dlm_mhandle *mh;
2980         int to_nodeid, error;
2981
2982         to_nodeid = dlm_dir_nodeid(r);
2983
2984         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2985         if (error)
2986                 goto out;
2987
2988         memcpy(ms->m_extra, r->res_name, r->res_length);
2989         ms->m_hash = r->res_hash;
2990
2991         error = send_message(mh, ms);
2992  out:
2993         return error;
2994 }
2995
2996 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2997                              int mstype, int rv)
2998 {
2999         struct dlm_message *ms;
3000         struct dlm_mhandle *mh;
3001         int to_nodeid, error;
3002
3003         to_nodeid = lkb->lkb_nodeid;
3004
3005         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3006         if (error)
3007                 goto out;
3008
3009         send_args(r, lkb, ms);
3010
3011         ms->m_result = rv;
3012
3013         error = send_message(mh, ms);
3014  out:
3015         return error;
3016 }
3017
3018 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3019 {
3020         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3021 }
3022
3023 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3024 {
3025         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3026 }
3027
3028 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3029 {
3030         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3031 }
3032
3033 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3034 {
3035         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3036 }
3037
3038 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3039                              int ret_nodeid, int rv)
3040 {
3041         struct dlm_rsb *r = &ls->ls_stub_rsb;
3042         struct dlm_message *ms;
3043         struct dlm_mhandle *mh;
3044         int error, nodeid = ms_in->m_header.h_nodeid;
3045
3046         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3047         if (error)
3048                 goto out;
3049
3050         ms->m_lkid = ms_in->m_lkid;
3051         ms->m_result = rv;
3052         ms->m_nodeid = ret_nodeid;
3053
3054         error = send_message(mh, ms);
3055  out:
3056         return error;
3057 }
3058
3059 /* which args we save from a received message depends heavily on the type
3060    of message, unlike the send side where we can safely send everything about
3061    the lkb for any type of message */
3062
3063 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3064 {
3065         lkb->lkb_exflags = ms->m_exflags;
3066         lkb->lkb_sbflags = ms->m_sbflags;
3067         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3068                          (ms->m_flags & 0x0000FFFF);
3069 }
3070
3071 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3072 {
3073         lkb->lkb_sbflags = ms->m_sbflags;
3074         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3075                          (ms->m_flags & 0x0000FFFF);
3076 }
3077
3078 static int receive_extralen(struct dlm_message *ms)
3079 {
3080         return (ms->m_header.h_length - sizeof(struct dlm_message));
3081 }
3082
3083 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3084                        struct dlm_message *ms)
3085 {
3086         int len;
3087
3088         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3089                 if (!lkb->lkb_lvbptr)
3090                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3091                 if (!lkb->lkb_lvbptr)
3092                         return -ENOMEM;
3093                 len = receive_extralen(ms);
3094                 if (len > DLM_RESNAME_MAXLEN)
3095                         len = DLM_RESNAME_MAXLEN;
3096                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3097         }
3098         return 0;
3099 }
3100
3101 static void fake_bastfn(void *astparam, int mode)
3102 {
3103         log_print("fake_bastfn should not be called");
3104 }
3105
3106 static void fake_astfn(void *astparam)
3107 {
3108         log_print("fake_astfn should not be called");
3109 }
3110
3111 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3112                                 struct dlm_message *ms)
3113 {
3114         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3115         lkb->lkb_ownpid = ms->m_pid;
3116         lkb->lkb_remid = ms->m_lkid;
3117         lkb->lkb_grmode = DLM_LOCK_IV;
3118         lkb->lkb_rqmode = ms->m_rqmode;
3119
3120         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3121         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3122
3123         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3124                 /* lkb was just created so there won't be an lvb yet */
3125                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3126                 if (!lkb->lkb_lvbptr)
3127                         return -ENOMEM;
3128         }
3129
3130         return 0;
3131 }
3132
3133 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3134                                 struct dlm_message *ms)
3135 {
3136         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3137                 return -EBUSY;
3138
3139         if (receive_lvb(ls, lkb, ms))
3140                 return -ENOMEM;
3141
3142         lkb->lkb_rqmode = ms->m_rqmode;
3143         lkb->lkb_lvbseq = ms->m_lvbseq;
3144
3145         return 0;
3146 }
3147
3148 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3149                                struct dlm_message *ms)
3150 {
3151         if (receive_lvb(ls, lkb, ms))
3152                 return -ENOMEM;
3153         return 0;
3154 }
3155
3156 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3157    uses to send a reply and that the remote end uses to process the reply. */
3158
3159 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3160 {
3161         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3162         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3163         lkb->lkb_remid = ms->m_lkid;
3164 }
3165
3166 /* This is called after the rsb is locked so that we can safely inspect
3167    fields in the lkb. */
3168
3169 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3170 {
3171         int from = ms->m_header.h_nodeid;
3172         int error = 0;
3173
3174         switch (ms->m_type) {
3175         case DLM_MSG_CONVERT:
3176         case DLM_MSG_UNLOCK:
3177         case DLM_MSG_CANCEL:
3178                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3179                         error = -EINVAL;
3180                 break;
3181
3182         case DLM_MSG_CONVERT_REPLY:
3183         case DLM_MSG_UNLOCK_REPLY:
3184         case DLM_MSG_CANCEL_REPLY:
3185         case DLM_MSG_GRANT:
3186         case DLM_MSG_BAST:
3187                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3188                         error = -EINVAL;
3189                 break;
3190
3191         case DLM_MSG_REQUEST_REPLY:
3192                 if (!is_process_copy(lkb))
3193                         error = -EINVAL;
3194                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3195                         error = -EINVAL;
3196                 break;
3197
3198         default:
3199                 error = -EINVAL;
3200         }
3201
3202         if (error)
3203                 log_error(lkb->lkb_resource->res_ls,
3204                           "ignore invalid message %d from %d %x %x %x %d",
3205                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3206                           lkb->lkb_flags, lkb->lkb_nodeid);
3207         return error;
3208 }
3209
3210 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3211 {
3212         struct dlm_lkb *lkb;
3213         struct dlm_rsb *r;
3214         int error, namelen;
3215
3216         error = create_lkb(ls, &lkb);
3217         if (error)
3218                 goto fail;
3219
3220         receive_flags(lkb, ms);
3221         lkb->lkb_flags |= DLM_IFL_MSTCPY;
3222         error = receive_request_args(ls, lkb, ms);
3223         if (error) {
3224                 __put_lkb(ls, lkb);
3225                 goto fail;
3226         }
3227
3228         namelen = receive_extralen(ms);
3229
3230         error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3231         if (error) {
3232                 __put_lkb(ls, lkb);
3233                 goto fail;
3234         }
3235
3236         lock_rsb(r);
3237
3238         attach_lkb(r, lkb);
3239         error = do_request(r, lkb);
3240         send_request_reply(r, lkb, error);
3241         do_request_effects(r, lkb, error);
3242
3243         unlock_rsb(r);
3244         put_rsb(r);
3245
3246         if (error == -EINPROGRESS)
3247                 error = 0;
3248         if (error)
3249                 dlm_put_lkb(lkb);
3250         return;
3251
3252  fail:
3253         setup_stub_lkb(ls, ms);
3254         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3255 }
3256
3257 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3258 {
3259         struct dlm_lkb *lkb;
3260         struct dlm_rsb *r;
3261         int error, reply = 1;
3262
3263         error = find_lkb(ls, ms->m_remid, &lkb);
3264         if (error)
3265                 goto fail;
3266
3267         r = lkb->lkb_resource;
3268
3269         hold_rsb(r);
3270         lock_rsb(r);
3271
3272         error = validate_message(lkb, ms);
3273         if (error)
3274                 goto out;
3275
3276         receive_flags(lkb, ms);
3277
3278         error = receive_convert_args(ls, lkb, ms);
3279         if (error) {
3280                 send_convert_reply(r, lkb, error);
3281                 goto out;
3282         }
3283
3284         reply = !down_conversion(lkb);
3285
3286         error = do_convert(r, lkb);
3287         if (reply)
3288                 send_convert_reply(r, lkb, error);
3289         do_convert_effects(r, lkb, error);
3290  out:
3291         unlock_rsb(r);
3292         put_rsb(r);
3293         dlm_put_lkb(lkb);
3294         return;
3295
3296  fail:
3297         setup_stub_lkb(ls, ms);
3298         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3299 }
3300
3301 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3302 {
3303         struct dlm_lkb *lkb;
3304         struct dlm_rsb *r;
3305         int error;
3306
3307         error = find_lkb(ls, ms->m_remid, &lkb);
3308         if (error)
3309                 goto fail;
3310
3311         r = lkb->lkb_resource;
3312
3313         hold_rsb(r);
3314         lock_rsb(r);
3315
3316         error = validate_message(lkb, ms);
3317         if (error)
3318                 goto out;
3319
3320         receive_flags(lkb, ms);
3321
3322         error = receive_unlock_args(ls, lkb, ms);
3323         if (error) {
3324                 send_unlock_reply(r, lkb, error);
3325                 goto out;
3326         }
3327
3328         error = do_unlock(r, lkb);
3329         send_unlock_reply(r, lkb, error);
3330         do_unlock_effects(r, lkb, error);
3331  out:
3332         unlock_rsb(r);
3333         put_rsb(r);
3334         dlm_put_lkb(lkb);
3335         return;
3336
3337  fail:
3338         setup_stub_lkb(ls, ms);
3339         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3340 }
3341
3342 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3343 {
3344         struct dlm_lkb *lkb;
3345         struct dlm_rsb *r;
3346         int error;
3347
3348         error = find_lkb(ls, ms->m_remid, &lkb);
3349         if (error)
3350                 goto fail;
3351
3352         receive_flags(lkb, ms);
3353
3354         r = lkb->lkb_resource;
3355
3356         hold_rsb(r);
3357         lock_rsb(r);
3358
3359         error = validate_message(lkb, ms);
3360         if (error)
3361                 goto out;
3362
3363         error = do_cancel(r, lkb);
3364         send_cancel_reply(r, lkb, error);
3365         do_cancel_effects(r, lkb, error);
3366  out:
3367         unlock_rsb(r);
3368         put_rsb(r);
3369         dlm_put_lkb(lkb);
3370         return;
3371
3372  fail:
3373         setup_stub_lkb(ls, ms);
3374         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3375 }
3376
3377 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3378 {
3379         struct dlm_lkb *lkb;
3380         struct dlm_rsb *r;
3381         int error;
3382
3383         error = find_lkb(ls, ms->m_remid, &lkb);
3384         if (error) {
3385                 log_debug(ls, "receive_grant from %d no lkb %x",
3386                           ms->m_header.h_nodeid, ms->m_remid);
3387                 return;
3388         }
3389
3390         r = lkb->lkb_resource;
3391
3392         hold_rsb(r);
3393         lock_rsb(r);
3394
3395         error = validate_message(lkb, ms);
3396         if (error)
3397                 goto out;
3398
3399         receive_flags_reply(lkb, ms);
3400         if (is_altmode(lkb))
3401                 munge_altmode(lkb, ms);
3402         grant_lock_pc(r, lkb, ms);
3403         queue_cast(r, lkb, 0);
3404  out:
3405         unlock_rsb(r);
3406         put_rsb(r);
3407         dlm_put_lkb(lkb);
3408 }
3409
3410 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3411 {
3412         struct dlm_lkb *lkb;
3413         struct dlm_rsb *r;
3414         int error;
3415
3416         error = find_lkb(ls, ms->m_remid, &lkb);
3417         if (error) {
3418                 log_debug(ls, "receive_bast from %d no lkb %x",
3419                           ms->m_header.h_nodeid, ms->m_remid);
3420                 return;
3421         }
3422
3423         r = lkb->lkb_resource;
3424
3425         hold_rsb(r);
3426         lock_rsb(r);
3427
3428         error = validate_message(lkb, ms);
3429         if (error)
3430                 goto out;
3431
3432         queue_bast(r, lkb, ms->m_bastmode);
3433  out:
3434         unlock_rsb(r);
3435         put_rsb(r);
3436         dlm_put_lkb(lkb);
3437 }
3438
3439 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3440 {
3441         int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3442
3443         from_nodeid = ms->m_header.h_nodeid;
3444         our_nodeid = dlm_our_nodeid();
3445
3446         len = receive_extralen(ms);
3447
3448         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3449         if (dir_nodeid != our_nodeid) {
3450                 log_error(ls, "lookup dir_nodeid %d from %d",
3451                           dir_nodeid, from_nodeid);
3452                 error = -EINVAL;
3453                 ret_nodeid = -1;
3454                 goto out;
3455         }
3456
3457         error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3458
3459         /* Optimization: we're master so treat lookup as a request */
3460         if (!error && ret_nodeid == our_nodeid) {
3461                 receive_request(ls, ms);
3462                 return;
3463         }
3464  out:
3465         send_lookup_reply(ls, ms, ret_nodeid, error);
3466 }
3467
3468 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3469 {
3470         int len, dir_nodeid, from_nodeid;
3471
3472         from_nodeid = ms->m_header.h_nodeid;
3473
3474         len = receive_extralen(ms);
3475
3476         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3477         if (dir_nodeid != dlm_our_nodeid()) {
3478                 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3479                           dir_nodeid, from_nodeid);
3480                 return;
3481         }
3482
3483         dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3484 }
3485
3486 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3487 {
3488         do_purge(ls, ms->m_nodeid, ms->m_pid);
3489 }
3490
3491 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3492 {
3493         struct dlm_lkb *lkb;
3494         struct dlm_rsb *r;
3495         int error, mstype, result;
3496
3497         error = find_lkb(ls, ms->m_remid, &lkb);
3498         if (error) {
3499                 log_debug(ls, "receive_request_reply from %d no lkb %x",
3500                           ms->m_header.h_nodeid, ms->m_remid);
3501                 return;
3502         }
3503
3504         r = lkb->lkb_resource;
3505         hold_rsb(r);
3506         lock_rsb(r);
3507
3508         error = validate_message(lkb, ms);
3509         if (error)
3510                 goto out;
3511
3512         mstype = lkb->lkb_wait_type;
3513         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3514         if (error)
3515                 goto out;
3516
3517         /* Optimization: the dir node was also the master, so it took our
3518            lookup as a request and sent request reply instead of lookup reply */
3519         if (mstype == DLM_MSG_LOOKUP) {
3520                 r->res_nodeid = ms->m_header.h_nodeid;
3521                 lkb->lkb_nodeid = r->res_nodeid;
3522         }
3523
3524         /* this is the value returned from do_request() on the master */
3525         result = ms->m_result;
3526
3527         switch (result) {
3528         case -EAGAIN:
3529                 /* request would block (be queued) on remote master */
3530                 queue_cast(r, lkb, -EAGAIN);
3531                 confirm_master(r, -EAGAIN);
3532                 unhold_lkb(lkb); /* undoes create_lkb() */
3533                 break;
3534
3535         case -EINPROGRESS:
3536         case 0:
3537                 /* request was queued or granted on remote master */
3538                 receive_flags_reply(lkb, ms);
3539                 lkb->lkb_remid = ms->m_lkid;
3540                 if (is_altmode(lkb))
3541                         munge_altmode(lkb, ms);
3542                 if (result) {
3543                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
3544                         add_timeout(lkb);
3545                 } else {
3546                         grant_lock_pc(r, lkb, ms);
3547                         queue_cast(r, lkb, 0);
3548                 }
3549                 confirm_master(r, result);
3550                 break;
3551
3552         case -EBADR:
3553         case -ENOTBLK:
3554                 /* find_rsb failed to find rsb or rsb wasn't master */
3555                 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3556                           lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3557                 r->res_nodeid = -1;
3558                 lkb->lkb_nodeid = -1;
3559
3560                 if (is_overlap(lkb)) {
3561                         /* we'll ignore error in cancel/unlock reply */
3562                         queue_cast_overlap(r, lkb);
3563                         confirm_master(r, result);
3564                         unhold_lkb(lkb); /* undoes create_lkb() */
3565                 } else
3566                         _request_lock(r, lkb);
3567                 break;
3568
3569         default:
3570                 log_error(ls, "receive_request_reply %x error %d",
3571                           lkb->lkb_id, result);
3572         }
3573
3574         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3575                 log_debug(ls, "receive_request_reply %x result %d unlock",
3576                           lkb->lkb_id, result);
3577                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3578                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3579                 send_unlock(r, lkb);
3580         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3581                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3582                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3583                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3584                 send_cancel(r, lkb);
3585         } else {
3586                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3587                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3588         }
3589  out:
3590         unlock_rsb(r);
3591         put_rsb(r);
3592         dlm_put_lkb(lkb);
3593 }
3594
3595 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3596                                     struct dlm_message *ms)
3597 {
3598         /* this is the value returned from do_convert() on the master */
3599         switch (ms->m_result) {
3600         case -EAGAIN:
3601                 /* convert would block (be queued) on remote master */
3602                 queue_cast(r, lkb, -EAGAIN);
3603                 break;
3604
3605         case -EDEADLK:
3606                 receive_flags_reply(lkb, ms);
3607                 revert_lock_pc(r, lkb);
3608                 queue_cast(r, lkb, -EDEADLK);
3609                 break;
3610
3611         case -EINPROGRESS:
3612                 /* convert was queued on remote master */
3613                 receive_flags_reply(lkb, ms);
3614                 if (is_demoted(lkb))
3615                         munge_demoted(lkb, ms);
3616                 del_lkb(r, lkb);
3617                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3618                 add_timeout(lkb);
3619                 break;
3620
3621         case 0:
3622                 /* convert was granted on remote master */
3623                 receive_flags_reply(lkb, ms);
3624                 if (is_demoted(lkb))
3625                         munge_demoted(lkb, ms);
3626                 grant_lock_pc(r, lkb, ms);
3627                 queue_cast(r, lkb, 0);
3628                 break;
3629
3630         default:
3631                 log_error(r->res_ls, "receive_convert_reply %x error %d",
3632                           lkb->lkb_id, ms->m_result);
3633         }
3634 }
3635
3636 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3637 {
3638         struct dlm_rsb *r = lkb->lkb_resource;
3639         int error;
3640
3641         hold_rsb(r);
3642         lock_rsb(r);
3643
3644         error = validate_message(lkb, ms);
3645         if (error)
3646                 goto out;
3647
3648         /* stub reply can happen with waiters_mutex held */
3649         error = remove_from_waiters_ms(lkb, ms);
3650         if (error)
3651                 goto out;
3652
3653         __receive_convert_reply(r, lkb, ms);
3654  out:
3655         unlock_rsb(r);
3656         put_rsb(r);
3657 }
3658
3659 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3660 {
3661         struct dlm_lkb *lkb;
3662         int error;
3663
3664         error = find_lkb(ls, ms->m_remid, &lkb);
3665         if (error) {
3666                 log_debug(ls, "receive_convert_reply from %d no lkb %x",
3667                           ms->m_header.h_nodeid, ms->m_remid);
3668                 return;
3669         }
3670
3671         _receive_convert_reply(lkb, ms);
3672         dlm_put_lkb(lkb);
3673 }
3674
3675 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3676 {
3677         struct dlm_rsb *r = lkb->lkb_resource;
3678         int error;
3679
3680         hold_rsb(r);
3681         lock_rsb(r);
3682
3683         error = validate_message(lkb, ms);
3684         if (error)
3685                 goto out;
3686
3687         /* stub reply can happen with waiters_mutex held */
3688         error = remove_from_waiters_ms(lkb, ms);
3689         if (error)
3690                 goto out;
3691
3692         /* this is the value returned from do_unlock() on the master */
3693
3694         switch (ms->m_result) {
3695         case -DLM_EUNLOCK:
3696                 receive_flags_reply(lkb, ms);
3697                 remove_lock_pc(r, lkb);
3698                 queue_cast(r, lkb, -DLM_EUNLOCK);
3699                 break;
3700         case -ENOENT:
3701                 break;
3702         default:
3703                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3704                           lkb->lkb_id, ms->m_result);
3705         }
3706  out:
3707         unlock_rsb(r);
3708         put_rsb(r);
3709 }
3710
3711 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3712 {
3713         struct dlm_lkb *lkb;
3714         int error;
3715
3716         error = find_lkb(ls, ms->m_remid, &lkb);
3717         if (error) {
3718                 log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3719                           ms->m_header.h_nodeid, ms->m_remid);
3720                 return;
3721         }
3722
3723         _receive_unlock_reply(lkb, ms);
3724         dlm_put_lkb(lkb);
3725 }
3726
3727 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3728 {
3729         struct dlm_rsb *r = lkb->lkb_resource;
3730         int error;
3731
3732         hold_rsb(r);
3733         lock_rsb(r);
3734
3735         error = validate_message(lkb, ms);
3736         if (error)
3737                 goto out;
3738
3739         /* stub reply can happen with waiters_mutex held */
3740         error = remove_from_waiters_ms(lkb, ms);
3741         if (error)
3742                 goto out;
3743
3744         /* this is the value returned from do_cancel() on the master */
3745
3746         switch (ms->m_result) {
3747         case -DLM_ECANCEL:
3748                 receive_flags_reply(lkb, ms);
3749                 revert_lock_pc(r, lkb);
3750                 queue_cast(r, lkb, -DLM_ECANCEL);
3751                 break;
3752         case 0:
3753                 break;
3754         default:
3755                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3756                           lkb->lkb_id, ms->m_result);
3757         }
3758  out:
3759         unlock_rsb(r);
3760         put_rsb(r);
3761 }
3762
3763 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3764 {
3765         struct dlm_lkb *lkb;
3766         int error;
3767
3768         error = find_lkb(ls, ms->m_remid, &lkb);
3769         if (error) {
3770                 log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3771                           ms->m_header.h_nodeid, ms->m_remid);
3772                 return;
3773         }
3774
3775         _receive_cancel_reply(lkb, ms);
3776         dlm_put_lkb(lkb);
3777 }
3778
3779 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3780 {
3781         struct dlm_lkb *lkb;
3782         struct dlm_rsb *r;
3783         int error, ret_nodeid;
3784
3785         error = find_lkb(ls, ms->m_lkid, &lkb);
3786         if (error) {
3787                 log_error(ls, "receive_lookup_reply no lkb");
3788                 return;
3789         }
3790
3791         /* ms->m_result is the value returned by dlm_dir_lookup on dir node
3792            FIXME: will a non-zero error ever be returned? */
3793
3794         r = lkb->lkb_resource;
3795         hold_rsb(r);
3796         lock_rsb(r);
3797
3798         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3799         if (error)
3800                 goto out;
3801
3802         ret_nodeid = ms->m_nodeid;
3803         if (ret_nodeid == dlm_our_nodeid()) {
3804                 r->res_nodeid = 0;
3805                 ret_nodeid = 0;
3806                 r->res_first_lkid = 0;
3807         } else {
3808                 /* set_master() will copy res_nodeid to lkb_nodeid */
3809                 r->res_nodeid = ret_nodeid;
3810         }
3811
3812         if (is_overlap(lkb)) {
3813                 log_debug(ls, "receive_lookup_reply %x unlock %x",
3814                           lkb->lkb_id, lkb->lkb_flags);
3815                 queue_cast_overlap(r, lkb);
3816                 unhold_lkb(lkb); /* undoes create_lkb() */
3817                 goto out_list;
3818         }
3819
3820         _request_lock(r, lkb);
3821
3822  out_list:
3823         if (!ret_nodeid)
3824                 process_lookup_list(r);
3825  out:
3826         unlock_rsb(r);
3827         put_rsb(r);
3828         dlm_put_lkb(lkb);
3829 }
3830
3831 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3832 {
3833         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3834                 log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3835                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3836                           ms->m_remid, ms->m_result);
3837                 return;
3838         }
3839
3840         switch (ms->m_type) {
3841
3842         /* messages sent to a master node */
3843
3844         case DLM_MSG_REQUEST:
3845                 receive_request(ls, ms);
3846                 break;
3847
3848         case DLM_MSG_CONVERT:
3849                 receive_convert(ls, ms);
3850                 break;
3851
3852         case DLM_MSG_UNLOCK:
3853                 receive_unlock(ls, ms);
3854                 break;
3855
3856         case DLM_MSG_CANCEL:
3857                 receive_cancel(ls, ms);
3858                 break;
3859
3860         /* messages sent from a master node (replies to above) */
3861
3862         case DLM_MSG_REQUEST_REPLY:
3863                 receive_request_reply(ls, ms);
3864                 break;
3865
3866         case DLM_MSG_CONVERT_REPLY:
3867                 receive_convert_reply(ls, ms);
3868                 break;
3869
3870         case DLM_MSG_UNLOCK_REPLY:
3871                 receive_unlock_reply(ls, ms);
3872                 break;
3873
3874         case DLM_MSG_CANCEL_REPLY:
3875                 receive_cancel_reply(ls, ms);
3876                 break;
3877
3878         /* messages sent from a master node (only two types of async msg) */
3879
3880         case DLM_MSG_GRANT:
3881                 receive_grant(ls, ms);
3882                 break;
3883
3884         case DLM_MSG_BAST:
3885                 receive_bast(ls, ms);
3886                 break;
3887
3888         /* messages sent to a dir node */
3889
3890         case DLM_MSG_LOOKUP:
3891                 receive_lookup(ls, ms);
3892                 break;
3893
3894         case DLM_MSG_REMOVE:
3895                 receive_remove(ls, ms);
3896                 break;
3897
3898         /* messages sent from a dir node (remove has no reply) */
3899
3900         case DLM_MSG_LOOKUP_REPLY:
3901                 receive_lookup_reply(ls, ms);
3902                 break;
3903
3904         /* other messages */
3905
3906         case DLM_MSG_PURGE:
3907                 receive_purge(ls, ms);
3908                 break;
3909
3910         default:
3911                 log_error(ls, "unknown message type %d", ms->m_type);
3912         }
3913
3914         dlm_astd_wake();
3915 }
3916
3917 /* If the lockspace is in recovery mode (locking stopped), then normal
3918    messages are saved on the requestqueue for processing after recovery is
3919    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3920    messages off the requestqueue before we process new ones. This occurs right
3921    after recovery completes when we transition from saving all messages on
3922    requestqueue, to processing all the saved messages, to processing new
3923    messages as they arrive. */
3924
3925 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3926                                 int nodeid)
3927 {
3928         if (dlm_locking_stopped(ls)) {
3929                 dlm_add_requestqueue(ls, nodeid, ms);
3930         } else {
3931                 dlm_wait_requestqueue(ls);
3932                 _receive_message(ls, ms);
3933         }
3934 }
3935
3936 /* This is called by dlm_recoverd to process messages that were saved on
3937    the requestqueue. */
3938
3939 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3940 {
3941         _receive_message(ls, ms);
3942 }
3943
3944 /* This is called by the midcomms layer when something is received for
3945    the lockspace.  It could be either a MSG (normal message sent as part of
3946    standard locking activity) or an RCOM (recovery message sent as part of
3947    lockspace recovery). */
3948
3949 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3950 {
3951         struct dlm_header *hd = &p->header;
3952         struct dlm_ls *ls;
3953         int type = 0;
3954
3955         switch (hd->h_cmd) {
3956         case DLM_MSG:
3957                 dlm_message_in(&p->message);
3958                 type = p->message.m_type;
3959                 break;
3960         case DLM_RCOM:
3961                 dlm_rcom_in(&p->rcom);
3962                 type = p->rcom.rc_type;
3963                 break;
3964         default:
3965                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3966                 return;
3967         }
3968
3969         if (hd->h_nodeid != nodeid) {
3970                 log_print("invalid h_nodeid %d from %d lockspace %x",
3971                           hd->h_nodeid, nodeid, hd->h_lockspace);
3972                 return;
3973         }
3974
3975         ls = dlm_find_lockspace_global(hd->h_lockspace);
3976         if (!ls) {
3977                 if (dlm_config.ci_log_debug)
3978                         log_print("invalid lockspace %x from %d cmd %d type %d",
3979                                   hd->h_lockspace, nodeid, hd->h_cmd, type);
3980
3981                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3982                         dlm_send_ls_not_ready(nodeid, &p->rcom);
3983                 return;
3984         }
3985
3986         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3987            be inactive (in this ls) before transitioning to recovery mode */
3988
3989         down_read(&ls->ls_recv_active);
3990         if (hd->h_cmd == DLM_MSG)
3991                 dlm_receive_message(ls, &p->message, nodeid);
3992         else
3993                 dlm_receive_rcom(ls, &p->rcom, nodeid);
3994         up_read(&ls->ls_recv_active);
3995
3996         dlm_put_lockspace(ls);
3997 }
3998
3999 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
4000 {
4001         if (middle_conversion(lkb)) {
4002                 hold_lkb(lkb);
4003                 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
4004                 ls->ls_stub_ms.m_result = -EINPROGRESS;
4005                 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4006                 ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4007                 _receive_convert_reply(lkb, &ls->ls_stub_ms);
4008
4009                 /* Same special case as in receive_rcom_lock_args() */
4010                 lkb->lkb_grmode = DLM_LOCK_IV;
4011                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4012                 unhold_lkb(lkb);
4013
4014         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4015                 lkb->lkb_flags |= DLM_IFL_RESEND;
4016         }
4017
4018         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4019            conversions are async; there's no reply from the remote master */
4020 }
4021
4022 /* A waiting lkb needs recovery if the master node has failed, or
4023    the master node is changing (only when no directory is used) */
4024
4025 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4026 {
4027         if (dlm_is_removed(ls, lkb->lkb_nodeid))
4028                 return 1;
4029
4030         if (!dlm_no_directory(ls))
4031                 return 0;
4032
4033         if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4034                 return 1;
4035
4036         return 0;
4037 }
4038
4039 /* Recovery for locks that are waiting for replies from nodes that are now
4040    gone.  We can just complete unlocks and cancels by faking a reply from the
4041    dead node.  Requests and up-conversions we flag to be resent after
4042    recovery.  Down-conversions can just be completed with a fake reply like
4043    unlocks.  Conversions between PR and CW need special attention. */
4044
4045 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4046 {
4047         struct dlm_lkb *lkb, *safe;
4048         int wait_type, stub_unlock_result, stub_cancel_result;
4049
4050         mutex_lock(&ls->ls_waiters_mutex);
4051
4052         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4053                 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
4054                           lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
4055
4056                 /* all outstanding lookups, regardless of destination  will be
4057                    resent after recovery is done */
4058
4059                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4060                         lkb->lkb_flags |= DLM_IFL_RESEND;
4061                         continue;
4062                 }
4063
4064                 if (!waiter_needs_recovery(ls, lkb))
4065                         continue;
4066
4067                 wait_type = lkb->lkb_wait_type;
4068                 stub_unlock_result = -DLM_EUNLOCK;
4069                 stub_cancel_result = -DLM_ECANCEL;
4070
4071                 /* Main reply may have been received leaving a zero wait_type,
4072                    but a reply for the overlapping op may not have been
4073                    received.  In that case we need to fake the appropriate
4074                    reply for the overlap op. */
4075
4076                 if (!wait_type) {
4077                         if (is_overlap_cancel(lkb)) {
4078                                 wait_type = DLM_MSG_CANCEL;
4079                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4080                                         stub_cancel_result = 0;
4081                         }
4082                         if (is_overlap_unlock(lkb)) {
4083                                 wait_type = DLM_MSG_UNLOCK;
4084                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
4085                                         stub_unlock_result = -ENOENT;
4086                         }
4087
4088                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
4089                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
4090                                   stub_cancel_result, stub_unlock_result);
4091                 }
4092
4093                 switch (wait_type) {
4094
4095                 case DLM_MSG_REQUEST:
4096                         lkb->lkb_flags |= DLM_IFL_RESEND;
4097                         break;
4098
4099                 case DLM_MSG_CONVERT:
4100                         recover_convert_waiter(ls, lkb);
4101                         break;
4102
4103                 case DLM_MSG_UNLOCK:
4104                         hold_lkb(lkb);
4105                         ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4106                         ls->ls_stub_ms.m_result = stub_unlock_result;
4107                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4108                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4109                         _receive_unlock_reply(lkb, &ls->ls_stub_ms);
4110                         dlm_put_lkb(lkb);
4111                         break;
4112
4113                 case DLM_MSG_CANCEL:
4114                         hold_lkb(lkb);
4115                         ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4116                         ls->ls_stub_ms.m_result = stub_cancel_result;
4117                         ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4118                         ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4119                         _receive_cancel_reply(lkb, &ls->ls_stub_ms);
4120                         dlm_put_lkb(lkb);
4121                         break;
4122
4123                 default:
4124                         log_error(ls, "invalid lkb wait_type %d %d",
4125                                   lkb->lkb_wait_type, wait_type);
4126                 }
4127                 schedule();
4128         }
4129         mutex_unlock(&ls->ls_waiters_mutex);
4130 }
4131
4132 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4133 {
4134         struct dlm_lkb *lkb;
4135         int found = 0;
4136
4137         mutex_lock(&ls->ls_waiters_mutex);
4138         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4139                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
4140                         hold_lkb(lkb);
4141                         found = 1;
4142                         break;
4143                 }
4144         }
4145         mutex_unlock(&ls->ls_waiters_mutex);
4146
4147         if (!found)
4148                 lkb = NULL;
4149         return lkb;
4150 }
4151
4152 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4153    master or dir-node for r.  Processing the lkb may result in it being placed
4154    back on waiters. */
4155
4156 /* We do this after normal locking has been enabled and any saved messages
4157    (in requestqueue) have been processed.  We should be confident that at
4158    this point we won't get or process a reply to any of these waiting
4159    operations.  But, new ops may be coming in on the rsbs/locks here from
4160    userspace or remotely. */
4161
4162 /* there may have been an overlap unlock/cancel prior to recovery or after
4163    recovery.  if before, the lkb may still have a pos wait_count; if after, the
4164    overlap flag would just have been set and nothing new sent.  we can be
4165    confident here than any replies to either the initial op or overlap ops
4166    prior to recovery have been received. */
4167
4168 int dlm_recover_waiters_post(struct dlm_ls *ls)
4169 {
4170         struct dlm_lkb *lkb;
4171         struct dlm_rsb *r;
4172         int error = 0, mstype, err, oc, ou;
4173
4174         while (1) {
4175                 if (dlm_locking_stopped(ls)) {
4176                         log_debug(ls, "recover_waiters_post aborted");
4177                         error = -EINTR;
4178                         break;
4179                 }
4180
4181                 lkb = find_resend_waiter(ls);
4182                 if (!lkb)
4183                         break;
4184
4185                 r = lkb->lkb_resource;
4186                 hold_rsb(r);
4187                 lock_rsb(r);
4188
4189                 mstype = lkb->lkb_wait_type;
4190                 oc = is_overlap_cancel(lkb);
4191                 ou = is_overlap_unlock(lkb);
4192                 err = 0;
4193
4194                 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4195                           lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4196
4197                 /* At this point we assume that we won't get a reply to any
4198                    previous op or overlap op on this lock.  First, do a big
4199                    remove_from_waiters() for all previous ops. */
4200
4201                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
4202                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4203                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4204                 lkb->lkb_wait_type = 0;
4205                 lkb->lkb_wait_count = 0;
4206                 mutex_lock(&ls->ls_waiters_mutex);
4207                 list_del_init(&lkb->lkb_wait_reply);
4208                 mutex_unlock(&ls->ls_waiters_mutex);
4209                 unhold_lkb(lkb); /* for waiters list */
4210
4211                 if (oc || ou) {
4212                         /* do an unlock or cancel instead of resending */
4213                         switch (mstype) {
4214                         case DLM_MSG_LOOKUP:
4215                         case DLM_MSG_REQUEST:
4216                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4217                                                         -DLM_ECANCEL);
4218                                 unhold_lkb(lkb); /* undoes create_lkb() */
4219                                 break;
4220                         case DLM_MSG_CONVERT:
4221                                 if (oc) {
4222                                         queue_cast(r, lkb, -DLM_ECANCEL);
4223                                 } else {
4224                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4225                                         _unlock_lock(r, lkb);
4226                                 }
4227                                 break;
4228                         default:
4229                                 err = 1;
4230                         }
4231                 } else {
4232                         switch (mstype) {
4233                         case DLM_MSG_LOOKUP:
4234                         case DLM_MSG_REQUEST:
4235                                 _request_lock(r, lkb);
4236                                 if (is_master(r))
4237                                         confirm_master(r, 0);
4238                                 break;
4239                         case DLM_MSG_CONVERT:
4240                                 _convert_lock(r, lkb);
4241                                 break;
4242                         default:
4243                                 err = 1;
4244                         }
4245                 }
4246
4247                 if (err)
4248                         log_error(ls, "recover_waiters_post %x %d %x %d %d",
4249                                   lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4250                 unlock_rsb(r);
4251                 put_rsb(r);
4252                 dlm_put_lkb(lkb);
4253         }
4254
4255         return error;
4256 }
4257
4258 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4259                         int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4260 {
4261         struct dlm_ls *ls = r->res_ls;
4262         struct dlm_lkb *lkb, *safe;
4263
4264         list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4265                 if (test(ls, lkb)) {
4266                         rsb_set_flag(r, RSB_LOCKS_PURGED);
4267                         del_lkb(r, lkb);
4268                         /* this put should free the lkb */
4269                         if (!dlm_put_lkb(lkb))
4270                                 log_error(ls, "purged lkb not released");
4271                 }
4272         }
4273 }
4274
4275 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4276 {
4277         return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4278 }
4279
4280 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4281 {
4282         return is_master_copy(lkb);
4283 }
4284
4285 static void purge_dead_locks(struct dlm_rsb *r)
4286 {
4287         purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4288         purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4289         purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4290 }
4291
4292 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4293 {
4294         purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4295         purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4296         purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4297 }
4298
4299 /* Get rid of locks held by nodes that are gone. */
4300
4301 int dlm_purge_locks(struct dlm_ls *ls)
4302 {
4303         struct dlm_rsb *r;
4304
4305         log_debug(ls, "dlm_purge_locks");
4306
4307         down_write(&ls->ls_root_sem);
4308         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4309                 hold_rsb(r);
4310                 lock_rsb(r);
4311                 if (is_master(r))
4312                         purge_dead_locks(r);
4313                 unlock_rsb(r);
4314                 unhold_rsb(r);
4315
4316                 schedule();
4317         }
4318         up_write(&ls->ls_root_sem);
4319
4320         return 0;
4321 }
4322
4323 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4324 {
4325         struct dlm_rsb *r, *r_ret = NULL;
4326
4327         spin_lock(&ls->ls_rsbtbl[bucket].lock);
4328         list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4329                 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4330                         continue;
4331                 hold_rsb(r);
4332                 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4333                 r_ret = r;
4334                 break;
4335         }
4336         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4337         return r_ret;
4338 }
4339
4340 void dlm_grant_after_purge(struct dlm_ls *ls)
4341 {
4342         struct dlm_rsb *r;
4343         int bucket = 0;
4344
4345         while (1) {
4346                 r = find_purged_rsb(ls, bucket);
4347                 if (!r) {
4348                         if (bucket == ls->ls_rsbtbl_size - 1)
4349                                 break;
4350                         bucket++;
4351                         continue;
4352                 }
4353                 lock_rsb(r);
4354                 if (is_master(r)) {
4355                         grant_pending_locks(r);
4356                         confirm_master(r, 0);
4357                 }
4358                 unlock_rsb(r);
4359                 put_rsb(r);
4360                 schedule();
4361         }
4362 }
4363
4364 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4365                                          uint32_t remid)
4366 {
4367         struct dlm_lkb *lkb;
4368
4369         list_for_each_entry(lkb, head, lkb_statequeue) {
4370                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4371                         return lkb;
4372         }
4373         return NULL;
4374 }
4375
4376 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4377                                     uint32_t remid)
4378 {
4379         struct dlm_lkb *lkb;
4380
4381         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4382         if (lkb)
4383                 return lkb;
4384         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4385         if (lkb)
4386                 return lkb;
4387         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4388         if (lkb)
4389                 return lkb;
4390         return NULL;
4391 }
4392
4393 /* needs at least dlm_rcom + rcom_lock */
4394 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4395                                   struct dlm_rsb *r, struct dlm_rcom *rc)
4396 {
4397         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4398
4399         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4400         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4401         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4402         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4403         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4404         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4405         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4406         lkb->lkb_rqmode = rl->rl_rqmode;
4407         lkb->lkb_grmode = rl->rl_grmode;
4408         /* don't set lkb_status because add_lkb wants to itself */
4409
4410         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4411         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4412
4413         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4414                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4415                          sizeof(struct rcom_lock);
4416                 if (lvblen > ls->ls_lvblen)
4417                         return -EINVAL;
4418                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4419                 if (!lkb->lkb_lvbptr)
4420                         return -ENOMEM;
4421                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4422         }
4423
4424         /* Conversions between PR and CW (middle modes) need special handling.
4425            The real granted mode of these converting locks cannot be determined
4426            until all locks have been rebuilt on the rsb (recover_conversion) */
4427
4428         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4429             middle_conversion(lkb)) {
4430                 rl->rl_status = DLM_LKSTS_CONVERT;
4431                 lkb->lkb_grmode = DLM_LOCK_IV;
4432                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4433         }
4434
4435         return 0;
4436 }
4437
4438 /* This lkb may have been recovered in a previous aborted recovery so we need
4439    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4440    If so we just send back a standard reply.  If not, we create a new lkb with
4441    the given values and send back our lkid.  We send back our lkid by sending
4442    back the rcom_lock struct we got but with the remid field filled in. */
4443
4444 /* needs at least dlm_rcom + rcom_lock */
4445 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4446 {
4447         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4448         struct dlm_rsb *r;
4449         struct dlm_lkb *lkb;
4450         int error;
4451
4452         if (rl->rl_parent_lkid) {
4453                 error = -EOPNOTSUPP;
4454                 goto out;
4455         }
4456
4457         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4458                          R_MASTER, &r);
4459         if (error)
4460                 goto out;
4461
4462         lock_rsb(r);
4463
4464         lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4465         if (lkb) {
4466                 error = -EEXIST;
4467                 goto out_remid;
4468         }
4469
4470         error = create_lkb(ls, &lkb);
4471         if (error)
4472                 goto out_unlock;
4473
4474         error = receive_rcom_lock_args(ls, lkb, r, rc);
4475         if (error) {
4476                 __put_lkb(ls, lkb);
4477                 goto out_unlock;
4478         }
4479
4480         attach_lkb(r, lkb);
4481         add_lkb(r, lkb, rl->rl_status);
4482         error = 0;
4483
4484  out_remid:
4485         /* this is the new value returned to the lock holder for
4486            saving in its process-copy lkb */
4487         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4488
4489  out_unlock:
4490         unlock_rsb(r);
4491         put_rsb(r);
4492  out:
4493         if (error)
4494                 log_debug(ls, "recover_master_copy %d %x", error,
4495                           le32_to_cpu(rl->rl_lkid));
4496         rl->rl_result = cpu_to_le32(error);
4497         return error;
4498 }
4499
4500 /* needs at least dlm_rcom + rcom_lock */
4501 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4502 {
4503         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4504         struct dlm_rsb *r;
4505         struct dlm_lkb *lkb;
4506         int error;
4507
4508         error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4509         if (error) {
4510                 log_error(ls, "recover_process_copy no lkid %x",
4511                                 le32_to_cpu(rl->rl_lkid));
4512                 return error;
4513         }
4514
4515         DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4516
4517         error = le32_to_cpu(rl->rl_result);
4518
4519         r = lkb->lkb_resource;
4520         hold_rsb(r);
4521         lock_rsb(r);
4522
4523         switch (error) {
4524         case -EBADR:
4525                 /* There's a chance the new master received our lock before
4526                    dlm_recover_master_reply(), this wouldn't happen if we did
4527                    a barrier between recover_masters and recover_locks. */
4528                 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4529                           (unsigned long)r, r->res_name);
4530                 dlm_send_rcom_lock(r, lkb);
4531                 goto out;
4532         case -EEXIST:
4533                 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4534                 /* fall through */
4535         case 0:
4536                 lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4537                 break;
4538         default:
4539                 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4540                           error, lkb->lkb_id);
4541         }
4542
4543         /* an ack for dlm_recover_locks() which waits for replies from
4544            all the locks it sends to new masters */
4545         dlm_recovered_lock(r);
4546  out:
4547         unlock_rsb(r);
4548         put_rsb(r);
4549         dlm_put_lkb(lkb);
4550
4551         return 0;
4552 }
4553
4554 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4555                      int mode, uint32_t flags, void *name, unsigned int namelen,
4556                      unsigned long timeout_cs)
4557 {
4558         struct dlm_lkb *lkb;
4559         struct dlm_args args;
4560         int error;
4561
4562         dlm_lock_recovery(ls);
4563
4564         error = create_lkb(ls, &lkb);
4565         if (error) {
4566                 kfree(ua);
4567                 goto out;
4568         }
4569
4570         if (flags & DLM_LKF_VALBLK) {
4571                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4572                 if (!ua->lksb.sb_lvbptr) {
4573                         kfree(ua);
4574                         __put_lkb(ls, lkb);
4575                         error = -ENOMEM;
4576                         goto out;
4577                 }
4578         }
4579
4580         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
4581            When DLM_IFL_USER is set, the dlm knows that this is a userspace
4582            lock and that lkb_astparam is the dlm_user_args structure. */
4583
4584         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4585                               fake_astfn, ua, fake_bastfn, &args);
4586         lkb->lkb_flags |= DLM_IFL_USER;
4587
4588         if (error) {
4589                 __put_lkb(ls, lkb);
4590                 goto out;
4591         }
4592
4593         error = request_lock(ls, lkb, name, namelen, &args);
4594
4595         switch (error) {
4596         case 0:
4597                 break;
4598         case -EINPROGRESS:
4599                 error = 0;
4600                 break;
4601         case -EAGAIN:
4602                 error = 0;
4603                 /* fall through */
4604         default:
4605                 __put_lkb(ls, lkb);
4606                 goto out;
4607         }
4608
4609         /* add this new lkb to the per-process list of locks */
4610         spin_lock(&ua->proc->locks_spin);
4611         hold_lkb(lkb);
4612         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4613         spin_unlock(&ua->proc->locks_spin);
4614  out:
4615         dlm_unlock_recovery(ls);
4616         return error;
4617 }
4618
4619 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4620                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4621                      unsigned long timeout_cs)
4622 {
4623         struct dlm_lkb *lkb;
4624         struct dlm_args args;
4625         struct dlm_user_args *ua;
4626         int error;
4627
4628         dlm_lock_recovery(ls);
4629
4630         error = find_lkb(ls, lkid, &lkb);
4631         if (error)
4632                 goto out;
4633
4634         /* user can change the params on its lock when it converts it, or
4635            add an lvb that didn't exist before */
4636
4637         ua = lkb->lkb_ua;
4638
4639         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4640                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4641                 if (!ua->lksb.sb_lvbptr) {
4642                         error = -ENOMEM;
4643                         goto out_put;
4644                 }
4645         }
4646         if (lvb_in && ua->lksb.sb_lvbptr)
4647                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4648
4649         ua->xid = ua_tmp->xid;
4650         ua->castparam = ua_tmp->castparam;
4651         ua->castaddr = ua_tmp->castaddr;
4652         ua->bastparam = ua_tmp->bastparam;
4653         ua->bastaddr = ua_tmp->bastaddr;
4654         ua->user_lksb = ua_tmp->user_lksb;
4655
4656         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4657                               fake_astfn, ua, fake_bastfn, &args);
4658         if (error)
4659                 goto out_put;
4660
4661         error = convert_lock(ls, lkb, &args);
4662
4663         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4664                 error = 0;
4665  out_put:
4666         dlm_put_lkb(lkb);
4667  out:
4668         dlm_unlock_recovery(ls);
4669         kfree(ua_tmp);
4670         return error;
4671 }
4672
4673 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4674                     uint32_t flags, uint32_t lkid, char *lvb_in)
4675 {
4676         struct dlm_lkb *lkb;
4677         struct dlm_args args;
4678         struct dlm_user_args *ua;
4679         int error;
4680
4681         dlm_lock_recovery(ls);
4682
4683         error = find_lkb(ls, lkid, &lkb);
4684         if (error)
4685                 goto out;
4686
4687         ua = lkb->lkb_ua;
4688
4689         if (lvb_in && ua->lksb.sb_lvbptr)
4690                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4691         if (ua_tmp->castparam)
4692                 ua->castparam = ua_tmp->castparam;
4693         ua->user_lksb = ua_tmp->user_lksb;
4694
4695         error = set_unlock_args(flags, ua, &args);
4696         if (error)
4697                 goto out_put;
4698
4699         error = unlock_lock(ls, lkb, &args);
4700
4701         if (error == -DLM_EUNLOCK)
4702                 error = 0;
4703         /* from validate_unlock_args() */
4704         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4705                 error = 0;
4706         if (error)
4707                 goto out_put;
4708
4709         spin_lock(&ua->proc->locks_spin);
4710         /* dlm_user_add_ast() may have already taken lkb off the proc list */
4711         if (!list_empty(&lkb->lkb_ownqueue))
4712                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4713         spin_unlock(&ua->proc->locks_spin);
4714  out_put:
4715         dlm_put_lkb(lkb);
4716  out:
4717         dlm_unlock_recovery(ls);
4718         kfree(ua_tmp);
4719         return error;
4720 }
4721
4722 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4723                     uint32_t flags, uint32_t lkid)
4724 {
4725         struct dlm_lkb *lkb;
4726         struct dlm_args args;
4727         struct dlm_user_args *ua;
4728         int error;
4729
4730         dlm_lock_recovery(ls);
4731
4732         error = find_lkb(ls, lkid, &lkb);
4733         if (error)
4734                 goto out;
4735
4736         ua = lkb->lkb_ua;
4737         if (ua_tmp->castparam)
4738                 ua->castparam = ua_tmp->castparam;
4739         ua->user_lksb = ua_tmp->user_lksb;
4740
4741         error = set_unlock_args(flags, ua, &args);
4742         if (error)
4743                 goto out_put;
4744
4745         error = cancel_lock(ls, lkb, &args);
4746
4747         if (error == -DLM_ECANCEL)
4748                 error = 0;
4749         /* from validate_unlock_args() */
4750         if (error == -EBUSY)
4751                 error = 0;
4752  out_put:
4753         dlm_put_lkb(lkb);
4754  out:
4755         dlm_unlock_recovery(ls);
4756         kfree(ua_tmp);
4757         return error;
4758 }
4759
4760 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4761 {
4762         struct dlm_lkb *lkb;
4763         struct dlm_args args;
4764         struct dlm_user_args *ua;
4765         struct dlm_rsb *r;
4766         int error;
4767
4768         dlm_lock_recovery(ls);
4769
4770         error = find_lkb(ls, lkid, &lkb);
4771         if (error)
4772                 goto out;
4773
4774         ua = lkb->lkb_ua;
4775
4776         error = set_unlock_args(flags, ua, &args);
4777         if (error)
4778                 goto out_put;
4779
4780         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4781
4782         r = lkb->lkb_resource;
4783         hold_rsb(r);
4784         lock_rsb(r);
4785
4786         error = validate_unlock_args(lkb, &args);
4787         if (error)
4788                 goto out_r;
4789         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4790
4791         error = _cancel_lock(r, lkb);
4792  out_r:
4793         unlock_rsb(r);
4794         put_rsb(r);
4795
4796         if (error == -DLM_ECANCEL)
4797                 error = 0;
4798         /* from validate_unlock_args() */
4799         if (error == -EBUSY)
4800                 error = 0;
4801  out_put:
4802         dlm_put_lkb(lkb);
4803  out:
4804         dlm_unlock_recovery(ls);
4805         return error;
4806 }
4807
4808 /* lkb's that are removed from the waiters list by revert are just left on the
4809    orphans list with the granted orphan locks, to be freed by purge */
4810
4811 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4812 {
4813         struct dlm_args args;
4814         int error;
4815
4816         hold_lkb(lkb);
4817         mutex_lock(&ls->ls_orphans_mutex);
4818         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4819         mutex_unlock(&ls->ls_orphans_mutex);
4820
4821         set_unlock_args(0, lkb->lkb_ua, &args);
4822
4823         error = cancel_lock(ls, lkb, &args);
4824         if (error == -DLM_ECANCEL)
4825                 error = 0;
4826         return error;
4827 }
4828
4829 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4830    Regardless of what rsb queue the lock is on, it's removed and freed. */
4831
4832 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4833 {
4834         struct dlm_args args;
4835         int error;
4836
4837         set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4838
4839         error = unlock_lock(ls, lkb, &args);
4840         if (error == -DLM_EUNLOCK)
4841                 error = 0;
4842         return error;
4843 }
4844
4845 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4846    (which does lock_rsb) due to deadlock with receiving a message that does
4847    lock_rsb followed by dlm_user_add_ast() */
4848
4849 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4850                                      struct dlm_user_proc *proc)
4851 {
4852         struct dlm_lkb *lkb = NULL;
4853
4854         mutex_lock(&ls->ls_clear_proc_locks);
4855         if (list_empty(&proc->locks))
4856                 goto out;
4857
4858         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4859         list_del_init(&lkb->lkb_ownqueue);
4860
4861         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4862                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4863         else
4864                 lkb->lkb_flags |= DLM_IFL_DEAD;
4865  out:
4866         mutex_unlock(&ls->ls_clear_proc_locks);
4867         return lkb;
4868 }
4869
4870 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4871    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4872    which we clear here. */
4873
4874 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4875    list, and no more device_writes should add lkb's to proc->locks list; so we
4876    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4877    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4878    them ourself. */
4879
4880 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4881 {
4882         struct dlm_lkb *lkb, *safe;
4883
4884         dlm_lock_recovery(ls);
4885
4886         while (1) {
4887                 lkb = del_proc_lock(ls, proc);
4888                 if (!lkb)
4889                         break;
4890                 del_timeout(lkb);
4891                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4892                         orphan_proc_lock(ls, lkb);
4893                 else
4894                         unlock_proc_lock(ls, lkb);
4895
4896                 /* this removes the reference for the proc->locks list
4897                    added by dlm_user_request, it may result in the lkb
4898                    being freed */
4899
4900                 dlm_put_lkb(lkb);
4901         }
4902
4903         mutex_lock(&ls->ls_clear_proc_locks);
4904
4905         /* in-progress unlocks */
4906         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4907                 list_del_init(&lkb->lkb_ownqueue);
4908                 lkb->lkb_flags |= DLM_IFL_DEAD;
4909                 dlm_put_lkb(lkb);
4910         }
4911
4912         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4913                 memset(&lkb->lkb_callbacks, 0,
4914                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
4915                 list_del_init(&lkb->lkb_astqueue);
4916                 dlm_put_lkb(lkb);
4917         }
4918
4919         mutex_unlock(&ls->ls_clear_proc_locks);
4920         dlm_unlock_recovery(ls);
4921 }
4922
4923 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4924 {
4925         struct dlm_lkb *lkb, *safe;
4926
4927         while (1) {
4928                 lkb = NULL;
4929                 spin_lock(&proc->locks_spin);
4930                 if (!list_empty(&proc->locks)) {
4931                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
4932                                          lkb_ownqueue);
4933                         list_del_init(&lkb->lkb_ownqueue);
4934                 }
4935                 spin_unlock(&proc->locks_spin);
4936
4937                 if (!lkb)
4938                         break;
4939
4940                 lkb->lkb_flags |= DLM_IFL_DEAD;
4941                 unlock_proc_lock(ls, lkb);
4942                 dlm_put_lkb(lkb); /* ref from proc->locks list */
4943         }
4944
4945         spin_lock(&proc->locks_spin);
4946         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4947                 list_del_init(&lkb->lkb_ownqueue);
4948                 lkb->lkb_flags |= DLM_IFL_DEAD;
4949                 dlm_put_lkb(lkb);
4950         }
4951         spin_unlock(&proc->locks_spin);
4952
4953         spin_lock(&proc->asts_spin);
4954         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4955                 memset(&lkb->lkb_callbacks, 0,
4956                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
4957                 list_del_init(&lkb->lkb_astqueue);
4958                 dlm_put_lkb(lkb);
4959         }
4960         spin_unlock(&proc->asts_spin);
4961 }
4962
4963 /* pid of 0 means purge all orphans */
4964
4965 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4966 {
4967         struct dlm_lkb *lkb, *safe;
4968
4969         mutex_lock(&ls->ls_orphans_mutex);
4970         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4971                 if (pid && lkb->lkb_ownpid != pid)
4972                         continue;
4973                 unlock_proc_lock(ls, lkb);
4974                 list_del_init(&lkb->lkb_ownqueue);
4975                 dlm_put_lkb(lkb);
4976         }
4977         mutex_unlock(&ls->ls_orphans_mutex);
4978 }
4979
4980 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4981 {
4982         struct dlm_message *ms;
4983         struct dlm_mhandle *mh;
4984         int error;
4985
4986         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4987                                 DLM_MSG_PURGE, &ms, &mh);
4988         if (error)
4989                 return error;
4990         ms->m_nodeid = nodeid;
4991         ms->m_pid = pid;
4992
4993         return send_message(mh, ms);
4994 }
4995
4996 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4997                    int nodeid, int pid)
4998 {
4999         int error = 0;
5000
5001         if (nodeid != dlm_our_nodeid()) {
5002                 error = send_purge(ls, nodeid, pid);
5003         } else {
5004                 dlm_lock_recovery(ls);
5005                 if (pid == current->pid)
5006                         purge_proc_locks(ls, proc);
5007                 else
5008                         do_purge(ls, nodeid, pid);
5009                 dlm_unlock_recovery(ls);
5010         }
5011         return error;
5012 }
5013