/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmmaster.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/spinlock.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"
#include "dlmdebug.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
#include "cluster/masklog.h"

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node,
                              int idx);
static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node,
                            int idx);

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
static int dlm_do_assert_master(struct dlm_ctxt *dlm,
                                struct dlm_lock_resource *res,
                                void *nodemap, u32 flags);
static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);

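/* pull the lock name, length and (optionally) hash out of an mle.
 * MASTER mles point at the lockres itself, so the name lives in
 * mle->u.mleres->lockname; BLOCK and MIGRATION mles carry an inline
 * copy in mle->u.mlename. */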
static inline void __dlm_mle_name(struct dlm_master_list_entry *mle,
                                  unsigned char **name, unsigned int *namelen,
                                  unsigned int *namehash)
{
        BUG_ON(mle->type != DLM_MLE_BLOCK &&
               mle->type != DLM_MLE_MASTER &&
               mle->type != DLM_MLE_MIGRATION);

        if (mle->type != DLM_MLE_MASTER) {
                *name = mle->u.mlename.name;
                *namelen = mle->u.mlename.len;
                if (namehash)
                        *namehash = mle->u.mlename.hash;
        } else {
                *name = (unsigned char *)mle->u.mleres->lockname.name;
                *namelen = mle->u.mleres->lockname.len;
                if (namehash)
                        *namehash = mle->u.mleres->lockname.hash;
        }
}

static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle,
                                const char *name,
                                unsigned int namelen)
{
        unsigned char *mlename;
        unsigned int mlelen;

        if (dlm != mle->dlm)
                return 0;

        __dlm_mle_name(mle, &mlename, &mlelen, NULL);

        if (namelen != mlelen || memcmp(name, mlename, namelen) != 0)
                return 0;

        return 1;
}

static struct kmem_cache *dlm_lockres_cache = NULL;
static struct kmem_cache *dlm_lockname_cache = NULL;
static struct kmem_cache *dlm_mle_cache = NULL;

static void dlm_mle_release(struct kref *kref);
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen);
static void dlm_put_mle(struct dlm_master_list_entry *mle);
static void __dlm_put_mle(struct dlm_master_list_entry *mle);
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen);

static int dlm_do_master_request(struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle, int to);


static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked);
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked);
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
                                 struct dlm_lock_resource *res,
                                 struct dlm_master_list_entry *mle,
                                 struct dlm_master_list_entry **oldmle,
                                 const char *name, unsigned int namelen,
                                 u8 new_master, u8 master);

static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res);
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res);
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res,
                                       u8 target);
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
                                       struct dlm_lock_resource *res);

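/* classify errnos returned by the o2net messaging layer: any of these
 * means the link to the remote node is gone, so callers treat the node
 * as dead and let recovery take over. */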
int dlm_is_host_down(int errno)
{
        switch (errno) {
                case -EBADF:
                case -ECONNREFUSED:
                case -ENOTCONN:
                case -ECONNRESET:
                case -EPIPE:
                case -EHOSTDOWN:
                case -EHOSTUNREACH:
                case -ETIMEDOUT:
                case -ECONNABORTED:
                case -ENETDOWN:
                case -ENETUNREACH:
                case -ENETRESET:
                case -ESHUTDOWN:
                case -ENOPROTOOPT:
                case -EINVAL:   /* if returned from our tcp code,
                                   this means there is no socket */
                        return 1;
        }
        return 0;
}


/*
 * MASTER LIST FUNCTIONS
 */


/*
 * regarding master list entries and heartbeat callbacks:
 *
 * in order to avoid sleeping and allocation that occurs in
 * heartbeat, master list entries are simply attached to the
 * dlm's established heartbeat callbacks.  the mle is attached
 * when it is created, and since the dlm->spinlock is held at
 * that time, any heartbeat event will be properly discovered
 * by the mle.  the mle needs to be detached from the
 * dlm->mle_hb_events list as soon as heartbeat events are no
 * longer useful to the mle, and before the mle is freed.
 *
 * as a general rule, heartbeat events are no longer needed by
 * the mle once an "answer" regarding the lock master has been
 * received.
 */
static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);

        list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
}


static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                              struct dlm_master_list_entry *mle)
{
        if (!list_empty(&mle->hb_events))
                list_del_init(&mle->hb_events);
}


static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
                                            struct dlm_master_list_entry *mle)
{
        spin_lock(&dlm->spinlock);
        __dlm_mle_detach_hb_events(dlm, mle);
        spin_unlock(&dlm->spinlock);
}

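/* pin an mle while it is actively in use: bumps both the inuse count
 * and the refcount; dlm_put_mle_inuse() drops both.  must be called
 * with dlm->spinlock and dlm->master_lock held. */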
static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        mle->inuse++;
        kref_get(&mle->mle_refs);
}

static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        mle->inuse--;
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

/* remove from list and free */
static void __dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);
        if (!atomic_read(&mle->mle_refs.refcount)) {
                /* this may or may not crash, but who cares.
                 * it's a BUG. */
                mlog(ML_ERROR, "bad mle: %p\n", mle);
                dlm_print_one_mle(mle);
                BUG();
        } else
                kref_put(&mle->mle_refs, dlm_mle_release);
}


/* must not have any spinlocks coming in */
static void dlm_put_mle(struct dlm_master_list_entry *mle)
{
        struct dlm_ctxt *dlm;
        dlm = mle->dlm;

        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        __dlm_put_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
}

static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
{
        kref_get(&mle->mle_refs);
}

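/* initialize a freshly allocated mle.  node_map tracks the live nodes,
 * vote_map the nodes whose responses are still expected; our own bit is
 * cleared from both since we never send a master request to ourselves. */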
static void dlm_init_mle(struct dlm_master_list_entry *mle,
                        enum dlm_mle_type type,
                        struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        const char *name,
                        unsigned int namelen)
{
        assert_spin_locked(&dlm->spinlock);

        mle->dlm = dlm;
        mle->type = type;
        INIT_HLIST_NODE(&mle->master_hash_node);
        INIT_LIST_HEAD(&mle->hb_events);
        memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
        spin_lock_init(&mle->spinlock);
        init_waitqueue_head(&mle->wq);
        atomic_set(&mle->woken, 0);
        kref_init(&mle->mle_refs);
        memset(mle->response_map, 0, sizeof(mle->response_map));
        mle->master = O2NM_MAX_NODES;
        mle->new_master = O2NM_MAX_NODES;
        mle->inuse = 0;

        BUG_ON(mle->type != DLM_MLE_BLOCK &&
               mle->type != DLM_MLE_MASTER &&
               mle->type != DLM_MLE_MIGRATION);

        if (mle->type == DLM_MLE_MASTER) {
                BUG_ON(!res);
                mle->u.mleres = res;
        } else {
                BUG_ON(!name);
                memcpy(mle->u.mlename.name, name, namelen);
                mle->u.mlename.len = namelen;
                mle->u.mlename.hash = dlm_lockid_hash(name, namelen);
        }

        atomic_inc(&dlm->mle_tot_count[mle->type]);
        atomic_inc(&dlm->mle_cur_count[mle->type]);

        /* copy off the node_map and register hb callbacks on our copy */
        memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
        memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
        clear_bit(dlm->node_num, mle->vote_map);
        clear_bit(dlm->node_num, mle->node_map);

        /* attach the mle to the domain node up/down events */
        __dlm_mle_attach_hb_events(dlm, mle);
}

void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        if (!hlist_unhashed(&mle->master_hash_node))
                hlist_del_init(&mle->master_hash_node);
}

void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
{
        struct hlist_head *bucket;
        unsigned char *mname;
        unsigned int mlen, hash;

        assert_spin_locked(&dlm->master_lock);

        __dlm_mle_name(mle, &mname, &mlen, &hash);
        bucket = dlm_master_hash(dlm, hash);
        hlist_add_head(&mle->master_hash_node, bucket);
}

/* returns 1 if found, 0 if not */
static int dlm_find_mle(struct dlm_ctxt *dlm,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen)
{
        struct dlm_master_list_entry *tmpmle;
        struct hlist_head *bucket;
        struct hlist_node *list;
        unsigned int hash;

        assert_spin_locked(&dlm->master_lock);

        hash = dlm_lockid_hash(name, namelen);
        bucket = dlm_master_hash(dlm, hash);
        hlist_for_each(list, bucket) {
                tmpmle = hlist_entry(list, struct dlm_master_list_entry,
                                     master_hash_node);
                if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
                        continue;
                dlm_get_mle(tmpmle);
                *mle = tmpmle;
                return 1;
        }
        return 0;
}

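/* fan a heartbeat node-up/node-down event out to every mle currently
 * attached to dlm->mle_hb_events. */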
void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
{
        struct dlm_master_list_entry *mle;

        assert_spin_locked(&dlm->spinlock);

        list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
                if (node_up)
                        dlm_mle_node_up(dlm, mle, NULL, idx);
                else
                        dlm_mle_node_down(dlm, mle, NULL, idx);
        }
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
                              struct dlm_master_list_entry *mle,
                              struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (!test_bit(idx, mle->node_map))
                mlog(0, "node %u already removed from nodemap!\n", idx);
        else
                clear_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
                            struct dlm_master_list_entry *mle,
                            struct o2nm_node *node, int idx)
{
        spin_lock(&mle->spinlock);

        if (test_bit(idx, mle->node_map))
                mlog(0, "node %u already in node map!\n", idx);
        else
                set_bit(idx, mle->node_map);

        spin_unlock(&mle->spinlock);
}


int dlm_init_mle_cache(void)
{
        dlm_mle_cache = kmem_cache_create("o2dlm_mle",
                                          sizeof(struct dlm_master_list_entry),
                                          0, SLAB_HWCACHE_ALIGN,
                                          NULL);
        if (dlm_mle_cache == NULL)
                return -ENOMEM;
        return 0;
}

void dlm_destroy_mle_cache(void)
{
        if (dlm_mle_cache)
                kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
        struct dlm_master_list_entry *mle;
        struct dlm_ctxt *dlm;
        unsigned char *mname;
        unsigned int mlen;

        mlog_entry_void();

        mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
        dlm = mle->dlm;

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&dlm->master_lock);

        __dlm_mle_name(mle, &mname, &mlen, NULL);
        mlog(0, "Releasing mle for %.*s, type %d\n", mlen, mname, mle->type);

        /* remove from list if not already */
        __dlm_unlink_mle(dlm, mle);

        /* detach the mle from the domain node up/down events */
        __dlm_mle_detach_hb_events(dlm, mle);

        atomic_dec(&dlm->mle_cur_count[mle->type]);

        /* NOTE: kfree under spinlock here.
         * if this is bad, we can move this to a freelist. */
        kmem_cache_free(dlm_mle_cache, mle);
}


/*
 * LOCK RESOURCE FUNCTIONS
 */

int dlm_init_master_caches(void)
{
        dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
                                              sizeof(struct dlm_lock_resource),
                                              0, SLAB_HWCACHE_ALIGN, NULL);
        if (!dlm_lockres_cache)
                goto bail;

        dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
                                               DLM_LOCKID_NAME_MAX, 0,
                                               SLAB_HWCACHE_ALIGN, NULL);
        if (!dlm_lockname_cache)
                goto bail;

        return 0;
bail:
        dlm_destroy_master_caches();
        return -ENOMEM;
}

void dlm_destroy_master_caches(void)
{
        if (dlm_lockname_cache)
                kmem_cache_destroy(dlm_lockname_cache);

        if (dlm_lockres_cache)
                kmem_cache_destroy(dlm_lockres_cache);
}

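/* ownership statistics: every lockres counts toward exactly one of the
 * local/unknown/remote counters, keyed by its current owner.  the
 * change variant decrements the old owner's counter before re-setting. */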
static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *res,
                                  u8 owner)
{
        assert_spin_locked(&res->spinlock);

        mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

        if (owner == dlm->node_num)
                atomic_inc(&dlm->local_resources);
        else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
                atomic_inc(&dlm->unknown_resources);
        else
                atomic_inc(&dlm->remote_resources);

        res->owner = owner;
}

void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res, u8 owner)
{
        assert_spin_locked(&res->spinlock);

        if (owner == res->owner)
                return;

        if (res->owner == dlm->node_num)
                atomic_dec(&dlm->local_resources);
        else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
                atomic_dec(&dlm->unknown_resources);
        else
                atomic_dec(&dlm->remote_resources);

        dlm_set_lockres_owner(dlm, res, owner);
}


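/* final kref release for a lockres: by now it must be off the hash and
 * every lock queue; anything else is a bug worth dying loudly for. */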
static void dlm_lockres_release(struct kref *kref)
{
        struct dlm_lock_resource *res;
        struct dlm_ctxt *dlm;

        res = container_of(kref, struct dlm_lock_resource, refs);
        dlm = res->dlm;

        /* This should not happen -- all lockres' have a name
         * associated with them at init time. */
        BUG_ON(!res->lockname.name);

        mlog(0, "destroying lockres %.*s\n", res->lockname.len,
             res->lockname.name);

        spin_lock(&dlm->track_lock);
        if (!list_empty(&res->tracking))
                list_del_init(&res->tracking);
        else {
                mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
                     res->lockname.len, res->lockname.name);
                dlm_print_one_lock_resource(res);
        }
        spin_unlock(&dlm->track_lock);

        dlm_put(dlm);

        if (!hlist_unhashed(&res->hash_node) ||
            !list_empty(&res->granted) ||
            !list_empty(&res->converting) ||
            !list_empty(&res->blocked) ||
            !list_empty(&res->dirty) ||
            !list_empty(&res->recovering) ||
            !list_empty(&res->purge)) {
                mlog(ML_ERROR,
                     "Going to BUG for resource %.*s."
                     "  We're on a list! [%c%c%c%c%c%c%c]\n",
                     res->lockname.len, res->lockname.name,
                     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
                     !list_empty(&res->granted) ? 'G' : ' ',
                     !list_empty(&res->converting) ? 'C' : ' ',
                     !list_empty(&res->blocked) ? 'B' : ' ',
                     !list_empty(&res->dirty) ? 'D' : ' ',
                     !list_empty(&res->recovering) ? 'R' : ' ',
                     !list_empty(&res->purge) ? 'P' : ' ');

                dlm_print_one_lock_resource(res);
        }

        /* By the time we're ready to blow this guy away, we shouldn't
         * be on any lists. */
        BUG_ON(!hlist_unhashed(&res->hash_node));
        BUG_ON(!list_empty(&res->granted));
        BUG_ON(!list_empty(&res->converting));
        BUG_ON(!list_empty(&res->blocked));
        BUG_ON(!list_empty(&res->dirty));
        BUG_ON(!list_empty(&res->recovering));
        BUG_ON(!list_empty(&res->purge));

        kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);

        kmem_cache_free(dlm_lockres_cache, res);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
        kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
                             struct dlm_lock_resource *res,
                             const char *name, unsigned int namelen)
{
        char *qname;

        /* If we memset here, we lose our reference to the kmalloc'd
         * res->lockname.name, so be sure to init every field
         * correctly! */

        qname = (char *) res->lockname.name;
        memcpy(qname, name, namelen);

        res->lockname.len = namelen;
        res->lockname.hash = dlm_lockid_hash(name, namelen);

        init_waitqueue_head(&res->wq);
        spin_lock_init(&res->spinlock);
        INIT_HLIST_NODE(&res->hash_node);
        INIT_LIST_HEAD(&res->granted);
        INIT_LIST_HEAD(&res->converting);
        INIT_LIST_HEAD(&res->blocked);
        INIT_LIST_HEAD(&res->dirty);
        INIT_LIST_HEAD(&res->recovering);
        INIT_LIST_HEAD(&res->purge);
        INIT_LIST_HEAD(&res->tracking);
        atomic_set(&res->asts_reserved, 0);
        res->migration_pending = 0;
        res->inflight_locks = 0;

        /* put in dlm_lockres_release */
        dlm_grab(dlm);
        res->dlm = dlm;

        kref_init(&res->refs);

        /* just for consistency */
        spin_lock(&res->spinlock);
        dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
        spin_unlock(&res->spinlock);

        res->state = DLM_LOCK_RES_IN_PROGRESS;

        res->last_used = 0;

        spin_lock(&dlm->spinlock);
        list_add_tail(&res->tracking, &dlm->tracking_list);
        spin_unlock(&dlm->spinlock);

        memset(res->lvb, 0, DLM_LVB_LEN);
        memset(res->refmap, 0, sizeof(res->refmap));
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
                                   const char *name,
                                   unsigned int namelen)
{
        struct dlm_lock_resource *res = NULL;

        res = (struct dlm_lock_resource *)
                                kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
        if (!res)
                goto error;

        res->lockname.name = (char *)
                                kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
        if (!res->lockname.name)
                goto error;

        dlm_init_lockres(dlm, res, name, namelen);
        return res;

error:
        if (res && res->lockname.name)
                kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);

        if (res)
                kmem_cache_free(dlm_lockres_cache, res);
        return NULL;
}

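/* inflight reference accounting: the first in-flight lock on this node
 * sets our bit in the refmap so the master keeps a reference for us;
 * when the count drops back to zero the bit is cleared and waiters are
 * woken so the lockres can be purged. */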
void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
                                   struct dlm_lock_resource *res,
                                   int new_lockres,
                                   const char *file,
                                   int line)
{
        if (!new_lockres)
                assert_spin_locked(&res->spinlock);

        if (!test_bit(dlm->node_num, res->refmap)) {
                BUG_ON(res->inflight_locks != 0);
                dlm_lockres_set_refmap_bit(dlm->node_num, res);
        }
        res->inflight_locks++;
        mlog(0, "%s:%.*s: inflight++: now %u\n",
             dlm->name, res->lockname.len, res->lockname.name,
             res->inflight_locks);
}

void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
                                   struct dlm_lock_resource *res,
                                   const char *file,
                                   int line)
{
        assert_spin_locked(&res->spinlock);

        BUG_ON(res->inflight_locks == 0);
        res->inflight_locks--;
        mlog(0, "%s:%.*s: inflight--: now %u\n",
             dlm->name, res->lockname.len, res->lockname.name,
             res->inflight_locks);
        if (res->inflight_locks == 0)
                dlm_lockres_clear_refmap_bit(dlm->node_num, res);
        wake_up(&res->wq);
}

/*
 * look up a lock resource by name; it may already exist in the
 * hashtable.  lockid is NULL terminated.
 *
 * if not found, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.  instead, wait around for that node
 * to assert_master (or die).
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
                                          const char *lockid,
                                          int namelen,
                                          int flags)
{
        struct dlm_lock_resource *tmpres=NULL, *res=NULL;
        struct dlm_master_list_entry *mle = NULL;
        struct dlm_master_list_entry *alloc_mle = NULL;
        int blocked = 0;
        int ret, nodenum;
        struct dlm_node_iter iter;
        unsigned int hash;
        int tries = 0;
        int bit, wait_on_recovery = 0;
        int drop_inflight_if_nonlocal = 0;

        BUG_ON(!lockid);

        hash = dlm_lockid_hash(lockid, namelen);

        mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
        spin_lock(&dlm->spinlock);
        tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
        if (tmpres) {
                int dropping_ref = 0;

                spin_unlock(&dlm->spinlock);

                spin_lock(&tmpres->spinlock);
                /* We wait for the other thread that is mastering the resource */
                if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
                        __dlm_wait_on_lockres(tmpres);
                        BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
                }

                if (tmpres->owner == dlm->node_num) {
                        BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
                        dlm_lockres_grab_inflight_ref(dlm, tmpres);
                } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
                        dropping_ref = 1;
                spin_unlock(&tmpres->spinlock);

                /* wait until done messaging the master, drop our ref to allow
                 * the lockres to be purged, start over. */
                if (dropping_ref) {
                        spin_lock(&tmpres->spinlock);
                        __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
                        spin_unlock(&tmpres->spinlock);
                        dlm_lockres_put(tmpres);
                        tmpres = NULL;
                        goto lookup;
                }

                mlog(0, "found in hash!\n");
                if (res)
                        dlm_lockres_put(res);
                res = tmpres;
                goto leave;
        }

        if (!res) {
                spin_unlock(&dlm->spinlock);
                mlog(0, "allocating a new resource\n");
                /* nothing found and we need to allocate one. */
                alloc_mle = (struct dlm_master_list_entry *)
                        kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
                if (!alloc_mle)
                        goto leave;
                res = dlm_new_lockres(dlm, lockid, namelen);
                if (!res)
                        goto leave;
                goto lookup;
        }

        mlog(0, "no lockres found, allocated our own: %p\n", res);

        if (flags & LKM_LOCAL) {
                /* caller knows it's safe to assume it's not mastered elsewhere
                 * DONE!  return right away */
                spin_lock(&res->spinlock);
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
                __dlm_insert_lockres(dlm, res);
                dlm_lockres_grab_inflight_ref(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
                /* lockres still marked IN_PROGRESS */
                goto wake_waiters;
        }

        /* check master list to see if another node has started mastering it */
        spin_lock(&dlm->master_lock);

        /* if we found a block, wait for lock to be mastered by another node */
        blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
        if (blocked) {
                int mig;
                if (mle->type == DLM_MLE_MASTER) {
                        mlog(ML_ERROR, "master entry for nonexistent lock!\n");
                        BUG();
                }
                mig = (mle->type == DLM_MLE_MIGRATION);
                /* if there is a migration in progress, let the migration
                 * finish before continuing.  we can wait for the absence
                 * of the MIGRATION mle: either the migrate finished or
                 * one of the nodes died and the mle was cleaned up.
                 * if there is a BLOCK here, but it already has a master
                 * set, we are too late.  the master does not have a ref
                 * for us in the refmap.  detach the mle and drop it.
                 * either way, go back to the top and start over. */
                if (mig || mle->master != O2NM_MAX_NODES) {
                        BUG_ON(mig && mle->master == dlm->node_num);
                        /* we arrived too late.  the master does not
                         * have a ref for us. retry. */
                        mlog(0, "%s:%.*s: late on %s\n",
                             dlm->name, namelen, lockid,
                             mig ?  "MIGRATION" : "BLOCK");
                        spin_unlock(&dlm->master_lock);
                        spin_unlock(&dlm->spinlock);

                        /* master is known, detach */
                        if (!mig)
                                dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                        mle = NULL;
                        /* this is lame, but we can't wait on either
                         * the mle or lockres waitqueue here */
                        if (mig)
                                msleep(100);
                        goto lookup;
                }
        } else {
                /* go ahead and try to master lock on this node */
                mle = alloc_mle;
                /* make sure this does not get freed below */
                alloc_mle = NULL;
                dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
                set_bit(dlm->node_num, mle->maybe_map);
                __dlm_insert_mle(dlm, mle);

                /* still holding the dlm spinlock, check the recovery map
                 * to see if there are any nodes that still need to be
                 * considered.  these will not appear in the mle nodemap
                 * but they might own this lockres.  wait on them. */
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                }
        }

        /* at this point there is either a DLM_MLE_BLOCK or a
         * DLM_MLE_MASTER on the master list, so it's safe to add the
         * lockres to the hashtable.  anyone who finds the lock will
         * still have to wait on the IN_PROGRESS. */

        /* finally add the lockres to its hash bucket */
        __dlm_insert_lockres(dlm, res);
        /* since this lockres is new it does not require the spinlock */
        dlm_lockres_grab_inflight_ref_new(dlm, res);

        /* if this node does not become the master make sure to drop
         * this inflight reference below */
        drop_inflight_if_nonlocal = 1;

        /* get an extra ref on the mle in case this is a BLOCK
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
         * need an extra one to keep from a bad ptr deref. */
        dlm_get_mle_inuse(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

redo_request:
        while (wait_on_recovery) {
                /* any cluster changes that occurred after dropping the
                 * dlm spinlock would be detectable by a change on the mle,
                 * so we only need to clear out the recovery map once. */
                if (dlm_is_recovery_lock(lockid, namelen)) {
                        mlog(ML_NOTICE, "%s: recovery map is not empty, but "
                             "must master $RECOVERY lock now\n", dlm->name);
                        if (!dlm_pre_master_reco_lockres(dlm, res))
                                wait_on_recovery = 0;
                        else {
                                mlog(0, "%s: waiting 500ms for heartbeat state "
                                    "change\n", dlm->name);
                                msleep(500);
                        }
                        continue;
                }

                dlm_kick_recovery_thread(dlm);
                msleep(1000);
                dlm_wait_for_recovery(dlm);

                spin_lock(&dlm->spinlock);
                bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
                if (bit < O2NM_MAX_NODES) {
                        mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
                             "recover before lock mastery can begin\n",
                             dlm->name, namelen, (char *)lockid, bit);
                        wait_on_recovery = 1;
                } else
                        wait_on_recovery = 0;
                spin_unlock(&dlm->spinlock);

                if (wait_on_recovery)
                        dlm_wait_for_node_recovery(dlm, bit, 10000);
        }

        /* must wait for lock to be mastered elsewhere */
        if (blocked)
                goto wait;

        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
                ret = dlm_do_master_request(res, mle, nodenum);
                if (ret < 0)
                        mlog_errno(ret);
                if (mle->master != O2NM_MAX_NODES) {
                        /* found a master! */
                        if (mle->master <= nodenum)
                                break;
                        /* if our master request has not reached the master
                         * yet, keep going until it does.  this is how the
                         * master will know that asserts are needed back to
                         * the lower nodes. */
                        mlog(0, "%s:%.*s: requests only up to %u but master "
                             "is %u, keep going\n", dlm->name, namelen,
                             lockid, nodenum, mle->master);
                }
        }

wait:
        /* keep going until the response map includes all nodes */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
                wait_on_recovery = 1;
                mlog(0, "%s:%.*s: node map changed, redo the "
                     "master request now, blocked=%d\n",
                     dlm->name, res->lockname.len,
                     res->lockname.name, blocked);
                if (++tries > 20) {
                        mlog(ML_ERROR, "%s:%.*s: spinning on "
                             "dlm_wait_for_lock_mastery, blocked=%d\n",
                             dlm->name, res->lockname.len,
                             res->lockname.name, blocked);
                        dlm_print_one_lock_resource(res);
                        dlm_print_one_mle(mle);
                        tries = 0;
                }
                goto redo_request;
        }

        mlog(0, "lockres mastered by %u\n", res->owner);
        /* make sure we never continue without this */
        BUG_ON(res->owner == O2NM_MAX_NODES);

        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        /* put the extra ref */
        dlm_put_mle_inuse(mle);

wake_waiters:
        spin_lock(&res->spinlock);
        if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
                dlm_lockres_drop_inflight_ref(dlm, res);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);

leave:
        /* need to free the unused mle */
        if (alloc_mle)
                kmem_cache_free(dlm_mle_cache, alloc_mle);

        return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000

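/* poll until mastery is resolved: restart if the node map changed,
 * sleep (up to DLM_MASTERY_TIMEOUT_MS at a time) while votes are still
 * outstanding, and once every vote is in, the node with the lowest
 * number in maybe_map asserts itself as master. */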
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     struct dlm_master_list_entry *mle,
                                     int *blocked)
{
        u8 m;
        int ret, bit;
        int map_changed, voting_done;
        int assert, sleep;

recheck:
        ret = 0;
        assert = 0;

        /* check if another node has already become the owner */
        spin_lock(&res->spinlock);
        if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
                mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
                     res->lockname.len, res->lockname.name, res->owner);
                spin_unlock(&res->spinlock);
                /* this will cause the master to re-assert across
                 * the whole cluster, freeing up mles */
                if (res->owner != dlm->node_num) {
                        ret = dlm_do_master_request(res, mle, res->owner);
                        if (ret < 0) {
                                /* give recovery a chance to run */
                                mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
                                msleep(500);
                                goto recheck;
                        }
                }
                ret = 0;
                goto leave;
        }
        spin_unlock(&res->spinlock);

        spin_lock(&mle->spinlock);
        m = mle->master;
        map_changed = (memcmp(mle->vote_map, mle->node_map,
                              sizeof(mle->vote_map)) != 0);
        voting_done = (memcmp(mle->vote_map, mle->response_map,
                             sizeof(mle->vote_map)) == 0);

        /* restart if we hit any errors */
        if (map_changed) {
                int b;
                mlog(0, "%s: %.*s: node map changed, restarting\n",
                     dlm->name, res->lockname.len, res->lockname.name);
                ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
                b = (mle->type == DLM_MLE_BLOCK);
                if ((*blocked && !b) || (!*blocked && b)) {
                        mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
                             dlm->name, res->lockname.len, res->lockname.name,
                             *blocked, b);
                        *blocked = b;
                }
                spin_unlock(&mle->spinlock);
                if (ret < 0) {
                        mlog_errno(ret);
                        goto leave;
                }
                mlog(0, "%s:%.*s: restart lock mastery succeeded, "
                     "rechecking now\n", dlm->name, res->lockname.len,
                     res->lockname.name);
                goto recheck;
        } else {
                if (!voting_done) {
                        mlog(0, "map not changed and voting not done "
                             "for %s:%.*s\n", dlm->name, res->lockname.len,
                             res->lockname.name);
                }
        }

        if (m != O2NM_MAX_NODES) {
                /* another node has done an assert!
                 * all done! */
                sleep = 0;
        } else {
                sleep = 1;
                /* have all nodes responded? */
                if (voting_done && !*blocked) {
                        bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
                        if (dlm->node_num <= bit) {
                                /* my node number is lowest.
                                 * now tell other nodes that I am
                                 * mastering this. */
                                mle->master = dlm->node_num;
                                /* ref was grabbed in get_lock_resource
                                 * will be dropped in dlmlock_master */
                                assert = 1;
                                sleep = 0;
                        }
                        /* if voting is done, but we have not received
                         * an assert master yet, we must sleep */
                }
        }

        spin_unlock(&mle->spinlock);

        /* sleep if we haven't finished voting yet */
        if (sleep) {
                unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

                /*
                if (atomic_read(&mle->mle_refs.refcount) < 2)
                        mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
                        atomic_read(&mle->mle_refs.refcount),
                        res->lockname.len, res->lockname.name);
                */
                atomic_set(&mle->woken, 0);
                (void)wait_event_timeout(mle->wq,
                                         (atomic_read(&mle->woken) == 1),
                                         timeo);
                if (res->owner == O2NM_MAX_NODES) {
                        mlog(0, "%s:%.*s: waiting again\n", dlm->name,
                             res->lockname.len, res->lockname.name);
                        goto recheck;
                }
                mlog(0, "done waiting, master is %u\n", res->owner);
                ret = 0;
                goto leave;
        }

        ret = 0;   /* done */
        if (assert) {
                m = dlm->node_num;
                mlog(0, "about to master %.*s here, this=%u\n",
                     res->lockname.len, res->lockname.name, m);
                ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
                if (ret) {
                        /* This is a failure in the network path,
                         * not in the response to the assert_master
                         * (any nonzero response is a BUG on this node).
                         * Most likely a socket just got disconnected
                         * due to node death. */
                        mlog_errno(ret);
                }
                /* no longer need to restart lock mastery.
                 * all living nodes have been contacted. */
                ret = 0;
        }

        /* set the lockres owner */
        spin_lock(&res->spinlock);
        /* mastery reference obtained either during
         * assert_master_handler or in get_lock_resource */
        dlm_change_lockres_owner(dlm, res, m);
        spin_unlock(&res->spinlock);

leave:
        return ret;
}

struct dlm_bitmap_diff_iter
{
        int curnode;
        unsigned long *orig_bm;
        unsigned long *cur_bm;
        unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
        NODE_DOWN = -1,
        NODE_NO_CHANGE = 0,
        NODE_UP
};

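/* build diff_bm as the symmetric difference (XOR) of the original and
 * current bitmaps so the iterator visits only the nodes whose state
 * changed between the two snapshots. */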
static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
                                      unsigned long *orig_bm,
                                      unsigned long *cur_bm)
{
        unsigned long p1, p2;
        int i;

        iter->curnode = -1;
        iter->orig_bm = orig_bm;
        iter->cur_bm = cur_bm;

        for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
                p1 = *(iter->orig_bm + i);
                p2 = *(iter->cur_bm + i);
                iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
        }
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
                                     enum dlm_node_state_change *state)
{
        int bit;

        if (iter->curnode >= O2NM_MAX_NODES)
                return -ENOENT;

        bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
                            iter->curnode+1);
        if (bit >= O2NM_MAX_NODES) {
                iter->curnode = O2NM_MAX_NODES;
                return -ENOENT;
        }

        /* if it was there in the original then this node died */
        if (test_bit(bit, iter->orig_bm))
                *state = NODE_DOWN;
        else
                *state = NODE_UP;

        iter->curnode = bit;
        return bit;
}


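/* called with mle->spinlock held after the node map changed mid-mastery.
 * walks the changed nodes: a node that came up gets re-queried, a node
 * that died is scrubbed from the maps (possibly converting a BLOCK mle
 * back into a MASTER mle).  returns -EAGAIN if anything changed so the
 * caller restarts the vote. */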
1252 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1253                                     struct dlm_lock_resource *res,
1254                                     struct dlm_master_list_entry *mle,
1255                                     int blocked)
1256 {
1257         struct dlm_bitmap_diff_iter bdi;
1258         enum dlm_node_state_change sc;
1259         int node;
1260         int ret = 0;
1261
1262         mlog(0, "something happened such that the "
1263              "master process may need to be restarted!\n");
1264
1265         assert_spin_locked(&mle->spinlock);
1266
1267         dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1268         node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1269         while (node >= 0) {
1270                 if (sc == NODE_UP) {
1271                         /* a node came up.  clear any old vote from
1272                          * the response map and set it in the vote map
1273                          * then restart the mastery. */
1274                         mlog(ML_NOTICE, "node %d up while restarting\n", node);
1275
1276                         /* redo the master request, but only for the new node */
1277                         mlog(0, "sending request to new node\n");
1278                         clear_bit(node, mle->response_map);
1279                         set_bit(node, mle->vote_map);
1280                 } else {
1281                         mlog(ML_ERROR, "node down! %d\n", node);
1282                         if (blocked) {
1283                                 int lowest = find_next_bit(mle->maybe_map,
1284                                                        O2NM_MAX_NODES, 0);
1285
1286                                 /* act like it was never there */
1287                                 clear_bit(node, mle->maybe_map);
1288
1289                                 if (node == lowest) {
1290                                         mlog(0, "expected master %u died"
1291                                             " while this node was blocked "
1292                                             "waiting on it!\n", node);
1293                                         lowest = find_next_bit(mle->maybe_map,
1294                                                         O2NM_MAX_NODES,
1295                                                         lowest+1);
1296                                         if (lowest < O2NM_MAX_NODES) {
1297                                                 mlog(0, "%s:%.*s:still "
1298                                                      "blocked. waiting on %u "
1299                                                      "now\n", dlm->name,
1300                                                      res->lockname.len,
1301                                                      res->lockname.name,
1302                                                      lowest);
1303                                         } else {
1304                                                 /* mle is an MLE_BLOCK, but
1305                                                  * there is now nothing left to
1306                                                  * block on.  we need to return
1307                                                  * all the way back out and try
1308                                                  * again with an MLE_MASTER.
1309                                                  * dlm_do_local_recovery_cleanup
1310                                                  * has already run, so the mle
1311                                                  * refcount is ok */
1312                                                 mlog(0, "%s:%.*s: no "
1313                                                      "longer blocking. try to "
1314                                                      "master this here\n",
1315                                                      dlm->name,
1316                                                      res->lockname.len,
1317                                                      res->lockname.name);
1318                                                 mle->type = DLM_MLE_MASTER;
1319                                                 mle->u.mleres = res;
1320                                         }
1321                                 }
1322                         }
1323
1324                         /* now blank out everything, as if we had never
1325                          * contacted anyone */
1326                         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1327                         memset(mle->response_map, 0, sizeof(mle->response_map));
1328                         /* reset the vote_map to the current node_map */
1329                         memcpy(mle->vote_map, mle->node_map,
1330                                sizeof(mle->node_map));
1331                         /* put myself into the maybe map */
1332                         if (mle->type != DLM_MLE_BLOCK)
1333                                 set_bit(dlm->node_num, mle->maybe_map);
1334                 }
1335                 ret = -EAGAIN;
1336                 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1337         }
1338         return ret;
1339 }
1340
1341
1342 /*
1343  * DLM_MASTER_REQUEST_MSG
1344  *
1345  * returns: 0 on success,
1346  *          -errno on a network error
1347  *
1348  * on error, the caller should assume the target node is "dead"
1349  *
1350  */
1351
1352 static int dlm_do_master_request(struct dlm_lock_resource *res,
1353                                  struct dlm_master_list_entry *mle, int to)
1354 {
1355         struct dlm_ctxt *dlm = mle->dlm;
1356         struct dlm_master_request request;
1357         int ret, response=0, resend;
1358         unsigned char *mlename;
1359         unsigned int mlenamelen;
1360
1361         memset(&request, 0, sizeof(request));
1362         request.node_idx = dlm->node_num;
1363
1364         BUG_ON(mle->type == DLM_MLE_MIGRATION);
1365
1366         __dlm_mle_name(mle, &mlename, &mlenamelen, NULL);
1367
1368         request.namelen = (u8)mlenamelen;
1369         memcpy(request.name, mlename, request.namelen);
1370
1371 again:
1372         ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1373                                  sizeof(request), to, &response);
1374         if (ret < 0)  {
1375                 if (ret == -ESRCH) {
1376                         /* should never happen */
1377                         mlog(ML_ERROR, "TCP stack not ready!\n");
1378                         BUG();
1379                 } else if (ret == -EINVAL) {
1380                         mlog(ML_ERROR, "bad args passed to o2net!\n");
1381                         BUG();
1382                 } else if (ret == -ENOMEM) {
1383                         mlog(ML_ERROR, "out of memory while trying to send "
1384                              "network message!  retrying\n");
1385                         /* this is totally crude */
1386                         msleep(50);
1387                         goto again;
1388                 } else if (!dlm_is_host_down(ret)) {
1389                         /* not a network error. bad. */
1390                         mlog_errno(ret);
1391                         mlog(ML_ERROR, "unhandled error!\n");
1392                         BUG();
1393                 }
1394                 /* all other errors should be network errors,
1395                  * and likely indicate node death */
1396                 mlog(ML_ERROR, "link to %d went down!\n", to);
1397                 goto out;
1398         }
1399
1400         ret = 0;
1401         resend = 0;
1402         spin_lock(&mle->spinlock);
1403         switch (response) {
1404                 case DLM_MASTER_RESP_YES:
1405                         set_bit(to, mle->response_map);
1406                         mlog(0, "node %u is the master, response=YES\n", to);
1407                         mlog(0, "%s:%.*s: master node %u now knows I have a "
1408                              "reference\n", dlm->name, res->lockname.len,
1409                              res->lockname.name, to);
1410                         mle->master = to;
1411                         break;
1412                 case DLM_MASTER_RESP_NO:
1413                         mlog(0, "node %u not master, response=NO\n", to);
1414                         set_bit(to, mle->response_map);
1415                         break;
1416                 case DLM_MASTER_RESP_MAYBE:
1417                         mlog(0, "node %u not master, response=MAYBE\n", to);
1418                         set_bit(to, mle->response_map);
1419                         set_bit(to, mle->maybe_map);
1420                         break;
1421                 case DLM_MASTER_RESP_ERROR:
1422                         mlog(0, "node %u hit an error, resending\n", to);
1423                         resend = 1;
1424                         response = 0;
1425                         break;
1426                 default:
1427                         mlog(ML_ERROR, "bad response! %u\n", response);
1428                         BUG();
1429         }
1430         spin_unlock(&mle->spinlock);
1431         if (resend) {
1432                 /* this is also totally crude */
1433                 msleep(50);
1434                 goto again;
1435         }
1436
1437 out:
1438         return ret;
1439 }
1440
1441 /*
1442  * locks that can be taken here:
1443  * dlm->spinlock
1444  * res->spinlock
1445  * mle->spinlock
1446  * dlm->master_list
1447  *
1448  * if possible, TRIM THIS DOWN!!!
1449  */
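/*
 * the value returned here travels back to the requesting node as the
 * o2net message status and is interpreted by dlm_do_master_request()
 * as one of the DLM_MASTER_RESP_* values documented above it
 */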
1450 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1451                                void **ret_data)
1452 {
1453         u8 response = DLM_MASTER_RESP_MAYBE;
1454         struct dlm_ctxt *dlm = data;
1455         struct dlm_lock_resource *res = NULL;
1456         struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1457         struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1458         char *name;
1459         unsigned int namelen, hash;
1460         int found, ret;
1461         int set_maybe;
1462         int dispatch_assert = 0;
1463
1464         if (!dlm_grab(dlm))
1465                 return DLM_MASTER_RESP_NO;
1466
1467         if (!dlm_domain_fully_joined(dlm)) {
1468                 response = DLM_MASTER_RESP_NO;
1469                 goto send_response;
1470         }
1471
1472         name = request->name;
1473         namelen = request->namelen;
1474         hash = dlm_lockid_hash(name, namelen);
1475
1476         if (namelen > DLM_LOCKID_NAME_MAX) {
1477                 response = DLM_IVBUFLEN;
1478                 goto send_response;
1479         }
1480
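        /*
         * way_up_top is retaken at most once: if no mle exists for this
         * lock and none has been preallocated, both spinlocks must be
         * dropped to allocate one, after which the whole lookup is
         * redone from the top.
         */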
1481 way_up_top:
1482         spin_lock(&dlm->spinlock);
1483         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1484         if (res) {
1485                 spin_unlock(&dlm->spinlock);
1486
1487                 /* take care of the easy cases up front */
1488                 spin_lock(&res->spinlock);
1489                 if (res->state & (DLM_LOCK_RES_RECOVERING|
1490                                   DLM_LOCK_RES_MIGRATING)) {
1491                         spin_unlock(&res->spinlock);
1492                         mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1493                              "being recovered/migrated\n");
1494                         response = DLM_MASTER_RESP_ERROR;
1495                         if (mle)
1496                                 kmem_cache_free(dlm_mle_cache, mle);
1497                         goto send_response;
1498                 }
1499
1500                 if (res->owner == dlm->node_num) {
1501                         mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1502                              dlm->name, namelen, name, request->node_idx);
1503                         dlm_lockres_set_refmap_bit(request->node_idx, res);
1504                         spin_unlock(&res->spinlock);
1505                         response = DLM_MASTER_RESP_YES;
1506                         if (mle)
1507                                 kmem_cache_free(dlm_mle_cache, mle);
1508
1509                         /* this node is the owner.
1510                          * there is some extra work that needs to
1511                          * happen now.  the requesting node has
1512                          * caused all nodes up to this one to
1513                          * create mles.  this node now needs to
1514                          * go back and clean those up. */
1515                         dispatch_assert = 1;
1516                         goto send_response;
1517                 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1518                         spin_unlock(&res->spinlock);
1519                         // mlog(0, "node %u is the master\n", res->owner);
1520                         response = DLM_MASTER_RESP_NO;
1521                         if (mle)
1522                                 kmem_cache_free(dlm_mle_cache, mle);
1523                         goto send_response;
1524                 }
1525
1526                 /* ok, there is no owner.  either this node is
1527                  * being blocked, or it is actively trying to
1528                  * master this lock. */
1529                 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1530                         mlog(ML_ERROR, "lock with no owner should be "
1531                              "in-progress!\n");
1532                         BUG();
1533                 }
1534
1535                 // mlog(0, "lockres is in progress...\n");
1536                 spin_lock(&dlm->master_lock);
1537                 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1538                 if (!found) {
1539                         mlog(ML_ERROR, "no mle found for this lock!\n");
1540                         BUG();
1541                 }
1542                 set_maybe = 1;
1543                 spin_lock(&tmpmle->spinlock);
1544                 if (tmpmle->type == DLM_MLE_BLOCK) {
1545                         // mlog(0, "this node is waiting for "
1546                         // "lockres to be mastered\n");
1547                         response = DLM_MASTER_RESP_NO;
1548                 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1549                         mlog(0, "node %u is master, but trying to migrate to "
1550                              "node %u.\n", tmpmle->master, tmpmle->new_master);
1551                         if (tmpmle->master == dlm->node_num) {
1552                                 mlog(ML_ERROR, "no owner on lockres, but this "
1553                                      "node is trying to migrate it to %u?!\n",
1554                                      tmpmle->new_master);
1555                                 BUG();
1556                         } else {
1557                                 /* the real master can respond on its own */
1558                                 response = DLM_MASTER_RESP_NO;
1559                         }
1560                 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1561                         set_maybe = 0;
1562                         if (tmpmle->master == dlm->node_num) {
1563                                 response = DLM_MASTER_RESP_YES;
1564                                 /* this node will be the owner.
1565                                  * go back and clean the mles on any
1566                                  * other nodes */
1567                                 dispatch_assert = 1;
1568                                 dlm_lockres_set_refmap_bit(request->node_idx, res);
1569                                 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1570                                      dlm->name, namelen, name,
1571                                      request->node_idx);
1572                         } else
1573                                 response = DLM_MASTER_RESP_NO;
1574                 } else {
1575                         // mlog(0, "this node is attempting to "
1576                         // "master lockres\n");
1577                         response = DLM_MASTER_RESP_MAYBE;
1578                 }
1579                 if (set_maybe)
1580                         set_bit(request->node_idx, tmpmle->maybe_map);
1581                 spin_unlock(&tmpmle->spinlock);
1582
1583                 spin_unlock(&dlm->master_lock);
1584                 spin_unlock(&res->spinlock);
1585
1586                 /* keep the mle attached to heartbeat events */
1587                 dlm_put_mle(tmpmle);
1588                 if (mle)
1589                         kmem_cache_free(dlm_mle_cache, mle);
1590                 goto send_response;
1591         }
1592
1593         /*
1594          * lockres doesn't exist on this node
1595          * if there is an MLE_BLOCK, return NO
1596          * if there is an MLE_MASTER, return MAYBE
1597          * otherwise, add an MLE_BLOCK, return NO
1598          */
1599         spin_lock(&dlm->master_lock);
1600         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1601         if (!found) {
1602                 /* this lockid has never been seen on this node yet */
1603                 // mlog(0, "no mle found\n");
1604                 if (!mle) {
1605                         spin_unlock(&dlm->master_lock);
1606                         spin_unlock(&dlm->spinlock);
1607
1608                         mle = (struct dlm_master_list_entry *)
1609                                 kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1610                         if (!mle) {
1611                                 response = DLM_MASTER_RESP_ERROR;
1612                                 mlog_errno(-ENOMEM);
1613                                 goto send_response;
1614                         }
1615                         goto way_up_top;
1616                 }
1617
1618                 // mlog(0, "this is second time thru, already allocated, "
1619                 // "add the block.\n");
1620                 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1621                 set_bit(request->node_idx, mle->maybe_map);
1622                 __dlm_insert_mle(dlm, mle);
1623                 response = DLM_MASTER_RESP_NO;
1624         } else {
1625                 // mlog(0, "mle was found\n");
1626                 set_maybe = 1;
1627                 spin_lock(&tmpmle->spinlock);
1628                 if (tmpmle->master == dlm->node_num) {
1629                         mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1630                         BUG();
1631                 }
1632                 if (tmpmle->type == DLM_MLE_BLOCK)
1633                         response = DLM_MASTER_RESP_NO;
1634                 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1635                         mlog(0, "migration mle was found (%u->%u)\n",
1636                              tmpmle->master, tmpmle->new_master);
1637                         /* real master can respond on its own */
1638                         response = DLM_MASTER_RESP_NO;
1639                 } else
1640                         response = DLM_MASTER_RESP_MAYBE;
1641                 if (set_maybe)
1642                         set_bit(request->node_idx, tmpmle->maybe_map);
1643                 spin_unlock(&tmpmle->spinlock);
1644         }
1645         spin_unlock(&dlm->master_lock);
1646         spin_unlock(&dlm->spinlock);
1647
1648         if (found) {
1649                 /* keep the mle attached to heartbeat events */
1650                 dlm_put_mle(tmpmle);
1651         }
1652 send_response:
1653         /*
1654          * __dlm_lookup_lockres() grabbed a reference to this lockres.
1655          * The reference is released by dlm_assert_master_worker() under
1656          * the call to dlm_dispatch_assert_master().  If
1657          * dlm_assert_master_worker() isn't called, we drop it here.
1658          */
1659         if (dispatch_assert) {
1660                 if (response != DLM_MASTER_RESP_YES)
1661                         mlog(ML_ERROR, "invalid response %d\n", response);
1662                 if (!res) {
1663                         mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1664                         BUG();
1665                 }
1666                 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1667                      dlm->node_num, res->lockname.len, res->lockname.name);
1668                 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 
1669                                                  DLM_ASSERT_MASTER_MLE_CLEANUP);
1670                 if (ret < 0) {
1671                         mlog(ML_ERROR, "failed to dispatch assert master work\n");
1672                         response = DLM_MASTER_RESP_ERROR;
1673                         dlm_lockres_put(res);
1674                 }
1675         } else {
1676                 if (res)
1677                         dlm_lockres_put(res);
1678         }
1679
1680         dlm_put(dlm);
1681         return response;
1682 }
1683
1684 /*
1685  * DLM_ASSERT_MASTER_MSG
1686  */
1687
1688
1689 /*
1690  * NOTE: this can be used for debugging
1691  * can periodically run all locks owned by this node
1692  * and re-assert across the cluster...
1693  */
1694 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1695                                 struct dlm_lock_resource *res,
1696                                 void *nodemap, u32 flags)
1697 {
1698         struct dlm_assert_master assert;
1699         int to, tmpret;
1700         struct dlm_node_iter iter;
1701         int ret = 0;
1702         int reassert;
1703         const char *lockname = res->lockname.name;
1704         unsigned int namelen = res->lockname.len;
1705
1706         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1707
1708         spin_lock(&res->spinlock);
1709         res->state |= DLM_LOCK_RES_SETREF_INPROG;
1710         spin_unlock(&res->spinlock);
1711
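        /*
         * each recipient replies with a bitmask: DLM_ASSERT_RESPONSE_REASSERT
         * asks this node to assert once more (the recipient's mle shows
         * responses from other nodes that still need cleaning), while
         * DLM_ASSERT_RESPONSE_MASTERY_REF means the recipient holds a
         * reference to this lockres, so its bit must be set in the refmap.
         */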
1712 again:
1713         reassert = 0;
1714
1715         /* note that if this nodemap is empty, it returns 0 */
1716         dlm_node_iter_init(nodemap, &iter);
1717         while ((to = dlm_node_iter_next(&iter)) >= 0) {
1718                 int r = 0;
1719                 struct dlm_master_list_entry *mle = NULL;
1720
1721                 mlog(0, "sending assert master to %d (%.*s)\n", to,
1722                      namelen, lockname);
1723                 memset(&assert, 0, sizeof(assert));
1724                 assert.node_idx = dlm->node_num;
1725                 assert.namelen = namelen;
1726                 memcpy(assert.name, lockname, namelen);
1727                 assert.flags = cpu_to_be32(flags);
1728
1729                 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1730                                             &assert, sizeof(assert), to, &r);
1731                 if (tmpret < 0) {
1732                         mlog(0, "assert_master returned %d!\n", tmpret);
1733                         if (!dlm_is_host_down(tmpret)) {
1734                                 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1735                                 BUG();
1736                         }
1737                         /* a node died.  finish out the rest of the nodes. */
1738                         mlog(0, "link to %d went down!\n", to);
1739                         /* any nonzero status return will do */
1740                         ret = tmpret;
1741                         r = 0;
1742                 } else if (r < 0) {
1743                         /* ok, something is horribly messed up.  kill thyself. */
1744                         mlog(ML_ERROR, "during assert master of %.*s to %u, "
1745                              "got %d.\n", namelen, lockname, to, r);
1746                         spin_lock(&dlm->spinlock);
1747                         spin_lock(&dlm->master_lock);
1748                         if (dlm_find_mle(dlm, &mle, (char *)lockname,
1749                                          namelen)) {
1750                                 dlm_print_one_mle(mle);
1751                                 __dlm_put_mle(mle);
1752                         }
1753                         spin_unlock(&dlm->master_lock);
1754                         spin_unlock(&dlm->spinlock);
1755                         BUG();
1756                 }
1757
1758                 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1759                     !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1760                         mlog(ML_ERROR, "%.*s: very strange, "
1761                              "master MLE but no lockres on %u\n",
1762                              namelen, lockname, to);
1763                 }
1764
1765                 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1766                         mlog(0, "%.*s: node %u created mles on other "
1767                              "nodes and requests a re-assert\n",
1768                              namelen, lockname, to);
1769                         reassert = 1;
1770                 }
1771                 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1772                         mlog(0, "%.*s: node %u has a reference to this "
1773                              "lockres, set the bit in the refmap\n",
1774                              namelen, lockname, to);
1775                         spin_lock(&res->spinlock);
1776                         dlm_lockres_set_refmap_bit(to, res);
1777                         spin_unlock(&res->spinlock);
1778                 }
1779         }
1780
1781         if (reassert)
1782                 goto again;
1783
1784         spin_lock(&res->spinlock);
1785         res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1786         spin_unlock(&res->spinlock);
1787         wake_up(&res->wq);
1788
1789         return ret;
1790 }
1791
1792 /*
1793  * locks that can be taken here:
1794  * dlm->spinlock
1795  * res->spinlock
1796  * mle->spinlock
1797  * dlm->master_list
1798  *
1799  * if possible, TRIM THIS DOWN!!!
1800  */
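/*
 * returns a bitmask of DLM_ASSERT_RESPONSE_REASSERT and/or
 * DLM_ASSERT_RESPONSE_MASTERY_REF on success, or a negative value
 * (via the kill: path) to make the asserting node BUG itself
 */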
1801 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1802                               void **ret_data)
1803 {
1804         struct dlm_ctxt *dlm = data;
1805         struct dlm_master_list_entry *mle = NULL;
1806         struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1807         struct dlm_lock_resource *res = NULL;
1808         char *name;
1809         unsigned int namelen, hash;
1810         u32 flags;
1811         int master_request = 0, have_lockres_ref = 0;
1812         int ret = 0;
1813
1814         if (!dlm_grab(dlm))
1815                 return 0;
1816
1817         name = assert->name;
1818         namelen = assert->namelen;
1819         hash = dlm_lockid_hash(name, namelen);
1820         flags = be32_to_cpu(assert->flags);
1821
1822         if (namelen > DLM_LOCKID_NAME_MAX) {
1823                 mlog(ML_ERROR, "Invalid name length!\n");
1824                 goto done;
1825         }
1826
1827         spin_lock(&dlm->spinlock);
1828
1829         if (flags)
1830                 mlog(0, "assert_master with flags: %u\n", flags);
1831
1832         /* find the MLE */
1833         spin_lock(&dlm->master_lock);
1834         if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1835                 /* not an error, could be master just re-asserting */
1836                 mlog(0, "just got an assert_master from %u, but no "
1837                      "MLE for it! (%.*s)\n", assert->node_idx,
1838                      namelen, name);
1839         } else {
1840                 int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1841                 if (bit >= O2NM_MAX_NODES) {
1842                         /* not necessarily an error, though less likely.
1843                          * could be master just re-asserting. */
1844                         mlog(0, "no bits set in the maybe_map, but %u "
1845                              "is asserting! (%.*s)\n", assert->node_idx,
1846                              namelen, name);
1847                 } else if (bit != assert->node_idx) {
1848                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1849                                 mlog(0, "master %u was found, %u should "
1850                                      "back off\n", assert->node_idx, bit);
1851                         } else {
1852                                 /* with the fix for bug 569, a higher node
1853                                  * number winning the mastery will respond
1854                                  * YES to mastery requests, but this node
1855                                  * had no way of knowing.  let it pass. */
1856                                 mlog(0, "%u is the lowest node, "
1857                                      "%u is asserting. (%.*s)  %u must "
1858                                      "have begun after %u won.\n", bit,
1859                                      assert->node_idx, namelen, name, bit,
1860                                      assert->node_idx);
1861                         }
1862                 }
1863                 if (mle->type == DLM_MLE_MIGRATION) {
1864                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1865                                 mlog(0, "%s:%.*s: got cleanup assert"
1866                                      " from %u for migration\n",
1867                                      dlm->name, namelen, name,
1868                                      assert->node_idx);
1869                         } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1870                                 mlog(0, "%s:%.*s: got unrelated assert"
1871                                      " from %u for migration, ignoring\n",
1872                                      dlm->name, namelen, name,
1873                                      assert->node_idx);
1874                                 __dlm_put_mle(mle);
1875                                 spin_unlock(&dlm->master_lock);
1876                                 spin_unlock(&dlm->spinlock);
1877                                 goto done;
1878                         }       
1879                 }
1880         }
1881         spin_unlock(&dlm->master_lock);
1882
1883         /* ok everything checks out with the MLE
1884          * now check to see if there is a lockres */
1885         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1886         if (res) {
1887                 spin_lock(&res->spinlock);
1888                 if (res->state & DLM_LOCK_RES_RECOVERING)  {
1889                         mlog(ML_ERROR, "%u asserting but %.*s is "
1890                              "RECOVERING!\n", assert->node_idx, namelen, name);
1891                         goto kill;
1892                 }
1893                 if (!mle) {
1894                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1895                             res->owner != assert->node_idx) {
1896                                 mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1897                                      "but current owner is %u! (%.*s)\n",
1898                                      assert->node_idx, res->owner, namelen,
1899                                      name);
1900                                 __dlm_print_one_lock_resource(res);
1901                                 BUG();
1902                         }
1903                 } else if (mle->type != DLM_MLE_MIGRATION) {
1904                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1905                                 /* owner is just re-asserting */
1906                                 if (res->owner == assert->node_idx) {
1907                                         mlog(0, "owner %u re-asserting on "
1908                                              "lock %.*s\n", assert->node_idx,
1909                                              namelen, name);
1910                                         goto ok;
1911                                 }
1912                                 mlog(ML_ERROR, "got assert_master from "
1913                                      "node %u, but %u is the owner! "
1914                                      "(%.*s)\n", assert->node_idx,
1915                                      res->owner, namelen, name);
1916                                 goto kill;
1917                         }
1918                         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1919                                 mlog(ML_ERROR, "got assert from %u, but lock "
1920                                      "with no owner should be "
1921                                      "in-progress! (%.*s)\n",
1922                                      assert->node_idx,
1923                                      namelen, name);
1924                                 goto kill;
1925                         }
1926                 } else /* mle->type == DLM_MLE_MIGRATION */ {
1927                         /* should only be getting an assert from new master */
1928                         if (assert->node_idx != mle->new_master) {
1929                                 mlog(ML_ERROR, "got assert from %u, but "
1930                                      "new master is %u, and old master "
1931                                      "was %u (%.*s)\n",
1932                                      assert->node_idx, mle->new_master,
1933                                      mle->master, namelen, name);
1934                                 goto kill;
1935                         }
1936
1937                 }
1938 ok:
1939                 spin_unlock(&res->spinlock);
1940         }
1941         spin_unlock(&dlm->spinlock);
1942
1943         // mlog(0, "woo!  got an assert_master from node %u!\n",
1944         //           assert->node_idx);
1945         if (mle) {
1946                 int extra_ref = 0;
1947                 int nn = -1;
1948                 int rr, err = 0;
1949                 
1950                 spin_lock(&mle->spinlock);
1951                 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1952                         extra_ref = 1;
1953                 else {
1954                         /* MASTER mle: if any bits set in the response map
1955                          * then the calling node needs to re-assert to clear
1956                          * up nodes that this node contacted */
1957                         while ((nn = find_next_bit(mle->response_map, O2NM_MAX_NODES,
1958                                                    nn+1)) < O2NM_MAX_NODES) {
1959                                 if (nn != dlm->node_num && nn != assert->node_idx)
1960                                         master_request = 1;
1961                         }
1962                 }
1963                 mle->master = assert->node_idx;
1964                 atomic_set(&mle->woken, 1);
1965                 wake_up(&mle->wq);
1966                 spin_unlock(&mle->spinlock);
1967
1968                 if (res) {
1969                         int wake = 0;
1970                         spin_lock(&res->spinlock);
1971                         if (mle->type == DLM_MLE_MIGRATION) {
1972                                 mlog(0, "finishing off migration of lockres %.*s, "
1973                                         "from %u to %u\n",
1974                                         res->lockname.len, res->lockname.name,
1975                                         dlm->node_num, mle->new_master);
1976                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
1977                                 wake = 1;
1978                                 dlm_change_lockres_owner(dlm, res, mle->new_master);
1979                                 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1980                         } else {
1981                                 dlm_change_lockres_owner(dlm, res, mle->master);
1982                         }
1983                         spin_unlock(&res->spinlock);
1984                         have_lockres_ref = 1;
1985                         if (wake)
1986                                 wake_up(&res->wq);
1987                 }
1988
1989                 /* master is known, detach if not already detached.
1990                  * ensures that only one assert_master call will happen
1991                  * on this mle. */
1992                 spin_lock(&dlm->spinlock);
1993                 spin_lock(&dlm->master_lock);
1994
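                /*
                 * sanity check the refcount: at minimum the mle hash
                 * list holds one ref, an inuse mle holds another, and
                 * a BLOCK/MIGRATION mle carries the extra ref that this
                 * assert is about to drop.  fewer refs than that means
                 * somebody has already put this mle too many times.
                 */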
1995                 rr = atomic_read(&mle->mle_refs.refcount);
1996                 if (mle->inuse > 0) {
1997                         if (extra_ref && rr < 3)
1998                                 err = 1;
1999                         else if (!extra_ref && rr < 2)
2000                                 err = 1;
2001                 } else {
2002                         if (extra_ref && rr < 2)
2003                                 err = 1;
2004                         else if (!extra_ref && rr < 1)
2005                                 err = 1;
2006                 }
2007                 if (err) {
2008                         mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
2009                              "that will mess up this node, refs=%d, extra=%d, "
2010                              "inuse=%d\n", dlm->name, namelen, name,
2011                              assert->node_idx, rr, extra_ref, mle->inuse);
2012                         dlm_print_one_mle(mle);
2013                 }
2014                 __dlm_unlink_mle(dlm, mle);
2015                 __dlm_mle_detach_hb_events(dlm, mle);
2016                 __dlm_put_mle(mle);
2017                 if (extra_ref) {
2018                         /* the assert master message now balances the extra
2019                          * ref given by the master / migration request message.
2020                          * if this is the last put, it will be removed
2021                          * from the list. */
2022                         __dlm_put_mle(mle);
2023                 }
2024                 spin_unlock(&dlm->master_lock);
2025                 spin_unlock(&dlm->spinlock);
2026         } else if (res) {
2027                 if (res->owner != assert->node_idx) {
2028                         mlog(0, "assert_master from %u, but current "
2029                              "owner is %u (%.*s), no mle\n", assert->node_idx,
2030                              res->owner, namelen, name);
2031                 }
2032         }
2033
2034 done:
2035         ret = 0;
2036         if (res) {
2037                 spin_lock(&res->spinlock);
2038                 res->state |= DLM_LOCK_RES_SETREF_INPROG;
2039                 spin_unlock(&res->spinlock);
2040                 *ret_data = (void *)res;
2041         }
2042         dlm_put(dlm);
2043         if (master_request) {
2044                 mlog(0, "need to tell master to reassert\n");
2045                 /* positive. negative would shoot down the node. */
2046                 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2047                 if (!have_lockres_ref) {
2048                         mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2049                              "mle present here for %s:%.*s, but no lockres!\n",
2050                              assert->node_idx, dlm->name, namelen, name);
2051                 }
2052         }
2053         if (have_lockres_ref) {
2054                 /* let the master know we have a reference to the lockres */
2055                 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2056                 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2057                      dlm->name, namelen, name, assert->node_idx);
2058         }
2059         return ret;
2060
2061 kill:
2062         /* kill the caller! */
2063         mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2064              "and killing the other node now!  This node is OK and can continue.\n");
2065         __dlm_print_one_lock_resource(res);
2066         spin_unlock(&res->spinlock);
2067         spin_unlock(&dlm->spinlock);
2068         *ret_data = (void *)res; 
2069         dlm_put(dlm);
2070         return -EINVAL;
2071 }
2072
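/*
 * runs after the assert_master response has been sent: clears the
 * SETREF_INPROG flag set at the end of dlm_assert_master_handler() and
 * drops the lockres reference taken by __dlm_lookup_lockres() there,
 * allowing any deref requests held off by the flag to proceed
 */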
2073 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2074 {
2075         struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2076
2077         if (ret_data) {
2078                 spin_lock(&res->spinlock);
2079                 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2080                 spin_unlock(&res->spinlock);
2081                 wake_up(&res->wq);
2082                 dlm_lockres_put(res);
2083         }
2084         return;
2085 }
2086
2087 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2088                                struct dlm_lock_resource *res,
2089                                int ignore_higher, u8 request_from, u32 flags)
2090 {
2091         struct dlm_work_item *item;
2092         item = kzalloc(sizeof(*item), GFP_NOFS);
2093         if (!item)
2094                 return -ENOMEM;
2095
2096
2097         /* queue up work for dlm_assert_master_worker */
2098         dlm_grab(dlm);  /* get an extra ref for the work item */
2099         dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2100         item->u.am.lockres = res; /* already have a ref */
2101         /* can optionally ignore node numbers higher than this node */
2102         item->u.am.ignore_higher = ignore_higher;
2103         item->u.am.request_from = request_from;
2104         item->u.am.flags = flags;
2105
2106         if (ignore_higher)
2107                 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
2108                      res->lockname.name);
2109
2110         spin_lock(&dlm->work_lock);
2111         list_add_tail(&item->list, &dlm->work_list);
2112         spin_unlock(&dlm->work_lock);
2113
2114         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2115         return 0;
2116 }
2117
2118 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2119 {
2120         struct dlm_ctxt *dlm = data;
2121         int ret = 0;
2122         struct dlm_lock_resource *res;
2123         unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2124         int ignore_higher;
2125         int bit;
2126         u8 request_from;
2127         u32 flags;
2128
2129         dlm = item->dlm;
2130         res = item->u.am.lockres;
2131         ignore_higher = item->u.am.ignore_higher;
2132         request_from = item->u.am.request_from;
2133         flags = item->u.am.flags;
2134
2135         spin_lock(&dlm->spinlock);
2136         memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2137         spin_unlock(&dlm->spinlock);
2138
2139         clear_bit(dlm->node_num, nodemap);
2140         if (ignore_higher) {
2141                 /* if this is just to clear up mles for nodes below
2142                  * this node, do not send the message to the original
2143                  * caller or any node number higher than this */
2144                 clear_bit(request_from, nodemap);
2145                 bit = dlm->node_num;
2146                 while (1) {
2147                         bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2148                                             bit+1);
2149                         if (bit >= O2NM_MAX_NODES)
2150                                 break;
2151                         clear_bit(bit, nodemap);
2152                 }
2153         }
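        /*
         * e.g. with node_num = 3, request_from = 1 and a domain_map of
         * {0,1,2,3,4,5}: self (3) and the caller (1) are cleared, as
         * are 4 and 5, leaving asserts to be sent only to nodes 0 and 2.
         */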
2154
2155         /*
2156          * If we're migrating this lock to someone else, we are no
2157          * longer allowed to assert our own mastery.  OTOH, we need to
2158          * prevent migration from starting while we're still asserting
2159          * our dominance.  The reserved ast delays migration.
2160          */
2161         spin_lock(&res->spinlock);
2162         if (res->state & DLM_LOCK_RES_MIGRATING) {
2163                 mlog(0, "Someone asked us to assert mastery, but we're "
2164                      "in the middle of migration.  Skipping assert, "
2165                      "the new master will handle that.\n");
2166                 spin_unlock(&res->spinlock);
2167                 goto put;
2168         } else
2169                 __dlm_lockres_reserve_ast(res);
2170         spin_unlock(&res->spinlock);
2171
2172         /* this call now finishes out the nodemap
2173          * even if one or more nodes die */
2174         mlog(0, "worker about to master %.*s here, this=%u\n",
2175              res->lockname.len, res->lockname.name, dlm->node_num);
2176         ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2177         if (ret < 0) {
2178                 /* no need to restart, we are done */
2179                 if (!dlm_is_host_down(ret))
2180                         mlog_errno(ret);
2181         }
2182
2183         /* Ok, we've asserted ourselves.  Let's let migration start. */
2184         dlm_lockres_release_ast(dlm, res);
2185
2186 put:
2187         dlm_lockres_put(res);
2188
2189         mlog(0, "finished with dlm_assert_master_worker\n");
2190 }
2191
2192 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2193  * We cannot wait for node recovery to complete to begin mastering this
2194  * lockres because this lockres is used to kick off recovery! ;-)
2195  * So, do a pre-check on all living nodes to see if any of those nodes
2196  * think that $RECOVERY is currently mastered by a dead node.  If so,
2197  * we wait a short time to allow that node to get notified by its own
2198  * heartbeat stack, then check again.  All $RECOVERY lock resources
2199  * mastered by dead nodes are purged when the heartbeat callback is
2200  * fired, so we can know for sure that it is safe to continue once
2201  * the queried node reports a live master or no master at all.  */
2202 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2203                                        struct dlm_lock_resource *res)
2204 {
2205         struct dlm_node_iter iter;
2206         int nodenum;
2207         int ret = 0;
2208         u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2209
2210         spin_lock(&dlm->spinlock);
2211         dlm_node_iter_init(dlm->domain_map, &iter);
2212         spin_unlock(&dlm->spinlock);
2213
2214         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2215                 /* do not send to self */
2216                 if (nodenum == dlm->node_num)
2217                         continue;
2218                 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2219                 if (ret < 0) {
2220                         mlog_errno(ret);
2221                         if (!dlm_is_host_down(ret))
2222                                 BUG();
2223                         /* host is down, so answer for that node would be
2224                          * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2225                         ret = 0;
2226                 }
2227
2228                 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2229                         /* check to see if this master is in the recovery map */
2230                         spin_lock(&dlm->spinlock);
2231                         if (test_bit(master, dlm->recovery_map)) {
2232                                 mlog(ML_NOTICE, "%s: node %u has not seen "
2233                                      "node %u go down yet, and thinks the "
2234                                      "dead node is mastering the recovery "
2235                                      "lock.  must wait.\n", dlm->name,
2236                                      nodenum, master);
2237                                 ret = -EAGAIN;
2238                         }
2239                         spin_unlock(&dlm->spinlock);
2240                         mlog(0, "%s: reco lock master is %u\n", dlm->name, 
2241                              master);
2242                         break;
2243                 }
2244         }
2245         return ret;
2246 }
2247
2248 /*
2249  * DLM_DEREF_LOCKRES_MSG
2250  */
2251
2252 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2253 {
2254         struct dlm_deref_lockres deref;
2255         int ret = 0, r;
2256         const char *lockname;
2257         unsigned int namelen;
2258
2259         lockname = res->lockname.name;
2260         namelen = res->lockname.len;
2261         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2262
2263         mlog(0, "%s:%.*s: sending deref to %d\n",
2264              dlm->name, namelen, lockname, res->owner);
2265         memset(&deref, 0, sizeof(deref));
2266         deref.node_idx = dlm->node_num;
2267         deref.namelen = namelen;
2268         memcpy(deref.name, lockname, namelen);
2269
2270         ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2271                                  &deref, sizeof(deref), res->owner, &r);
2272         if (ret < 0)
2273                 mlog_errno(ret);
2274         else if (r < 0) {
2275                 /* BAD.  other node says I did not have a ref. */
2276                 mlog(ML_ERROR, "while dropping ref on %s:%.*s "
2277                     "(master=%u) got %d.\n", dlm->name, namelen,
2278                     lockname, res->owner, r);
2279                 dlm_print_one_lock_resource(res);
2280                 BUG();
2281         }
2282         return ret;
2283 }
2284
2285 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2286                               void **ret_data)
2287 {
2288         struct dlm_ctxt *dlm = data;
2289         struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2290         struct dlm_lock_resource *res = NULL;
2291         char *name;
2292         unsigned int namelen;
2293         int ret = -EINVAL;
2294         u8 node;
2295         unsigned int hash;
2296         struct dlm_work_item *item;
2297         int cleared = 0;
2298         int dispatch = 0;
2299
2300         if (!dlm_grab(dlm))
2301                 return 0;
2302
2303         name = deref->name;
2304         namelen = deref->namelen;
2305         node = deref->node_idx;
2306
2307         if (namelen > DLM_LOCKID_NAME_MAX) {
2308                 mlog(ML_ERROR, "Invalid name length!\n");
2309                 goto done;
2310         }
2311         if (deref->node_idx >= O2NM_MAX_NODES) {
2312                 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2313                 goto done;
2314         }
2315
2316         hash = dlm_lockid_hash(name, namelen);
2317
2318         spin_lock(&dlm->spinlock);
2319         res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2320         if (!res) {
2321                 spin_unlock(&dlm->spinlock);
2322                 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2323                      dlm->name, namelen, name);
2324                 goto done;
2325         }
2326         spin_unlock(&dlm->spinlock);
2327
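        /*
         * if an assert_master is still busy setting refmap bits
         * (SETREF_INPROG), the refmap must not be touched yet: punt the
         * deref to the worker, which waits for the flag to clear.
         * otherwise the bit can be cleared right here.
         */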
2328         spin_lock(&res->spinlock);
2329         if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2330                 dispatch = 1;
2331         else {
2332                 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2333                 if (test_bit(node, res->refmap)) {
2334                         dlm_lockres_clear_refmap_bit(node, res);
2335                         cleared = 1;
2336                 }
2337         }
2338         spin_unlock(&res->spinlock);
2339
2340         if (!dispatch) {
2341                 if (cleared)
2342                         dlm_lockres_calc_usage(dlm, res);
2343                 else {
2344                         mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2345                         "but it is already dropped!\n", dlm->name,
2346                         res->lockname.len, res->lockname.name, node);
2347                         dlm_print_one_lock_resource(res);
2348                 }
2349                 ret = 0;
2350                 goto done;
2351         }
2352
2353         item = kzalloc(sizeof(*item), GFP_NOFS);
2354         if (!item) {
2355                 ret = -ENOMEM;
2356                 mlog_errno(ret);
2357                 goto done;
2358         }
2359
2360         dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2361         item->u.dl.deref_res = res;
2362         item->u.dl.deref_node = node;
2363
2364         spin_lock(&dlm->work_lock);
2365         list_add_tail(&item->list, &dlm->work_list);
2366         spin_unlock(&dlm->work_lock);
2367
2368         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2369         return 0;
2370
2371 done:
2372         if (res)
2373                 dlm_lockres_put(res);
2374         dlm_put(dlm);
2375
2376         return ret;
2377 }
2378
2379 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2380 {
2381         struct dlm_ctxt *dlm;
2382         struct dlm_lock_resource *res;
2383         u8 node;
2384         u8 cleared = 0;
2385
2386         dlm = item->dlm;
2387         res = item->u.dl.deref_res;
2388         node = item->u.dl.deref_node;
2389
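        /*
         * this is the deferred half of dlm_deref_lockres_handler():
         * block until the in-flight assert_master finishes setting
         * refmap bits, then drop the requesting node's bit
         */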
2390         spin_lock(&res->spinlock);
2391         BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2392         if (test_bit(node, res->refmap)) {
2393                 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2394                 dlm_lockres_clear_refmap_bit(node, res);
2395                 cleared = 1;
2396         }
2397         spin_unlock(&res->spinlock);
2398
2399         if (cleared) {
2400                 mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2401                      dlm->name, res->lockname.len, res->lockname.name, node);
2402                 dlm_lockres_calc_usage(dlm, res);
2403         } else {
2404                 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2405                      "but it is already dropped!\n", dlm->name,
2406                      res->lockname.len, res->lockname.name, node);
2407                 dlm_print_one_lock_resource(res);
2408         }
2409
2410         dlm_lockres_put(res);
2411 }
2412
2413 /* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
2414  * if not. If 0, numlocks is set to the number of locks in the lockres.
2415  */
2416 static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2417                                       struct dlm_lock_resource *res,
2418                                       int *numlocks)
2419 {
2420         int ret;
2421         int i;
2422         int count = 0;
2423         struct list_head *queue;
2424         struct dlm_lock *lock;
2425
2426         assert_spin_locked(&res->spinlock);
2427
2428         ret = -EINVAL;
2429         if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
2430                 mlog(0, "cannot migrate lockres with unknown owner!\n");
2431                 goto leave;
2432         }
2433
2434         if (res->owner != dlm->node_num) {
2435                 mlog(0, "cannot migrate lockres this node doesn't own!\n");
2436                 goto leave;
2437         }
2438
2439         ret = 0;
2440         queue = &res->granted;
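        /*
         * walk granted, converting and blocked in turn; the queue++
         * below relies on these three list_heads being declared
         * consecutively in struct dlm_lock_resource
         */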
2441         for (i = 0; i < 3; i++) {
2442                 list_for_each_entry(lock, queue, list) {
2443                         ++count;
2444                         if (lock->ml.node == dlm->node_num) {
2445                                 mlog(0, "found a lock owned by this node still "
2446                                      "on the %s queue!  will not migrate this "
2447                                      "lockres\n", (i == 0 ? "granted" :
2448                                                    (i == 1 ? "converting" :
2449                                                     "blocked")));
2450                                 ret = -ENOTEMPTY;
2451                                 goto leave;
2452                         }
2453                 }
2454                 queue++;
2455         }
2456
2457         *numlocks = count;
2458         mlog(0, "migrateable lockres having %d locks\n", *numlocks);
2459
2460 leave:
2461         return ret;
2462 }
2463
2464 /*
2465  * DLM_MIGRATE_LOCKRES
2466  */
2467
2468
2469 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2470                                struct dlm_lock_resource *res,
2471                                u8 target)
2472 {
2473         struct dlm_master_list_entry *mle = NULL;
2474         struct dlm_master_list_entry *oldmle = NULL;
2475         struct dlm_migratable_lockres *mres = NULL;
2476         int ret = 0;
2477         const char *name;
2478         unsigned int namelen;
2479         int mle_added = 0;
2480         int numlocks;
2481         int wake = 0;
2482
2483         if (!dlm_grab(dlm))
2484                 return -EINVAL;
2485
2486         name = res->lockname.name;
2487         namelen = res->lockname.len;
2488
2489         mlog(0, "migrating %.*s to %u\n", namelen, name, target);
2490
2491         /*
2492          * ensure this lockres is a proper candidate for migration
2493          */
2494         spin_lock(&res->spinlock);
2495         ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2496         if (ret < 0) {
2497                 spin_unlock(&res->spinlock);
2498                 goto leave;
2499         }
2500         spin_unlock(&res->spinlock);
2501
2502         /* no work to do */
2503         if (numlocks == 0) {
2504                 mlog(0, "no locks were found on this lockres! done!\n");
2505                 goto leave;
2506         }
2507
2508         /*
2509          * preallocate up front
2510          * if this fails, abort
2511          */
2512
2513         ret = -ENOMEM;
2514         mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2515         if (!mres) {
2516                 mlog_errno(ret);
2517                 goto leave;
2518         }
2519
2520         mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2521                                                                 GFP_NOFS);
2522         if (!mle) {
2523                 mlog_errno(ret);
2524                 goto leave;
2525         }
2526         ret = 0;
2527
2528         /*
2529          * find a node to migrate the lockres to
2530          */
2531
2532         mlog(0, "picking a migration node\n");
2533         spin_lock(&dlm->spinlock);
2534         /* pick a new node */
2535         if (!test_bit(target, dlm->domain_map) ||
2536             target >= O2NM_MAX_NODES) {
2537                 target = dlm_pick_migration_target(dlm, res);
2538         }
2539         mlog(0, "node %u chosen for migration\n", target);
2540
2541         if (target >= O2NM_MAX_NODES ||
2542             !test_bit(target, dlm->domain_map)) {
2543                 /* target chosen is not alive */
2544                 ret = -EINVAL;
2545         }
2546
2547         if (ret) {
2548                 spin_unlock(&dlm->spinlock);
2549                 goto fail;
2550         }
2551
2552         mlog(0, "continuing with target = %u\n", target);
2553
2554         /*
2555          * clear any existing master requests and
2556          * add the migration mle to the list
2557          */
2558         spin_lock(&dlm->master_lock);
2559         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2560                                     namelen, target, dlm->node_num);
2561         spin_unlock(&dlm->master_lock);
2562         spin_unlock(&dlm->spinlock);
2563
2564         if (ret == -EEXIST) {
2565                 mlog(0, "another process is already migrating it\n");
2566                 goto fail;
2567         }
2568         mle_added = 1;
2569
2570         /*
2571          * set the MIGRATING flag and flush asts
2572          * if we fail after this we need to re-dirty the lockres
2573          */
2574         if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2575                 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2576                      "the target went down.\n", res->lockname.len,
2577                      res->lockname.name, target);
2578                 spin_lock(&res->spinlock);
2579                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2580                 wake = 1;
2581                 spin_unlock(&res->spinlock);
2582                 ret = -EINVAL;
2583         }
2584
2585 fail:
2586         if (oldmle) {
2587                 /* master is known, detach if not already detached */
2588                 dlm_mle_detach_hb_events(dlm, oldmle);
2589                 dlm_put_mle(oldmle);
2590         }
2591
2592         if (ret < 0) {
2593                 if (mle_added) {
2594                         dlm_mle_detach_hb_events(dlm, mle);
2595                         dlm_put_mle(mle);
2596                 } else if (mle) {
2597                         kmem_cache_free(dlm_mle_cache, mle);
2598                 }
2599                 goto leave;
2600         }
2601
2602         /*
2603          * at this point, we have a migration target, an mle
2604          * in the master list, and the MIGRATING flag set on
2605          * the lockres
2606          */
2607
2608         /* now that remote nodes are spinning on the MIGRATING flag,
2609          * ensure that all assert_master work is flushed. */
2610         flush_workqueue(dlm->dlm_worker);
2611
2612         /* get an extra reference on the mle.
2613          * otherwise the assert_master from the new
2614          * master will destroy this.
2615          * also, make sure that all callers of dlm_get_mle
2616          * take both dlm->spinlock and dlm->master_lock */
2617         spin_lock(&dlm->spinlock);
2618         spin_lock(&dlm->master_lock);
2619         dlm_get_mle_inuse(mle);
2620         spin_unlock(&dlm->master_lock);
2621         spin_unlock(&dlm->spinlock);
2622
2623         /* notify new node and send all lock state */
2624         /* call send_one_lockres with migration flag.
2625          * this serves as notice to the target node that a
2626          * migration is starting. */
2627         ret = dlm_send_one_lockres(dlm, res, mres, target,
2628                                    DLM_MRES_MIGRATION);
2629
2630         if (ret < 0) {
2631                 mlog(0, "migration to node %u failed with %d\n",
2632                      target, ret);
2633                 /* migration failed, detach and clean up mle */
2634                 dlm_mle_detach_hb_events(dlm, mle);
2635                 dlm_put_mle(mle);
2636                 dlm_put_mle_inuse(mle);
2637                 spin_lock(&res->spinlock);
2638                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2639                 wake = 1;
2640                 spin_unlock(&res->spinlock);
2641                 goto leave;
2642         }
2643
2644         /* at this point, the target sends a message to all nodes,
2645          * (using dlm_do_migrate_request).  this node is skipped since
2646          * we had to put an mle in the list to begin the process.  this
2647          * node now waits for target to do an assert master.  this node
2648          * will be the last one notified, ensuring that the migration
2649          * is complete everywhere.  if the target dies while this is
2650          * going on, some nodes could potentially see the target as the
2651          * master, so it is important that my recovery finds the migration
2652  * mle and sets the master to UNKNOWN. */
2653
2654
2655         /* wait for new node to assert master */
2656         while (1) {
2657                 ret = wait_event_interruptible_timeout(mle->wq,
2658                                         (atomic_read(&mle->woken) == 1),
2659                                         msecs_to_jiffies(5000));
2660
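                /* wait_event_interruptible_timeout() returns -ERESTARTSYS if
                 * interrupted, 0 on timeout, and the remaining jiffies if the
                 * mle was woken, so recheck the condition whenever ret >= 0 */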
2661                 if (ret >= 0) {
2662                         if (atomic_read(&mle->woken) == 1 ||
2663                             res->owner == target)
2664                                 break;
2665
2666                         mlog(0, "%s:%.*s: timed out during migration\n",
2667                              dlm->name, res->lockname.len, res->lockname.name);
2668                         /* avoid hang during shutdown when migrating lockres
2669                          * to a node which also goes down */
2670                         if (dlm_is_node_dead(dlm, target)) {
2671                                 mlog(0, "%s:%.*s: expected migration "
2672                                      "target %u is no longer up, restarting\n",
2673                                      dlm->name, res->lockname.len,
2674                                      res->lockname.name, target);
2675                                 ret = -EINVAL;
2676                                 /* migration failed, detach and clean up mle */
2677                                 dlm_mle_detach_hb_events(dlm, mle);
2678                                 dlm_put_mle(mle);
2679                                 dlm_put_mle_inuse(mle);
2680                                 spin_lock(&res->spinlock);
2681                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2682                                 wake = 1;
2683                                 spin_unlock(&res->spinlock);
2684                                 goto leave;
2685                         }
2686                 } else
2687                         mlog(0, "%s:%.*s: caught signal during migration\n",
2688                              dlm->name, res->lockname.len, res->lockname.name);
2689         }
2690
2691         /* all done, set the owner, clear the flag */
2692         spin_lock(&res->spinlock);
2693         dlm_set_lockres_owner(dlm, res, target);
2694         res->state &= ~DLM_LOCK_RES_MIGRATING;
2695         dlm_remove_nonlocal_locks(dlm, res);
2696         spin_unlock(&res->spinlock);
2697         wake_up(&res->wq);
2698
2699         /* master is known, detach if not already detached */
2700         dlm_mle_detach_hb_events(dlm, mle);
2701         dlm_put_mle_inuse(mle);
2702         ret = 0;
2703
2704         dlm_lockres_calc_usage(dlm, res);
2705
2706 leave:
2707         /* re-dirty the lockres if we failed */
2708         if (ret < 0)
2709                 dlm_kick_thread(dlm, res);
2710
2711         /* wake up waiters if the MIGRATING flag got set
2712          * but migration failed */
2713         if (wake)
2714                 wake_up(&res->wq);
2715
2716         /* TODO: cleanup */
2717         if (mres)
2718                 free_page((unsigned long)mres);
2719
2720         dlm_put(dlm);
2721
2722         mlog(0, "returning %d\n", ret);
2723         return ret;
2724 }
2725
2726 #define DLM_MIGRATION_RETRY_MS  100
2727
2728 /* Should be called only after beginning the domain leave process.
2729  * There should not be any remaining locks on nonlocal lock resources,
2730  * and there should be no local locks left on locally mastered resources.
2731  *
2732  * Called with the dlm spinlock held, may drop it to do migration, but
2733  * will re-acquire before exit.
2734  *
2735  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
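/*
 * Callers walking the resource hash under dlm->spinlock must restart their
 * walk whenever this returns 1.  A rough caller sketch (for illustration
 * only, not copied from a real call site):
 *
 *	spin_lock(&dlm->spinlock);
 *	...
 *	dropped = dlm_empty_lockres(dlm, res);
 *	if (dropped)
 *		goto redo_bucket;	(dlm->spinlock was dropped and retaken)
 */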
2736 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2737 {
2738         int ret;
2739         int lock_dropped = 0;
2740         int numlocks;
2741
2742         spin_lock(&res->spinlock);
2743         if (res->owner != dlm->node_num) {
2744                 if (!__dlm_lockres_unused(res)) {
2745                         mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2746                              "trying to free this lockres but locks remain\n",
2747                              dlm->name, res->lockname.len, res->lockname.name);
2748                 }
2749                 spin_unlock(&res->spinlock);
2750                 goto leave;
2751         }
2752
2753         /* No need to migrate a lockres having no locks */
2754         ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2755         if (ret >= 0 && numlocks == 0) {
2756                 spin_unlock(&res->spinlock);
2757                 goto leave;
2758         }
2759         spin_unlock(&res->spinlock);
2760
2761         /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2762         spin_unlock(&dlm->spinlock);
2763         lock_dropped = 1;
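        /* a target of O2NM_MAX_NODES asks dlm_migrate_lockres to pick a
         * live node itself (see dlm_pick_migration_target()) */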
2764         while (1) {
2765                 ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2766                 if (ret >= 0)
2767                         break;
2768                 if (ret == -ENOTEMPTY) {
2769                         mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2770                                 res->lockname.len, res->lockname.name);
2771                         BUG();
2772                 }
2773
2774                 mlog(0, "lockres %.*s: migrate failed, "
2775                      "retrying\n", res->lockname.len,
2776                      res->lockname.name);
2777                 msleep(DLM_MIGRATION_RETRY_MS);
2778         }
2779         spin_lock(&dlm->spinlock);
2780 leave:
2781         return lock_dropped;
2782 }
2783
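/* returns nonzero once no basts remain queued or in flight for this lock;
 * taking both the ast_lock and the lock's spinlock makes the check stable */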
2784 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2785 {
2786         int ret;
2787         spin_lock(&dlm->ast_lock);
2788         spin_lock(&lock->spinlock);
2789         ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2790         spin_unlock(&lock->spinlock);
2791         spin_unlock(&dlm->ast_lock);
2792         return ret;
2793 }
2794
2795 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2796                                      struct dlm_lock_resource *res,
2797                                      u8 mig_target)
2798 {
2799         int can_proceed;
2800         spin_lock(&res->spinlock);
2801         can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2802         spin_unlock(&res->spinlock);
2803
2804         /* target has died, so make the caller break out of the
2805          * wait_event, but caller must recheck the domain_map */
2806         spin_lock(&dlm->spinlock);
2807         if (!test_bit(mig_target, dlm->domain_map))
2808                 can_proceed = 1;
2809         spin_unlock(&dlm->spinlock);
2810         return can_proceed;
2811 }
2812
2813 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2814                                 struct dlm_lock_resource *res)
2815 {
2816         int ret;
2817         spin_lock(&res->spinlock);
2818         ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2819         spin_unlock(&res->spinlock);
2820         return ret;
2821 }
2822
2823
2824 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2825                                        struct dlm_lock_resource *res,
2826                                        u8 target)
2827 {
2828         int ret = 0;
2829
2830         mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2831                res->lockname.len, res->lockname.name, dlm->node_num,
2832                target);
2833         /* need to set MIGRATING flag on lockres.  this is done by
2834          * ensuring that all asts have been flushed for this lockres. */
2835         spin_lock(&res->spinlock);
2836         BUG_ON(res->migration_pending);
2837         res->migration_pending = 1;
2838         /* strategy is to reserve an extra ast then release
2839          * it below, letting the release do all of the work */
2840         __dlm_lockres_reserve_ast(res);
2841         spin_unlock(&res->spinlock);
2842
2843         /* now flush all the pending asts */
2844         dlm_kick_thread(dlm, res);
2845         /* before waiting on DIRTY, block processes which may
2846          * try to dirty the lockres before MIGRATING is set */
2847         spin_lock(&res->spinlock);
2848         BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2849         res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2850         spin_unlock(&res->spinlock);
2851         /* now wait on any pending asts and the DIRTY state */
2852         wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2853         dlm_lockres_release_ast(dlm, res);
2854
2855         mlog(0, "about to wait on migration_wq, dirty=%s\n",
2856                res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2857         /* if the extra ref we just put was the final one, this
2858          * will pass thru immediately.  otherwise, we need to wait
2859          * for the last ast to finish. */
2860 again:
2861         ret = wait_event_interruptible_timeout(dlm->migration_wq,
2862                    dlm_migration_can_proceed(dlm, res, target),
2863                    msecs_to_jiffies(1000));
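        /* a signal (ret < 0) or a timeout only triggers the logging below;
         * either way we recheck and keep waiting until the MIGRATING flag
         * is set or the target is seen to be down */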
2864         if (ret < 0) {
2865                 mlog(0, "woken again: migrating? %s, dead? %s\n",
2866                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2867                        test_bit(target, dlm->domain_map) ? "no":"yes");
2868         } else {
2869                 mlog(0, "all is well: migrating? %s, dead? %s\n",
2870                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2871                        test_bit(target, dlm->domain_map) ? "no":"yes");
2872         }
2873         if (!dlm_migration_can_proceed(dlm, res, target)) {
2874                 mlog(0, "trying again...\n");
2875                 goto again;
2876         }
2877         /* now that we are sure the MIGRATING state is there, drop
2878          * the unneeded state which blocked threads trying to DIRTY */
2879         spin_lock(&res->spinlock);
2880         BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2881         BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2882         res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2883         spin_unlock(&res->spinlock);
2884
2885         /* did the target go down or die? */
2886         spin_lock(&dlm->spinlock);
2887         if (!test_bit(target, dlm->domain_map)) {
2888                 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2889                      target);
2890                 ret = -EHOSTDOWN;
2891         }
2892         spin_unlock(&dlm->spinlock);
2893
2894         /*
2895          * at this point:
2896          *
2897          *   o the DLM_LOCK_RES_MIGRATING flag is set
2898          *   o there are no pending asts on this lockres
2899          *   o all processes trying to reserve an ast on this
2900          *     lockres must wait for the MIGRATING flag to clear
2901          */
2902         return ret;
2903 }
2904
2905 /* last step in the migration process.
2906  * original master calls this to free all of the dlm_lock
2907  * structures that used to be for other nodes. */
2908 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2909                                       struct dlm_lock_resource *res)
2910 {
2911         struct list_head *queue = &res->granted;
2912         int i, bit;
2913         struct dlm_lock *lock, *next;
2914
2915         assert_spin_locked(&res->spinlock);
2916
2917         BUG_ON(res->owner == dlm->node_num);
2918
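        /* walk the granted, converting, and blocked queues in turn; the
         * three list heads are consecutive in the lockres, so queue++
         * moves from one queue to the next */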
2919         for (i=0; i<3; i++) {
2920                 list_for_each_entry_safe(lock, next, queue, list) {
2921                         if (lock->ml.node != dlm->node_num) {
2922                                 mlog(0, "putting lock for node %u\n",
2923                                      lock->ml.node);
2924                                 /* be extra careful */
2925                                 BUG_ON(!list_empty(&lock->ast_list));
2926                                 BUG_ON(!list_empty(&lock->bast_list));
2927                                 BUG_ON(lock->ast_pending);
2928                                 BUG_ON(lock->bast_pending);
2929                                 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2930                                 list_del_init(&lock->list);
2931                                 dlm_lock_put(lock);
2932                                 /* In a normal unlock, we would have added a
2933                                  * DLM_UNLOCK_FREE_LOCK action. Force it. */
2934                                 dlm_lock_put(lock);
2935                         }
2936                 }
2937                 queue++;
2938         }
2939         bit = 0;
2940         while (1) {
2941                 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2942                 if (bit >= O2NM_MAX_NODES)
2943                         break;
2944                 /* do not clear the local node reference, if there is a
2945                  * process holding this, let it drop the ref itself */
2946                 if (bit != dlm->node_num) {
2947                         mlog(0, "%s:%.*s: node %u had a ref to this "
2948                              "migrating lockres, clearing\n", dlm->name,
2949                              res->lockname.len, res->lockname.name, bit);
2950                         dlm_lockres_clear_refmap_bit(bit, res);
2951                 }
2952                 bit++;
2953         }
2954 }
2955
2956 /* for now this is not too intelligent.  we will
2957  * need stats to make this do the right thing.
2958  * this just finds the first lock on one of the
2959  * queues and uses that node as the target. */
2960 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2961                                     struct dlm_lock_resource *res)
2962 {
2963         int i;
2964         struct list_head *queue = &res->granted;
2965         struct dlm_lock *lock;
2966         int nodenum;
2967
2968         assert_spin_locked(&dlm->spinlock);
2969
2970         spin_lock(&res->spinlock);
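        /* scan the granted, then converting, then blocked queues for the
         * first lock held by another node */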
2971         for (i=0; i<3; i++) {
2972                 list_for_each_entry(lock, queue, list) {
2973                         /* up to the caller to make sure this node
2974                          * is alive */
2975                         if (lock->ml.node != dlm->node_num) {
2976                                 spin_unlock(&res->spinlock);
2977                                 return lock->ml.node;
2978                         }
2979                 }
2980                 queue++;
2981         }
2982         spin_unlock(&res->spinlock);
2983         mlog(0, "have not found a suitable target yet! checking domain map\n");
2984
2985         /* ok now we're getting desperate.  pick anyone alive. */
2986         nodenum = -1;
2987         while (1) {
2988                 nodenum = find_next_bit(dlm->domain_map,
2989                                         O2NM_MAX_NODES, nodenum+1);
2990                 mlog(0, "found %d in domain map\n", nodenum);
2991                 if (nodenum >= O2NM_MAX_NODES)
2992                         break;
2993                 if (nodenum != dlm->node_num) {
2994                         mlog(0, "picking %d\n", nodenum);
2995                         return nodenum;
2996                 }
2997         }
2998
2999         mlog(0, "giving up.  no master to migrate to\n");
3000         return DLM_LOCK_RES_OWNER_UNKNOWN;
3001 }
3002
3003
3004
3005 /* this is called by the new master once all lockres
3006  * data has been received */
3007 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
3008                                   struct dlm_lock_resource *res,
3009                                   u8 master, u8 new_master,
3010                                   struct dlm_node_iter *iter)
3011 {
3012         struct dlm_migrate_request migrate;
3013         int ret, skip, status = 0;
3014         int nodenum;
3015
3016         memset(&migrate, 0, sizeof(migrate));
3017         migrate.namelen = res->lockname.len;
3018         memcpy(migrate.name, res->lockname.name, migrate.namelen);
3019         migrate.new_master = new_master;
3020         migrate.master = master;
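        /* the request names the lockres and carries both endpoints of
         * the migration */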
3021
3022         ret = 0;
3023
3024         /* send message to all nodes, except the master and myself */
3025         while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
3026                 if (nodenum == master ||
3027                     nodenum == new_master)
3028                         continue;
3029
3030                 /* We could race the node's domain exit.  If so, skip it. */
3031                 spin_lock(&dlm->spinlock);
3032                 skip = (!test_bit(nodenum, dlm->domain_map));
3033                 spin_unlock(&dlm->spinlock);
3034                 if (skip) {
3035                         clear_bit(nodenum, iter->node_map);
3036                         continue;
3037                 }
3038
3039                 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3040                                          &migrate, sizeof(migrate), nodenum,
3041                                          &status);
3042                 if (ret < 0) {
3043                         mlog(0, "migrate_request returned %d!\n", ret);
3044                         if (!dlm_is_host_down(ret)) {
3045                                 mlog(ML_ERROR, "unhandled error=%d!\n", ret);
3046                                 BUG();
3047                         }
3048                         clear_bit(nodenum, iter->node_map);
3049                         ret = 0;
3050                 } else if (status < 0) {
3051                         mlog(0, "migrate request (node %u) returned %d!\n",
3052                              nodenum, status);
3053                         ret = status;
3054                 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3055                         /* during the migration request we short-circuited
3056                          * the mastery of the lockres.  make sure we have
3057                          * a mastery ref for nodenum */
3058                         mlog(0, "%s:%.*s: need ref for node %u\n",
3059                              dlm->name, res->lockname.len, res->lockname.name,
3060                              nodenum);
3061                         spin_lock(&res->spinlock);
3062                         dlm_lockres_set_refmap_bit(nodenum, res);
3063                         spin_unlock(&res->spinlock);
3064                 }
3065         }
3066
3067         if (ret < 0)
3068                 mlog_errno(ret);
3069
3070         mlog(0, "returning ret=%d\n", ret);
3071         return ret;
3072 }
3073
3074
3075 /* if there is an existing mle for this lockres, we now know who the master is
3076  * (the one who sent us *this* message), so we can clear it up right away.
3077  * since the process that put the mle on the list still has a reference to it,
3078  * we can unhash it now, set the master and wake the process.  as a result,
3079  * we will have no mle in the list to start with.  now we can add an mle for
3080  * the migration and this should be the only one found for those scanning the
3081  * list.  */
3082 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3083                                 void **ret_data)
3084 {
3085         struct dlm_ctxt *dlm = data;
3086         struct dlm_lock_resource *res = NULL;
3087         struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3088         struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3089         const char *name;
3090         unsigned int namelen, hash;
3091         int ret = 0;
3092
3093         if (!dlm_grab(dlm))
3094                 return -EINVAL;
3095
3096         name = migrate->name;
3097         namelen = migrate->namelen;
3098         hash = dlm_lockid_hash(name, namelen);
3099
3100         /* preallocate.. if this fails, abort */
3101         mle = kmem_cache_alloc(dlm_mle_cache,
3102                                GFP_NOFS);
3103
3104         if (!mle) {
3105                 ret = -ENOMEM;
3106                 goto leave;
3107         }
3108
3109         /* check for pre-existing lock */
3110         spin_lock(&dlm->spinlock);
3111         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3112         spin_lock(&dlm->master_lock);
3113
3114         if (res) {
3115                 spin_lock(&res->spinlock);
3116                 if (res->state & DLM_LOCK_RES_RECOVERING) {
3117                         /* if all is working ok, this can only mean that we got
3118                          * a migrate request from a node that we now see as
3119                          * dead.  what can we do here?  drop it to the floor? */
3120                         spin_unlock(&res->spinlock);
3121                         mlog(ML_ERROR, "Got a migrate request, but the "
3122                              "lockres is marked as recovering!");
3123                         kmem_cache_free(dlm_mle_cache, mle);
3124                         ret = -EINVAL; /* need a better solution */
3125                         goto unlock;
3126                 }
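                /* stall any new lock requests on this lockres until the
                 * new master has asserted itself */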
3127                 res->state |= DLM_LOCK_RES_MIGRATING;
3128                 spin_unlock(&res->spinlock);
3129         }
3130
3131         /* ignore status.  only nonzero status would BUG. */
3132         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3133                                     name, namelen,
3134                                     migrate->new_master,
3135                                     migrate->master);
3136
3137 unlock:
3138         spin_unlock(&dlm->master_lock);
3139         spin_unlock(&dlm->spinlock);
3140
3141         if (oldmle) {
3142                 /* master is known, detach if not already detached */
3143                 dlm_mle_detach_hb_events(dlm, oldmle);
3144                 dlm_put_mle(oldmle);
3145         }
3146
3147         if (res)
3148                 dlm_lockres_put(res);
3149 leave:
3150         dlm_put(dlm);
3151         return ret;
3152 }
3153
3154 /* must be holding dlm->spinlock and dlm->master_lock
3155  * when adding a migration mle, we can clear any other mles
3156  * in the master list because we know with certainty that
3157  * the master is "master".  so we remove any old mle from
3158  * the list after setting its master field, and then add
3159  * the new migration mle.  this way we can hold to the rule
3160  * of having only one mle for a given lock name at all times. */
3161 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3162                                  struct dlm_lock_resource *res,
3163                                  struct dlm_master_list_entry *mle,
3164                                  struct dlm_master_list_entry **oldmle,
3165                                  const char *name, unsigned int namelen,
3166                                  u8 new_master, u8 master)
3167 {
3168         int found;
3169         int ret = 0;
3170
3171         *oldmle = NULL;
3172
3173         mlog_entry_void();
3174
3175         assert_spin_locked(&dlm->spinlock);
3176         assert_spin_locked(&dlm->master_lock);
3177
3178         /* caller is responsible for any ref taken here on oldmle */
3179         found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3180         if (found) {
3181                 struct dlm_master_list_entry *tmp = *oldmle;
3182                 spin_lock(&tmp->spinlock);
3183                 if (tmp->type == DLM_MLE_MIGRATION) {
3184                         if (master == dlm->node_num) {
3185                                 /* ah another process raced me to it */
3186                                 mlog(0, "tried to migrate %.*s, but some "
3187                                      "process beat me to it\n",
3188                                      namelen, name);
3189                                 ret = -EEXIST;
3190                         } else {
3191                                 /* bad.  2 NODES are trying to migrate! */
3192                                 mlog(ML_ERROR, "migration error mle: "
3193                                      "master=%u new_master=%u // request: "
3194                                      "master=%u new_master=%u // "
3195                                      "lockres=%.*s\n",
3196                                      tmp->master, tmp->new_master,
3197                                      master, new_master,
3198                                      namelen, name);
3199                                 BUG();
3200                         }
3201                 } else {
3202                         /* this is essentially what assert_master does */
3203                         tmp->master = master;
3204                         atomic_set(&tmp->woken, 1);
3205                         wake_up(&tmp->wq);
3206                         /* remove it so that only one mle will be found */
3207                         __dlm_unlink_mle(dlm, tmp);
3208                         __dlm_mle_detach_hb_events(dlm, tmp);
3209                         ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3210                         mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3211                             "telling master to get ref for cleared out mle "
3212                             "during migration\n", dlm->name, namelen, name,
3213                             master, new_master);
3214                 }
3215                 spin_unlock(&tmp->spinlock);
3216         }
3217
3218         /* now add a migration mle to the tail of the list */
3219         dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3220         mle->new_master = new_master;
3221         /* the new master will be sending an assert master for this.
3222          * at that point we will get the refmap reference */
3223         mle->master = master;
3224         /* do this for consistency with other mle types */
3225         set_bit(new_master, mle->maybe_map);
3226         __dlm_insert_mle(dlm, mle);
3227
3228         return ret;
3229 }
3230
3231 /*
3232  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3233  */
3234 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3235                                         struct dlm_master_list_entry *mle)
3236 {
3237         struct dlm_lock_resource *res;
3238         unsigned int hash;
3239
3240         /* Find the lockres associated to the mle and set its owner to UNK */
3241         hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len);
3242         res = __dlm_lookup_lockres(dlm, mle->u.mlename.name, mle->u.mlename.len,
3243                                    hash);
3244         if (res) {
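                /* dropping master_lock before taking res->spinlock keeps
                 * the lock ordering sane; this is why the caller must
                 * restart its scan (see dlm_clean_master_list) */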
3245                 spin_unlock(&dlm->master_lock);
3246
3247                 /* move lockres onto recovery list */
3248                 spin_lock(&res->spinlock);
3249                 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3250                 dlm_move_lockres_to_recovery_list(dlm, res);
3251                 spin_unlock(&res->spinlock);
3252                 dlm_lockres_put(res);
3253
3254                 /* about to get rid of mle, detach from heartbeat */
3255                 __dlm_mle_detach_hb_events(dlm, mle);
3256
3257                 /* dump the mle */
3258                 spin_lock(&dlm->master_lock);
3259                 __dlm_put_mle(mle);
3260                 spin_unlock(&dlm->master_lock);
3261         }
3262
3263         return res;
3264 }
3265
3266 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3267                                     struct dlm_master_list_entry *mle)
3268 {
3269         __dlm_mle_detach_hb_events(dlm, mle);
3270
3271         spin_lock(&mle->spinlock);
3272         __dlm_unlink_mle(dlm, mle);
3273         atomic_set(&mle->woken, 1);
3274         spin_unlock(&mle->spinlock);
3275
3276         wake_up(&mle->wq);
3277 }
3278
3279 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3280                                 struct dlm_master_list_entry *mle, u8 dead_node)
3281 {
3282         int bit;
3283
3284         BUG_ON(mle->type != DLM_MLE_BLOCK);
3285
3286         spin_lock(&mle->spinlock);
3287         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3288         if (bit != dead_node) {
3289                 mlog(0, "mle found, but dead node %u would not have been "
3290                      "master\n", dead_node);
3291                 spin_unlock(&mle->spinlock);
3292         } else {
3293                 /* Must drop the refcount by one since the assert_master will
3294                  * never arrive. This may result in the mle being unlinked and
3295                  * freed, but there may still be a process waiting in the
3296                  * dlmlock path which is fine. */
3297                 mlog(0, "node %u was expected master\n", dead_node);
3298                 atomic_set(&mle->woken, 1);
3299                 spin_unlock(&mle->spinlock);
3300                 wake_up(&mle->wq);
3301
3302                 /* Do not need events any longer, so detach from heartbeat */
3303                 __dlm_mle_detach_hb_events(dlm, mle);
3304                 __dlm_put_mle(mle);
3305         }
3306 }
3307
3308 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3309 {
3310         struct dlm_master_list_entry *mle;
3311         struct dlm_lock_resource *res;
3312         struct hlist_head *bucket;
3313         struct hlist_node *list;
3314         unsigned int i;
3315
3316         mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
3317 top:
3318         assert_spin_locked(&dlm->spinlock);
3319
3320         /* clean the master list */
3321         spin_lock(&dlm->master_lock);
3322         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3323                 bucket = dlm_master_hash(dlm, i);
3324                 hlist_for_each(list, bucket) {
3325                         mle = hlist_entry(list, struct dlm_master_list_entry,
3326                                           master_hash_node);
3327
3328                         BUG_ON(mle->type != DLM_MLE_BLOCK &&
3329                                mle->type != DLM_MLE_MASTER &&
3330                                mle->type != DLM_MLE_MIGRATION);
3331
3332                         /* MASTER mles are initiated locally. The waiting
3333                          * process will notice the node map change shortly.
3334                          * Let that happen as normal. */
3335                         if (mle->type == DLM_MLE_MASTER)
3336                                 continue;
3337
3338                         /* BLOCK mles are initiated by other nodes. Need to
3339                          * clean up if the dead node would have been the
3340                          * master. */
3341                         if (mle->type == DLM_MLE_BLOCK) {
3342                                 dlm_clean_block_mle(dlm, mle, dead_node);
3343                                 continue;
3344                         }
3345
3346                         /* Everything else is a MIGRATION mle */
3347
3348                         /* The rule for MIGRATION mles is that the master
3349                          * becomes UNKNOWN if *either* the original or the new
3350                          * master dies. Every lockres with an UNKNOWN master is
3351                          * sent to whichever node becomes the recovery master. The
3352                          * new master is responsible for determining if there is
3353                          * still a master for this lockres, or if it needs to
3354                          * take over mastery. Either way, this node should
3355                          * expect another message to resolve this. */
3356
3357                         if (mle->master != dead_node &&
3358                             mle->new_master != dead_node)
3359                                 continue;
3360
3361                         /* If we have reached this point, this mle needs to be
3362                          * removed from the list and freed. */
3363                         dlm_clean_migration_mle(dlm, mle);
3364
3365                         mlog(0, "%s: node %u died during migration from "
3366                              "%u to %u!\n", dlm->name, dead_node, mle->master,
3367                              mle->new_master);
3368
3369                         /* If we find a lockres associated with the mle, we've
3370                          * hit this rare case that messes up our lock ordering.
3371                          * If so, we need to drop the master lock so that we can
3372                          * take the lockres lock, meaning that we will have to
3373                          * restart from the head of list. */
3374                         res = dlm_reset_mleres_owner(dlm, mle);
3375                         if (res)
3376                                 /* restart */
3377                                 goto top;
3378
3379                         /* This may be the last reference */
3380                         __dlm_put_mle(mle);
3381                 }
3382         }
3383         spin_unlock(&dlm->master_lock);
3384 }
3385
3386 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3387                          u8 old_master)
3388 {
3389         struct dlm_node_iter iter;
3390         int ret = 0;
3391
3392         spin_lock(&dlm->spinlock);
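        /* iterate over all live nodes except the old master and
         * ourselves; both ends already know about the migration */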
3393         dlm_node_iter_init(dlm->domain_map, &iter);
3394         clear_bit(old_master, iter.node_map);
3395         clear_bit(dlm->node_num, iter.node_map);
3396         spin_unlock(&dlm->spinlock);
3397
3398         /* ownership of the lockres is changing.  account for the
3399          * mastery reference here since old_master will briefly have
3400          * a reference after the migration completes */
3401         spin_lock(&res->spinlock);
3402         dlm_lockres_set_refmap_bit(old_master, res);
3403         spin_unlock(&res->spinlock);
3404
3405         mlog(0, "now time to do a migrate request to other nodes\n");
3406         ret = dlm_do_migrate_request(dlm, res, old_master,
3407                                      dlm->node_num, &iter);
3408         if (ret < 0) {
3409                 mlog_errno(ret);
3410                 goto leave;
3411         }
3412
3413         mlog(0, "doing assert master of %.*s to all except the original node\n",
3414              res->lockname.len, res->lockname.name);
3415         /* this call now finishes out the nodemap
3416          * even if one or more nodes die */
3417         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3418                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3419         if (ret < 0) {
3420                 /* no longer need to retry.  all living nodes contacted. */
3421                 mlog_errno(ret);
3422                 ret = 0;
3423         }
3424
3425         memset(iter.node_map, 0, sizeof(iter.node_map));
3426         set_bit(old_master, iter.node_map);
3427         mlog(0, "doing assert master of %.*s back to %u\n",
3428              res->lockname.len, res->lockname.name, old_master);
3429         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3430                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3431         if (ret < 0) {
3432                 mlog(0, "assert master to original master failed "
3433                      "with %d.\n", ret);
3434                 /* the only nonzero status here would be because of
3435                  * a dead original node.  we're done. */
3436                 ret = 0;
3437         }
3438
3439         /* all done, set the owner, clear the flag */
3440         spin_lock(&res->spinlock);
3441         dlm_set_lockres_owner(dlm, res, dlm->node_num);
3442         res->state &= ~DLM_LOCK_RES_MIGRATING;
3443         spin_unlock(&res->spinlock);
3444         /* re-dirty it on the new master */
3445         dlm_kick_thread(dlm, res);
3446         wake_up(&res->wq);
3447 leave:
3448         return ret;
3449 }
3450
3451 /*
3452  * LOCKRES AST REFCOUNT
3453  * this is integral to migration
3454  */
3455
3456 /* for future intent to call an ast, reserve one ahead of time.
3457  * this should be called only after waiting on the lockres
3458  * with dlm_wait_on_lockres, and while still holding the
3459  * spinlock after the call. */
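/*
 * The reserve/release pairing mirrors what dlm_mark_lockres_migrating()
 * does above; roughly (sketch only):
 *
 *	spin_lock(&res->spinlock);
 *	__dlm_lockres_reserve_ast(res);
 *	spin_unlock(&res->spinlock);
 *	...queue and deliver the ast or bast...
 *	dlm_lockres_release_ast(dlm, res);
 */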
3460 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3461 {
3462         assert_spin_locked(&res->spinlock);
3463         if (res->state & DLM_LOCK_RES_MIGRATING) {
3464                 __dlm_print_one_lock_resource(res);
3465         }
3466         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3467
3468         atomic_inc(&res->asts_reserved);
3469 }
3470
3471 /*
3472  * used to drop the reserved ast, either because it went unused,
3473  * or because the ast/bast was actually called.
3474  *
3475  * also, if there is a pending migration on this lockres,
3476  * and this was the last pending ast on the lockres,
3477  * atomically set the MIGRATING flag before we drop the lock.
3478  * this is how we ensure that migration can proceed with no
3479  * asts in progress.  note that it is ok if the state of the
3480  * queues is such that a lock should be granted in the future
3481  * or that a bast should be fired, because the new master will
3482  * shuffle the lists on this lockres as soon as it is migrated.
3483  */
3484 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3485                              struct dlm_lock_resource *res)
3486 {
3487         if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3488                 return;
3489
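        /* atomic_dec_and_lock() hit zero, so we dropped the final
         * reserved ast and now hold res->spinlock */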
3490         if (!res->migration_pending) {
3491                 spin_unlock(&res->spinlock);
3492                 return;
3493         }
3494
3495         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3496         res->migration_pending = 0;
3497         res->state |= DLM_LOCK_RES_MIGRATING;
3498         spin_unlock(&res->spinlock);
3499         wake_up(&res->wq);
3500         wake_up(&dlm->migration_wq);
3501 }