/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */

#include "lock_dlm.h"

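/* Scratch lvb handed to the NL "hold" locks created in hold_null_lock()
   below: DLM_LKF_VALBLK requires an lvb pointer, but nothing in this
   module ever reads what the dlm copies into this buffer. */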
static char junk_lvb[GDLM_LVB_SIZE];

static void queue_complete(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;

	clear_bit(LFL_ACTIVE, &lp->flags);

	spin_lock(&ls->async_lock);
	list_add_tail(&lp->clist, &ls->complete);
	spin_unlock(&ls->async_lock);
	wake_up(&ls->thread_wait);
}

static inline void gdlm_ast(void *astarg)
{
	queue_complete(astarg);
}

static inline void gdlm_bast(void *astarg, int mode)
{
	struct gdlm_lock *lp = astarg;
	struct gdlm_ls *ls = lp->ls;

	if (!mode) {
		printk(KERN_INFO "lock_dlm: bast mode zero %x,%llx\n",
			lp->lockname.ln_type,
			(unsigned long long)lp->lockname.ln_number);
		return;
	}

	spin_lock(&ls->async_lock);
	if (!lp->bast_mode) {
		list_add_tail(&lp->blist, &ls->blocking);
		lp->bast_mode = mode;
	} else if (lp->bast_mode < mode)
		lp->bast_mode = mode;
	spin_unlock(&ls->async_lock);
	wake_up(&ls->thread_wait);
}
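
/*
 * Worked example of the bast coalescing above (mode values follow the
 * usual dlm ordering, NL < CR < CW < PR < PW < EX): if a blocking
 * callback for DLM_LOCK_PR is queued and an EX callback arrives before
 * the lock_dlm thread has run, the second bast only raises bast_mode,
 * so GFS sees a single callback carrying the most demanding mode.
 */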

void gdlm_queue_delayed(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;

	spin_lock(&ls->async_lock);
	list_add_tail(&lp->delay_list, &ls->delayed);
	spin_unlock(&ls->async_lock);
}

/* convert gfs lock-state to dlm lock-mode */

static s16 make_mode(s16 lmstate)
{
	switch (lmstate) {
	case LM_ST_UNLOCKED:
		return DLM_LOCK_NL;
	case LM_ST_EXCLUSIVE:
		return DLM_LOCK_EX;
	case LM_ST_DEFERRED:
		return DLM_LOCK_CW;
	case LM_ST_SHARED:
		return DLM_LOCK_PR;
	}
	gdlm_assert(0, "unknown LM state %d", lmstate);
	return -1;
}

/* convert dlm lock-mode to gfs lock-state */

s16 gdlm_make_lmstate(s16 dlmmode)
{
	switch (dlmmode) {
	case DLM_LOCK_IV:
	case DLM_LOCK_NL:
		return LM_ST_UNLOCKED;
	case DLM_LOCK_EX:
		return LM_ST_EXCLUSIVE;
	case DLM_LOCK_CW:
		return LM_ST_DEFERRED;
	case DLM_LOCK_PR:
		return LM_ST_SHARED;
	}
	gdlm_assert(0, "unknown DLM mode %d", dlmmode);
	return -1;
}
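
/*
 * The two helpers above are inverses on the states GFS actually uses:
 *
 *	LM_ST_UNLOCKED  <-> DLM_LOCK_NL   (DLM_LOCK_IV also reads back
 *	                                   as LM_ST_UNLOCKED)
 *	LM_ST_SHARED    <-> DLM_LOCK_PR
 *	LM_ST_DEFERRED  <-> DLM_LOCK_CW
 *	LM_ST_EXCLUSIVE <-> DLM_LOCK_EX
 */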

/* Verify agreement with GFS on the current lock state.  NB: DLM_LOCK_NL
   and DLM_LOCK_IV are both considered LM_ST_UNLOCKED by GFS. */

static void check_cur_state(struct gdlm_lock *lp, unsigned int cur_state)
{
	s16 cur = make_mode(cur_state);
	if (lp->cur != DLM_LOCK_IV)
		gdlm_assert(lp->cur == cur, "%d, %d", lp->cur, cur);
}

static inline unsigned int make_flags(struct gdlm_lock *lp,
				      unsigned int gfs_flags,
				      s16 cur, s16 req)
{
	unsigned int lkf = 0;

	if (gfs_flags & LM_FLAG_TRY)
		lkf |= DLM_LKF_NOQUEUE;

	if (gfs_flags & LM_FLAG_TRY_1CB) {
		lkf |= DLM_LKF_NOQUEUE;
		lkf |= DLM_LKF_NOQUEUEBAST;
	}

	if (gfs_flags & LM_FLAG_PRIORITY) {
		lkf |= DLM_LKF_NOORDER;
		lkf |= DLM_LKF_HEADQUE;
	}

	if (gfs_flags & LM_FLAG_ANY) {
		if (req == DLM_LOCK_PR)
			lkf |= DLM_LKF_ALTCW;
		else if (req == DLM_LOCK_CW)
			lkf |= DLM_LKF_ALTPR;
	}

	if (lp->lksb.sb_lkid != 0) {
		lkf |= DLM_LKF_CONVERT;

		/* Conversion deadlock avoidance by DLM */

		if (!test_bit(LFL_FORCE_PROMOTE, &lp->flags) &&
		    !(lkf & DLM_LKF_NOQUEUE) &&
		    cur > DLM_LOCK_NL && req > DLM_LOCK_NL && cur != req)
			lkf |= DLM_LKF_CONVDEADLK;
	}

	if (lp->lvb)
		lkf |= DLM_LKF_VALBLK;

	return lkf;
}
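
/*
 * Example derived from the rules above: converting an existing lock
 * (sb_lkid set, LFL_FORCE_PROMOTE clear, no lvb) from DLM_LOCK_PR to
 * DLM_LOCK_EX with no GFS flags yields DLM_LKF_CONVERT |
 * DLM_LKF_CONVDEADLK.  Adding LM_FLAG_TRY sets DLM_LKF_NOQUEUE, which
 * in turn suppresses CONVDEADLK (a request that cannot wait cannot
 * deadlock), giving DLM_LKF_CONVERT | DLM_LKF_NOQUEUE.
 */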

/* make_strname - convert GFS lock numbers to a string */

static inline void make_strname(const struct lm_lockname *lockname,
				struct gdlm_strname *str)
{
	sprintf(str->name, "%8x%16llx", lockname->ln_type,
		(unsigned long long)lockname->ln_number);
	str->namelen = GDLM_STRNAME_BYTES;
}
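
/*
 * The format pads both fields with spaces, so every resource name is a
 * fixed 8 + 16 = 24 characters; e.g. ln_type 0x2 and ln_number 0x1019
 * produce "       2            1019".  Fixed-width names mean the same
 * lock always maps to the same dlm resource name on every node.
 */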

static int gdlm_create_lp(struct gdlm_ls *ls, struct lm_lockname *name,
			  struct gdlm_lock **lpp)
{
	struct gdlm_lock *lp;

	lp = kzalloc(sizeof(struct gdlm_lock), GFP_KERNEL);
	if (!lp)
		return -ENOMEM;

	lp->lockname = *name;
	make_strname(name, &lp->strname);
	lp->ls = ls;
	lp->cur = DLM_LOCK_IV;
	lp->lvb = NULL;
	lp->hold_null = NULL;
	init_completion(&lp->ast_wait);
	INIT_LIST_HEAD(&lp->clist);
	INIT_LIST_HEAD(&lp->blist);
	INIT_LIST_HEAD(&lp->delay_list);

	spin_lock(&ls->async_lock);
	list_add(&lp->all_list, &ls->all_locks);
	ls->all_locks_count++;
	spin_unlock(&ls->async_lock);

	*lpp = lp;
	return 0;
}

void gdlm_delete_lp(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;

	spin_lock(&ls->async_lock);
	if (!list_empty(&lp->clist))
		list_del_init(&lp->clist);
	if (!list_empty(&lp->blist))
		list_del_init(&lp->blist);
	if (!list_empty(&lp->delay_list))
		list_del_init(&lp->delay_list);
	gdlm_assert(!list_empty(&lp->all_list), "%x,%llx", lp->lockname.ln_type,
		    (unsigned long long)lp->lockname.ln_number);
	list_del_init(&lp->all_list);
	ls->all_locks_count--;
	spin_unlock(&ls->async_lock);

	kfree(lp);
}

int gdlm_get_lock(void *lockspace, struct lm_lockname *name,
		  void **lockp)
{
	struct gdlm_lock *lp;
	int error;

	error = gdlm_create_lp(lockspace, name, &lp);

	*lockp = lp;
	return error;
}

void gdlm_put_lock(void *lock)
{
	gdlm_delete_lp(lock);
}

unsigned int gdlm_do_lock(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	int error, bast = 1;
	/*
	 * When recovery is in progress, delay lock requests; they are
	 * submitted once recovery is done.  Requests for recovery
	 * (NOEXP) and unlocks can pass.
	 */

	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
	    !test_bit(LFL_NOBLOCK, &lp->flags) && lp->req != DLM_LOCK_NL) {
		gdlm_queue_delayed(lp);
		return LM_OUT_ASYNC;
	}

	/*
	 * Submit the actual lock request.
	 */

	if (test_bit(LFL_NOBAST, &lp->flags))
		bast = 0;

	set_bit(LFL_ACTIVE, &lp->flags);

	log_debug("lk %x,%llx id %x %d,%d %x", lp->lockname.ln_type,
		  (unsigned long long)lp->lockname.ln_number, lp->lksb.sb_lkid,
		  lp->cur, lp->req, lp->lkf);

	error = dlm_lock(ls->dlm_lockspace, lp->req, &lp->lksb, lp->lkf,
			 lp->strname.name, lp->strname.namelen, 0, gdlm_ast,
			 lp, bast ? gdlm_bast : NULL);

	if ((error == -EAGAIN) && (lp->lkf & DLM_LKF_NOQUEUE)) {
		lp->lksb.sb_status = -EAGAIN;
		queue_complete(lp);
		error = 0;
	}

	if (error) {
		log_error("%s: gdlm_lock %x,%llx err=%d cur=%d req=%d lkf=%x "
			  "flags=%lx", ls->fsname, lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number, error,
			  lp->cur, lp->req, lp->lkf, lp->flags);
		return LM_OUT_ERROR;
	}
	return LM_OUT_ASYNC;
}
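
/*
 * A minimal sketch of the async contract above, assuming (as
 * hold_null_lock() below relies on) that the lock_dlm thread completes
 * ast_wait when the completion ast arrives for an LFL_INLOCK request.
 * A caller needing a synchronous result submits, waits, then reads the
 * status left in the lksb:
 *
 *	init_completion(&lp->ast_wait);
 *	gdlm_do_lock(lp);
 *	wait_for_completion(&lp->ast_wait);
 *	error = lp->lksb.sb_status;
 */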

static unsigned int gdlm_do_unlock(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	unsigned int lkf = 0;
	int error;

	set_bit(LFL_DLM_UNLOCK, &lp->flags);
	set_bit(LFL_ACTIVE, &lp->flags);

	if (lp->lvb)
		lkf = DLM_LKF_VALBLK;

	log_debug("un %x,%llx %x %d %x", lp->lockname.ln_type,
		  (unsigned long long)lp->lockname.ln_number,
		  lp->lksb.sb_lkid, lp->cur, lkf);

	error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, lkf, NULL, lp);

	if (error) {
		log_error("%s: gdlm_unlock %x,%llx err=%d cur=%d req=%d lkf=%x "
			  "flags=%lx", ls->fsname, lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number, error,
			  lp->cur, lp->req, lp->lkf, lp->flags);
		return LM_OUT_ERROR;
	}
	return LM_OUT_ASYNC;
}

unsigned int gdlm_lock(void *lock, unsigned int cur_state,
		       unsigned int req_state, unsigned int flags)
{
	struct gdlm_lock *lp = lock;

	clear_bit(LFL_DLM_CANCEL, &lp->flags);
	if (flags & LM_FLAG_NOEXP)
		set_bit(LFL_NOBLOCK, &lp->flags);

	check_cur_state(lp, cur_state);
	lp->req = make_mode(req_state);
	lp->lkf = make_flags(lp, flags, lp->cur, lp->req);

	return gdlm_do_lock(lp);
}

unsigned int gdlm_unlock(void *lock, unsigned int cur_state)
{
	struct gdlm_lock *lp = lock;

	clear_bit(LFL_DLM_CANCEL, &lp->flags);
	if (lp->cur == DLM_LOCK_IV)
		return 0;
	return gdlm_do_unlock(lp);
}

void gdlm_cancel(void *lock)
{
	struct gdlm_lock *lp = lock;
	struct gdlm_ls *ls = lp->ls;
	int error, delay_list = 0;

	if (test_bit(LFL_DLM_CANCEL, &lp->flags))
		return;

	log_info("gdlm_cancel %x,%llx flags %lx", lp->lockname.ln_type,
		 (unsigned long long)lp->lockname.ln_number, lp->flags);

	spin_lock(&ls->async_lock);
	if (!list_empty(&lp->delay_list)) {
		list_del_init(&lp->delay_list);
		delay_list = 1;
	}
	spin_unlock(&ls->async_lock);

	if (delay_list) {
		set_bit(LFL_CANCEL, &lp->flags);
		set_bit(LFL_ACTIVE, &lp->flags);
		queue_complete(lp);
		return;
	}

	if (!test_bit(LFL_ACTIVE, &lp->flags) ||
	    test_bit(LFL_DLM_UNLOCK, &lp->flags)) {
		log_info("gdlm_cancel skip %x,%llx flags %lx",
			 lp->lockname.ln_type,
			 (unsigned long long)lp->lockname.ln_number, lp->flags);
		return;
	}

	/* the lock is blocked in the dlm */

	set_bit(LFL_DLM_CANCEL, &lp->flags);
	set_bit(LFL_ACTIVE, &lp->flags);

	error = dlm_unlock(ls->dlm_lockspace, lp->lksb.sb_lkid, DLM_LKF_CANCEL,
			   NULL, lp);

	log_info("gdlm_cancel rv %d %x,%llx flags %lx", error,
		 lp->lockname.ln_type,
		 (unsigned long long)lp->lockname.ln_number, lp->flags);

	if (error == -EBUSY)
		clear_bit(LFL_DLM_CANCEL, &lp->flags);
}
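
/*
 * Summary of the three outcomes above: a request still sitting on the
 * delayed list is completed locally with LFL_CANCEL set; a request not
 * active in the dlm (or already mid-unlock) is skipped; a request
 * blocked inside the dlm gets a DLM_LKF_CANCEL unlock, and an -EBUSY
 * result (the request was no longer in a cancellable state) just
 * clears LFL_DLM_CANCEL again.
 */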

static int gdlm_add_lvb(struct gdlm_lock *lp)
{
	char *lvb;

	lvb = kzalloc(GDLM_LVB_SIZE, GFP_KERNEL);
	if (!lvb)
		return -ENOMEM;

	lp->lksb.sb_lvbptr = lvb;
	lp->lvb = lvb;
	return 0;
}

static void gdlm_del_lvb(struct gdlm_lock *lp)
{
	kfree(lp->lvb);
	lp->lvb = NULL;
	lp->lksb.sb_lvbptr = NULL;
}

/* This can do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs won't call hold_lvb() during a callback (from
   the context of a lock_dlm thread). */

static int hold_null_lock(struct gdlm_lock *lp)
{
	struct gdlm_lock *lpn = NULL;
	int error;

	if (lp->hold_null) {
		printk(KERN_INFO "lock_dlm: lvb already held\n");
		return 0;
	}

	error = gdlm_create_lp(lp->ls, &lp->lockname, &lpn);
	if (error)
		goto out;

	lpn->lksb.sb_lvbptr = junk_lvb;
	lpn->lvb = junk_lvb;

	lpn->req = DLM_LOCK_NL;
	lpn->lkf = DLM_LKF_VALBLK | DLM_LKF_EXPEDITE;
	set_bit(LFL_NOBAST, &lpn->flags);
	set_bit(LFL_INLOCK, &lpn->flags);

	init_completion(&lpn->ast_wait);
	gdlm_do_lock(lpn);
	wait_for_completion(&lpn->ast_wait);
	error = lpn->lksb.sb_status;
	if (error) {
		printk(KERN_INFO "lock_dlm: hold_null_lock dlm error %d\n",
		       error);
		gdlm_delete_lp(lpn);
		lpn = NULL;
	}
out:
	lp->hold_null = lpn;
	return error;
}

/* This cannot do a synchronous dlm request (requiring a lock_dlm thread to get
   the completion) because gfs may call unhold_lvb() during a callback (from
   the context of a lock_dlm thread), which could cause a deadlock since the
   other lock_dlm thread could be engaged in recovery. */

static void unhold_null_lock(struct gdlm_lock *lp)
{
	struct gdlm_lock *lpn = lp->hold_null;

	gdlm_assert(lpn, "%x,%llx", lp->lockname.ln_type,
		    (unsigned long long)lp->lockname.ln_number);
	lpn->lksb.sb_lvbptr = NULL;
	lpn->lvb = NULL;
	set_bit(LFL_UNLOCK_DELETE, &lpn->flags);
	gdlm_do_unlock(lpn);
	lp->hold_null = NULL;
}

/* Acquire a NL lock because gfs requires the value block to remain intact
   on the resource while the lvb is "held", even if it's holding no locks
   on the resource. */

int gdlm_hold_lvb(void *lock, char **lvbp)
{
	struct gdlm_lock *lp = lock;
	int error;

	error = gdlm_add_lvb(lp);
	if (error)
		return error;

	*lvbp = lp->lvb;

	error = hold_null_lock(lp);
	if (error)
		gdlm_del_lvb(lp);

	return error;
}
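
/*
 * Typical lvb lifetime, sketched from the caller's side (the real
 * callers live in gfs, not in this file):
 *
 *	char *lvb;
 *	error = gdlm_hold_lvb(lock, &lvb);
 *	if (!error) {
 *		... read or update the GDLM_LVB_SIZE value block ...
 *		gdlm_unhold_lvb(lock, lvb);
 *	}
 *
 * hold and unhold must pair so the NL lock and the lvb buffer are both
 * released.
 */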

void gdlm_unhold_lvb(void *lock, char *lvb)
{
	struct gdlm_lock *lp = lock;

	unhold_null_lock(lp);
	gdlm_del_lvb(lp);
}

void gdlm_submit_delayed(struct gdlm_ls *ls)
{
	struct gdlm_lock *lp, *safe;

	spin_lock(&ls->async_lock);
	list_for_each_entry_safe(lp, safe, &ls->delayed, delay_list) {
		list_del_init(&lp->delay_list);
		list_add_tail(&lp->delay_list, &ls->submit);
	}
	spin_unlock(&ls->async_lock);
	wake_up(&ls->thread_wait);
}

int gdlm_release_all_locks(struct gdlm_ls *ls)
{
	struct gdlm_lock *lp, *safe;
	int count = 0;

	spin_lock(&ls->async_lock);
	list_for_each_entry_safe(lp, safe, &ls->all_locks, all_list) {
		list_del_init(&lp->all_list);

		if (lp->lvb && lp->lvb != junk_lvb)
			kfree(lp->lvb);
		kfree(lp);
		count++;
	}
	spin_unlock(&ls->async_lock);

	return count;
}
