1/* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * Copyright (C) 2004, 2005 Oracle. All rights reserved. 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public 8 * License as published by the Free Software Foundation; either 9 * version 2 of the License, or (at your option) any later version. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 14 * General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public 17 * License along with this program; if not, write to the 18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330, 19 * Boston, MA 021110-1307, USA. 20 */ 21 22#include <linux/kernel.h> 23#include <linux/sched.h> 24#include <linux/jiffies.h> 25#include <linux/module.h> 26#include <linux/fs.h> 27#include <linux/bio.h> 28#include <linux/blkdev.h> 29#include <linux/delay.h> 30#include <linux/file.h> 31#include <linux/kthread.h> 32#include <linux/configfs.h> 33#include <linux/random.h> 34#include <linux/crc32.h> 35#include <linux/time.h> 36 37#include "heartbeat.h" 38#include "tcp.h" 39#include "nodemanager.h" 40#include "quorum.h" 41 42#include "masklog.h" 43 44 45/* 46 * The first heartbeat pass had one global thread that would serialize all hb 47 * callback calls. This global serializing sem should only be removed once 48 * we've made sure that all callees can deal with being called concurrently 49 * from multiple hb region threads. 50 */ 51static DECLARE_RWSEM(o2hb_callback_sem); 52 53/* 54 * multiple hb threads are watching multiple regions. A node is live 55 * whenever any of the threads sees activity from the node in its region. 56 */ 57static DEFINE_SPINLOCK(o2hb_live_lock); 58static struct list_head o2hb_live_slots[O2NM_MAX_NODES]; 59static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 60static LIST_HEAD(o2hb_node_events); 61static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue); 62 63static LIST_HEAD(o2hb_all_regions); 64 65static struct o2hb_callback { 66 struct list_head list; 67} o2hb_callbacks[O2HB_NUM_CB]; 68 69static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type); 70 71#define O2HB_DEFAULT_BLOCK_BITS 9 72 73unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD; 74 75/* Only sets a new threshold if there are no active regions. 76 * 77 * No locking or otherwise interesting code is required for reading 78 * o2hb_dead_threshold as it can't change once regions are active and 79 * it's not interesting to anyone until then anyway. */ 80static void o2hb_dead_threshold_set(unsigned int threshold) 81{ 82 if (threshold > O2HB_MIN_DEAD_THRESHOLD) { 83 spin_lock(&o2hb_live_lock); 84 if (list_empty(&o2hb_all_regions)) 85 o2hb_dead_threshold = threshold; 86 spin_unlock(&o2hb_live_lock); 87 } 88} 89 90struct o2hb_node_event { 91 struct list_head hn_item; 92 enum o2hb_callback_type hn_event_type; 93 struct o2nm_node *hn_node; 94 int hn_node_num; 95}; 96 97struct o2hb_disk_slot { 98 struct o2hb_disk_heartbeat_block *ds_raw_block; 99 u8 ds_node_num; 100 u64 ds_last_time; 101 u64 ds_last_generation; 102 u16 ds_equal_samples; 103 u16 ds_changed_samples; 104 struct list_head ds_live_item; 105}; 106 107/* each thread owns a region.. when we're asked to tear down the region 108 * we ask the thread to stop, who cleans up the region */ 109struct o2hb_region { 110 struct config_item hr_item; 111 112 struct list_head hr_all_item; 113 unsigned hr_unclean_stop:1; 114 115 /* protected by the hr_callback_sem */ 116 struct task_struct *hr_task; 117 118 unsigned int hr_blocks; 119 unsigned long long hr_start_block; 120 121 unsigned int hr_block_bits; 122 unsigned int hr_block_bytes; 123 124 unsigned int hr_slots_per_page; 125 unsigned int hr_num_pages; 126 127 struct page **hr_slot_data; 128 struct block_device *hr_bdev; 129 struct o2hb_disk_slot *hr_slots; 130 131 /* let the person setting up hb wait for it to return until it 132 * has reached a 'steady' state. This will be fixed when we have 133 * a more complete api that doesn't lead to this sort of fragility. */ 134 atomic_t hr_steady_iterations; 135 136 char hr_dev_name[BDEVNAME_SIZE]; 137 138 unsigned int hr_timeout_ms; 139 140 /* randomized as the region goes up and down so that a node 141 * recognizes a node going up and down in one iteration */ 142 u64 hr_generation; 143 144 struct delayed_work hr_write_timeout_work; 145 unsigned long hr_last_timeout_start; 146 147 /* Used during o2hb_check_slot to hold a copy of the block 148 * being checked because we temporarily have to zero out the 149 * crc field. */ 150 struct o2hb_disk_heartbeat_block *hr_tmp_block; 151}; 152 153struct o2hb_bio_wait_ctxt { 154 atomic_t wc_num_reqs; 155 struct completion wc_io_complete; 156 int wc_error; 157}; 158 159static void o2hb_write_timeout(struct work_struct *work) 160{ 161 struct o2hb_region *reg = 162 container_of(work, struct o2hb_region, 163 hr_write_timeout_work.work); 164 165 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " 166 "milliseconds\n", reg->hr_dev_name, 167 jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); 168 o2quo_disk_timeout(); 169} 170 171static void o2hb_arm_write_timeout(struct o2hb_region *reg) 172{ 173 mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS); 174 175 cancel_delayed_work(®->hr_write_timeout_work); 176 reg->hr_last_timeout_start = jiffies; 177 schedule_delayed_work(®->hr_write_timeout_work, 178 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS)); 179} 180 181static void o2hb_disarm_write_timeout(struct o2hb_region *reg) 182{ 183 cancel_delayed_work(®->hr_write_timeout_work); 184 flush_scheduled_work(); 185} 186 187static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc) 188{ 189 atomic_set(&wc->wc_num_reqs, 1); 190 init_completion(&wc->wc_io_complete); 191 wc->wc_error = 0; 192} 193 194/* Used in error paths too */ 195static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc, 196 unsigned int num) 197{ 198 /* sadly atomic_sub_and_test() isn't available on all platforms. The 199 * good news is that the fast path only completes one at a time */ 200 while(num--) { 201 if (atomic_dec_and_test(&wc->wc_num_reqs)) { 202 BUG_ON(num > 0); 203 complete(&wc->wc_io_complete); 204 } 205 } 206} 207 208static void o2hb_wait_on_io(struct o2hb_region *reg, 209 struct o2hb_bio_wait_ctxt *wc) 210{ 211 struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping; 212 213 blk_run_address_space(mapping); 214 o2hb_bio_wait_dec(wc, 1); 215 216 wait_for_completion(&wc->wc_io_complete); 217} 218 219static int o2hb_bio_end_io(struct bio *bio, 220 unsigned int bytes_done, 221 int error) 222{ 223 struct o2hb_bio_wait_ctxt *wc = bio->bi_private; 224 225 if (error) { 226 mlog(ML_ERROR, "IO Error %d\n", error); 227 wc->wc_error = error; 228 } 229 230 if (bio->bi_size) 231 return 1; 232 233 o2hb_bio_wait_dec(wc, 1); 234 bio_put(bio); 235 return 0; 236} 237 238/* Setup a Bio to cover I/O against num_slots slots starting at 239 * start_slot. */ 240static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, 241 struct o2hb_bio_wait_ctxt *wc, 242 unsigned int *current_slot, 243 unsigned int max_slots) 244{ 245 int len, current_page; 246 unsigned int vec_len, vec_start; 247 unsigned int bits = reg->hr_block_bits; 248 unsigned int spp = reg->hr_slots_per_page; 249 unsigned int cs = *current_slot; 250 struct bio *bio; 251 struct page *page; 252 253 /* Testing has shown this allocation to take long enough under 254 * GFP_KERNEL that the local node can get fenced. It would be 255 * nicest if we could pre-allocate these bios and avoid this 256 * all together. */ 257 bio = bio_alloc(GFP_ATOMIC, 16); 258 if (!bio) { 259 mlog(ML_ERROR, "Could not alloc slots BIO!\n"); 260 bio = ERR_PTR(-ENOMEM); 261 goto bail; 262 } 263 264 /* Must put everything in 512 byte sectors for the bio... */ 265 bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9); 266 bio->bi_bdev = reg->hr_bdev; 267 bio->bi_private = wc; 268 bio->bi_end_io = o2hb_bio_end_io; 269 270 vec_start = (cs << bits) % PAGE_CACHE_SIZE; 271 while(cs < max_slots) { 272 current_page = cs / spp; 273 page = reg->hr_slot_data[current_page]; 274 275 vec_len = min(PAGE_CACHE_SIZE, 276 (max_slots-cs) * (PAGE_CACHE_SIZE/spp) ); 277 278 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", 279 current_page, vec_len, vec_start); 280 281 len = bio_add_page(bio, page, vec_len, vec_start); 282 if (len != vec_len) break; 283 284 cs += vec_len / (PAGE_CACHE_SIZE/spp); 285 vec_start = 0; 286 } 287 288bail: 289 *current_slot = cs; 290 return bio; 291} 292 293static int o2hb_read_slots(struct o2hb_region *reg, 294 unsigned int max_slots) 295{ 296 unsigned int current_slot=0; 297 int status; 298 struct o2hb_bio_wait_ctxt wc; 299 struct bio *bio; 300 301 o2hb_bio_wait_init(&wc); 302 303 while(current_slot < max_slots) { 304 bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots); 305 if (IS_ERR(bio)) { 306 status = PTR_ERR(bio); 307 mlog_errno(status); 308 goto bail_and_wait; 309 } 310 311 atomic_inc(&wc.wc_num_reqs); 312 submit_bio(READ, bio); 313 } 314 315 status = 0; 316 317bail_and_wait: 318 o2hb_wait_on_io(reg, &wc); 319 if (wc.wc_error && !status) 320 status = wc.wc_error; 321 322 return status; 323} 324 325static int o2hb_issue_node_write(struct o2hb_region *reg, 326 struct o2hb_bio_wait_ctxt *write_wc) 327{ 328 int status; 329 unsigned int slot; 330 struct bio *bio; 331 332 o2hb_bio_wait_init(write_wc); 333 334 slot = o2nm_this_node(); 335 336 bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1); 337 if (IS_ERR(bio)) { 338 status = PTR_ERR(bio); 339 mlog_errno(status); 340 goto bail; 341 } 342 343 atomic_inc(&write_wc->wc_num_reqs); 344 submit_bio(WRITE, bio); 345 346 status = 0; 347bail: 348 return status; 349} 350 351static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg, 352 struct o2hb_disk_heartbeat_block *hb_block) 353{ 354 __le32 old_cksum; 355 u32 ret; 356 357 /* We want to compute the block crc with a 0 value in the 358 * hb_cksum field. Save it off here and replace after the 359 * crc. */ 360 old_cksum = hb_block->hb_cksum; 361 hb_block->hb_cksum = 0; 362 363 ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes); 364 365 hb_block->hb_cksum = old_cksum; 366 367 return ret; 368} 369 370static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block) 371{ 372 mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, " 373 "cksum = 0x%x, generation 0x%llx\n", 374 (long long)le64_to_cpu(hb_block->hb_seq), 375 hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum), 376 (long long)le64_to_cpu(hb_block->hb_generation)); 377} 378 379static int o2hb_verify_crc(struct o2hb_region *reg, 380 struct o2hb_disk_heartbeat_block *hb_block) 381{ 382 u32 read, computed; 383 384 read = le32_to_cpu(hb_block->hb_cksum); 385 computed = o2hb_compute_block_crc_le(reg, hb_block); 386 387 return read == computed; 388} 389 390/* We want to make sure that nobody is heartbeating on top of us -- 391 * this will help detect an invalid configuration. */ 392static int o2hb_check_last_timestamp(struct o2hb_region *reg) 393{ 394 int node_num, ret; 395 struct o2hb_disk_slot *slot; 396 struct o2hb_disk_heartbeat_block *hb_block; 397 398 node_num = o2nm_this_node(); 399 400 ret = 1; 401 slot = ®->hr_slots[node_num]; 402 /* Don't check on our 1st timestamp */ 403 if (slot->ds_last_time) { 404 hb_block = slot->ds_raw_block; 405 406 if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time) 407 ret = 0; 408 } 409 410 return ret; 411} 412 413static inline void o2hb_prepare_block(struct o2hb_region *reg, 414 u64 generation) 415{ 416 int node_num; 417 u64 cputime; 418 struct o2hb_disk_slot *slot; 419 struct o2hb_disk_heartbeat_block *hb_block; 420 421 node_num = o2nm_this_node(); 422 slot = ®->hr_slots[node_num]; 423 424 hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block; 425 memset(hb_block, 0, reg->hr_block_bytes); 426 /* TODO: time stuff */ 427 cputime = CURRENT_TIME.tv_sec; 428 if (!cputime) 429 cputime = 1; 430 431 hb_block->hb_seq = cpu_to_le64(cputime); 432 hb_block->hb_node = node_num; 433 hb_block->hb_generation = cpu_to_le64(generation); 434 hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS); 435 436 /* This step must always happen last! */ 437 hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg, 438 hb_block)); 439 440 mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n", 441 (long long)generation, 442 le32_to_cpu(hb_block->hb_cksum)); 443} 444 445static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, 446 struct o2nm_node *node, 447 int idx) 448{ 449 struct list_head *iter; 450 struct o2hb_callback_func *f; 451 452 list_for_each(iter, &hbcall->list) { 453 f = list_entry(iter, struct o2hb_callback_func, hc_item); 454 mlog(ML_HEARTBEAT, "calling funcs %p\n", f); 455 (f->hc_func)(node, idx, f->hc_data); 456 } 457} 458 459/* Will run the list in order until we process the passed event */ 460static void o2hb_run_event_list(struct o2hb_node_event *queued_event) 461{ 462 int empty; 463 struct o2hb_callback *hbcall; 464 struct o2hb_node_event *event; 465 466 spin_lock(&o2hb_live_lock); 467 empty = list_empty(&queued_event->hn_item); 468 spin_unlock(&o2hb_live_lock); 469 if (empty) 470 return; 471 472 /* Holding callback sem assures we don't alter the callback 473 * lists when doing this, and serializes ourselves with other 474 * processes wanting callbacks. */ 475 down_write(&o2hb_callback_sem); 476 477 spin_lock(&o2hb_live_lock); 478 while (!list_empty(&o2hb_node_events) 479 && !list_empty(&queued_event->hn_item)) { 480 event = list_entry(o2hb_node_events.next, 481 struct o2hb_node_event, 482 hn_item); 483 list_del_init(&event->hn_item); 484 spin_unlock(&o2hb_live_lock); 485 486 mlog(ML_HEARTBEAT, "Node %s event for %d\n", 487 event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN", 488 event->hn_node_num); 489 490 hbcall = hbcall_from_type(event->hn_event_type); 491 492 /* We should *never* have gotten on to the list with a 493 * bad type... This isn't something that we should try 494 * to recover from. */ 495 BUG_ON(IS_ERR(hbcall)); 496 497 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num); 498 499 spin_lock(&o2hb_live_lock); 500 } 501 spin_unlock(&o2hb_live_lock); 502 503 up_write(&o2hb_callback_sem); 504} 505 506static void o2hb_queue_node_event(struct o2hb_node_event *event, 507 enum o2hb_callback_type type, 508 struct o2nm_node *node, 509 int node_num) 510{ 511 assert_spin_locked(&o2hb_live_lock); 512 513 event->hn_event_type = type; 514 event->hn_node = node; 515 event->hn_node_num = node_num; 516 517 mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n", 518 type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num); 519 520 list_add_tail(&event->hn_item, &o2hb_node_events); 521} 522 523static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) 524{ 525 struct o2hb_node_event event = 526 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 527 struct o2nm_node *node; 528 529 node = o2nm_get_node_by_num(slot->ds_node_num); 530 if (!node) 531 return; 532 533 spin_lock(&o2hb_live_lock); 534 if (!list_empty(&slot->ds_live_item)) { 535 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n", 536 slot->ds_node_num); 537 538 list_del_init(&slot->ds_live_item); 539 540 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 541 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 542 543 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, 544 slot->ds_node_num); 545 } 546 } 547 spin_unlock(&o2hb_live_lock); 548 549 o2hb_run_event_list(&event); 550 551 o2nm_node_put(node); 552} 553 554static int o2hb_check_slot(struct o2hb_region *reg, 555 struct o2hb_disk_slot *slot) 556{ 557 int changed = 0, gen_changed = 0; 558 struct o2hb_node_event event = 559 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 560 struct o2nm_node *node; 561 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block; 562 u64 cputime; 563 unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS; 564 unsigned int slot_dead_ms; 565 566 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); 567 568 /* Is this correct? Do we assume that the node doesn't exist 569 * if we're not configured for him? */ 570 node = o2nm_get_node_by_num(slot->ds_node_num); 571 if (!node) 572 return 0; 573 574 if (!o2hb_verify_crc(reg, hb_block)) { 575 /* all paths from here will drop o2hb_live_lock for 576 * us. */ 577 spin_lock(&o2hb_live_lock); 578 579 /* Don't print an error on the console in this case - 580 * a freshly formatted heartbeat area will not have a 581 * crc set on it. */ 582 if (list_empty(&slot->ds_live_item)) 583 goto out; 584 585 /* The node is live but pushed out a bad crc. We 586 * consider it a transient miss but don't populate any 587 * other values as they may be junk. */ 588 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n", 589 slot->ds_node_num, reg->hr_dev_name); 590 o2hb_dump_slot(hb_block); 591 592 slot->ds_equal_samples++; 593 goto fire_callbacks; 594 } 595 596 /* we don't care if these wrap.. the state transitions below 597 * clear at the right places */ 598 cputime = le64_to_cpu(hb_block->hb_seq); 599 if (slot->ds_last_time != cputime) 600 slot->ds_changed_samples++; 601 else 602 slot->ds_equal_samples++; 603 slot->ds_last_time = cputime; 604 605 /* The node changed heartbeat generations. We assume this to 606 * mean it dropped off but came back before we timed out. We 607 * want to consider it down for the time being but don't want 608 * to lose any changed_samples state we might build up to 609 * considering it live again. */ 610 if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) { 611 gen_changed = 1; 612 slot->ds_equal_samples = 0; 613 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx " 614 "to 0x%llx)\n", slot->ds_node_num, 615 (long long)slot->ds_last_generation, 616 (long long)le64_to_cpu(hb_block->hb_generation)); 617 } 618 619 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 620 621 mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x " 622 "seq %llu last %llu changed %u equal %u\n", 623 slot->ds_node_num, (long long)slot->ds_last_generation, 624 le32_to_cpu(hb_block->hb_cksum), 625 (unsigned long long)le64_to_cpu(hb_block->hb_seq), 626 (unsigned long long)slot->ds_last_time, slot->ds_changed_samples, 627 slot->ds_equal_samples); 628 629 spin_lock(&o2hb_live_lock); 630 631fire_callbacks: 632 /* dead nodes only come to life after some number of 633 * changes at any time during their dead time */ 634 if (list_empty(&slot->ds_live_item) && 635 slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) { 636 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n", 637 slot->ds_node_num, (long long)slot->ds_last_generation); 638 639 /* first on the list generates a callback */ 640 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 641 set_bit(slot->ds_node_num, o2hb_live_node_bitmap); 642 643 o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node, 644 slot->ds_node_num); 645 646 changed = 1; 647 } 648 649 list_add_tail(&slot->ds_live_item, 650 &o2hb_live_slots[slot->ds_node_num]); 651 652 slot->ds_equal_samples = 0; 653 654 /* We want to be sure that all nodes agree on the 655 * number of milliseconds before a node will be 656 * considered dead. The self-fencing timeout is 657 * computed from this value, and a discrepancy might 658 * result in heartbeat calling a node dead when it 659 * hasn't self-fenced yet. */ 660 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms); 661 if (slot_dead_ms && slot_dead_ms != dead_ms) { 662 /* TODO: Perhaps we can fail the region here. */ 663 mlog(ML_ERROR, "Node %d on device %s has a dead count " 664 "of %u ms, but our count is %u ms.\n" 665 "Please double check your configuration values " 666 "for 'O2CB_HEARTBEAT_THRESHOLD'\n", 667 slot->ds_node_num, reg->hr_dev_name, slot_dead_ms, 668 dead_ms); 669 } 670 goto out; 671 } 672 673 /* if the list is dead, we're done.. */ 674 if (list_empty(&slot->ds_live_item)) 675 goto out; 676 677 /* live nodes only go dead after enough consequtive missed 678 * samples.. reset the missed counter whenever we see 679 * activity */ 680 if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) { 681 mlog(ML_HEARTBEAT, "Node %d left my region\n", 682 slot->ds_node_num); 683 684 /* last off the live_slot generates a callback */ 685 list_del_init(&slot->ds_live_item); 686 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 687 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 688 689 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, 690 slot->ds_node_num); 691 692 changed = 1; 693 } 694 695 /* We don't clear this because the node is still 696 * actually writing new blocks. */ 697 if (!gen_changed) 698 slot->ds_changed_samples = 0; 699 goto out; 700 } 701 if (slot->ds_changed_samples) { 702 slot->ds_changed_samples = 0; 703 slot->ds_equal_samples = 0; 704 } 705out: 706 spin_unlock(&o2hb_live_lock); 707 708 o2hb_run_event_list(&event); 709 710 o2nm_node_put(node); 711 return changed; 712} 713 714/* This could be faster if we just implmented a find_last_bit, but I 715 * don't think the circumstances warrant it. */ 716static int o2hb_highest_node(unsigned long *nodes, 717 int numbits) 718{ 719 int highest, node; 720 721 highest = numbits; 722 node = -1; 723 while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) { 724 if (node >= numbits) 725 break; 726 727 highest = node; 728 } 729 730 return highest; 731} 732 733static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) 734{ 735 int i, ret, highest_node, change = 0; 736 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; 737 struct o2hb_bio_wait_ctxt write_wc; 738 739 ret = o2nm_configured_node_map(configured_nodes, 740 sizeof(configured_nodes)); 741 if (ret) { 742 mlog_errno(ret); 743 return ret; 744 } 745 746 highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); 747 if (highest_node >= O2NM_MAX_NODES) { 748 mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n"); 749 return -EINVAL; 750 } 751 752 /* No sense in reading the slots of nodes that don't exist 753 * yet. Of course, if the node definitions have holes in them 754 * then we're reading an empty slot anyway... Consider this 755 * best-effort. */ 756 ret = o2hb_read_slots(reg, highest_node + 1); 757 if (ret < 0) { 758 mlog_errno(ret); 759 return ret; 760 } 761 762 /* With an up to date view of the slots, we can check that no 763 * other node has been improperly configured to heartbeat in 764 * our slot. */ 765 if (!o2hb_check_last_timestamp(reg)) 766 mlog(ML_ERROR, "Device \"%s\": another node is heartbeating " 767 "in our slot!\n", reg->hr_dev_name); 768 769 /* fill in the proper info for our next heartbeat */ 770 o2hb_prepare_block(reg, reg->hr_generation); 771 772 /* And fire off the write. Note that we don't wait on this I/O 773 * until later. */ 774 ret = o2hb_issue_node_write(reg, &write_wc); 775 if (ret < 0) { 776 mlog_errno(ret); 777 return ret; 778 } 779 780 i = -1; 781 while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 782 783 change |= o2hb_check_slot(reg, ®->hr_slots[i]); 784 } 785 786 /* 787 * We have to be sure we've advertised ourselves on disk 788 * before we can go to steady state. This ensures that 789 * people we find in our steady state have seen us. 790 */ 791 o2hb_wait_on_io(reg, &write_wc); 792 if (write_wc.wc_error) { 793 /* Do not re-arm the write timeout on I/O error - we 794 * can't be sure that the new block ever made it to 795 * disk */ 796 mlog(ML_ERROR, "Write error %d on device \"%s\"\n", 797 write_wc.wc_error, reg->hr_dev_name); 798 return write_wc.wc_error; 799 } 800 801 o2hb_arm_write_timeout(reg); 802 803 /* let the person who launched us know when things are steady */ 804 if (!change && (atomic_read(®->hr_steady_iterations) != 0)) { 805 if (atomic_dec_and_test(®->hr_steady_iterations)) 806 wake_up(&o2hb_steady_queue); 807 } 808 809 return 0; 810} 811 812/* Subtract b from a, storing the result in a. a *must* have a larger 813 * value than b. */ 814static void o2hb_tv_subtract(struct timeval *a, 815 struct timeval *b) 816{ 817 /* just return 0 when a is after b */ 818 if (a->tv_sec < b->tv_sec || 819 (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) { 820 a->tv_sec = 0; 821 a->tv_usec = 0; 822 return; 823 } 824 825 a->tv_sec -= b->tv_sec; 826 a->tv_usec -= b->tv_usec; 827 while ( a->tv_usec < 0 ) { 828 a->tv_sec--; 829 a->tv_usec += 1000000; 830 } 831} 832 833static unsigned int o2hb_elapsed_msecs(struct timeval *start, 834 struct timeval *end) 835{ 836 struct timeval res = *end; 837 838 o2hb_tv_subtract(&res, start); 839 840 return res.tv_sec * 1000 + res.tv_usec / 1000; 841} 842 843/* 844 * we ride the region ref that the region dir holds. before the region 845 * dir is removed and drops it ref it will wait to tear down this 846 * thread. 847 */ 848static int o2hb_thread(void *data) 849{ 850 int i, ret; 851 struct o2hb_region *reg = data; 852 struct o2hb_bio_wait_ctxt write_wc; 853 struct timeval before_hb, after_hb; 854 unsigned int elapsed_msec; 855 856 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n"); 857 858 set_user_nice(current, -20); 859 860 while (!kthread_should_stop() && !reg->hr_unclean_stop) { 861 /* We track the time spent inside 862 * o2hb_do_disk_heartbeat so that we avoid more then 863 * hr_timeout_ms between disk writes. On busy systems 864 * this should result in a heartbeat which is less 865 * likely to time itself out. */ 866 do_gettimeofday(&before_hb); 867 868 i = 0; 869 do { 870 ret = o2hb_do_disk_heartbeat(reg); 871 } while (ret && ++i < 2); 872 873 do_gettimeofday(&after_hb); 874 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb); 875 876 mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n", 877 before_hb.tv_sec, (unsigned long) before_hb.tv_usec, 878 after_hb.tv_sec, (unsigned long) after_hb.tv_usec, 879 elapsed_msec); 880 881 if (elapsed_msec < reg->hr_timeout_ms) { 882 /* the kthread api has blocked signals for us so no 883 * need to record the return value. */ 884 msleep_interruptible(reg->hr_timeout_ms - elapsed_msec); 885 } 886 } 887 888 o2hb_disarm_write_timeout(reg); 889 890 /* unclean stop is only used in very bad situation */ 891 for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++) 892 o2hb_shutdown_slot(®->hr_slots[i]); 893 894 o2hb_prepare_block(reg, 0); 895 ret = o2hb_issue_node_write(reg, &write_wc); 896 if (ret == 0) { 897 o2hb_wait_on_io(reg, &write_wc); 898 } else { 899 mlog_errno(ret); 900 } 901 902 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n"); 903 904 return 0; 905} 906 907void o2hb_init(void) 908{ 909 int i; 910 911 for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++) 912 INIT_LIST_HEAD(&o2hb_callbacks[i].list); 913 914 for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++) 915 INIT_LIST_HEAD(&o2hb_live_slots[i]); 916 917 INIT_LIST_HEAD(&o2hb_node_events); 918 919 memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap)); 920} 921 922/* if we're already in a callback then we're already serialized by the sem */ 923static void o2hb_fill_node_map_from_callback(unsigned long *map, 924 unsigned bytes) 925{ 926 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); 927 928 memcpy(map, &o2hb_live_node_bitmap, bytes); 929} 930 931/* 932 * get a map of all nodes that are heartbeating in any regions 933 */ 934void o2hb_fill_node_map(unsigned long *map, unsigned bytes) 935{ 936 /* callers want to serialize this map and callbacks so that they 937 * can trust that they don't miss nodes coming to the party */ 938 down_read(&o2hb_callback_sem); 939 spin_lock(&o2hb_live_lock); 940 o2hb_fill_node_map_from_callback(map, bytes); 941 spin_unlock(&o2hb_live_lock); 942 up_read(&o2hb_callback_sem); 943} 944EXPORT_SYMBOL_GPL(o2hb_fill_node_map); 945 946/* 947 * heartbeat configfs bits. The heartbeat set is a default set under 948 * the cluster set in nodemanager.c. 949 */ 950 951static struct o2hb_region *to_o2hb_region(struct config_item *item) 952{ 953 return item ? container_of(item, struct o2hb_region, hr_item) : NULL; 954} 955 956/* drop_item only drops its ref after killing the thread, nothing should 957 * be using the region anymore. this has to clean up any state that 958 * attributes might have built up. */ 959static void o2hb_region_release(struct config_item *item) 960{ 961 int i; 962 struct page *page; 963 struct o2hb_region *reg = to_o2hb_region(item); 964 965 if (reg->hr_tmp_block) 966 kfree(reg->hr_tmp_block); 967 968 if (reg->hr_slot_data) { 969 for (i = 0; i < reg->hr_num_pages; i++) { 970 page = reg->hr_slot_data[i]; 971 if (page) 972 __free_page(page); 973 } 974 kfree(reg->hr_slot_data); 975 } 976 977 if (reg->hr_bdev) 978 blkdev_put(reg->hr_bdev); 979 980 if (reg->hr_slots) 981 kfree(reg->hr_slots); 982 983 spin_lock(&o2hb_live_lock); 984 list_del(®->hr_all_item); 985 spin_unlock(&o2hb_live_lock); 986 987 kfree(reg); 988} 989 990static int o2hb_read_block_input(struct o2hb_region *reg, 991 const char *page, 992 size_t count, 993 unsigned long *ret_bytes, 994 unsigned int *ret_bits) 995{ 996 unsigned long bytes; 997 char *p = (char *)page; 998 999 bytes = simple_strtoul(p, &p, 0); 1000 if (!p || (*p && (*p != '\n'))) 1001 return -EINVAL; 1002 1003 /* Heartbeat and fs min / max block sizes are the same. */ 1004 if (bytes > 4096 || bytes < 512) 1005 return -ERANGE; 1006 if (hweight16(bytes) != 1) 1007 return -EINVAL; 1008 1009 if (ret_bytes) 1010 *ret_bytes = bytes; 1011 if (ret_bits) 1012 *ret_bits = ffs(bytes) - 1; 1013 1014 return 0; 1015} 1016 1017static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg, 1018 char *page) 1019{ 1020 return sprintf(page, "%u\n", reg->hr_block_bytes); 1021} 1022 1023static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg, 1024 const char *page, 1025 size_t count) 1026{ 1027 int status; 1028 unsigned long block_bytes; 1029 unsigned int block_bits; 1030 1031 if (reg->hr_bdev) 1032 return -EINVAL; 1033 1034 status = o2hb_read_block_input(reg, page, count, 1035 &block_bytes, &block_bits); 1036 if (status) 1037 return status; 1038 1039 reg->hr_block_bytes = (unsigned int)block_bytes; 1040 reg->hr_block_bits = block_bits; 1041 1042 return count; 1043} 1044 1045static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg, 1046 char *page) 1047{ 1048 return sprintf(page, "%llu\n", reg->hr_start_block); 1049} 1050 1051static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg, 1052 const char *page, 1053 size_t count) 1054{ 1055 unsigned long long tmp; 1056 char *p = (char *)page; 1057 1058 if (reg->hr_bdev) 1059 return -EINVAL; 1060 1061 tmp = simple_strtoull(p, &p, 0); 1062 if (!p || (*p && (*p != '\n'))) 1063 return -EINVAL; 1064 1065 reg->hr_start_block = tmp; 1066 1067 return count; 1068} 1069 1070static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg, 1071 char *page) 1072{ 1073 return sprintf(page, "%d\n", reg->hr_blocks); 1074} 1075 1076static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg, 1077 const char *page, 1078 size_t count) 1079{ 1080 unsigned long tmp; 1081 char *p = (char *)page; 1082 1083 if (reg->hr_bdev) 1084 return -EINVAL; 1085 1086 tmp = simple_strtoul(p, &p, 0); 1087 if (!p || (*p && (*p != '\n'))) 1088 return -EINVAL; 1089 1090 if (tmp > O2NM_MAX_NODES || tmp == 0) 1091 return -ERANGE; 1092 1093 reg->hr_blocks = (unsigned int)tmp; 1094 1095 return count; 1096} 1097 1098static ssize_t o2hb_region_dev_read(struct o2hb_region *reg, 1099 char *page) 1100{ 1101 unsigned int ret = 0; 1102 1103 if (reg->hr_bdev) 1104 ret = sprintf(page, "%s\n", reg->hr_dev_name); 1105 1106 return ret; 1107} 1108 1109static void o2hb_init_region_params(struct o2hb_region *reg) 1110{ 1111 reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits; 1112 reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS; 1113 1114 mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n", 1115 reg->hr_start_block, reg->hr_blocks); 1116 mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n", 1117 reg->hr_block_bytes, reg->hr_block_bits); 1118 mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms); 1119 mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold); 1120} 1121 1122static int o2hb_map_slot_data(struct o2hb_region *reg) 1123{ 1124 int i, j; 1125 unsigned int last_slot; 1126 unsigned int spp = reg->hr_slots_per_page; 1127 struct page *page; 1128 char *raw; 1129 struct o2hb_disk_slot *slot; 1130 1131 reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL); 1132 if (reg->hr_tmp_block == NULL) { 1133 mlog_errno(-ENOMEM); 1134 return -ENOMEM; 1135 } 1136 1137 reg->hr_slots = kcalloc(reg->hr_blocks, 1138 sizeof(struct o2hb_disk_slot), GFP_KERNEL); 1139 if (reg->hr_slots == NULL) { 1140 mlog_errno(-ENOMEM); 1141 return -ENOMEM; 1142 } 1143 1144 for(i = 0; i < reg->hr_blocks; i++) { 1145 slot = ®->hr_slots[i]; 1146 slot->ds_node_num = i; 1147 INIT_LIST_HEAD(&slot->ds_live_item); 1148 slot->ds_raw_block = NULL; 1149 } 1150 1151 reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp; 1152 mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks " 1153 "at %u blocks per page\n", 1154 reg->hr_num_pages, reg->hr_blocks, spp); 1155 1156 reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *), 1157 GFP_KERNEL); 1158 if (!reg->hr_slot_data) { 1159 mlog_errno(-ENOMEM); 1160 return -ENOMEM; 1161 } 1162 1163 for(i = 0; i < reg->hr_num_pages; i++) { 1164 page = alloc_page(GFP_KERNEL); 1165 if (!page) { 1166 mlog_errno(-ENOMEM); 1167 return -ENOMEM; 1168 } 1169 1170 reg->hr_slot_data[i] = page; 1171 1172 last_slot = i * spp; 1173 raw = page_address(page); 1174 for (j = 0; 1175 (j < spp) && ((j + last_slot) < reg->hr_blocks); 1176 j++) { 1177 BUG_ON((j + last_slot) >= reg->hr_blocks); 1178 1179 slot = ®->hr_slots[j + last_slot]; 1180 slot->ds_raw_block = 1181 (struct o2hb_disk_heartbeat_block *) raw; 1182 1183 raw += reg->hr_block_bytes; 1184 } 1185 } 1186 1187 return 0; 1188} 1189 1190/* Read in all the slots available and populate the tracking 1191 * structures so that we can start with a baseline idea of what's 1192 * there. */ 1193static int o2hb_populate_slot_data(struct o2hb_region *reg) 1194{ 1195 int ret, i; 1196 struct o2hb_disk_slot *slot; 1197 struct o2hb_disk_heartbeat_block *hb_block; 1198 1199 mlog_entry_void(); 1200 1201 ret = o2hb_read_slots(reg, reg->hr_blocks); 1202 if (ret) { 1203 mlog_errno(ret); 1204 goto out; 1205 } 1206 1207 /* We only want to get an idea of the values initially in each 1208 * slot, so we do no verification - o2hb_check_slot will 1209 * actually determine if each configured slot is valid and 1210 * whether any values have changed. */ 1211 for(i = 0; i < reg->hr_blocks; i++) { 1212 slot = ®->hr_slots[i]; 1213 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block; 1214 1215 /* Only fill the values that o2hb_check_slot uses to 1216 * determine changing slots */ 1217 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq); 1218 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 1219 } 1220 1221out: 1222 mlog_exit(ret); 1223 return ret; 1224} 1225 1226/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */ 1227static ssize_t o2hb_region_dev_write(struct o2hb_region *reg, 1228 const char *page, 1229 size_t count) 1230{ 1231 struct task_struct *hb_task; 1232 long fd; 1233 int sectsize; 1234 char *p = (char *)page; 1235 struct file *filp = NULL; 1236 struct inode *inode = NULL; 1237 ssize_t ret = -EINVAL; 1238 1239 if (reg->hr_bdev) 1240 goto out; 1241 1242 /* We can't heartbeat without having had our node number 1243 * configured yet. */ 1244 if (o2nm_this_node() == O2NM_MAX_NODES) 1245 goto out; 1246 1247 fd = simple_strtol(p, &p, 0); 1248 if (!p || (*p && (*p != '\n'))) 1249 goto out; 1250 1251 if (fd < 0 || fd >= INT_MAX) 1252 goto out; 1253 1254 filp = fget(fd); 1255 if (filp == NULL) 1256 goto out; 1257 1258 if (reg->hr_blocks == 0 || reg->hr_start_block == 0 || 1259 reg->hr_block_bytes == 0) 1260 goto out; 1261 1262 inode = igrab(filp->f_mapping->host); 1263 if (inode == NULL) 1264 goto out; 1265 1266 if (!S_ISBLK(inode->i_mode)) 1267 goto out; 1268 1269 reg->hr_bdev = I_BDEV(filp->f_mapping->host); 1270 ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0); 1271 if (ret) { 1272 reg->hr_bdev = NULL; 1273 goto out; 1274 } 1275 inode = NULL; 1276 1277 bdevname(reg->hr_bdev, reg->hr_dev_name); 1278 1279 sectsize = bdev_hardsect_size(reg->hr_bdev); 1280 if (sectsize != reg->hr_block_bytes) { 1281 mlog(ML_ERROR, 1282 "blocksize %u incorrect for device, expected %d", 1283 reg->hr_block_bytes, sectsize); 1284 ret = -EINVAL; 1285 goto out; 1286 } 1287 1288 o2hb_init_region_params(reg); 1289 1290 /* Generation of zero is invalid */ 1291 do { 1292 get_random_bytes(®->hr_generation, 1293 sizeof(reg->hr_generation)); 1294 } while (reg->hr_generation == 0); 1295 1296 ret = o2hb_map_slot_data(reg); 1297 if (ret) { 1298 mlog_errno(ret); 1299 goto out; 1300 } 1301 1302 ret = o2hb_populate_slot_data(reg); 1303 if (ret) { 1304 mlog_errno(ret); 1305 goto out; 1306 } 1307 1308 INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout); 1309 1310 /* 1311 * A node is considered live after it has beat LIVE_THRESHOLD 1312 * times. We're not steady until we've given them a chance 1313 * _after_ our first read. 1314 */ 1315 atomic_set(®->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1); 1316 1317 hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s", 1318 reg->hr_item.ci_name); 1319 if (IS_ERR(hb_task)) { 1320 ret = PTR_ERR(hb_task); 1321 mlog_errno(ret); 1322 goto out; 1323 } 1324 1325 spin_lock(&o2hb_live_lock); 1326 reg->hr_task = hb_task; 1327 spin_unlock(&o2hb_live_lock); 1328 1329 ret = wait_event_interruptible(o2hb_steady_queue, 1330 atomic_read(®->hr_steady_iterations) == 0); 1331 if (ret) { 1332 spin_lock(&o2hb_live_lock); 1333 hb_task = reg->hr_task; 1334 reg->hr_task = NULL; 1335 spin_unlock(&o2hb_live_lock); 1336 1337 if (hb_task) 1338 kthread_stop(hb_task); 1339 goto out; 1340 } 1341 1342 ret = count; 1343out: 1344 if (filp) 1345 fput(filp); 1346 if (inode) 1347 iput(inode); 1348 if (ret < 0) { 1349 if (reg->hr_bdev) { 1350 blkdev_put(reg->hr_bdev); 1351 reg->hr_bdev = NULL; 1352 } 1353 } 1354 return ret; 1355} 1356 1357static ssize_t o2hb_region_pid_read(struct o2hb_region *reg, 1358 char *page) 1359{ 1360 pid_t pid = 0; 1361 1362 spin_lock(&o2hb_live_lock); 1363 if (reg->hr_task) 1364 pid = reg->hr_task->pid; 1365 spin_unlock(&o2hb_live_lock); 1366 1367 if (!pid) 1368 return 0; 1369 1370 return sprintf(page, "%u\n", pid); 1371} 1372 1373struct o2hb_region_attribute { 1374 struct configfs_attribute attr; 1375 ssize_t (*show)(struct o2hb_region *, char *); 1376 ssize_t (*store)(struct o2hb_region *, const char *, size_t); 1377}; 1378 1379static struct o2hb_region_attribute o2hb_region_attr_block_bytes = { 1380 .attr = { .ca_owner = THIS_MODULE, 1381 .ca_name = "block_bytes", 1382 .ca_mode = S_IRUGO | S_IWUSR }, 1383 .show = o2hb_region_block_bytes_read, 1384 .store = o2hb_region_block_bytes_write, 1385}; 1386 1387static struct o2hb_region_attribute o2hb_region_attr_start_block = { 1388 .attr = { .ca_owner = THIS_MODULE, 1389 .ca_name = "start_block", 1390 .ca_mode = S_IRUGO | S_IWUSR }, 1391 .show = o2hb_region_start_block_read, 1392 .store = o2hb_region_start_block_write, 1393}; 1394 1395static struct o2hb_region_attribute o2hb_region_attr_blocks = { 1396 .attr = { .ca_owner = THIS_MODULE, 1397 .ca_name = "blocks", 1398 .ca_mode = S_IRUGO | S_IWUSR }, 1399 .show = o2hb_region_blocks_read, 1400 .store = o2hb_region_blocks_write, 1401}; 1402 1403static struct o2hb_region_attribute o2hb_region_attr_dev = { 1404 .attr = { .ca_owner = THIS_MODULE, 1405 .ca_name = "dev", 1406 .ca_mode = S_IRUGO | S_IWUSR }, 1407 .show = o2hb_region_dev_read, 1408 .store = o2hb_region_dev_write, 1409}; 1410 1411static struct o2hb_region_attribute o2hb_region_attr_pid = { 1412 .attr = { .ca_owner = THIS_MODULE, 1413 .ca_name = "pid", 1414 .ca_mode = S_IRUGO | S_IRUSR }, 1415 .show = o2hb_region_pid_read, 1416}; 1417 1418static struct configfs_attribute *o2hb_region_attrs[] = { 1419 &o2hb_region_attr_block_bytes.attr, 1420 &o2hb_region_attr_start_block.attr, 1421 &o2hb_region_attr_blocks.attr, 1422 &o2hb_region_attr_dev.attr, 1423 &o2hb_region_attr_pid.attr, 1424 NULL, 1425}; 1426 1427static ssize_t o2hb_region_show(struct config_item *item, 1428 struct configfs_attribute *attr, 1429 char *page) 1430{ 1431 struct o2hb_region *reg = to_o2hb_region(item); 1432 struct o2hb_region_attribute *o2hb_region_attr = 1433 container_of(attr, struct o2hb_region_attribute, attr); 1434 ssize_t ret = 0; 1435 1436 if (o2hb_region_attr->show) 1437 ret = o2hb_region_attr->show(reg, page); 1438 return ret; 1439} 1440 1441static ssize_t o2hb_region_store(struct config_item *item, 1442 struct configfs_attribute *attr, 1443 const char *page, size_t count) 1444{ 1445 struct o2hb_region *reg = to_o2hb_region(item); 1446 struct o2hb_region_attribute *o2hb_region_attr = 1447 container_of(attr, struct o2hb_region_attribute, attr); 1448 ssize_t ret = -EINVAL; 1449 1450 if (o2hb_region_attr->store) 1451 ret = o2hb_region_attr->store(reg, page, count); 1452 return ret; 1453} 1454 1455static struct configfs_item_operations o2hb_region_item_ops = { 1456 .release = o2hb_region_release, 1457 .show_attribute = o2hb_region_show, 1458 .store_attribute = o2hb_region_store, 1459}; 1460 1461static struct config_item_type o2hb_region_type = { 1462 .ct_item_ops = &o2hb_region_item_ops, 1463 .ct_attrs = o2hb_region_attrs, 1464 .ct_owner = THIS_MODULE, 1465}; 1466 1467/* heartbeat set */ 1468 1469struct o2hb_heartbeat_group { 1470 struct config_group hs_group; 1471 /* some stuff? */ 1472}; 1473 1474static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group) 1475{ 1476 return group ? 1477 container_of(group, struct o2hb_heartbeat_group, hs_group) 1478 : NULL; 1479} 1480 1481static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group, 1482 const char *name) 1483{ 1484 struct o2hb_region *reg = NULL; 1485 struct config_item *ret = NULL; 1486 1487 reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL); 1488 if (reg == NULL) 1489 goto out; /* ENOMEM */ 1490 1491 config_item_init_type_name(®->hr_item, name, &o2hb_region_type); 1492 1493 ret = ®->hr_item; 1494 1495 spin_lock(&o2hb_live_lock); 1496 list_add_tail(®->hr_all_item, &o2hb_all_regions); 1497 spin_unlock(&o2hb_live_lock); 1498out: 1499 if (ret == NULL) 1500 kfree(reg); 1501 1502 return ret; 1503} 1504 1505static void o2hb_heartbeat_group_drop_item(struct config_group *group, 1506 struct config_item *item) 1507{ 1508 struct task_struct *hb_task; 1509 struct o2hb_region *reg = to_o2hb_region(item); 1510 1511 /* stop the thread when the user removes the region dir */ 1512 spin_lock(&o2hb_live_lock); 1513 hb_task = reg->hr_task; 1514 reg->hr_task = NULL; 1515 spin_unlock(&o2hb_live_lock); 1516 1517 if (hb_task) 1518 kthread_stop(hb_task); 1519 1520 config_item_put(item); 1521} 1522 1523struct o2hb_heartbeat_group_attribute { 1524 struct configfs_attribute attr; 1525 ssize_t (*show)(struct o2hb_heartbeat_group *, char *); 1526 ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t); 1527}; 1528 1529static ssize_t o2hb_heartbeat_group_show(struct config_item *item, 1530 struct configfs_attribute *attr, 1531 char *page) 1532{ 1533 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item)); 1534 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr = 1535 container_of(attr, struct o2hb_heartbeat_group_attribute, attr); 1536 ssize_t ret = 0; 1537 1538 if (o2hb_heartbeat_group_attr->show) 1539 ret = o2hb_heartbeat_group_attr->show(reg, page); 1540 return ret; 1541} 1542 1543static ssize_t o2hb_heartbeat_group_store(struct config_item *item, 1544 struct configfs_attribute *attr, 1545 const char *page, size_t count) 1546{ 1547 struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item)); 1548 struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr = 1549 container_of(attr, struct o2hb_heartbeat_group_attribute, attr); 1550 ssize_t ret = -EINVAL; 1551 1552 if (o2hb_heartbeat_group_attr->store) 1553 ret = o2hb_heartbeat_group_attr->store(reg, page, count); 1554 return ret; 1555} 1556 1557static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group, 1558 char *page) 1559{ 1560 return sprintf(page, "%u\n", o2hb_dead_threshold); 1561} 1562 1563static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group, 1564 const char *page, 1565 size_t count) 1566{ 1567 unsigned long tmp; 1568 char *p = (char *)page; 1569 1570 tmp = simple_strtoul(p, &p, 10); 1571 if (!p || (*p && (*p != '\n'))) 1572 return -EINVAL; 1573 1574 /* this will validate ranges for us. */ 1575 o2hb_dead_threshold_set((unsigned int) tmp); 1576 1577 return count; 1578} 1579 1580static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = { 1581 .attr = { .ca_owner = THIS_MODULE, 1582 .ca_name = "dead_threshold", 1583 .ca_mode = S_IRUGO | S_IWUSR }, 1584 .show = o2hb_heartbeat_group_threshold_show, 1585 .store = o2hb_heartbeat_group_threshold_store, 1586}; 1587 1588static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = { 1589 &o2hb_heartbeat_group_attr_threshold.attr, 1590 NULL, 1591}; 1592 1593static struct configfs_item_operations o2hb_hearbeat_group_item_ops = { 1594 .show_attribute = o2hb_heartbeat_group_show, 1595 .store_attribute = o2hb_heartbeat_group_store, 1596}; 1597 1598static struct configfs_group_operations o2hb_heartbeat_group_group_ops = { 1599 .make_item = o2hb_heartbeat_group_make_item, 1600 .drop_item = o2hb_heartbeat_group_drop_item, 1601}; 1602 1603static struct config_item_type o2hb_heartbeat_group_type = { 1604 .ct_group_ops = &o2hb_heartbeat_group_group_ops, 1605 .ct_item_ops = &o2hb_hearbeat_group_item_ops, 1606 .ct_attrs = o2hb_heartbeat_group_attrs, 1607 .ct_owner = THIS_MODULE, 1608}; 1609 1610/* this is just here to avoid touching group in heartbeat.h which the 1611 * entire damn world #includes */ 1612struct config_group *o2hb_alloc_hb_set(void) 1613{ 1614 struct o2hb_heartbeat_group *hs = NULL; 1615 struct config_group *ret = NULL; 1616 1617 hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL); 1618 if (hs == NULL) 1619 goto out; 1620 1621 config_group_init_type_name(&hs->hs_group, "heartbeat", 1622 &o2hb_heartbeat_group_type); 1623 1624 ret = &hs->hs_group; 1625out: 1626 if (ret == NULL) 1627 kfree(hs); 1628 return ret; 1629} 1630 1631void o2hb_free_hb_set(struct config_group *group) 1632{ 1633 struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group); 1634 kfree(hs); 1635} 1636 1637/* hb callback registration and issueing */ 1638 1639static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type) 1640{ 1641 if (type == O2HB_NUM_CB) 1642 return ERR_PTR(-EINVAL); 1643 1644 return &o2hb_callbacks[type]; 1645} 1646 1647void o2hb_setup_callback(struct o2hb_callback_func *hc, 1648 enum o2hb_callback_type type, 1649 o2hb_cb_func *func, 1650 void *data, 1651 int priority) 1652{ 1653 INIT_LIST_HEAD(&hc->hc_item); 1654 hc->hc_func = func; 1655 hc->hc_data = data; 1656 hc->hc_priority = priority; 1657 hc->hc_type = type; 1658 hc->hc_magic = O2HB_CB_MAGIC; 1659} 1660EXPORT_SYMBOL_GPL(o2hb_setup_callback); 1661 1662int o2hb_register_callback(struct o2hb_callback_func *hc) 1663{ 1664 struct o2hb_callback_func *tmp; 1665 struct list_head *iter; 1666 struct o2hb_callback *hbcall; 1667 int ret; 1668 1669 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 1670 BUG_ON(!list_empty(&hc->hc_item)); 1671 1672 hbcall = hbcall_from_type(hc->hc_type); 1673 if (IS_ERR(hbcall)) { 1674 ret = PTR_ERR(hbcall); 1675 goto out; 1676 } 1677 1678 down_write(&o2hb_callback_sem); 1679 1680 list_for_each(iter, &hbcall->list) { 1681 tmp = list_entry(iter, struct o2hb_callback_func, hc_item); 1682 if (hc->hc_priority < tmp->hc_priority) { 1683 list_add_tail(&hc->hc_item, iter); 1684 break; 1685 } 1686 } 1687 if (list_empty(&hc->hc_item)) 1688 list_add_tail(&hc->hc_item, &hbcall->list); 1689 1690 up_write(&o2hb_callback_sem); 1691 ret = 0; 1692out: 1693 mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n", 1694 ret, __builtin_return_address(0), hc); 1695 return ret; 1696} 1697EXPORT_SYMBOL_GPL(o2hb_register_callback); 1698 1699void o2hb_unregister_callback(struct o2hb_callback_func *hc) 1700{ 1701 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 1702 1703 mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n", 1704 __builtin_return_address(0), hc); 1705 1706 if (list_empty(&hc->hc_item)) 1707 return; 1708 1709 down_write(&o2hb_callback_sem); 1710 1711 list_del_init(&hc->hc_item); 1712 1713 up_write(&o2hb_callback_sem); 1714} 1715EXPORT_SYMBOL_GPL(o2hb_unregister_callback); 1716 1717int o2hb_check_node_heartbeating(u8 node_num) 1718{ 1719 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1720 1721 o2hb_fill_node_map(testing_map, sizeof(testing_map)); 1722 if (!test_bit(node_num, testing_map)) { 1723 mlog(ML_HEARTBEAT, 1724 "node (%u) does not have heartbeating enabled.\n", 1725 node_num); 1726 return 0; 1727 } 1728 1729 return 1; 1730} 1731EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating); 1732 1733int o2hb_check_node_heartbeating_from_callback(u8 node_num) 1734{ 1735 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1736 1737 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map)); 1738 if (!test_bit(node_num, testing_map)) { 1739 mlog(ML_HEARTBEAT, 1740 "node (%u) does not have heartbeating enabled.\n", 1741 node_num); 1742 return 0; 1743 } 1744 1745 return 1; 1746} 1747EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback); 1748 1749/* Makes sure our local node is configured with a node number, and is 1750 * heartbeating. */ 1751int o2hb_check_local_node_heartbeating(void) 1752{ 1753 u8 node_num; 1754 1755 /* if this node was set then we have networking */ 1756 node_num = o2nm_this_node(); 1757 if (node_num == O2NM_MAX_NODES) { 1758 mlog(ML_HEARTBEAT, "this node has not been configured.\n"); 1759 return 0; 1760 } 1761 1762 return o2hb_check_node_heartbeating(node_num); 1763} 1764EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating); 1765 1766/* 1767 * this is just a hack until we get the plumbing which flips file systems 1768 * read only and drops the hb ref instead of killing the node dead. 1769 */ 1770void o2hb_stop_all_regions(void) 1771{ 1772 struct o2hb_region *reg; 1773 1774 mlog(ML_ERROR, "stopping heartbeat on all active regions.\n"); 1775 1776 spin_lock(&o2hb_live_lock); 1777 1778 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) 1779 reg->hr_unclean_stop = 1; 1780 1781 spin_unlock(&o2hb_live_lock); 1782} 1783EXPORT_SYMBOL_GPL(o2hb_stop_all_regions); 1784