1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * Copyright 2023 Red Hat
4 */
5
6#include "block-map.h"
7
8#include <linux/bio.h>
9#include <linux/ratelimit.h>
10
11#include "errors.h"
12#include "logger.h"
13#include "memory-alloc.h"
14#include "permassert.h"
15
16#include "action-manager.h"
17#include "admin-state.h"
18#include "completion.h"
19#include "constants.h"
20#include "data-vio.h"
21#include "encodings.h"
22#include "io-submitter.h"
23#include "physical-zone.h"
24#include "recovery-journal.h"
25#include "slab-depot.h"
26#include "status-codes.h"
27#include "types.h"
28#include "vdo.h"
29#include "vio.h"
30#include "wait-queue.h"
31
32/**
33 * DOC: Block map eras
34 *
35 * The block map era, or maximum age, is used as follows:
36 *
37 * Each block map page, when dirty, records the earliest recovery journal block sequence number of
38 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
39 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
40 * according to the sequence number they record.
41 *
42 * In the current (newest) era, block map pages are not written unless there is cache pressure. In
43 * the next oldest era, each time a new journal block is written, 1/@maximum_age of the pages in
44 * this era are issued for write. In all older eras, pages are issued for write immediately.
45 */
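
/*
 * Illustrative sketch only, not part of the driver: how the era of a dirty page could be derived
 * from the recovery journal sequence number it records. "example_era_of" and "example_maximum_age"
 * are hypothetical names invented for this sketch.
 */
static inline u64 example_era_of(sequence_number_t recorded_sequence_number,
				 u64 example_maximum_age)
{
	/* Every example_maximum_age sequence numbers begin a new era. */
	return recorded_sequence_number / example_maximum_age;
}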
46
47struct page_descriptor {
48	root_count_t root_index;
49	height_t height;
50	page_number_t page_index;
51	slot_number_t slot;
52} __packed;
53
54union page_key {
55	struct page_descriptor descriptor;
56	u64 key;
57};
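
/*
 * Illustrative sketch only: a tree page can be identified by packing its descriptor and reading
 * the union back as a single u64, suitable for use as an int map key (this appears to be how keys
 * for a zone's loading_pages map are formed; see release_page_lock() below). The helper name and
 * parameters here are hypothetical.
 */
static inline u64 example_page_key(root_count_t root_index, height_t height,
				   page_number_t page_index, slot_number_t slot)
{
	union page_key key = {
		.descriptor = {
			.root_index = root_index,
			.height = height,
			.page_index = page_index,
			.slot = slot,
		},
	};

	return key.key;
}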
58
59struct write_if_not_dirtied_context {
60	struct block_map_zone *zone;
61	u8 generation;
62};
63
64struct block_map_tree_segment {
65	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
66};
67
68struct block_map_tree {
69	struct block_map_tree_segment *segments;
70};
71
72struct forest {
73	struct block_map *map;
74	size_t segments;
75	struct boundary *boundaries;
76	struct tree_page **pages;
77	struct block_map_tree trees[];
78};
79
80struct cursor_level {
81	page_number_t page_index;
82	slot_number_t slot;
83};
84
85struct cursors;
86
87struct cursor {
88	struct vdo_waiter waiter;
89	struct block_map_tree *tree;
90	height_t height;
91	struct cursors *parent;
92	struct boundary boundary;
93	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
94	struct pooled_vio *vio;
95};
96
97struct cursors {
98	struct block_map_zone *zone;
99	struct vio_pool *pool;
100	vdo_entry_callback_fn entry_callback;
101	struct vdo_completion *completion;
102	root_count_t active_roots;
103	struct cursor cursors[];
104};
105
106static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;
107
108/* Used to indicate that the page holding the location of a tree root has been "loaded". */
109static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;
110
111const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
112	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
113	.pbn_high_nibble = 0,
114	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
115};
116
117#define LOG_INTERVAL 4000
118#define DISPLAY_INTERVAL 100000
119
120/*
121 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
122 * Prevents any compiler shenanigans from affecting other threads reading those stats.
123 */
124#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))
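
/*
 * Illustrative sketch only, not part of the driver: a hypothetical reader on another thread would
 * pair the WRITE_ONCE() in ADD_ONCE() with READ_ONCE() so that it observes a whole, untorn value.
 */
static inline u64 example_read_stat_once(const u64 *stat)
{
	return READ_ONCE(*stat);
}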
125
126static inline bool is_dirty(const struct page_info *info)
127{
128	return info->state == PS_DIRTY;
129}
130
131static inline bool is_present(const struct page_info *info)
132{
133	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
134}
135
136static inline bool is_in_flight(const struct page_info *info)
137{
138	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
139}
140
141static inline bool is_incoming(const struct page_info *info)
142{
143	return info->state == PS_INCOMING;
144}
145
146static inline bool is_outgoing(const struct page_info *info)
147{
148	return info->state == PS_OUTGOING;
149}
150
151static inline bool is_valid(const struct page_info *info)
152{
153	return is_present(info) || is_outgoing(info);
154}
155
156static char *get_page_buffer(struct page_info *info)
157{
158	struct vdo_page_cache *cache = info->cache;
159
160	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
161}
162
163static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
164{
165	struct vdo_page_completion *completion;
166
167	if (waiter == NULL)
168		return NULL;
169
170	completion = container_of(waiter, struct vdo_page_completion, waiter);
171	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
172	return completion;
173}
174
175/**
176 * initialize_info() - Initialize all page info structures and put them on the free list.
177 *
178 * Return: VDO_SUCCESS or an error.
179 */
180static int initialize_info(struct vdo_page_cache *cache)
181{
182	struct page_info *info;
183
184	INIT_LIST_HEAD(&cache->free_list);
185	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
186		int result;
187
188		info->cache = cache;
189		info->state = PS_FREE;
190		info->pbn = NO_PAGE;
191
192		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
193					     VIO_PRIORITY_METADATA, info,
194					     get_page_buffer(info), &info->vio);
195		if (result != VDO_SUCCESS)
196			return result;
197
198		/* The thread ID should never change. */
199		info->vio->completion.callback_thread_id = cache->zone->thread_id;
200
201		INIT_LIST_HEAD(&info->state_entry);
202		list_add_tail(&info->state_entry, &cache->free_list);
203		INIT_LIST_HEAD(&info->lru_entry);
204	}
205
206	return VDO_SUCCESS;
207}
208
209/**
210 * allocate_cache_components() - Allocate components of the cache which require their own
211 *                               allocation.
212 * @cache: The cache to allocate components for.
214 *
215 * The caller is responsible for all clean up on errors.
216 *
217 * Return: VDO_SUCCESS or an error code.
218 */
219static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
220{
221	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
222	int result;
223
224	result = vdo_allocate(cache->page_count, struct page_info, "page infos",
225			      &cache->infos);
226	if (result != VDO_SUCCESS)
227		return result;
228
229	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
230	if (result != VDO_SUCCESS)
231		return result;
232
233	result = vdo_int_map_create(cache->page_count, &cache->page_map);
234	if (result != VDO_SUCCESS)
235		return result;
236
237	return initialize_info(cache);
238}
239
240/**
241 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
242 *                            thread.
243 */
244static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
245					  const char *function_name)
246{
247	thread_id_t thread_id = vdo_get_callback_thread_id();
248
249	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
250			    "%s() must only be called on cache thread %d, not thread %d",
251			    function_name, cache->zone->thread_id, thread_id);
252}
253
254/** assert_io_allowed() - Assert that a page cache may issue I/O. */
255static inline void assert_io_allowed(struct vdo_page_cache *cache)
256{
257	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
258			    "VDO page cache may issue I/O");
259}
260
261/** report_cache_pressure() - Log and, if enabled, report cache pressure. */
262static void report_cache_pressure(struct vdo_page_cache *cache)
263{
264	ADD_ONCE(cache->stats.cache_pressure, 1);
265	if (cache->waiter_count > cache->page_count) {
266		if ((cache->pressure_report % LOG_INTERVAL) == 0)
267			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);
268
269		if (++cache->pressure_report >= DISPLAY_INTERVAL)
270			cache->pressure_report = 0;
271	}
272}
273
274/**
275 * get_page_state_name() - Return the name of a page state.
276 *
277 * If the page state is invalid, a static string is returned and the invalid state is logged.
278 *
279 * Return: A pointer to a static page state name.
280 */
281static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
282{
283	int result;
284	static const char * const state_names[] = {
285		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
286	};
287
288	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);
289
290	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
291			    "Unknown page_state value %d", state);
292	if (result != VDO_SUCCESS)
293		return "[UNKNOWN PAGE STATE]";
294
295	return state_names[state];
296}
297
298/**
299 * update_counter() - Update the counter associated with a given state.
300 * @info: The page info to count.
301 * @delta: The delta to apply to the counter.
302 */
303static void update_counter(struct page_info *info, s32 delta)
304{
305	struct block_map_statistics *stats = &info->cache->stats;
306
307	switch (info->state) {
308	case PS_FREE:
309		ADD_ONCE(stats->free_pages, delta);
310		return;
311
312	case PS_INCOMING:
313		ADD_ONCE(stats->incoming_pages, delta);
314		return;
315
316	case PS_OUTGOING:
317		ADD_ONCE(stats->outgoing_pages, delta);
318		return;
319
320	case PS_FAILED:
321		ADD_ONCE(stats->failed_pages, delta);
322		return;
323
324	case PS_RESIDENT:
325		ADD_ONCE(stats->clean_pages, delta);
326		return;
327
328	case PS_DIRTY:
329		ADD_ONCE(stats->dirty_pages, delta);
330		return;
331
332	default:
333		return;
334	}
335}
336
337/** update_lru() - Update the lru information for an active page. */
338static void update_lru(struct page_info *info)
339{
340	if (info->cache->lru_list.prev != &info->lru_entry)
341		list_move_tail(&info->lru_entry, &info->cache->lru_list);
342}
343
344/**
345 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
346 *                    counters.
347 */
348static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
349{
350	if (new_state == info->state)
351		return;
352
353	update_counter(info, -1);
354	info->state = new_state;
355	update_counter(info, 1);
356
357	switch (info->state) {
358	case PS_FREE:
359	case PS_FAILED:
360		list_move_tail(&info->state_entry, &info->cache->free_list);
361		return;
362
363	case PS_OUTGOING:
364		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
365		return;
366
367	case PS_DIRTY:
368		return;
369
370	default:
371		list_del_init(&info->state_entry);
372	}
373}
374
375/** set_info_pbn() - Set the pbn for an info, updating the map as needed. */
376static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
377{
378	struct vdo_page_cache *cache = info->cache;
379
380	/* Either the new or the old page number must be NO_PAGE. */
381	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
382				"Must free a page before reusing it.");
383	if (result != VDO_SUCCESS)
384		return result;
385
386	if (info->pbn != NO_PAGE)
387		vdo_int_map_remove(cache->page_map, info->pbn);
388
389	info->pbn = pbn;
390
391	if (pbn != NO_PAGE) {
392		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
393		if (result != VDO_SUCCESS)
394			return result;
395	}
396	return VDO_SUCCESS;
397}
398
399/** reset_page_info() - Reset page info to represent an unallocated page. */
400static int reset_page_info(struct page_info *info)
401{
402	int result;
403
404	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
405	if (result != VDO_SUCCESS)
406		return result;
407
408	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
409			    "VDO Page must not have waiters");
410	if (result != VDO_SUCCESS)
411		return result;
412
413	result = set_info_pbn(info, NO_PAGE);
414	set_info_state(info, PS_FREE);
415	list_del_init(&info->lru_entry);
416	return result;
417}
418
419/**
420 * find_free_page() - Find a free page.
421 *
422 * Return: A pointer to the page info structure (if found), NULL otherwise.
423 */
424static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
425{
426	struct page_info *info;
427
428	info = list_first_entry_or_null(&cache->free_list, struct page_info,
429					state_entry);
430	if (info != NULL)
431		list_del_init(&info->state_entry);
432
433	return info;
434}
435
436/**
437 * find_page() - Find the page info (if any) associated with a given pbn.
438 * @pbn: The absolute physical block number of the page.
439 *
440 * Return: The page info for the page if available, or NULL if not.
441 */
442static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
443						 physical_block_number_t pbn)
444{
445	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
446		return cache->last_found;
447
448	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
449	return cache->last_found;
450}
451
452/**
453 * select_lru_page() - Determine which page is least recently used.
454 *
455 * Picks the least recently used page from among the non-busy entries at the front of the LRU
456 * ring. Since a page is moved to the end of the ring whenever it is marked busy, the entries at
457 * the front are unlikely to be busy unless the queue is very short, but it is not impossible.
458 *
459 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
460 *         found. The page can be dirty or resident.
461 */
462static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
463{
464	struct page_info *info;
465
466	list_for_each_entry(info, &cache->lru_list, lru_entry)
467		if ((info->busy == 0) && !is_in_flight(info))
468			return info;
469
470	return NULL;
471}
472
473/* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */
474
475/**
476 * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
477 * @info: The page info representing the result page.
478 * @vdo_page_comp: The VDO page completion to complete.
479 */
480static void complete_with_page(struct page_info *info,
481			       struct vdo_page_completion *vdo_page_comp)
482{
483	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);
484
485	if (!available) {
486		vdo_log_error_strerror(VDO_BAD_PAGE,
487				       "Requested cache page %llu in state %s is not %s",
488				       (unsigned long long) info->pbn,
489				       get_page_state_name(info->state),
490				       vdo_page_comp->writable ? "present" : "valid");
491		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
492		return;
493	}
494
495	vdo_page_comp->info = info;
496	vdo_page_comp->ready = true;
497	vdo_finish_completion(&vdo_page_comp->completion);
498}
499
500/**
501 * complete_waiter_with_error() - Complete a page completion with an error code.
502 * @waiter: The page completion, as a waiter.
503 * @result_ptr: A pointer to the error code.
504 *
505 * Implements waiter_callback_fn.
506 */
507static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
508{
509	int *result = result_ptr;
510
511	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
512}
513
514/**
515 * complete_waiter_with_page() - Complete a page completion with a page.
516 * @waiter: The page completion, as a waiter.
517 * @page_info: The page info to complete with.
518 *
519 * Implements waiter_callback_fn.
520 */
521static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
522{
523	complete_with_page(page_info, page_completion_from_waiter(waiter));
524}
525
526/**
527 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
528 *
529 * Upon completion the waitq will be empty.
530 *
531 * Return: The number of completions to which the page was distributed.
532 */
533static unsigned int distribute_page_over_waitq(struct page_info *info,
534					       struct vdo_wait_queue *waitq)
535{
536	size_t num_pages;
537
538	update_lru(info);
539	num_pages = vdo_waitq_num_waiters(waitq);
540
541	/*
542	 * Increment the busy count once for each pending completion so that this page does not
543	 * stop being busy until all completions have been processed.
544	 */
545	info->busy += num_pages;
546
547	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
548	return num_pages;
549}
550
551/**
552 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
553 * @context: A string describing what triggered the error.
554 *
555 * Once triggered, all enqueued completions will get this error. Any future requests will result in
556 * this error as well.
557 */
558static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
559				 int result)
560{
561	struct page_info *info;
562	/* If we're already read-only, there's no need to log. */
563	struct vdo *vdo = cache->vdo;
564
565	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
566		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
567				       context);
568		vdo_enter_read_only_mode(vdo, result);
569	}
570
571	assert_on_cache_thread(cache, __func__);
572
573	vdo_waitq_notify_all_waiters(&cache->free_waiters,
574				     complete_waiter_with_error, &result);
575	cache->waiter_count = 0;
576
577	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
578		vdo_waitq_notify_all_waiters(&info->waiting,
579					     complete_waiter_with_error, &result);
580	}
581}
582
583/**
584 * validate_completed_page() - Check that a page completion which is being freed to the cache
585 *                             referred to a valid page and is in a valid state.
586 * @writable: Whether a writable page is required.
587 *
588 * Return: VDO_SUCCESS if the page was valid, otherwise an error code.
589 */
590static int __must_check validate_completed_page(struct vdo_page_completion *completion,
591						bool writable)
592{
593	int result;
594
595	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
596	if (result != VDO_SUCCESS)
597		return result;
598
599	result = VDO_ASSERT(completion->info != NULL,
600			    "VDO Page Completion must be complete");
601	if (result != VDO_SUCCESS)
602		return result;
603
604	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
605			    "VDO Page Completion pbn must be consistent");
606	if (result != VDO_SUCCESS)
607		return result;
608
609	result = VDO_ASSERT(is_valid(completion->info),
610			    "VDO Page Completion page must be valid");
611	if (result != VDO_SUCCESS)
612		return result;
613
614	if (writable) {
615		result = VDO_ASSERT(completion->writable,
616				    "VDO Page Completion must be writable");
617		if (result != VDO_SUCCESS)
618			return result;
619	}
620
621	return VDO_SUCCESS;
622}
623
624static void check_for_drain_complete(struct block_map_zone *zone)
625{
626	if (vdo_is_state_draining(&zone->state) &&
627	    (zone->active_lookups == 0) &&
628	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
629	    !is_vio_pool_busy(zone->vio_pool) &&
630	    (zone->page_cache.outstanding_reads == 0) &&
631	    (zone->page_cache.outstanding_writes == 0)) {
632		vdo_finish_draining_with_result(&zone->state,
633						(vdo_is_read_only(zone->block_map->vdo) ?
634						 VDO_READ_ONLY : VDO_SUCCESS));
635	}
636}
637
638static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
639{
640	vdo_enter_read_only_mode(zone->block_map->vdo, result);
641
642	/*
643	 * We are in read-only mode, so we won't ever write any page out.
644	 * Just take all waiters off the waitq so the zone can drain.
645	 */
646	vdo_waitq_init(&zone->flush_waiters);
647	check_for_drain_complete(zone);
648}
649
650static bool __must_check
651validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
652						bool writable)
653{
654	int result = validate_completed_page(completion, writable);
655
656	if (result == VDO_SUCCESS)
657		return true;
658
659	enter_zone_read_only_mode(completion->info->cache->zone, result);
660	return false;
661}
662
663/**
664 * handle_load_error() - Handle page load errors.
665 * @completion: The page read vio.
666 */
667static void handle_load_error(struct vdo_completion *completion)
668{
669	int result = completion->result;
670	struct page_info *info = completion->parent;
671	struct vdo_page_cache *cache = info->cache;
672
673	assert_on_cache_thread(cache, __func__);
674	vio_record_metadata_io_error(as_vio(completion));
675	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
676	ADD_ONCE(cache->stats.failed_reads, 1);
677	set_info_state(info, PS_FAILED);
678	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
679	reset_page_info(info);
680
681	/*
682	 * Don't decrement until right before calling check_for_drain_complete() to
683	 * ensure that the above work can't cause the page cache to be freed out from under us.
684	 */
685	cache->outstanding_reads--;
686	check_for_drain_complete(cache->zone);
687}
688
689/**
690 * page_is_loaded() - Callback used when a page has been loaded.
691 * @completion: The vio which has loaded the page. Its parent is the page_info.
692 */
693static void page_is_loaded(struct vdo_completion *completion)
694{
695	struct page_info *info = completion->parent;
696	struct vdo_page_cache *cache = info->cache;
697	nonce_t nonce = info->cache->zone->block_map->nonce;
698	struct block_map_page *page;
699	enum block_map_page_validity validity;
700
701	assert_on_cache_thread(cache, __func__);
702
703	page = (struct block_map_page *) get_page_buffer(info);
704	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
705	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
706		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
707		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
708						    "Expected page %llu but got page %llu instead",
709						    (unsigned long long) info->pbn,
710						    (unsigned long long) pbn);
711
712		vdo_continue_completion(completion, result);
713		return;
714	}
715
716	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
717		vdo_format_block_map_page(page, nonce, info->pbn, false);
718
719	info->recovery_lock = 0;
720	set_info_state(info, PS_RESIDENT);
721	distribute_page_over_waitq(info, &info->waiting);
722
723	/*
724	 * Don't decrement until right before calling check_for_drain_complete() to
725	 * ensure that the above work can't cause the page cache to be freed out from under us.
726	 */
727	cache->outstanding_reads--;
728	check_for_drain_complete(cache->zone);
729}
730
731/**
732 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
733 * @completion: The page load completion.
734 */
735static void handle_rebuild_read_error(struct vdo_completion *completion)
736{
737	struct page_info *info = completion->parent;
738	struct vdo_page_cache *cache = info->cache;
739
740	assert_on_cache_thread(cache, __func__);
741
742	/*
743	 * We are doing a read-only rebuild, so treat this as a successful read
744	 * of an uninitialized page.
745	 */
746	vio_record_metadata_io_error(as_vio(completion));
747	ADD_ONCE(cache->stats.failed_reads, 1);
748	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
749	vdo_reset_completion(completion);
750	page_is_loaded(completion);
751}
752
753static void load_cache_page_endio(struct bio *bio)
754{
755	struct vio *vio = bio->bi_private;
756	struct page_info *info = vio->completion.parent;
757
758	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
759}
760
761/**
762 * launch_page_load() - Begin the process of loading a page.
763 *
764 * Return: VDO_SUCCESS or an error code.
765 */
766static int __must_check launch_page_load(struct page_info *info,
767					 physical_block_number_t pbn)
768{
769	int result;
770	vdo_action_fn callback;
771	struct vdo_page_cache *cache = info->cache;
772
773	assert_io_allowed(cache);
774
775	result = set_info_pbn(info, pbn);
776	if (result != VDO_SUCCESS)
777		return result;
778
779	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
780	if (result != VDO_SUCCESS)
781		return result;
782
783	set_info_state(info, PS_INCOMING);
784	cache->outstanding_reads++;
785	ADD_ONCE(cache->stats.pages_loaded, 1);
786	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
787	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
788				callback, REQ_OP_READ | REQ_PRIO);
789	return VDO_SUCCESS;
790}
791
792static void write_pages(struct vdo_completion *completion);
793
794/** handle_flush_error() - Handle errors flushing the layer. */
795static void handle_flush_error(struct vdo_completion *completion)
796{
797	struct page_info *info = completion->parent;
798
799	vio_record_metadata_io_error(as_vio(completion));
800	set_persistent_error(info->cache, "flush failed", completion->result);
801	write_pages(completion);
802}
803
804static void flush_endio(struct bio *bio)
805{
806	struct vio *vio = bio->bi_private;
807	struct page_info *info = vio->completion.parent;
808
809	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
810}
811
812/** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
813static void save_pages(struct vdo_page_cache *cache)
814{
815	struct page_info *info;
816	struct vio *vio;
817
818	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
819		return;
820
821	assert_io_allowed(cache);
822
823	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);
824
825	cache->pages_in_flush = cache->pages_to_flush;
826	cache->pages_to_flush = 0;
827	ADD_ONCE(cache->stats.flush_count, 1);
828
829	vio = info->vio;
830
831	/*
832	 * We must make sure that the recovery journal entries that changed these pages were
833	 * successfully persisted, so we must issue a flush before each batch of pages is written.
835	 */
836	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
837}
838
839/**
840 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
841 *
842 * Once in the list, a page may not be used until it has been written out.
843 */
844static void schedule_page_save(struct page_info *info)
845{
846	if (info->busy > 0) {
847		info->write_status = WRITE_STATUS_DEFERRED;
848		return;
849	}
850
851	info->cache->pages_to_flush++;
852	info->cache->outstanding_writes++;
853	set_info_state(info, PS_OUTGOING);
854}
855
856/**
857 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
858 * pages if another save is not in progress.
859 */
860static void launch_page_save(struct page_info *info)
861{
862	schedule_page_save(info);
863	save_pages(info->cache);
864}
865
866/**
867 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
868 *                           requesting a given page number.
869 * @context: A pointer to the pbn of the desired page.
870 *
871 * Implements waiter_match_fn.
872 *
873 * Return: true if the page completion is for the desired page number.
874 */
875static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
876{
877	physical_block_number_t *pbn = context;
878
879	return (page_completion_from_waiter(waiter)->pbn == *pbn);
880}
881
882/**
883 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
884 *                        any other completions that match it in page number.
885 */
886static void allocate_free_page(struct page_info *info)
887{
888	int result;
889	struct vdo_waiter *oldest_waiter;
890	physical_block_number_t pbn;
891	struct vdo_page_cache *cache = info->cache;
892
893	assert_on_cache_thread(cache, __func__);
894
895	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
896		if (cache->stats.cache_pressure > 0) {
897			vdo_log_info("page cache pressure relieved");
898			WRITE_ONCE(cache->stats.cache_pressure, 0);
899		}
900
901		return;
902	}
903
904	result = reset_page_info(info);
905	if (result != VDO_SUCCESS) {
906		set_persistent_error(cache, "cannot reset page info", result);
907		return;
908	}
909
910	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
911	pbn = page_completion_from_waiter(oldest_waiter)->pbn;
912
913	/*
914	 * Remove all entries which match the page number in question and push them onto the page
915	 * info's waitq.
916	 */
917	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
918					   &pbn, &info->waiting);
919	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);
920
921	result = launch_page_load(info, pbn);
922	if (result != VDO_SUCCESS) {
923		vdo_waitq_notify_all_waiters(&info->waiting,
924					     complete_waiter_with_error, &result);
925	}
926}
927
928/**
929 * discard_a_page() - Begin the process of discarding a page.
930 *
931 * If no page is discardable, increments a count of deferred frees so that the next release of a
932 * page which is no longer busy will kick off another discard cycle. This is an indication that the
933 * cache is not big enough.
934 *
935 * If the selected page is not dirty, immediately allocates the page to the oldest completion
936 * waiting for a free page.
937 */
938static void discard_a_page(struct vdo_page_cache *cache)
939{
940	struct page_info *info = select_lru_page(cache);
941
942	if (info == NULL) {
943		report_cache_pressure(cache);
944		return;
945	}
946
947	if (!is_dirty(info)) {
948		allocate_free_page(info);
949		return;
950	}
951
952	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
953			    "page selected for discard is not in flight");
954
955	cache->discard_count++;
956	info->write_status = WRITE_STATUS_DISCARD;
957	launch_page_save(info);
958}
959
960/**
961 * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
962 *                                 a different page.
963 */
964static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
965{
966	struct vdo_page_cache *cache = vdo_page_comp->cache;
967
968	cache->waiter_count++;
969	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
970	discard_a_page(cache);
971}
972
973/**
974 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
975 *                            page.
976 * @cache: The page cache.
977 */
978static void discard_page_if_needed(struct vdo_page_cache *cache)
979{
980	if (cache->waiter_count > cache->discard_count)
981		discard_a_page(cache);
982}
983
984/**
985 * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
986 * @info: The info structure for the page whose write just completed.
987 *
988 * Return: true if the page write was a discard.
989 */
990static bool write_has_finished(struct page_info *info)
991{
992	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);
993
994	assert_on_cache_thread(info->cache, __func__);
995	info->cache->outstanding_writes--;
996
997	info->write_status = WRITE_STATUS_NORMAL;
998	return was_discard;
999}
1000
1001/**
1002 * handle_page_write_error() - Handler for page write errors.
1003 * @completion: The page write vio.
1004 */
1005static void handle_page_write_error(struct vdo_completion *completion)
1006{
1007	int result = completion->result;
1008	struct page_info *info = completion->parent;
1009	struct vdo_page_cache *cache = info->cache;
1010
1011	vio_record_metadata_io_error(as_vio(completion));
1012
1013	/* If we're already read-only, write failures are to be expected. */
1014	if (result != VDO_READ_ONLY) {
1015		vdo_log_ratelimit(vdo_log_error,
1016				  "failed to write block map page %llu",
1017				  (unsigned long long) info->pbn);
1018	}
1019
1020	set_info_state(info, PS_DIRTY);
1021	ADD_ONCE(cache->stats.failed_writes, 1);
1022	set_persistent_error(cache, "cannot write page", result);
1023
1024	if (!write_has_finished(info))
1025		discard_page_if_needed(cache);
1026
1027	check_for_drain_complete(cache->zone);
1028}
1029
1030static void page_is_written_out(struct vdo_completion *completion);
1031
1032static void write_cache_page_endio(struct bio *bio)
1033{
1034	struct vio *vio = bio->bi_private;
1035	struct page_info *info = vio->completion.parent;
1036
1037	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
1038}
1039
1040/**
1041 * page_is_written_out() - Callback used when a page has been written out.
1042 * @completion: The vio which wrote the page. Its parent is a page_info.
1043 */
1044static void page_is_written_out(struct vdo_completion *completion)
1045{
1046	bool was_discard, reclaimed;
1047	u32 reclamations;
1048	struct page_info *info = completion->parent;
1049	struct vdo_page_cache *cache = info->cache;
1050	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);
1051
1052	if (!page->header.initialized) {
1053		page->header.initialized = true;
1054		vdo_submit_metadata_vio(info->vio, info->pbn,
1055					write_cache_page_endio,
1056					handle_page_write_error,
1057					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
1058		return;
1059	}
1060
1061	/* Handle journal updates and torn write protection. */
1062	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
1063						     info->recovery_lock,
1064						     VDO_ZONE_TYPE_LOGICAL,
1065						     cache->zone->zone_number);
1066	info->recovery_lock = 0;
1067	was_discard = write_has_finished(info);
1068	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));
1069
1070	set_info_state(info, PS_RESIDENT);
1071
1072	reclamations = distribute_page_over_waitq(info, &info->waiting);
1073	ADD_ONCE(cache->stats.reclaimed, reclamations);
1074
1075	if (was_discard)
1076		cache->discard_count--;
1077
1078	if (reclaimed)
1079		discard_page_if_needed(cache);
1080	else
1081		allocate_free_page(info);
1082
1083	check_for_drain_complete(cache->zone);
1084}
1085
1086/**
1087 * write_pages() - Write the batch of pages which were covered by the layer flush which just
1088 *                 completed.
1089 * @flush_completion: The flush vio.
1090 *
1091 * This callback is registered in save_pages().
1092 */
1093static void write_pages(struct vdo_completion *flush_completion)
1094{
1095	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;
1096
1097	/*
1098	 * We need to cache these two values on the stack, since launching the last page may cause
1099	 * the page cache to be freed. Once the last page has been launched, it may be unsafe to
1100	 * dereference the cache.
1101	 */
1102	bool has_unflushed_pages = (cache->pages_to_flush > 0);
1103	page_count_t pages_in_flush = cache->pages_in_flush;
1104
1105	cache->pages_in_flush = 0;
1106	while (pages_in_flush-- > 0) {
1107		struct page_info *info =
1108			list_first_entry(&cache->outgoing_list, struct page_info,
1109					 state_entry);
1110
1111		list_del_init(&info->state_entry);
1112		if (vdo_is_read_only(info->cache->vdo)) {
1113			struct vdo_completion *completion = &info->vio->completion;
1114
1115			vdo_reset_completion(completion);
1116			completion->callback = page_is_written_out;
1117			completion->error_handler = handle_page_write_error;
1118			vdo_fail_completion(completion, VDO_READ_ONLY);
1119			continue;
1120		}
1121		ADD_ONCE(info->cache->stats.pages_saved, 1);
1122		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
1123					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
1124	}
1125
1126	if (has_unflushed_pages) {
1127		/*
1128		 * If there are unflushed pages, the cache can't have been freed, so this call is
1129		 * safe.
1130		 */
1131		save_pages(cache);
1132	}
1133}
1134
1135/**
1136 * vdo_release_page_completion() - Release a VDO Page Completion.
1137 *
1138 * The page referenced by this completion (if any) will no longer be held busy by this completion.
1139 * If a page becomes discardable and there are completions awaiting free pages then a new round of
1140 * page discarding is started.
1141 */
1142void vdo_release_page_completion(struct vdo_completion *completion)
1143{
1144	struct page_info *discard_info = NULL;
1145	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
1146	struct vdo_page_cache *cache;
1147
1148	if (completion->result == VDO_SUCCESS) {
1149		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
1150			return;
1151
1152		if (--page_completion->info->busy == 0)
1153			discard_info = page_completion->info;
1154	}
1155
1156	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1157			    "Page being released after leaving all queues");
1158
1159	page_completion->info = NULL;
1160	cache = page_completion->cache;
1161	assert_on_cache_thread(cache, __func__);
1162
1163	if (discard_info != NULL) {
1164		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
1165			discard_info->write_status = WRITE_STATUS_NORMAL;
1166			launch_page_save(discard_info);
1167		}
1168
1169		/*
1170		 * If there are more waiting page requests than discards already in progress, we
1171		 * need to discard some page (which may be this one).
1172		 */
1173		discard_page_if_needed(cache);
1174	}
1175}
1176
1177/**
1178 * load_page_for_completion() - Helper function to load a page as described by a VDO Page
1179 *                              Completion.
1180 */
1181static void load_page_for_completion(struct page_info *info,
1182				     struct vdo_page_completion *vdo_page_comp)
1183{
1184	int result;
1185
1186	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
1187	result = launch_page_load(info, vdo_page_comp->pbn);
1188	if (result != VDO_SUCCESS) {
1189		vdo_waitq_notify_all_waiters(&info->waiting,
1190					     complete_waiter_with_error, &result);
1191	}
1192}
1193
1194/**
1195 * vdo_get_page() - Initialize a page completion and get a block map page.
1196 * @page_completion: The vdo_page_completion to initialize.
1197 * @zone: The block map zone of the desired page.
1198 * @pbn: The absolute physical block of the desired page.
1199 * @writable: Whether the page can be modified.
1200 * @parent: The object to notify when the fetch is complete.
1201 * @callback: The notification callback.
1202 * @error_handler: The handler for fetch errors.
1203 * @requeue: Whether we must requeue when notifying the parent.
1204 *
1205 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
1206 * by the completion to be loaded from disk. When the callback is invoked, the page will be
1207 * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
1208 * when they are done with the page to clear the busy mark.
1209 */
1210void vdo_get_page(struct vdo_page_completion *page_completion,
1211		  struct block_map_zone *zone, physical_block_number_t pbn,
1212		  bool writable, void *parent, vdo_action_fn callback,
1213		  vdo_action_fn error_handler, bool requeue)
1214{
1215	struct vdo_page_cache *cache = &zone->page_cache;
1216	struct vdo_completion *completion = &page_completion->completion;
1217	struct page_info *info;
1218
1219	assert_on_cache_thread(cache, __func__);
1220	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
1221			    "New page completion was not already on a wait queue");
1222
1223	*page_completion = (struct vdo_page_completion) {
1224		.pbn = pbn,
1225		.writable = writable,
1226		.cache = cache,
1227	};
1228
1229	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
1230	vdo_prepare_completion(completion, callback, error_handler,
1231			       cache->zone->thread_id, parent);
1232	completion->requeue = requeue;
1233
1234	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
1235		vdo_fail_completion(completion, VDO_READ_ONLY);
1236		return;
1237	}
1238
1239	if (page_completion->writable)
1240		ADD_ONCE(cache->stats.write_count, 1);
1241	else
1242		ADD_ONCE(cache->stats.read_count, 1);
1243
1244	info = find_page(cache, page_completion->pbn);
1245	if (info != NULL) {
1246		/* The page is in the cache already. */
1247		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
1248		    is_incoming(info) ||
1249		    (is_outgoing(info) && page_completion->writable)) {
1250			/* The page is unusable until it has finished I/O. */
1251			ADD_ONCE(cache->stats.wait_for_page, 1);
1252			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
1253			return;
1254		}
1255
1256		if (is_valid(info)) {
1257			/* The page is usable. */
1258			ADD_ONCE(cache->stats.found_in_cache, 1);
1259			if (!is_present(info))
1260				ADD_ONCE(cache->stats.read_outgoing, 1);
1261			update_lru(info);
1262			info->busy++;
1263			complete_with_page(info, page_completion);
1264			return;
1265		}
1266
1267		/* Something horrible has gone wrong. */
1268		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
1269	}
1270
1271	/* The page must be fetched. */
1272	info = find_free_page(cache);
1273	if (info != NULL) {
1274		ADD_ONCE(cache->stats.fetch_required, 1);
1275		load_page_for_completion(info, page_completion);
1276		return;
1277	}
1278
1279	/* The completion must wait for a page to be discarded. */
1280	ADD_ONCE(cache->stats.discard_required, 1);
1281	discard_page_for_completion(page_completion);
1282}
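
/*
 * Illustrative sketch only, not part of the driver: a hypothetical callback passed to
 * vdo_get_page() would fetch the cached page and release the completion when finished with it.
 * "example_page_fetched" is an invented name, and error handling is elided.
 */
static inline void example_page_fetched(struct vdo_completion *completion)
{
	struct block_map_page *page;

	if (vdo_get_cached_page(completion, &page) == VDO_SUCCESS) {
		/* Read or (if the page was fetched writable) modify entries in "page" here. */
	}

	/* Drop the busy mark so the page may eventually be discarded. */
	vdo_release_page_completion(completion);
}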
1283
1284/**
1285 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
1286 * @completion: The vdo_page_completion containing the page.
1287 */
1288void vdo_request_page_write(struct vdo_completion *completion)
1289{
1290	struct page_info *info;
1291	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);
1292
1293	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
1294		return;
1295
1296	info = vdo_page_comp->info;
1297	set_info_state(info, PS_DIRTY);
1298	launch_page_save(info);
1299}
1300
1301/**
1302 * vdo_get_cached_page() - Get the block map page from a page completion.
1303 * @completion: A vdo page completion whose callback has been called.
1304 * @page_ptr: A pointer to hold the page.
1305 *
1306 * Return: VDO_SUCCESS or an error code.
1307 */
1308int vdo_get_cached_page(struct vdo_completion *completion,
1309			struct block_map_page **page_ptr)
1310{
1311	int result;
1312	struct vdo_page_completion *vpc;
1313
1314	vpc = as_vdo_page_completion(completion);
1315	result = validate_completed_page(vpc, true);
1316	if (result == VDO_SUCCESS)
1317		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);
1318
1319	return result;
1320}
1321
1322/**
1323 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
1324 *
1325 * There must not be any dirty pages in the cache.
1326 *
1327 * Return: A success or error code.
1328 */
1329int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
1330{
1331	struct page_info *info;
1332
1333	assert_on_cache_thread(cache, __func__);
1334
1335	/* Make sure we don't throw away any dirty pages. */
1336	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
1337		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");
1338
1339		if (result != VDO_SUCCESS)
1340			return result;
1341	}
1342
1343	/* Reset the page map by re-allocating it. */
1344	vdo_int_map_free(vdo_forget(cache->page_map));
1345	return vdo_int_map_create(cache->page_count, &cache->page_map);
1346}
1347
1348/**
1349 * get_tree_page_by_index() - Get the tree page for a given height and page index.
1350 *
1351 * Return: The requested page.
1352 */
1353static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
1354							      root_count_t root_index,
1355							      height_t height,
1356							      page_number_t page_index)
1357{
1358	page_number_t offset = 0;
1359	size_t segment;
1360
1361	for (segment = 0; segment < forest->segments; segment++) {
1362		page_number_t border = forest->boundaries[segment].levels[height - 1];
1363
1364		if (page_index < border) {
1365			struct block_map_tree *tree = &forest->trees[root_index];
1366
1367			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
1368		}
1369
1370		offset = border;
1371	}
1372
1373	return NULL;
1374}
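
/*
 * Illustrative sketch only: the same boundary search restated over a flat array of hypothetical
 * per-segment boundaries, to show how a global page index maps to a (segment, offset) pair.
 * "example_locate_page" and its parameters are invented for this sketch.
 */
static inline size_t example_locate_page(const page_number_t *boundaries, size_t segment_count,
					 page_number_t page_index,
					 page_number_t *offset_in_segment)
{
	page_number_t offset = 0;
	size_t segment;

	for (segment = 0; segment < segment_count; segment++) {
		if (page_index < boundaries[segment]) {
			*offset_in_segment = page_index - offset;
			return segment;
		}

		offset = boundaries[segment];
	}

	/* The page index lies beyond the last segment's boundary. */
	return segment_count;
}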
1375
1376/* Get the page referred to by the lock's tree slot at its current height. */
1377static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
1378					      const struct tree_lock *lock)
1379{
1380	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
1381				      lock->height,
1382				      lock->tree_slots[lock->height].page_index);
1383}
1384
1385/** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
1386bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
1387			 physical_block_number_t pbn,
1388			 struct block_map_page *page)
1389{
1390	struct block_map_page *loaded = (struct block_map_page *) buffer;
1391	enum block_map_page_validity validity =
1392		vdo_validate_block_map_page(loaded, nonce, pbn);
1393
1394	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
1395		memcpy(page, loaded, VDO_BLOCK_SIZE);
1396		return true;
1397	}
1398
1399	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
1400		vdo_log_error_strerror(VDO_BAD_PAGE,
1401				       "Expected page %llu but got page %llu instead",
1402				       (unsigned long long) pbn,
1403				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
1404	}
1405
1406	return false;
1407}
1408
1409/**
1410 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
1411 *                     a cyclic range of values from 0 to (modulus - 1).
1412 * @lower: The lowest value to accept.
1413 * @value: The value to check.
1414 * @upper: The highest value to accept.
1415 * @modulus: The size of the cyclic space, no more than 2^15.
1416 *
1417 * The value and both bounds must be smaller than the modulus.
1418 *
1419 * Return: true if the value is in range.
1420 */
1421static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
1422{
1423	if (value < lower)
1424		value += modulus;
1425	if (upper < lower)
1426		upper += modulus;
1427	return (value <= upper);
1428}
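
/*
 * Illustrative sketch only: with a modulus of 256 (the generation space used below), a range that
 * wraps past zero still accepts values on either side of the wrap point. The concrete numbers are
 * arbitrary examples.
 */
static inline bool example_cyclic_range_wraps(void)
{
	/* The range [250, 10] contains 3 (after the wrap) but not 100. */
	return in_cyclic_range(250, 3, 10, 256) && !in_cyclic_range(250, 100, 10, 256);
}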
1429
1430/**
1431 * is_not_older() - Check whether a generation is strictly older than some other generation in the
1432 *                  context of a zone's current generation range.
1433 * @zone: The zone in which to do the comparison.
1434 * @a: The generation in question.
1435 * @b: The generation to compare to.
1436 *
1437 * Return: true if generation @a is not strictly older than generation @b in the context of @zone
1438 */
1439static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
1440{
1441	int result;
1442
1443	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
1444			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
1445			    "generation(s) %u, %u are out of range [%u, %u]",
1446			    a, b, zone->oldest_generation, zone->generation);
1447	if (result != VDO_SUCCESS) {
1448		enter_zone_read_only_mode(zone, result);
1449		return true;
1450	}
1451
1452	return in_cyclic_range(b, a, zone->generation, 1 << 8);
1453}
1454
1455static void release_generation(struct block_map_zone *zone, u8 generation)
1456{
1457	int result;
1458
1459	result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
1460			    "dirty page count underflow for generation %u", generation);
1461	if (result != VDO_SUCCESS) {
1462		enter_zone_read_only_mode(zone, result);
1463		return;
1464	}
1465
1466	zone->dirty_page_counts[generation]--;
1467	while ((zone->dirty_page_counts[zone->oldest_generation] == 0) &&
1468	       (zone->oldest_generation != zone->generation))
1469		zone->oldest_generation++;
1470}
1471
1472static void set_generation(struct block_map_zone *zone, struct tree_page *page,
1473			   u8 new_generation)
1474{
1475	u32 new_count;
1476	int result;
1477	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
1478	u8 old_generation = page->generation;
1479
1480	if (decrement_old && (old_generation == new_generation))
1481		return;
1482
1483	page->generation = new_generation;
1484	new_count = ++zone->dirty_page_counts[new_generation];
1485	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
1486			    new_generation);
1487	if (result != VDO_SUCCESS) {
1488		enter_zone_read_only_mode(zone, result);
1489		return;
1490	}
1491
1492	if (decrement_old)
1493		release_generation(zone, old_generation);
1494}
1495
1496static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);
1497
1498/* Implements waiter_callback_fn */
1499static void write_page_callback(struct vdo_waiter *waiter, void *context)
1500{
1501	write_page(container_of(waiter, struct tree_page, waiter), context);
1502}
1503
1504static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
1505{
1506	waiter->callback = write_page_callback;
1507	acquire_vio_from_pool(zone->vio_pool, waiter);
1508}
1509
1510/* Return: true unless all possible generations were already active. */
1511static bool attempt_increment(struct block_map_zone *zone)
1512{
1513	u8 generation = zone->generation + 1;
1514
1515	if (zone->oldest_generation == generation)
1516		return false;
1517
1518	zone->generation = generation;
1519	return true;
1520}
1521
1522/* Queue a page for writing, making it the zone's flusher if no flush is already in progress. */
1523static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
1524{
1525	if ((zone->flusher == NULL) && attempt_increment(zone)) {
1526		zone->flusher = page;
1527		acquire_vio(&page->waiter, zone);
1528		return;
1529	}
1530
1531	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
1532}
1533
1534static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
1535{
1536	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
1537	struct write_if_not_dirtied_context *write_context = context;
1538
1539	if (page->generation == write_context->generation) {
1540		acquire_vio(waiter, write_context->zone);
1541		return;
1542	}
1543
1544	enqueue_page(page, write_context->zone);
1545}
1546
1547static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
1548{
1549	return_vio_to_pool(zone->vio_pool, vio);
1550	check_for_drain_complete(zone);
1551}
1552
1553/* This callback is registered in write_initialized_page(). */
1554static void finish_page_write(struct vdo_completion *completion)
1555{
1556	bool dirty;
1557	struct vio *vio = as_vio(completion);
1558	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1559	struct tree_page *page = completion->parent;
1560	struct block_map_zone *zone = pooled->context;
1561
1562	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
1563						     page->writing_recovery_lock,
1564						     VDO_ZONE_TYPE_LOGICAL,
1565						     zone->zone_number);
1566
1567	dirty = (page->writing_generation != page->generation);
1568	release_generation(zone, page->writing_generation);
1569	page->writing = false;
1570
1571	if (zone->flusher == page) {
1572		struct write_if_not_dirtied_context context = {
1573			.zone = zone,
1574			.generation = page->writing_generation,
1575		};
1576
1577		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
1578					     write_page_if_not_dirtied, &context);
1579		if (dirty && attempt_increment(zone)) {
1580			write_page(page, pooled);
1581			return;
1582		}
1583
1584		zone->flusher = NULL;
1585	}
1586
1587	if (dirty) {
1588		enqueue_page(page, zone);
1589	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
1590		   attempt_increment(zone)) {
1591		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
1592					     struct tree_page, waiter);
1593		write_page(zone->flusher, pooled);
1594		return;
1595	}
1596
1597	return_to_pool(zone, pooled);
1598}
1599
1600static void handle_write_error(struct vdo_completion *completion)
1601{
1602	int result = completion->result;
1603	struct vio *vio = as_vio(completion);
1604	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1605	struct block_map_zone *zone = pooled->context;
1606
1607	vio_record_metadata_io_error(vio);
1608	enter_zone_read_only_mode(zone, result);
1609	return_to_pool(zone, pooled);
1610}
1611
1612static void write_page_endio(struct bio *bio);
1613
1614static void write_initialized_page(struct vdo_completion *completion)
1615{
1616	struct vio *vio = as_vio(completion);
1617	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1618	struct block_map_zone *zone = pooled->context;
1619	struct tree_page *tree_page = completion->parent;
1620	struct block_map_page *page = (struct block_map_page *) vio->data;
1621	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;
1622
1623	/*
1624	 * Now that we know the page has been written at least once, mark the copy we are writing
1625	 * as initialized.
1626	 */
1627	page->header.initialized = true;
1628
1629	if (zone->flusher == tree_page)
1630		operation |= REQ_PREFLUSH;
1631
1632	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
1633				write_page_endio, handle_write_error,
1634				operation);
1635}
1636
1637static void write_page_endio(struct bio *bio)
1638{
1639	struct pooled_vio *vio = bio->bi_private;
1640	struct block_map_zone *zone = vio->context;
1641	struct block_map_page *page = (struct block_map_page *) vio->vio.data;
1642
1643	continue_vio_after_io(&vio->vio,
1644			      (page->header.initialized ?
1645			       finish_page_write : write_initialized_page),
1646			      zone->thread_id);
1647}
1648
1649static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
1650{
1651	struct vdo_completion *completion = &vio->vio.completion;
1652	struct block_map_zone *zone = vio->context;
1653	struct block_map_page *page = vdo_as_block_map_page(tree_page);
1654
1655	if ((zone->flusher != tree_page) &&
1656	    is_not_older(zone, tree_page->generation, zone->generation)) {
1657		/*
1658		 * This page was re-dirtied after the last flush was issued, hence we need to do
1659		 * another flush.
1660		 */
1661		enqueue_page(tree_page, zone);
1662		return_to_pool(zone, vio);
1663		return;
1664	}
1665
1666	completion->parent = tree_page;
1667	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
1668	completion->callback_thread_id = zone->thread_id;
1669
1670	tree_page->writing = true;
1671	tree_page->writing_generation = tree_page->generation;
1672	tree_page->writing_recovery_lock = tree_page->recovery_lock;
1673
1674	/* Clear this now so that we know this page is not on any dirty list. */
1675	tree_page->recovery_lock = 0;
1676
1677	/*
1678	 * We've already copied the page into the vio which will write it, so if it was not yet
1679	 * initialized, the first write will indicate that (for torn write protection). It is now
1680	 * safe to mark it as initialized in memory since if the write fails, the in memory state
1681	 * will become irrelevant.
1682	 */
1683	if (page->header.initialized) {
1684		write_initialized_page(completion);
1685		return;
1686	}
1687
1688	page->header.initialized = true;
1689	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
1690				write_page_endio, handle_write_error,
1691				REQ_OP_WRITE | REQ_PRIO);
1692}
1693
1694/* Release a lock on a page which was being loaded or allocated. */
1695static void release_page_lock(struct data_vio *data_vio, char *what)
1696{
1697	struct block_map_zone *zone;
1698	struct tree_lock *lock_holder;
1699	struct tree_lock *lock = &data_vio->tree_lock;
1700
1701	VDO_ASSERT_LOG_ONLY(lock->locked,
1702			    "release of unlocked block map page %s for key %llu in tree %u",
1703			    what, (unsigned long long) lock->key, lock->root_index);
1704
1705	zone = data_vio->logical.zone->block_map_zone;
1706	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
1707	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
1708			    "block map page %s mismatch for key %llu in tree %u",
1709			    what, (unsigned long long) lock->key, lock->root_index);
1710	lock->locked = false;
1711}
1712
1713static void finish_lookup(struct data_vio *data_vio, int result)
1714{
1715	data_vio->tree_lock.height = 0;
1716
1717	--data_vio->logical.zone->block_map_zone->active_lookups;
1718
1719	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
1720	data_vio->vio.completion.error_handler = handle_data_vio_error;
1721	continue_data_vio_with_error(data_vio, result);
1722}
1723
1724static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
1725{
1726	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1727	int result = *((int *) context);
1728
1729	if (!data_vio->write) {
1730		if (result == VDO_NO_SPACE)
1731			result = VDO_SUCCESS;
1732	} else if (result != VDO_NO_SPACE) {
1733		result = VDO_READ_ONLY;
1734	}
1735
1736	finish_lookup(data_vio, result);
1737}
1738
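/*
 * Abort a block map lookup: enter read-only mode unless the failure was merely out of space, then
 * release the page lock and fail any waiters along with this data_vio.
 */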
1739static void abort_lookup(struct data_vio *data_vio, int result, char *what)
1740{
1741	if (result != VDO_NO_SPACE)
1742		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);
1743
1744	if (data_vio->tree_lock.locked) {
1745		release_page_lock(data_vio, what);
1746		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
1747					     abort_lookup_for_waiter,
1748					     &result);
1749	}
1750
1751	finish_lookup(data_vio, result);
1752}
1753
1754static void abort_load(struct data_vio *data_vio, int result)
1755{
1756	abort_lookup(data_vio, result, "load");
1757}
1758
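/*
 * Check whether a tree page entry is invalid: an invalid or compressed mapping, a mapping to the
 * zero block, or (except for tree roots) a PBN which is not a physical data block.
 */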
1759static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
1760					       const struct data_location *mapping,
1761					       height_t height)
1762{
1763	if (!vdo_is_valid_location(mapping) ||
1764	    vdo_is_state_compressed(mapping->state) ||
1765	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
1766		return true;
1767
1768	/* Roots aren't physical data blocks, so we can't check their PBNs. */
1769	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
1770		return false;
1771
1772	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
1773}
1774
1775static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
1776static void allocate_block_map_page(struct block_map_zone *zone,
1777				    struct data_vio *data_vio);
1778
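/*
 * Continue a lookup with the entry from the page which was just loaded: allocate the next page if
 * it is unmapped, finish if the leaf page has been found, or load the next level down.
 */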
1779static void continue_with_loaded_page(struct data_vio *data_vio,
1780				      struct block_map_page *page)
1781{
1782	struct tree_lock *lock = &data_vio->tree_lock;
1783	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
1784	struct data_location mapping =
1785		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);
1786
1787	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
1788		vdo_log_error_strerror(VDO_BAD_MAPPING,
1789				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
1790				       (unsigned long long) mapping.pbn, mapping.state,
1791				       lock->tree_slots[lock->height - 1].page_index,
1792				       lock->height - 1);
1793		abort_load(data_vio, VDO_BAD_MAPPING);
1794		return;
1795	}
1796
1797	if (!vdo_is_mapped_location(&mapping)) {
1798		/* The page we need is unallocated */
1799		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
1800					data_vio);
1801		return;
1802	}
1803
1804	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
1805	if (lock->height == 1) {
1806		finish_lookup(data_vio, VDO_SUCCESS);
1807		return;
1808	}
1809
1810	/* We know what page we need to load next */
1811	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1812}
1813
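/* Continue a data_vio which was waiting for the tree page which has just been loaded. */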
1814static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
1815{
1816	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1817
1818	data_vio->tree_lock.height--;
1819	continue_with_loaded_page(data_vio, context);
1820}
1821
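/*
 * Copy the loaded data into the in-memory tree page (formatting a fresh page if the on-disk data
 * was not valid), then release the vio and the page lock and resume all waiters.
 */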
1822static void finish_block_map_page_load(struct vdo_completion *completion)
1823{
1824	physical_block_number_t pbn;
1825	struct tree_page *tree_page;
1826	struct block_map_page *page;
1827	nonce_t nonce;
1828	struct vio *vio = as_vio(completion);
1829	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
1830	struct data_vio *data_vio = completion->parent;
1831	struct block_map_zone *zone = pooled->context;
1832	struct tree_lock *tree_lock = &data_vio->tree_lock;
1833
1834	tree_lock->height--;
1835	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
1836	tree_page = get_tree_page(zone, tree_lock);
1837	page = (struct block_map_page *) tree_page->page_buffer;
1838	nonce = zone->block_map->nonce;
1839
1840	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
1841		vdo_format_block_map_page(page, nonce, pbn, false);
1842	return_vio_to_pool(zone->vio_pool, pooled);
1843
1844	/* Release our claim to the load and wake any waiters */
1845	release_page_lock(data_vio, "load");
1846	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
1847	continue_with_loaded_page(data_vio, page);
1848}
1849
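/* Record a tree page read error, return the vio to its pool, and abort the load. */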
1850static void handle_io_error(struct vdo_completion *completion)
1851{
1852	int result = completion->result;
1853	struct vio *vio = as_vio(completion);
1854	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1855	struct data_vio *data_vio = completion->parent;
1856	struct block_map_zone *zone = pooled->context;
1857
1858	vio_record_metadata_io_error(vio);
1859	return_vio_to_pool(zone->vio_pool, pooled);
1860	abort_load(data_vio, result);
1861}
1862
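/* Bio completion for a tree page read; continue on the data_vio's logical zone thread. */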
1863static void load_page_endio(struct bio *bio)
1864{
1865	struct vio *vio = bio->bi_private;
1866	struct data_vio *data_vio = vio->completion.parent;
1867
1868	continue_vio_after_io(vio, finish_block_map_page_load,
1869			      data_vio->logical.zone->thread_id);
1870}
1871
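/* Issue the read for a tree page once a vio has been acquired from the zone's pool. */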
1872static void load_page(struct vdo_waiter *waiter, void *context)
1873{
1874	struct pooled_vio *pooled = context;
1875	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1876	struct tree_lock *lock = &data_vio->tree_lock;
1877	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;
1878
1879	pooled->vio.completion.parent = data_vio;
1880	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
1881				handle_io_error, REQ_OP_READ | REQ_PRIO);
1882}
1883
1884/*
1885 * If the page is already locked, queue up to wait for the lock to be released. If the lock is
1886 * acquired, @data_vio->tree_lock.locked will be true.
1887 */
1888static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
1889{
1890	int result;
1891	struct tree_lock *lock_holder;
1892	struct tree_lock *lock = &data_vio->tree_lock;
1893	height_t height = lock->height;
1894	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
1895	union page_key key;
1896
1897	key.descriptor = (struct page_descriptor) {
1898		.root_index = lock->root_index,
1899		.height = height,
1900		.page_index = tree_slot.page_index,
1901		.slot = tree_slot.block_map_slot.slot,
1902	};
1903	lock->key = key.key;
1904
1905	result = vdo_int_map_put(zone->loading_pages, lock->key,
1906				 lock, false, (void **) &lock_holder);
1907	if (result != VDO_SUCCESS)
1908		return result;
1909
1910	if (lock_holder == NULL) {
1911		/* We got the lock */
1912		data_vio->tree_lock.locked = true;
1913		return VDO_SUCCESS;
1914	}
1915
1916	/* Someone else is loading or allocating the page we need */
1917	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
1918	return VDO_SUCCESS;
1919}
1920
1921/* Load a block map tree page from disk, for the next level in the data vio tree lock. */
1922static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
1923{
1924	int result;
1925
1926	result = attempt_page_lock(zone, data_vio);
1927	if (result != VDO_SUCCESS) {
1928		abort_load(data_vio, result);
1929		return;
1930	}
1931
1932	if (data_vio->tree_lock.locked) {
1933		data_vio->waiter.callback = load_page;
1934		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
1935	}
1936}
1937
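/* Handle a failure to allocate a new tree page, returning to the logical zone thread if needed. */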
1938static void allocation_failure(struct vdo_completion *completion)
1939{
1940	struct data_vio *data_vio = as_data_vio(completion);
1941
1942	if (vdo_requeue_completion_if_needed(completion,
1943					     data_vio->logical.zone->thread_id))
1944		return;
1945
1946	abort_lookup(data_vio, completion->result, "allocation");
1947}
1948
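/* Record the newly allocated PBN for a waiting data_vio and continue its descent down the tree. */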
1949static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
1950{
1951	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1952	struct tree_lock *tree_lock = &data_vio->tree_lock;
1953	physical_block_number_t pbn = *((physical_block_number_t *) context);
1954
1955	tree_lock->height--;
1956	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;
1957
1958	if (tree_lock->height == 0) {
1959		finish_lookup(data_vio, VDO_SUCCESS);
1960		return;
1961	}
1962
1963	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1964}
1965
1966/** expire_oldest_list() - Expire the oldest list. */
1967static void expire_oldest_list(struct dirty_lists *dirty_lists)
1968{
1969	block_count_t i = dirty_lists->offset++;
1970
1971	dirty_lists->oldest_period++;
1972	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
1973		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
1974				      &dirty_lists->expired[VDO_TREE_PAGE]);
1975	}
1976
1977	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
1978		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
1979				      &dirty_lists->expired[VDO_CACHE_PAGE]);
1980	}
1981
1982	if (dirty_lists->offset == dirty_lists->maximum_age)
1983		dirty_lists->offset = 0;
1984}
1985
1987/** update_period() - Update the dirty_lists period if necessary. */
1988static void update_period(struct dirty_lists *dirty, sequence_number_t period)
1989{
1990	while (dirty->next_period <= period) {
1991		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
1992			expire_oldest_list(dirty);
1993		dirty->next_period++;
1994	}
1995}
1996
1997/** write_expired_elements() - Write out the expired list. */
1998static void write_expired_elements(struct block_map_zone *zone)
1999{
2000	struct tree_page *page, *ttmp;
2001	struct page_info *info, *ptmp;
2002	struct list_head *expired;
2003	u8 generation = zone->generation;
2004
2005	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
2006	list_for_each_entry_safe(page, ttmp, expired, entry) {
2007		int result;
2008
2009		list_del_init(&page->entry);
2010
2011		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
2012				    "Newly expired page not already waiting to write");
2013		if (result != VDO_SUCCESS) {
2014			enter_zone_read_only_mode(zone, result);
2015			continue;
2016		}
2017
2018		set_generation(zone, page, generation);
2019		if (!page->writing)
2020			enqueue_page(page, zone);
2021	}
2022
2023	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
2024	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
2025		list_del_init(&info->state_entry);
2026		schedule_page_save(info);
2027	}
2028
2029	save_pages(&zone->page_cache);
2030}
2031
2032/**
2033 * add_to_dirty_lists() - Add an element to the dirty lists.
2034 * @zone: The zone in which we are operating.
2035 * @entry: The list entry of the element to add.
2036 * @type: The type of page.
2037 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
2038 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
2039 *              lock.
2040 */
2041static void add_to_dirty_lists(struct block_map_zone *zone,
2042			       struct list_head *entry,
2043			       enum block_map_page_type type,
2044			       sequence_number_t old_period,
2045			       sequence_number_t new_period)
2046{
2047	struct dirty_lists *dirty_lists = zone->dirty_lists;
2048
2049	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
2050		return;
2051
2052	if (new_period < dirty_lists->oldest_period) {
2053		list_move_tail(entry, &dirty_lists->expired[type]);
2054	} else {
2055		update_period(dirty_lists, new_period);
2056		list_move_tail(entry,
2057			       &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
2058	}
2059
2060	write_expired_elements(zone);
2061}
2062
2063/*
2064 * Record the allocation in the tree and wake any waiters now that the write lock has been
2065 * released.
2066 */
2067static void finish_block_map_allocation(struct vdo_completion *completion)
2068{
2069	physical_block_number_t pbn;
2070	struct tree_page *tree_page;
2071	struct block_map_page *page;
2072	sequence_number_t old_lock;
2073	struct data_vio *data_vio = as_data_vio(completion);
2074	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2075	struct tree_lock *tree_lock = &data_vio->tree_lock;
2076	height_t height = tree_lock->height;
2077
2078	assert_data_vio_in_logical_zone(data_vio);
2079
2080	tree_page = get_tree_page(zone, tree_lock);
2081	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;
2082
2083	/* Record the allocation. */
2084	page = (struct block_map_page *) tree_page->page_buffer;
2085	old_lock = tree_page->recovery_lock;
2086	vdo_update_block_map_page(page, data_vio, pbn,
2087				  VDO_MAPPING_STATE_UNCOMPRESSED,
2088				  &tree_page->recovery_lock);
2089
2090	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
2091		/* This page is waiting to be written out. */
2092		if (zone->flusher != tree_page) {
2093			/*
2094			 * The outstanding flush won't cover the update we just made,
2095			 * so mark the page as needing another flush.
2096			 */
2097			set_generation(zone, tree_page, zone->generation);
2098		}
2099	} else {
2100		/* Put the page on a dirty list */
2101		if (old_lock == 0)
2102			INIT_LIST_HEAD(&tree_page->entry);
2103		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
2104				   old_lock, tree_page->recovery_lock);
2105	}
2106
2107	tree_lock->height--;
2108	if (height > 1) {
2109		/* Format the interior node we just allocated (in memory). */
2110		tree_page = get_tree_page(zone, tree_lock);
2111		vdo_format_block_map_page(tree_page->page_buffer,
2112					  zone->block_map->nonce,
2113					  pbn, false);
2114	}
2115
2116	/* Release our claim to the allocation and wake any waiters */
2117	release_page_lock(data_vio, "allocation");
2118	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
2119				     continue_allocation_for_waiter, &pbn);
2120	if (tree_lock->height == 0) {
2121		finish_lookup(data_vio, VDO_SUCCESS);
2122		return;
2123	}
2124
2125	allocate_block_map_page(zone, data_vio);
2126}
2127
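/* Release the write lock on the newly allocated block, then finish the allocation on the logical zone thread. */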
2128static void release_block_map_write_lock(struct vdo_completion *completion)
2129{
2130	struct data_vio *data_vio = as_data_vio(completion);
2131
2132	assert_data_vio_in_allocated_zone(data_vio);
2133
2134	release_data_vio_allocation_lock(data_vio, true);
2135	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
2136}
2137
2138/*
 * Newly allocated block map pages are set to MAXIMUM_REFERENCES after they are journaled,
2140 * to prevent deduplication against the block after we release the write lock on it, but before we
2141 * write out the page.
2142 */
2143static void set_block_map_page_reference_count(struct vdo_completion *completion)
2144{
2145	struct data_vio *data_vio = as_data_vio(completion);
2146
2147	assert_data_vio_in_allocated_zone(data_vio);
2148
2149	completion->callback = release_block_map_write_lock;
2150	vdo_modify_reference_count(completion, &data_vio->increment_updater);
2151}
2152
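/*
 * Make a recovery journal entry for the block map allocation, after which the reference count of
 * the new page will be updated in its allocated zone.
 */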
2153static void journal_block_map_allocation(struct vdo_completion *completion)
2154{
2155	struct data_vio *data_vio = as_data_vio(completion);
2156
2157	assert_data_vio_in_journal_zone(data_vio);
2158
2159	set_data_vio_allocated_zone_callback(data_vio,
2160					     set_block_map_page_reference_count);
2161	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
2162}
2163
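/*
 * Use the data_vio's newly allocated block as a tree page: record its PBN in the tree lock,
 * prepare the reference count increment, and journal the block map remapping.
 */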
2164static void allocate_block(struct vdo_completion *completion)
2165{
2166	struct data_vio *data_vio = as_data_vio(completion);
2167	struct tree_lock *lock = &data_vio->tree_lock;
2168	physical_block_number_t pbn;
2169
2170	assert_data_vio_in_allocated_zone(data_vio);
2171
2172	if (!vdo_allocate_block_in_zone(data_vio))
2173		return;
2174
2175	pbn = data_vio->allocation.pbn;
2176	lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
2177	data_vio->increment_updater = (struct reference_updater) {
2178		.operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
2179		.increment = true,
2180		.zpbn = {
2181			.pbn = pbn,
2182			.state = VDO_MAPPING_STATE_UNCOMPRESSED,
2183		},
2184		.lock = data_vio->allocation.lock,
2185	};
2186
2187	launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
2188}
2189
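/*
 * Allocate the tree page needed at the current level of a lookup. Pure reads and discards have
 * nothing to allocate and are finished instead.
 */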
2190static void allocate_block_map_page(struct block_map_zone *zone,
2191				    struct data_vio *data_vio)
2192{
2193	int result;
2194
2195	if (!data_vio->write || data_vio->is_discard) {
2196		/* This is a pure read or a discard, so there's nothing left to do here. */
2197		finish_lookup(data_vio, VDO_SUCCESS);
2198		return;
2199	}
2200
2201	result = attempt_page_lock(zone, data_vio);
2202	if (result != VDO_SUCCESS) {
2203		abort_lookup(data_vio, result, "allocation");
2204		return;
2205	}
2206
2207	if (!data_vio->tree_lock.locked)
2208		return;
2209
2210	data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK,
2211				     allocate_block, allocation_failure);
2212}
2213
2214/**
2215 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
2216 *                             resides and cache that result in the data_vio.
2217 *
2218 * All ancestors in the tree will be allocated or loaded, as needed.
2219 */
2220void vdo_find_block_map_slot(struct data_vio *data_vio)
2221{
2222	page_number_t page_index;
2223	struct block_map_tree_slot tree_slot;
2224	struct data_location mapping;
2225	struct block_map_page *page = NULL;
2226	struct tree_lock *lock = &data_vio->tree_lock;
2227	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
2228
2229	zone->active_lookups++;
2230	if (vdo_is_state_draining(&zone->state)) {
2231		finish_lookup(data_vio, VDO_SHUTTING_DOWN);
2232		return;
2233	}
2234
2235	lock->tree_slots[0].block_map_slot.slot =
2236		data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2237	page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count);
2238	tree_slot = (struct block_map_tree_slot) {
2239		.page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2240		.block_map_slot = {
2241			.pbn = 0,
2242			.slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
2243		},
2244	};
2245
2246	for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) {
2247		physical_block_number_t pbn;
2248
2249		lock->tree_slots[lock->height] = tree_slot;
2250		page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer);
2251		pbn = vdo_get_block_map_page_pbn(page);
2252		if (pbn != VDO_ZERO_BLOCK) {
2253			lock->tree_slots[lock->height].block_map_slot.pbn = pbn;
2254			break;
2255		}
2256
2257		/* Calculate the index and slot for the next level. */
2258		tree_slot.block_map_slot.slot =
2259			tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2260		tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2261	}
2262
2263	/* The page at this height has been allocated and loaded. */
2264	mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]);
2265	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
2266		vdo_log_error_strerror(VDO_BAD_MAPPING,
2267				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
2268				       (unsigned long long) mapping.pbn, mapping.state,
2269				       lock->tree_slots[lock->height - 1].page_index,
2270				       lock->height - 1);
2271		abort_load(data_vio, VDO_BAD_MAPPING);
2272		return;
2273	}
2274
2275	if (!vdo_is_mapped_location(&mapping)) {
2276		/* The page we want one level down has not been allocated, so allocate it. */
2277		allocate_block_map_page(zone, data_vio);
2278		return;
2279	}
2280
2281	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
2282	if (lock->height == 1) {
2283		/* This is the ultimate block map page, so we're done */
2284		finish_lookup(data_vio, VDO_SUCCESS);
2285		return;
2286	}
2287
2288	/* We know what page we need to load. */
2289	load_block_map_page(zone, data_vio);
2290}
2291
2292/*
2293 * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
 * pages have been loaded; otherwise, it may give the wrong answer (0).
2295 */
2296physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map,
2297						    page_number_t page_number)
2298{
2299	struct data_location mapping;
2300	struct tree_page *tree_page;
2301	struct block_map_page *page;
2302	root_count_t root_index = page_number % map->root_count;
2303	page_number_t page_index = page_number / map->root_count;
2304	slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2305
2306	page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2307
2308	tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index);
2309	page = (struct block_map_page *) tree_page->page_buffer;
2310	if (!page->header.initialized)
2311		return VDO_ZERO_BLOCK;
2312
2313	mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
2314	if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state))
2315		return VDO_ZERO_BLOCK;
2316	return mapping.pbn;
2317}
2318
2319/*
2320 * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
2321 * method is used when correcting errors in the tree during read-only rebuild.
2322 */
2323void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone)
2324{
2325	bool waiting = vdo_waiter_is_waiting(&page->waiter);
2326
2327	if (waiting && (zone->flusher == page))
2328		return;
2329
2330	set_generation(zone, page, zone->generation);
2331	if (waiting || page->writing)
2332		return;
2333
2334	enqueue_page(page, zone);
2335}
2336
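/*
 * Allocate and wire up one new segment of tree pages for a forest, copying the boundaries, page
 * pointers, and per-tree segment arrays from the old forest when this is an expansion.
 */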
2337static int make_segment(struct forest *old_forest, block_count_t new_pages,
2338			struct boundary *new_boundary, struct forest *forest)
2339{
2340	size_t index = (old_forest == NULL) ? 0 : old_forest->segments;
2341	struct tree_page *page_ptr;
2342	page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT];
2343	height_t height;
2344	root_count_t root;
2345	int result;
2346
2347	forest->segments = index + 1;
2348
2349	result = vdo_allocate(forest->segments, struct boundary,
2350			      "forest boundary array", &forest->boundaries);
2351	if (result != VDO_SUCCESS)
2352		return result;
2353
2354	result = vdo_allocate(forest->segments, struct tree_page *,
2355			      "forest page pointers", &forest->pages);
2356	if (result != VDO_SUCCESS)
2357		return result;
2358
2359	result = vdo_allocate(new_pages, struct tree_page,
2360			      "new forest pages", &forest->pages[index]);
2361	if (result != VDO_SUCCESS)
2362		return result;
2363
2364	if (index > 0) {
2365		memcpy(forest->boundaries, old_forest->boundaries,
2366		       index * sizeof(struct boundary));
2367		memcpy(forest->pages, old_forest->pages,
2368		       index * sizeof(struct tree_page *));
2369	}
2370
2371	memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary));
2372
2373	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2374		segment_sizes[height] = new_boundary->levels[height];
2375		if (index > 0)
2376			segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height];
2377	}
2378
2379	page_ptr = forest->pages[index];
2380	for (root = 0; root < forest->map->root_count; root++) {
2381		struct block_map_tree_segment *segment;
2382		struct block_map_tree *tree = &(forest->trees[root]);
2383		height_t height;
2384
2385		int result = vdo_allocate(forest->segments,
2386					  struct block_map_tree_segment,
2387					  "tree root segments", &tree->segments);
2388		if (result != VDO_SUCCESS)
2389			return result;
2390
2391		if (index > 0) {
2392			memcpy(tree->segments, old_forest->trees[root].segments,
2393			       index * sizeof(struct block_map_tree_segment));
2394		}
2395
2396		segment = &(tree->segments[index]);
2397		for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
2398			if (segment_sizes[height] == 0)
2399				continue;
2400
2401			segment->levels[height] = page_ptr;
2402			if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) {
2403				/* Record the root. */
2404				struct block_map_page *page =
2405					vdo_format_block_map_page(page_ptr->page_buffer,
2406								  forest->map->nonce,
2407								  VDO_INVALID_PBN, true);
2408				page->entries[0] =
2409					vdo_pack_block_map_entry(forest->map->root_origin + root,
2410								 VDO_MAPPING_STATE_UNCOMPRESSED);
2411			}
2412			page_ptr += segment_sizes[height];
2413		}
2414	}
2415
2416	return VDO_SUCCESS;
2417}
2418
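/*
 * Free a forest along with the page segments it owns, starting at first_page_segment; earlier
 * page segments are shared with an older forest and are not freed here.
 */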
2419static void deforest(struct forest *forest, size_t first_page_segment)
2420{
2421	root_count_t root;
2422
2423	if (forest->pages != NULL) {
2424		size_t segment;
2425
2426		for (segment = first_page_segment; segment < forest->segments; segment++)
2427			vdo_free(forest->pages[segment]);
2428		vdo_free(forest->pages);
2429	}
2430
2431	for (root = 0; root < forest->map->root_count; root++)
2432		vdo_free(forest->trees[root].segments);
2433
2434	vdo_free(forest->boundaries);
2435	vdo_free(forest);
2436}
2437
2438/**
2439 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
2440 *                 there is one.
2441 * @entries: The number of entries the block map will hold.
2442 *
2443 * Return: VDO_SUCCESS or an error.
2444 */
2445static int make_forest(struct block_map *map, block_count_t entries)
2446{
2447	struct forest *forest, *old_forest = map->forest;
2448	struct boundary new_boundary, *old_boundary = NULL;
2449	block_count_t new_pages;
2450	int result;
2451
2452	if (old_forest != NULL)
2453		old_boundary = &(old_forest->boundaries[old_forest->segments - 1]);
2454
2455	new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary,
2456						 entries, &new_boundary);
2457	if (new_pages == 0) {
2458		map->next_entry_count = entries;
2459		return VDO_SUCCESS;
2460	}
2461
2462	result = vdo_allocate_extended(struct forest, map->root_count,
2463				       struct block_map_tree, __func__,
2464				       &forest);
2465	if (result != VDO_SUCCESS)
2466		return result;
2467
2468	forest->map = map;
2469	result = make_segment(old_forest, new_pages, &new_boundary, forest);
2470	if (result != VDO_SUCCESS) {
2471		deforest(forest, forest->segments - 1);
2472		return result;
2473	}
2474
2475	map->next_forest = forest;
2476	map->next_entry_count = entries;
2477	return VDO_SUCCESS;
2478}
2479
2480/**
2481 * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
2482 */
2483static void replace_forest(struct block_map *map)
2484{
2485	if (map->next_forest != NULL) {
2486		if (map->forest != NULL)
2487			deforest(map->forest, map->forest->segments);
2488		map->forest = vdo_forget(map->next_forest);
2489	}
2490
2491	map->entry_count = map->next_entry_count;
2492	map->next_entry_count = 0;
2493}
2494
2495/**
 * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the
 *                   traversal of the entire forest.
2498 */
2499static void finish_cursor(struct cursor *cursor)
2500{
2501	struct cursors *cursors = cursor->parent;
2502	struct vdo_completion *completion = cursors->completion;
2503
2504	return_vio_to_pool(cursors->pool, vdo_forget(cursor->vio));
2505	if (--cursors->active_roots > 0)
2506		return;
2507
2508	vdo_free(cursors);
2509
2510	vdo_finish_completion(completion);
2511}
2512
2513static void traverse(struct cursor *cursor);
2514
2515/**
2516 * continue_traversal() - Continue traversing a block map tree.
2517 * @completion: The VIO doing a read or write.
2518 */
2519static void continue_traversal(struct vdo_completion *completion)
2520{
2521	vio_record_metadata_io_error(as_vio(completion));
2522	traverse(completion->parent);
2523}
2524
2525/**
2526 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
2527 * @completion: The VIO doing the read.
2528 */
2529static void finish_traversal_load(struct vdo_completion *completion)
2530{
2531	struct cursor *cursor = completion->parent;
2532	height_t height = cursor->height;
2533	struct cursor_level *level = &cursor->levels[height];
2534	struct tree_page *tree_page =
2535		&(cursor->tree->segments[0].levels[height][level->page_index]);
2536	struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2537
2538	vdo_copy_valid_page(cursor->vio->vio.data,
2539			    cursor->parent->zone->block_map->nonce,
2540			    pbn_from_vio_bio(cursor->vio->vio.bio), page);
2541	traverse(cursor);
2542}
2543
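/* Bio completion for a cursor page read; continue the traversal on the zone's thread. */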
2544static void traversal_endio(struct bio *bio)
2545{
2546	struct vio *vio = bio->bi_private;
2547	struct cursor *cursor = vio->completion.parent;
2548
2549	continue_vio_after_io(vio, finish_traversal_load,
2550			      cursor->parent->zone->thread_id);
2551}
2552
2553/**
2554 * traverse() - Traverse a single block map tree.
2555 *
2556 * This is the recursive heart of the traversal process.
2557 */
2558static void traverse(struct cursor *cursor)
2559{
2560	for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) {
2561		height_t height = cursor->height;
2562		struct cursor_level *level = &cursor->levels[height];
2563		struct tree_page *tree_page =
2564			&(cursor->tree->segments[0].levels[height][level->page_index]);
2565		struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;
2566
2567		if (!page->header.initialized)
2568			continue;
2569
2570		for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) {
2571			struct cursor_level *next_level;
2572			page_number_t entry_index =
2573				(VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot;
2574			struct data_location location =
2575				vdo_unpack_block_map_entry(&page->entries[level->slot]);
2576
2577			if (!vdo_is_valid_location(&location)) {
2578				/* This entry is invalid, so remove it from the page. */
2579				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2580				vdo_write_tree_page(tree_page, cursor->parent->zone);
2581				continue;
2582			}
2583
2584			if (!vdo_is_mapped_location(&location))
2585				continue;
2586
2587			/* Erase mapped entries past the end of the logical space. */
2588			if (entry_index >= cursor->boundary.levels[height]) {
2589				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2590				vdo_write_tree_page(tree_page, cursor->parent->zone);
2591				continue;
2592			}
2593
2594			if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) {
2595				int result = cursor->parent->entry_callback(location.pbn,
2596									    cursor->parent->completion);
2597				if (result != VDO_SUCCESS) {
2598					page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
2599					vdo_write_tree_page(tree_page, cursor->parent->zone);
2600					continue;
2601				}
2602			}
2603
2604			if (cursor->height == 0)
2605				continue;
2606
2607			cursor->height--;
2608			next_level = &cursor->levels[cursor->height];
2609			next_level->page_index = entry_index;
2610			next_level->slot = 0;
2611			level->slot++;
2612			vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn,
2613						traversal_endio, continue_traversal,
2614						REQ_OP_READ | REQ_PRIO);
2615			return;
2616		}
2617	}
2618
2619	finish_cursor(cursor);
2620}
2621
2622/**
2623 * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
2624 *                   which to load pages.
2625 * @context: The pooled_vio just acquired.
2626 *
2627 * Implements waiter_callback_fn.
2628 */
2629static void launch_cursor(struct vdo_waiter *waiter, void *context)
2630{
2631	struct cursor *cursor = container_of(waiter, struct cursor, waiter);
2632	struct pooled_vio *pooled = context;
2633
2634	cursor->vio = pooled;
2635	pooled->vio.completion.parent = cursor;
2636	pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id;
2637	traverse(cursor);
2638}
2639
2640/**
2641 * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
2642 *
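 * For example, a map with 10 leaf pages and 3 roots assigns 4 leaf pages to root 0 and 3 each to
 * roots 1 and 2. Each interior level then needs one page for every VDO_BLOCK_MAP_ENTRIES_PER_PAGE
 * pages (rounded up) in the level below it, and the top level is always the single root page.
 *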
2643 * Return: The list of page counts as a boundary structure.
2644 */
2645static struct boundary compute_boundary(struct block_map *map, root_count_t root_index)
2646{
2647	struct boundary boundary;
2648	height_t height;
2649	page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
2650	/*
2651	 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
2652	 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
2653	 * roots starting from tree 0.
2654	 */
2655	page_count_t last_tree_root = (leaf_pages - 1) % map->root_count;
2656	page_count_t level_pages = leaf_pages / map->root_count;
2657
2658	if (root_index <= last_tree_root)
2659		level_pages++;
2660
2661	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) {
2662		boundary.levels[height] = level_pages;
2663		level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE);
2664	}
2665
2666	/* The root node always exists, even if the root is otherwise unused. */
2667	boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1;
2668
2669	return boundary;
2670}
2671
2672/**
2673 * vdo_traverse_forest() - Walk the entire forest of a block map.
2674 * @callback: A function to call with the pbn of each allocated node in the forest.
2675 * @completion: The completion to notify on each traversed PBN, and when traversal completes.
2676 */
2677void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback,
2678			 struct vdo_completion *completion)
2679{
2680	root_count_t root;
2681	struct cursors *cursors;
2682	int result;
2683
2684	result = vdo_allocate_extended(struct cursors, map->root_count,
2685				       struct cursor, __func__, &cursors);
2686	if (result != VDO_SUCCESS) {
2687		vdo_fail_completion(completion, result);
2688		return;
2689	}
2690
2691	cursors->zone = &map->zones[0];
2692	cursors->pool = cursors->zone->vio_pool;
2693	cursors->entry_callback = callback;
2694	cursors->completion = completion;
2695	cursors->active_roots = map->root_count;
2696	for (root = 0; root < map->root_count; root++) {
2697		struct cursor *cursor = &cursors->cursors[root];
2698
2699		*cursor = (struct cursor) {
2700			.tree = &map->forest->trees[root],
2701			.height = VDO_BLOCK_MAP_TREE_HEIGHT - 1,
2702			.parent = cursors,
2703			.boundary = compute_boundary(map, root),
2704		};
2705
2706		cursor->waiter.callback = launch_cursor;
2707		acquire_vio_from_pool(cursors->pool, &cursor->waiter);
2708	}
2709}
2710
2711/**
2712 * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
2713 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
2714 *               written out.
2715 */
2716static int __must_check initialize_block_map_zone(struct block_map *map,
2717						  zone_count_t zone_number,
2718						  page_count_t cache_size,
2719						  block_count_t maximum_age)
2720{
2721	int result;
2722	block_count_t i;
2723	struct vdo *vdo = map->vdo;
2724	struct block_map_zone *zone = &map->zones[zone_number];
2725
2726	BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64));
2727
2728	zone->zone_number = zone_number;
2729	zone->thread_id = vdo->thread_config.logical_threads[zone_number];
2730	zone->block_map = map;
2731
2732	result = vdo_allocate_extended(struct dirty_lists, maximum_age,
2733				       dirty_era_t, __func__,
2734				       &zone->dirty_lists);
2735	if (result != VDO_SUCCESS)
2736		return result;
2737
2738	zone->dirty_lists->maximum_age = maximum_age;
2739	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]);
2740	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]);
2741
2742	for (i = 0; i < maximum_age; i++) {
2743		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]);
2744		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]);
2745	}
2746
2747	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages);
2748	if (result != VDO_SUCCESS)
2749		return result;
2750
2751	result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE,
2752			       zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR,
2753			       VIO_PRIORITY_METADATA, zone, &zone->vio_pool);
2754	if (result != VDO_SUCCESS)
2755		return result;
2756
2757	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
2758
2759	zone->page_cache.zone = zone;
2760	zone->page_cache.vdo = vdo;
2761	zone->page_cache.page_count = cache_size / map->zone_count;
2762	zone->page_cache.stats.free_pages = zone->page_cache.page_count;
2763
2764	result = allocate_cache_components(&zone->page_cache);
2765	if (result != VDO_SUCCESS)
2766		return result;
2767
2768	/* initialize empty circular queues */
2769	INIT_LIST_HEAD(&zone->page_cache.lru_list);
2770	INIT_LIST_HEAD(&zone->page_cache.outgoing_list);
2771
2772	return VDO_SUCCESS;
2773}
2774
2775/* Implements vdo_zone_thread_getter_fn */
2776static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number)
2777{
2778	struct block_map *map = context;
2779
2780	return map->zones[zone_number].thread_id;
2781}
2782
2783/* Implements vdo_action_preamble_fn */
2784static void prepare_for_era_advance(void *context, struct vdo_completion *parent)
2785{
2786	struct block_map *map = context;
2787
2788	map->current_era_point = map->pending_era_point;
2789	vdo_finish_completion(parent);
2790}
2791
2792/* Implements vdo_zone_action_fn */
2793static void advance_block_map_zone_era(void *context, zone_count_t zone_number,
2794				       struct vdo_completion *parent)
2795{
2796	struct block_map *map = context;
2797	struct block_map_zone *zone = &map->zones[zone_number];
2798
2799	update_period(zone->dirty_lists, map->current_era_point);
2800	write_expired_elements(zone);
2801	vdo_finish_completion(parent);
2802}
2803
2804/*
2805 * Schedule an era advance if necessary. This method should not be called directly. Rather, call
2806 * vdo_schedule_default_action() on the block map's action manager.
2807 *
2808 * Implements vdo_action_scheduler_fn.
2809 */
2810static bool schedule_era_advance(void *context)
2811{
2812	struct block_map *map = context;
2813
2814	if (map->current_era_point == map->pending_era_point)
2815		return false;
2816
2817	return vdo_schedule_action(map->action_manager, prepare_for_era_advance,
2818				   advance_block_map_zone_era, NULL, NULL);
2819}
2820
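/* Free a zone's dirty lists, vio pool, page lock map, and page cache resources. */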
2821static void uninitialize_block_map_zone(struct block_map_zone *zone)
2822{
2823	struct vdo_page_cache *cache = &zone->page_cache;
2824
2825	vdo_free(vdo_forget(zone->dirty_lists));
2826	free_vio_pool(vdo_forget(zone->vio_pool));
2827	vdo_int_map_free(vdo_forget(zone->loading_pages));
2828	if (cache->infos != NULL) {
2829		struct page_info *info;
2830
2831		for (info = cache->infos; info < cache->infos + cache->page_count; info++)
2832			free_vio(vdo_forget(info->vio));
2833	}
2834
2835	vdo_int_map_free(vdo_forget(cache->page_map));
2836	vdo_free(vdo_forget(cache->infos));
2837	vdo_free(vdo_forget(cache->pages));
2838}
2839
2840void vdo_free_block_map(struct block_map *map)
2841{
2842	zone_count_t zone;
2843
2844	if (map == NULL)
2845		return;
2846
2847	for (zone = 0; zone < map->zone_count; zone++)
2848		uninitialize_block_map_zone(&map->zones[zone]);
2849
2850	vdo_abandon_block_map_growth(map);
2851	if (map->forest != NULL)
2852		deforest(vdo_forget(map->forest), 0);
2853	vdo_free(vdo_forget(map->action_manager));
2854	vdo_free(map);
2855}
2856
2857/* @journal may be NULL. */
2858int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks,
2859			 struct vdo *vdo, struct recovery_journal *journal,
2860			 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age,
2861			 struct block_map **map_ptr)
2862{
2863	struct block_map *map;
2864	int result;
2865	zone_count_t zone = 0;
2866
2867	BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE !=
2868		     ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) /
2869		      sizeof(struct block_map_entry)));
2870	result = VDO_ASSERT(cache_size > 0, "block map cache size is specified");
2871	if (result != VDO_SUCCESS)
2872		return result;
2873
2874	result = vdo_allocate_extended(struct block_map,
2875				       vdo->thread_config.logical_zone_count,
2876				       struct block_map_zone, __func__, &map);
2877	if (result != VDO_SUCCESS)
2878		return result;
2879
2880	map->vdo = vdo;
2881	map->root_origin = state.root_origin;
2882	map->root_count = state.root_count;
2883	map->entry_count = logical_blocks;
2884	map->journal = journal;
2885	map->nonce = nonce;
2886
2887	result = make_forest(map, map->entry_count);
2888	if (result != VDO_SUCCESS) {
2889		vdo_free_block_map(map);
2890		return result;
2891	}
2892
2893	replace_forest(map);
2894
2895	map->zone_count = vdo->thread_config.logical_zone_count;
2896	for (zone = 0; zone < map->zone_count; zone++) {
2897		result = initialize_block_map_zone(map, zone, cache_size, maximum_age);
2898		if (result != VDO_SUCCESS) {
2899			vdo_free_block_map(map);
2900			return result;
2901		}
2902	}
2903
2904	result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id,
2905					 vdo_get_recovery_journal_thread_id(journal),
2906					 map, schedule_era_advance, vdo,
2907					 &map->action_manager);
2908	if (result != VDO_SUCCESS) {
2909		vdo_free_block_map(map);
2910		return result;
2911	}
2912
2913	*map_ptr = map;
2914	return VDO_SUCCESS;
2915}
2916
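/* Copy out the portion of the block map state which is saved on disk. */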
2917struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map)
2918{
2919	return (struct block_map_state_2_0) {
2920		.flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN,
2921		/* This is the flat page count, which has turned out to always be 0. */
2922		.flat_page_count = 0,
2923		.root_origin = map->root_origin,
2924		.root_count = map->root_count,
2925	};
2926}
2927
/* The block map needs to know the journal's sequence number to initialize the eras. */
2929void vdo_initialize_block_map_from_journal(struct block_map *map,
2930					   struct recovery_journal *journal)
2931{
2932	zone_count_t z = 0;
2933
2934	map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal);
2935	map->pending_era_point = map->current_era_point;
2936
2937	for (z = 0; z < map->zone_count; z++) {
2938		struct dirty_lists *dirty_lists = map->zones[z].dirty_lists;
2939
2940		VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set");
2941		dirty_lists->oldest_period = map->current_era_point;
2942		dirty_lists->next_period = map->current_era_point + 1;
2943		dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age;
2944	}
2945}
2946
2947/* Compute the logical zone for the LBN of a data vio. */
2948zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
2949{
2950	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
2951	struct tree_lock *tree_lock = &data_vio->tree_lock;
2952	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2953
2954	tree_lock->tree_slots[0].page_index = page_number;
2955	tree_lock->root_index = page_number % map->root_count;
2956	return (tree_lock->root_index % map->zone_count);
2957}
2958
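/* Record the new era point and schedule an era advance across the zones if one is now needed. */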
2959void vdo_advance_block_map_era(struct block_map *map,
2960			       sequence_number_t recovery_block_number)
2961{
2962	if (map == NULL)
2963		return;
2964
2965	map->pending_era_point = recovery_block_number;
2966	vdo_schedule_default_action(map->action_manager);
2967}
2968
2969/* Implements vdo_admin_initiator_fn */
2970static void initiate_drain(struct admin_state *state)
2971{
2972	struct block_map_zone *zone = container_of(state, struct block_map_zone, state);
2973
2974	VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
2975			    "%s() called with no active lookups", __func__);
2976
2977	if (!vdo_is_state_suspending(state)) {
2978		while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
2979			expire_oldest_list(zone->dirty_lists);
2980		write_expired_elements(zone);
2981	}
2982
2983	check_for_drain_complete(zone);
2984}
2985
2986/* Implements vdo_zone_action_fn. */
2987static void drain_zone(void *context, zone_count_t zone_number,
2988		       struct vdo_completion *parent)
2989{
2990	struct block_map *map = context;
2991	struct block_map_zone *zone = &map->zones[zone_number];
2992
2993	vdo_start_draining(&zone->state,
2994			   vdo_get_current_manager_operation(map->action_manager),
2995			   parent, initiate_drain);
2996}
2997
2998void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
2999			 struct vdo_completion *parent)
3000{
3001	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
3002			       parent);
3003}
3004
3005/* Implements vdo_zone_action_fn. */
3006static void resume_block_map_zone(void *context, zone_count_t zone_number,
3007				  struct vdo_completion *parent)
3008{
3009	struct block_map *map = context;
3010	struct block_map_zone *zone = &map->zones[zone_number];
3011
3012	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
3013}
3014
3015void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
3016{
3017	vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
3018			       NULL, resume_block_map_zone, NULL, parent);
3019}
3020
3021/* Allocate an expanded collection of trees, for a future growth. */
3022int vdo_prepare_to_grow_block_map(struct block_map *map,
3023				  block_count_t new_logical_blocks)
3024{
3025	if (map->next_entry_count == new_logical_blocks)
3026		return VDO_SUCCESS;
3027
3028	if (map->next_entry_count > 0)
3029		vdo_abandon_block_map_growth(map);
3030
3031	if (new_logical_blocks < map->entry_count) {
3032		map->next_entry_count = map->entry_count;
3033		return VDO_SUCCESS;
3034	}
3035
3036	return make_forest(map, new_logical_blocks);
3037}
3038
3039/* Implements vdo_action_preamble_fn */
3040static void grow_forest(void *context, struct vdo_completion *completion)
3041{
3042	replace_forest(context);
3043	vdo_finish_completion(completion);
3044}
3045
3046/* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
3047void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
3048{
3049	vdo_schedule_operation(map->action_manager,
3050			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
3051			       grow_forest, NULL, NULL, parent);
3052}
3053
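/* Discard any forest which was prepared for growth but will no longer be used. */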
3054void vdo_abandon_block_map_growth(struct block_map *map)
3055{
3056	struct forest *forest = vdo_forget(map->next_forest);
3057
3058	if (forest != NULL)
3059		deforest(forest, forest->segments - 1);
3060
3061	map->next_entry_count = 0;
3062}
3063
3064/* Release the page completion and then continue the requester. */
3065static inline void finish_processing_page(struct vdo_completion *completion, int result)
3066{
3067	struct vdo_completion *parent = completion->parent;
3068
3069	vdo_release_page_completion(completion);
3070	vdo_continue_completion(parent, result);
3071}
3072
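/* Handle an error on a fetched page by releasing it and passing the error to the requester. */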
3073static void handle_page_error(struct vdo_completion *completion)
3074{
3075	finish_processing_page(completion, completion->result);
3076}
3077
3078/* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
3079static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
3080			       vdo_action_fn action)
3081{
3082	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3083
3084	if (vdo_is_state_draining(&zone->state)) {
3085		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
3086		return;
3087	}
3088
3089	vdo_get_page(&data_vio->page_completion, zone,
3090		     data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
3091		     modifiable, &data_vio->vio.completion,
3092		     action, handle_page_error, false);
3093}
3094
3095/**
3096 * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
3097 *
3098 * This indicates the block map entry for the logical block is either unmapped or corrupted.
3099 */
3100static void clear_mapped_location(struct data_vio *data_vio)
3101{
3102	data_vio->mapped = (struct zoned_pbn) {
3103		.state = VDO_MAPPING_STATE_UNMAPPED,
3104	};
3105}
3106
3107/**
3108 * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
3109 *                         data_vio.
3110 *
 * Return: VDO_SUCCESS, VDO_BAD_MAPPING if the map entry is invalid, or an error code for any
 *         other failure.
3113 */
3114static int __must_check set_mapped_location(struct data_vio *data_vio,
3115					    const struct block_map_entry *entry)
3116{
3117	/* Unpack the PBN for logging purposes even if the entry is invalid. */
3118	struct data_location mapped = vdo_unpack_block_map_entry(entry);
3119
3120	if (vdo_is_valid_location(&mapped)) {
3121		int result;
3122
3123		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
3124					       mapped.pbn, &data_vio->mapped.zone);
3125		if (result == VDO_SUCCESS) {
3126			data_vio->mapped.pbn = mapped.pbn;
3127			data_vio->mapped.state = mapped.state;
3128			return VDO_SUCCESS;
3129		}
3130
3131		/*
3132		 * Return all errors not specifically known to be errors from validating the
3133		 * location.
3134		 */
3135		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
3136			return result;
3137	}
3138
3139	/*
3140	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
3141	 * to VDO_BAD_MAPPING.
3142	 */
3143	vdo_log_error_strerror(VDO_BAD_MAPPING,
3144			       "PBN %llu with state %u read from the block map was invalid",
3145			       (unsigned long long) mapped.pbn, mapped.state);
3146
3147	/*
3148	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
3149	 * known data loss.
3150	 */
3151	if (!data_vio->write)
3152		return VDO_BAD_MAPPING;
3153
3154	/*
3155	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
3156	 * entry rather than fail the write.
3157	 */
3158	clear_mapped_location(data_vio);
3159	return VDO_SUCCESS;
3160}
3161
3162/* This callback is registered in vdo_get_mapped_block(). */
3163static void get_mapping_from_fetched_page(struct vdo_completion *completion)
3164{
3165	int result;
3166	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
3167	const struct block_map_page *page;
3168	const struct block_map_entry *entry;
3169	struct data_vio *data_vio = as_data_vio(completion->parent);
3170	struct block_map_tree_slot *tree_slot;
3171
3172	if (completion->result != VDO_SUCCESS) {
3173		finish_processing_page(completion, completion->result);
3174		return;
3175	}
3176
3177	result = validate_completed_page(vpc, false);
3178	if (result != VDO_SUCCESS) {
3179		finish_processing_page(completion, result);
3180		return;
3181	}
3182
3183	page = (const struct block_map_page *) get_page_buffer(vpc->info);
3184	tree_slot = &data_vio->tree_lock.tree_slots[0];
3185	entry = &page->entries[tree_slot->block_map_slot.slot];
3186
3187	result = set_mapped_location(data_vio, entry);
3188	finish_processing_page(completion, result);
3189}
3190
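/*
 * Encode a new mapping into a block map page, adjust which recovery journal block the page holds
 * a reference on, and release the data_vio's own journal entry lock.
 */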
3191void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
3192			       physical_block_number_t pbn,
3193			       enum block_mapping_state mapping_state,
3194			       sequence_number_t *recovery_lock)
3195{
3196	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3197	struct block_map *block_map = zone->block_map;
3198	struct recovery_journal *journal = block_map->journal;
3199	sequence_number_t old_locked, new_locked;
3200	struct tree_lock *tree_lock = &data_vio->tree_lock;
3201
3202	/* Encode the new mapping. */
3203	page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
3204		vdo_pack_block_map_entry(pbn, mapping_state);
3205
3206	/* Adjust references on the recovery journal blocks. */
3207	old_locked = *recovery_lock;
3208	new_locked = data_vio->recovery_sequence_number;
3209
3210	if ((old_locked == 0) || (old_locked > new_locked)) {
3211		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
3212							     VDO_ZONE_TYPE_LOGICAL,
3213							     zone->zone_number);
3214
3215		if (old_locked > 0) {
3216			vdo_release_recovery_journal_block_reference(journal, old_locked,
3217								     VDO_ZONE_TYPE_LOGICAL,
3218								     zone->zone_number);
3219		}
3220
3221		*recovery_lock = new_locked;
3222	}
3223
3224	/*
3225	 * FIXME: explain this more
3226	 * Release the transferred lock from the data_vio.
3227	 */
3228	vdo_release_journal_entry_lock(journal, new_locked);
3229	data_vio->recovery_sequence_number = 0;
3230}
3231
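/* This callback is registered in vdo_put_mapped_block(). */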
3232static void put_mapping_in_fetched_page(struct vdo_completion *completion)
3233{
3234	struct data_vio *data_vio = as_data_vio(completion->parent);
3235	sequence_number_t old_lock;
3236	struct vdo_page_completion *vpc;
3237	struct page_info *info;
3238	int result;
3239
3240	if (completion->result != VDO_SUCCESS) {
3241		finish_processing_page(completion, completion->result);
3242		return;
3243	}
3244
3245	vpc = as_vdo_page_completion(completion);
3246	result = validate_completed_page(vpc, true);
3247	if (result != VDO_SUCCESS) {
3248		finish_processing_page(completion, result);
3249		return;
3250	}
3251
3252	info = vpc->info;
3253	old_lock = info->recovery_lock;
3254	vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
3255				  data_vio, data_vio->new_mapped.pbn,
3256				  data_vio->new_mapped.state, &info->recovery_lock);
3257	set_info_state(info, PS_DIRTY);
3258	add_to_dirty_lists(info->cache->zone, &info->state_entry,
3259			   VDO_CACHE_PAGE, old_lock, info->recovery_lock);
3260	finish_processing_page(completion, VDO_SUCCESS);
3261}
3262
3263/* Read a stored block mapping into a data_vio. */
3264void vdo_get_mapped_block(struct data_vio *data_vio)
3265{
3266	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
3267		/*
3268		 * We know that the block map page for this LBN has not been allocated, so the
3269		 * block must be unmapped.
3270		 */
3271		clear_mapped_location(data_vio);
3272		continue_data_vio(data_vio);
3273		return;
3274	}
3275
3276	fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page);
3277}
3278
3279/* Update a stored block mapping to reflect a data_vio's new mapping. */
3280void vdo_put_mapped_block(struct data_vio *data_vio)
3281{
3282	fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
3283}
3284
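/* Sum the page cache statistics from all of the block map's zones. */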
3285struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
3286{
3287	zone_count_t zone = 0;
3288	struct block_map_statistics totals;
3289
3290	memset(&totals, 0, sizeof(struct block_map_statistics));
3291	for (zone = 0; zone < map->zone_count; zone++) {
3292		const struct block_map_statistics *stats =
3293			&(map->zones[zone].page_cache.stats);
3294
3295		totals.dirty_pages += READ_ONCE(stats->dirty_pages);
3296		totals.clean_pages += READ_ONCE(stats->clean_pages);
3297		totals.free_pages += READ_ONCE(stats->free_pages);
3298		totals.failed_pages += READ_ONCE(stats->failed_pages);
3299		totals.incoming_pages += READ_ONCE(stats->incoming_pages);
3300		totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
3301		totals.cache_pressure += READ_ONCE(stats->cache_pressure);
3302		totals.read_count += READ_ONCE(stats->read_count);
3303		totals.write_count += READ_ONCE(stats->write_count);
3304		totals.failed_reads += READ_ONCE(stats->failed_reads);
3305		totals.failed_writes += READ_ONCE(stats->failed_writes);
3306		totals.reclaimed += READ_ONCE(stats->reclaimed);
3307		totals.read_outgoing += READ_ONCE(stats->read_outgoing);
3308		totals.found_in_cache += READ_ONCE(stats->found_in_cache);
3309		totals.discard_required += READ_ONCE(stats->discard_required);
3310		totals.wait_for_page += READ_ONCE(stats->wait_for_page);
3311		totals.fetch_required += READ_ONCE(stats->fetch_required);
3312		totals.pages_loaded += READ_ONCE(stats->pages_loaded);
3313		totals.pages_saved += READ_ONCE(stats->pages_saved);
3314		totals.flush_count += READ_ONCE(stats->flush_count);
3315	}
3316
3317	return totals;
3318}
3319