1/* 2 * Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 3 * Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved. 4 * 5 * This copyrighted material is made available to anyone wishing to use, 6 * modify, copy, or redistribute it subject to the terms and conditions 7 * of the GNU General Public License version 2. 8 */ 9 10#include "lock_dlm.h" 11 12static char junk_lvb[GDLM_LVB_SIZE]; 13 14static void queue_complete(struct gdlm_lock *lp) 15{ 16 struct gdlm_ls *ls = lp->ls; 17 18 clear_bit(LFL_ACTIVE, &lp->flags); 19 20 spin_lock(&ls->async_lock); 21 list_add_tail(&lp->clist, &ls->complete); 22 spin_unlock(&ls->async_lock); 23 wake_up(&ls->thread_wait); 24} 25 26static inline void gdlm_ast(void *astarg) 27{ 28 queue_complete(astarg); 29} 30 31static inline void gdlm_bast(void *astarg, int mode) 32{ 33 struct gdlm_lock *lp = astarg; 34 struct gdlm_ls *ls = lp->ls; 35 36 if (!mode) { 37 printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n", 38 lp->lockname.ln_type, 39 (unsigned long long)lp->lockname.ln_number); 40 return; 41 } 42 43 spin_lock(&ls->async_lock); 44 if (!lp->bast_mode) { 45 list_add_tail(&lp->blist, &ls->blocking); 46 lp->bast_mode = mode; 47 } else if (lp->bast_mode < mode) 48 lp->bast_mode = mode; 49 spin_unlock(&ls->async_lock); 50 wake_up(&ls->thread_wait); 51} 52 53void gdlm_queue_delayed(struct gdlm_lock *lp) 54{ 55 struct gdlm_ls *ls = lp->ls; 56 57 spin_lock(&ls->async_lock); 58 list_add_tail(&lp->delay_list, &ls->delayed); 59 spin_unlock(&ls->async_lock); 60} 61 62/* convert gfs lock-state to dlm lock-mode */ 63 64static s16 make_mode(s16 lmstate) 65{ 66 switch (lmstate) { 67 case LM_ST_UNLOCKED: 68 return DLM_LOCK_NL; 69 case LM_ST_EXCLUSIVE: 70 return DLM_LOCK_EX; 71 case LM_ST_DEFERRED: 72 return DLM_LOCK_CW; 73 case LM_ST_SHARED: 74 return DLM_LOCK_PR; 75 } 76 gdlm_assert(0, "unknown LM state %d", lmstate); 77 return -1; 78} 79 80/* convert dlm lock-mode to gfs lock-state */ 81 82s16 gdlm_make_lmstate(s16 dlmmode) 83{ 84 switch (dlmmode) { 85 case DLM_LOCK_IV: 86 case DLM_LOCK_NL: 87 return LM_ST_UNLOCKED; 88 case DLM_LOCK_EX: 89 return LM_ST_EXCLUSIVE; 90 case DLM_LOCK_CW: 91 return LM_ST_DEFERRED; 92 case DLM_LOCK_PR: 93 return LM_ST_SHARED; 94 } 95 gdlm_assert(0, "unknown DLM mode %d", dlmmode); 96 return -1; 97} 98 99/* verify agreement with GFS on the current lock state, NB: DLM_LOCK_NL and 100 DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */ 101 102static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state) 103{ 104 s16 cur = make_mode(cur_state); 105 if (lp->cur != DLM_LOCK_IV) 106 gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur); 107} 108 109static inline unsigned int make_flags(struct gdlm_lock *lp, 110 unsigned int gfs_flags, 111 s16 cur, s16 req) 112{ 113 unsigned int lkf = 0; 114 115 if (gfs_flags & LM_FLAG_TRY) 116 lkf |= DLM_LKF_NOQUEUE; 117 118 if (gfs_flags & LM_FLAG_TRY_1CB) { 119 lkf |= DLM_LKF_NOQUEUE; 120 lkf |= DLM_LKF_NOQUEUEBAST; 121 } 122 123 if (gfs_flags & LM_FLAG_PRIORITY) { 124 lkf |= DLM_LKF_NOORDER; 125 lkf |= DLM_LKF_HEADQUE; 126 } 127 128 if (gfs_flags & LM_FLAG_ANY) { 129 if (req == DLM_LOCK_PR) 130 lkf |= DLM_LKF_ALTCW; 131 else if (req == DLM_LOCK_CW) 132 lkf |= DLM_LKF_ALTPR; 133 } 134 135 if (lp->lksb.sb_lkid != 0) { 136 lkf |= DLM_LKF_CONVERT; 137 138 /* Conversion deadlock avoidance by DLM */ 139 140 if (!test_bit(LFL_FORCE_PROMOTE, &lp->flags) && 141 !(lkf & DLM_LKF_NOQUEUE) && 142 cur > DLM_LOCK_NL && req > DLM_LOCK_NL && cur != req) 143 lkf |= DLM_LKF_CONVDEADLK; 144 } 145 146 if (lp->lvb) 147 lkf |= DLM_LKF_VALBLK; 148 149 return lkf; 150} 151 152/* make_strname - convert GFS lock numbers to a string */ 153 154static inline void make_strname(const struct lm_lockname *lockname, 155 struct gdlm_strname *str) 156{ 157 sprintf(str->name, "%8x%16llx", lockname->ln_type, 158 (unsigned long long)lockname->ln_number); 159 str->namelen = GDLM_STRNAME_BYTES; 160} 161 162static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name, 163 struct gdlm_lock **lpp) 164{ 165 struct gdlm_lock *lp; 166 167 lp = kzalloc(sizeof(struct gdlm_lock), GFP_KERNEL); 168 if (!lp) 169 return -ENOMEM; 170 171 lp->lockname = *name; 172 make_strname(name, &lp->strname); 173 lp->ls = ls; 174 lp->cur = DLM_LOCK_IV; 175 lp->lvb = NULL; 176 lp->hold_null = NULL; 177 init_completion(&lp->ast_wait); 178 INIT_LIST_HEAD(&lp->clist); 179 INIT_LIST_HEAD(&lp->blist); 180 INIT_LIST_HEAD(&lp->delay_list); 181 182 spin_lock(&ls->async_lock); 183 list_add(&lp->all_list, &ls->all_locks); 184 ls->all_locks_count++; 185 spin_unlock(&ls->async_lock); 186 187 *lpp = lp; 188 return 0; 189} 190 191void gdlm_delete_lp(struct gdlm_lock *lp) 192{ 193 struct gdlm_ls *ls = lp->ls; 194 195 spin_lock(&ls->async_lock); 196 if (!list_empty(&lp->clist)) 197 list_del_init(&lp->clist); 198 if (!list_empty(&lp->blist)) 199 list_del_init(&lp->blist); 200 if (!list_empty(&lp->delay_list)) 201 list_del_init(&lp->delay_list); 202 gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type, 203 (unsigned long long)lp->lockname.ln_number); 204 list_del_init(&lp->all_list); 205 ls->all_locks_count--; 206 spin_unlock(&ls->async_lock); 207 208 kfree(lp); 209} 210 211int gdlm_get_lock(void *lockspace, struct lm_lockname *name, 212 void **lockp) 213{ 214 struct gdlm_lock *lp; 215 int error; 216 217 error = gdlm_create_lp(lockspace, name, &lp); 218 219 *lockp = lp; 220 return error; 221} 222 223void gdlm_put_lock(void *lock) 224{ 225 gdlm_delete_lp(lock); 226} 227 228unsigned int gdlm_do_lock(struct gdlm_lock *lp) 229{ 230 struct gdlm_ls *ls = lp->ls; 231 int error, bast = 1; 232 233 /* 234 * When recovery is in progress, delay lock requests for submission 235 * once recovery is done. Requests for recovery (NOEXP) and unlocks 236 * can pass. 237 */ 238 239 if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) && 240 !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) { 241 gdlm_queue_delayed(lp); 242 return LM_OUT_ASYNC; 243 } 244 245 /* 246 * Submit the actual lock request. 247 */ 248 249 if (test_bit(LFL_NOBAST, &lp->flags)) 250 bast = 0; 251 252 set_bit(LFL_ACTIVE, &lp->flags); 253 254 log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type, 255 (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid, 256 lp->cur, lp->req, lp->lkf); 257 258 error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf, 259 lp->strname.name, lp->strname.namelen, 0, gdlm_ast, 260 lp, bast ? gdlm_bast : NULL); 261 262 if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) { 263 lp->lksb.sb_status = -EAGAIN; 264 queue_complete(lp); 265 error = 0; 266 } 267 268 if (error) { 269 log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x " 270 "flags=%lx", ls->fsname, lp->lockname.ln_type, 271 (unsigned long long)lp->lockname.ln_number, error, 272 lp->cur, lp->req, lp->lkf, lp->flags); 273 return LM_OUT_ERROR; 274 } 275 return LM_OUT_ASYNC; 276} 277 278static unsigned int gdlm_do_unlock(struct gdlm_lock *lp) 279{ 280 struct gdlm_ls *ls = lp->ls; 281 unsigned int lkf = 0; 282 int error; 283 284 set_bit(LFL_DLM_UNLOCK, &lp->flags); 285 set_bit(LFL_ACTIVE, &lp->flags); 286 287 if (lp->lvb) 288 lkf = DLM_LKF_VALBLK; 289 290 log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type, 291 (unsigned long long)lp->lockname.ln_number, 292 lp->lksb.sb_lkid, lp->cur, lkf); 293 294 error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp); 295 296 if (error) { 297 log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x " 298 "flags=%lx", ls->fsname, lp->lockname.ln_type, 299 (unsigned long long)lp->lockname.ln_number, error, 300 lp->cur, lp->req, lp->lkf, lp->flags); 301 return LM_OUT_ERROR; 302 } 303 return LM_OUT_ASYNC; 304} 305 306unsigned int gdlm_lock(void *lock, unsigned int cur_state, 307 unsigned int req_state, unsigned int flags) 308{ 309 struct gdlm_lock *lp = lock; 310 311 clear_bit(LFL_DLM_CANCEL, &lp->flags); 312 if (flags & LM_FLAG_NOEXP) 313 set_bit(LFL_NOBLOCK, &lp->flags); 314 315 check_cur_state(lp, cur_state); 316 lp->req = make_mode(req_state); 317 lp->lkf = make_flags(lp, flags, lp->cur, lp->req); 318 319 return gdlm_do_lock(lp); 320} 321 322unsigned int gdlm_unlock(void *lock, unsigned int cur_state) 323{ 324 struct gdlm_lock *lp = lock; 325 326 clear_bit(LFL_DLM_CANCEL, &lp->flags); 327 if (lp->cur == DLM_LOCK_IV) 328 return 0; 329 return gdlm_do_unlock(lp); 330} 331 332void gdlm_cancel(void *lock) 333{ 334 struct gdlm_lock *lp = lock; 335 struct gdlm_ls *ls = lp->ls; 336 int error, delay_list = 0; 337 338 if (test_bit(LFL_DLM_CANCEL, &lp->flags)) 339 return; 340 341 log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type, 342 (unsigned long long)lp->lockname.ln_number, lp->flags); 343 344 spin_lock(&ls->async_lock); 345 if (!list_empty(&lp->delay_list)) { 346 list_del_init(&lp->delay_list); 347 delay_list = 1; 348 } 349 spin_unlock(&ls->async_lock); 350 351 if (delay_list) { 352 set_bit(LFL_CANCEL, &lp->flags); 353 set_bit(LFL_ACTIVE, &lp->flags); 354 queue_complete(lp); 355 return; 356 } 357 358 if (!test_bit(LFL_ACTIVE, &lp->flags) || 359 test_bit(LFL_DLM_UNLOCK, &lp->flags)) { 360 log_info("gdlm_cancel skip %x,%llx flags %lx", 361 lp->lockname.ln_type, 362 (unsigned long long)lp->lockname.ln_number, lp->flags); 363 return; 364 } 365 366 /* the lock is blocked in the dlm */ 367 368 set_bit(LFL_DLM_CANCEL, &lp->flags); 369 set_bit(LFL_ACTIVE, &lp->flags); 370 371 error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL, 372 NULL, lp); 373 374 log_info("gdlm_cancel rv %d %x,%llx flags %lx", error, 375 lp->lockname.ln_type, 376 (unsigned long long)lp->lockname.ln_number, lp->flags); 377 378 if (error == -EBUSY) 379 clear_bit(LFL_DLM_CANCEL, &lp->flags); 380} 381 382static int gdlm_add_lvb(struct gdlm_lock *lp) 383{ 384 char *lvb; 385 386 lvb = kzalloc(GDLM_LVB_SIZE, GFP_KERNEL); 387 if (!lvb) 388 return -ENOMEM; 389 390 lp->lksb.sb_lvbptr = lvb; 391 lp->lvb = lvb; 392 return 0; 393} 394 395static void gdlm_del_lvb(struct gdlm_lock *lp) 396{ 397 kfree(lp->lvb); 398 lp->lvb = NULL; 399 lp->lksb.sb_lvbptr = NULL; 400} 401 402/* This can do a synchronous dlm request (requiring a lock_dlm thread to get 403 the completion) because gfs won't call hold_lvb() during a callback (from 404 the context of a lock_dlm thread). */ 405 406static int hold_null_lock(struct gdlm_lock *lp) 407{ 408 struct gdlm_lock *lpn = NULL; 409 int error; 410 411 if (lp->hold_null) { 412 printk(KERN_INFO "lock_dlm: lvb already held\n"); 413 return 0; 414 } 415 416 error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn); 417 if (error) 418 goto out; 419 420 lpn->lksb.sb_lvbptr = junk_lvb; 421 lpn->lvb = junk_lvb; 422 423 lpn->req = DLM_LOCK_NL; 424 lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE; 425 set_bit(LFL_NOBAST, &lpn->flags); 426 set_bit(LFL_INLOCK, &lpn->flags); 427 428 init_completion(&lpn->ast_wait); 429 gdlm_do_lock(lpn); 430 wait_for_completion(&lpn->ast_wait); 431 error = lpn->lksb.sb_status; 432 if (error) { 433 printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n", 434 error); 435 gdlm_delete_lp(lpn); 436 lpn = NULL; 437 } 438out: 439 lp->hold_null = lpn; 440 return error; 441} 442 443/* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get 444 the completion) because gfs may call unhold_lvb() during a callback (from 445 the context of a lock_dlm thread) which could cause a deadlock since the 446 other lock_dlm thread could be engaged in recovery. */ 447 448static void unhold_null_lock(struct gdlm_lock *lp) 449{ 450 struct gdlm_lock *lpn = lp->hold_null; 451 452 gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type, 453 (unsigned long long)lp->lockname.ln_number); 454 lpn->lksb.sb_lvbptr = NULL; 455 lpn->lvb = NULL; 456 set_bit(LFL_UNLOCK_DELETE, &lpn->flags); 457 gdlm_do_unlock(lpn); 458 lp->hold_null = NULL; 459} 460 461/* Acquire a NL lock because gfs requires the value block to remain 462 intact on the resource while the lvb is "held" even if it's holding no locks 463 on the resource. */ 464 465int gdlm_hold_lvb(void *lock, char **lvbp) 466{ 467 struct gdlm_lock *lp = lock; 468 int error; 469 470 error = gdlm_add_lvb(lp); 471 if (error) 472 return error; 473 474 *lvbp = lp->lvb; 475 476 error = hold_null_lock(lp); 477 if (error) 478 gdlm_del_lvb(lp); 479 480 return error; 481} 482 483void gdlm_unhold_lvb(void *lock, char *lvb) 484{ 485 struct gdlm_lock *lp = lock; 486 487 unhold_null_lock(lp); 488 gdlm_del_lvb(lp); 489} 490 491void gdlm_submit_delayed(struct gdlm_ls *ls) 492{ 493 struct gdlm_lock *lp, *safe; 494 495 spin_lock(&ls->async_lock); 496 list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) { 497 list_del_init(&lp->delay_list); 498 list_add_tail(&lp->delay_list, &ls->submit); 499 } 500 spin_unlock(&ls->async_lock); 501 wake_up(&ls->thread_wait); 502} 503 504int gdlm_release_all_locks(struct gdlm_ls *ls) 505{ 506 struct gdlm_lock *lp, *safe; 507 int count = 0; 508 509 spin_lock(&ls->async_lock); 510 list_for_each_entry_safe(lp, safe, &ls->all_locks, all_list) { 511 list_del_init(&lp->all_list); 512 513 if (lp->lvb && lp->lvb != junk_lvb) 514 kfree(lp->lvb); 515 kfree(lp); 516 count++; 517 } 518 spin_unlock(&ls->async_lock); 519 520 return count; 521} 522