ocfs2/dlm: Improve lockres counts
[pandora-kernel.git] / fs / ocfs2 / dlm / dlmmaster.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmod.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  *
25  */
26
27
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/spinlock.h>
41 #include <linux/delay.h>
42
43
44 #include "cluster/heartbeat.h"
45 #include "cluster/nodemanager.h"
46 #include "cluster/tcp.h"
47
48 #include "dlmapi.h"
49 #include "dlmcommon.h"
50 #include "dlmdomain.h"
51 #include "dlmdebug.h"
52
53 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
54 #include "cluster/masklog.h"
55
56 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
57                               struct dlm_master_list_entry *mle,
58                               struct o2nm_node *node,
59                               int idx);
60 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
61                             struct dlm_master_list_entry *mle,
62                             struct o2nm_node *node,
63                             int idx);
64
65 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
66 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
67                                 struct dlm_lock_resource *res,
68                                 void *nodemap, u32 flags);
69 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data);
70
71 static inline void __dlm_mle_name(struct dlm_master_list_entry *mle,
72                                   unsigned char **name, unsigned int *namelen,
73                                   unsigned int *namehash)
74 {
75         BUG_ON(mle->type != DLM_MLE_BLOCK &&
76                mle->type != DLM_MLE_MASTER &&
77                mle->type != DLM_MLE_MIGRATION);
78
79         if (mle->type != DLM_MLE_MASTER) {
80                 *name = mle->u.mlename.name;
81                 *namelen = mle->u.mlename.len;
82                 if (namehash)
83                         *namehash = mle->u.mlename.hash;
84         } else {
85                 *name  = (unsigned char *)mle->u.mleres->lockname.name;
86                 *namelen = mle->u.mleres->lockname.len;
87                 if (namehash)
88                         *namehash = mle->u.mleres->lockname.hash;
89         }
90 }
91
92 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
93                                 struct dlm_master_list_entry *mle,
94                                 const char *name,
95                                 unsigned int namelen)
96 {
97         unsigned char *mlename;
98         unsigned int mlelen;
99
100         if (dlm != mle->dlm)
101                 return 0;
102
103         __dlm_mle_name(mle, &mlename, &mlelen, NULL);
104
105         if (namelen != mlelen || memcmp(name, mlename, namelen) != 0)
106                 return 0;
107
108         return 1;
109 }
110
111 static struct kmem_cache *dlm_lockres_cache = NULL;
112 static struct kmem_cache *dlm_lockname_cache = NULL;
113 static struct kmem_cache *dlm_mle_cache = NULL;
114
115 static void dlm_mle_release(struct kref *kref);
116 static void dlm_init_mle(struct dlm_master_list_entry *mle,
117                         enum dlm_mle_type type,
118                         struct dlm_ctxt *dlm,
119                         struct dlm_lock_resource *res,
120                         const char *name,
121                         unsigned int namelen);
122 static void dlm_put_mle(struct dlm_master_list_entry *mle);
123 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
124 static int dlm_find_mle(struct dlm_ctxt *dlm,
125                         struct dlm_master_list_entry **mle,
126                         char *name, unsigned int namelen);
127
128 static int dlm_do_master_request(struct dlm_lock_resource *res,
129                                  struct dlm_master_list_entry *mle, int to);
130
131
132 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
133                                      struct dlm_lock_resource *res,
134                                      struct dlm_master_list_entry *mle,
135                                      int *blocked);
136 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
137                                     struct dlm_lock_resource *res,
138                                     struct dlm_master_list_entry *mle,
139                                     int blocked);
140 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
141                                  struct dlm_lock_resource *res,
142                                  struct dlm_master_list_entry *mle,
143                                  struct dlm_master_list_entry **oldmle,
144                                  const char *name, unsigned int namelen,
145                                  u8 new_master, u8 master);
146
147 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
148                                     struct dlm_lock_resource *res);
149 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
150                                       struct dlm_lock_resource *res);
151 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
152                                        struct dlm_lock_resource *res,
153                                        u8 target);
154 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
155                                        struct dlm_lock_resource *res);
156
157
158 int dlm_is_host_down(int errno)
159 {
160         switch (errno) {
161                 case -EBADF:
162                 case -ECONNREFUSED:
163                 case -ENOTCONN:
164                 case -ECONNRESET:
165                 case -EPIPE:
166                 case -EHOSTDOWN:
167                 case -EHOSTUNREACH:
168                 case -ETIMEDOUT:
169                 case -ECONNABORTED:
170                 case -ENETDOWN:
171                 case -ENETUNREACH:
172                 case -ENETRESET:
173                 case -ESHUTDOWN:
174                 case -ENOPROTOOPT:
175                 case -EINVAL:   /* if returned from our tcp code,
176                                    this means there is no socket */
177                         return 1;
178         }
179         return 0;
180 }
181
182
183 /*
184  * MASTER LIST FUNCTIONS
185  */
186
187
188 /*
189  * regarding master list entries and heartbeat callbacks:
190  *
191  * in order to avoid sleeping and allocation that occurs in
192  * heartbeat, master list entries are simply attached to the
193  * dlm's established heartbeat callbacks.  the mle is attached
194  * when it is created, and since the dlm->spinlock is held at
195  * that time, any heartbeat event will be properly discovered
196  * by the mle.  the mle needs to be detached from the
197  * dlm->mle_hb_events list as soon as heartbeat events are no
198  * longer useful to the mle, and before the mle is freed.
199  *
200  * as a general rule, heartbeat events are no longer needed by
201  * the mle once an "answer" regarding the lock master has been
202  * received.
203  */
204 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
205                                               struct dlm_master_list_entry *mle)
206 {
207         assert_spin_locked(&dlm->spinlock);
208
209         list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
210 }
211
212
213 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
214                                               struct dlm_master_list_entry *mle)
215 {
216         if (!list_empty(&mle->hb_events))
217                 list_del_init(&mle->hb_events);
218 }
219
220
221 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
222                                             struct dlm_master_list_entry *mle)
223 {
224         spin_lock(&dlm->spinlock);
225         __dlm_mle_detach_hb_events(dlm, mle);
226         spin_unlock(&dlm->spinlock);
227 }
228
229 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
230 {
231         struct dlm_ctxt *dlm;
232         dlm = mle->dlm;
233
234         assert_spin_locked(&dlm->spinlock);
235         assert_spin_locked(&dlm->master_lock);
236         mle->inuse++;
237         kref_get(&mle->mle_refs);
238 }
239
240 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
241 {
242         struct dlm_ctxt *dlm;
243         dlm = mle->dlm;
244
245         spin_lock(&dlm->spinlock);
246         spin_lock(&dlm->master_lock);
247         mle->inuse--;
248         __dlm_put_mle(mle);
249         spin_unlock(&dlm->master_lock);
250         spin_unlock(&dlm->spinlock);
251
252 }
253
254 /* remove from list and free */
255 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
256 {
257         struct dlm_ctxt *dlm;
258         dlm = mle->dlm;
259
260         assert_spin_locked(&dlm->spinlock);
261         assert_spin_locked(&dlm->master_lock);
262         if (!atomic_read(&mle->mle_refs.refcount)) {
263                 /* this may or may not crash, but who cares.
264                  * it's a BUG. */
265                 mlog(ML_ERROR, "bad mle: %p\n", mle);
266                 dlm_print_one_mle(mle);
267                 BUG();
268         } else
269                 kref_put(&mle->mle_refs, dlm_mle_release);
270 }
271
272
273 /* must not have any spinlocks coming in */
274 static void dlm_put_mle(struct dlm_master_list_entry *mle)
275 {
276         struct dlm_ctxt *dlm;
277         dlm = mle->dlm;
278
279         spin_lock(&dlm->spinlock);
280         spin_lock(&dlm->master_lock);
281         __dlm_put_mle(mle);
282         spin_unlock(&dlm->master_lock);
283         spin_unlock(&dlm->spinlock);
284 }
285
286 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
287 {
288         kref_get(&mle->mle_refs);
289 }
290
291 static void dlm_init_mle(struct dlm_master_list_entry *mle,
292                         enum dlm_mle_type type,
293                         struct dlm_ctxt *dlm,
294                         struct dlm_lock_resource *res,
295                         const char *name,
296                         unsigned int namelen)
297 {
298         assert_spin_locked(&dlm->spinlock);
299
300         mle->dlm = dlm;
301         mle->type = type;
302         INIT_HLIST_NODE(&mle->master_hash_node);
303         INIT_LIST_HEAD(&mle->hb_events);
304         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
305         spin_lock_init(&mle->spinlock);
306         init_waitqueue_head(&mle->wq);
307         atomic_set(&mle->woken, 0);
308         kref_init(&mle->mle_refs);
309         memset(mle->response_map, 0, sizeof(mle->response_map));
310         mle->master = O2NM_MAX_NODES;
311         mle->new_master = O2NM_MAX_NODES;
312         mle->inuse = 0;
313
314         BUG_ON(mle->type != DLM_MLE_BLOCK &&
315                mle->type != DLM_MLE_MASTER &&
316                mle->type != DLM_MLE_MIGRATION);
317
318         if (mle->type == DLM_MLE_MASTER) {
319                 BUG_ON(!res);
320                 mle->u.mleres = res;
321         } else {
322                 BUG_ON(!name);
323                 memcpy(mle->u.mlename.name, name, namelen);
324                 mle->u.mlename.len = namelen;
325                 mle->u.mlename.hash = dlm_lockid_hash(name, namelen);
326         }
327
328         atomic_inc(&dlm->mle_tot_count[mle->type]);
329         atomic_inc(&dlm->mle_cur_count[mle->type]);
330
331         /* copy off the node_map and register hb callbacks on our copy */
332         memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
333         memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
334         clear_bit(dlm->node_num, mle->vote_map);
335         clear_bit(dlm->node_num, mle->node_map);
336
337         /* attach the mle to the domain node up/down events */
338         __dlm_mle_attach_hb_events(dlm, mle);
339 }
340
341 void __dlm_unlink_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
342 {
343         assert_spin_locked(&dlm->spinlock);
344         assert_spin_locked(&dlm->master_lock);
345
346         if (!hlist_unhashed(&mle->master_hash_node))
347                 hlist_del_init(&mle->master_hash_node);
348 }
349
350 void __dlm_insert_mle(struct dlm_ctxt *dlm, struct dlm_master_list_entry *mle)
351 {
352         struct hlist_head *bucket;
353         unsigned char *mname;
354         unsigned int mlen, hash;
355
356         assert_spin_locked(&dlm->master_lock);
357
358         __dlm_mle_name(mle, &mname, &mlen, &hash);
359         bucket = dlm_master_hash(dlm, hash);
360         hlist_add_head(&mle->master_hash_node, bucket);
361 }
362
363 /* returns 1 if found, 0 if not */
364 static int dlm_find_mle(struct dlm_ctxt *dlm,
365                         struct dlm_master_list_entry **mle,
366                         char *name, unsigned int namelen)
367 {
368         struct dlm_master_list_entry *tmpmle;
369         struct hlist_head *bucket;
370         struct hlist_node *list;
371         unsigned int hash;
372
373         assert_spin_locked(&dlm->master_lock);
374
375         hash = dlm_lockid_hash(name, namelen);
376         bucket = dlm_master_hash(dlm, hash);
377         hlist_for_each(list, bucket) {
378                 tmpmle = hlist_entry(list, struct dlm_master_list_entry,
379                                      master_hash_node);
380                 if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
381                         continue;
382                 dlm_get_mle(tmpmle);
383                 *mle = tmpmle;
384                 return 1;
385         }
386         return 0;
387 }
388
389 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
390 {
391         struct dlm_master_list_entry *mle;
392
393         assert_spin_locked(&dlm->spinlock);
394         
395         list_for_each_entry(mle, &dlm->mle_hb_events, hb_events) {
396                 if (node_up)
397                         dlm_mle_node_up(dlm, mle, NULL, idx);
398                 else
399                         dlm_mle_node_down(dlm, mle, NULL, idx);
400         }
401 }
402
403 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
404                               struct dlm_master_list_entry *mle,
405                               struct o2nm_node *node, int idx)
406 {
407         spin_lock(&mle->spinlock);
408
409         if (!test_bit(idx, mle->node_map))
410                 mlog(0, "node %u already removed from nodemap!\n", idx);
411         else
412                 clear_bit(idx, mle->node_map);
413
414         spin_unlock(&mle->spinlock);
415 }
416
417 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
418                             struct dlm_master_list_entry *mle,
419                             struct o2nm_node *node, int idx)
420 {
421         spin_lock(&mle->spinlock);
422
423         if (test_bit(idx, mle->node_map))
424                 mlog(0, "node %u already in node map!\n", idx);
425         else
426                 set_bit(idx, mle->node_map);
427
428         spin_unlock(&mle->spinlock);
429 }
430
431
432 int dlm_init_mle_cache(void)
433 {
434         dlm_mle_cache = kmem_cache_create("o2dlm_mle",
435                                           sizeof(struct dlm_master_list_entry),
436                                           0, SLAB_HWCACHE_ALIGN,
437                                           NULL);
438         if (dlm_mle_cache == NULL)
439                 return -ENOMEM;
440         return 0;
441 }
442
443 void dlm_destroy_mle_cache(void)
444 {
445         if (dlm_mle_cache)
446                 kmem_cache_destroy(dlm_mle_cache);
447 }
448
449 static void dlm_mle_release(struct kref *kref)
450 {
451         struct dlm_master_list_entry *mle;
452         struct dlm_ctxt *dlm;
453         unsigned char *mname;
454         unsigned int mlen;
455
456         mlog_entry_void();
457
458         mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
459         dlm = mle->dlm;
460
461         assert_spin_locked(&dlm->spinlock);
462         assert_spin_locked(&dlm->master_lock);
463
464         __dlm_mle_name(mle, &mname, &mlen, NULL);
465         mlog(0, "Releasing mle for %.*s, type %d\n", mlen, mname, mle->type);
466
467         /* remove from list if not already */
468         __dlm_unlink_mle(dlm, mle);
469
470         /* detach the mle from the domain node up/down events */
471         __dlm_mle_detach_hb_events(dlm, mle);
472
473         atomic_dec(&dlm->mle_cur_count[mle->type]);
474
475         /* NOTE: kfree under spinlock here.
476          * if this is bad, we can move this to a freelist. */
477         kmem_cache_free(dlm_mle_cache, mle);
478 }
479
480
481 /*
482  * LOCK RESOURCE FUNCTIONS
483  */
484
485 int dlm_init_master_caches(void)
486 {
487         dlm_lockres_cache = kmem_cache_create("o2dlm_lockres",
488                                               sizeof(struct dlm_lock_resource),
489                                               0, SLAB_HWCACHE_ALIGN, NULL);
490         if (!dlm_lockres_cache)
491                 goto bail;
492
493         dlm_lockname_cache = kmem_cache_create("o2dlm_lockname",
494                                                DLM_LOCKID_NAME_MAX, 0,
495                                                SLAB_HWCACHE_ALIGN, NULL);
496         if (!dlm_lockname_cache)
497                 goto bail;
498
499         return 0;
500 bail:
501         dlm_destroy_master_caches();
502         return -ENOMEM;
503 }
504
505 void dlm_destroy_master_caches(void)
506 {
507         if (dlm_lockname_cache)
508                 kmem_cache_destroy(dlm_lockname_cache);
509
510         if (dlm_lockres_cache)
511                 kmem_cache_destroy(dlm_lockres_cache);
512 }
513
514 static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
515                                   struct dlm_lock_resource *res,
516                                   u8 owner)
517 {
518         assert_spin_locked(&res->spinlock);
519
520         res->owner = owner;
521 }
522
523 void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
524                               struct dlm_lock_resource *res, u8 owner)
525 {
526         assert_spin_locked(&res->spinlock);
527
528         if (owner != res->owner)
529                 dlm_set_lockres_owner(dlm, res, owner);
530 }
531
532
533 static void dlm_lockres_release(struct kref *kref)
534 {
535         struct dlm_lock_resource *res;
536         struct dlm_ctxt *dlm;
537
538         res = container_of(kref, struct dlm_lock_resource, refs);
539         dlm = res->dlm;
540
541         /* This should not happen -- all lockres' have a name
542          * associated with them at init time. */
543         BUG_ON(!res->lockname.name);
544
545         mlog(0, "destroying lockres %.*s\n", res->lockname.len,
546              res->lockname.name);
547
548         spin_lock(&dlm->track_lock);
549         if (!list_empty(&res->tracking))
550                 list_del_init(&res->tracking);
551         else {
552                 mlog(ML_ERROR, "Resource %.*s not on the Tracking list\n",
553                      res->lockname.len, res->lockname.name);
554                 dlm_print_one_lock_resource(res);
555         }
556         spin_unlock(&dlm->track_lock);
557
558         atomic_dec(&dlm->res_cur_count);
559
560         dlm_put(dlm);
561
562         if (!hlist_unhashed(&res->hash_node) ||
563             !list_empty(&res->granted) ||
564             !list_empty(&res->converting) ||
565             !list_empty(&res->blocked) ||
566             !list_empty(&res->dirty) ||
567             !list_empty(&res->recovering) ||
568             !list_empty(&res->purge)) {
569                 mlog(ML_ERROR,
570                      "Going to BUG for resource %.*s."
571                      "  We're on a list! [%c%c%c%c%c%c%c]\n",
572                      res->lockname.len, res->lockname.name,
573                      !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
574                      !list_empty(&res->granted) ? 'G' : ' ',
575                      !list_empty(&res->converting) ? 'C' : ' ',
576                      !list_empty(&res->blocked) ? 'B' : ' ',
577                      !list_empty(&res->dirty) ? 'D' : ' ',
578                      !list_empty(&res->recovering) ? 'R' : ' ',
579                      !list_empty(&res->purge) ? 'P' : ' ');
580
581                 dlm_print_one_lock_resource(res);
582         }
583
584         /* By the time we're ready to blow this guy away, we shouldn't
585          * be on any lists. */
586         BUG_ON(!hlist_unhashed(&res->hash_node));
587         BUG_ON(!list_empty(&res->granted));
588         BUG_ON(!list_empty(&res->converting));
589         BUG_ON(!list_empty(&res->blocked));
590         BUG_ON(!list_empty(&res->dirty));
591         BUG_ON(!list_empty(&res->recovering));
592         BUG_ON(!list_empty(&res->purge));
593
594         kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
595
596         kmem_cache_free(dlm_lockres_cache, res);
597 }
598
599 void dlm_lockres_put(struct dlm_lock_resource *res)
600 {
601         kref_put(&res->refs, dlm_lockres_release);
602 }
603
604 static void dlm_init_lockres(struct dlm_ctxt *dlm,
605                              struct dlm_lock_resource *res,
606                              const char *name, unsigned int namelen)
607 {
608         char *qname;
609
610         /* If we memset here, we lose our reference to the kmalloc'd
611          * res->lockname.name, so be sure to init every field
612          * correctly! */
613
614         qname = (char *) res->lockname.name;
615         memcpy(qname, name, namelen);
616
617         res->lockname.len = namelen;
618         res->lockname.hash = dlm_lockid_hash(name, namelen);
619
620         init_waitqueue_head(&res->wq);
621         spin_lock_init(&res->spinlock);
622         INIT_HLIST_NODE(&res->hash_node);
623         INIT_LIST_HEAD(&res->granted);
624         INIT_LIST_HEAD(&res->converting);
625         INIT_LIST_HEAD(&res->blocked);
626         INIT_LIST_HEAD(&res->dirty);
627         INIT_LIST_HEAD(&res->recovering);
628         INIT_LIST_HEAD(&res->purge);
629         INIT_LIST_HEAD(&res->tracking);
630         atomic_set(&res->asts_reserved, 0);
631         res->migration_pending = 0;
632         res->inflight_locks = 0;
633
634         /* put in dlm_lockres_release */
635         dlm_grab(dlm);
636         res->dlm = dlm;
637
638         kref_init(&res->refs);
639
640         atomic_inc(&dlm->res_tot_count);
641         atomic_inc(&dlm->res_cur_count);
642
643         /* just for consistency */
644         spin_lock(&res->spinlock);
645         dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
646         spin_unlock(&res->spinlock);
647
648         res->state = DLM_LOCK_RES_IN_PROGRESS;
649
650         res->last_used = 0;
651
652         spin_lock(&dlm->spinlock);
653         list_add_tail(&res->tracking, &dlm->tracking_list);
654         spin_unlock(&dlm->spinlock);
655
656         memset(res->lvb, 0, DLM_LVB_LEN);
657         memset(res->refmap, 0, sizeof(res->refmap));
658 }
659
660 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
661                                    const char *name,
662                                    unsigned int namelen)
663 {
664         struct dlm_lock_resource *res = NULL;
665
666         res = (struct dlm_lock_resource *)
667                                 kmem_cache_zalloc(dlm_lockres_cache, GFP_NOFS);
668         if (!res)
669                 goto error;
670
671         res->lockname.name = (char *)
672                                 kmem_cache_zalloc(dlm_lockname_cache, GFP_NOFS);
673         if (!res->lockname.name)
674                 goto error;
675
676         dlm_init_lockres(dlm, res, name, namelen);
677         return res;
678
679 error:
680         if (res && res->lockname.name)
681                 kmem_cache_free(dlm_lockname_cache, (void *)res->lockname.name);
682
683         if (res)
684                 kmem_cache_free(dlm_lockres_cache, res);
685         return NULL;
686 }
687
688 void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
689                                    struct dlm_lock_resource *res,
690                                    int new_lockres,
691                                    const char *file,
692                                    int line)
693 {
694         if (!new_lockres)
695                 assert_spin_locked(&res->spinlock);
696
697         if (!test_bit(dlm->node_num, res->refmap)) {
698                 BUG_ON(res->inflight_locks != 0);
699                 dlm_lockres_set_refmap_bit(dlm->node_num, res);
700         }
701         res->inflight_locks++;
702         mlog(0, "%s:%.*s: inflight++: now %u\n",
703              dlm->name, res->lockname.len, res->lockname.name,
704              res->inflight_locks);
705 }
706
707 void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
708                                    struct dlm_lock_resource *res,
709                                    const char *file,
710                                    int line)
711 {
712         assert_spin_locked(&res->spinlock);
713
714         BUG_ON(res->inflight_locks == 0);
715         res->inflight_locks--;
716         mlog(0, "%s:%.*s: inflight--: now %u\n",
717              dlm->name, res->lockname.len, res->lockname.name,
718              res->inflight_locks);
719         if (res->inflight_locks == 0)
720                 dlm_lockres_clear_refmap_bit(dlm->node_num, res);
721         wake_up(&res->wq);
722 }
723
724 /*
725  * lookup a lock resource by name.
726  * may already exist in the hashtable.
727  * lockid is null terminated
728  *
729  * if not, allocate enough for the lockres and for
730  * the temporary structure used in doing the mastering.
731  *
732  * also, do a lookup in the dlm->master_list to see
733  * if another node has begun mastering the same lock.
734  * if so, there should be a block entry in there
735  * for this name, and we should *not* attempt to master
736  * the lock here.   need to wait around for that node
737  * to assert_master (or die).
738  *
739  */
740 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
741                                           const char *lockid,
742                                           int namelen,
743                                           int flags)
744 {
745         struct dlm_lock_resource *tmpres=NULL, *res=NULL;
746         struct dlm_master_list_entry *mle = NULL;
747         struct dlm_master_list_entry *alloc_mle = NULL;
748         int blocked = 0;
749         int ret, nodenum;
750         struct dlm_node_iter iter;
751         unsigned int hash;
752         int tries = 0;
753         int bit, wait_on_recovery = 0;
754         int drop_inflight_if_nonlocal = 0;
755
756         BUG_ON(!lockid);
757
758         hash = dlm_lockid_hash(lockid, namelen);
759
760         mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
761
762 lookup:
763         spin_lock(&dlm->spinlock);
764         tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
765         if (tmpres) {
766                 int dropping_ref = 0;
767
768                 spin_unlock(&dlm->spinlock);
769
770                 spin_lock(&tmpres->spinlock);
771                 /* We wait for the other thread that is mastering the resource */
772                 if (tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
773                         __dlm_wait_on_lockres(tmpres);
774                         BUG_ON(tmpres->owner == DLM_LOCK_RES_OWNER_UNKNOWN);
775                 }
776
777                 if (tmpres->owner == dlm->node_num) {
778                         BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
779                         dlm_lockres_grab_inflight_ref(dlm, tmpres);
780                 } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
781                         dropping_ref = 1;
782                 spin_unlock(&tmpres->spinlock);
783
784                 /* wait until done messaging the master, drop our ref to allow
785                  * the lockres to be purged, start over. */
786                 if (dropping_ref) {
787                         spin_lock(&tmpres->spinlock);
788                         __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
789                         spin_unlock(&tmpres->spinlock);
790                         dlm_lockres_put(tmpres);
791                         tmpres = NULL;
792                         goto lookup;
793                 }
794
795                 mlog(0, "found in hash!\n");
796                 if (res)
797                         dlm_lockres_put(res);
798                 res = tmpres;
799                 goto leave;
800         }
801
802         if (!res) {
803                 spin_unlock(&dlm->spinlock);
804                 mlog(0, "allocating a new resource\n");
805                 /* nothing found and we need to allocate one. */
806                 alloc_mle = (struct dlm_master_list_entry *)
807                         kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
808                 if (!alloc_mle)
809                         goto leave;
810                 res = dlm_new_lockres(dlm, lockid, namelen);
811                 if (!res)
812                         goto leave;
813                 goto lookup;
814         }
815
816         mlog(0, "no lockres found, allocated our own: %p\n", res);
817
818         if (flags & LKM_LOCAL) {
819                 /* caller knows it's safe to assume it's not mastered elsewhere
820                  * DONE!  return right away */
821                 spin_lock(&res->spinlock);
822                 dlm_change_lockres_owner(dlm, res, dlm->node_num);
823                 __dlm_insert_lockres(dlm, res);
824                 dlm_lockres_grab_inflight_ref(dlm, res);
825                 spin_unlock(&res->spinlock);
826                 spin_unlock(&dlm->spinlock);
827                 /* lockres still marked IN_PROGRESS */
828                 goto wake_waiters;
829         }
830
831         /* check master list to see if another node has started mastering it */
832         spin_lock(&dlm->master_lock);
833
834         /* if we found a block, wait for lock to be mastered by another node */
835         blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
836         if (blocked) {
837                 int mig;
838                 if (mle->type == DLM_MLE_MASTER) {
839                         mlog(ML_ERROR, "master entry for nonexistent lock!\n");
840                         BUG();
841                 }
842                 mig = (mle->type == DLM_MLE_MIGRATION);
843                 /* if there is a migration in progress, let the migration
844                  * finish before continuing.  we can wait for the absence
845                  * of the MIGRATION mle: either the migrate finished or
846                  * one of the nodes died and the mle was cleaned up.
847                  * if there is a BLOCK here, but it already has a master
848                  * set, we are too late.  the master does not have a ref
849                  * for us in the refmap.  detach the mle and drop it.
850                  * either way, go back to the top and start over. */
851                 if (mig || mle->master != O2NM_MAX_NODES) {
852                         BUG_ON(mig && mle->master == dlm->node_num);
853                         /* we arrived too late.  the master does not
854                          * have a ref for us. retry. */
855                         mlog(0, "%s:%.*s: late on %s\n",
856                              dlm->name, namelen, lockid,
857                              mig ?  "MIGRATION" : "BLOCK");
858                         spin_unlock(&dlm->master_lock);
859                         spin_unlock(&dlm->spinlock);
860
861                         /* master is known, detach */
862                         if (!mig)
863                                 dlm_mle_detach_hb_events(dlm, mle);
864                         dlm_put_mle(mle);
865                         mle = NULL;
866                         /* this is lame, but we cant wait on either
867                          * the mle or lockres waitqueue here */
868                         if (mig)
869                                 msleep(100);
870                         goto lookup;
871                 }
872         } else {
873                 /* go ahead and try to master lock on this node */
874                 mle = alloc_mle;
875                 /* make sure this does not get freed below */
876                 alloc_mle = NULL;
877                 dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
878                 set_bit(dlm->node_num, mle->maybe_map);
879                 __dlm_insert_mle(dlm, mle);
880
881                 /* still holding the dlm spinlock, check the recovery map
882                  * to see if there are any nodes that still need to be 
883                  * considered.  these will not appear in the mle nodemap
884                  * but they might own this lockres.  wait on them. */
885                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
886                 if (bit < O2NM_MAX_NODES) {
887                         mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
888                              "recover before lock mastery can begin\n",
889                              dlm->name, namelen, (char *)lockid, bit);
890                         wait_on_recovery = 1;
891                 }
892         }
893
894         /* at this point there is either a DLM_MLE_BLOCK or a
895          * DLM_MLE_MASTER on the master list, so it's safe to add the
896          * lockres to the hashtable.  anyone who finds the lock will
897          * still have to wait on the IN_PROGRESS. */
898
899         /* finally add the lockres to its hash bucket */
900         __dlm_insert_lockres(dlm, res);
901         /* since this lockres is new it doesnt not require the spinlock */
902         dlm_lockres_grab_inflight_ref_new(dlm, res);
903
904         /* if this node does not become the master make sure to drop
905          * this inflight reference below */
906         drop_inflight_if_nonlocal = 1;
907
908         /* get an extra ref on the mle in case this is a BLOCK
909          * if so, the creator of the BLOCK may try to put the last
910          * ref at this time in the assert master handler, so we
911          * need an extra one to keep from a bad ptr deref. */
912         dlm_get_mle_inuse(mle);
913         spin_unlock(&dlm->master_lock);
914         spin_unlock(&dlm->spinlock);
915
916 redo_request:
917         while (wait_on_recovery) {
918                 /* any cluster changes that occurred after dropping the
919                  * dlm spinlock would be detectable be a change on the mle,
920                  * so we only need to clear out the recovery map once. */
921                 if (dlm_is_recovery_lock(lockid, namelen)) {
922                         mlog(ML_NOTICE, "%s: recovery map is not empty, but "
923                              "must master $RECOVERY lock now\n", dlm->name);
924                         if (!dlm_pre_master_reco_lockres(dlm, res))
925                                 wait_on_recovery = 0;
926                         else {
927                                 mlog(0, "%s: waiting 500ms for heartbeat state "
928                                     "change\n", dlm->name);
929                                 msleep(500);
930                         }
931                         continue;
932                 } 
933
934                 dlm_kick_recovery_thread(dlm);
935                 msleep(1000);
936                 dlm_wait_for_recovery(dlm);
937
938                 spin_lock(&dlm->spinlock);
939                 bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
940                 if (bit < O2NM_MAX_NODES) {
941                         mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
942                              "recover before lock mastery can begin\n",
943                              dlm->name, namelen, (char *)lockid, bit);
944                         wait_on_recovery = 1;
945                 } else
946                         wait_on_recovery = 0;
947                 spin_unlock(&dlm->spinlock);
948
949                 if (wait_on_recovery)
950                         dlm_wait_for_node_recovery(dlm, bit, 10000);
951         }
952
953         /* must wait for lock to be mastered elsewhere */
954         if (blocked)
955                 goto wait;
956
957         ret = -EINVAL;
958         dlm_node_iter_init(mle->vote_map, &iter);
959         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
960                 ret = dlm_do_master_request(res, mle, nodenum);
961                 if (ret < 0)
962                         mlog_errno(ret);
963                 if (mle->master != O2NM_MAX_NODES) {
964                         /* found a master ! */
965                         if (mle->master <= nodenum)
966                                 break;
967                         /* if our master request has not reached the master
968                          * yet, keep going until it does.  this is how the
969                          * master will know that asserts are needed back to
970                          * the lower nodes. */
971                         mlog(0, "%s:%.*s: requests only up to %u but master "
972                              "is %u, keep going\n", dlm->name, namelen,
973                              lockid, nodenum, mle->master);
974                 }
975         }
976
977 wait:
978         /* keep going until the response map includes all nodes */
979         ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
980         if (ret < 0) {
981                 wait_on_recovery = 1;
982                 mlog(0, "%s:%.*s: node map changed, redo the "
983                      "master request now, blocked=%d\n",
984                      dlm->name, res->lockname.len,
985                      res->lockname.name, blocked);
986                 if (++tries > 20) {
987                         mlog(ML_ERROR, "%s:%.*s: spinning on "
988                              "dlm_wait_for_lock_mastery, blocked=%d\n", 
989                              dlm->name, res->lockname.len, 
990                              res->lockname.name, blocked);
991                         dlm_print_one_lock_resource(res);
992                         dlm_print_one_mle(mle);
993                         tries = 0;
994                 }
995                 goto redo_request;
996         }
997
998         mlog(0, "lockres mastered by %u\n", res->owner);
999         /* make sure we never continue without this */
1000         BUG_ON(res->owner == O2NM_MAX_NODES);
1001
1002         /* master is known, detach if not already detached */
1003         dlm_mle_detach_hb_events(dlm, mle);
1004         dlm_put_mle(mle);
1005         /* put the extra ref */
1006         dlm_put_mle_inuse(mle);
1007
1008 wake_waiters:
1009         spin_lock(&res->spinlock);
1010         if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
1011                 dlm_lockres_drop_inflight_ref(dlm, res);
1012         res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
1013         spin_unlock(&res->spinlock);
1014         wake_up(&res->wq);
1015
1016 leave:
1017         /* need to free the unused mle */
1018         if (alloc_mle)
1019                 kmem_cache_free(dlm_mle_cache, alloc_mle);
1020
1021         return res;
1022 }
1023
1024
1025 #define DLM_MASTERY_TIMEOUT_MS   5000
1026
1027 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
1028                                      struct dlm_lock_resource *res,
1029                                      struct dlm_master_list_entry *mle,
1030                                      int *blocked)
1031 {
1032         u8 m;
1033         int ret, bit;
1034         int map_changed, voting_done;
1035         int assert, sleep;
1036
1037 recheck:
1038         ret = 0;
1039         assert = 0;
1040
1041         /* check if another node has already become the owner */
1042         spin_lock(&res->spinlock);
1043         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1044                 mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
1045                      res->lockname.len, res->lockname.name, res->owner);
1046                 spin_unlock(&res->spinlock);
1047                 /* this will cause the master to re-assert across
1048                  * the whole cluster, freeing up mles */
1049                 if (res->owner != dlm->node_num) {
1050                         ret = dlm_do_master_request(res, mle, res->owner);
1051                         if (ret < 0) {
1052                                 /* give recovery a chance to run */
1053                                 mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1054                                 msleep(500);
1055                                 goto recheck;
1056                         }
1057                 }
1058                 ret = 0;
1059                 goto leave;
1060         }
1061         spin_unlock(&res->spinlock);
1062
1063         spin_lock(&mle->spinlock);
1064         m = mle->master;
1065         map_changed = (memcmp(mle->vote_map, mle->node_map,
1066                               sizeof(mle->vote_map)) != 0);
1067         voting_done = (memcmp(mle->vote_map, mle->response_map,
1068                              sizeof(mle->vote_map)) == 0);
1069
1070         /* restart if we hit any errors */
1071         if (map_changed) {
1072                 int b;
1073                 mlog(0, "%s: %.*s: node map changed, restarting\n",
1074                      dlm->name, res->lockname.len, res->lockname.name);
1075                 ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1076                 b = (mle->type == DLM_MLE_BLOCK);
1077                 if ((*blocked && !b) || (!*blocked && b)) {
1078                         mlog(0, "%s:%.*s: status change: old=%d new=%d\n", 
1079                              dlm->name, res->lockname.len, res->lockname.name,
1080                              *blocked, b);
1081                         *blocked = b;
1082                 }
1083                 spin_unlock(&mle->spinlock);
1084                 if (ret < 0) {
1085                         mlog_errno(ret);
1086                         goto leave;
1087                 }
1088                 mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1089                      "rechecking now\n", dlm->name, res->lockname.len,
1090                      res->lockname.name);
1091                 goto recheck;
1092         } else {
1093                 if (!voting_done) {
1094                         mlog(0, "map not changed and voting not done "
1095                              "for %s:%.*s\n", dlm->name, res->lockname.len,
1096                              res->lockname.name);
1097                 }
1098         }
1099
1100         if (m != O2NM_MAX_NODES) {
1101                 /* another node has done an assert!
1102                  * all done! */
1103                 sleep = 0;
1104         } else {
1105                 sleep = 1;
1106                 /* have all nodes responded? */
1107                 if (voting_done && !*blocked) {
1108                         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1109                         if (dlm->node_num <= bit) {
1110                                 /* my node number is lowest.
1111                                  * now tell other nodes that I am
1112                                  * mastering this. */
1113                                 mle->master = dlm->node_num;
1114                                 /* ref was grabbed in get_lock_resource
1115                                  * will be dropped in dlmlock_master */
1116                                 assert = 1;
1117                                 sleep = 0;
1118                         }
1119                         /* if voting is done, but we have not received
1120                          * an assert master yet, we must sleep */
1121                 }
1122         }
1123
1124         spin_unlock(&mle->spinlock);
1125
1126         /* sleep if we haven't finished voting yet */
1127         if (sleep) {
1128                 unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1129
1130                 /*
1131                 if (atomic_read(&mle->mle_refs.refcount) < 2)
1132                         mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1133                         atomic_read(&mle->mle_refs.refcount),
1134                         res->lockname.len, res->lockname.name);
1135                 */
1136                 atomic_set(&mle->woken, 0);
1137                 (void)wait_event_timeout(mle->wq,
1138                                          (atomic_read(&mle->woken) == 1),
1139                                          timeo);
1140                 if (res->owner == O2NM_MAX_NODES) {
1141                         mlog(0, "%s:%.*s: waiting again\n", dlm->name,
1142                              res->lockname.len, res->lockname.name);
1143                         goto recheck;
1144                 }
1145                 mlog(0, "done waiting, master is %u\n", res->owner);
1146                 ret = 0;
1147                 goto leave;
1148         }
1149
1150         ret = 0;   /* done */
1151         if (assert) {
1152                 m = dlm->node_num;
1153                 mlog(0, "about to master %.*s here, this=%u\n",
1154                      res->lockname.len, res->lockname.name, m);
1155                 ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
1156                 if (ret) {
1157                         /* This is a failure in the network path,
1158                          * not in the response to the assert_master
1159                          * (any nonzero response is a BUG on this node).
1160                          * Most likely a socket just got disconnected
1161                          * due to node death. */
1162                         mlog_errno(ret);
1163                 }
1164                 /* no longer need to restart lock mastery.
1165                  * all living nodes have been contacted. */
1166                 ret = 0;
1167         }
1168
1169         /* set the lockres owner */
1170         spin_lock(&res->spinlock);
1171         /* mastery reference obtained either during
1172          * assert_master_handler or in get_lock_resource */
1173         dlm_change_lockres_owner(dlm, res, m);
1174         spin_unlock(&res->spinlock);
1175
1176 leave:
1177         return ret;
1178 }
1179
1180 struct dlm_bitmap_diff_iter
1181 {
1182         int curnode;
1183         unsigned long *orig_bm;
1184         unsigned long *cur_bm;
1185         unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1186 };
1187
1188 enum dlm_node_state_change
1189 {
1190         NODE_DOWN = -1,
1191         NODE_NO_CHANGE = 0,
1192         NODE_UP
1193 };
1194
1195 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1196                                       unsigned long *orig_bm,
1197                                       unsigned long *cur_bm)
1198 {
1199         unsigned long p1, p2;
1200         int i;
1201
1202         iter->curnode = -1;
1203         iter->orig_bm = orig_bm;
1204         iter->cur_bm = cur_bm;
1205
1206         for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1207                 p1 = *(iter->orig_bm + i);
1208                 p2 = *(iter->cur_bm + i);
1209                 iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1210         }
1211 }
1212
1213 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1214                                      enum dlm_node_state_change *state)
1215 {
1216         int bit;
1217
1218         if (iter->curnode >= O2NM_MAX_NODES)
1219                 return -ENOENT;
1220
1221         bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1222                             iter->curnode+1);
1223         if (bit >= O2NM_MAX_NODES) {
1224                 iter->curnode = O2NM_MAX_NODES;
1225                 return -ENOENT;
1226         }
1227
1228         /* if it was there in the original then this node died */
1229         if (test_bit(bit, iter->orig_bm))
1230                 *state = NODE_DOWN;
1231         else
1232                 *state = NODE_UP;
1233
1234         iter->curnode = bit;
1235         return bit;
1236 }
1237
1238
1239 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1240                                     struct dlm_lock_resource *res,
1241                                     struct dlm_master_list_entry *mle,
1242                                     int blocked)
1243 {
1244         struct dlm_bitmap_diff_iter bdi;
1245         enum dlm_node_state_change sc;
1246         int node;
1247         int ret = 0;
1248
1249         mlog(0, "something happened such that the "
1250              "master process may need to be restarted!\n");
1251
1252         assert_spin_locked(&mle->spinlock);
1253
1254         dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1255         node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1256         while (node >= 0) {
1257                 if (sc == NODE_UP) {
1258                         /* a node came up.  clear any old vote from
1259                          * the response map and set it in the vote map
1260                          * then restart the mastery. */
1261                         mlog(ML_NOTICE, "node %d up while restarting\n", node);
1262
1263                         /* redo the master request, but only for the new node */
1264                         mlog(0, "sending request to new node\n");
1265                         clear_bit(node, mle->response_map);
1266                         set_bit(node, mle->vote_map);
1267                 } else {
1268                         mlog(ML_ERROR, "node down! %d\n", node);
1269                         if (blocked) {
1270                                 int lowest = find_next_bit(mle->maybe_map,
1271                                                        O2NM_MAX_NODES, 0);
1272
1273                                 /* act like it was never there */
1274                                 clear_bit(node, mle->maybe_map);
1275
1276                                 if (node == lowest) {
1277                                         mlog(0, "expected master %u died"
1278                                             " while this node was blocked "
1279                                             "waiting on it!\n", node);
1280                                         lowest = find_next_bit(mle->maybe_map,
1281                                                         O2NM_MAX_NODES,
1282                                                         lowest+1);
1283                                         if (lowest < O2NM_MAX_NODES) {
1284                                                 mlog(0, "%s:%.*s:still "
1285                                                      "blocked. waiting on %u "
1286                                                      "now\n", dlm->name,
1287                                                      res->lockname.len,
1288                                                      res->lockname.name,
1289                                                      lowest);
1290                                         } else {
1291                                                 /* mle is an MLE_BLOCK, but
1292                                                  * there is now nothing left to
1293                                                  * block on.  we need to return
1294                                                  * all the way back out and try
1295                                                  * again with an MLE_MASTER.
1296                                                  * dlm_do_local_recovery_cleanup
1297                                                  * has already run, so the mle
1298                                                  * refcount is ok */
1299                                                 mlog(0, "%s:%.*s: no "
1300                                                      "longer blocking. try to "
1301                                                      "master this here\n",
1302                                                      dlm->name,
1303                                                      res->lockname.len,
1304                                                      res->lockname.name);
1305                                                 mle->type = DLM_MLE_MASTER;
1306                                                 mle->u.mleres = res;
1307                                         }
1308                                 }
1309                         }
1310
1311                         /* now blank out everything, as if we had never
1312                          * contacted anyone */
1313                         memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1314                         memset(mle->response_map, 0, sizeof(mle->response_map));
1315                         /* reset the vote_map to the current node_map */
1316                         memcpy(mle->vote_map, mle->node_map,
1317                                sizeof(mle->node_map));
1318                         /* put myself into the maybe map */
1319                         if (mle->type != DLM_MLE_BLOCK)
1320                                 set_bit(dlm->node_num, mle->maybe_map);
1321                 }
1322                 ret = -EAGAIN;
1323                 node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1324         }
1325         return ret;
1326 }
1327
1328
1329 /*
1330  * DLM_MASTER_REQUEST_MSG
1331  *
1332  * returns: 0 on success,
1333  *          -errno on a network error
1334  *
1335  * on error, the caller should assume the target node is "dead"
1336  *
1337  */
1338
1339 static int dlm_do_master_request(struct dlm_lock_resource *res,
1340                                  struct dlm_master_list_entry *mle, int to)
1341 {
1342         struct dlm_ctxt *dlm = mle->dlm;
1343         struct dlm_master_request request;
1344         int ret, response=0, resend;
1345         unsigned char *mlename;
1346         unsigned int mlenamelen;
1347
1348         memset(&request, 0, sizeof(request));
1349         request.node_idx = dlm->node_num;
1350
1351         BUG_ON(mle->type == DLM_MLE_MIGRATION);
1352
1353         __dlm_mle_name(mle, &mlename, &mlenamelen, NULL);
1354
1355         request.namelen = (u8)mlenamelen;
1356         memcpy(request.name, mlename, request.namelen);
1357
1358 again:
1359         ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1360                                  sizeof(request), to, &response);
1361         if (ret < 0)  {
1362                 if (ret == -ESRCH) {
1363                         /* should never happen */
1364                         mlog(ML_ERROR, "TCP stack not ready!\n");
1365                         BUG();
1366                 } else if (ret == -EINVAL) {
1367                         mlog(ML_ERROR, "bad args passed to o2net!\n");
1368                         BUG();
1369                 } else if (ret == -ENOMEM) {
1370                         mlog(ML_ERROR, "out of memory while trying to send "
1371                              "network message!  retrying\n");
1372                         /* this is totally crude */
1373                         msleep(50);
1374                         goto again;
1375                 } else if (!dlm_is_host_down(ret)) {
1376                         /* not a network error. bad. */
1377                         mlog_errno(ret);
1378                         mlog(ML_ERROR, "unhandled error!");
1379                         BUG();
1380                 }
1381                 /* all other errors should be network errors,
1382                  * and likely indicate node death */
1383                 mlog(ML_ERROR, "link to %d went down!\n", to);
1384                 goto out;
1385         }
1386
1387         ret = 0;
1388         resend = 0;
1389         spin_lock(&mle->spinlock);
1390         switch (response) {
1391                 case DLM_MASTER_RESP_YES:
1392                         set_bit(to, mle->response_map);
1393                         mlog(0, "node %u is the master, response=YES\n", to);
1394                         mlog(0, "%s:%.*s: master node %u now knows I have a "
1395                              "reference\n", dlm->name, res->lockname.len,
1396                              res->lockname.name, to);
1397                         mle->master = to;
1398                         break;
1399                 case DLM_MASTER_RESP_NO:
1400                         mlog(0, "node %u not master, response=NO\n", to);
1401                         set_bit(to, mle->response_map);
1402                         break;
1403                 case DLM_MASTER_RESP_MAYBE:
1404                         mlog(0, "node %u not master, response=MAYBE\n", to);
1405                         set_bit(to, mle->response_map);
1406                         set_bit(to, mle->maybe_map);
1407                         break;
1408                 case DLM_MASTER_RESP_ERROR:
1409                         mlog(0, "node %u hit an error, resending\n", to);
1410                         resend = 1;
1411                         response = 0;
1412                         break;
1413                 default:
1414                         mlog(ML_ERROR, "bad response! %u\n", response);
1415                         BUG();
1416         }
1417         spin_unlock(&mle->spinlock);
1418         if (resend) {
1419                 /* this is also totally crude */
1420                 msleep(50);
1421                 goto again;
1422         }
1423
1424 out:
1425         return ret;
1426 }
1427
1428 /*
1429  * locks that can be taken here:
1430  * dlm->spinlock
1431  * res->spinlock
1432  * mle->spinlock
1433  * dlm->master_list
1434  *
1435  * if possible, TRIM THIS DOWN!!!
1436  */
1437 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,
1438                                void **ret_data)
1439 {
1440         u8 response = DLM_MASTER_RESP_MAYBE;
1441         struct dlm_ctxt *dlm = data;
1442         struct dlm_lock_resource *res = NULL;
1443         struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1444         struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1445         char *name;
1446         unsigned int namelen, hash;
1447         int found, ret;
1448         int set_maybe;
1449         int dispatch_assert = 0;
1450
1451         if (!dlm_grab(dlm))
1452                 return DLM_MASTER_RESP_NO;
1453
1454         if (!dlm_domain_fully_joined(dlm)) {
1455                 response = DLM_MASTER_RESP_NO;
1456                 goto send_response;
1457         }
1458
1459         name = request->name;
1460         namelen = request->namelen;
1461         hash = dlm_lockid_hash(name, namelen);
1462
1463         if (namelen > DLM_LOCKID_NAME_MAX) {
1464                 response = DLM_IVBUFLEN;
1465                 goto send_response;
1466         }
1467
1468 way_up_top:
1469         spin_lock(&dlm->spinlock);
1470         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1471         if (res) {
1472                 spin_unlock(&dlm->spinlock);
1473
1474                 /* take care of the easy cases up front */
1475                 spin_lock(&res->spinlock);
1476                 if (res->state & (DLM_LOCK_RES_RECOVERING|
1477                                   DLM_LOCK_RES_MIGRATING)) {
1478                         spin_unlock(&res->spinlock);
1479                         mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1480                              "being recovered/migrated\n");
1481                         response = DLM_MASTER_RESP_ERROR;
1482                         if (mle)
1483                                 kmem_cache_free(dlm_mle_cache, mle);
1484                         goto send_response;
1485                 }
1486
1487                 if (res->owner == dlm->node_num) {
1488                         mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1489                              dlm->name, namelen, name, request->node_idx);
1490                         dlm_lockres_set_refmap_bit(request->node_idx, res);
1491                         spin_unlock(&res->spinlock);
1492                         response = DLM_MASTER_RESP_YES;
1493                         if (mle)
1494                                 kmem_cache_free(dlm_mle_cache, mle);
1495
1496                         /* this node is the owner.
1497                          * there is some extra work that needs to
1498                          * happen now.  the requesting node has
1499                          * caused all nodes up to this one to
1500                          * create mles.  this node now needs to
1501                          * go back and clean those up. */
1502                         dispatch_assert = 1;
1503                         goto send_response;
1504                 } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1505                         spin_unlock(&res->spinlock);
1506                         // mlog(0, "node %u is the master\n", res->owner);
1507                         response = DLM_MASTER_RESP_NO;
1508                         if (mle)
1509                                 kmem_cache_free(dlm_mle_cache, mle);
1510                         goto send_response;
1511                 }
1512
1513                 /* ok, there is no owner.  either this node is
1514                  * being blocked, or it is actively trying to
1515                  * master this lock. */
1516                 if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1517                         mlog(ML_ERROR, "lock with no owner should be "
1518                              "in-progress!\n");
1519                         BUG();
1520                 }
1521
1522                 // mlog(0, "lockres is in progress...\n");
1523                 spin_lock(&dlm->master_lock);
1524                 found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1525                 if (!found) {
1526                         mlog(ML_ERROR, "no mle found for this lock!\n");
1527                         BUG();
1528                 }
1529                 set_maybe = 1;
1530                 spin_lock(&tmpmle->spinlock);
1531                 if (tmpmle->type == DLM_MLE_BLOCK) {
1532                         // mlog(0, "this node is waiting for "
1533                         // "lockres to be mastered\n");
1534                         response = DLM_MASTER_RESP_NO;
1535                 } else if (tmpmle->type == DLM_MLE_MIGRATION) {
1536                         mlog(0, "node %u is master, but trying to migrate to "
1537                              "node %u.\n", tmpmle->master, tmpmle->new_master);
1538                         if (tmpmle->master == dlm->node_num) {
1539                                 mlog(ML_ERROR, "no owner on lockres, but this "
1540                                      "node is trying to migrate it to %u?!\n",
1541                                      tmpmle->new_master);
1542                                 BUG();
1543                         } else {
1544                                 /* the real master can respond on its own */
1545                                 response = DLM_MASTER_RESP_NO;
1546                         }
1547                 } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1548                         set_maybe = 0;
1549                         if (tmpmle->master == dlm->node_num) {
1550                                 response = DLM_MASTER_RESP_YES;
1551                                 /* this node will be the owner.
1552                                  * go back and clean the mles on any
1553                                  * other nodes */
1554                                 dispatch_assert = 1;
1555                                 dlm_lockres_set_refmap_bit(request->node_idx, res);
1556                                 mlog(0, "%s:%.*s: setting bit %u in refmap\n",
1557                                      dlm->name, namelen, name,
1558                                      request->node_idx);
1559                         } else
1560                                 response = DLM_MASTER_RESP_NO;
1561                 } else {
1562                         // mlog(0, "this node is attempting to "
1563                         // "master lockres\n");
1564                         response = DLM_MASTER_RESP_MAYBE;
1565                 }
1566                 if (set_maybe)
1567                         set_bit(request->node_idx, tmpmle->maybe_map);
1568                 spin_unlock(&tmpmle->spinlock);
1569
1570                 spin_unlock(&dlm->master_lock);
1571                 spin_unlock(&res->spinlock);
1572
1573                 /* keep the mle attached to heartbeat events */
1574                 dlm_put_mle(tmpmle);
1575                 if (mle)
1576                         kmem_cache_free(dlm_mle_cache, mle);
1577                 goto send_response;
1578         }
1579
1580         /*
1581          * lockres doesn't exist on this node
1582          * if there is an MLE_BLOCK, return NO
1583          * if there is an MLE_MASTER, return MAYBE
1584          * otherwise, add an MLE_BLOCK, return NO
1585          */
1586         spin_lock(&dlm->master_lock);
1587         found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1588         if (!found) {
1589                 /* this lockid has never been seen on this node yet */
1590                 // mlog(0, "no mle found\n");
1591                 if (!mle) {
1592                         spin_unlock(&dlm->master_lock);
1593                         spin_unlock(&dlm->spinlock);
1594
1595                         mle = (struct dlm_master_list_entry *)
1596                                 kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1597                         if (!mle) {
1598                                 response = DLM_MASTER_RESP_ERROR;
1599                                 mlog_errno(-ENOMEM);
1600                                 goto send_response;
1601                         }
1602                         goto way_up_top;
1603                 }
1604
1605                 // mlog(0, "this is second time thru, already allocated, "
1606                 // "add the block.\n");
1607                 dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1608                 set_bit(request->node_idx, mle->maybe_map);
1609                 __dlm_insert_mle(dlm, mle);
1610                 response = DLM_MASTER_RESP_NO;
1611         } else {
1612                 // mlog(0, "mle was found\n");
1613                 set_maybe = 1;
1614                 spin_lock(&tmpmle->spinlock);
1615                 if (tmpmle->master == dlm->node_num) {
1616                         mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1617                         BUG();
1618                 }
1619                 if (tmpmle->type == DLM_MLE_BLOCK)
1620                         response = DLM_MASTER_RESP_NO;
1621                 else if (tmpmle->type == DLM_MLE_MIGRATION) {
1622                         mlog(0, "migration mle was found (%u->%u)\n",
1623                              tmpmle->master, tmpmle->new_master);
1624                         /* real master can respond on its own */
1625                         response = DLM_MASTER_RESP_NO;
1626                 } else
1627                         response = DLM_MASTER_RESP_MAYBE;
1628                 if (set_maybe)
1629                         set_bit(request->node_idx, tmpmle->maybe_map);
1630                 spin_unlock(&tmpmle->spinlock);
1631         }
1632         spin_unlock(&dlm->master_lock);
1633         spin_unlock(&dlm->spinlock);
1634
1635         if (found) {
1636                 /* keep the mle attached to heartbeat events */
1637                 dlm_put_mle(tmpmle);
1638         }
1639 send_response:
1640         /*
1641          * __dlm_lookup_lockres() grabbed a reference to this lockres.
1642          * The reference is released by dlm_assert_master_worker() under
1643          * the call to dlm_dispatch_assert_master().  If
1644          * dlm_assert_master_worker() isn't called, we drop it here.
1645          */
1646         if (dispatch_assert) {
1647                 if (response != DLM_MASTER_RESP_YES)
1648                         mlog(ML_ERROR, "invalid response %d\n", response);
1649                 if (!res) {
1650                         mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1651                         BUG();
1652                 }
1653                 mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1654                              dlm->node_num, res->lockname.len, res->lockname.name);
1655                 ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 
1656                                                  DLM_ASSERT_MASTER_MLE_CLEANUP);
1657                 if (ret < 0) {
1658                         mlog(ML_ERROR, "failed to dispatch assert master work\n");
1659                         response = DLM_MASTER_RESP_ERROR;
1660                         dlm_lockres_put(res);
1661                 }
1662         } else {
1663                 if (res)
1664                         dlm_lockres_put(res);
1665         }
1666
1667         dlm_put(dlm);
1668         return response;
1669 }
1670
1671 /*
1672  * DLM_ASSERT_MASTER_MSG
1673  */
1674
1675
1676 /*
1677  * NOTE: this can be used for debugging
1678  * can periodically run all locks owned by this node
1679  * and re-assert across the cluster...
1680  */
1681 static int dlm_do_assert_master(struct dlm_ctxt *dlm,
1682                                 struct dlm_lock_resource *res,
1683                                 void *nodemap, u32 flags)
1684 {
1685         struct dlm_assert_master assert;
1686         int to, tmpret;
1687         struct dlm_node_iter iter;
1688         int ret = 0;
1689         int reassert;
1690         const char *lockname = res->lockname.name;
1691         unsigned int namelen = res->lockname.len;
1692
1693         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1694
1695         spin_lock(&res->spinlock);
1696         res->state |= DLM_LOCK_RES_SETREF_INPROG;
1697         spin_unlock(&res->spinlock);
1698
1699 again:
1700         reassert = 0;
1701
1702         /* note that if this nodemap is empty, it returns 0 */
1703         dlm_node_iter_init(nodemap, &iter);
1704         while ((to = dlm_node_iter_next(&iter)) >= 0) {
1705                 int r = 0;
1706                 struct dlm_master_list_entry *mle = NULL;
1707
1708                 mlog(0, "sending assert master to %d (%.*s)\n", to,
1709                      namelen, lockname);
1710                 memset(&assert, 0, sizeof(assert));
1711                 assert.node_idx = dlm->node_num;
1712                 assert.namelen = namelen;
1713                 memcpy(assert.name, lockname, namelen);
1714                 assert.flags = cpu_to_be32(flags);
1715
1716                 tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1717                                             &assert, sizeof(assert), to, &r);
1718                 if (tmpret < 0) {
1719                         mlog(0, "assert_master returned %d!\n", tmpret);
1720                         if (!dlm_is_host_down(tmpret)) {
1721                                 mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1722                                 BUG();
1723                         }
1724                         /* a node died.  finish out the rest of the nodes. */
1725                         mlog(0, "link to %d went down!\n", to);
1726                         /* any nonzero status return will do */
1727                         ret = tmpret;
1728                         r = 0;
1729                 } else if (r < 0) {
1730                         /* ok, something horribly messed.  kill thyself. */
1731                         mlog(ML_ERROR,"during assert master of %.*s to %u, "
1732                              "got %d.\n", namelen, lockname, to, r);
1733                         spin_lock(&dlm->spinlock);
1734                         spin_lock(&dlm->master_lock);
1735                         if (dlm_find_mle(dlm, &mle, (char *)lockname,
1736                                          namelen)) {
1737                                 dlm_print_one_mle(mle);
1738                                 __dlm_put_mle(mle);
1739                         }
1740                         spin_unlock(&dlm->master_lock);
1741                         spin_unlock(&dlm->spinlock);
1742                         BUG();
1743                 }
1744
1745                 if (r & DLM_ASSERT_RESPONSE_REASSERT &&
1746                     !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
1747                                 mlog(ML_ERROR, "%.*s: very strange, "
1748                                      "master MLE but no lockres on %u\n",
1749                                      namelen, lockname, to);
1750                 }
1751
1752                 if (r & DLM_ASSERT_RESPONSE_REASSERT) {
1753                         mlog(0, "%.*s: node %u create mles on other "
1754                              "nodes and requests a re-assert\n", 
1755                              namelen, lockname, to);
1756                         reassert = 1;
1757                 }
1758                 if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
1759                         mlog(0, "%.*s: node %u has a reference to this "
1760                              "lockres, set the bit in the refmap\n",
1761                              namelen, lockname, to);
1762                         spin_lock(&res->spinlock);
1763                         dlm_lockres_set_refmap_bit(to, res);
1764                         spin_unlock(&res->spinlock);
1765                 }
1766         }
1767
1768         if (reassert)
1769                 goto again;
1770
1771         spin_lock(&res->spinlock);
1772         res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
1773         spin_unlock(&res->spinlock);
1774         wake_up(&res->wq);
1775
1776         return ret;
1777 }
1778
1779 /*
1780  * locks that can be taken here:
1781  * dlm->spinlock
1782  * res->spinlock
1783  * mle->spinlock
1784  * dlm->master_list
1785  *
1786  * if possible, TRIM THIS DOWN!!!
1787  */
1788 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,
1789                               void **ret_data)
1790 {
1791         struct dlm_ctxt *dlm = data;
1792         struct dlm_master_list_entry *mle = NULL;
1793         struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1794         struct dlm_lock_resource *res = NULL;
1795         char *name;
1796         unsigned int namelen, hash;
1797         u32 flags;
1798         int master_request = 0, have_lockres_ref = 0;
1799         int ret = 0;
1800
1801         if (!dlm_grab(dlm))
1802                 return 0;
1803
1804         name = assert->name;
1805         namelen = assert->namelen;
1806         hash = dlm_lockid_hash(name, namelen);
1807         flags = be32_to_cpu(assert->flags);
1808
1809         if (namelen > DLM_LOCKID_NAME_MAX) {
1810                 mlog(ML_ERROR, "Invalid name length!");
1811                 goto done;
1812         }
1813
1814         spin_lock(&dlm->spinlock);
1815
1816         if (flags)
1817                 mlog(0, "assert_master with flags: %u\n", flags);
1818
1819         /* find the MLE */
1820         spin_lock(&dlm->master_lock);
1821         if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1822                 /* not an error, could be master just re-asserting */
1823                 mlog(0, "just got an assert_master from %u, but no "
1824                      "MLE for it! (%.*s)\n", assert->node_idx,
1825                      namelen, name);
1826         } else {
1827                 int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1828                 if (bit >= O2NM_MAX_NODES) {
1829                         /* not necessarily an error, though less likely.
1830                          * could be master just re-asserting. */
1831                         mlog(0, "no bits set in the maybe_map, but %u "
1832                              "is asserting! (%.*s)\n", assert->node_idx,
1833                              namelen, name);
1834                 } else if (bit != assert->node_idx) {
1835                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1836                                 mlog(0, "master %u was found, %u should "
1837                                      "back off\n", assert->node_idx, bit);
1838                         } else {
1839                                 /* with the fix for bug 569, a higher node
1840                                  * number winning the mastery will respond
1841                                  * YES to mastery requests, but this node
1842                                  * had no way of knowing.  let it pass. */
1843                                 mlog(0, "%u is the lowest node, "
1844                                      "%u is asserting. (%.*s)  %u must "
1845                                      "have begun after %u won.\n", bit,
1846                                      assert->node_idx, namelen, name, bit,
1847                                      assert->node_idx);
1848                         }
1849                 }
1850                 if (mle->type == DLM_MLE_MIGRATION) {
1851                         if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1852                                 mlog(0, "%s:%.*s: got cleanup assert"
1853                                      " from %u for migration\n",
1854                                      dlm->name, namelen, name,
1855                                      assert->node_idx);
1856                         } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1857                                 mlog(0, "%s:%.*s: got unrelated assert"
1858                                      " from %u for migration, ignoring\n",
1859                                      dlm->name, namelen, name,
1860                                      assert->node_idx);
1861                                 __dlm_put_mle(mle);
1862                                 spin_unlock(&dlm->master_lock);
1863                                 spin_unlock(&dlm->spinlock);
1864                                 goto done;
1865                         }       
1866                 }
1867         }
1868         spin_unlock(&dlm->master_lock);
1869
1870         /* ok everything checks out with the MLE
1871          * now check to see if there is a lockres */
1872         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1873         if (res) {
1874                 spin_lock(&res->spinlock);
1875                 if (res->state & DLM_LOCK_RES_RECOVERING)  {
1876                         mlog(ML_ERROR, "%u asserting but %.*s is "
1877                              "RECOVERING!\n", assert->node_idx, namelen, name);
1878                         goto kill;
1879                 }
1880                 if (!mle) {
1881                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1882                             res->owner != assert->node_idx) {
1883                                 mlog(ML_ERROR, "DIE! Mastery assert from %u, "
1884                                      "but current owner is %u! (%.*s)\n",
1885                                      assert->node_idx, res->owner, namelen,
1886                                      name);
1887                                 __dlm_print_one_lock_resource(res);
1888                                 BUG();
1889                         }
1890                 } else if (mle->type != DLM_MLE_MIGRATION) {
1891                         if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1892                                 /* owner is just re-asserting */
1893                                 if (res->owner == assert->node_idx) {
1894                                         mlog(0, "owner %u re-asserting on "
1895                                              "lock %.*s\n", assert->node_idx,
1896                                              namelen, name);
1897                                         goto ok;
1898                                 }
1899                                 mlog(ML_ERROR, "got assert_master from "
1900                                      "node %u, but %u is the owner! "
1901                                      "(%.*s)\n", assert->node_idx,
1902                                      res->owner, namelen, name);
1903                                 goto kill;
1904                         }
1905                         if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1906                                 mlog(ML_ERROR, "got assert from %u, but lock "
1907                                      "with no owner should be "
1908                                      "in-progress! (%.*s)\n",
1909                                      assert->node_idx,
1910                                      namelen, name);
1911                                 goto kill;
1912                         }
1913                 } else /* mle->type == DLM_MLE_MIGRATION */ {
1914                         /* should only be getting an assert from new master */
1915                         if (assert->node_idx != mle->new_master) {
1916                                 mlog(ML_ERROR, "got assert from %u, but "
1917                                      "new master is %u, and old master "
1918                                      "was %u (%.*s)\n",
1919                                      assert->node_idx, mle->new_master,
1920                                      mle->master, namelen, name);
1921                                 goto kill;
1922                         }
1923
1924                 }
1925 ok:
1926                 spin_unlock(&res->spinlock);
1927         }
1928         spin_unlock(&dlm->spinlock);
1929
1930         // mlog(0, "woo!  got an assert_master from node %u!\n",
1931         //           assert->node_idx);
1932         if (mle) {
1933                 int extra_ref = 0;
1934                 int nn = -1;
1935                 int rr, err = 0;
1936                 
1937                 spin_lock(&mle->spinlock);
1938                 if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1939                         extra_ref = 1;
1940                 else {
1941                         /* MASTER mle: if any bits set in the response map
1942                          * then the calling node needs to re-assert to clear
1943                          * up nodes that this node contacted */
1944                         while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, 
1945                                                     nn+1)) < O2NM_MAX_NODES) {
1946                                 if (nn != dlm->node_num && nn != assert->node_idx)
1947                                         master_request = 1;
1948                         }
1949                 }
1950                 mle->master = assert->node_idx;
1951                 atomic_set(&mle->woken, 1);
1952                 wake_up(&mle->wq);
1953                 spin_unlock(&mle->spinlock);
1954
1955                 if (res) {
1956                         int wake = 0;
1957                         spin_lock(&res->spinlock);
1958                         if (mle->type == DLM_MLE_MIGRATION) {
1959                                 mlog(0, "finishing off migration of lockres %.*s, "
1960                                         "from %u to %u\n",
1961                                         res->lockname.len, res->lockname.name,
1962                                         dlm->node_num, mle->new_master);
1963                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
1964                                 wake = 1;
1965                                 dlm_change_lockres_owner(dlm, res, mle->new_master);
1966                                 BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1967                         } else {
1968                                 dlm_change_lockres_owner(dlm, res, mle->master);
1969                         }
1970                         spin_unlock(&res->spinlock);
1971                         have_lockres_ref = 1;
1972                         if (wake)
1973                                 wake_up(&res->wq);
1974                 }
1975
1976                 /* master is known, detach if not already detached.
1977                  * ensures that only one assert_master call will happen
1978                  * on this mle. */
1979                 spin_lock(&dlm->spinlock);
1980                 spin_lock(&dlm->master_lock);
1981
1982                 rr = atomic_read(&mle->mle_refs.refcount);
1983                 if (mle->inuse > 0) {
1984                         if (extra_ref && rr < 3)
1985                                 err = 1;
1986                         else if (!extra_ref && rr < 2)
1987                                 err = 1;
1988                 } else {
1989                         if (extra_ref && rr < 2)
1990                                 err = 1;
1991                         else if (!extra_ref && rr < 1)
1992                                 err = 1;
1993                 }
1994                 if (err) {
1995                         mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1996                              "that will mess up this node, refs=%d, extra=%d, "
1997                              "inuse=%d\n", dlm->name, namelen, name,
1998                              assert->node_idx, rr, extra_ref, mle->inuse);
1999                         dlm_print_one_mle(mle);
2000                 }
2001                 __dlm_unlink_mle(dlm, mle);
2002                 __dlm_mle_detach_hb_events(dlm, mle);
2003                 __dlm_put_mle(mle);
2004                 if (extra_ref) {
2005                         /* the assert master message now balances the extra
2006                          * ref given by the master / migration request message.
2007                          * if this is the last put, it will be removed
2008                          * from the list. */
2009                         __dlm_put_mle(mle);
2010                 }
2011                 spin_unlock(&dlm->master_lock);
2012                 spin_unlock(&dlm->spinlock);
2013         } else if (res) {
2014                 if (res->owner != assert->node_idx) {
2015                         mlog(0, "assert_master from %u, but current "
2016                              "owner is %u (%.*s), no mle\n", assert->node_idx,
2017                              res->owner, namelen, name);
2018                 }
2019         }
2020
2021 done:
2022         ret = 0;
2023         if (res) {
2024                 spin_lock(&res->spinlock);
2025                 res->state |= DLM_LOCK_RES_SETREF_INPROG;
2026                 spin_unlock(&res->spinlock);
2027                 *ret_data = (void *)res;
2028         }
2029         dlm_put(dlm);
2030         if (master_request) {
2031                 mlog(0, "need to tell master to reassert\n");
2032                 /* positive. negative would shoot down the node. */
2033                 ret |= DLM_ASSERT_RESPONSE_REASSERT;
2034                 if (!have_lockres_ref) {
2035                         mlog(ML_ERROR, "strange, got assert from %u, MASTER "
2036                              "mle present here for %s:%.*s, but no lockres!\n",
2037                              assert->node_idx, dlm->name, namelen, name);
2038                 }
2039         }
2040         if (have_lockres_ref) {
2041                 /* let the master know we have a reference to the lockres */
2042                 ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
2043                 mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
2044                      dlm->name, namelen, name, assert->node_idx);
2045         }
2046         return ret;
2047
2048 kill:
2049         /* kill the caller! */
2050         mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
2051              "and killing the other node now!  This node is OK and can continue.\n");
2052         __dlm_print_one_lock_resource(res);
2053         spin_unlock(&res->spinlock);
2054         spin_unlock(&dlm->spinlock);
2055         *ret_data = (void *)res; 
2056         dlm_put(dlm);
2057         return -EINVAL;
2058 }
2059
2060 void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
2061 {
2062         struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;
2063
2064         if (ret_data) {
2065                 spin_lock(&res->spinlock);
2066                 res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
2067                 spin_unlock(&res->spinlock);
2068                 wake_up(&res->wq);
2069                 dlm_lockres_put(res);
2070         }
2071         return;
2072 }
2073
2074 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
2075                                struct dlm_lock_resource *res,
2076                                int ignore_higher, u8 request_from, u32 flags)
2077 {
2078         struct dlm_work_item *item;
2079         item = kzalloc(sizeof(*item), GFP_NOFS);
2080         if (!item)
2081                 return -ENOMEM;
2082
2083
2084         /* queue up work for dlm_assert_master_worker */
2085         dlm_grab(dlm);  /* get an extra ref for the work item */
2086         dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
2087         item->u.am.lockres = res; /* already have a ref */
2088         /* can optionally ignore node numbers higher than this node */
2089         item->u.am.ignore_higher = ignore_higher;
2090         item->u.am.request_from = request_from;
2091         item->u.am.flags = flags;
2092
2093         if (ignore_higher) 
2094                 mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, 
2095                      res->lockname.name);
2096                 
2097         spin_lock(&dlm->work_lock);
2098         list_add_tail(&item->list, &dlm->work_list);
2099         spin_unlock(&dlm->work_lock);
2100
2101         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2102         return 0;
2103 }
2104
2105 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
2106 {
2107         struct dlm_ctxt *dlm = data;
2108         int ret = 0;
2109         struct dlm_lock_resource *res;
2110         unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
2111         int ignore_higher;
2112         int bit;
2113         u8 request_from;
2114         u32 flags;
2115
2116         dlm = item->dlm;
2117         res = item->u.am.lockres;
2118         ignore_higher = item->u.am.ignore_higher;
2119         request_from = item->u.am.request_from;
2120         flags = item->u.am.flags;
2121
2122         spin_lock(&dlm->spinlock);
2123         memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
2124         spin_unlock(&dlm->spinlock);
2125
2126         clear_bit(dlm->node_num, nodemap);
2127         if (ignore_higher) {
2128                 /* if is this just to clear up mles for nodes below
2129                  * this node, do not send the message to the original
2130                  * caller or any node number higher than this */
2131                 clear_bit(request_from, nodemap);
2132                 bit = dlm->node_num;
2133                 while (1) {
2134                         bit = find_next_bit(nodemap, O2NM_MAX_NODES,
2135                                             bit+1);
2136                         if (bit >= O2NM_MAX_NODES)
2137                                 break;
2138                         clear_bit(bit, nodemap);
2139                 }
2140         }
2141
2142         /*
2143          * If we're migrating this lock to someone else, we are no
2144          * longer allowed to assert out own mastery.  OTOH, we need to
2145          * prevent migration from starting while we're still asserting
2146          * our dominance.  The reserved ast delays migration.
2147          */
2148         spin_lock(&res->spinlock);
2149         if (res->state & DLM_LOCK_RES_MIGRATING) {
2150                 mlog(0, "Someone asked us to assert mastery, but we're "
2151                      "in the middle of migration.  Skipping assert, "
2152                      "the new master will handle that.\n");
2153                 spin_unlock(&res->spinlock);
2154                 goto put;
2155         } else
2156                 __dlm_lockres_reserve_ast(res);
2157         spin_unlock(&res->spinlock);
2158
2159         /* this call now finishes out the nodemap
2160          * even if one or more nodes die */
2161         mlog(0, "worker about to master %.*s here, this=%u\n",
2162                      res->lockname.len, res->lockname.name, dlm->node_num);
2163         ret = dlm_do_assert_master(dlm, res, nodemap, flags);
2164         if (ret < 0) {
2165                 /* no need to restart, we are done */
2166                 if (!dlm_is_host_down(ret))
2167                         mlog_errno(ret);
2168         }
2169
2170         /* Ok, we've asserted ourselves.  Let's let migration start. */
2171         dlm_lockres_release_ast(dlm, res);
2172
2173 put:
2174         dlm_lockres_put(res);
2175
2176         mlog(0, "finished with dlm_assert_master_worker\n");
2177 }
2178
2179 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2180  * We cannot wait for node recovery to complete to begin mastering this
2181  * lockres because this lockres is used to kick off recovery! ;-)
2182  * So, do a pre-check on all living nodes to see if any of those nodes
2183  * think that $RECOVERY is currently mastered by a dead node.  If so,
2184  * we wait a short time to allow that node to get notified by its own
2185  * heartbeat stack, then check again.  All $RECOVERY lock resources
2186  * mastered by dead nodes are purged when the hearbeat callback is 
2187  * fired, so we can know for sure that it is safe to continue once
2188  * the node returns a live node or no node.  */
2189 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2190                                        struct dlm_lock_resource *res)
2191 {
2192         struct dlm_node_iter iter;
2193         int nodenum;
2194         int ret = 0;
2195         u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2196
2197         spin_lock(&dlm->spinlock);
2198         dlm_node_iter_init(dlm->domain_map, &iter);
2199         spin_unlock(&dlm->spinlock);
2200
2201         while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2202                 /* do not send to self */
2203                 if (nodenum == dlm->node_num)
2204                         continue;
2205                 ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2206                 if (ret < 0) {
2207                         mlog_errno(ret);
2208                         if (!dlm_is_host_down(ret))
2209                                 BUG();
2210                         /* host is down, so answer for that node would be
2211                          * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2212                         ret = 0;
2213                 }
2214
2215                 if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2216                         /* check to see if this master is in the recovery map */
2217                         spin_lock(&dlm->spinlock);
2218                         if (test_bit(master, dlm->recovery_map)) {
2219                                 mlog(ML_NOTICE, "%s: node %u has not seen "
2220                                      "node %u go down yet, and thinks the "
2221                                      "dead node is mastering the recovery "
2222                                      "lock.  must wait.\n", dlm->name,
2223                                      nodenum, master);
2224                                 ret = -EAGAIN;
2225                         }
2226                         spin_unlock(&dlm->spinlock);
2227                         mlog(0, "%s: reco lock master is %u\n", dlm->name, 
2228                              master);
2229                         break;
2230                 }
2231         }
2232         return ret;
2233 }
2234
2235 /*
2236  * DLM_DEREF_LOCKRES_MSG
2237  */
2238
2239 int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2240 {
2241         struct dlm_deref_lockres deref;
2242         int ret = 0, r;
2243         const char *lockname;
2244         unsigned int namelen;
2245
2246         lockname = res->lockname.name;
2247         namelen = res->lockname.len;
2248         BUG_ON(namelen > O2NM_MAX_NAME_LEN);
2249
2250         mlog(0, "%s:%.*s: sending deref to %d\n",
2251              dlm->name, namelen, lockname, res->owner);
2252         memset(&deref, 0, sizeof(deref));
2253         deref.node_idx = dlm->node_num;
2254         deref.namelen = namelen;
2255         memcpy(deref.name, lockname, namelen);
2256
2257         ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
2258                                  &deref, sizeof(deref), res->owner, &r);
2259         if (ret < 0)
2260                 mlog_errno(ret);
2261         else if (r < 0) {
2262                 /* BAD.  other node says I did not have a ref. */
2263                 mlog(ML_ERROR,"while dropping ref on %s:%.*s "
2264                     "(master=%u) got %d.\n", dlm->name, namelen,
2265                     lockname, res->owner, r);
2266                 dlm_print_one_lock_resource(res);
2267                 BUG();
2268         }
2269         return ret;
2270 }
2271
2272 int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
2273                               void **ret_data)
2274 {
2275         struct dlm_ctxt *dlm = data;
2276         struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
2277         struct dlm_lock_resource *res = NULL;
2278         char *name;
2279         unsigned int namelen;
2280         int ret = -EINVAL;
2281         u8 node;
2282         unsigned int hash;
2283         struct dlm_work_item *item;
2284         int cleared = 0;
2285         int dispatch = 0;
2286
2287         if (!dlm_grab(dlm))
2288                 return 0;
2289
2290         name = deref->name;
2291         namelen = deref->namelen;
2292         node = deref->node_idx;
2293
2294         if (namelen > DLM_LOCKID_NAME_MAX) {
2295                 mlog(ML_ERROR, "Invalid name length!");
2296                 goto done;
2297         }
2298         if (deref->node_idx >= O2NM_MAX_NODES) {
2299                 mlog(ML_ERROR, "Invalid node number: %u\n", node);
2300                 goto done;
2301         }
2302
2303         hash = dlm_lockid_hash(name, namelen);
2304
2305         spin_lock(&dlm->spinlock);
2306         res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
2307         if (!res) {
2308                 spin_unlock(&dlm->spinlock);
2309                 mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
2310                      dlm->name, namelen, name);
2311                 goto done;
2312         }
2313         spin_unlock(&dlm->spinlock);
2314
2315         spin_lock(&res->spinlock);
2316         if (res->state & DLM_LOCK_RES_SETREF_INPROG)
2317                 dispatch = 1;
2318         else {
2319                 BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2320                 if (test_bit(node, res->refmap)) {
2321                         dlm_lockres_clear_refmap_bit(node, res);
2322                         cleared = 1;
2323                 }
2324         }
2325         spin_unlock(&res->spinlock);
2326
2327         if (!dispatch) {
2328                 if (cleared)
2329                         dlm_lockres_calc_usage(dlm, res);
2330                 else {
2331                         mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2332                         "but it is already dropped!\n", dlm->name,
2333                         res->lockname.len, res->lockname.name, node);
2334                         dlm_print_one_lock_resource(res);
2335                 }
2336                 ret = 0;
2337                 goto done;
2338         }
2339
2340         item = kzalloc(sizeof(*item), GFP_NOFS);
2341         if (!item) {
2342                 ret = -ENOMEM;
2343                 mlog_errno(ret);
2344                 goto done;
2345         }
2346
2347         dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
2348         item->u.dl.deref_res = res;
2349         item->u.dl.deref_node = node;
2350
2351         spin_lock(&dlm->work_lock);
2352         list_add_tail(&item->list, &dlm->work_list);
2353         spin_unlock(&dlm->work_lock);
2354
2355         queue_work(dlm->dlm_worker, &dlm->dispatched_work);
2356         return 0;
2357
2358 done:
2359         if (res)
2360                 dlm_lockres_put(res);
2361         dlm_put(dlm);
2362
2363         return ret;
2364 }
2365
2366 static void dlm_deref_lockres_worker(struct dlm_work_item *item, void *data)
2367 {
2368         struct dlm_ctxt *dlm;
2369         struct dlm_lock_resource *res;
2370         u8 node;
2371         u8 cleared = 0;
2372
2373         dlm = item->dlm;
2374         res = item->u.dl.deref_res;
2375         node = item->u.dl.deref_node;
2376
2377         spin_lock(&res->spinlock);
2378         BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
2379         if (test_bit(node, res->refmap)) {
2380                 __dlm_wait_on_lockres_flags(res, DLM_LOCK_RES_SETREF_INPROG);
2381                 dlm_lockres_clear_refmap_bit(node, res);
2382                 cleared = 1;
2383         }
2384         spin_unlock(&res->spinlock);
2385
2386         if (cleared) {
2387                 mlog(0, "%s:%.*s node %u ref dropped in dispatch\n",
2388                      dlm->name, res->lockname.len, res->lockname.name, node);
2389                 dlm_lockres_calc_usage(dlm, res);
2390         } else {
2391                 mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
2392                      "but it is already dropped!\n", dlm->name,
2393                      res->lockname.len, res->lockname.name, node);
2394                 dlm_print_one_lock_resource(res);
2395         }
2396
2397         dlm_lockres_put(res);
2398 }
2399
2400 /* Checks whether the lockres can be migrated. Returns 0 if yes, < 0
2401  * if not. If 0, numlocks is set to the number of locks in the lockres.
2402  */
2403 static int dlm_is_lockres_migrateable(struct dlm_ctxt *dlm,
2404                                       struct dlm_lock_resource *res,
2405                                       int *numlocks)
2406 {
2407         int ret;
2408         int i;
2409         int count = 0;
2410         struct list_head *queue;
2411         struct dlm_lock *lock;
2412
2413         assert_spin_locked(&res->spinlock);
2414
2415         ret = -EINVAL;
2416         if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
2417                 mlog(0, "cannot migrate lockres with unknown owner!\n");
2418                 goto leave;
2419         }
2420
2421         if (res->owner != dlm->node_num) {
2422                 mlog(0, "cannot migrate lockres this node doesn't own!\n");
2423                 goto leave;
2424         }
2425
2426         ret = 0;
2427         queue = &res->granted;
2428         for (i = 0; i < 3; i++) {
2429                 list_for_each_entry(lock, queue, list) {
2430                         ++count;
2431                         if (lock->ml.node == dlm->node_num) {
2432                                 mlog(0, "found a lock owned by this node still "
2433                                      "on the %s queue!  will not migrate this "
2434                                      "lockres\n", (i == 0 ? "granted" :
2435                                                    (i == 1 ? "converting" :
2436                                                     "blocked")));
2437                                 ret = -ENOTEMPTY;
2438                                 goto leave;
2439                         }
2440                 }
2441                 queue++;
2442         }
2443
2444         *numlocks = count;
2445         mlog(0, "migrateable lockres having %d locks\n", *numlocks);
2446
2447 leave:
2448         return ret;
2449 }
2450
2451 /*
2452  * DLM_MIGRATE_LOCKRES
2453  */
2454
2455
2456 static int dlm_migrate_lockres(struct dlm_ctxt *dlm,
2457                                struct dlm_lock_resource *res,
2458                                u8 target)
2459 {
2460         struct dlm_master_list_entry *mle = NULL;
2461         struct dlm_master_list_entry *oldmle = NULL;
2462         struct dlm_migratable_lockres *mres = NULL;
2463         int ret = 0;
2464         const char *name;
2465         unsigned int namelen;
2466         int mle_added = 0;
2467         int numlocks;
2468         int wake = 0;
2469
2470         if (!dlm_grab(dlm))
2471                 return -EINVAL;
2472
2473         name = res->lockname.name;
2474         namelen = res->lockname.len;
2475
2476         mlog(0, "migrating %.*s to %u\n", namelen, name, target);
2477
2478         /*
2479          * ensure this lockres is a proper candidate for migration
2480          */
2481         spin_lock(&res->spinlock);
2482         ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2483         if (ret < 0) {
2484                 spin_unlock(&res->spinlock);
2485                 goto leave;
2486         }
2487         spin_unlock(&res->spinlock);
2488
2489         /* no work to do */
2490         if (numlocks == 0) {
2491                 mlog(0, "no locks were found on this lockres! done!\n");
2492                 goto leave;
2493         }
2494
2495         /*
2496          * preallocate up front
2497          * if this fails, abort
2498          */
2499
2500         ret = -ENOMEM;
2501         mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2502         if (!mres) {
2503                 mlog_errno(ret);
2504                 goto leave;
2505         }
2506
2507         mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2508                                                                 GFP_NOFS);
2509         if (!mle) {
2510                 mlog_errno(ret);
2511                 goto leave;
2512         }
2513         ret = 0;
2514
2515         /*
2516          * find a node to migrate the lockres to
2517          */
2518
2519         mlog(0, "picking a migration node\n");
2520         spin_lock(&dlm->spinlock);
2521         /* pick a new node */
2522         if (!test_bit(target, dlm->domain_map) ||
2523             target >= O2NM_MAX_NODES) {
2524                 target = dlm_pick_migration_target(dlm, res);
2525         }
2526         mlog(0, "node %u chosen for migration\n", target);
2527
2528         if (target >= O2NM_MAX_NODES ||
2529             !test_bit(target, dlm->domain_map)) {
2530                 /* target chosen is not alive */
2531                 ret = -EINVAL;
2532         }
2533
2534         if (ret) {
2535                 spin_unlock(&dlm->spinlock);
2536                 goto fail;
2537         }
2538
2539         mlog(0, "continuing with target = %u\n", target);
2540
2541         /*
2542          * clear any existing master requests and
2543          * add the migration mle to the list
2544          */
2545         spin_lock(&dlm->master_lock);
2546         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2547                                     namelen, target, dlm->node_num);
2548         spin_unlock(&dlm->master_lock);
2549         spin_unlock(&dlm->spinlock);
2550
2551         if (ret == -EEXIST) {
2552                 mlog(0, "another process is already migrating it\n");
2553                 goto fail;
2554         }
2555         mle_added = 1;
2556
2557         /*
2558          * set the MIGRATING flag and flush asts
2559          * if we fail after this we need to re-dirty the lockres
2560          */
2561         if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2562                 mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2563                      "the target went down.\n", res->lockname.len,
2564                      res->lockname.name, target);
2565                 spin_lock(&res->spinlock);
2566                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2567                 wake = 1;
2568                 spin_unlock(&res->spinlock);
2569                 ret = -EINVAL;
2570         }
2571
2572 fail:
2573         if (oldmle) {
2574                 /* master is known, detach if not already detached */
2575                 dlm_mle_detach_hb_events(dlm, oldmle);
2576                 dlm_put_mle(oldmle);
2577         }
2578
2579         if (ret < 0) {
2580                 if (mle_added) {
2581                         dlm_mle_detach_hb_events(dlm, mle);
2582                         dlm_put_mle(mle);
2583                 } else if (mle) {
2584                         kmem_cache_free(dlm_mle_cache, mle);
2585                 }
2586                 goto leave;
2587         }
2588
2589         /*
2590          * at this point, we have a migration target, an mle
2591          * in the master list, and the MIGRATING flag set on
2592          * the lockres
2593          */
2594
2595         /* now that remote nodes are spinning on the MIGRATING flag,
2596          * ensure that all assert_master work is flushed. */
2597         flush_workqueue(dlm->dlm_worker);
2598
2599         /* get an extra reference on the mle.
2600          * otherwise the assert_master from the new
2601          * master will destroy this.
2602          * also, make sure that all callers of dlm_get_mle
2603          * take both dlm->spinlock and dlm->master_lock */
2604         spin_lock(&dlm->spinlock);
2605         spin_lock(&dlm->master_lock);
2606         dlm_get_mle_inuse(mle);
2607         spin_unlock(&dlm->master_lock);
2608         spin_unlock(&dlm->spinlock);
2609
2610         /* notify new node and send all lock state */
2611         /* call send_one_lockres with migration flag.
2612          * this serves as notice to the target node that a
2613          * migration is starting. */
2614         ret = dlm_send_one_lockres(dlm, res, mres, target,
2615                                    DLM_MRES_MIGRATION);
2616
2617         if (ret < 0) {
2618                 mlog(0, "migration to node %u failed with %d\n",
2619                      target, ret);
2620                 /* migration failed, detach and clean up mle */
2621                 dlm_mle_detach_hb_events(dlm, mle);
2622                 dlm_put_mle(mle);
2623                 dlm_put_mle_inuse(mle);
2624                 spin_lock(&res->spinlock);
2625                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2626                 wake = 1;
2627                 spin_unlock(&res->spinlock);
2628                 goto leave;
2629         }
2630
2631         /* at this point, the target sends a message to all nodes,
2632          * (using dlm_do_migrate_request).  this node is skipped since
2633          * we had to put an mle in the list to begin the process.  this
2634          * node now waits for target to do an assert master.  this node
2635          * will be the last one notified, ensuring that the migration
2636          * is complete everywhere.  if the target dies while this is
2637          * going on, some nodes could potentially see the target as the
2638          * master, so it is important that my recovery finds the migration
2639          * mle and sets the master to UNKNONWN. */
2640
2641
2642         /* wait for new node to assert master */
2643         while (1) {
2644                 ret = wait_event_interruptible_timeout(mle->wq,
2645                                         (atomic_read(&mle->woken) == 1),
2646                                         msecs_to_jiffies(5000));
2647
2648                 if (ret >= 0) {
2649                         if (atomic_read(&mle->woken) == 1 ||
2650                             res->owner == target)
2651                                 break;
2652
2653                         mlog(0, "%s:%.*s: timed out during migration\n",
2654                              dlm->name, res->lockname.len, res->lockname.name);
2655                         /* avoid hang during shutdown when migrating lockres 
2656                          * to a node which also goes down */
2657                         if (dlm_is_node_dead(dlm, target)) {
2658                                 mlog(0, "%s:%.*s: expected migration "
2659                                      "target %u is no longer up, restarting\n",
2660                                      dlm->name, res->lockname.len,
2661                                      res->lockname.name, target);
2662                                 ret = -EINVAL;
2663                                 /* migration failed, detach and clean up mle */
2664                                 dlm_mle_detach_hb_events(dlm, mle);
2665                                 dlm_put_mle(mle);
2666                                 dlm_put_mle_inuse(mle);
2667                                 spin_lock(&res->spinlock);
2668                                 res->state &= ~DLM_LOCK_RES_MIGRATING;
2669                                 wake = 1;
2670                                 spin_unlock(&res->spinlock);
2671                                 goto leave;
2672                         }
2673                 } else
2674                         mlog(0, "%s:%.*s: caught signal during migration\n",
2675                              dlm->name, res->lockname.len, res->lockname.name);
2676         }
2677
2678         /* all done, set the owner, clear the flag */
2679         spin_lock(&res->spinlock);
2680         dlm_set_lockres_owner(dlm, res, target);
2681         res->state &= ~DLM_LOCK_RES_MIGRATING;
2682         dlm_remove_nonlocal_locks(dlm, res);
2683         spin_unlock(&res->spinlock);
2684         wake_up(&res->wq);
2685
2686         /* master is known, detach if not already detached */
2687         dlm_mle_detach_hb_events(dlm, mle);
2688         dlm_put_mle_inuse(mle);
2689         ret = 0;
2690
2691         dlm_lockres_calc_usage(dlm, res);
2692
2693 leave:
2694         /* re-dirty the lockres if we failed */
2695         if (ret < 0)
2696                 dlm_kick_thread(dlm, res);
2697
2698         /* wake up waiters if the MIGRATING flag got set
2699          * but migration failed */
2700         if (wake)
2701                 wake_up(&res->wq);
2702
2703         /* TODO: cleanup */
2704         if (mres)
2705                 free_page((unsigned long)mres);
2706
2707         dlm_put(dlm);
2708
2709         mlog(0, "returning %d\n", ret);
2710         return ret;
2711 }
2712
2713 #define DLM_MIGRATION_RETRY_MS  100
2714
2715 /* Should be called only after beginning the domain leave process.
2716  * There should not be any remaining locks on nonlocal lock resources,
2717  * and there should be no local locks left on locally mastered resources.
2718  *
2719  * Called with the dlm spinlock held, may drop it to do migration, but
2720  * will re-acquire before exit.
2721  *
2722  * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
2723 int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2724 {
2725         int ret;
2726         int lock_dropped = 0;
2727         int numlocks;
2728
2729         spin_lock(&res->spinlock);
2730         if (res->owner != dlm->node_num) {
2731                 if (!__dlm_lockres_unused(res)) {
2732                         mlog(ML_ERROR, "%s:%.*s: this node is not master, "
2733                              "trying to free this but locks remain\n",
2734                              dlm->name, res->lockname.len, res->lockname.name);
2735                 }
2736                 spin_unlock(&res->spinlock);
2737                 goto leave;
2738         }
2739
2740         /* No need to migrate a lockres having no locks */
2741         ret = dlm_is_lockres_migrateable(dlm, res, &numlocks);
2742         if (ret >= 0 && numlocks == 0) {
2743                 spin_unlock(&res->spinlock);
2744                 goto leave;
2745         }
2746         spin_unlock(&res->spinlock);
2747
2748         /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
2749         spin_unlock(&dlm->spinlock);
2750         lock_dropped = 1;
2751         while (1) {
2752                 ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
2753                 if (ret >= 0)
2754                         break;
2755                 if (ret == -ENOTEMPTY) {
2756                         mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
2757                                 res->lockname.len, res->lockname.name);
2758                         BUG();
2759                 }
2760
2761                 mlog(0, "lockres %.*s: migrate failed, "
2762                      "retrying\n", res->lockname.len,
2763                      res->lockname.name);
2764                 msleep(DLM_MIGRATION_RETRY_MS);
2765         }
2766         spin_lock(&dlm->spinlock);
2767 leave:
2768         return lock_dropped;
2769 }
2770
2771 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2772 {
2773         int ret;
2774         spin_lock(&dlm->ast_lock);
2775         spin_lock(&lock->spinlock);
2776         ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2777         spin_unlock(&lock->spinlock);
2778         spin_unlock(&dlm->ast_lock);
2779         return ret;
2780 }
2781
2782 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2783                                      struct dlm_lock_resource *res,
2784                                      u8 mig_target)
2785 {
2786         int can_proceed;
2787         spin_lock(&res->spinlock);
2788         can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2789         spin_unlock(&res->spinlock);
2790
2791         /* target has died, so make the caller break out of the 
2792          * wait_event, but caller must recheck the domain_map */
2793         spin_lock(&dlm->spinlock);
2794         if (!test_bit(mig_target, dlm->domain_map))
2795                 can_proceed = 1;
2796         spin_unlock(&dlm->spinlock);
2797         return can_proceed;
2798 }
2799
2800 static int dlm_lockres_is_dirty(struct dlm_ctxt *dlm,
2801                                 struct dlm_lock_resource *res)
2802 {
2803         int ret;
2804         spin_lock(&res->spinlock);
2805         ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2806         spin_unlock(&res->spinlock);
2807         return ret;
2808 }
2809
2810
2811 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2812                                        struct dlm_lock_resource *res,
2813                                        u8 target)
2814 {
2815         int ret = 0;
2816
2817         mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2818                res->lockname.len, res->lockname.name, dlm->node_num,
2819                target);
2820         /* need to set MIGRATING flag on lockres.  this is done by
2821          * ensuring that all asts have been flushed for this lockres. */
2822         spin_lock(&res->spinlock);
2823         BUG_ON(res->migration_pending);
2824         res->migration_pending = 1;
2825         /* strategy is to reserve an extra ast then release
2826          * it below, letting the release do all of the work */
2827         __dlm_lockres_reserve_ast(res);
2828         spin_unlock(&res->spinlock);
2829
2830         /* now flush all the pending asts */
2831         dlm_kick_thread(dlm, res);
2832         /* before waiting on DIRTY, block processes which may
2833          * try to dirty the lockres before MIGRATING is set */
2834         spin_lock(&res->spinlock);
2835         BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
2836         res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
2837         spin_unlock(&res->spinlock);
2838         /* now wait on any pending asts and the DIRTY state */
2839         wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2840         dlm_lockres_release_ast(dlm, res);
2841
2842         mlog(0, "about to wait on migration_wq, dirty=%s\n",
2843                res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2844         /* if the extra ref we just put was the final one, this
2845          * will pass thru immediately.  otherwise, we need to wait
2846          * for the last ast to finish. */
2847 again:
2848         ret = wait_event_interruptible_timeout(dlm->migration_wq,
2849                    dlm_migration_can_proceed(dlm, res, target),
2850                    msecs_to_jiffies(1000));
2851         if (ret < 0) {
2852                 mlog(0, "woken again: migrating? %s, dead? %s\n",
2853                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2854                        test_bit(target, dlm->domain_map) ? "no":"yes");
2855         } else {
2856                 mlog(0, "all is well: migrating? %s, dead? %s\n",
2857                        res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2858                        test_bit(target, dlm->domain_map) ? "no":"yes");
2859         }
2860         if (!dlm_migration_can_proceed(dlm, res, target)) {
2861                 mlog(0, "trying again...\n");
2862                 goto again;
2863         }
2864         /* now that we are sure the MIGRATING state is there, drop
2865          * the unneded state which blocked threads trying to DIRTY */
2866         spin_lock(&res->spinlock);
2867         BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
2868         BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
2869         res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
2870         spin_unlock(&res->spinlock);
2871
2872         /* did the target go down or die? */
2873         spin_lock(&dlm->spinlock);
2874         if (!test_bit(target, dlm->domain_map)) {
2875                 mlog(ML_ERROR, "aha. migration target %u just went down\n",
2876                      target);
2877                 ret = -EHOSTDOWN;
2878         }
2879         spin_unlock(&dlm->spinlock);
2880
2881         /*
2882          * at this point:
2883          *
2884          *   o the DLM_LOCK_RES_MIGRATING flag is set
2885          *   o there are no pending asts on this lockres
2886          *   o all processes trying to reserve an ast on this
2887          *     lockres must wait for the MIGRATING flag to clear
2888          */
2889         return ret;
2890 }
2891
2892 /* last step in the migration process.
2893  * original master calls this to free all of the dlm_lock
2894  * structures that used to be for other nodes. */
2895 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2896                                       struct dlm_lock_resource *res)
2897 {
2898         struct list_head *queue = &res->granted;
2899         int i, bit;
2900         struct dlm_lock *lock, *next;
2901
2902         assert_spin_locked(&res->spinlock);
2903
2904         BUG_ON(res->owner == dlm->node_num);
2905
2906         for (i=0; i<3; i++) {
2907                 list_for_each_entry_safe(lock, next, queue, list) {
2908                         if (lock->ml.node != dlm->node_num) {
2909                                 mlog(0, "putting lock for node %u\n",
2910                                      lock->ml.node);
2911                                 /* be extra careful */
2912                                 BUG_ON(!list_empty(&lock->ast_list));
2913                                 BUG_ON(!list_empty(&lock->bast_list));
2914                                 BUG_ON(lock->ast_pending);
2915                                 BUG_ON(lock->bast_pending);
2916                                 dlm_lockres_clear_refmap_bit(lock->ml.node, res);
2917                                 list_del_init(&lock->list);
2918                                 dlm_lock_put(lock);
2919                                 /* In a normal unlock, we would have added a
2920                                  * DLM_UNLOCK_FREE_LOCK action. Force it. */
2921                                 dlm_lock_put(lock);
2922                         }
2923                 }
2924                 queue++;
2925         }
2926         bit = 0;
2927         while (1) {
2928                 bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
2929                 if (bit >= O2NM_MAX_NODES)
2930                         break;
2931                 /* do not clear the local node reference, if there is a
2932                  * process holding this, let it drop the ref itself */
2933                 if (bit != dlm->node_num) {
2934                         mlog(0, "%s:%.*s: node %u had a ref to this "
2935                              "migrating lockres, clearing\n", dlm->name,
2936                              res->lockname.len, res->lockname.name, bit);
2937                         dlm_lockres_clear_refmap_bit(bit, res);
2938                 }
2939                 bit++;
2940         }
2941 }
2942
2943 /* for now this is not too intelligent.  we will
2944  * need stats to make this do the right thing.
2945  * this just finds the first lock on one of the
2946  * queues and uses that node as the target. */
2947 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2948                                     struct dlm_lock_resource *res)
2949 {
2950         int i;
2951         struct list_head *queue = &res->granted;
2952         struct dlm_lock *lock;
2953         int nodenum;
2954
2955         assert_spin_locked(&dlm->spinlock);
2956
2957         spin_lock(&res->spinlock);
2958         for (i=0; i<3; i++) {
2959                 list_for_each_entry(lock, queue, list) {
2960                         /* up to the caller to make sure this node
2961                          * is alive */
2962                         if (lock->ml.node != dlm->node_num) {
2963                                 spin_unlock(&res->spinlock);
2964                                 return lock->ml.node;
2965                         }
2966                 }
2967                 queue++;
2968         }
2969         spin_unlock(&res->spinlock);
2970         mlog(0, "have not found a suitable target yet! checking domain map\n");
2971
2972         /* ok now we're getting desperate.  pick anyone alive. */
2973         nodenum = -1;
2974         while (1) {
2975                 nodenum = find_next_bit(dlm->domain_map,
2976                                         O2NM_MAX_NODES, nodenum+1);
2977                 mlog(0, "found %d in domain map\n", nodenum);
2978                 if (nodenum >= O2NM_MAX_NODES)
2979                         break;
2980                 if (nodenum != dlm->node_num) {
2981                         mlog(0, "picking %d\n", nodenum);
2982                         return nodenum;
2983                 }
2984         }
2985
2986         mlog(0, "giving up.  no master to migrate to\n");
2987         return DLM_LOCK_RES_OWNER_UNKNOWN;
2988 }
2989
2990
2991
2992 /* this is called by the new master once all lockres
2993  * data has been received */
2994 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2995                                   struct dlm_lock_resource *res,
2996                                   u8 master, u8 new_master,
2997                                   struct dlm_node_iter *iter)
2998 {
2999         struct dlm_migrate_request migrate;
3000         int ret, skip, status = 0;
3001         int nodenum;
3002
3003         memset(&migrate, 0, sizeof(migrate));
3004         migrate.namelen = res->lockname.len;
3005         memcpy(migrate.name, res->lockname.name, migrate.namelen);
3006         migrate.new_master = new_master;
3007         migrate.master = master;
3008
3009         ret = 0;
3010
3011         /* send message to all nodes, except the master and myself */
3012         while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
3013                 if (nodenum == master ||
3014                     nodenum == new_master)
3015                         continue;
3016
3017                 /* We could race exit domain. If exited, skip. */
3018                 spin_lock(&dlm->spinlock);
3019                 skip = (!test_bit(nodenum, dlm->domain_map));
3020                 spin_unlock(&dlm->spinlock);
3021                 if (skip) {
3022                         clear_bit(nodenum, iter->node_map);
3023                         continue;
3024                 }
3025
3026                 ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
3027                                          &migrate, sizeof(migrate), nodenum,
3028                                          &status);
3029                 if (ret < 0) {
3030                         mlog(0, "migrate_request returned %d!\n", ret);
3031                         if (!dlm_is_host_down(ret)) {
3032                                 mlog(ML_ERROR, "unhandled error=%d!\n", ret);
3033                                 BUG();
3034                         }
3035                         clear_bit(nodenum, iter->node_map);
3036                         ret = 0;
3037                 } else if (status < 0) {
3038                         mlog(0, "migrate request (node %u) returned %d!\n",
3039                              nodenum, status);
3040                         ret = status;
3041                 } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
3042                         /* during the migration request we short-circuited
3043                          * the mastery of the lockres.  make sure we have
3044                          * a mastery ref for nodenum */
3045                         mlog(0, "%s:%.*s: need ref for node %u\n",
3046                              dlm->name, res->lockname.len, res->lockname.name,
3047                              nodenum);
3048                         spin_lock(&res->spinlock);
3049                         dlm_lockres_set_refmap_bit(nodenum, res);
3050                         spin_unlock(&res->spinlock);
3051                 }
3052         }
3053
3054         if (ret < 0)
3055                 mlog_errno(ret);
3056
3057         mlog(0, "returning ret=%d\n", ret);
3058         return ret;
3059 }
3060
3061
3062 /* if there is an existing mle for this lockres, we now know who the master is.
3063  * (the one who sent us *this* message) we can clear it up right away.
3064  * since the process that put the mle on the list still has a reference to it,
3065  * we can unhash it now, set the master and wake the process.  as a result,
3066  * we will have no mle in the list to start with.  now we can add an mle for
3067  * the migration and this should be the only one found for those scanning the
3068  * list.  */
3069 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data,
3070                                 void **ret_data)
3071 {
3072         struct dlm_ctxt *dlm = data;
3073         struct dlm_lock_resource *res = NULL;
3074         struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
3075         struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
3076         const char *name;
3077         unsigned int namelen, hash;
3078         int ret = 0;
3079
3080         if (!dlm_grab(dlm))
3081                 return -EINVAL;
3082
3083         name = migrate->name;
3084         namelen = migrate->namelen;
3085         hash = dlm_lockid_hash(name, namelen);
3086
3087         /* preallocate.. if this fails, abort */
3088         mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
3089                                                          GFP_NOFS);
3090
3091         if (!mle) {
3092                 ret = -ENOMEM;
3093                 goto leave;
3094         }
3095
3096         /* check for pre-existing lock */
3097         spin_lock(&dlm->spinlock);
3098         res = __dlm_lookup_lockres(dlm, name, namelen, hash);
3099         spin_lock(&dlm->master_lock);
3100
3101         if (res) {
3102                 spin_lock(&res->spinlock);
3103                 if (res->state & DLM_LOCK_RES_RECOVERING) {
3104                         /* if all is working ok, this can only mean that we got
3105                         * a migrate request from a node that we now see as
3106                         * dead.  what can we do here?  drop it to the floor? */
3107                         spin_unlock(&res->spinlock);
3108                         mlog(ML_ERROR, "Got a migrate request, but the "
3109                              "lockres is marked as recovering!");
3110                         kmem_cache_free(dlm_mle_cache, mle);
3111                         ret = -EINVAL; /* need a better solution */
3112                         goto unlock;
3113                 }
3114                 res->state |= DLM_LOCK_RES_MIGRATING;
3115                 spin_unlock(&res->spinlock);
3116         }
3117
3118         /* ignore status.  only nonzero status would BUG. */
3119         ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
3120                                     name, namelen,
3121                                     migrate->new_master,
3122                                     migrate->master);
3123
3124 unlock:
3125         spin_unlock(&dlm->master_lock);
3126         spin_unlock(&dlm->spinlock);
3127
3128         if (oldmle) {
3129                 /* master is known, detach if not already detached */
3130                 dlm_mle_detach_hb_events(dlm, oldmle);
3131                 dlm_put_mle(oldmle);
3132         }
3133
3134         if (res)
3135                 dlm_lockres_put(res);
3136 leave:
3137         dlm_put(dlm);
3138         return ret;
3139 }
3140
3141 /* must be holding dlm->spinlock and dlm->master_lock
3142  * when adding a migration mle, we can clear any other mles
3143  * in the master list because we know with certainty that
3144  * the master is "master".  so we remove any old mle from
3145  * the list after setting it's master field, and then add
3146  * the new migration mle.  this way we can hold with the rule
3147  * of having only one mle for a given lock name at all times. */
3148 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
3149                                  struct dlm_lock_resource *res,
3150                                  struct dlm_master_list_entry *mle,
3151                                  struct dlm_master_list_entry **oldmle,
3152                                  const char *name, unsigned int namelen,
3153                                  u8 new_master, u8 master)
3154 {
3155         int found;
3156         int ret = 0;
3157
3158         *oldmle = NULL;
3159
3160         mlog_entry_void();
3161
3162         assert_spin_locked(&dlm->spinlock);
3163         assert_spin_locked(&dlm->master_lock);
3164
3165         /* caller is responsible for any ref taken here on oldmle */
3166         found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
3167         if (found) {
3168                 struct dlm_master_list_entry *tmp = *oldmle;
3169                 spin_lock(&tmp->spinlock);
3170                 if (tmp->type == DLM_MLE_MIGRATION) {
3171                         if (master == dlm->node_num) {
3172                                 /* ah another process raced me to it */
3173                                 mlog(0, "tried to migrate %.*s, but some "
3174                                      "process beat me to it\n",
3175                                      namelen, name);
3176                                 ret = -EEXIST;
3177                         } else {
3178                                 /* bad.  2 NODES are trying to migrate! */
3179                                 mlog(ML_ERROR, "migration error  mle: "
3180                                      "master=%u new_master=%u // request: "
3181                                      "master=%u new_master=%u // "
3182                                      "lockres=%.*s\n",
3183                                      tmp->master, tmp->new_master,
3184                                      master, new_master,
3185                                      namelen, name);
3186                                 BUG();
3187                         }
3188                 } else {
3189                         /* this is essentially what assert_master does */
3190                         tmp->master = master;
3191                         atomic_set(&tmp->woken, 1);
3192                         wake_up(&tmp->wq);
3193                         /* remove it so that only one mle will be found */
3194                         __dlm_unlink_mle(dlm, tmp);
3195                         __dlm_mle_detach_hb_events(dlm, tmp);
3196                         ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
3197                         mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
3198                             "telling master to get ref for cleared out mle "
3199                             "during migration\n", dlm->name, namelen, name,
3200                             master, new_master);
3201                 }
3202                 spin_unlock(&tmp->spinlock);
3203         }
3204
3205         /* now add a migration mle to the tail of the list */
3206         dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
3207         mle->new_master = new_master;
3208         /* the new master will be sending an assert master for this.
3209          * at that point we will get the refmap reference */
3210         mle->master = master;
3211         /* do this for consistency with other mle types */
3212         set_bit(new_master, mle->maybe_map);
3213         __dlm_insert_mle(dlm, mle);
3214
3215         return ret;
3216 }
3217
3218 /*
3219  * Sets the owner of the lockres, associated to the mle, to UNKNOWN
3220  */
3221 static struct dlm_lock_resource *dlm_reset_mleres_owner(struct dlm_ctxt *dlm,
3222                                         struct dlm_master_list_entry *mle)
3223 {
3224         struct dlm_lock_resource *res;
3225         unsigned int hash;
3226
3227         /* Find the lockres associated to the mle and set its owner to UNK */
3228         hash = dlm_lockid_hash(mle->u.mlename.name, mle->u.mlename.len);
3229         res = __dlm_lookup_lockres(dlm, mle->u.mlename.name, mle->u.mlename.len,
3230                                    hash);
3231         if (res) {
3232                 spin_unlock(&dlm->master_lock);
3233
3234                 /* move lockres onto recovery list */
3235                 spin_lock(&res->spinlock);
3236                 dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
3237                 dlm_move_lockres_to_recovery_list(dlm, res);
3238                 spin_unlock(&res->spinlock);
3239                 dlm_lockres_put(res);
3240
3241                 /* about to get rid of mle, detach from heartbeat */
3242                 __dlm_mle_detach_hb_events(dlm, mle);
3243
3244                 /* dump the mle */
3245                 spin_lock(&dlm->master_lock);
3246                 __dlm_put_mle(mle);
3247                 spin_unlock(&dlm->master_lock);
3248         }
3249
3250         return res;
3251 }
3252
3253 static void dlm_clean_migration_mle(struct dlm_ctxt *dlm,
3254                                     struct dlm_master_list_entry *mle)
3255 {
3256         __dlm_mle_detach_hb_events(dlm, mle);
3257
3258         spin_lock(&mle->spinlock);
3259         __dlm_unlink_mle(dlm, mle);
3260         atomic_set(&mle->woken, 1);
3261         spin_unlock(&mle->spinlock);
3262
3263         wake_up(&mle->wq);
3264 }
3265
3266 static void dlm_clean_block_mle(struct dlm_ctxt *dlm,
3267                                 struct dlm_master_list_entry *mle, u8 dead_node)
3268 {
3269         int bit;
3270
3271         BUG_ON(mle->type != DLM_MLE_BLOCK);
3272
3273         spin_lock(&mle->spinlock);
3274         bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
3275         if (bit != dead_node) {
3276                 mlog(0, "mle found, but dead node %u would not have been "
3277                      "master\n", dead_node);
3278                 spin_unlock(&mle->spinlock);
3279         } else {
3280                 /* Must drop the refcount by one since the assert_master will
3281                  * never arrive. This may result in the mle being unlinked and
3282                  * freed, but there may still be a process waiting in the
3283                  * dlmlock path which is fine. */
3284                 mlog(0, "node %u was expected master\n", dead_node);
3285                 atomic_set(&mle->woken, 1);
3286                 spin_unlock(&mle->spinlock);
3287                 wake_up(&mle->wq);
3288
3289                 /* Do not need events any longer, so detach from heartbeat */
3290                 __dlm_mle_detach_hb_events(dlm, mle);
3291                 __dlm_put_mle(mle);
3292         }
3293 }
3294
3295 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
3296 {
3297         struct dlm_master_list_entry *mle;
3298         struct dlm_lock_resource *res;
3299         struct hlist_head *bucket;
3300         struct hlist_node *list;
3301         unsigned int i;
3302
3303         mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
3304 top:
3305         assert_spin_locked(&dlm->spinlock);
3306
3307         /* clean the master list */
3308         spin_lock(&dlm->master_lock);
3309         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
3310                 bucket = dlm_master_hash(dlm, i);
3311                 hlist_for_each(list, bucket) {
3312                         mle = hlist_entry(list, struct dlm_master_list_entry,
3313                                           master_hash_node);
3314
3315                         BUG_ON(mle->type != DLM_MLE_BLOCK &&
3316                                mle->type != DLM_MLE_MASTER &&
3317                                mle->type != DLM_MLE_MIGRATION);
3318
3319                         /* MASTER mles are initiated locally. The waiting
3320                          * process will notice the node map change shortly.
3321                          * Let that happen as normal. */
3322                         if (mle->type == DLM_MLE_MASTER)
3323                                 continue;
3324
3325                         /* BLOCK mles are initiated by other nodes. Need to
3326                          * clean up if the dead node would have been the
3327                          * master. */
3328                         if (mle->type == DLM_MLE_BLOCK) {
3329                                 dlm_clean_block_mle(dlm, mle, dead_node);
3330                                 continue;
3331                         }
3332
3333                         /* Everything else is a MIGRATION mle */
3334
3335                         /* The rule for MIGRATION mles is that the master
3336                          * becomes UNKNOWN if *either* the original or the new
3337                          * master dies. All UNKNOWN lockres' are sent to
3338                          * whichever node becomes the recovery master. The new
3339                          * master is responsible for determining if there is
3340                          * still a master for this lockres, or if he needs to
3341                          * take over mastery. Either way, this node should
3342                          * expect another message to resolve this. */
3343
3344                         if (mle->master != dead_node &&
3345                             mle->new_master != dead_node)
3346                                 continue;
3347
3348                         /* If we have reached this point, this mle needs to be
3349                          * removed from the list and freed. */
3350                         dlm_clean_migration_mle(dlm, mle);
3351
3352                         mlog(0, "%s: node %u died during migration from "
3353                              "%u to %u!\n", dlm->name, dead_node, mle->master,
3354                              mle->new_master);
3355
3356                         /* If we find a lockres associated with the mle, we've
3357                          * hit this rare case that messes up our lock ordering.
3358                          * If so, we need to drop the master lock so that we can
3359                          * take the lockres lock, meaning that we will have to
3360                          * restart from the head of list. */
3361                         res = dlm_reset_mleres_owner(dlm, mle);
3362                         if (res)
3363                                 /* restart */
3364                                 goto top;
3365
3366                         /* This may be the last reference */
3367                         __dlm_put_mle(mle);
3368                 }
3369         }
3370         spin_unlock(&dlm->master_lock);
3371 }
3372
3373 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
3374                          u8 old_master)
3375 {
3376         struct dlm_node_iter iter;
3377         int ret = 0;
3378
3379         spin_lock(&dlm->spinlock);
3380         dlm_node_iter_init(dlm->domain_map, &iter);
3381         clear_bit(old_master, iter.node_map);
3382         clear_bit(dlm->node_num, iter.node_map);
3383         spin_unlock(&dlm->spinlock);
3384
3385         /* ownership of the lockres is changing.  account for the
3386          * mastery reference here since old_master will briefly have
3387          * a reference after the migration completes */
3388         spin_lock(&res->spinlock);
3389         dlm_lockres_set_refmap_bit(old_master, res);
3390         spin_unlock(&res->spinlock);
3391
3392         mlog(0, "now time to do a migrate request to other nodes\n");
3393         ret = dlm_do_migrate_request(dlm, res, old_master,
3394                                      dlm->node_num, &iter);
3395         if (ret < 0) {
3396                 mlog_errno(ret);
3397                 goto leave;
3398         }
3399
3400         mlog(0, "doing assert master of %.*s to all except the original node\n",
3401              res->lockname.len, res->lockname.name);
3402         /* this call now finishes out the nodemap
3403          * even if one or more nodes die */
3404         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3405                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3406         if (ret < 0) {
3407                 /* no longer need to retry.  all living nodes contacted. */
3408                 mlog_errno(ret);
3409                 ret = 0;
3410         }
3411
3412         memset(iter.node_map, 0, sizeof(iter.node_map));
3413         set_bit(old_master, iter.node_map);
3414         mlog(0, "doing assert master of %.*s back to %u\n",
3415              res->lockname.len, res->lockname.name, old_master);
3416         ret = dlm_do_assert_master(dlm, res, iter.node_map,
3417                                    DLM_ASSERT_MASTER_FINISH_MIGRATION);
3418         if (ret < 0) {
3419                 mlog(0, "assert master to original master failed "
3420                      "with %d.\n", ret);
3421                 /* the only nonzero status here would be because of
3422                  * a dead original node.  we're done. */
3423                 ret = 0;
3424         }
3425
3426         /* all done, set the owner, clear the flag */
3427         spin_lock(&res->spinlock);
3428         dlm_set_lockres_owner(dlm, res, dlm->node_num);
3429         res->state &= ~DLM_LOCK_RES_MIGRATING;
3430         spin_unlock(&res->spinlock);
3431         /* re-dirty it on the new master */
3432         dlm_kick_thread(dlm, res);
3433         wake_up(&res->wq);
3434 leave:
3435         return ret;
3436 }
3437
3438 /*
3439  * LOCKRES AST REFCOUNT
3440  * this is integral to migration
3441  */
3442
3443 /* for future intent to call an ast, reserve one ahead of time.
3444  * this should be called only after waiting on the lockres
3445  * with dlm_wait_on_lockres, and while still holding the
3446  * spinlock after the call. */
3447 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
3448 {
3449         assert_spin_locked(&res->spinlock);
3450         if (res->state & DLM_LOCK_RES_MIGRATING) {
3451                 __dlm_print_one_lock_resource(res);
3452         }
3453         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3454
3455         atomic_inc(&res->asts_reserved);
3456 }
3457
3458 /*
3459  * used to drop the reserved ast, either because it went unused,
3460  * or because the ast/bast was actually called.
3461  *
3462  * also, if there is a pending migration on this lockres,
3463  * and this was the last pending ast on the lockres,
3464  * atomically set the MIGRATING flag before we drop the lock.
3465  * this is how we ensure that migration can proceed with no
3466  * asts in progress.  note that it is ok if the state of the
3467  * queues is such that a lock should be granted in the future
3468  * or that a bast should be fired, because the new master will
3469  * shuffle the lists on this lockres as soon as it is migrated.
3470  */
3471 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
3472                              struct dlm_lock_resource *res)
3473 {
3474         if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
3475                 return;
3476
3477         if (!res->migration_pending) {
3478                 spin_unlock(&res->spinlock);
3479                 return;
3480         }
3481
3482         BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
3483         res->migration_pending = 0;
3484         res->state |= DLM_LOCK_RES_MIGRATING;
3485         spin_unlock(&res->spinlock);
3486         wake_up(&res->wq);
3487         wake_up(&dlm->migration_wq);
3488 }