Merge branch 'master' into 85xx
[pandora-kernel.git] / fs / ocfs2 / cluster / heartbeat.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA.
20  */
21
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36
37 #include "heartbeat.h"
38 #include "tcp.h"
39 #include "nodemanager.h"
40 #include "quorum.h"
41
42 #include "masklog.h"
43
44
45 /*
46  * The first heartbeat pass had one global thread that would serialize all hb
47  * callback calls.  This global serializing sem should only be removed once
48  * we've made sure that all callees can deal with being called concurrently
49  * from multiple hb region threads.
50  */
51 static DECLARE_RWSEM(o2hb_callback_sem);
52
53 /*
54  * multiple hb threads are watching multiple regions.  A node is live
55  * whenever any of the threads sees activity from the node in its region.
56  */
57 static DEFINE_SPINLOCK(o2hb_live_lock);
58 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
59 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
60 static LIST_HEAD(o2hb_node_events);
61 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
62
63 static LIST_HEAD(o2hb_all_regions);
64
65 static struct o2hb_callback {
66         struct list_head list;
67 } o2hb_callbacks[O2HB_NUM_CB];
68
69 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
70
71 #define O2HB_DEFAULT_BLOCK_BITS       9
72
73 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
74
75 /* Only sets a new threshold if there are no active regions. 
76  *
77  * No locking or otherwise interesting code is required for reading
78  * o2hb_dead_threshold as it can't change once regions are active and
79  * it's not interesting to anyone until then anyway. */
80 static void o2hb_dead_threshold_set(unsigned int threshold)
81 {
82         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
83                 spin_lock(&o2hb_live_lock);
84                 if (list_empty(&o2hb_all_regions))
85                         o2hb_dead_threshold = threshold;
86                 spin_unlock(&o2hb_live_lock);
87         }
88 }
89
90 struct o2hb_node_event {
91         struct list_head        hn_item;
92         enum o2hb_callback_type hn_event_type;
93         struct o2nm_node        *hn_node;
94         int                     hn_node_num;
95 };
96
97 struct o2hb_disk_slot {
98         struct o2hb_disk_heartbeat_block *ds_raw_block;
99         u8                      ds_node_num;
100         u64                     ds_last_time;
101         u64                     ds_last_generation;
102         u16                     ds_equal_samples;
103         u16                     ds_changed_samples;
104         struct list_head        ds_live_item;
105 };
106
107 /* each thread owns a region.. when we're asked to tear down the region
108  * we ask the thread to stop, who cleans up the region */
109 struct o2hb_region {
110         struct config_item      hr_item;
111
112         struct list_head        hr_all_item;
113         unsigned                hr_unclean_stop:1;
114
115         /* protected by the hr_callback_sem */
116         struct task_struct      *hr_task;
117
118         unsigned int            hr_blocks;
119         unsigned long long      hr_start_block;
120
121         unsigned int            hr_block_bits;
122         unsigned int            hr_block_bytes;
123
124         unsigned int            hr_slots_per_page;
125         unsigned int            hr_num_pages;
126
127         struct page             **hr_slot_data;
128         struct block_device     *hr_bdev;
129         struct o2hb_disk_slot   *hr_slots;
130
131         /* let the person setting up hb wait for it to return until it
132          * has reached a 'steady' state.  This will be fixed when we have
133          * a more complete api that doesn't lead to this sort of fragility. */
134         atomic_t                hr_steady_iterations;
135
136         char                    hr_dev_name[BDEVNAME_SIZE];
137
138         unsigned int            hr_timeout_ms;
139
140         /* randomized as the region goes up and down so that a node
141          * recognizes a node going up and down in one iteration */
142         u64                     hr_generation;
143
144         struct delayed_work     hr_write_timeout_work;
145         unsigned long           hr_last_timeout_start;
146
147         /* Used during o2hb_check_slot to hold a copy of the block
148          * being checked because we temporarily have to zero out the
149          * crc field. */
150         struct o2hb_disk_heartbeat_block *hr_tmp_block;
151 };
152
153 struct o2hb_bio_wait_ctxt {
154         atomic_t          wc_num_reqs;
155         struct completion wc_io_complete;
156         int               wc_error;
157 };
158
159 static void o2hb_write_timeout(struct work_struct *work)
160 {
161         struct o2hb_region *reg =
162                 container_of(work, struct o2hb_region,
163                              hr_write_timeout_work.work);
164
165         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
166              "milliseconds\n", reg->hr_dev_name,
167              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); 
168         o2quo_disk_timeout();
169 }
170
171 static void o2hb_arm_write_timeout(struct o2hb_region *reg)
172 {
173         mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);
174
175         cancel_delayed_work(&reg->hr_write_timeout_work);
176         reg->hr_last_timeout_start = jiffies;
177         schedule_delayed_work(&reg->hr_write_timeout_work,
178                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
179 }
180
181 static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
182 {
183         cancel_delayed_work(&reg->hr_write_timeout_work);
184         flush_scheduled_work();
185 }
186
187 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
188 {
189         atomic_set(&wc->wc_num_reqs, 1);
190         init_completion(&wc->wc_io_complete);
191         wc->wc_error = 0;
192 }
193
194 /* Used in error paths too */
195 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
196                                      unsigned int num)
197 {
198         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
199          * good news is that the fast path only completes one at a time */
200         while(num--) {
201                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
202                         BUG_ON(num > 0);
203                         complete(&wc->wc_io_complete);
204                 }
205         }
206 }
207
208 static void o2hb_wait_on_io(struct o2hb_region *reg,
209                             struct o2hb_bio_wait_ctxt *wc)
210 {
211         struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
212
213         blk_run_address_space(mapping);
214         o2hb_bio_wait_dec(wc, 1);
215
216         wait_for_completion(&wc->wc_io_complete);
217 }
218
219 static int o2hb_bio_end_io(struct bio *bio,
220                            unsigned int bytes_done,
221                            int error)
222 {
223         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
224
225         if (error) {
226                 mlog(ML_ERROR, "IO Error %d\n", error);
227                 wc->wc_error = error;
228         }
229
230         if (bio->bi_size)
231                 return 1;
232
233         o2hb_bio_wait_dec(wc, 1);
234         bio_put(bio);
235         return 0;
236 }
237
238 /* Setup a Bio to cover I/O against num_slots slots starting at
239  * start_slot. */
240 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
241                                       struct o2hb_bio_wait_ctxt *wc,
242                                       unsigned int *current_slot,
243                                       unsigned int max_slots)
244 {
245         int len, current_page;
246         unsigned int vec_len, vec_start;
247         unsigned int bits = reg->hr_block_bits;
248         unsigned int spp = reg->hr_slots_per_page;
249         unsigned int cs = *current_slot;
250         struct bio *bio;
251         struct page *page;
252
253         /* Testing has shown this allocation to take long enough under
254          * GFP_KERNEL that the local node can get fenced. It would be
255          * nicest if we could pre-allocate these bios and avoid this
256          * all together. */
257         bio = bio_alloc(GFP_ATOMIC, 16);
258         if (!bio) {
259                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
260                 bio = ERR_PTR(-ENOMEM);
261                 goto bail;
262         }
263
264         /* Must put everything in 512 byte sectors for the bio... */
265         bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
266         bio->bi_bdev = reg->hr_bdev;
267         bio->bi_private = wc;
268         bio->bi_end_io = o2hb_bio_end_io;
269
270         vec_start = (cs << bits) % PAGE_CACHE_SIZE;
271         while(cs < max_slots) {
272                 current_page = cs / spp;
273                 page = reg->hr_slot_data[current_page];
274
275                 vec_len = min(PAGE_CACHE_SIZE,
276                               (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
277
278                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
279                      current_page, vec_len, vec_start);
280
281                 len = bio_add_page(bio, page, vec_len, vec_start);
282                 if (len != vec_len) break;
283
284                 cs += vec_len / (PAGE_CACHE_SIZE/spp);
285                 vec_start = 0;
286         }
287
288 bail:
289         *current_slot = cs;
290         return bio;
291 }
292
293 static int o2hb_read_slots(struct o2hb_region *reg,
294                            unsigned int max_slots)
295 {
296         unsigned int current_slot=0;
297         int status;
298         struct o2hb_bio_wait_ctxt wc;
299         struct bio *bio;
300
301         o2hb_bio_wait_init(&wc);
302
303         while(current_slot < max_slots) {
304                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
305                 if (IS_ERR(bio)) {
306                         status = PTR_ERR(bio);
307                         mlog_errno(status);
308                         goto bail_and_wait;
309                 }
310
311                 atomic_inc(&wc.wc_num_reqs);
312                 submit_bio(READ, bio);
313         }
314
315         status = 0;
316
317 bail_and_wait:
318         o2hb_wait_on_io(reg, &wc);
319         if (wc.wc_error && !status)
320                 status = wc.wc_error;
321
322         return status;
323 }
324
325 static int o2hb_issue_node_write(struct o2hb_region *reg,
326                                  struct o2hb_bio_wait_ctxt *write_wc)
327 {
328         int status;
329         unsigned int slot;
330         struct bio *bio;
331
332         o2hb_bio_wait_init(write_wc);
333
334         slot = o2nm_this_node();
335
336         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
337         if (IS_ERR(bio)) {
338                 status = PTR_ERR(bio);
339                 mlog_errno(status);
340                 goto bail;
341         }
342
343         atomic_inc(&write_wc->wc_num_reqs);
344         submit_bio(WRITE, bio);
345
346         status = 0;
347 bail:
348         return status;
349 }
350
351 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
352                                      struct o2hb_disk_heartbeat_block *hb_block)
353 {
354         __le32 old_cksum;
355         u32 ret;
356
357         /* We want to compute the block crc with a 0 value in the
358          * hb_cksum field. Save it off here and replace after the
359          * crc. */
360         old_cksum = hb_block->hb_cksum;
361         hb_block->hb_cksum = 0;
362
363         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
364
365         hb_block->hb_cksum = old_cksum;
366
367         return ret;
368 }
369
370 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
371 {
372         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
373              "cksum = 0x%x, generation 0x%llx\n",
374              (long long)le64_to_cpu(hb_block->hb_seq),
375              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
376              (long long)le64_to_cpu(hb_block->hb_generation));
377 }
378
379 static int o2hb_verify_crc(struct o2hb_region *reg,
380                            struct o2hb_disk_heartbeat_block *hb_block)
381 {
382         u32 read, computed;
383
384         read = le32_to_cpu(hb_block->hb_cksum);
385         computed = o2hb_compute_block_crc_le(reg, hb_block);
386
387         return read == computed;
388 }
389
390 /* We want to make sure that nobody is heartbeating on top of us --
391  * this will help detect an invalid configuration. */
392 static int o2hb_check_last_timestamp(struct o2hb_region *reg)
393 {
394         int node_num, ret;
395         struct o2hb_disk_slot *slot;
396         struct o2hb_disk_heartbeat_block *hb_block;
397
398         node_num = o2nm_this_node();
399
400         ret = 1;
401         slot = &reg->hr_slots[node_num];
402         /* Don't check on our 1st timestamp */
403         if (slot->ds_last_time) {
404                 hb_block = slot->ds_raw_block;
405
406                 if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
407                         ret = 0;
408         }
409
410         return ret;
411 }
412
413 static inline void o2hb_prepare_block(struct o2hb_region *reg,
414                                       u64 generation)
415 {
416         int node_num;
417         u64 cputime;
418         struct o2hb_disk_slot *slot;
419         struct o2hb_disk_heartbeat_block *hb_block;
420
421         node_num = o2nm_this_node();
422         slot = &reg->hr_slots[node_num];
423
424         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
425         memset(hb_block, 0, reg->hr_block_bytes);
426         /* TODO: time stuff */
427         cputime = CURRENT_TIME.tv_sec;
428         if (!cputime)
429                 cputime = 1;
430
431         hb_block->hb_seq = cpu_to_le64(cputime);
432         hb_block->hb_node = node_num;
433         hb_block->hb_generation = cpu_to_le64(generation);
434         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
435
436         /* This step must always happen last! */
437         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
438                                                                    hb_block));
439
440         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
441              (long long)cpu_to_le64(generation),
442              le32_to_cpu(hb_block->hb_cksum));
443 }
444
445 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
446                                 struct o2nm_node *node,
447                                 int idx)
448 {
449         struct list_head *iter;
450         struct o2hb_callback_func *f;
451
452         list_for_each(iter, &hbcall->list) {
453                 f = list_entry(iter, struct o2hb_callback_func, hc_item);
454                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
455                 (f->hc_func)(node, idx, f->hc_data);
456         }
457 }
458
459 /* Will run the list in order until we process the passed event */
460 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
461 {
462         int empty;
463         struct o2hb_callback *hbcall;
464         struct o2hb_node_event *event;
465
466         spin_lock(&o2hb_live_lock);
467         empty = list_empty(&queued_event->hn_item);
468         spin_unlock(&o2hb_live_lock);
469         if (empty)
470                 return;
471
472         /* Holding callback sem assures we don't alter the callback
473          * lists when doing this, and serializes ourselves with other
474          * processes wanting callbacks. */
475         down_write(&o2hb_callback_sem);
476
477         spin_lock(&o2hb_live_lock);
478         while (!list_empty(&o2hb_node_events)
479                && !list_empty(&queued_event->hn_item)) {
480                 event = list_entry(o2hb_node_events.next,
481                                    struct o2hb_node_event,
482                                    hn_item);
483                 list_del_init(&event->hn_item);
484                 spin_unlock(&o2hb_live_lock);
485
486                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
487                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
488                      event->hn_node_num);
489
490                 hbcall = hbcall_from_type(event->hn_event_type);
491
492                 /* We should *never* have gotten on to the list with a
493                  * bad type... This isn't something that we should try
494                  * to recover from. */
495                 BUG_ON(IS_ERR(hbcall));
496
497                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
498
499                 spin_lock(&o2hb_live_lock);
500         }
501         spin_unlock(&o2hb_live_lock);
502
503         up_write(&o2hb_callback_sem);
504 }
505
506 static void o2hb_queue_node_event(struct o2hb_node_event *event,
507                                   enum o2hb_callback_type type,
508                                   struct o2nm_node *node,
509                                   int node_num)
510 {
511         assert_spin_locked(&o2hb_live_lock);
512
513         event->hn_event_type = type;
514         event->hn_node = node;
515         event->hn_node_num = node_num;
516
517         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
518              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
519
520         list_add_tail(&event->hn_item, &o2hb_node_events);
521 }
522
523 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
524 {
525         struct o2hb_node_event event =
526                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
527         struct o2nm_node *node;
528
529         node = o2nm_get_node_by_num(slot->ds_node_num);
530         if (!node)
531                 return;
532
533         spin_lock(&o2hb_live_lock);
534         if (!list_empty(&slot->ds_live_item)) {
535                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
536                      slot->ds_node_num);
537
538                 list_del_init(&slot->ds_live_item);
539
540                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
541                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
542
543                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
544                                               slot->ds_node_num);
545                 }
546         }
547         spin_unlock(&o2hb_live_lock);
548
549         o2hb_run_event_list(&event);
550
551         o2nm_node_put(node);
552 }
553
554 static int o2hb_check_slot(struct o2hb_region *reg,
555                            struct o2hb_disk_slot *slot)
556 {
557         int changed = 0, gen_changed = 0;
558         struct o2hb_node_event event =
559                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
560         struct o2nm_node *node;
561         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
562         u64 cputime;
563         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
564         unsigned int slot_dead_ms;
565
566         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
567
568         /* Is this correct? Do we assume that the node doesn't exist
569          * if we're not configured for him? */
570         node = o2nm_get_node_by_num(slot->ds_node_num);
571         if (!node)
572                 return 0;
573
574         if (!o2hb_verify_crc(reg, hb_block)) {
575                 /* all paths from here will drop o2hb_live_lock for
576                  * us. */
577                 spin_lock(&o2hb_live_lock);
578
579                 /* Don't print an error on the console in this case -
580                  * a freshly formatted heartbeat area will not have a
581                  * crc set on it. */
582                 if (list_empty(&slot->ds_live_item))
583                         goto out;
584
585                 /* The node is live but pushed out a bad crc. We
586                  * consider it a transient miss but don't populate any
587                  * other values as they may be junk. */
588                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
589                      slot->ds_node_num, reg->hr_dev_name);
590                 o2hb_dump_slot(hb_block);
591
592                 slot->ds_equal_samples++;
593                 goto fire_callbacks;
594         }
595
596         /* we don't care if these wrap.. the state transitions below
597          * clear at the right places */
598         cputime = le64_to_cpu(hb_block->hb_seq);
599         if (slot->ds_last_time != cputime)
600                 slot->ds_changed_samples++;
601         else
602                 slot->ds_equal_samples++;
603         slot->ds_last_time = cputime;
604
605         /* The node changed heartbeat generations. We assume this to
606          * mean it dropped off but came back before we timed out. We
607          * want to consider it down for the time being but don't want
608          * to lose any changed_samples state we might build up to
609          * considering it live again. */
610         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
611                 gen_changed = 1;
612                 slot->ds_equal_samples = 0;
613                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
614                      "to 0x%llx)\n", slot->ds_node_num,
615                      (long long)slot->ds_last_generation,
616                      (long long)le64_to_cpu(hb_block->hb_generation));
617         }
618
619         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
620
621         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
622              "seq %llu last %llu changed %u equal %u\n",
623              slot->ds_node_num, (long long)slot->ds_last_generation,
624              le32_to_cpu(hb_block->hb_cksum),
625              (unsigned long long)le64_to_cpu(hb_block->hb_seq), 
626              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
627              slot->ds_equal_samples);
628
629         spin_lock(&o2hb_live_lock);
630
631 fire_callbacks:
632         /* dead nodes only come to life after some number of
633          * changes at any time during their dead time */
634         if (list_empty(&slot->ds_live_item) &&
635             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
636                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
637                      slot->ds_node_num, (long long)slot->ds_last_generation);
638
639                 /* first on the list generates a callback */
640                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
641                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
642
643                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
644                                               slot->ds_node_num);
645
646                         changed = 1;
647                 }
648
649                 list_add_tail(&slot->ds_live_item,
650                               &o2hb_live_slots[slot->ds_node_num]);
651
652                 slot->ds_equal_samples = 0;
653
654                 /* We want to be sure that all nodes agree on the
655                  * number of milliseconds before a node will be
656                  * considered dead. The self-fencing timeout is
657                  * computed from this value, and a discrepancy might
658                  * result in heartbeat calling a node dead when it
659                  * hasn't self-fenced yet. */
660                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
661                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
662                         /* TODO: Perhaps we can fail the region here. */
663                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
664                              "of %u ms, but our count is %u ms.\n"
665                              "Please double check your configuration values "
666                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
667                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
668                              dead_ms);
669                 }
670                 goto out;
671         }
672
673         /* if the list is dead, we're done.. */
674         if (list_empty(&slot->ds_live_item))
675                 goto out;
676
677         /* live nodes only go dead after enough consequtive missed
678          * samples..  reset the missed counter whenever we see
679          * activity */
680         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
681                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
682                      slot->ds_node_num);
683
684                 /* last off the live_slot generates a callback */
685                 list_del_init(&slot->ds_live_item);
686                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
687                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
688
689                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
690                                               slot->ds_node_num);
691
692                         changed = 1;
693                 }
694
695                 /* We don't clear this because the node is still
696                  * actually writing new blocks. */
697                 if (!gen_changed)
698                         slot->ds_changed_samples = 0;
699                 goto out;
700         }
701         if (slot->ds_changed_samples) {
702                 slot->ds_changed_samples = 0;
703                 slot->ds_equal_samples = 0;
704         }
705 out:
706         spin_unlock(&o2hb_live_lock);
707
708         o2hb_run_event_list(&event);
709
710         o2nm_node_put(node);
711         return changed;
712 }
713
714 /* This could be faster if we just implmented a find_last_bit, but I
715  * don't think the circumstances warrant it. */
716 static int o2hb_highest_node(unsigned long *nodes,
717                              int numbits)
718 {
719         int highest, node;
720
721         highest = numbits;
722         node = -1;
723         while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
724                 if (node >= numbits)
725                         break;
726
727                 highest = node;
728         }
729
730         return highest;
731 }
732
733 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
734 {
735         int i, ret, highest_node, change = 0;
736         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
737         struct o2hb_bio_wait_ctxt write_wc;
738
739         ret = o2nm_configured_node_map(configured_nodes,
740                                        sizeof(configured_nodes));
741         if (ret) {
742                 mlog_errno(ret);
743                 return ret;
744         }
745
746         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
747         if (highest_node >= O2NM_MAX_NODES) {
748                 mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
749                 return -EINVAL;
750         }
751
752         /* No sense in reading the slots of nodes that don't exist
753          * yet. Of course, if the node definitions have holes in them
754          * then we're reading an empty slot anyway... Consider this
755          * best-effort. */
756         ret = o2hb_read_slots(reg, highest_node + 1);
757         if (ret < 0) {
758                 mlog_errno(ret);
759                 return ret;
760         }
761
762         /* With an up to date view of the slots, we can check that no
763          * other node has been improperly configured to heartbeat in
764          * our slot. */
765         if (!o2hb_check_last_timestamp(reg))
766                 mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
767                      "in our slot!\n", reg->hr_dev_name);
768
769         /* fill in the proper info for our next heartbeat */
770         o2hb_prepare_block(reg, reg->hr_generation);
771
772         /* And fire off the write. Note that we don't wait on this I/O
773          * until later. */
774         ret = o2hb_issue_node_write(reg, &write_wc);
775         if (ret < 0) {
776                 mlog_errno(ret);
777                 return ret;
778         }
779
780         i = -1;
781         while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
782
783                 change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
784         }
785
786         /*
787          * We have to be sure we've advertised ourselves on disk
788          * before we can go to steady state.  This ensures that
789          * people we find in our steady state have seen us.
790          */
791         o2hb_wait_on_io(reg, &write_wc);
792         if (write_wc.wc_error) {
793                 /* Do not re-arm the write timeout on I/O error - we
794                  * can't be sure that the new block ever made it to
795                  * disk */
796                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
797                      write_wc.wc_error, reg->hr_dev_name);
798                 return write_wc.wc_error;
799         }
800
801         o2hb_arm_write_timeout(reg);
802
803         /* let the person who launched us know when things are steady */
804         if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
805                 if (atomic_dec_and_test(&reg->hr_steady_iterations))
806                         wake_up(&o2hb_steady_queue);
807         }
808
809         return 0;
810 }
811
812 /* Subtract b from a, storing the result in a. a *must* have a larger
813  * value than b. */
814 static void o2hb_tv_subtract(struct timeval *a,
815                              struct timeval *b)
816 {
817         /* just return 0 when a is after b */
818         if (a->tv_sec < b->tv_sec ||
819             (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
820                 a->tv_sec = 0;
821                 a->tv_usec = 0;
822                 return;
823         }
824
825         a->tv_sec -= b->tv_sec;
826         a->tv_usec -= b->tv_usec;
827         while ( a->tv_usec < 0 ) {
828                 a->tv_sec--;
829                 a->tv_usec += 1000000;
830         }
831 }
832
833 static unsigned int o2hb_elapsed_msecs(struct timeval *start,
834                                        struct timeval *end)
835 {
836         struct timeval res = *end;
837
838         o2hb_tv_subtract(&res, start);
839
840         return res.tv_sec * 1000 + res.tv_usec / 1000;
841 }
842
843 /*
844  * we ride the region ref that the region dir holds.  before the region
845  * dir is removed and drops it ref it will wait to tear down this
846  * thread.
847  */
848 static int o2hb_thread(void *data)
849 {
850         int i, ret;
851         struct o2hb_region *reg = data;
852         struct o2hb_bio_wait_ctxt write_wc;
853         struct timeval before_hb, after_hb;
854         unsigned int elapsed_msec;
855
856         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
857
858         set_user_nice(current, -20);
859
860         while (!kthread_should_stop() && !reg->hr_unclean_stop) {
861                 /* We track the time spent inside
862                  * o2hb_do_disk_heartbeat so that we avoid more then
863                  * hr_timeout_ms between disk writes. On busy systems
864                  * this should result in a heartbeat which is less
865                  * likely to time itself out. */
866                 do_gettimeofday(&before_hb);
867
868                 i = 0;
869                 do {
870                         ret = o2hb_do_disk_heartbeat(reg);
871                 } while (ret && ++i < 2);
872
873                 do_gettimeofday(&after_hb);
874                 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
875
876                 mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
877                      before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
878                      after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
879                      elapsed_msec);
880
881                 if (elapsed_msec < reg->hr_timeout_ms) {
882                         /* the kthread api has blocked signals for us so no
883                          * need to record the return value. */
884                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
885                 }
886         }
887
888         o2hb_disarm_write_timeout(reg);
889
890         /* unclean stop is only used in very bad situation */
891         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
892                 o2hb_shutdown_slot(&reg->hr_slots[i]);
893
894         /* Explicit down notification - avoid forcing the other nodes
895          * to timeout on this region when we could just as easily
896          * write a clear generation - thus indicating to them that
897          * this node has left this region.
898          *
899          * XXX: Should we skip this on unclean_stop? */
900         o2hb_prepare_block(reg, 0);
901         ret = o2hb_issue_node_write(reg, &write_wc);
902         if (ret == 0) {
903                 o2hb_wait_on_io(reg, &write_wc);
904         } else {
905                 mlog_errno(ret);
906         }
907
908         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
909
910         return 0;
911 }
912
913 void o2hb_init(void)
914 {
915         int i;
916
917         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
918                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
919
920         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
921                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
922
923         INIT_LIST_HEAD(&o2hb_node_events);
924
925         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
926 }
927
928 /* if we're already in a callback then we're already serialized by the sem */
929 static void o2hb_fill_node_map_from_callback(unsigned long *map,
930                                              unsigned bytes)
931 {
932         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
933
934         memcpy(map, &o2hb_live_node_bitmap, bytes);
935 }
936
937 /*
938  * get a map of all nodes that are heartbeating in any regions
939  */
940 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
941 {
942         /* callers want to serialize this map and callbacks so that they
943          * can trust that they don't miss nodes coming to the party */
944         down_read(&o2hb_callback_sem);
945         spin_lock(&o2hb_live_lock);
946         o2hb_fill_node_map_from_callback(map, bytes);
947         spin_unlock(&o2hb_live_lock);
948         up_read(&o2hb_callback_sem);
949 }
950 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
951
952 /*
953  * heartbeat configfs bits.  The heartbeat set is a default set under
954  * the cluster set in nodemanager.c.
955  */
956
957 static struct o2hb_region *to_o2hb_region(struct config_item *item)
958 {
959         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
960 }
961
962 /* drop_item only drops its ref after killing the thread, nothing should
963  * be using the region anymore.  this has to clean up any state that
964  * attributes might have built up. */
965 static void o2hb_region_release(struct config_item *item)
966 {
967         int i;
968         struct page *page;
969         struct o2hb_region *reg = to_o2hb_region(item);
970
971         if (reg->hr_tmp_block)
972                 kfree(reg->hr_tmp_block);
973
974         if (reg->hr_slot_data) {
975                 for (i = 0; i < reg->hr_num_pages; i++) {
976                         page = reg->hr_slot_data[i];
977                         if (page)
978                                 __free_page(page);
979                 }
980                 kfree(reg->hr_slot_data);
981         }
982
983         if (reg->hr_bdev)
984                 blkdev_put(reg->hr_bdev);
985
986         if (reg->hr_slots)
987                 kfree(reg->hr_slots);
988
989         spin_lock(&o2hb_live_lock);
990         list_del(&reg->hr_all_item);
991         spin_unlock(&o2hb_live_lock);
992
993         kfree(reg);
994 }
995
996 static int o2hb_read_block_input(struct o2hb_region *reg,
997                                  const char *page,
998                                  size_t count,
999                                  unsigned long *ret_bytes,
1000                                  unsigned int *ret_bits)
1001 {
1002         unsigned long bytes;
1003         char *p = (char *)page;
1004
1005         bytes = simple_strtoul(p, &p, 0);
1006         if (!p || (*p && (*p != '\n')))
1007                 return -EINVAL;
1008
1009         /* Heartbeat and fs min / max block sizes are the same. */
1010         if (bytes > 4096 || bytes < 512)
1011                 return -ERANGE;
1012         if (hweight16(bytes) != 1)
1013                 return -EINVAL;
1014
1015         if (ret_bytes)
1016                 *ret_bytes = bytes;
1017         if (ret_bits)
1018                 *ret_bits = ffs(bytes) - 1;
1019
1020         return 0;
1021 }
1022
1023 static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1024                                             char *page)
1025 {
1026         return sprintf(page, "%u\n", reg->hr_block_bytes);
1027 }
1028
1029 static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1030                                              const char *page,
1031                                              size_t count)
1032 {
1033         int status;
1034         unsigned long block_bytes;
1035         unsigned int block_bits;
1036
1037         if (reg->hr_bdev)
1038                 return -EINVAL;
1039
1040         status = o2hb_read_block_input(reg, page, count,
1041                                        &block_bytes, &block_bits);
1042         if (status)
1043                 return status;
1044
1045         reg->hr_block_bytes = (unsigned int)block_bytes;
1046         reg->hr_block_bits = block_bits;
1047
1048         return count;
1049 }
1050
1051 static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1052                                             char *page)
1053 {
1054         return sprintf(page, "%llu\n", reg->hr_start_block);
1055 }
1056
1057 static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1058                                              const char *page,
1059                                              size_t count)
1060 {
1061         unsigned long long tmp;
1062         char *p = (char *)page;
1063
1064         if (reg->hr_bdev)
1065                 return -EINVAL;
1066
1067         tmp = simple_strtoull(p, &p, 0);
1068         if (!p || (*p && (*p != '\n')))
1069                 return -EINVAL;
1070
1071         reg->hr_start_block = tmp;
1072
1073         return count;
1074 }
1075
1076 static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1077                                        char *page)
1078 {
1079         return sprintf(page, "%d\n", reg->hr_blocks);
1080 }
1081
1082 static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1083                                         const char *page,
1084                                         size_t count)
1085 {
1086         unsigned long tmp;
1087         char *p = (char *)page;
1088
1089         if (reg->hr_bdev)
1090                 return -EINVAL;
1091
1092         tmp = simple_strtoul(p, &p, 0);
1093         if (!p || (*p && (*p != '\n')))
1094                 return -EINVAL;
1095
1096         if (tmp > O2NM_MAX_NODES || tmp == 0)
1097                 return -ERANGE;
1098
1099         reg->hr_blocks = (unsigned int)tmp;
1100
1101         return count;
1102 }
1103
1104 static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1105                                     char *page)
1106 {
1107         unsigned int ret = 0;
1108
1109         if (reg->hr_bdev)
1110                 ret = sprintf(page, "%s\n", reg->hr_dev_name);
1111
1112         return ret;
1113 }
1114
1115 static void o2hb_init_region_params(struct o2hb_region *reg)
1116 {
1117         reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1118         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1119
1120         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1121              reg->hr_start_block, reg->hr_blocks);
1122         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1123              reg->hr_block_bytes, reg->hr_block_bits);
1124         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1125         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1126 }
1127
1128 static int o2hb_map_slot_data(struct o2hb_region *reg)
1129 {
1130         int i, j;
1131         unsigned int last_slot;
1132         unsigned int spp = reg->hr_slots_per_page;
1133         struct page *page;
1134         char *raw;
1135         struct o2hb_disk_slot *slot;
1136
1137         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1138         if (reg->hr_tmp_block == NULL) {
1139                 mlog_errno(-ENOMEM);
1140                 return -ENOMEM;
1141         }
1142
1143         reg->hr_slots = kcalloc(reg->hr_blocks,
1144                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1145         if (reg->hr_slots == NULL) {
1146                 mlog_errno(-ENOMEM);
1147                 return -ENOMEM;
1148         }
1149
1150         for(i = 0; i < reg->hr_blocks; i++) {
1151                 slot = &reg->hr_slots[i];
1152                 slot->ds_node_num = i;
1153                 INIT_LIST_HEAD(&slot->ds_live_item);
1154                 slot->ds_raw_block = NULL;
1155         }
1156
1157         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1158         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1159                            "at %u blocks per page\n",
1160              reg->hr_num_pages, reg->hr_blocks, spp);
1161
1162         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1163                                     GFP_KERNEL);
1164         if (!reg->hr_slot_data) {
1165                 mlog_errno(-ENOMEM);
1166                 return -ENOMEM;
1167         }
1168
1169         for(i = 0; i < reg->hr_num_pages; i++) {
1170                 page = alloc_page(GFP_KERNEL);
1171                 if (!page) {
1172                         mlog_errno(-ENOMEM);
1173                         return -ENOMEM;
1174                 }
1175
1176                 reg->hr_slot_data[i] = page;
1177
1178                 last_slot = i * spp;
1179                 raw = page_address(page);
1180                 for (j = 0;
1181                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1182                      j++) {
1183                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1184
1185                         slot = &reg->hr_slots[j + last_slot];
1186                         slot->ds_raw_block =
1187                                 (struct o2hb_disk_heartbeat_block *) raw;
1188
1189                         raw += reg->hr_block_bytes;
1190                 }
1191         }
1192
1193         return 0;
1194 }
1195
1196 /* Read in all the slots available and populate the tracking
1197  * structures so that we can start with a baseline idea of what's
1198  * there. */
1199 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1200 {
1201         int ret, i;
1202         struct o2hb_disk_slot *slot;
1203         struct o2hb_disk_heartbeat_block *hb_block;
1204
1205         mlog_entry_void();
1206
1207         ret = o2hb_read_slots(reg, reg->hr_blocks);
1208         if (ret) {
1209                 mlog_errno(ret);
1210                 goto out;
1211         }
1212
1213         /* We only want to get an idea of the values initially in each
1214          * slot, so we do no verification - o2hb_check_slot will
1215          * actually determine if each configured slot is valid and
1216          * whether any values have changed. */
1217         for(i = 0; i < reg->hr_blocks; i++) {
1218                 slot = &reg->hr_slots[i];
1219                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1220
1221                 /* Only fill the values that o2hb_check_slot uses to
1222                  * determine changing slots */
1223                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1224                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1225         }
1226
1227 out:
1228         mlog_exit(ret);
1229         return ret;
1230 }
1231
1232 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1233 static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1234                                      const char *page,
1235                                      size_t count)
1236 {
1237         long fd;
1238         int sectsize;
1239         char *p = (char *)page;
1240         struct file *filp = NULL;
1241         struct inode *inode = NULL;
1242         ssize_t ret = -EINVAL;
1243
1244         if (reg->hr_bdev)
1245                 goto out;
1246
1247         /* We can't heartbeat without having had our node number
1248          * configured yet. */
1249         if (o2nm_this_node() == O2NM_MAX_NODES)
1250                 goto out;
1251
1252         fd = simple_strtol(p, &p, 0);
1253         if (!p || (*p && (*p != '\n')))
1254                 goto out;
1255
1256         if (fd < 0 || fd >= INT_MAX)
1257                 goto out;
1258
1259         filp = fget(fd);
1260         if (filp == NULL)
1261                 goto out;
1262
1263         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1264             reg->hr_block_bytes == 0)
1265                 goto out;
1266
1267         inode = igrab(filp->f_mapping->host);
1268         if (inode == NULL)
1269                 goto out;
1270
1271         if (!S_ISBLK(inode->i_mode))
1272                 goto out;
1273
1274         reg->hr_bdev = I_BDEV(filp->f_mapping->host);
1275         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
1276         if (ret) {
1277                 reg->hr_bdev = NULL;
1278                 goto out;
1279         }
1280         inode = NULL;
1281
1282         bdevname(reg->hr_bdev, reg->hr_dev_name);
1283
1284         sectsize = bdev_hardsect_size(reg->hr_bdev);
1285         if (sectsize != reg->hr_block_bytes) {
1286                 mlog(ML_ERROR,
1287                      "blocksize %u incorrect for device, expected %d",
1288                      reg->hr_block_bytes, sectsize);
1289                 ret = -EINVAL;
1290                 goto out;
1291         }
1292
1293         o2hb_init_region_params(reg);
1294
1295         /* Generation of zero is invalid */
1296         do {
1297                 get_random_bytes(&reg->hr_generation,
1298                                  sizeof(reg->hr_generation));
1299         } while (reg->hr_generation == 0);
1300
1301         ret = o2hb_map_slot_data(reg);
1302         if (ret) {
1303                 mlog_errno(ret);
1304                 goto out;
1305         }
1306
1307         ret = o2hb_populate_slot_data(reg);
1308         if (ret) {
1309                 mlog_errno(ret);
1310                 goto out;
1311         }
1312
1313         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1314
1315         /*
1316          * A node is considered live after it has beat LIVE_THRESHOLD
1317          * times.  We're not steady until we've given them a chance
1318          * _after_ our first read.
1319          */
1320         atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);
1321
1322         reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1323                                    reg->hr_item.ci_name);
1324         if (IS_ERR(reg->hr_task)) {
1325                 ret = PTR_ERR(reg->hr_task);
1326                 mlog_errno(ret);
1327                 reg->hr_task = NULL;
1328                 goto out;
1329         }
1330
1331         ret = wait_event_interruptible(o2hb_steady_queue,
1332                                 atomic_read(&reg->hr_steady_iterations) == 0);
1333         if (ret) {
1334                 kthread_stop(reg->hr_task);
1335                 reg->hr_task = NULL;
1336                 goto out;
1337         }
1338
1339         ret = count;
1340 out:
1341         if (filp)
1342                 fput(filp);
1343         if (inode)
1344                 iput(inode);
1345         if (ret < 0) {
1346                 if (reg->hr_bdev) {
1347                         blkdev_put(reg->hr_bdev);
1348                         reg->hr_bdev = NULL;
1349                 }
1350         }
1351         return ret;
1352 }
1353
1354 static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
1355                                       char *page)
1356 {
1357         if (!reg->hr_task)
1358                 return 0;
1359
1360         return sprintf(page, "%u\n", reg->hr_task->pid);
1361 }
1362
1363 struct o2hb_region_attribute {
1364         struct configfs_attribute attr;
1365         ssize_t (*show)(struct o2hb_region *, char *);
1366         ssize_t (*store)(struct o2hb_region *, const char *, size_t);
1367 };
1368
1369 static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
1370         .attr   = { .ca_owner = THIS_MODULE,
1371                     .ca_name = "block_bytes",
1372                     .ca_mode = S_IRUGO | S_IWUSR },
1373         .show   = o2hb_region_block_bytes_read,
1374         .store  = o2hb_region_block_bytes_write,
1375 };
1376
1377 static struct o2hb_region_attribute o2hb_region_attr_start_block = {
1378         .attr   = { .ca_owner = THIS_MODULE,
1379                     .ca_name = "start_block",
1380                     .ca_mode = S_IRUGO | S_IWUSR },
1381         .show   = o2hb_region_start_block_read,
1382         .store  = o2hb_region_start_block_write,
1383 };
1384
1385 static struct o2hb_region_attribute o2hb_region_attr_blocks = {
1386         .attr   = { .ca_owner = THIS_MODULE,
1387                     .ca_name = "blocks",
1388                     .ca_mode = S_IRUGO | S_IWUSR },
1389         .show   = o2hb_region_blocks_read,
1390         .store  = o2hb_region_blocks_write,
1391 };
1392
1393 static struct o2hb_region_attribute o2hb_region_attr_dev = {
1394         .attr   = { .ca_owner = THIS_MODULE,
1395                     .ca_name = "dev",
1396                     .ca_mode = S_IRUGO | S_IWUSR },
1397         .show   = o2hb_region_dev_read,
1398         .store  = o2hb_region_dev_write,
1399 };
1400
1401 static struct o2hb_region_attribute o2hb_region_attr_pid = {
1402        .attr   = { .ca_owner = THIS_MODULE,
1403                    .ca_name = "pid",
1404                    .ca_mode = S_IRUGO | S_IRUSR },
1405        .show   = o2hb_region_pid_read,
1406 };
1407
1408 static struct configfs_attribute *o2hb_region_attrs[] = {
1409         &o2hb_region_attr_block_bytes.attr,
1410         &o2hb_region_attr_start_block.attr,
1411         &o2hb_region_attr_blocks.attr,
1412         &o2hb_region_attr_dev.attr,
1413         &o2hb_region_attr_pid.attr,
1414         NULL,
1415 };
1416
1417 static ssize_t o2hb_region_show(struct config_item *item,
1418                                 struct configfs_attribute *attr,
1419                                 char *page)
1420 {
1421         struct o2hb_region *reg = to_o2hb_region(item);
1422         struct o2hb_region_attribute *o2hb_region_attr =
1423                 container_of(attr, struct o2hb_region_attribute, attr);
1424         ssize_t ret = 0;
1425
1426         if (o2hb_region_attr->show)
1427                 ret = o2hb_region_attr->show(reg, page);
1428         return ret;
1429 }
1430
1431 static ssize_t o2hb_region_store(struct config_item *item,
1432                                  struct configfs_attribute *attr,
1433                                  const char *page, size_t count)
1434 {
1435         struct o2hb_region *reg = to_o2hb_region(item);
1436         struct o2hb_region_attribute *o2hb_region_attr =
1437                 container_of(attr, struct o2hb_region_attribute, attr);
1438         ssize_t ret = -EINVAL;
1439
1440         if (o2hb_region_attr->store)
1441                 ret = o2hb_region_attr->store(reg, page, count);
1442         return ret;
1443 }
1444
1445 static struct configfs_item_operations o2hb_region_item_ops = {
1446         .release                = o2hb_region_release,
1447         .show_attribute         = o2hb_region_show,
1448         .store_attribute        = o2hb_region_store,
1449 };
1450
1451 static struct config_item_type o2hb_region_type = {
1452         .ct_item_ops    = &o2hb_region_item_ops,
1453         .ct_attrs       = o2hb_region_attrs,
1454         .ct_owner       = THIS_MODULE,
1455 };
1456
1457 /* heartbeat set */
1458
1459 struct o2hb_heartbeat_group {
1460         struct config_group hs_group;
1461         /* some stuff? */
1462 };
1463
1464 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1465 {
1466         return group ?
1467                 container_of(group, struct o2hb_heartbeat_group, hs_group)
1468                 : NULL;
1469 }
1470
1471 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
1472                                                           const char *name)
1473 {
1474         struct o2hb_region *reg = NULL;
1475         struct config_item *ret = NULL;
1476
1477         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
1478         if (reg == NULL)
1479                 goto out; /* ENOMEM */
1480
1481         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
1482
1483         ret = &reg->hr_item;
1484
1485         spin_lock(&o2hb_live_lock);
1486         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
1487         spin_unlock(&o2hb_live_lock);
1488 out:
1489         if (ret == NULL)
1490                 kfree(reg);
1491
1492         return ret;
1493 }
1494
1495 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
1496                                            struct config_item *item)
1497 {
1498         struct o2hb_region *reg = to_o2hb_region(item);
1499
1500         /* stop the thread when the user removes the region dir */
1501         if (reg->hr_task) {
1502                 kthread_stop(reg->hr_task);
1503                 reg->hr_task = NULL;
1504         }
1505
1506         config_item_put(item);
1507 }
1508
1509 struct o2hb_heartbeat_group_attribute {
1510         struct configfs_attribute attr;
1511         ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
1512         ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
1513 };
1514
1515 static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
1516                                          struct configfs_attribute *attr,
1517                                          char *page)
1518 {
1519         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1520         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1521                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1522         ssize_t ret = 0;
1523
1524         if (o2hb_heartbeat_group_attr->show)
1525                 ret = o2hb_heartbeat_group_attr->show(reg, page);
1526         return ret;
1527 }
1528
1529 static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
1530                                           struct configfs_attribute *attr,
1531                                           const char *page, size_t count)
1532 {
1533         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1534         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1535                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1536         ssize_t ret = -EINVAL;
1537
1538         if (o2hb_heartbeat_group_attr->store)
1539                 ret = o2hb_heartbeat_group_attr->store(reg, page, count);
1540         return ret;
1541 }
1542
1543 static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
1544                                                      char *page)
1545 {
1546         return sprintf(page, "%u\n", o2hb_dead_threshold);
1547 }
1548
1549 static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
1550                                                     const char *page,
1551                                                     size_t count)
1552 {
1553         unsigned long tmp;
1554         char *p = (char *)page;
1555
1556         tmp = simple_strtoul(p, &p, 10);
1557         if (!p || (*p && (*p != '\n')))
1558                 return -EINVAL;
1559
1560         /* this will validate ranges for us. */
1561         o2hb_dead_threshold_set((unsigned int) tmp);
1562
1563         return count;
1564 }
1565
1566 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
1567         .attr   = { .ca_owner = THIS_MODULE,
1568                     .ca_name = "dead_threshold",
1569                     .ca_mode = S_IRUGO | S_IWUSR },
1570         .show   = o2hb_heartbeat_group_threshold_show,
1571         .store  = o2hb_heartbeat_group_threshold_store,
1572 };
1573
1574 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
1575         &o2hb_heartbeat_group_attr_threshold.attr,
1576         NULL,
1577 };
1578
1579 static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
1580         .show_attribute         = o2hb_heartbeat_group_show,
1581         .store_attribute        = o2hb_heartbeat_group_store,
1582 };
1583
1584 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
1585         .make_item      = o2hb_heartbeat_group_make_item,
1586         .drop_item      = o2hb_heartbeat_group_drop_item,
1587 };
1588
1589 static struct config_item_type o2hb_heartbeat_group_type = {
1590         .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
1591         .ct_item_ops    = &o2hb_hearbeat_group_item_ops,
1592         .ct_attrs       = o2hb_heartbeat_group_attrs,
1593         .ct_owner       = THIS_MODULE,
1594 };
1595
1596 /* this is just here to avoid touching group in heartbeat.h which the
1597  * entire damn world #includes */
1598 struct config_group *o2hb_alloc_hb_set(void)
1599 {
1600         struct o2hb_heartbeat_group *hs = NULL;
1601         struct config_group *ret = NULL;
1602
1603         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1604         if (hs == NULL)
1605                 goto out;
1606
1607         config_group_init_type_name(&hs->hs_group, "heartbeat",
1608                                     &o2hb_heartbeat_group_type);
1609
1610         ret = &hs->hs_group;
1611 out:
1612         if (ret == NULL)
1613                 kfree(hs);
1614         return ret;
1615 }
1616
1617 void o2hb_free_hb_set(struct config_group *group)
1618 {
1619         struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
1620         kfree(hs);
1621 }
1622
1623 /* hb callback registration and issueing */
1624
1625 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
1626 {
1627         if (type == O2HB_NUM_CB)
1628                 return ERR_PTR(-EINVAL);
1629
1630         return &o2hb_callbacks[type];
1631 }
1632
1633 void o2hb_setup_callback(struct o2hb_callback_func *hc,
1634                          enum o2hb_callback_type type,
1635                          o2hb_cb_func *func,
1636                          void *data,
1637                          int priority)
1638 {
1639         INIT_LIST_HEAD(&hc->hc_item);
1640         hc->hc_func = func;
1641         hc->hc_data = data;
1642         hc->hc_priority = priority;
1643         hc->hc_type = type;
1644         hc->hc_magic = O2HB_CB_MAGIC;
1645 }
1646 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
1647
1648 int o2hb_register_callback(struct o2hb_callback_func *hc)
1649 {
1650         struct o2hb_callback_func *tmp;
1651         struct list_head *iter;
1652         struct o2hb_callback *hbcall;
1653         int ret;
1654
1655         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1656         BUG_ON(!list_empty(&hc->hc_item));
1657
1658         hbcall = hbcall_from_type(hc->hc_type);
1659         if (IS_ERR(hbcall)) {
1660                 ret = PTR_ERR(hbcall);
1661                 goto out;
1662         }
1663
1664         down_write(&o2hb_callback_sem);
1665
1666         list_for_each(iter, &hbcall->list) {
1667                 tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
1668                 if (hc->hc_priority < tmp->hc_priority) {
1669                         list_add_tail(&hc->hc_item, iter);
1670                         break;
1671                 }
1672         }
1673         if (list_empty(&hc->hc_item))
1674                 list_add_tail(&hc->hc_item, &hbcall->list);
1675
1676         up_write(&o2hb_callback_sem);
1677         ret = 0;
1678 out:
1679         mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
1680              ret, __builtin_return_address(0), hc);
1681         return ret;
1682 }
1683 EXPORT_SYMBOL_GPL(o2hb_register_callback);
1684
1685 int o2hb_unregister_callback(struct o2hb_callback_func *hc)
1686 {
1687         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1688
1689         mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
1690              __builtin_return_address(0), hc);
1691
1692         if (list_empty(&hc->hc_item))
1693                 return 0;
1694
1695         down_write(&o2hb_callback_sem);
1696
1697         list_del_init(&hc->hc_item);
1698
1699         up_write(&o2hb_callback_sem);
1700
1701         return 0;
1702 }
1703 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
1704
1705 int o2hb_check_node_heartbeating(u8 node_num)
1706 {
1707         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1708
1709         o2hb_fill_node_map(testing_map, sizeof(testing_map));
1710         if (!test_bit(node_num, testing_map)) {
1711                 mlog(ML_HEARTBEAT,
1712                      "node (%u) does not have heartbeating enabled.\n",
1713                      node_num);
1714                 return 0;
1715         }
1716
1717         return 1;
1718 }
1719 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
1720
1721 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
1722 {
1723         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1724
1725         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
1726         if (!test_bit(node_num, testing_map)) {
1727                 mlog(ML_HEARTBEAT,
1728                      "node (%u) does not have heartbeating enabled.\n",
1729                      node_num);
1730                 return 0;
1731         }
1732
1733         return 1;
1734 }
1735 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
1736
1737 /* Makes sure our local node is configured with a node number, and is
1738  * heartbeating. */
1739 int o2hb_check_local_node_heartbeating(void)
1740 {
1741         u8 node_num;
1742
1743         /* if this node was set then we have networking */
1744         node_num = o2nm_this_node();
1745         if (node_num == O2NM_MAX_NODES) {
1746                 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
1747                 return 0;
1748         }
1749
1750         return o2hb_check_node_heartbeating(node_num);
1751 }
1752 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
1753
1754 /*
1755  * this is just a hack until we get the plumbing which flips file systems
1756  * read only and drops the hb ref instead of killing the node dead.
1757  */
1758 void o2hb_stop_all_regions(void)
1759 {
1760         struct o2hb_region *reg;
1761
1762         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
1763
1764         spin_lock(&o2hb_live_lock);
1765
1766         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
1767                 reg->hr_unclean_stop = 1;
1768
1769         spin_unlock(&o2hb_live_lock);
1770 }
1771 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);