Merge git://git.kernel.org/pub/scm/linux/kernel/git/sfrench/cifs-2.6
[pandora-kernel.git] / fs / ocfs2 / dlmglue.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/crc32.h>
31 #include <linux/kthread.h>
32 #include <linux/pagemap.h>
33 #include <linux/debugfs.h>
34 #include <linux/seq_file.h>
35
36 #include <cluster/heartbeat.h>
37 #include <cluster/nodemanager.h>
38 #include <cluster/tcp.h>
39
40 #include <dlm/dlmapi.h>
41
42 #define MLOG_MASK_PREFIX ML_DLM_GLUE
43 #include <cluster/masklog.h>
44
45 #include "ocfs2.h"
46
47 #include "alloc.h"
48 #include "dcache.h"
49 #include "dlmglue.h"
50 #include "extent_map.h"
51 #include "file.h"
52 #include "heartbeat.h"
53 #include "inode.h"
54 #include "journal.h"
55 #include "slot_map.h"
56 #include "super.h"
57 #include "uptodate.h"
58
59 #include "buffer_head_io.h"
60
61 struct ocfs2_mask_waiter {
62         struct list_head        mw_item;
63         int                     mw_status;
64         struct completion       mw_complete;
65         unsigned long           mw_mask;
66         unsigned long           mw_goal;
67 };
68
69 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
70 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
71 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
72
73 /*
74  * Return value from ->downconvert_worker functions.
75  *
76  * These control the precise actions of ocfs2_unblock_lock()
77  * and ocfs2_process_blocked_lock()
78  *
79  */
80 enum ocfs2_unblock_action {
81         UNBLOCK_CONTINUE        = 0, /* Continue downconvert */
82         UNBLOCK_CONTINUE_POST   = 1, /* Continue downconvert, fire
83                                       * ->post_unlock callback */
84         UNBLOCK_STOP_POST       = 2, /* Do not downconvert, fire
85                                       * ->post_unlock() callback. */
86 };
87
88 struct ocfs2_unblock_ctl {
89         int requeue;
90         enum ocfs2_unblock_action unblock_action;
91 };
92
93 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
94                                         int new_level);
95 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
96
97 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
98                                      int blocking);
99
100 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
101                                        int blocking);
102
103 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
104                                      struct ocfs2_lock_res *lockres);
105
106
107 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
108
109 /* This aids in debugging situations where a bad LVB might be involved. */
110 static void ocfs2_dump_meta_lvb_info(u64 level,
111                                      const char *function,
112                                      unsigned int line,
113                                      struct ocfs2_lock_res *lockres)
114 {
115         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
116
117         mlog(level, "LVB information for %s (called from %s:%u):\n",
118              lockres->l_name, function, line);
119         mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
120              lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
121              be32_to_cpu(lvb->lvb_igeneration));
122         mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
123              (unsigned long long)be64_to_cpu(lvb->lvb_isize),
124              be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
125              be16_to_cpu(lvb->lvb_imode));
126         mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
127              "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
128              (long long)be64_to_cpu(lvb->lvb_iatime_packed),
129              (long long)be64_to_cpu(lvb->lvb_ictime_packed),
130              (long long)be64_to_cpu(lvb->lvb_imtime_packed),
131              be32_to_cpu(lvb->lvb_iattr));
132 }
133
134
135 /*
136  * OCFS2 Lock Resource Operations
137  *
138  * These fine tune the behavior of the generic dlmglue locking infrastructure.
139  *
140  * The most basic of lock types can point ->l_priv to their respective
141  * struct ocfs2_super and allow the default actions to manage things.
142  *
143  * Right now, each lock type also needs to implement an init function,
144  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
145  * should be called when the lock is no longer needed (i.e., object
146  * destruction time).
147  */
148 struct ocfs2_lock_res_ops {
149         /*
150          * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
151          * this callback if ->l_priv is not an ocfs2_super pointer
152          */
153         struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
154
155         /*
156          * Optionally called in the downconvert thread after a
157          * successful downconvert. The lockres will not be referenced
158          * after this callback is called, so it is safe to free
159          * memory, etc.
160          *
161          * The exact semantics of when this is called are controlled
162          * by ->downconvert_worker()
163          */
164         void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
165
166         /*
167          * Allow a lock type to add checks to determine whether it is
168          * safe to downconvert a lock. Return 0 to re-queue the
169          * downconvert at a later time, nonzero to continue.
170          *
171          * For most locks, the default checks that there are no
172          * incompatible holders are sufficient.
173          *
174          * Called with the lockres spinlock held.
175          */
176         int (*check_downconvert)(struct ocfs2_lock_res *, int);
177
178         /*
179          * Allows a lock type to populate the lock value block. This
180          * is called on downconvert, and when we drop a lock.
181          *
182          * Locks that want to use this should set LOCK_TYPE_USES_LVB
183          * in the flags field.
184          *
185          * Called with the lockres spinlock held.
186          */
187         void (*set_lvb)(struct ocfs2_lock_res *);
188
189         /*
190          * Called from the downconvert thread when it is determined
191          * that a lock will be downconverted. This is called without
192          * any locks held so the function can do work that might
193          * schedule (syncing out data, etc).
194          *
195          * This should return any one of the ocfs2_unblock_action
196          * values, depending on what it wants the thread to do.
197          */
198         int (*downconvert_worker)(struct ocfs2_lock_res *, int);
199
200         /*
201          * LOCK_TYPE_* flags which describe the specific requirements
202          * of a lock type. Descriptions of each individual flag follow.
203          */
204         int flags;
205 };
206
207 /*
208  * Some locks want to "refresh" potentially stale data when a
209  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
210  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
211  * individual lockres l_flags member from the ast function. It is
212  * expected that the locking wrapper will clear the
213  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
214  */
215 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
216
217 /*
218  * Indicate that a lock type makes use of the lock value block. The
219  * ->set_lvb lock type callback must be defined.
220  */
221 #define LOCK_TYPE_USES_LVB              0x2
222
223 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
224         .get_osb        = ocfs2_get_inode_osb,
225         .flags          = 0,
226 };
227
228 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
229         .get_osb        = ocfs2_get_inode_osb,
230         .check_downconvert = ocfs2_check_meta_downconvert,
231         .set_lvb        = ocfs2_set_meta_lvb,
232         .downconvert_worker = ocfs2_data_convert_worker,
233         .flags          = LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
234 };
235
236 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
237         .flags          = LOCK_TYPE_REQUIRES_REFRESH,
238 };
239
240 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
241         .flags          = 0,
242 };
243
244 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
245         .get_osb        = ocfs2_get_dentry_osb,
246         .post_unlock    = ocfs2_dentry_post_unlock,
247         .downconvert_worker = ocfs2_dentry_convert_worker,
248         .flags          = 0,
249 };
250
251 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
252         .get_osb        = ocfs2_get_inode_osb,
253         .flags          = 0,
254 };
255
256 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
257         .get_osb        = ocfs2_get_file_osb,
258         .flags          = 0,
259 };
260
261 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
262 {
263         return lockres->l_type == OCFS2_LOCK_TYPE_META ||
264                 lockres->l_type == OCFS2_LOCK_TYPE_RW ||
265                 lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
266 }
267
268 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
269 {
270         BUG_ON(!ocfs2_is_inode_lock(lockres));
271
272         return (struct inode *) lockres->l_priv;
273 }
274
275 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
276 {
277         BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
278
279         return (struct ocfs2_dentry_lock *)lockres->l_priv;
280 }
281
282 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
283 {
284         if (lockres->l_ops->get_osb)
285                 return lockres->l_ops->get_osb(lockres);
286
287         return (struct ocfs2_super *)lockres->l_priv;
288 }
289
290 static int ocfs2_lock_create(struct ocfs2_super *osb,
291                              struct ocfs2_lock_res *lockres,
292                              int level,
293                              int dlm_flags);
294 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
295                                                      int wanted);
296 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
297                                  struct ocfs2_lock_res *lockres,
298                                  int level);
299 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
300 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
301 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
302 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
303 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
304                                         struct ocfs2_lock_res *lockres);
305 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
306                                                 int convert);
307 #define ocfs2_log_dlm_error(_func, _stat, _lockres) do {        \
308         mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "  \
309                 "resource %s: %s\n", dlm_errname(_stat), _func, \
310                 _lockres->l_name, dlm_errmsg(_stat));           \
311 } while (0)
312 static int ocfs2_downconvert_thread(void *arg);
313 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
314                                         struct ocfs2_lock_res *lockres);
315 static int ocfs2_inode_lock_update(struct inode *inode,
316                                   struct buffer_head **bh);
317 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
318 static inline int ocfs2_highest_compat_lock_level(int level);
319 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
320                                       int new_level);
321 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
322                                   struct ocfs2_lock_res *lockres,
323                                   int new_level,
324                                   int lvb);
325 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
326                                         struct ocfs2_lock_res *lockres);
327 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
328                                 struct ocfs2_lock_res *lockres);
329
330
331 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
332                                   u64 blkno,
333                                   u32 generation,
334                                   char *name)
335 {
336         int len;
337
338         mlog_entry_void();
339
340         BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
341
342         len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
343                        ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
344                        (long long)blkno, generation);
345
346         BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
347
348         mlog(0, "built lock resource with name: %s\n", name);
349
350         mlog_exit_void();
351 }
352
353 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
354
355 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
356                                        struct ocfs2_dlm_debug *dlm_debug)
357 {
358         mlog(0, "Add tracking for lockres %s\n", res->l_name);
359
360         spin_lock(&ocfs2_dlm_tracking_lock);
361         list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
362         spin_unlock(&ocfs2_dlm_tracking_lock);
363 }
364
365 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
366 {
367         spin_lock(&ocfs2_dlm_tracking_lock);
368         if (!list_empty(&res->l_debug_list))
369                 list_del_init(&res->l_debug_list);
370         spin_unlock(&ocfs2_dlm_tracking_lock);
371 }
372
373 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
374                                        struct ocfs2_lock_res *res,
375                                        enum ocfs2_lock_type type,
376                                        struct ocfs2_lock_res_ops *ops,
377                                        void *priv)
378 {
379         res->l_type          = type;
380         res->l_ops           = ops;
381         res->l_priv          = priv;
382
383         res->l_level         = LKM_IVMODE;
384         res->l_requested     = LKM_IVMODE;
385         res->l_blocking      = LKM_IVMODE;
386         res->l_action        = OCFS2_AST_INVALID;
387         res->l_unlock_action = OCFS2_UNLOCK_INVALID;
388
389         res->l_flags         = OCFS2_LOCK_INITIALIZED;
390
391         ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
392 }
393
394 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
395 {
396         /* This also clears out the lock status block */
397         memset(res, 0, sizeof(struct ocfs2_lock_res));
398         spin_lock_init(&res->l_lock);
399         init_waitqueue_head(&res->l_event);
400         INIT_LIST_HEAD(&res->l_blocked_list);
401         INIT_LIST_HEAD(&res->l_mask_waiters);
402 }
403
404 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
405                                enum ocfs2_lock_type type,
406                                unsigned int generation,
407                                struct inode *inode)
408 {
409         struct ocfs2_lock_res_ops *ops;
410
411         switch(type) {
412                 case OCFS2_LOCK_TYPE_RW:
413                         ops = &ocfs2_inode_rw_lops;
414                         break;
415                 case OCFS2_LOCK_TYPE_META:
416                         ops = &ocfs2_inode_inode_lops;
417                         break;
418                 case OCFS2_LOCK_TYPE_OPEN:
419                         ops = &ocfs2_inode_open_lops;
420                         break;
421                 default:
422                         mlog_bug_on_msg(1, "type: %d\n", type);
423                         ops = NULL; /* thanks, gcc */
424                         break;
425         };
426
427         ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
428                               generation, res->l_name);
429         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
430 }
431
432 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
433 {
434         struct inode *inode = ocfs2_lock_res_inode(lockres);
435
436         return OCFS2_SB(inode->i_sb);
437 }
438
439 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
440 {
441         struct ocfs2_file_private *fp = lockres->l_priv;
442
443         return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
444 }
445
446 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
447 {
448         __be64 inode_blkno_be;
449
450         memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
451                sizeof(__be64));
452
453         return be64_to_cpu(inode_blkno_be);
454 }
455
456 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
457 {
458         struct ocfs2_dentry_lock *dl = lockres->l_priv;
459
460         return OCFS2_SB(dl->dl_inode->i_sb);
461 }
462
463 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
464                                 u64 parent, struct inode *inode)
465 {
466         int len;
467         u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
468         __be64 inode_blkno_be = cpu_to_be64(inode_blkno);
469         struct ocfs2_lock_res *lockres = &dl->dl_lockres;
470
471         ocfs2_lock_res_init_once(lockres);
472
473         /*
474          * Unfortunately, the standard lock naming scheme won't work
475          * here because we have two 16 byte values to use. Instead,
476          * we'll stuff the inode number as a binary value. We still
477          * want error prints to show something without garbling the
478          * display, so drop a null byte in there before the inode
479          * number. A future version of OCFS2 will likely use all
480          * binary lock names. The stringified names have been a
481          * tremendous aid in debugging, but now that the debugfs
482          * interface exists, we can mangle things there if need be.
483          *
484          * NOTE: We also drop the standard "pad" value (the total lock
485          * name size stays the same though - the last part is all
486          * zeros due to the memset in ocfs2_lock_res_init_once()
487          */
488         len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
489                        "%c%016llx",
490                        ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
491                        (long long)parent);
492
493         BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
494
495         memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
496                sizeof(__be64));
497
498         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
499                                    OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
500                                    dl);
501 }
502
503 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
504                                       struct ocfs2_super *osb)
505 {
506         /* Superblock lockres doesn't come from a slab so we call init
507          * once on it manually.  */
508         ocfs2_lock_res_init_once(res);
509         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
510                               0, res->l_name);
511         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
512                                    &ocfs2_super_lops, osb);
513 }
514
515 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
516                                        struct ocfs2_super *osb)
517 {
518         /* Rename lockres doesn't come from a slab so we call init
519          * once on it manually.  */
520         ocfs2_lock_res_init_once(res);
521         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
522         ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
523                                    &ocfs2_rename_lops, osb);
524 }
525
526 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
527                               struct ocfs2_file_private *fp)
528 {
529         struct inode *inode = fp->fp_file->f_mapping->host;
530         struct ocfs2_inode_info *oi = OCFS2_I(inode);
531
532         ocfs2_lock_res_init_once(lockres);
533         ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
534                               inode->i_generation, lockres->l_name);
535         ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
536                                    OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
537                                    fp);
538         lockres->l_flags |= OCFS2_LOCK_NOCACHE;
539 }
540
541 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
542 {
543         mlog_entry_void();
544
545         if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
546                 return;
547
548         ocfs2_remove_lockres_tracking(res);
549
550         mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
551                         "Lockres %s is on the blocked list\n",
552                         res->l_name);
553         mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
554                         "Lockres %s has mask waiters pending\n",
555                         res->l_name);
556         mlog_bug_on_msg(spin_is_locked(&res->l_lock),
557                         "Lockres %s is locked\n",
558                         res->l_name);
559         mlog_bug_on_msg(res->l_ro_holders,
560                         "Lockres %s has %u ro holders\n",
561                         res->l_name, res->l_ro_holders);
562         mlog_bug_on_msg(res->l_ex_holders,
563                         "Lockres %s has %u ex holders\n",
564                         res->l_name, res->l_ex_holders);
565
566         /* Need to clear out the lock status block for the dlm */
567         memset(&res->l_lksb, 0, sizeof(res->l_lksb));
568
569         res->l_flags = 0UL;
570         mlog_exit_void();
571 }
572
573 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
574                                      int level)
575 {
576         mlog_entry_void();
577
578         BUG_ON(!lockres);
579
580         switch(level) {
581         case LKM_EXMODE:
582                 lockres->l_ex_holders++;
583                 break;
584         case LKM_PRMODE:
585                 lockres->l_ro_holders++;
586                 break;
587         default:
588                 BUG();
589         }
590
591         mlog_exit_void();
592 }
593
594 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
595                                      int level)
596 {
597         mlog_entry_void();
598
599         BUG_ON(!lockres);
600
601         switch(level) {
602         case LKM_EXMODE:
603                 BUG_ON(!lockres->l_ex_holders);
604                 lockres->l_ex_holders--;
605                 break;
606         case LKM_PRMODE:
607                 BUG_ON(!lockres->l_ro_holders);
608                 lockres->l_ro_holders--;
609                 break;
610         default:
611                 BUG();
612         }
613         mlog_exit_void();
614 }
615
616 /* WARNING: This function lives in a world where the only three lock
617  * levels are EX, PR, and NL. It *will* have to be adjusted when more
618  * lock types are added. */
619 static inline int ocfs2_highest_compat_lock_level(int level)
620 {
621         int new_level = LKM_EXMODE;
622
623         if (level == LKM_EXMODE)
624                 new_level = LKM_NLMODE;
625         else if (level == LKM_PRMODE)
626                 new_level = LKM_PRMODE;
627         return new_level;
628 }
629
630 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
631                               unsigned long newflags)
632 {
633         struct ocfs2_mask_waiter *mw, *tmp;
634
635         assert_spin_locked(&lockres->l_lock);
636
637         lockres->l_flags = newflags;
638
639         list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
640                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
641                         continue;
642
643                 list_del_init(&mw->mw_item);
644                 mw->mw_status = 0;
645                 complete(&mw->mw_complete);
646         }
647 }
648 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
649 {
650         lockres_set_flags(lockres, lockres->l_flags | or);
651 }
652 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
653                                 unsigned long clear)
654 {
655         lockres_set_flags(lockres, lockres->l_flags & ~clear);
656 }
657
658 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
659 {
660         mlog_entry_void();
661
662         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
663         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
664         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
665         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
666
667         lockres->l_level = lockres->l_requested;
668         if (lockres->l_level <=
669             ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
670                 lockres->l_blocking = LKM_NLMODE;
671                 lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
672         }
673         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
674
675         mlog_exit_void();
676 }
677
678 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
679 {
680         mlog_entry_void();
681
682         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
683         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
684
685         /* Convert from RO to EX doesn't really need anything as our
686          * information is already up to data. Convert from NL to
687          * *anything* however should mark ourselves as needing an
688          * update */
689         if (lockres->l_level == LKM_NLMODE &&
690             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
691                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
692
693         lockres->l_level = lockres->l_requested;
694         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
695
696         mlog_exit_void();
697 }
698
699 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
700 {
701         mlog_entry_void();
702
703         BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
704         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
705
706         if (lockres->l_requested > LKM_NLMODE &&
707             !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
708             lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
709                 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
710
711         lockres->l_level = lockres->l_requested;
712         lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
713         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
714
715         mlog_exit_void();
716 }
717
718 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
719                                      int level)
720 {
721         int needs_downconvert = 0;
722         mlog_entry_void();
723
724         assert_spin_locked(&lockres->l_lock);
725
726         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
727
728         if (level > lockres->l_blocking) {
729                 /* only schedule a downconvert if we haven't already scheduled
730                  * one that goes low enough to satisfy the level we're
731                  * blocking.  this also catches the case where we get
732                  * duplicate BASTs */
733                 if (ocfs2_highest_compat_lock_level(level) <
734                     ocfs2_highest_compat_lock_level(lockres->l_blocking))
735                         needs_downconvert = 1;
736
737                 lockres->l_blocking = level;
738         }
739
740         mlog_exit(needs_downconvert);
741         return needs_downconvert;
742 }
743
744 static void ocfs2_blocking_ast(void *opaque, int level)
745 {
746         struct ocfs2_lock_res *lockres = opaque;
747         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
748         int needs_downconvert;
749         unsigned long flags;
750
751         BUG_ON(level <= LKM_NLMODE);
752
753         mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
754              lockres->l_name, level, lockres->l_level,
755              ocfs2_lock_type_string(lockres->l_type));
756
757         /*
758          * We can skip the bast for locks which don't enable caching -
759          * they'll be dropped at the earliest possible time anyway.
760          */
761         if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
762                 return;
763
764         spin_lock_irqsave(&lockres->l_lock, flags);
765         needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
766         if (needs_downconvert)
767                 ocfs2_schedule_blocked_lock(osb, lockres);
768         spin_unlock_irqrestore(&lockres->l_lock, flags);
769
770         wake_up(&lockres->l_event);
771
772         ocfs2_wake_downconvert_thread(osb);
773 }
774
775 static void ocfs2_locking_ast(void *opaque)
776 {
777         struct ocfs2_lock_res *lockres = opaque;
778         struct dlm_lockstatus *lksb = &lockres->l_lksb;
779         unsigned long flags;
780
781         spin_lock_irqsave(&lockres->l_lock, flags);
782
783         if (lksb->status != DLM_NORMAL) {
784                 mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
785                      lockres->l_name, lksb->status);
786                 spin_unlock_irqrestore(&lockres->l_lock, flags);
787                 return;
788         }
789
790         switch(lockres->l_action) {
791         case OCFS2_AST_ATTACH:
792                 ocfs2_generic_handle_attach_action(lockres);
793                 lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
794                 break;
795         case OCFS2_AST_CONVERT:
796                 ocfs2_generic_handle_convert_action(lockres);
797                 break;
798         case OCFS2_AST_DOWNCONVERT:
799                 ocfs2_generic_handle_downconvert_action(lockres);
800                 break;
801         default:
802                 mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
803                      "lockres flags = 0x%lx, unlock action: %u\n",
804                      lockres->l_name, lockres->l_action, lockres->l_flags,
805                      lockres->l_unlock_action);
806                 BUG();
807         }
808
809         /* set it to something invalid so if we get called again we
810          * can catch it. */
811         lockres->l_action = OCFS2_AST_INVALID;
812
813         wake_up(&lockres->l_event);
814         spin_unlock_irqrestore(&lockres->l_lock, flags);
815 }
816
817 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
818                                                 int convert)
819 {
820         unsigned long flags;
821
822         mlog_entry_void();
823         spin_lock_irqsave(&lockres->l_lock, flags);
824         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
825         if (convert)
826                 lockres->l_action = OCFS2_AST_INVALID;
827         else
828                 lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
829         spin_unlock_irqrestore(&lockres->l_lock, flags);
830
831         wake_up(&lockres->l_event);
832         mlog_exit_void();
833 }
834
835 /* Note: If we detect another process working on the lock (i.e.,
836  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
837  * to do the right thing in that case.
838  */
839 static int ocfs2_lock_create(struct ocfs2_super *osb,
840                              struct ocfs2_lock_res *lockres,
841                              int level,
842                              int dlm_flags)
843 {
844         int ret = 0;
845         enum dlm_status status = DLM_NORMAL;
846         unsigned long flags;
847
848         mlog_entry_void();
849
850         mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
851              dlm_flags);
852
853         spin_lock_irqsave(&lockres->l_lock, flags);
854         if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
855             (lockres->l_flags & OCFS2_LOCK_BUSY)) {
856                 spin_unlock_irqrestore(&lockres->l_lock, flags);
857                 goto bail;
858         }
859
860         lockres->l_action = OCFS2_AST_ATTACH;
861         lockres->l_requested = level;
862         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
863         spin_unlock_irqrestore(&lockres->l_lock, flags);
864
865         status = dlmlock(osb->dlm,
866                          level,
867                          &lockres->l_lksb,
868                          dlm_flags,
869                          lockres->l_name,
870                          OCFS2_LOCK_ID_MAX_LEN - 1,
871                          ocfs2_locking_ast,
872                          lockres,
873                          ocfs2_blocking_ast);
874         if (status != DLM_NORMAL) {
875                 ocfs2_log_dlm_error("dlmlock", status, lockres);
876                 ret = -EINVAL;
877                 ocfs2_recover_from_dlm_error(lockres, 1);
878         }
879
880         mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);
881
882 bail:
883         mlog_exit(ret);
884         return ret;
885 }
886
887 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
888                                         int flag)
889 {
890         unsigned long flags;
891         int ret;
892
893         spin_lock_irqsave(&lockres->l_lock, flags);
894         ret = lockres->l_flags & flag;
895         spin_unlock_irqrestore(&lockres->l_lock, flags);
896
897         return ret;
898 }
899
900 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
901
902 {
903         wait_event(lockres->l_event,
904                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
905 }
906
907 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
908
909 {
910         wait_event(lockres->l_event,
911                    !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
912 }
913
914 /* predict what lock level we'll be dropping down to on behalf
915  * of another node, and return true if the currently wanted
916  * level will be compatible with it. */
917 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
918                                                      int wanted)
919 {
920         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
921
922         return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
923 }
924
925 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
926 {
927         INIT_LIST_HEAD(&mw->mw_item);
928         init_completion(&mw->mw_complete);
929 }
930
931 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
932 {
933         wait_for_completion(&mw->mw_complete);
934         /* Re-arm the completion in case we want to wait on it again */
935         INIT_COMPLETION(mw->mw_complete);
936         return mw->mw_status;
937 }
938
939 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
940                                     struct ocfs2_mask_waiter *mw,
941                                     unsigned long mask,
942                                     unsigned long goal)
943 {
944         BUG_ON(!list_empty(&mw->mw_item));
945
946         assert_spin_locked(&lockres->l_lock);
947
948         list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
949         mw->mw_mask = mask;
950         mw->mw_goal = goal;
951 }
952
953 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
954  * if the mask still hadn't reached its goal */
955 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
956                                       struct ocfs2_mask_waiter *mw)
957 {
958         unsigned long flags;
959         int ret = 0;
960
961         spin_lock_irqsave(&lockres->l_lock, flags);
962         if (!list_empty(&mw->mw_item)) {
963                 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
964                         ret = -EBUSY;
965
966                 list_del_init(&mw->mw_item);
967                 init_completion(&mw->mw_complete);
968         }
969         spin_unlock_irqrestore(&lockres->l_lock, flags);
970
971         return ret;
972
973 }
974
975 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
976                                              struct ocfs2_lock_res *lockres)
977 {
978         int ret;
979
980         ret = wait_for_completion_interruptible(&mw->mw_complete);
981         if (ret)
982                 lockres_remove_mask_waiter(lockres, mw);
983         else
984                 ret = mw->mw_status;
985         /* Re-arm the completion in case we want to wait on it again */
986         INIT_COMPLETION(mw->mw_complete);
987         return ret;
988 }
989
990 static int ocfs2_cluster_lock(struct ocfs2_super *osb,
991                               struct ocfs2_lock_res *lockres,
992                               int level,
993                               int lkm_flags,
994                               int arg_flags)
995 {
996         struct ocfs2_mask_waiter mw;
997         enum dlm_status status;
998         int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
999         int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1000         unsigned long flags;
1001
1002         mlog_entry_void();
1003
1004         ocfs2_init_mask_waiter(&mw);
1005
1006         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1007                 lkm_flags |= LKM_VALBLK;
1008
1009 again:
1010         wait = 0;
1011
1012         if (catch_signals && signal_pending(current)) {
1013                 ret = -ERESTARTSYS;
1014                 goto out;
1015         }
1016
1017         spin_lock_irqsave(&lockres->l_lock, flags);
1018
1019         mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1020                         "Cluster lock called on freeing lockres %s! flags "
1021                         "0x%lx\n", lockres->l_name, lockres->l_flags);
1022
1023         /* We only compare against the currently granted level
1024          * here. If the lock is blocked waiting on a downconvert,
1025          * we'll get caught below. */
1026         if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1027             level > lockres->l_level) {
1028                 /* is someone sitting in dlm_lock? If so, wait on
1029                  * them. */
1030                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1031                 wait = 1;
1032                 goto unlock;
1033         }
1034
1035         if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1036             !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1037                 /* is the lock is currently blocked on behalf of
1038                  * another node */
1039                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1040                 wait = 1;
1041                 goto unlock;
1042         }
1043
1044         if (level > lockres->l_level) {
1045                 if (lockres->l_action != OCFS2_AST_INVALID)
1046                         mlog(ML_ERROR, "lockres %s has action %u pending\n",
1047                              lockres->l_name, lockres->l_action);
1048
1049                 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1050                         lockres->l_action = OCFS2_AST_ATTACH;
1051                         lkm_flags &= ~LKM_CONVERT;
1052                 } else {
1053                         lockres->l_action = OCFS2_AST_CONVERT;
1054                         lkm_flags |= LKM_CONVERT;
1055                 }
1056
1057                 lockres->l_requested = level;
1058                 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1059                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1060
1061                 BUG_ON(level == LKM_IVMODE);
1062                 BUG_ON(level == LKM_NLMODE);
1063
1064                 mlog(0, "lock %s, convert from %d to level = %d\n",
1065                      lockres->l_name, lockres->l_level, level);
1066
1067                 /* call dlm_lock to upgrade lock now */
1068                 status = dlmlock(osb->dlm,
1069                                  level,
1070                                  &lockres->l_lksb,
1071                                  lkm_flags,
1072                                  lockres->l_name,
1073                                  OCFS2_LOCK_ID_MAX_LEN - 1,
1074                                  ocfs2_locking_ast,
1075                                  lockres,
1076                                  ocfs2_blocking_ast);
1077                 if (status != DLM_NORMAL) {
1078                         if ((lkm_flags & LKM_NOQUEUE) &&
1079                             (status == DLM_NOTQUEUED))
1080                                 ret = -EAGAIN;
1081                         else {
1082                                 ocfs2_log_dlm_error("dlmlock", status,
1083                                                     lockres);
1084                                 ret = -EINVAL;
1085                         }
1086                         ocfs2_recover_from_dlm_error(lockres, 1);
1087                         goto out;
1088                 }
1089
1090                 mlog(0, "lock %s, successfull return from dlmlock\n",
1091                      lockres->l_name);
1092
1093                 /* At this point we've gone inside the dlm and need to
1094                  * complete our work regardless. */
1095                 catch_signals = 0;
1096
1097                 /* wait for busy to clear and carry on */
1098                 goto again;
1099         }
1100
1101         /* Ok, if we get here then we're good to go. */
1102         ocfs2_inc_holders(lockres, level);
1103
1104         ret = 0;
1105 unlock:
1106         spin_unlock_irqrestore(&lockres->l_lock, flags);
1107 out:
1108         /*
1109          * This is helping work around a lock inversion between the page lock
1110          * and dlm locks.  One path holds the page lock while calling aops
1111          * which block acquiring dlm locks.  The voting thread holds dlm
1112          * locks while acquiring page locks while down converting data locks.
1113          * This block is helping an aop path notice the inversion and back
1114          * off to unlock its page lock before trying the dlm lock again.
1115          */
1116         if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1117             mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1118                 wait = 0;
1119                 if (lockres_remove_mask_waiter(lockres, &mw))
1120                         ret = -EAGAIN;
1121                 else
1122                         goto again;
1123         }
1124         if (wait) {
1125                 ret = ocfs2_wait_for_mask(&mw);
1126                 if (ret == 0)
1127                         goto again;
1128                 mlog_errno(ret);
1129         }
1130
1131         mlog_exit(ret);
1132         return ret;
1133 }
1134
1135 static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
1136                                  struct ocfs2_lock_res *lockres,
1137                                  int level)
1138 {
1139         unsigned long flags;
1140
1141         mlog_entry_void();
1142         spin_lock_irqsave(&lockres->l_lock, flags);
1143         ocfs2_dec_holders(lockres, level);
1144         ocfs2_downconvert_on_unlock(osb, lockres);
1145         spin_unlock_irqrestore(&lockres->l_lock, flags);
1146         mlog_exit_void();
1147 }
1148
1149 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1150                                  struct ocfs2_lock_res *lockres,
1151                                  int ex,
1152                                  int local)
1153 {
1154         int level =  ex ? LKM_EXMODE : LKM_PRMODE;
1155         unsigned long flags;
1156         int lkm_flags = local ? LKM_LOCAL : 0;
1157
1158         spin_lock_irqsave(&lockres->l_lock, flags);
1159         BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1160         lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1161         spin_unlock_irqrestore(&lockres->l_lock, flags);
1162
1163         return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1164 }
1165
1166 /* Grants us an EX lock on the data and metadata resources, skipping
1167  * the normal cluster directory lookup. Use this ONLY on newly created
1168  * inodes which other nodes can't possibly see, and which haven't been
1169  * hashed in the inode hash yet. This can give us a good performance
1170  * increase as it'll skip the network broadcast normally associated
1171  * with creating a new lock resource. */
1172 int ocfs2_create_new_inode_locks(struct inode *inode)
1173 {
1174         int ret;
1175         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1176
1177         BUG_ON(!inode);
1178         BUG_ON(!ocfs2_inode_is_new(inode));
1179
1180         mlog_entry_void();
1181
1182         mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1183
1184         /* NOTE: That we don't increment any of the holder counts, nor
1185          * do we add anything to a journal handle. Since this is
1186          * supposed to be a new inode which the cluster doesn't know
1187          * about yet, there is no need to.  As far as the LVB handling
1188          * is concerned, this is basically like acquiring an EX lock
1189          * on a resource which has an invalid one -- we'll set it
1190          * valid when we release the EX. */
1191
1192         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1193         if (ret) {
1194                 mlog_errno(ret);
1195                 goto bail;
1196         }
1197
1198         /*
1199          * We don't want to use LKM_LOCAL on a meta data lock as they
1200          * don't use a generation in their lock names.
1201          */
1202         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1203         if (ret) {
1204                 mlog_errno(ret);
1205                 goto bail;
1206         }
1207
1208         ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1209         if (ret) {
1210                 mlog_errno(ret);
1211                 goto bail;
1212         }
1213
1214 bail:
1215         mlog_exit(ret);
1216         return ret;
1217 }
1218
1219 int ocfs2_rw_lock(struct inode *inode, int write)
1220 {
1221         int status, level;
1222         struct ocfs2_lock_res *lockres;
1223         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1224
1225         BUG_ON(!inode);
1226
1227         mlog_entry_void();
1228
1229         mlog(0, "inode %llu take %s RW lock\n",
1230              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1231              write ? "EXMODE" : "PRMODE");
1232
1233         if (ocfs2_mount_local(osb))
1234                 return 0;
1235
1236         lockres = &OCFS2_I(inode)->ip_rw_lockres;
1237
1238         level = write ? LKM_EXMODE : LKM_PRMODE;
1239
1240         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1241                                     0);
1242         if (status < 0)
1243                 mlog_errno(status);
1244
1245         mlog_exit(status);
1246         return status;
1247 }
1248
1249 void ocfs2_rw_unlock(struct inode *inode, int write)
1250 {
1251         int level = write ? LKM_EXMODE : LKM_PRMODE;
1252         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1253         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1254
1255         mlog_entry_void();
1256
1257         mlog(0, "inode %llu drop %s RW lock\n",
1258              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1259              write ? "EXMODE" : "PRMODE");
1260
1261         if (!ocfs2_mount_local(osb))
1262                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1263
1264         mlog_exit_void();
1265 }
1266
1267 /*
1268  * ocfs2_open_lock always get PR mode lock.
1269  */
1270 int ocfs2_open_lock(struct inode *inode)
1271 {
1272         int status = 0;
1273         struct ocfs2_lock_res *lockres;
1274         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1275
1276         BUG_ON(!inode);
1277
1278         mlog_entry_void();
1279
1280         mlog(0, "inode %llu take PRMODE open lock\n",
1281              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1282
1283         if (ocfs2_mount_local(osb))
1284                 goto out;
1285
1286         lockres = &OCFS2_I(inode)->ip_open_lockres;
1287
1288         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1289                                     LKM_PRMODE, 0, 0);
1290         if (status < 0)
1291                 mlog_errno(status);
1292
1293 out:
1294         mlog_exit(status);
1295         return status;
1296 }
1297
1298 int ocfs2_try_open_lock(struct inode *inode, int write)
1299 {
1300         int status = 0, level;
1301         struct ocfs2_lock_res *lockres;
1302         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1303
1304         BUG_ON(!inode);
1305
1306         mlog_entry_void();
1307
1308         mlog(0, "inode %llu try to take %s open lock\n",
1309              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1310              write ? "EXMODE" : "PRMODE");
1311
1312         if (ocfs2_mount_local(osb))
1313                 goto out;
1314
1315         lockres = &OCFS2_I(inode)->ip_open_lockres;
1316
1317         level = write ? LKM_EXMODE : LKM_PRMODE;
1318
1319         /*
1320          * The file system may already holding a PRMODE/EXMODE open lock.
1321          * Since we pass LKM_NOQUEUE, the request won't block waiting on
1322          * other nodes and the -EAGAIN will indicate to the caller that
1323          * this inode is still in use.
1324          */
1325         status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres,
1326                                     level, LKM_NOQUEUE, 0);
1327
1328 out:
1329         mlog_exit(status);
1330         return status;
1331 }
1332
1333 /*
1334  * ocfs2_open_unlock unlock PR and EX mode open locks.
1335  */
1336 void ocfs2_open_unlock(struct inode *inode)
1337 {
1338         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1339         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1340
1341         mlog_entry_void();
1342
1343         mlog(0, "inode %llu drop open lock\n",
1344              (unsigned long long)OCFS2_I(inode)->ip_blkno);
1345
1346         if (ocfs2_mount_local(osb))
1347                 goto out;
1348
1349         if(lockres->l_ro_holders)
1350                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1351                                      LKM_PRMODE);
1352         if(lockres->l_ex_holders)
1353                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres,
1354                                      LKM_EXMODE);
1355
1356 out:
1357         mlog_exit_void();
1358 }
1359
1360 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1361                                      int level)
1362 {
1363         int ret;
1364         struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1365         unsigned long flags;
1366         struct ocfs2_mask_waiter mw;
1367
1368         ocfs2_init_mask_waiter(&mw);
1369
1370 retry_cancel:
1371         spin_lock_irqsave(&lockres->l_lock, flags);
1372         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1373                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
1374                 if (ret) {
1375                         spin_unlock_irqrestore(&lockres->l_lock, flags);
1376                         ret = ocfs2_cancel_convert(osb, lockres);
1377                         if (ret < 0) {
1378                                 mlog_errno(ret);
1379                                 goto out;
1380                         }
1381                         goto retry_cancel;
1382                 }
1383                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1384                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1385
1386                 ocfs2_wait_for_mask(&mw);
1387                 goto retry_cancel;
1388         }
1389
1390         ret = -ERESTARTSYS;
1391         /*
1392          * We may still have gotten the lock, in which case there's no
1393          * point to restarting the syscall.
1394          */
1395         if (lockres->l_level == level)
1396                 ret = 0;
1397
1398         mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1399              lockres->l_flags, lockres->l_level, lockres->l_action);
1400
1401         spin_unlock_irqrestore(&lockres->l_lock, flags);
1402
1403 out:
1404         return ret;
1405 }
1406
1407 /*
1408  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1409  * flock() calls. The locking approach this requires is sufficiently
1410  * different from all other cluster lock types that we implement a
1411  * seperate path to the "low-level" dlm calls. In particular:
1412  *
1413  * - No optimization of lock levels is done - we take at exactly
1414  *   what's been requested.
1415  *
1416  * - No lock caching is employed. We immediately downconvert to
1417  *   no-lock at unlock time. This also means flock locks never go on
1418  *   the blocking list).
1419  *
1420  * - Since userspace can trivially deadlock itself with flock, we make
1421  *   sure to allow cancellation of a misbehaving applications flock()
1422  *   request.
1423  *
1424  * - Access to any flock lockres doesn't require concurrency, so we
1425  *   can simplify the code by requiring the caller to guarantee
1426  *   serialization of dlmglue flock calls.
1427  */
1428 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1429 {
1430         int ret, level = ex ? LKM_EXMODE : LKM_PRMODE;
1431         unsigned int lkm_flags = trylock ? LKM_NOQUEUE : 0;
1432         unsigned long flags;
1433         struct ocfs2_file_private *fp = file->private_data;
1434         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1435         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1436         struct ocfs2_mask_waiter mw;
1437
1438         ocfs2_init_mask_waiter(&mw);
1439
1440         if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1441             (lockres->l_level > LKM_NLMODE)) {
1442                 mlog(ML_ERROR,
1443                      "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1444                      "level: %u\n", lockres->l_name, lockres->l_flags,
1445                      lockres->l_level);
1446                 return -EINVAL;
1447         }
1448
1449         spin_lock_irqsave(&lockres->l_lock, flags);
1450         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1451                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1452                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1453
1454                 /*
1455                  * Get the lock at NLMODE to start - that way we
1456                  * can cancel the upconvert request if need be.
1457                  */
1458                 ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
1459                 if (ret < 0) {
1460                         mlog_errno(ret);
1461                         goto out;
1462                 }
1463
1464                 ret = ocfs2_wait_for_mask(&mw);
1465                 if (ret) {
1466                         mlog_errno(ret);
1467                         goto out;
1468                 }
1469                 spin_lock_irqsave(&lockres->l_lock, flags);
1470         }
1471
1472         lockres->l_action = OCFS2_AST_CONVERT;
1473         lkm_flags |= LKM_CONVERT;
1474         lockres->l_requested = level;
1475         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1476
1477         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1478         spin_unlock_irqrestore(&lockres->l_lock, flags);
1479
1480         ret = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags,
1481                       lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1,
1482                       ocfs2_locking_ast, lockres, ocfs2_blocking_ast);
1483         if (ret != DLM_NORMAL) {
1484                 if (trylock && ret == DLM_NOTQUEUED)
1485                         ret = -EAGAIN;
1486                 else {
1487                         ocfs2_log_dlm_error("dlmlock", ret, lockres);
1488                         ret = -EINVAL;
1489                 }
1490
1491                 ocfs2_recover_from_dlm_error(lockres, 1);
1492                 lockres_remove_mask_waiter(lockres, &mw);
1493                 goto out;
1494         }
1495
1496         ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
1497         if (ret == -ERESTARTSYS) {
1498                 /*
1499                  * Userspace can cause deadlock itself with
1500                  * flock(). Current behavior locally is to allow the
1501                  * deadlock, but abort the system call if a signal is
1502                  * received. We follow this example, otherwise a
1503                  * poorly written program could sit in kernel until
1504                  * reboot.
1505                  *
1506                  * Handling this is a bit more complicated for Ocfs2
1507                  * though. We can't exit this function with an
1508                  * outstanding lock request, so a cancel convert is
1509                  * required. We intentionally overwrite 'ret' - if the
1510                  * cancel fails and the lock was granted, it's easier
1511                  * to just bubble sucess back up to the user.
1512                  */
1513                 ret = ocfs2_flock_handle_signal(lockres, level);
1514         }
1515
1516 out:
1517
1518         mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
1519              lockres->l_name, ex, trylock, ret);
1520         return ret;
1521 }
1522
1523 void ocfs2_file_unlock(struct file *file)
1524 {
1525         int ret;
1526         unsigned long flags;
1527         struct ocfs2_file_private *fp = file->private_data;
1528         struct ocfs2_lock_res *lockres = &fp->fp_flock;
1529         struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1530         struct ocfs2_mask_waiter mw;
1531
1532         ocfs2_init_mask_waiter(&mw);
1533
1534         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
1535                 return;
1536
1537         if (lockres->l_level == LKM_NLMODE)
1538                 return;
1539
1540         mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
1541              lockres->l_name, lockres->l_flags, lockres->l_level,
1542              lockres->l_action);
1543
1544         spin_lock_irqsave(&lockres->l_lock, flags);
1545         /*
1546          * Fake a blocking ast for the downconvert code.
1547          */
1548         lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
1549         lockres->l_blocking = LKM_EXMODE;
1550
1551         ocfs2_prepare_downconvert(lockres, LKM_NLMODE);
1552         lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1553         spin_unlock_irqrestore(&lockres->l_lock, flags);
1554
1555         ret = ocfs2_downconvert_lock(osb, lockres, LKM_NLMODE, 0);
1556         if (ret) {
1557                 mlog_errno(ret);
1558                 return;
1559         }
1560
1561         ret = ocfs2_wait_for_mask(&mw);
1562         if (ret)
1563                 mlog_errno(ret);
1564 }
1565
1566 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
1567                                         struct ocfs2_lock_res *lockres)
1568 {
1569         int kick = 0;
1570
1571         mlog_entry_void();
1572
1573         /* If we know that another node is waiting on our lock, kick
1574          * the downconvert thread * pre-emptively when we reach a release
1575          * condition. */
1576         if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1577                 switch(lockres->l_blocking) {
1578                 case LKM_EXMODE:
1579                         if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1580                                 kick = 1;
1581                         break;
1582                 case LKM_PRMODE:
1583                         if (!lockres->l_ex_holders)
1584                                 kick = 1;
1585                         break;
1586                 default:
1587                         BUG();
1588                 }
1589         }
1590
1591         if (kick)
1592                 ocfs2_wake_downconvert_thread(osb);
1593
1594         mlog_exit_void();
1595 }
1596
1597 #define OCFS2_SEC_BITS   34
1598 #define OCFS2_SEC_SHIFT  (64 - 34)
1599 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
1600
1601 /* LVB only has room for 64 bits of time here so we pack it for
1602  * now. */
1603 static u64 ocfs2_pack_timespec(struct timespec *spec)
1604 {
1605         u64 res;
1606         u64 sec = spec->tv_sec;
1607         u32 nsec = spec->tv_nsec;
1608
1609         res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1610
1611         return res;
1612 }
1613
1614 /* Call this with the lockres locked. I am reasonably sure we don't
1615  * need ip_lock in this function as anyone who would be changing those
1616  * values is supposed to be blocked in ocfs2_inode_lock right now. */
1617 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
1618 {
1619         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1620         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1621         struct ocfs2_meta_lvb *lvb;
1622
1623         mlog_entry_void();
1624
1625         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1626
1627         /*
1628          * Invalidate the LVB of a deleted inode - this way other
1629          * nodes are forced to go to disk and discover the new inode
1630          * status.
1631          */
1632         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1633                 lvb->lvb_version = 0;
1634                 goto out;
1635         }
1636
1637         lvb->lvb_version   = OCFS2_LVB_VERSION;
1638         lvb->lvb_isize     = cpu_to_be64(i_size_read(inode));
1639         lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
1640         lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
1641         lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
1642         lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
1643         lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
1644         lvb->lvb_iatime_packed  =
1645                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
1646         lvb->lvb_ictime_packed =
1647                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
1648         lvb->lvb_imtime_packed =
1649                 cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
1650         lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
1651         lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
1652         lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
1653
1654 out:
1655         mlog_meta_lvb(0, lockres);
1656
1657         mlog_exit_void();
1658 }
1659
1660 static void ocfs2_unpack_timespec(struct timespec *spec,
1661                                   u64 packed_time)
1662 {
1663         spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1664         spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1665 }
1666
1667 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
1668 {
1669         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1670         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1671         struct ocfs2_meta_lvb *lvb;
1672
1673         mlog_entry_void();
1674
1675         mlog_meta_lvb(0, lockres);
1676
1677         lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1678
1679         /* We're safe here without the lockres lock... */
1680         spin_lock(&oi->ip_lock);
1681         oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
1682         i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
1683
1684         oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
1685         oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
1686         ocfs2_set_inode_flags(inode);
1687
1688         /* fast-symlinks are a special case */
1689         if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
1690                 inode->i_blocks = 0;
1691         else
1692                 inode->i_blocks = ocfs2_inode_sector_count(inode);
1693
1694         inode->i_uid     = be32_to_cpu(lvb->lvb_iuid);
1695         inode->i_gid     = be32_to_cpu(lvb->lvb_igid);
1696         inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
1697         inode->i_nlink   = be16_to_cpu(lvb->lvb_inlink);
1698         ocfs2_unpack_timespec(&inode->i_atime,
1699                               be64_to_cpu(lvb->lvb_iatime_packed));
1700         ocfs2_unpack_timespec(&inode->i_mtime,
1701                               be64_to_cpu(lvb->lvb_imtime_packed));
1702         ocfs2_unpack_timespec(&inode->i_ctime,
1703                               be64_to_cpu(lvb->lvb_ictime_packed));
1704         spin_unlock(&oi->ip_lock);
1705
1706         mlog_exit_void();
1707 }
1708
1709 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1710                                               struct ocfs2_lock_res *lockres)
1711 {
1712         struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1713
1714         if (lvb->lvb_version == OCFS2_LVB_VERSION
1715             && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
1716                 return 1;
1717         return 0;
1718 }
1719
1720 /* Determine whether a lock resource needs to be refreshed, and
1721  * arbitrate who gets to refresh it.
1722  *
1723  *   0 means no refresh needed.
1724  *
1725  *   > 0 means you need to refresh this and you MUST call
1726  *   ocfs2_complete_lock_res_refresh afterwards. */
1727 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
1728 {
1729         unsigned long flags;
1730         int status = 0;
1731
1732         mlog_entry_void();
1733
1734 refresh_check:
1735         spin_lock_irqsave(&lockres->l_lock, flags);
1736         if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
1737                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1738                 goto bail;
1739         }
1740
1741         if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
1742                 spin_unlock_irqrestore(&lockres->l_lock, flags);
1743
1744                 ocfs2_wait_on_refreshing_lock(lockres);
1745                 goto refresh_check;
1746         }
1747
1748         /* Ok, I'll be the one to refresh this lock. */
1749         lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
1750         spin_unlock_irqrestore(&lockres->l_lock, flags);
1751
1752         status = 1;
1753 bail:
1754         mlog_exit(status);
1755         return status;
1756 }
1757
1758 /* If status is non zero, I'll mark it as not being in refresh
1759  * anymroe, but i won't clear the needs refresh flag. */
1760 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
1761                                                    int status)
1762 {
1763         unsigned long flags;
1764         mlog_entry_void();
1765
1766         spin_lock_irqsave(&lockres->l_lock, flags);
1767         lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
1768         if (!status)
1769                 lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
1770         spin_unlock_irqrestore(&lockres->l_lock, flags);
1771
1772         wake_up(&lockres->l_event);
1773
1774         mlog_exit_void();
1775 }
1776
1777 /* may or may not return a bh if it went to disk. */
1778 static int ocfs2_inode_lock_update(struct inode *inode,
1779                                   struct buffer_head **bh)
1780 {
1781         int status = 0;
1782         struct ocfs2_inode_info *oi = OCFS2_I(inode);
1783         struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
1784         struct ocfs2_dinode *fe;
1785         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1786
1787         mlog_entry_void();
1788
1789         if (ocfs2_mount_local(osb))
1790                 goto bail;
1791
1792         spin_lock(&oi->ip_lock);
1793         if (oi->ip_flags & OCFS2_INODE_DELETED) {
1794                 mlog(0, "Orphaned inode %llu was deleted while we "
1795                      "were waiting on a lock. ip_flags = 0x%x\n",
1796                      (unsigned long long)oi->ip_blkno, oi->ip_flags);
1797                 spin_unlock(&oi->ip_lock);
1798                 status = -ENOENT;
1799                 goto bail;
1800         }
1801         spin_unlock(&oi->ip_lock);
1802
1803         if (!ocfs2_should_refresh_lock_res(lockres))
1804                 goto bail;
1805
1806         /* This will discard any caching information we might have had
1807          * for the inode metadata. */
1808         ocfs2_metadata_cache_purge(inode);
1809
1810         ocfs2_extent_map_trunc(inode, 0);
1811
1812         if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
1813                 mlog(0, "Trusting LVB on inode %llu\n",
1814                      (unsigned long long)oi->ip_blkno);
1815                 ocfs2_refresh_inode_from_lvb(inode);
1816         } else {
1817                 /* Boo, we have to go to disk. */
1818                 /* read bh, cast, ocfs2_refresh_inode */
1819                 status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
1820                                           bh, OCFS2_BH_CACHED, inode);
1821                 if (status < 0) {
1822                         mlog_errno(status);
1823                         goto bail_refresh;
1824                 }
1825                 fe = (struct ocfs2_dinode *) (*bh)->b_data;
1826
1827                 /* This is a good chance to make sure we're not
1828                  * locking an invalid object.
1829                  *
1830                  * We bug on a stale inode here because we checked
1831                  * above whether it was wiped from disk. The wiping
1832                  * node provides a guarantee that we receive that
1833                  * message and can mark the inode before dropping any
1834                  * locks associated with it. */
1835                 if (!OCFS2_IS_VALID_DINODE(fe)) {
1836                         OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
1837                         status = -EIO;
1838                         goto bail_refresh;
1839                 }
1840                 mlog_bug_on_msg(inode->i_generation !=
1841                                 le32_to_cpu(fe->i_generation),
1842                                 "Invalid dinode %llu disk generation: %u "
1843                                 "inode->i_generation: %u\n",
1844                                 (unsigned long long)oi->ip_blkno,
1845                                 le32_to_cpu(fe->i_generation),
1846                                 inode->i_generation);
1847                 mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
1848                                 !(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
1849                                 "Stale dinode %llu dtime: %llu flags: 0x%x\n",
1850                                 (unsigned long long)oi->ip_blkno,
1851                                 (unsigned long long)le64_to_cpu(fe->i_dtime),
1852                                 le32_to_cpu(fe->i_flags));
1853
1854                 ocfs2_refresh_inode(inode, fe);
1855         }
1856
1857         status = 0;
1858 bail_refresh:
1859         ocfs2_complete_lock_res_refresh(lockres, status);
1860 bail:
1861         mlog_exit(status);
1862         return status;
1863 }
1864
1865 static int ocfs2_assign_bh(struct inode *inode,
1866                            struct buffer_head **ret_bh,
1867                            struct buffer_head *passed_bh)
1868 {
1869         int status;
1870
1871         if (passed_bh) {
1872                 /* Ok, the update went to disk for us, use the
1873                  * returned bh. */
1874                 *ret_bh = passed_bh;
1875                 get_bh(*ret_bh);
1876
1877                 return 0;
1878         }
1879
1880         status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1881                                   OCFS2_I(inode)->ip_blkno,
1882                                   ret_bh,
1883                                   OCFS2_BH_CACHED,
1884                                   inode);
1885         if (status < 0)
1886                 mlog_errno(status);
1887
1888         return status;
1889 }
1890
1891 /*
1892  * returns < 0 error if the callback will never be called, otherwise
1893  * the result of the lock will be communicated via the callback.
1894  */
1895 int ocfs2_inode_lock_full(struct inode *inode,
1896                          struct buffer_head **ret_bh,
1897                          int ex,
1898                          int arg_flags)
1899 {
1900         int status, level, dlm_flags, acquired;
1901         struct ocfs2_lock_res *lockres = NULL;
1902         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1903         struct buffer_head *local_bh = NULL;
1904
1905         BUG_ON(!inode);
1906
1907         mlog_entry_void();
1908
1909         mlog(0, "inode %llu, take %s META lock\n",
1910              (unsigned long long)OCFS2_I(inode)->ip_blkno,
1911              ex ? "EXMODE" : "PRMODE");
1912
1913         status = 0;
1914         acquired = 0;
1915         /* We'll allow faking a readonly metadata lock for
1916          * rodevices. */
1917         if (ocfs2_is_hard_readonly(osb)) {
1918                 if (ex)
1919                         status = -EROFS;
1920                 goto bail;
1921         }
1922
1923         if (ocfs2_mount_local(osb))
1924                 goto local;
1925
1926         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1927                 wait_event(osb->recovery_event,
1928                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1929
1930         lockres = &OCFS2_I(inode)->ip_inode_lockres;
1931         level = ex ? LKM_EXMODE : LKM_PRMODE;
1932         dlm_flags = 0;
1933         if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
1934                 dlm_flags |= LKM_NOQUEUE;
1935
1936         status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
1937         if (status < 0) {
1938                 if (status != -EAGAIN && status != -EIOCBRETRY)
1939                         mlog_errno(status);
1940                 goto bail;
1941         }
1942
1943         /* Notify the error cleanup path to drop the cluster lock. */
1944         acquired = 1;
1945
1946         /* We wait twice because a node may have died while we were in
1947          * the lower dlm layers. The second time though, we've
1948          * committed to owning this lock so we don't allow signals to
1949          * abort the operation. */
1950         if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
1951                 wait_event(osb->recovery_event,
1952                            ocfs2_node_map_is_empty(osb, &osb->recovery_map));
1953
1954 local:
1955         /*
1956          * We only see this flag if we're being called from
1957          * ocfs2_read_locked_inode(). It means we're locking an inode
1958          * which hasn't been populated yet, so clear the refresh flag
1959          * and let the caller handle it.
1960          */
1961         if (inode->i_state & I_NEW) {
1962                 status = 0;
1963                 if (lockres)
1964                         ocfs2_complete_lock_res_refresh(lockres, 0);
1965                 goto bail;
1966         }
1967
1968         /* This is fun. The caller may want a bh back, or it may
1969          * not. ocfs2_inode_lock_update definitely wants one in, but
1970          * may or may not read one, depending on what's in the
1971          * LVB. The result of all of this is that we've *only* gone to
1972          * disk if we have to, so the complexity is worthwhile. */
1973         status = ocfs2_inode_lock_update(inode, &local_bh);
1974         if (status < 0) {
1975                 if (status != -ENOENT)
1976                         mlog_errno(status);
1977                 goto bail;
1978         }
1979
1980         if (ret_bh) {
1981                 status = ocfs2_assign_bh(inode, ret_bh, local_bh);
1982                 if (status < 0) {
1983                         mlog_errno(status);
1984                         goto bail;
1985                 }
1986         }
1987
1988 bail:
1989         if (status < 0) {
1990                 if (ret_bh && (*ret_bh)) {
1991                         brelse(*ret_bh);
1992                         *ret_bh = NULL;
1993                 }
1994                 if (acquired)
1995                         ocfs2_inode_unlock(inode, ex);
1996         }
1997
1998         if (local_bh)
1999                 brelse(local_bh);
2000
2001         mlog_exit(status);
2002         return status;
2003 }
2004
2005 /*
2006  * This is working around a lock inversion between tasks acquiring DLM
2007  * locks while holding a page lock and the downconvert thread which
2008  * blocks dlm lock acquiry while acquiring page locks.
2009  *
2010  * ** These _with_page variantes are only intended to be called from aop
2011  * methods that hold page locks and return a very specific *positive* error
2012  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2013  *
2014  * The DLM is called such that it returns -EAGAIN if it would have
2015  * blocked waiting for the downconvert thread.  In that case we unlock
2016  * our page so the downconvert thread can make progress.  Once we've
2017  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2018  * that called us can bubble that back up into the VFS who will then
2019  * immediately retry the aop call.
2020  *
2021  * We do a blocking lock and immediate unlock before returning, though, so that
2022  * the lock has a great chance of being cached on this node by the time the VFS
2023  * calls back to retry the aop.    This has a potential to livelock as nodes
2024  * ping locks back and forth, but that's a risk we're willing to take to avoid
2025  * the lock inversion simply.
2026  */
2027 int ocfs2_inode_lock_with_page(struct inode *inode,
2028                               struct buffer_head **ret_bh,
2029                               int ex,
2030                               struct page *page)
2031 {
2032         int ret;
2033
2034         ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2035         if (ret == -EAGAIN) {
2036                 unlock_page(page);
2037                 if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2038                         ocfs2_inode_unlock(inode, ex);
2039                 ret = AOP_TRUNCATED_PAGE;
2040         }
2041
2042         return ret;
2043 }
2044
2045 int ocfs2_inode_lock_atime(struct inode *inode,
2046                           struct vfsmount *vfsmnt,
2047                           int *level)
2048 {
2049         int ret;
2050
2051         mlog_entry_void();
2052         ret = ocfs2_inode_lock(inode, NULL, 0);
2053         if (ret < 0) {
2054                 mlog_errno(ret);
2055                 return ret;
2056         }
2057
2058         /*
2059          * If we should update atime, we will get EX lock,
2060          * otherwise we just get PR lock.
2061          */
2062         if (ocfs2_should_update_atime(inode, vfsmnt)) {
2063                 struct buffer_head *bh = NULL;
2064
2065                 ocfs2_inode_unlock(inode, 0);
2066                 ret = ocfs2_inode_lock(inode, &bh, 1);
2067                 if (ret < 0) {
2068                         mlog_errno(ret);
2069                         return ret;
2070                 }
2071                 *level = 1;
2072                 if (ocfs2_should_update_atime(inode, vfsmnt))
2073                         ocfs2_update_inode_atime(inode, bh);
2074                 if (bh)
2075                         brelse(bh);
2076         } else
2077                 *level = 0;
2078
2079         mlog_exit(ret);
2080         return ret;
2081 }
2082
2083 void ocfs2_inode_unlock(struct inode *inode,
2084                        int ex)
2085 {
2086         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2087         struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2088         struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2089
2090         mlog_entry_void();
2091
2092         mlog(0, "inode %llu drop %s META lock\n",
2093              (unsigned long long)OCFS2_I(inode)->ip_blkno,
2094              ex ? "EXMODE" : "PRMODE");
2095
2096         if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)) &&
2097             !ocfs2_mount_local(osb))
2098                 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
2099
2100         mlog_exit_void();
2101 }
2102
2103 int ocfs2_super_lock(struct ocfs2_super *osb,
2104                      int ex)
2105 {
2106         int status = 0;
2107         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2108         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2109         struct buffer_head *bh;
2110         struct ocfs2_slot_info *si = osb->slot_info;
2111
2112         mlog_entry_void();
2113
2114         if (ocfs2_is_hard_readonly(osb))
2115                 return -EROFS;
2116
2117         if (ocfs2_mount_local(osb))
2118                 goto bail;
2119
2120         status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2121         if (status < 0) {
2122                 mlog_errno(status);
2123                 goto bail;
2124         }
2125
2126         /* The super block lock path is really in the best position to
2127          * know when resources covered by the lock need to be
2128          * refreshed, so we do it here. Of course, making sense of
2129          * everything is up to the caller :) */
2130         status = ocfs2_should_refresh_lock_res(lockres);
2131         if (status < 0) {
2132                 mlog_errno(status);
2133                 goto bail;
2134         }
2135         if (status) {
2136                 bh = si->si_bh;
2137                 status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
2138                                           si->si_inode);
2139                 if (status == 0)
2140                         ocfs2_update_slot_info(si);
2141
2142                 ocfs2_complete_lock_res_refresh(lockres, status);
2143
2144                 if (status < 0)
2145                         mlog_errno(status);
2146         }
2147 bail:
2148         mlog_exit(status);
2149         return status;
2150 }
2151
2152 void ocfs2_super_unlock(struct ocfs2_super *osb,
2153                         int ex)
2154 {
2155         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2156         struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2157
2158         if (!ocfs2_mount_local(osb))
2159                 ocfs2_cluster_unlock(osb, lockres, level);
2160 }
2161
2162 int ocfs2_rename_lock(struct ocfs2_super *osb)
2163 {
2164         int status;
2165         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2166
2167         if (ocfs2_is_hard_readonly(osb))
2168                 return -EROFS;
2169
2170         if (ocfs2_mount_local(osb))
2171                 return 0;
2172
2173         status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
2174         if (status < 0)
2175                 mlog_errno(status);
2176
2177         return status;
2178 }
2179
2180 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2181 {
2182         struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2183
2184         if (!ocfs2_mount_local(osb))
2185                 ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
2186 }
2187
2188 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2189 {
2190         int ret;
2191         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2192         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2193         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2194
2195         BUG_ON(!dl);
2196
2197         if (ocfs2_is_hard_readonly(osb))
2198                 return -EROFS;
2199
2200         if (ocfs2_mount_local(osb))
2201                 return 0;
2202
2203         ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2204         if (ret < 0)
2205                 mlog_errno(ret);
2206
2207         return ret;
2208 }
2209
2210 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2211 {
2212         int level = ex ? LKM_EXMODE : LKM_PRMODE;
2213         struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2214         struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2215
2216         if (!ocfs2_mount_local(osb))
2217                 ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2218 }
2219
2220 /* Reference counting of the dlm debug structure. We want this because
2221  * open references on the debug inodes can live on after a mount, so
2222  * we can't rely on the ocfs2_super to always exist. */
2223 static void ocfs2_dlm_debug_free(struct kref *kref)
2224 {
2225         struct ocfs2_dlm_debug *dlm_debug;
2226
2227         dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
2228
2229         kfree(dlm_debug);
2230 }
2231
2232 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
2233 {
2234         if (dlm_debug)
2235                 kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
2236 }
2237
2238 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
2239 {
2240         kref_get(&debug->d_refcnt);
2241 }
2242
2243 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
2244 {
2245         struct ocfs2_dlm_debug *dlm_debug;
2246
2247         dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
2248         if (!dlm_debug) {
2249                 mlog_errno(-ENOMEM);
2250                 goto out;
2251         }
2252
2253         kref_init(&dlm_debug->d_refcnt);
2254         INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
2255         dlm_debug->d_locking_state = NULL;
2256 out:
2257         return dlm_debug;
2258 }
2259
2260 /* Access to this is arbitrated for us via seq_file->sem. */
2261 struct ocfs2_dlm_seq_priv {
2262         struct ocfs2_dlm_debug *p_dlm_debug;
2263         struct ocfs2_lock_res p_iter_res;
2264         struct ocfs2_lock_res p_tmp_res;
2265 };
2266
2267 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
2268                                                  struct ocfs2_dlm_seq_priv *priv)
2269 {
2270         struct ocfs2_lock_res *iter, *ret = NULL;
2271         struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
2272
2273         assert_spin_locked(&ocfs2_dlm_tracking_lock);
2274
2275         list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
2276                 /* discover the head of the list */
2277                 if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
2278                         mlog(0, "End of list found, %p\n", ret);
2279                         break;
2280                 }
2281
2282                 /* We track our "dummy" iteration lockres' by a NULL
2283                  * l_ops field. */
2284                 if (iter->l_ops != NULL) {
2285                         ret = iter;
2286                         break;
2287                 }
2288         }
2289
2290         return ret;
2291 }
2292
2293 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
2294 {
2295         struct ocfs2_dlm_seq_priv *priv = m->private;
2296         struct ocfs2_lock_res *iter;
2297
2298         spin_lock(&ocfs2_dlm_tracking_lock);
2299         iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
2300         if (iter) {
2301                 /* Since lockres' have the lifetime of their container
2302                  * (which can be inodes, ocfs2_supers, etc) we want to
2303                  * copy this out to a temporary lockres while still
2304                  * under the spinlock. Obviously after this we can't
2305                  * trust any pointers on the copy returned, but that's
2306                  * ok as the information we want isn't typically held
2307                  * in them. */
2308                 priv->p_tmp_res = *iter;
2309                 iter = &priv->p_tmp_res;
2310         }
2311         spin_unlock(&ocfs2_dlm_tracking_lock);
2312
2313         return iter;
2314 }
2315
2316 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
2317 {
2318 }
2319
2320 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
2321 {
2322         struct ocfs2_dlm_seq_priv *priv = m->private;
2323         struct ocfs2_lock_res *iter = v;
2324         struct ocfs2_lock_res *dummy = &priv->p_iter_res;
2325
2326         spin_lock(&ocfs2_dlm_tracking_lock);
2327         iter = ocfs2_dlm_next_res(iter, priv);
2328         list_del_init(&dummy->l_debug_list);
2329         if (iter) {
2330                 list_add(&dummy->l_debug_list, &iter->l_debug_list);
2331                 priv->p_tmp_res = *iter;
2332                 iter = &priv->p_tmp_res;
2333         }
2334         spin_unlock(&ocfs2_dlm_tracking_lock);
2335
2336         return iter;
2337 }
2338
2339 /* So that debugfs.ocfs2 can determine which format is being used */
2340 #define OCFS2_DLM_DEBUG_STR_VERSION 1
2341 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
2342 {
2343         int i;
2344         char *lvb;
2345         struct ocfs2_lock_res *lockres = v;
2346
2347         if (!lockres)
2348                 return -EINVAL;
2349
2350         seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
2351
2352         if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
2353                 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
2354                            lockres->l_name,
2355                            (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
2356         else
2357                 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
2358
2359         seq_printf(m, "%d\t"
2360                    "0x%lx\t"
2361                    "0x%x\t"
2362                    "0x%x\t"
2363                    "%u\t"
2364                    "%u\t"
2365                    "%d\t"
2366                    "%d\t",
2367                    lockres->l_level,
2368                    lockres->l_flags,
2369                    lockres->l_action,
2370                    lockres->l_unlock_action,
2371                    lockres->l_ro_holders,
2372                    lockres->l_ex_holders,
2373                    lockres->l_requested,
2374                    lockres->l_blocking);
2375
2376         /* Dump the raw LVB */
2377         lvb = lockres->l_lksb.lvb;
2378         for(i = 0; i < DLM_LVB_LEN; i++)
2379                 seq_printf(m, "0x%x\t", lvb[i]);
2380
2381         /* End the line */
2382         seq_printf(m, "\n");
2383         return 0;
2384 }
2385
2386 static struct seq_operations ocfs2_dlm_seq_ops = {
2387         .start =        ocfs2_dlm_seq_start,
2388         .stop =         ocfs2_dlm_seq_stop,
2389         .next =         ocfs2_dlm_seq_next,
2390         .show =         ocfs2_dlm_seq_show,
2391 };
2392
2393 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
2394 {
2395         struct seq_file *seq = (struct seq_file *) file->private_data;
2396         struct ocfs2_dlm_seq_priv *priv = seq->private;
2397         struct ocfs2_lock_res *res = &priv->p_iter_res;
2398
2399         ocfs2_remove_lockres_tracking(res);
2400         ocfs2_put_dlm_debug(priv->p_dlm_debug);
2401         return seq_release_private(inode, file);
2402 }
2403
2404 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2405 {
2406         int ret;
2407         struct ocfs2_dlm_seq_priv *priv;
2408         struct seq_file *seq;
2409         struct ocfs2_super *osb;
2410
2411         priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2412         if (!priv) {
2413                 ret = -ENOMEM;
2414                 mlog_errno(ret);
2415                 goto out;
2416         }
2417         osb = inode->i_private;
2418         ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2419         priv->p_dlm_debug = osb->osb_dlm_debug;
2420         INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2421
2422         ret = seq_open(file, &ocfs2_dlm_seq_ops);
2423         if (ret) {
2424                 kfree(priv);
2425                 mlog_errno(ret);
2426                 goto out;
2427         }
2428
2429         seq = (struct seq_file *) file->private_data;
2430         seq->private = priv;
2431
2432         ocfs2_add_lockres_tracking(&priv->p_iter_res,
2433                                    priv->p_dlm_debug);
2434
2435 out:
2436         return ret;
2437 }
2438
2439 static const struct file_operations ocfs2_dlm_debug_fops = {
2440         .open =         ocfs2_dlm_debug_open,
2441         .release =      ocfs2_dlm_debug_release,
2442         .read =         seq_read,
2443         .llseek =       seq_lseek,
2444 };
2445
2446 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2447 {
2448         int ret = 0;
2449         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2450
2451         dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2452                                                          S_IFREG|S_IRUSR,
2453                                                          osb->osb_debug_root,
2454                                                          osb,
2455                                                          &ocfs2_dlm_debug_fops);
2456         if (!dlm_debug->d_locking_state) {
2457                 ret = -EINVAL;
2458                 mlog(ML_ERROR,
2459                      "Unable to create locking state debugfs file.\n");
2460                 goto out;
2461         }
2462
2463         ocfs2_get_dlm_debug(dlm_debug);
2464 out:
2465         return ret;
2466 }
2467
2468 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2469 {
2470         struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2471
2472         if (dlm_debug) {
2473                 debugfs_remove(dlm_debug->d_locking_state);
2474                 ocfs2_put_dlm_debug(dlm_debug);
2475         }
2476 }
2477
2478 int ocfs2_dlm_init(struct ocfs2_super *osb)
2479 {
2480         int status = 0;
2481         u32 dlm_key;
2482         struct dlm_ctxt *dlm = NULL;
2483
2484         mlog_entry_void();
2485
2486         if (ocfs2_mount_local(osb))
2487                 goto local;
2488
2489         status = ocfs2_dlm_init_debug(osb);
2490         if (status < 0) {
2491                 mlog_errno(status);
2492                 goto bail;
2493         }
2494
2495         /* launch downconvert thread */
2496         osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc");
2497         if (IS_ERR(osb->dc_task)) {
2498                 status = PTR_ERR(osb->dc_task);
2499                 osb->dc_task = NULL;
2500                 mlog_errno(status);
2501                 goto bail;
2502         }
2503
2504         /* used by the dlm code to make message headers unique, each
2505          * node in this domain must agree on this. */
2506         dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));
2507
2508         /* for now, uuid == domain */
2509         dlm = dlm_register_domain(osb->uuid_str, dlm_key);
2510         if (IS_ERR(dlm)) {
2511                 status = PTR_ERR(dlm);
2512                 mlog_errno(status);
2513                 goto bail;
2514         }
2515
2516         dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);
2517
2518 local:
2519         ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
2520         ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
2521
2522         osb->dlm = dlm;
2523
2524         status = 0;
2525 bail:
2526         if (status < 0) {
2527                 ocfs2_dlm_shutdown_debug(osb);
2528                 if (osb->dc_task)
2529                         kthread_stop(osb->dc_task);
2530         }
2531
2532         mlog_exit(status);
2533         return status;
2534 }
2535
2536 void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
2537 {
2538         mlog_entry_void();
2539
2540         dlm_unregister_eviction_cb(&osb->osb_eviction_cb);
2541
2542         ocfs2_drop_osb_locks(osb);
2543
2544         if (osb->dc_task) {
2545                 kthread_stop(osb->dc_task);
2546                 osb->dc_task = NULL;
2547         }
2548
2549         ocfs2_lock_res_free(&osb->osb_super_lockres);
2550         ocfs2_lock_res_free(&osb->osb_rename_lockres);
2551
2552         dlm_unregister_domain(osb->dlm);
2553         osb->dlm = NULL;
2554
2555         ocfs2_dlm_shutdown_debug(osb);
2556
2557         mlog_exit_void();
2558 }
2559
2560 static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
2561 {
2562         struct ocfs2_lock_res *lockres = opaque;
2563         unsigned long flags;
2564
2565         mlog_entry_void();
2566
2567         mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
2568              lockres->l_unlock_action);
2569
2570         spin_lock_irqsave(&lockres->l_lock, flags);
2571         /* We tried to cancel a convert request, but it was already
2572          * granted. All we want to do here is clear our unlock
2573          * state. The wake_up call done at the bottom is redundant
2574          * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
2575          * hurt anything anyway */
2576         if (status == DLM_CANCELGRANT &&
2577             lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2578                 mlog(0, "Got cancelgrant for %s\n", lockres->l_name);
2579
2580                 /* We don't clear the busy flag in this case as it
2581                  * should have been cleared by the ast which the dlm
2582                  * has called. */
2583                 goto complete_unlock;
2584         }
2585
2586         if (status != DLM_NORMAL) {
2587                 mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
2588                      "unlock_action %d\n", status, lockres->l_name,
2589                      lockres->l_unlock_action);
2590                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2591                 return;
2592         }
2593
2594         switch(lockres->l_unlock_action) {
2595         case OCFS2_UNLOCK_CANCEL_CONVERT:
2596                 mlog(0, "Cancel convert success for %s\n", lockres->l_name);
2597                 lockres->l_action = OCFS2_AST_INVALID;
2598                 break;
2599         case OCFS2_UNLOCK_DROP_LOCK:
2600                 lockres->l_level = LKM_IVMODE;
2601                 break;
2602         default:
2603                 BUG();
2604         }
2605
2606         lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
2607 complete_unlock:
2608         lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
2609         spin_unlock_irqrestore(&lockres->l_lock, flags);
2610
2611         wake_up(&lockres->l_event);
2612
2613         mlog_exit_void();
2614 }
2615
2616 static int ocfs2_drop_lock(struct ocfs2_super *osb,
2617                            struct ocfs2_lock_res *lockres)
2618 {
2619         enum dlm_status status;
2620         unsigned long flags;
2621         int lkm_flags = 0;
2622
2623         /* We didn't get anywhere near actually using this lockres. */
2624         if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2625                 goto out;
2626
2627         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2628                 lkm_flags |= LKM_VALBLK;
2629
2630         spin_lock_irqsave(&lockres->l_lock, flags);
2631
2632         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2633                         "lockres %s, flags 0x%lx\n",
2634                         lockres->l_name, lockres->l_flags);
2635
2636         while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2637                 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2638                      "%u, unlock_action = %u\n",
2639                      lockres->l_name, lockres->l_flags, lockres->l_action,
2640                      lockres->l_unlock_action);
2641
2642                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2643
2644                 /* XXX: Today we just wait on any busy
2645                  * locks... Perhaps we need to cancel converts in the
2646                  * future? */
2647                 ocfs2_wait_on_busy_lock(lockres);
2648
2649                 spin_lock_irqsave(&lockres->l_lock, flags);
2650         }
2651
2652         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2653                 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2654                     lockres->l_level == LKM_EXMODE &&
2655                     !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2656                         lockres->l_ops->set_lvb(lockres);
2657         }
2658
2659         if (lockres->l_flags & OCFS2_LOCK_BUSY)
2660                 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2661                      lockres->l_name);
2662         if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2663                 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2664
2665         if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2666                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2667                 goto out;
2668         }
2669
2670         lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2671
2672         /* make sure we never get here while waiting for an ast to
2673          * fire. */
2674         BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2675
2676         /* is this necessary? */
2677         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2678         lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2679         spin_unlock_irqrestore(&lockres->l_lock, flags);
2680
2681         mlog(0, "lock %s\n", lockres->l_name);
2682
2683         status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2684                            ocfs2_unlock_ast, lockres);
2685         if (status != DLM_NORMAL) {
2686                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2687                 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2688                 dlm_print_one_lock(lockres->l_lksb.lockid);
2689                 BUG();
2690         }
2691         mlog(0, "lock %s, successfull return from dlmunlock\n",
2692              lockres->l_name);
2693
2694         ocfs2_wait_on_busy_lock(lockres);
2695 out:
2696         mlog_exit(0);
2697         return 0;
2698 }
2699
2700 /* Mark the lockres as being dropped. It will no longer be
2701  * queued if blocking, but we still may have to wait on it
2702  * being dequeued from the downconvert thread before we can consider
2703  * it safe to drop. 
2704  *
2705  * You can *not* attempt to call cluster_lock on this lockres anymore. */
2706 void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
2707 {
2708         int status;
2709         struct ocfs2_mask_waiter mw;
2710         unsigned long flags;
2711
2712         ocfs2_init_mask_waiter(&mw);
2713
2714         spin_lock_irqsave(&lockres->l_lock, flags);
2715         lockres->l_flags |= OCFS2_LOCK_FREEING;
2716         while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
2717                 lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
2718                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2719
2720                 mlog(0, "Waiting on lockres %s\n", lockres->l_name);
2721
2722                 status = ocfs2_wait_for_mask(&mw);
2723                 if (status)
2724                         mlog_errno(status);
2725
2726                 spin_lock_irqsave(&lockres->l_lock, flags);
2727         }
2728         spin_unlock_irqrestore(&lockres->l_lock, flags);
2729 }
2730
2731 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2732                                struct ocfs2_lock_res *lockres)
2733 {
2734         int ret;
2735
2736         ocfs2_mark_lockres_freeing(lockres);
2737         ret = ocfs2_drop_lock(osb, lockres);
2738         if (ret)
2739                 mlog_errno(ret);
2740 }
2741
2742 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
2743 {
2744         ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
2745         ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
2746 }
2747
2748 int ocfs2_drop_inode_locks(struct inode *inode)
2749 {
2750         int status, err;
2751
2752         mlog_entry_void();
2753
2754         /* No need to call ocfs2_mark_lockres_freeing here -
2755          * ocfs2_clear_inode has done it for us. */
2756
2757         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2758                               &OCFS2_I(inode)->ip_open_lockres);
2759         if (err < 0)
2760                 mlog_errno(err);
2761
2762         status = err;
2763
2764         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2765                               &OCFS2_I(inode)->ip_inode_lockres);
2766         if (err < 0)
2767                 mlog_errno(err);
2768         if (err < 0 && !status)
2769                 status = err;
2770
2771         err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
2772                               &OCFS2_I(inode)->ip_rw_lockres);
2773         if (err < 0)
2774                 mlog_errno(err);
2775         if (err < 0 && !status)
2776                 status = err;
2777
2778         mlog_exit(status);
2779         return status;
2780 }
2781
2782 static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
2783                                       int new_level)
2784 {
2785         assert_spin_locked(&lockres->l_lock);
2786
2787         BUG_ON(lockres->l_blocking <= LKM_NLMODE);
2788
2789         if (lockres->l_level <= new_level) {
2790                 mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
2791                      lockres->l_level, new_level);
2792                 BUG();
2793         }
2794
2795         mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
2796              lockres->l_name, new_level, lockres->l_blocking);
2797
2798         lockres->l_action = OCFS2_AST_DOWNCONVERT;
2799         lockres->l_requested = new_level;
2800         lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2801 }
2802
2803 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
2804                                   struct ocfs2_lock_res *lockres,
2805                                   int new_level,
2806                                   int lvb)
2807 {
2808         int ret, dlm_flags = LKM_CONVERT;
2809         enum dlm_status status;
2810
2811         mlog_entry_void();
2812
2813         if (lvb)
2814                 dlm_flags |= LKM_VALBLK;
2815
2816         status = dlmlock(osb->dlm,
2817                          new_level,
2818                          &lockres->l_lksb,
2819                          dlm_flags,
2820                          lockres->l_name,
2821                          OCFS2_LOCK_ID_MAX_LEN - 1,
2822                          ocfs2_locking_ast,
2823                          lockres,
2824                          ocfs2_blocking_ast);
2825         if (status != DLM_NORMAL) {
2826                 ocfs2_log_dlm_error("dlmlock", status, lockres);
2827                 ret = -EINVAL;
2828                 ocfs2_recover_from_dlm_error(lockres, 1);
2829                 goto bail;
2830         }
2831
2832         ret = 0;
2833 bail:
2834         mlog_exit(ret);
2835         return ret;
2836 }
2837
2838 /* returns 1 when the caller should unlock and call dlmunlock */
2839 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
2840                                         struct ocfs2_lock_res *lockres)
2841 {
2842         assert_spin_locked(&lockres->l_lock);
2843
2844         mlog_entry_void();
2845         mlog(0, "lock %s\n", lockres->l_name);
2846
2847         if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
2848                 /* If we're already trying to cancel a lock conversion
2849                  * then just drop the spinlock and allow the caller to
2850                  * requeue this lock. */
2851
2852                 mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
2853                 return 0;
2854         }
2855
2856         /* were we in a convert when we got the bast fire? */
2857         BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
2858                lockres->l_action != OCFS2_AST_DOWNCONVERT);
2859         /* set things up for the unlockast to know to just
2860          * clear out the ast_action and unset busy, etc. */
2861         lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
2862
2863         mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
2864                         "lock %s, invalid flags: 0x%lx\n",
2865                         lockres->l_name, lockres->l_flags);
2866
2867         return 1;
2868 }
2869
2870 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2871                                 struct ocfs2_lock_res *lockres)
2872 {
2873         int ret;
2874         enum dlm_status status;
2875
2876         mlog_entry_void();
2877         mlog(0, "lock %s\n", lockres->l_name);
2878
2879         ret = 0;
2880         status = dlmunlock(osb->dlm,
2881                            &lockres->l_lksb,
2882                            LKM_CANCEL,
2883                            ocfs2_unlock_ast,
2884                            lockres);
2885         if (status != DLM_NORMAL) {
2886                 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2887                 ret = -EINVAL;
2888                 ocfs2_recover_from_dlm_error(lockres, 0);
2889         }
2890
2891         mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2892
2893         mlog_exit(ret);
2894         return ret;
2895 }
2896
2897 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
2898                               struct ocfs2_lock_res *lockres,
2899                               struct ocfs2_unblock_ctl *ctl)
2900 {
2901         unsigned long flags;
2902         int blocking;
2903         int new_level;
2904         int ret = 0;
2905         int set_lvb = 0;
2906
2907         mlog_entry_void();
2908
2909         spin_lock_irqsave(&lockres->l_lock, flags);
2910
2911         BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
2912
2913 recheck:
2914         if (lockres->l_flags & OCFS2_LOCK_BUSY) {
2915                 ctl->requeue = 1;
2916                 ret = ocfs2_prepare_cancel_convert(osb, lockres);
2917                 spin_unlock_irqrestore(&lockres->l_lock, flags);
2918                 if (ret) {
2919                         ret = ocfs2_cancel_convert(osb, lockres);
2920                         if (ret < 0)
2921                                 mlog_errno(ret);
2922                 }
2923                 goto leave;
2924         }
2925
2926         /* if we're blocking an exclusive and we have *any* holders,
2927          * then requeue. */
2928         if ((lockres->l_blocking == LKM_EXMODE)
2929             && (lockres->l_ex_holders || lockres->l_ro_holders))
2930                 goto leave_requeue;
2931
2932         /* If it's a PR we're blocking, then only
2933          * requeue if we've got any EX holders */
2934         if (lockres->l_blocking == LKM_PRMODE &&
2935             lockres->l_ex_holders)
2936                 goto leave_requeue;
2937
2938         /*
2939          * Can we get a lock in this state if the holder counts are
2940          * zero? The meta data unblock code used to check this.
2941          */
2942         if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
2943             && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
2944                 goto leave_requeue;
2945
2946         new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
2947
2948         if (lockres->l_ops->check_downconvert
2949             && !lockres->l_ops->check_downconvert(lockres, new_level))
2950                 goto leave_requeue;
2951
2952         /* If we get here, then we know that there are no more
2953          * incompatible holders (and anyone asking for an incompatible
2954          * lock is blocked). We can now downconvert the lock */
2955         if (!lockres->l_ops->downconvert_worker)
2956                 goto downconvert;
2957
2958         /* Some lockres types want to do a bit of work before
2959          * downconverting a lock. Allow that here. The worker function
2960          * may sleep, so we save off a copy of what we're blocking as
2961          * it may change while we're not holding the spin lock. */
2962         blocking = lockres->l_blocking;
2963         spin_unlock_irqrestore(&lockres->l_lock, flags);
2964
2965         ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
2966
2967         if (ctl->unblock_action == UNBLOCK_STOP_POST)
2968                 goto leave;
2969
2970         spin_lock_irqsave(&lockres->l_lock, flags);
2971         if (blocking != lockres->l_blocking) {
2972                 /* If this changed underneath us, then we can't drop
2973                  * it just yet. */
2974                 goto recheck;
2975         }
2976
2977 downconvert:
2978         ctl->requeue = 0;
2979
2980         if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
2981                 if (lockres->l_level == LKM_EXMODE)
2982                         set_lvb = 1;
2983
2984                 /*
2985                  * We only set the lvb if the lock has been fully
2986                  * refreshed - otherwise we risk setting stale
2987                  * data. Otherwise, there's no need to actually clear
2988                  * out the lvb here as it's value is still valid.
2989                  */
2990                 if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2991                         lockres->l_ops->set_lvb(lockres);
2992         }
2993
2994         ocfs2_prepare_downconvert(lockres, new_level);
2995         spin_unlock_irqrestore(&lockres->l_lock, flags);
2996         ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
2997 leave:
2998         mlog_exit(ret);
2999         return ret;
3000
3001 leave_requeue:
3002         spin_unlock_irqrestore(&lockres->l_lock, flags);
3003         ctl->requeue = 1;
3004
3005         mlog_exit(0);
3006         return 0;
3007 }
3008
3009 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3010                                      int blocking)
3011 {
3012         struct inode *inode;
3013         struct address_space *mapping;
3014
3015         inode = ocfs2_lock_res_inode(lockres);
3016         mapping = inode->i_mapping;
3017
3018         if (S_ISREG(inode->i_mode))
3019                 goto out;
3020
3021         /*
3022          * We need this before the filemap_fdatawrite() so that it can
3023          * transfer the dirty bit from the PTE to the
3024          * page. Unfortunately this means that even for EX->PR
3025          * downconverts, we'll lose our mappings and have to build
3026          * them up again.
3027          */
3028         unmap_mapping_range(mapping, 0, 0, 0);
3029
3030         if (filemap_fdatawrite(mapping)) {
3031                 mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3032                      (unsigned long long)OCFS2_I(inode)->ip_blkno);
3033         }
3034         sync_mapping_buffers(mapping);
3035         if (blocking == LKM_EXMODE) {
3036                 truncate_inode_pages(mapping, 0);
3037         } else {
3038                 /* We only need to wait on the I/O if we're not also
3039                  * truncating pages because truncate_inode_pages waits
3040                  * for us above. We don't truncate pages if we're
3041                  * blocking anything < EXMODE because we want to keep
3042                  * them around in that case. */
3043                 filemap_fdatawait(mapping);
3044         }
3045
3046 out:
3047         return UNBLOCK_CONTINUE;
3048 }
3049
3050 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3051                                         int new_level)
3052 {
3053         struct inode *inode = ocfs2_lock_res_inode(lockres);
3054         int checkpointed = ocfs2_inode_fully_checkpointed(inode);
3055
3056         BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
3057         BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
3058
3059         if (checkpointed)
3060                 return 1;
3061
3062         ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
3063         return 0;
3064 }
3065
3066 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3067 {
3068         struct inode *inode = ocfs2_lock_res_inode(lockres);
3069
3070         __ocfs2_stuff_meta_lvb(inode);
3071 }
3072
3073 /*
3074  * Does the final reference drop on our dentry lock. Right now this
3075  * happens in the downconvert thread, but we could choose to simplify the
3076  * dlmglue API and push these off to the ocfs2_wq in the future.
3077  */
3078 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3079                                      struct ocfs2_lock_res *lockres)
3080 {
3081         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3082         ocfs2_dentry_lock_put(osb, dl);
3083 }
3084
3085 /*
3086  * d_delete() matching dentries before the lock downconvert.
3087  *
3088  * At this point, any process waiting to destroy the
3089  * dentry_lock due to last ref count is stopped by the
3090  * OCFS2_LOCK_QUEUED flag.
3091  *
3092  * We have two potential problems
3093  *
3094  * 1) If we do the last reference drop on our dentry_lock (via dput)
3095  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
3096  *    the downconvert to finish. Instead we take an elevated
3097  *    reference and push the drop until after we've completed our
3098  *    unblock processing.
3099  *
3100  * 2) There might be another process with a final reference,
3101  *    waiting on us to finish processing. If this is the case, we
3102  *    detect it and exit out - there's no more dentries anyway.
3103  */
3104 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
3105                                        int blocking)
3106 {
3107         struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3108         struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
3109         struct dentry *dentry;
3110         unsigned long flags;
3111         int extra_ref = 0;
3112
3113         /*
3114          * This node is blocking another node from getting a read
3115          * lock. This happens when we've renamed within a
3116          * directory. We've forced the other nodes to d_delete(), but
3117          * we never actually dropped our lock because it's still
3118          * valid. The downconvert code will retain a PR for this node,
3119          * so there's no further work to do.
3120          */
3121         if (blocking == LKM_PRMODE)
3122                 return UNBLOCK_CONTINUE;
3123
3124         /*
3125          * Mark this inode as potentially orphaned. The code in
3126          * ocfs2_delete_inode() will figure out whether it actually
3127          * needs to be freed or not.
3128          */
3129         spin_lock(&oi->ip_lock);
3130         oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
3131         spin_unlock(&oi->ip_lock);
3132
3133         /*
3134          * Yuck. We need to make sure however that the check of
3135          * OCFS2_LOCK_FREEING and the extra reference are atomic with
3136          * respect to a reference decrement or the setting of that
3137          * flag.
3138          */
3139         spin_lock_irqsave(&lockres->l_lock, flags);
3140         spin_lock(&dentry_attach_lock);
3141         if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
3142             && dl->dl_count) {
3143                 dl->dl_count++;
3144                 extra_ref = 1;
3145         }
3146         spin_unlock(&dentry_attach_lock);
3147         spin_unlock_irqrestore(&lockres->l_lock, flags);
3148
3149         mlog(0, "extra_ref = %d\n", extra_ref);
3150
3151         /*
3152          * We have a process waiting on us in ocfs2_dentry_iput(),
3153          * which means we can't have any more outstanding
3154          * aliases. There's no need to do any more work.
3155          */
3156         if (!extra_ref)
3157                 return UNBLOCK_CONTINUE;
3158
3159         spin_lock(&dentry_attach_lock);
3160         while (1) {
3161                 dentry = ocfs2_find_local_alias(dl->dl_inode,
3162                                                 dl->dl_parent_blkno, 1);
3163                 if (!dentry)
3164                         break;
3165                 spin_unlock(&dentry_attach_lock);
3166
3167                 mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
3168                      dentry->d_name.name);
3169
3170                 /*
3171                  * The following dcache calls may do an
3172                  * iput(). Normally we don't want that from the
3173                  * downconverting thread, but in this case it's ok
3174                  * because the requesting node already has an
3175                  * exclusive lock on the inode, so it can't be queued
3176                  * for a downconvert.
3177                  */
3178                 d_delete(dentry);
3179                 dput(dentry);
3180
3181                 spin_lock(&dentry_attach_lock);
3182         }
3183         spin_unlock(&dentry_attach_lock);
3184
3185         /*
3186          * If we are the last holder of this dentry lock, there is no
3187          * reason to downconvert so skip straight to the unlock.
3188          */
3189         if (dl->dl_count == 1)
3190                 return UNBLOCK_STOP_POST;
3191
3192         return UNBLOCK_CONTINUE_POST;
3193 }
3194
3195 void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3196                                 struct ocfs2_lock_res *lockres)
3197 {
3198         int status;
3199         struct ocfs2_unblock_ctl ctl = {0, 0,};
3200         unsigned long flags;
3201
3202         /* Our reference to the lockres in this function can be
3203          * considered valid until we remove the OCFS2_LOCK_QUEUED
3204          * flag. */
3205
3206         mlog_entry_void();
3207
3208         BUG_ON(!lockres);
3209         BUG_ON(!lockres->l_ops);
3210
3211         mlog(0, "lockres %s blocked.\n", lockres->l_name);
3212
3213         /* Detect whether a lock has been marked as going away while
3214          * the downconvert thread was processing other things. A lock can
3215          * still be marked with OCFS2_LOCK_FREEING after this check,
3216          * but short circuiting here will still save us some
3217          * performance. */
3218         spin_lock_irqsave(&lockres->l_lock, flags);
3219         if (lockres->l_flags & OCFS2_LOCK_FREEING)
3220                 goto unqueue;
3221         spin_unlock_irqrestore(&lockres->l_lock, flags);
3222
3223         status = ocfs2_unblock_lock(osb, lockres, &ctl);
3224         if (status < 0)
3225                 mlog_errno(status);
3226
3227         spin_lock_irqsave(&lockres->l_lock, flags);
3228 unqueue:
3229         if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
3230                 lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
3231         } else
3232                 ocfs2_schedule_blocked_lock(osb, lockres);
3233
3234         mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
3235              ctl.requeue ? "yes" : "no");
3236         spin_unlock_irqrestore(&lockres->l_lock, flags);
3237
3238         if (ctl.unblock_action != UNBLOCK_CONTINUE
3239             && lockres->l_ops->post_unlock)
3240                 lockres->l_ops->post_unlock(osb, lockres);
3241
3242         mlog_exit_void();
3243 }
3244
3245 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
3246                                         struct ocfs2_lock_res *lockres)
3247 {
3248         mlog_entry_void();
3249
3250         assert_spin_locked(&lockres->l_lock);
3251
3252         if (lockres->l_flags & OCFS2_LOCK_FREEING) {
3253                 /* Do not schedule a lock for downconvert when it's on
3254                  * the way to destruction - any nodes wanting access
3255                  * to the resource will get it soon. */
3256                 mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
3257                      lockres->l_name, lockres->l_flags);
3258                 return;
3259         }
3260
3261         lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
3262
3263         spin_lock(&osb->dc_task_lock);
3264         if (list_empty(&lockres->l_blocked_list)) {
3265                 list_add_tail(&lockres->l_blocked_list,
3266                               &osb->blocked_lock_list);
3267                 osb->blocked_lock_count++;
3268         }
3269         spin_unlock(&osb->dc_task_lock);
3270
3271         mlog_exit_void();
3272 }
3273
3274 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
3275 {
3276         unsigned long processed;
3277         struct ocfs2_lock_res *lockres;
3278
3279         mlog_entry_void();
3280
3281         spin_lock(&osb->dc_task_lock);
3282         /* grab this early so we know to try again if a state change and
3283          * wake happens part-way through our work  */
3284         osb->dc_work_sequence = osb->dc_wake_sequence;
3285
3286         processed = osb->blocked_lock_count;
3287         while (processed) {
3288                 BUG_ON(list_empty(&osb->blocked_lock_list));
3289
3290                 lockres = list_entry(osb->blocked_lock_list.next,
3291                                      struct ocfs2_lock_res, l_blocked_list);
3292                 list_del_init(&lockres->l_blocked_list);
3293                 osb->blocked_lock_count--;
3294                 spin_unlock(&osb->dc_task_lock);
3295
3296                 BUG_ON(!processed);
3297                 processed--;
3298
3299                 ocfs2_process_blocked_lock(osb, lockres);
3300
3301                 spin_lock(&osb->dc_task_lock);
3302         }
3303         spin_unlock(&osb->dc_task_lock);
3304
3305         mlog_exit_void();
3306 }
3307
3308 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
3309 {
3310         int empty = 0;
3311
3312         spin_lock(&osb->dc_task_lock);
3313         if (list_empty(&osb->blocked_lock_list))
3314                 empty = 1;
3315
3316         spin_unlock(&osb->dc_task_lock);
3317         return empty;
3318 }
3319
3320 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
3321 {
3322         int should_wake = 0;
3323
3324         spin_lock(&osb->dc_task_lock);
3325         if (osb->dc_work_sequence != osb->dc_wake_sequence)
3326                 should_wake = 1;
3327         spin_unlock(&osb->dc_task_lock);
3328
3329         return should_wake;
3330 }
3331
3332 int ocfs2_downconvert_thread(void *arg)
3333 {
3334         int status = 0;
3335         struct ocfs2_super *osb = arg;
3336
3337         /* only quit once we've been asked to stop and there is no more
3338          * work available */
3339         while (!(kthread_should_stop() &&
3340                 ocfs2_downconvert_thread_lists_empty(osb))) {
3341
3342                 wait_event_interruptible(osb->dc_event,
3343                                          ocfs2_downconvert_thread_should_wake(osb) ||
3344                                          kthread_should_stop());
3345
3346                 mlog(0, "downconvert_thread: awoken\n");
3347
3348                 ocfs2_downconvert_thread_do_work(osb);
3349         }
3350
3351         osb->dc_task = NULL;
3352         return status;
3353 }
3354
3355 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
3356 {
3357         spin_lock(&osb->dc_task_lock);
3358         /* make sure the voting thread gets a swipe at whatever changes
3359          * the caller may have made to the voting state */
3360         osb->dc_wake_sequence++;
3361         spin_unlock(&osb->dc_task_lock);
3362         wake_up(&osb->dc_event);
3363 }