dbuf.c revision 288541
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright 2011 Nexenta Systems, Inc.  All rights reserved.
24 * Copyright (c) 2012, 2015 by Delphix. All rights reserved.
25 * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
26 * Copyright (c) 2013, Joyent, Inc. All rights reserved.
27 */
28
29#include <sys/zfs_context.h>
30#include <sys/dmu.h>
31#include <sys/dmu_send.h>
32#include <sys/dmu_impl.h>
33#include <sys/dbuf.h>
34#include <sys/dmu_objset.h>
35#include <sys/dsl_dataset.h>
36#include <sys/dsl_dir.h>
37#include <sys/dmu_tx.h>
38#include <sys/spa.h>
39#include <sys/zio.h>
40#include <sys/dmu_zfetch.h>
41#include <sys/sa.h>
42#include <sys/sa_impl.h>
43#include <sys/zfeature.h>
44#include <sys/blkptr.h>
45#include <sys/range_tree.h>
46
47/*
48 * Number of times that zfs_free_range() took the slow path while doing
49 * a zfs receive.  A nonzero value indicates a potential performance problem.
50 */
51uint64_t zfs_free_range_recv_miss;
52
53static void dbuf_destroy(dmu_buf_impl_t *db);
54static boolean_t dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx);
55static void dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx);
56
57/*
58 * Global data structures and functions for the dbuf cache.
59 */
60static kmem_cache_t *dbuf_cache;
61
62/* ARGSUSED */
63static int
64dbuf_cons(void *vdb, void *unused, int kmflag)
65{
66	dmu_buf_impl_t *db = vdb;
67	bzero(db, sizeof (dmu_buf_impl_t));
68
69	mutex_init(&db->db_mtx, NULL, MUTEX_DEFAULT, NULL);
70	cv_init(&db->db_changed, NULL, CV_DEFAULT, NULL);
71	refcount_create(&db->db_holds);
72
73	return (0);
74}
75
76/* ARGSUSED */
77static void
78dbuf_dest(void *vdb, void *unused)
79{
80	dmu_buf_impl_t *db = vdb;
81	mutex_destroy(&db->db_mtx);
82	cv_destroy(&db->db_changed);
83	refcount_destroy(&db->db_holds);
84}
85
86/*
87 * dbuf hash table routines
88 */
89static dbuf_hash_table_t dbuf_hash_table;
90
91static uint64_t dbuf_hash_count;
92
93static uint64_t
94dbuf_hash(void *os, uint64_t obj, uint8_t lvl, uint64_t blkid)
95{
96	uintptr_t osv = (uintptr_t)os;
97	uint64_t crc = -1ULL;
98
99	ASSERT(zfs_crc64_table[128] == ZFS_CRC64_POLY);
100	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (lvl)) & 0xFF];
101	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (osv >> 6)) & 0xFF];
102	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (obj >> 0)) & 0xFF];
103	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (obj >> 8)) & 0xFF];
104	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (blkid >> 0)) & 0xFF];
105	crc = (crc >> 8) ^ zfs_crc64_table[(crc ^ (blkid >> 8)) & 0xFF];
106
107	crc ^= (osv>>14) ^ (obj>>16) ^ (blkid>>16);
108
109	return (crc);
110}
111
112#define	DBUF_HASH(os, obj, level, blkid) dbuf_hash(os, obj, level, blkid);
113
114#define	DBUF_EQUAL(dbuf, os, obj, level, blkid)		\
115	((dbuf)->db.db_object == (obj) &&		\
116	(dbuf)->db_objset == (os) &&			\
117	(dbuf)->db_level == (level) &&			\
118	(dbuf)->db_blkid == (blkid))
119
120dmu_buf_impl_t *
121dbuf_find(objset_t *os, uint64_t obj, uint8_t level, uint64_t blkid)
122{
123	dbuf_hash_table_t *h = &dbuf_hash_table;
124	uint64_t hv = DBUF_HASH(os, obj, level, blkid);
125	uint64_t idx = hv & h->hash_table_mask;
126	dmu_buf_impl_t *db;
127
128	mutex_enter(DBUF_HASH_MUTEX(h, idx));
129	for (db = h->hash_table[idx]; db != NULL; db = db->db_hash_next) {
130		if (DBUF_EQUAL(db, os, obj, level, blkid)) {
131			mutex_enter(&db->db_mtx);
132			if (db->db_state != DB_EVICTING) {
133				mutex_exit(DBUF_HASH_MUTEX(h, idx));
134				return (db);
135			}
136			mutex_exit(&db->db_mtx);
137		}
138	}
139	mutex_exit(DBUF_HASH_MUTEX(h, idx));
140	return (NULL);
141}
142
143static dmu_buf_impl_t *
144dbuf_find_bonus(objset_t *os, uint64_t object)
145{
146	dnode_t *dn;
147	dmu_buf_impl_t *db = NULL;
148
149	if (dnode_hold(os, object, FTAG, &dn) == 0) {
150		rw_enter(&dn->dn_struct_rwlock, RW_READER);
151		if (dn->dn_bonus != NULL) {
152			db = dn->dn_bonus;
153			mutex_enter(&db->db_mtx);
154		}
155		rw_exit(&dn->dn_struct_rwlock);
156		dnode_rele(dn, FTAG);
157	}
158	return (db);
159}
160
161/*
162 * Insert an entry into the hash table.  If there is already an element
163 * equal to elem in the hash table, then the already existing element
164 * will be returned and the new element will not be inserted.
165 * Otherwise returns NULL.
166 */
167static dmu_buf_impl_t *
168dbuf_hash_insert(dmu_buf_impl_t *db)
169{
170	dbuf_hash_table_t *h = &dbuf_hash_table;
171	objset_t *os = db->db_objset;
172	uint64_t obj = db->db.db_object;
173	int level = db->db_level;
174	uint64_t blkid = db->db_blkid;
175	uint64_t hv = DBUF_HASH(os, obj, level, blkid);
176	uint64_t idx = hv & h->hash_table_mask;
177	dmu_buf_impl_t *dbf;
178
179	mutex_enter(DBUF_HASH_MUTEX(h, idx));
180	for (dbf = h->hash_table[idx]; dbf != NULL; dbf = dbf->db_hash_next) {
181		if (DBUF_EQUAL(dbf, os, obj, level, blkid)) {
182			mutex_enter(&dbf->db_mtx);
183			if (dbf->db_state != DB_EVICTING) {
184				mutex_exit(DBUF_HASH_MUTEX(h, idx));
185				return (dbf);
186			}
187			mutex_exit(&dbf->db_mtx);
188		}
189	}
190
191	mutex_enter(&db->db_mtx);
192	db->db_hash_next = h->hash_table[idx];
193	h->hash_table[idx] = db;
194	mutex_exit(DBUF_HASH_MUTEX(h, idx));
195	atomic_inc_64(&dbuf_hash_count);
196
197	return (NULL);
198}
199
200/*
201 * Remove an entry from the hash table.  It must be in the EVICTING state.
202 */
203static void
204dbuf_hash_remove(dmu_buf_impl_t *db)
205{
206	dbuf_hash_table_t *h = &dbuf_hash_table;
207	uint64_t hv = DBUF_HASH(db->db_objset, db->db.db_object,
208	    db->db_level, db->db_blkid);
209	uint64_t idx = hv & h->hash_table_mask;
210	dmu_buf_impl_t *dbf, **dbp;
211
212	/*
213	 * We musn't hold db_mtx to maintain lock ordering:
214	 * DBUF_HASH_MUTEX > db_mtx.
215	 */
216	ASSERT(refcount_is_zero(&db->db_holds));
217	ASSERT(db->db_state == DB_EVICTING);
218	ASSERT(!MUTEX_HELD(&db->db_mtx));
219
220	mutex_enter(DBUF_HASH_MUTEX(h, idx));
221	dbp = &h->hash_table[idx];
222	while ((dbf = *dbp) != db) {
223		dbp = &dbf->db_hash_next;
224		ASSERT(dbf != NULL);
225	}
226	*dbp = db->db_hash_next;
227	db->db_hash_next = NULL;
228	mutex_exit(DBUF_HASH_MUTEX(h, idx));
229	atomic_dec_64(&dbuf_hash_count);
230}
231
232static arc_evict_func_t dbuf_do_evict;
233
234static void
235dbuf_evict_user(dmu_buf_impl_t *db)
236{
237	ASSERT(MUTEX_HELD(&db->db_mtx));
238
239	if (db->db_level != 0 || db->db_evict_func == NULL)
240		return;
241
242	db->db_evict_func(&db->db, db->db_user_ptr);
243	db->db_user_ptr = NULL;
244	db->db_evict_func = NULL;
245}
246
247boolean_t
248dbuf_is_metadata(dmu_buf_impl_t *db)
249{
250	if (db->db_level > 0) {
251		return (B_TRUE);
252	} else {
253		boolean_t is_metadata;
254
255		DB_DNODE_ENTER(db);
256		is_metadata = DMU_OT_IS_METADATA(DB_DNODE(db)->dn_type);
257		DB_DNODE_EXIT(db);
258
259		return (is_metadata);
260	}
261}
262
263void
264dbuf_evict(dmu_buf_impl_t *db)
265{
266	ASSERT(MUTEX_HELD(&db->db_mtx));
267	ASSERT(db->db_buf == NULL);
268	ASSERT(db->db_data_pending == NULL);
269
270	dbuf_clear(db);
271	dbuf_destroy(db);
272}
273
274void
275dbuf_init(void)
276{
277	uint64_t hsize = 1ULL << 16;
278	dbuf_hash_table_t *h = &dbuf_hash_table;
279	int i;
280
281	/*
282	 * The hash table is big enough to fill all of physical memory
283	 * with an average 4K block size.  The table will take up
284	 * totalmem*sizeof(void*)/4K (i.e. 2MB/GB with 8-byte pointers).
285	 */
286	while (hsize * 4096 < (uint64_t)physmem * PAGESIZE)
287		hsize <<= 1;
288
289retry:
290	h->hash_table_mask = hsize - 1;
291	h->hash_table = kmem_zalloc(hsize * sizeof (void *), KM_NOSLEEP);
292	if (h->hash_table == NULL) {
293		/* XXX - we should really return an error instead of assert */
294		ASSERT(hsize > (1ULL << 10));
295		hsize >>= 1;
296		goto retry;
297	}
298
299	dbuf_cache = kmem_cache_create("dmu_buf_impl_t",
300	    sizeof (dmu_buf_impl_t),
301	    0, dbuf_cons, dbuf_dest, NULL, NULL, NULL, 0);
302
303	for (i = 0; i < DBUF_MUTEXES; i++)
304		mutex_init(&h->hash_mutexes[i], NULL, MUTEX_DEFAULT, NULL);
305}
306
307void
308dbuf_fini(void)
309{
310	dbuf_hash_table_t *h = &dbuf_hash_table;
311	int i;
312
313	for (i = 0; i < DBUF_MUTEXES; i++)
314		mutex_destroy(&h->hash_mutexes[i]);
315	kmem_free(h->hash_table, (h->hash_table_mask + 1) * sizeof (void *));
316	kmem_cache_destroy(dbuf_cache);
317}
318
319/*
320 * Other stuff.
321 */
322
323#ifdef ZFS_DEBUG
324static void
325dbuf_verify(dmu_buf_impl_t *db)
326{
327	dnode_t *dn;
328	dbuf_dirty_record_t *dr;
329
330	ASSERT(MUTEX_HELD(&db->db_mtx));
331
332	if (!(zfs_flags & ZFS_DEBUG_DBUF_VERIFY))
333		return;
334
335	ASSERT(db->db_objset != NULL);
336	DB_DNODE_ENTER(db);
337	dn = DB_DNODE(db);
338	if (dn == NULL) {
339		ASSERT(db->db_parent == NULL);
340		ASSERT(db->db_blkptr == NULL);
341	} else {
342		ASSERT3U(db->db.db_object, ==, dn->dn_object);
343		ASSERT3P(db->db_objset, ==, dn->dn_objset);
344		ASSERT3U(db->db_level, <, dn->dn_nlevels);
345		ASSERT(db->db_blkid == DMU_BONUS_BLKID ||
346		    db->db_blkid == DMU_SPILL_BLKID ||
347		    !avl_is_empty(&dn->dn_dbufs));
348	}
349	if (db->db_blkid == DMU_BONUS_BLKID) {
350		ASSERT(dn != NULL);
351		ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
352		ASSERT3U(db->db.db_offset, ==, DMU_BONUS_BLKID);
353	} else if (db->db_blkid == DMU_SPILL_BLKID) {
354		ASSERT(dn != NULL);
355		ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
356		ASSERT0(db->db.db_offset);
357	} else {
358		ASSERT3U(db->db.db_offset, ==, db->db_blkid * db->db.db_size);
359	}
360
361	for (dr = db->db_data_pending; dr != NULL; dr = dr->dr_next)
362		ASSERT(dr->dr_dbuf == db);
363
364	for (dr = db->db_last_dirty; dr != NULL; dr = dr->dr_next)
365		ASSERT(dr->dr_dbuf == db);
366
367	/*
368	 * We can't assert that db_size matches dn_datablksz because it
369	 * can be momentarily different when another thread is doing
370	 * dnode_set_blksz().
371	 */
372	if (db->db_level == 0 && db->db.db_object == DMU_META_DNODE_OBJECT) {
373		dr = db->db_data_pending;
374		/*
375		 * It should only be modified in syncing context, so
376		 * make sure we only have one copy of the data.
377		 */
378		ASSERT(dr == NULL || dr->dt.dl.dr_data == db->db_buf);
379	}
380
381	/* verify db->db_blkptr */
382	if (db->db_blkptr) {
383		if (db->db_parent == dn->dn_dbuf) {
384			/* db is pointed to by the dnode */
385			/* ASSERT3U(db->db_blkid, <, dn->dn_nblkptr); */
386			if (DMU_OBJECT_IS_SPECIAL(db->db.db_object))
387				ASSERT(db->db_parent == NULL);
388			else
389				ASSERT(db->db_parent != NULL);
390			if (db->db_blkid != DMU_SPILL_BLKID)
391				ASSERT3P(db->db_blkptr, ==,
392				    &dn->dn_phys->dn_blkptr[db->db_blkid]);
393		} else {
394			/* db is pointed to by an indirect block */
395			int epb = db->db_parent->db.db_size >> SPA_BLKPTRSHIFT;
396			ASSERT3U(db->db_parent->db_level, ==, db->db_level+1);
397			ASSERT3U(db->db_parent->db.db_object, ==,
398			    db->db.db_object);
399			/*
400			 * dnode_grow_indblksz() can make this fail if we don't
401			 * have the struct_rwlock.  XXX indblksz no longer
402			 * grows.  safe to do this now?
403			 */
404			if (RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
405				ASSERT3P(db->db_blkptr, ==,
406				    ((blkptr_t *)db->db_parent->db.db_data +
407				    db->db_blkid % epb));
408			}
409		}
410	}
411	if ((db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr)) &&
412	    (db->db_buf == NULL || db->db_buf->b_data) &&
413	    db->db.db_data && db->db_blkid != DMU_BONUS_BLKID &&
414	    db->db_state != DB_FILL && !dn->dn_free_txg) {
415		/*
416		 * If the blkptr isn't set but they have nonzero data,
417		 * it had better be dirty, otherwise we'll lose that
418		 * data when we evict this buffer.
419		 */
420		if (db->db_dirtycnt == 0) {
421			uint64_t *buf = db->db.db_data;
422			int i;
423
424			for (i = 0; i < db->db.db_size >> 3; i++) {
425				ASSERT(buf[i] == 0);
426			}
427		}
428	}
429	DB_DNODE_EXIT(db);
430}
431#endif
432
433static void
434dbuf_set_data(dmu_buf_impl_t *db, arc_buf_t *buf)
435{
436	ASSERT(MUTEX_HELD(&db->db_mtx));
437	db->db_buf = buf;
438	if (buf != NULL) {
439		ASSERT(buf->b_data != NULL);
440		db->db.db_data = buf->b_data;
441		if (!arc_released(buf))
442			arc_set_callback(buf, dbuf_do_evict, db);
443	} else {
444		dbuf_evict_user(db);
445		db->db.db_data = NULL;
446		if (db->db_state != DB_NOFILL)
447			db->db_state = DB_UNCACHED;
448	}
449}
450
451/*
452 * Loan out an arc_buf for read.  Return the loaned arc_buf.
453 */
454arc_buf_t *
455dbuf_loan_arcbuf(dmu_buf_impl_t *db)
456{
457	arc_buf_t *abuf;
458
459	mutex_enter(&db->db_mtx);
460	if (arc_released(db->db_buf) || refcount_count(&db->db_holds) > 1) {
461		int blksz = db->db.db_size;
462		spa_t *spa = db->db_objset->os_spa;
463
464		mutex_exit(&db->db_mtx);
465		abuf = arc_loan_buf(spa, blksz);
466		bcopy(db->db.db_data, abuf->b_data, blksz);
467	} else {
468		abuf = db->db_buf;
469		arc_loan_inuse_buf(abuf, db);
470		dbuf_set_data(db, NULL);
471		mutex_exit(&db->db_mtx);
472	}
473	return (abuf);
474}
475
476uint64_t
477dbuf_whichblock(dnode_t *dn, uint64_t offset)
478{
479	if (dn->dn_datablkshift) {
480		return (offset >> dn->dn_datablkshift);
481	} else {
482		ASSERT3U(offset, <, dn->dn_datablksz);
483		return (0);
484	}
485}
486
487static void
488dbuf_read_done(zio_t *zio, arc_buf_t *buf, void *vdb)
489{
490	dmu_buf_impl_t *db = vdb;
491
492	mutex_enter(&db->db_mtx);
493	ASSERT3U(db->db_state, ==, DB_READ);
494	/*
495	 * All reads are synchronous, so we must have a hold on the dbuf
496	 */
497	ASSERT(refcount_count(&db->db_holds) > 0);
498	ASSERT(db->db_buf == NULL);
499	ASSERT(db->db.db_data == NULL);
500	if (db->db_level == 0 && db->db_freed_in_flight) {
501		/* we were freed in flight; disregard any error */
502		arc_release(buf, db);
503		bzero(buf->b_data, db->db.db_size);
504		arc_buf_freeze(buf);
505		db->db_freed_in_flight = FALSE;
506		dbuf_set_data(db, buf);
507		db->db_state = DB_CACHED;
508	} else if (zio == NULL || zio->io_error == 0) {
509		dbuf_set_data(db, buf);
510		db->db_state = DB_CACHED;
511	} else {
512		ASSERT(db->db_blkid != DMU_BONUS_BLKID);
513		ASSERT3P(db->db_buf, ==, NULL);
514		VERIFY(arc_buf_remove_ref(buf, db));
515		db->db_state = DB_UNCACHED;
516	}
517	cv_broadcast(&db->db_changed);
518	dbuf_rele_and_unlock(db, NULL);
519}
520
521static void
522dbuf_read_impl(dmu_buf_impl_t *db, zio_t *zio, uint32_t *flags)
523{
524	dnode_t *dn;
525	zbookmark_phys_t zb;
526	arc_flags_t aflags = ARC_FLAG_NOWAIT;
527
528	DB_DNODE_ENTER(db);
529	dn = DB_DNODE(db);
530	ASSERT(!refcount_is_zero(&db->db_holds));
531	/* We need the struct_rwlock to prevent db_blkptr from changing. */
532	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
533	ASSERT(MUTEX_HELD(&db->db_mtx));
534	ASSERT(db->db_state == DB_UNCACHED);
535	ASSERT(db->db_buf == NULL);
536
537	if (db->db_blkid == DMU_BONUS_BLKID) {
538		int bonuslen = MIN(dn->dn_bonuslen, dn->dn_phys->dn_bonuslen);
539
540		ASSERT3U(bonuslen, <=, db->db.db_size);
541		db->db.db_data = zio_buf_alloc(DN_MAX_BONUSLEN);
542		arc_space_consume(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
543		if (bonuslen < DN_MAX_BONUSLEN)
544			bzero(db->db.db_data, DN_MAX_BONUSLEN);
545		if (bonuslen)
546			bcopy(DN_BONUS(dn->dn_phys), db->db.db_data, bonuslen);
547		DB_DNODE_EXIT(db);
548		db->db_state = DB_CACHED;
549		mutex_exit(&db->db_mtx);
550		return;
551	}
552
553	/*
554	 * Recheck BP_IS_HOLE() after dnode_block_freed() in case dnode_sync()
555	 * processes the delete record and clears the bp while we are waiting
556	 * for the dn_mtx (resulting in a "no" from block_freed).
557	 */
558	if (db->db_blkptr == NULL || BP_IS_HOLE(db->db_blkptr) ||
559	    (db->db_level == 0 && (dnode_block_freed(dn, db->db_blkid) ||
560	    BP_IS_HOLE(db->db_blkptr)))) {
561		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
562
563		DB_DNODE_EXIT(db);
564		dbuf_set_data(db, arc_buf_alloc(db->db_objset->os_spa,
565		    db->db.db_size, db, type));
566		bzero(db->db.db_data, db->db.db_size);
567		db->db_state = DB_CACHED;
568		*flags |= DB_RF_CACHED;
569		mutex_exit(&db->db_mtx);
570		return;
571	}
572
573	DB_DNODE_EXIT(db);
574
575	db->db_state = DB_READ;
576	mutex_exit(&db->db_mtx);
577
578	if (DBUF_IS_L2CACHEABLE(db))
579		aflags |= ARC_FLAG_L2CACHE;
580	if (DBUF_IS_L2COMPRESSIBLE(db))
581		aflags |= ARC_FLAG_L2COMPRESS;
582
583	SET_BOOKMARK(&zb, db->db_objset->os_dsl_dataset ?
584	    db->db_objset->os_dsl_dataset->ds_object : DMU_META_OBJSET,
585	    db->db.db_object, db->db_level, db->db_blkid);
586
587	dbuf_add_ref(db, NULL);
588
589	(void) arc_read(zio, db->db_objset->os_spa, db->db_blkptr,
590	    dbuf_read_done, db, ZIO_PRIORITY_SYNC_READ,
591	    (*flags & DB_RF_CANFAIL) ? ZIO_FLAG_CANFAIL : ZIO_FLAG_MUSTSUCCEED,
592	    &aflags, &zb);
593	if (aflags & ARC_FLAG_CACHED)
594		*flags |= DB_RF_CACHED;
595}
596
597int
598dbuf_read(dmu_buf_impl_t *db, zio_t *zio, uint32_t flags)
599{
600	int err = 0;
601	boolean_t havepzio = (zio != NULL);
602	boolean_t prefetch;
603	dnode_t *dn;
604
605	/*
606	 * We don't have to hold the mutex to check db_state because it
607	 * can't be freed while we have a hold on the buffer.
608	 */
609	ASSERT(!refcount_is_zero(&db->db_holds));
610
611	if (db->db_state == DB_NOFILL)
612		return (SET_ERROR(EIO));
613
614	DB_DNODE_ENTER(db);
615	dn = DB_DNODE(db);
616	if ((flags & DB_RF_HAVESTRUCT) == 0)
617		rw_enter(&dn->dn_struct_rwlock, RW_READER);
618
619	prefetch = db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
620	    (flags & DB_RF_NOPREFETCH) == 0 && dn != NULL &&
621	    DBUF_IS_CACHEABLE(db);
622
623	mutex_enter(&db->db_mtx);
624	if (db->db_state == DB_CACHED) {
625		mutex_exit(&db->db_mtx);
626		if (prefetch)
627			dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
628			    db->db.db_size, TRUE);
629		if ((flags & DB_RF_HAVESTRUCT) == 0)
630			rw_exit(&dn->dn_struct_rwlock);
631		DB_DNODE_EXIT(db);
632	} else if (db->db_state == DB_UNCACHED) {
633		spa_t *spa = dn->dn_objset->os_spa;
634
635		if (zio == NULL)
636			zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
637		dbuf_read_impl(db, zio, &flags);
638
639		/* dbuf_read_impl has dropped db_mtx for us */
640
641		if (prefetch)
642			dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
643			    db->db.db_size, flags & DB_RF_CACHED);
644
645		if ((flags & DB_RF_HAVESTRUCT) == 0)
646			rw_exit(&dn->dn_struct_rwlock);
647		DB_DNODE_EXIT(db);
648
649		if (!havepzio)
650			err = zio_wait(zio);
651	} else {
652		/*
653		 * Another reader came in while the dbuf was in flight
654		 * between UNCACHED and CACHED.  Either a writer will finish
655		 * writing the buffer (sending the dbuf to CACHED) or the
656		 * first reader's request will reach the read_done callback
657		 * and send the dbuf to CACHED.  Otherwise, a failure
658		 * occurred and the dbuf went to UNCACHED.
659		 */
660		mutex_exit(&db->db_mtx);
661		if (prefetch)
662			dmu_zfetch(&dn->dn_zfetch, db->db.db_offset,
663			    db->db.db_size, TRUE);
664		if ((flags & DB_RF_HAVESTRUCT) == 0)
665			rw_exit(&dn->dn_struct_rwlock);
666		DB_DNODE_EXIT(db);
667
668		/* Skip the wait per the caller's request. */
669		mutex_enter(&db->db_mtx);
670		if ((flags & DB_RF_NEVERWAIT) == 0) {
671			while (db->db_state == DB_READ ||
672			    db->db_state == DB_FILL) {
673				ASSERT(db->db_state == DB_READ ||
674				    (flags & DB_RF_HAVESTRUCT) == 0);
675				DTRACE_PROBE2(blocked__read, dmu_buf_impl_t *,
676				    db, zio_t *, zio);
677				cv_wait(&db->db_changed, &db->db_mtx);
678			}
679			if (db->db_state == DB_UNCACHED)
680				err = SET_ERROR(EIO);
681		}
682		mutex_exit(&db->db_mtx);
683	}
684
685	ASSERT(err || havepzio || db->db_state == DB_CACHED);
686	return (err);
687}
688
689static void
690dbuf_noread(dmu_buf_impl_t *db)
691{
692	ASSERT(!refcount_is_zero(&db->db_holds));
693	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
694	mutex_enter(&db->db_mtx);
695	while (db->db_state == DB_READ || db->db_state == DB_FILL)
696		cv_wait(&db->db_changed, &db->db_mtx);
697	if (db->db_state == DB_UNCACHED) {
698		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
699		spa_t *spa = db->db_objset->os_spa;
700
701		ASSERT(db->db_buf == NULL);
702		ASSERT(db->db.db_data == NULL);
703		dbuf_set_data(db, arc_buf_alloc(spa, db->db.db_size, db, type));
704		db->db_state = DB_FILL;
705	} else if (db->db_state == DB_NOFILL) {
706		dbuf_set_data(db, NULL);
707	} else {
708		ASSERT3U(db->db_state, ==, DB_CACHED);
709	}
710	mutex_exit(&db->db_mtx);
711}
712
713/*
714 * This is our just-in-time copy function.  It makes a copy of
715 * buffers, that have been modified in a previous transaction
716 * group, before we modify them in the current active group.
717 *
718 * This function is used in two places: when we are dirtying a
719 * buffer for the first time in a txg, and when we are freeing
720 * a range in a dnode that includes this buffer.
721 *
722 * Note that when we are called from dbuf_free_range() we do
723 * not put a hold on the buffer, we just traverse the active
724 * dbuf list for the dnode.
725 */
726static void
727dbuf_fix_old_data(dmu_buf_impl_t *db, uint64_t txg)
728{
729	dbuf_dirty_record_t *dr = db->db_last_dirty;
730
731	ASSERT(MUTEX_HELD(&db->db_mtx));
732	ASSERT(db->db.db_data != NULL);
733	ASSERT(db->db_level == 0);
734	ASSERT(db->db.db_object != DMU_META_DNODE_OBJECT);
735
736	if (dr == NULL ||
737	    (dr->dt.dl.dr_data !=
738	    ((db->db_blkid  == DMU_BONUS_BLKID) ? db->db.db_data : db->db_buf)))
739		return;
740
741	/*
742	 * If the last dirty record for this dbuf has not yet synced
743	 * and its referencing the dbuf data, either:
744	 *	reset the reference to point to a new copy,
745	 * or (if there a no active holders)
746	 *	just null out the current db_data pointer.
747	 */
748	ASSERT(dr->dr_txg >= txg - 2);
749	if (db->db_blkid == DMU_BONUS_BLKID) {
750		/* Note that the data bufs here are zio_bufs */
751		dr->dt.dl.dr_data = zio_buf_alloc(DN_MAX_BONUSLEN);
752		arc_space_consume(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
753		bcopy(db->db.db_data, dr->dt.dl.dr_data, DN_MAX_BONUSLEN);
754	} else if (refcount_count(&db->db_holds) > db->db_dirtycnt) {
755		int size = db->db.db_size;
756		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
757		spa_t *spa = db->db_objset->os_spa;
758
759		dr->dt.dl.dr_data = arc_buf_alloc(spa, size, db, type);
760		bcopy(db->db.db_data, dr->dt.dl.dr_data->b_data, size);
761	} else {
762		dbuf_set_data(db, NULL);
763	}
764}
765
766void
767dbuf_unoverride(dbuf_dirty_record_t *dr)
768{
769	dmu_buf_impl_t *db = dr->dr_dbuf;
770	blkptr_t *bp = &dr->dt.dl.dr_overridden_by;
771	uint64_t txg = dr->dr_txg;
772
773	ASSERT(MUTEX_HELD(&db->db_mtx));
774	ASSERT(dr->dt.dl.dr_override_state != DR_IN_DMU_SYNC);
775	ASSERT(db->db_level == 0);
776
777	if (db->db_blkid == DMU_BONUS_BLKID ||
778	    dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN)
779		return;
780
781	ASSERT(db->db_data_pending != dr);
782
783	/* free this block */
784	if (!BP_IS_HOLE(bp) && !dr->dt.dl.dr_nopwrite)
785		zio_free(db->db_objset->os_spa, txg, bp);
786
787	dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
788	dr->dt.dl.dr_nopwrite = B_FALSE;
789
790	/*
791	 * Release the already-written buffer, so we leave it in
792	 * a consistent dirty state.  Note that all callers are
793	 * modifying the buffer, so they will immediately do
794	 * another (redundant) arc_release().  Therefore, leave
795	 * the buf thawed to save the effort of freezing &
796	 * immediately re-thawing it.
797	 */
798	arc_release(dr->dt.dl.dr_data, db);
799}
800
801/*
802 * Evict (if its unreferenced) or clear (if its referenced) any level-0
803 * data blocks in the free range, so that any future readers will find
804 * empty blocks.
805 *
806 * This is a no-op if the dataset is in the middle of an incremental
807 * receive; see comment below for details.
808 */
809void
810dbuf_free_range(dnode_t *dn, uint64_t start_blkid, uint64_t end_blkid,
811    dmu_tx_t *tx)
812{
813	dmu_buf_impl_t *db, *db_next, db_search;
814	uint64_t txg = tx->tx_txg;
815	avl_index_t where;
816
817	if (end_blkid > dn->dn_maxblkid && (end_blkid != DMU_SPILL_BLKID))
818		end_blkid = dn->dn_maxblkid;
819	dprintf_dnode(dn, "start=%llu end=%llu\n", start_blkid, end_blkid);
820
821	db_search.db_level = 0;
822	db_search.db_blkid = start_blkid;
823	db_search.db_state = DB_SEARCH;
824
825	mutex_enter(&dn->dn_dbufs_mtx);
826	if (start_blkid >= dn->dn_unlisted_l0_blkid) {
827		/* There can't be any dbufs in this range; no need to search. */
828#ifdef DEBUG
829		db = avl_find(&dn->dn_dbufs, &db_search, &where);
830		ASSERT3P(db, ==, NULL);
831		db = avl_nearest(&dn->dn_dbufs, where, AVL_AFTER);
832		ASSERT(db == NULL || db->db_level > 0);
833#endif
834		mutex_exit(&dn->dn_dbufs_mtx);
835		return;
836	} else if (dmu_objset_is_receiving(dn->dn_objset)) {
837		/*
838		 * If we are receiving, we expect there to be no dbufs in
839		 * the range to be freed, because receive modifies each
840		 * block at most once, and in offset order.  If this is
841		 * not the case, it can lead to performance problems,
842		 * so note that we unexpectedly took the slow path.
843		 */
844		atomic_inc_64(&zfs_free_range_recv_miss);
845	}
846
847	db = avl_find(&dn->dn_dbufs, &db_search, &where);
848	ASSERT3P(db, ==, NULL);
849	db = avl_nearest(&dn->dn_dbufs, where, AVL_AFTER);
850
851	for (; db != NULL; db = db_next) {
852		db_next = AVL_NEXT(&dn->dn_dbufs, db);
853		ASSERT(db->db_blkid != DMU_BONUS_BLKID);
854
855		if (db->db_level != 0 || db->db_blkid > end_blkid) {
856			break;
857		}
858		ASSERT3U(db->db_blkid, >=, start_blkid);
859
860		/* found a level 0 buffer in the range */
861		mutex_enter(&db->db_mtx);
862		if (dbuf_undirty(db, tx)) {
863			/* mutex has been dropped and dbuf destroyed */
864			continue;
865		}
866
867		if (db->db_state == DB_UNCACHED ||
868		    db->db_state == DB_NOFILL ||
869		    db->db_state == DB_EVICTING) {
870			ASSERT(db->db.db_data == NULL);
871			mutex_exit(&db->db_mtx);
872			continue;
873		}
874		if (db->db_state == DB_READ || db->db_state == DB_FILL) {
875			/* will be handled in dbuf_read_done or dbuf_rele */
876			db->db_freed_in_flight = TRUE;
877			mutex_exit(&db->db_mtx);
878			continue;
879		}
880		if (refcount_count(&db->db_holds) == 0) {
881			ASSERT(db->db_buf);
882			dbuf_clear(db);
883			continue;
884		}
885		/* The dbuf is referenced */
886
887		if (db->db_last_dirty != NULL) {
888			dbuf_dirty_record_t *dr = db->db_last_dirty;
889
890			if (dr->dr_txg == txg) {
891				/*
892				 * This buffer is "in-use", re-adjust the file
893				 * size to reflect that this buffer may
894				 * contain new data when we sync.
895				 */
896				if (db->db_blkid != DMU_SPILL_BLKID &&
897				    db->db_blkid > dn->dn_maxblkid)
898					dn->dn_maxblkid = db->db_blkid;
899				dbuf_unoverride(dr);
900			} else {
901				/*
902				 * This dbuf is not dirty in the open context.
903				 * Either uncache it (if its not referenced in
904				 * the open context) or reset its contents to
905				 * empty.
906				 */
907				dbuf_fix_old_data(db, txg);
908			}
909		}
910		/* clear the contents if its cached */
911		if (db->db_state == DB_CACHED) {
912			ASSERT(db->db.db_data != NULL);
913			arc_release(db->db_buf, db);
914			bzero(db->db.db_data, db->db.db_size);
915			arc_buf_freeze(db->db_buf);
916		}
917
918		mutex_exit(&db->db_mtx);
919	}
920	mutex_exit(&dn->dn_dbufs_mtx);
921}
922
923static int
924dbuf_block_freeable(dmu_buf_impl_t *db)
925{
926	dsl_dataset_t *ds = db->db_objset->os_dsl_dataset;
927	uint64_t birth_txg = 0;
928
929	/*
930	 * We don't need any locking to protect db_blkptr:
931	 * If it's syncing, then db_last_dirty will be set
932	 * so we'll ignore db_blkptr.
933	 *
934	 * This logic ensures that only block births for
935	 * filled blocks are considered.
936	 */
937	ASSERT(MUTEX_HELD(&db->db_mtx));
938	if (db->db_last_dirty && (db->db_blkptr == NULL ||
939	    !BP_IS_HOLE(db->db_blkptr))) {
940		birth_txg = db->db_last_dirty->dr_txg;
941	} else if (db->db_blkptr != NULL && !BP_IS_HOLE(db->db_blkptr)) {
942		birth_txg = db->db_blkptr->blk_birth;
943	}
944
945	/*
946	 * If this block don't exist or is in a snapshot, it can't be freed.
947	 * Don't pass the bp to dsl_dataset_block_freeable() since we
948	 * are holding the db_mtx lock and might deadlock if we are
949	 * prefetching a dedup-ed block.
950	 */
951	if (birth_txg != 0)
952		return (ds == NULL ||
953		    dsl_dataset_block_freeable(ds, NULL, birth_txg));
954	else
955		return (B_FALSE);
956}
957
958void
959dbuf_new_size(dmu_buf_impl_t *db, int size, dmu_tx_t *tx)
960{
961	arc_buf_t *buf, *obuf;
962	int osize = db->db.db_size;
963	arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
964	dnode_t *dn;
965
966	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
967
968	DB_DNODE_ENTER(db);
969	dn = DB_DNODE(db);
970
971	/* XXX does *this* func really need the lock? */
972	ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
973
974	/*
975	 * This call to dmu_buf_will_dirty() with the dn_struct_rwlock held
976	 * is OK, because there can be no other references to the db
977	 * when we are changing its size, so no concurrent DB_FILL can
978	 * be happening.
979	 */
980	/*
981	 * XXX we should be doing a dbuf_read, checking the return
982	 * value and returning that up to our callers
983	 */
984	dmu_buf_will_dirty(&db->db, tx);
985
986	/* create the data buffer for the new block */
987	buf = arc_buf_alloc(dn->dn_objset->os_spa, size, db, type);
988
989	/* copy old block data to the new block */
990	obuf = db->db_buf;
991	bcopy(obuf->b_data, buf->b_data, MIN(osize, size));
992	/* zero the remainder */
993	if (size > osize)
994		bzero((uint8_t *)buf->b_data + osize, size - osize);
995
996	mutex_enter(&db->db_mtx);
997	dbuf_set_data(db, buf);
998	VERIFY(arc_buf_remove_ref(obuf, db));
999	db->db.db_size = size;
1000
1001	if (db->db_level == 0) {
1002		ASSERT3U(db->db_last_dirty->dr_txg, ==, tx->tx_txg);
1003		db->db_last_dirty->dt.dl.dr_data = buf;
1004	}
1005	mutex_exit(&db->db_mtx);
1006
1007	dnode_willuse_space(dn, size-osize, tx);
1008	DB_DNODE_EXIT(db);
1009}
1010
1011void
1012dbuf_release_bp(dmu_buf_impl_t *db)
1013{
1014	objset_t *os = db->db_objset;
1015
1016	ASSERT(dsl_pool_sync_context(dmu_objset_pool(os)));
1017	ASSERT(arc_released(os->os_phys_buf) ||
1018	    list_link_active(&os->os_dsl_dataset->ds_synced_link));
1019	ASSERT(db->db_parent == NULL || arc_released(db->db_parent->db_buf));
1020
1021	(void) arc_release(db->db_buf, db);
1022}
1023
1024dbuf_dirty_record_t *
1025dbuf_dirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
1026{
1027	dnode_t *dn;
1028	objset_t *os;
1029	dbuf_dirty_record_t **drp, *dr;
1030	int drop_struct_lock = FALSE;
1031	boolean_t do_free_accounting = B_FALSE;
1032	int txgoff = tx->tx_txg & TXG_MASK;
1033
1034	ASSERT(tx->tx_txg != 0);
1035	ASSERT(!refcount_is_zero(&db->db_holds));
1036	DMU_TX_DIRTY_BUF(tx, db);
1037
1038	DB_DNODE_ENTER(db);
1039	dn = DB_DNODE(db);
1040	/*
1041	 * Shouldn't dirty a regular buffer in syncing context.  Private
1042	 * objects may be dirtied in syncing context, but only if they
1043	 * were already pre-dirtied in open context.
1044	 */
1045	ASSERT(!dmu_tx_is_syncing(tx) ||
1046	    BP_IS_HOLE(dn->dn_objset->os_rootbp) ||
1047	    DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
1048	    dn->dn_objset->os_dsl_dataset == NULL);
1049	/*
1050	 * We make this assert for private objects as well, but after we
1051	 * check if we're already dirty.  They are allowed to re-dirty
1052	 * in syncing context.
1053	 */
1054	ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
1055	    dn->dn_dirtyctx == DN_UNDIRTIED || dn->dn_dirtyctx ==
1056	    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
1057
1058	mutex_enter(&db->db_mtx);
1059	/*
1060	 * XXX make this true for indirects too?  The problem is that
1061	 * transactions created with dmu_tx_create_assigned() from
1062	 * syncing context don't bother holding ahead.
1063	 */
1064	ASSERT(db->db_level != 0 ||
1065	    db->db_state == DB_CACHED || db->db_state == DB_FILL ||
1066	    db->db_state == DB_NOFILL);
1067
1068	mutex_enter(&dn->dn_mtx);
1069	/*
1070	 * Don't set dirtyctx to SYNC if we're just modifying this as we
1071	 * initialize the objset.
1072	 */
1073	if (dn->dn_dirtyctx == DN_UNDIRTIED &&
1074	    !BP_IS_HOLE(dn->dn_objset->os_rootbp)) {
1075		dn->dn_dirtyctx =
1076		    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN);
1077		ASSERT(dn->dn_dirtyctx_firstset == NULL);
1078		dn->dn_dirtyctx_firstset = kmem_alloc(1, KM_SLEEP);
1079	}
1080	mutex_exit(&dn->dn_mtx);
1081
1082	if (db->db_blkid == DMU_SPILL_BLKID)
1083		dn->dn_have_spill = B_TRUE;
1084
1085	/*
1086	 * If this buffer is already dirty, we're done.
1087	 */
1088	drp = &db->db_last_dirty;
1089	ASSERT(*drp == NULL || (*drp)->dr_txg <= tx->tx_txg ||
1090	    db->db.db_object == DMU_META_DNODE_OBJECT);
1091	while ((dr = *drp) != NULL && dr->dr_txg > tx->tx_txg)
1092		drp = &dr->dr_next;
1093	if (dr && dr->dr_txg == tx->tx_txg) {
1094		DB_DNODE_EXIT(db);
1095
1096		if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID) {
1097			/*
1098			 * If this buffer has already been written out,
1099			 * we now need to reset its state.
1100			 */
1101			dbuf_unoverride(dr);
1102			if (db->db.db_object != DMU_META_DNODE_OBJECT &&
1103			    db->db_state != DB_NOFILL)
1104				arc_buf_thaw(db->db_buf);
1105		}
1106		mutex_exit(&db->db_mtx);
1107		return (dr);
1108	}
1109
1110	/*
1111	 * Only valid if not already dirty.
1112	 */
1113	ASSERT(dn->dn_object == 0 ||
1114	    dn->dn_dirtyctx == DN_UNDIRTIED || dn->dn_dirtyctx ==
1115	    (dmu_tx_is_syncing(tx) ? DN_DIRTY_SYNC : DN_DIRTY_OPEN));
1116
1117	ASSERT3U(dn->dn_nlevels, >, db->db_level);
1118	ASSERT((dn->dn_phys->dn_nlevels == 0 && db->db_level == 0) ||
1119	    dn->dn_phys->dn_nlevels > db->db_level ||
1120	    dn->dn_next_nlevels[txgoff] > db->db_level ||
1121	    dn->dn_next_nlevels[(tx->tx_txg-1) & TXG_MASK] > db->db_level ||
1122	    dn->dn_next_nlevels[(tx->tx_txg-2) & TXG_MASK] > db->db_level);
1123
1124	/*
1125	 * We should only be dirtying in syncing context if it's the
1126	 * mos or we're initializing the os or it's a special object.
1127	 * However, we are allowed to dirty in syncing context provided
1128	 * we already dirtied it in open context.  Hence we must make
1129	 * this assertion only if we're not already dirty.
1130	 */
1131	os = dn->dn_objset;
1132	ASSERT(!dmu_tx_is_syncing(tx) || DMU_OBJECT_IS_SPECIAL(dn->dn_object) ||
1133	    os->os_dsl_dataset == NULL || BP_IS_HOLE(os->os_rootbp));
1134	ASSERT(db->db.db_size != 0);
1135
1136	dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
1137
1138	if (db->db_blkid != DMU_BONUS_BLKID) {
1139		/*
1140		 * Update the accounting.
1141		 * Note: we delay "free accounting" until after we drop
1142		 * the db_mtx.  This keeps us from grabbing other locks
1143		 * (and possibly deadlocking) in bp_get_dsize() while
1144		 * also holding the db_mtx.
1145		 */
1146		dnode_willuse_space(dn, db->db.db_size, tx);
1147		do_free_accounting = dbuf_block_freeable(db);
1148	}
1149
1150	/*
1151	 * If this buffer is dirty in an old transaction group we need
1152	 * to make a copy of it so that the changes we make in this
1153	 * transaction group won't leak out when we sync the older txg.
1154	 */
1155	dr = kmem_zalloc(sizeof (dbuf_dirty_record_t), KM_SLEEP);
1156	if (db->db_level == 0) {
1157		void *data_old = db->db_buf;
1158
1159		if (db->db_state != DB_NOFILL) {
1160			if (db->db_blkid == DMU_BONUS_BLKID) {
1161				dbuf_fix_old_data(db, tx->tx_txg);
1162				data_old = db->db.db_data;
1163			} else if (db->db.db_object != DMU_META_DNODE_OBJECT) {
1164				/*
1165				 * Release the data buffer from the cache so
1166				 * that we can modify it without impacting
1167				 * possible other users of this cached data
1168				 * block.  Note that indirect blocks and
1169				 * private objects are not released until the
1170				 * syncing state (since they are only modified
1171				 * then).
1172				 */
1173				arc_release(db->db_buf, db);
1174				dbuf_fix_old_data(db, tx->tx_txg);
1175				data_old = db->db_buf;
1176			}
1177			ASSERT(data_old != NULL);
1178		}
1179		dr->dt.dl.dr_data = data_old;
1180	} else {
1181		mutex_init(&dr->dt.di.dr_mtx, NULL, MUTEX_DEFAULT, NULL);
1182		list_create(&dr->dt.di.dr_children,
1183		    sizeof (dbuf_dirty_record_t),
1184		    offsetof(dbuf_dirty_record_t, dr_dirty_node));
1185	}
1186	if (db->db_blkid != DMU_BONUS_BLKID && os->os_dsl_dataset != NULL)
1187		dr->dr_accounted = db->db.db_size;
1188	dr->dr_dbuf = db;
1189	dr->dr_txg = tx->tx_txg;
1190	dr->dr_next = *drp;
1191	*drp = dr;
1192
1193	/*
1194	 * We could have been freed_in_flight between the dbuf_noread
1195	 * and dbuf_dirty.  We win, as though the dbuf_noread() had
1196	 * happened after the free.
1197	 */
1198	if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
1199	    db->db_blkid != DMU_SPILL_BLKID) {
1200		mutex_enter(&dn->dn_mtx);
1201		if (dn->dn_free_ranges[txgoff] != NULL) {
1202			range_tree_clear(dn->dn_free_ranges[txgoff],
1203			    db->db_blkid, 1);
1204		}
1205		mutex_exit(&dn->dn_mtx);
1206		db->db_freed_in_flight = FALSE;
1207	}
1208
1209	/*
1210	 * This buffer is now part of this txg
1211	 */
1212	dbuf_add_ref(db, (void *)(uintptr_t)tx->tx_txg);
1213	db->db_dirtycnt += 1;
1214	ASSERT3U(db->db_dirtycnt, <=, 3);
1215
1216	mutex_exit(&db->db_mtx);
1217
1218	if (db->db_blkid == DMU_BONUS_BLKID ||
1219	    db->db_blkid == DMU_SPILL_BLKID) {
1220		mutex_enter(&dn->dn_mtx);
1221		ASSERT(!list_link_active(&dr->dr_dirty_node));
1222		list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
1223		mutex_exit(&dn->dn_mtx);
1224		dnode_setdirty(dn, tx);
1225		DB_DNODE_EXIT(db);
1226		return (dr);
1227	} else if (do_free_accounting) {
1228		blkptr_t *bp = db->db_blkptr;
1229		int64_t willfree = (bp && !BP_IS_HOLE(bp)) ?
1230		    bp_get_dsize(os->os_spa, bp) : db->db.db_size;
1231		/*
1232		 * This is only a guess -- if the dbuf is dirty
1233		 * in a previous txg, we don't know how much
1234		 * space it will use on disk yet.  We should
1235		 * really have the struct_rwlock to access
1236		 * db_blkptr, but since this is just a guess,
1237		 * it's OK if we get an odd answer.
1238		 */
1239		ddt_prefetch(os->os_spa, bp);
1240		dnode_willuse_space(dn, -willfree, tx);
1241	}
1242
1243	if (!RW_WRITE_HELD(&dn->dn_struct_rwlock)) {
1244		rw_enter(&dn->dn_struct_rwlock, RW_READER);
1245		drop_struct_lock = TRUE;
1246	}
1247
1248	if (db->db_level == 0) {
1249		dnode_new_blkid(dn, db->db_blkid, tx, drop_struct_lock);
1250		ASSERT(dn->dn_maxblkid >= db->db_blkid);
1251	}
1252
1253	if (db->db_level+1 < dn->dn_nlevels) {
1254		dmu_buf_impl_t *parent = db->db_parent;
1255		dbuf_dirty_record_t *di;
1256		int parent_held = FALSE;
1257
1258		if (db->db_parent == NULL || db->db_parent == dn->dn_dbuf) {
1259			int epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
1260
1261			parent = dbuf_hold_level(dn, db->db_level+1,
1262			    db->db_blkid >> epbs, FTAG);
1263			ASSERT(parent != NULL);
1264			parent_held = TRUE;
1265		}
1266		if (drop_struct_lock)
1267			rw_exit(&dn->dn_struct_rwlock);
1268		ASSERT3U(db->db_level+1, ==, parent->db_level);
1269		di = dbuf_dirty(parent, tx);
1270		if (parent_held)
1271			dbuf_rele(parent, FTAG);
1272
1273		mutex_enter(&db->db_mtx);
1274		/*
1275		 * Since we've dropped the mutex, it's possible that
1276		 * dbuf_undirty() might have changed this out from under us.
1277		 */
1278		if (db->db_last_dirty == dr ||
1279		    dn->dn_object == DMU_META_DNODE_OBJECT) {
1280			mutex_enter(&di->dt.di.dr_mtx);
1281			ASSERT3U(di->dr_txg, ==, tx->tx_txg);
1282			ASSERT(!list_link_active(&dr->dr_dirty_node));
1283			list_insert_tail(&di->dt.di.dr_children, dr);
1284			mutex_exit(&di->dt.di.dr_mtx);
1285			dr->dr_parent = di;
1286		}
1287		mutex_exit(&db->db_mtx);
1288	} else {
1289		ASSERT(db->db_level+1 == dn->dn_nlevels);
1290		ASSERT(db->db_blkid < dn->dn_nblkptr);
1291		ASSERT(db->db_parent == NULL || db->db_parent == dn->dn_dbuf);
1292		mutex_enter(&dn->dn_mtx);
1293		ASSERT(!list_link_active(&dr->dr_dirty_node));
1294		list_insert_tail(&dn->dn_dirty_records[txgoff], dr);
1295		mutex_exit(&dn->dn_mtx);
1296		if (drop_struct_lock)
1297			rw_exit(&dn->dn_struct_rwlock);
1298	}
1299
1300	dnode_setdirty(dn, tx);
1301	DB_DNODE_EXIT(db);
1302	return (dr);
1303}
1304
1305/*
1306 * Undirty a buffer in the transaction group referenced by the given
1307 * transaction.  Return whether this evicted the dbuf.
1308 */
1309static boolean_t
1310dbuf_undirty(dmu_buf_impl_t *db, dmu_tx_t *tx)
1311{
1312	dnode_t *dn;
1313	uint64_t txg = tx->tx_txg;
1314	dbuf_dirty_record_t *dr, **drp;
1315
1316	ASSERT(txg != 0);
1317
1318	/*
1319	 * Due to our use of dn_nlevels below, this can only be called
1320	 * in open context, unless we are operating on the MOS.
1321	 * From syncing context, dn_nlevels may be different from the
1322	 * dn_nlevels used when dbuf was dirtied.
1323	 */
1324	ASSERT(db->db_objset ==
1325	    dmu_objset_pool(db->db_objset)->dp_meta_objset ||
1326	    txg != spa_syncing_txg(dmu_objset_spa(db->db_objset)));
1327	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1328	ASSERT0(db->db_level);
1329	ASSERT(MUTEX_HELD(&db->db_mtx));
1330
1331	/*
1332	 * If this buffer is not dirty, we're done.
1333	 */
1334	for (drp = &db->db_last_dirty; (dr = *drp) != NULL; drp = &dr->dr_next)
1335		if (dr->dr_txg <= txg)
1336			break;
1337	if (dr == NULL || dr->dr_txg < txg)
1338		return (B_FALSE);
1339	ASSERT(dr->dr_txg == txg);
1340	ASSERT(dr->dr_dbuf == db);
1341
1342	DB_DNODE_ENTER(db);
1343	dn = DB_DNODE(db);
1344
1345	dprintf_dbuf(db, "size=%llx\n", (u_longlong_t)db->db.db_size);
1346
1347	ASSERT(db->db.db_size != 0);
1348
1349	dsl_pool_undirty_space(dmu_objset_pool(dn->dn_objset),
1350	    dr->dr_accounted, txg);
1351
1352	*drp = dr->dr_next;
1353
1354	/*
1355	 * Note that there are three places in dbuf_dirty()
1356	 * where this dirty record may be put on a list.
1357	 * Make sure to do a list_remove corresponding to
1358	 * every one of those list_insert calls.
1359	 */
1360	if (dr->dr_parent) {
1361		mutex_enter(&dr->dr_parent->dt.di.dr_mtx);
1362		list_remove(&dr->dr_parent->dt.di.dr_children, dr);
1363		mutex_exit(&dr->dr_parent->dt.di.dr_mtx);
1364	} else if (db->db_blkid == DMU_SPILL_BLKID ||
1365	    db->db_level + 1 == dn->dn_nlevels) {
1366		ASSERT(db->db_blkptr == NULL || db->db_parent == dn->dn_dbuf);
1367		mutex_enter(&dn->dn_mtx);
1368		list_remove(&dn->dn_dirty_records[txg & TXG_MASK], dr);
1369		mutex_exit(&dn->dn_mtx);
1370	}
1371	DB_DNODE_EXIT(db);
1372
1373	if (db->db_state != DB_NOFILL) {
1374		dbuf_unoverride(dr);
1375
1376		ASSERT(db->db_buf != NULL);
1377		ASSERT(dr->dt.dl.dr_data != NULL);
1378		if (dr->dt.dl.dr_data != db->db_buf)
1379			VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data, db));
1380	}
1381
1382	kmem_free(dr, sizeof (dbuf_dirty_record_t));
1383
1384	ASSERT(db->db_dirtycnt > 0);
1385	db->db_dirtycnt -= 1;
1386
1387	if (refcount_remove(&db->db_holds, (void *)(uintptr_t)txg) == 0) {
1388		arc_buf_t *buf = db->db_buf;
1389
1390		ASSERT(db->db_state == DB_NOFILL || arc_released(buf));
1391		dbuf_set_data(db, NULL);
1392		VERIFY(arc_buf_remove_ref(buf, db));
1393		dbuf_evict(db);
1394		return (B_TRUE);
1395	}
1396
1397	return (B_FALSE);
1398}
1399
1400void
1401dmu_buf_will_dirty(dmu_buf_t *db_fake, dmu_tx_t *tx)
1402{
1403	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
1404	int rf = DB_RF_MUST_SUCCEED | DB_RF_NOPREFETCH;
1405
1406	ASSERT(tx->tx_txg != 0);
1407	ASSERT(!refcount_is_zero(&db->db_holds));
1408
1409	DB_DNODE_ENTER(db);
1410	if (RW_WRITE_HELD(&DB_DNODE(db)->dn_struct_rwlock))
1411		rf |= DB_RF_HAVESTRUCT;
1412	DB_DNODE_EXIT(db);
1413	(void) dbuf_read(db, NULL, rf);
1414	(void) dbuf_dirty(db, tx);
1415}
1416
1417void
1418dmu_buf_will_not_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
1419{
1420	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
1421
1422	db->db_state = DB_NOFILL;
1423
1424	dmu_buf_will_fill(db_fake, tx);
1425}
1426
1427void
1428dmu_buf_will_fill(dmu_buf_t *db_fake, dmu_tx_t *tx)
1429{
1430	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
1431
1432	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1433	ASSERT(tx->tx_txg != 0);
1434	ASSERT(db->db_level == 0);
1435	ASSERT(!refcount_is_zero(&db->db_holds));
1436
1437	ASSERT(db->db.db_object != DMU_META_DNODE_OBJECT ||
1438	    dmu_tx_private_ok(tx));
1439
1440	dbuf_noread(db);
1441	(void) dbuf_dirty(db, tx);
1442}
1443
1444#pragma weak dmu_buf_fill_done = dbuf_fill_done
1445/* ARGSUSED */
1446void
1447dbuf_fill_done(dmu_buf_impl_t *db, dmu_tx_t *tx)
1448{
1449	mutex_enter(&db->db_mtx);
1450	DBUF_VERIFY(db);
1451
1452	if (db->db_state == DB_FILL) {
1453		if (db->db_level == 0 && db->db_freed_in_flight) {
1454			ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1455			/* we were freed while filling */
1456			/* XXX dbuf_undirty? */
1457			bzero(db->db.db_data, db->db.db_size);
1458			db->db_freed_in_flight = FALSE;
1459		}
1460		db->db_state = DB_CACHED;
1461		cv_broadcast(&db->db_changed);
1462	}
1463	mutex_exit(&db->db_mtx);
1464}
1465
1466void
1467dmu_buf_write_embedded(dmu_buf_t *dbuf, void *data,
1468    bp_embedded_type_t etype, enum zio_compress comp,
1469    int uncompressed_size, int compressed_size, int byteorder,
1470    dmu_tx_t *tx)
1471{
1472	dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbuf;
1473	struct dirty_leaf *dl;
1474	dmu_object_type_t type;
1475
1476	DB_DNODE_ENTER(db);
1477	type = DB_DNODE(db)->dn_type;
1478	DB_DNODE_EXIT(db);
1479
1480	ASSERT0(db->db_level);
1481	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1482
1483	dmu_buf_will_not_fill(dbuf, tx);
1484
1485	ASSERT3U(db->db_last_dirty->dr_txg, ==, tx->tx_txg);
1486	dl = &db->db_last_dirty->dt.dl;
1487	encode_embedded_bp_compressed(&dl->dr_overridden_by,
1488	    data, comp, uncompressed_size, compressed_size);
1489	BPE_SET_ETYPE(&dl->dr_overridden_by, etype);
1490	BP_SET_TYPE(&dl->dr_overridden_by, type);
1491	BP_SET_LEVEL(&dl->dr_overridden_by, 0);
1492	BP_SET_BYTEORDER(&dl->dr_overridden_by, byteorder);
1493
1494	dl->dr_override_state = DR_OVERRIDDEN;
1495	dl->dr_overridden_by.blk_birth = db->db_last_dirty->dr_txg;
1496}
1497
1498/*
1499 * Directly assign a provided arc buf to a given dbuf if it's not referenced
1500 * by anybody except our caller. Otherwise copy arcbuf's contents to dbuf.
1501 */
1502void
1503dbuf_assign_arcbuf(dmu_buf_impl_t *db, arc_buf_t *buf, dmu_tx_t *tx)
1504{
1505	ASSERT(!refcount_is_zero(&db->db_holds));
1506	ASSERT(db->db_blkid != DMU_BONUS_BLKID);
1507	ASSERT(db->db_level == 0);
1508	ASSERT(DBUF_GET_BUFC_TYPE(db) == ARC_BUFC_DATA);
1509	ASSERT(buf != NULL);
1510	ASSERT(arc_buf_size(buf) == db->db.db_size);
1511	ASSERT(tx->tx_txg != 0);
1512
1513	arc_return_buf(buf, db);
1514	ASSERT(arc_released(buf));
1515
1516	mutex_enter(&db->db_mtx);
1517
1518	while (db->db_state == DB_READ || db->db_state == DB_FILL)
1519		cv_wait(&db->db_changed, &db->db_mtx);
1520
1521	ASSERT(db->db_state == DB_CACHED || db->db_state == DB_UNCACHED);
1522
1523	if (db->db_state == DB_CACHED &&
1524	    refcount_count(&db->db_holds) - 1 > db->db_dirtycnt) {
1525		mutex_exit(&db->db_mtx);
1526		(void) dbuf_dirty(db, tx);
1527		bcopy(buf->b_data, db->db.db_data, db->db.db_size);
1528		VERIFY(arc_buf_remove_ref(buf, db));
1529		xuio_stat_wbuf_copied();
1530		return;
1531	}
1532
1533	xuio_stat_wbuf_nocopy();
1534	if (db->db_state == DB_CACHED) {
1535		dbuf_dirty_record_t *dr = db->db_last_dirty;
1536
1537		ASSERT(db->db_buf != NULL);
1538		if (dr != NULL && dr->dr_txg == tx->tx_txg) {
1539			ASSERT(dr->dt.dl.dr_data == db->db_buf);
1540			if (!arc_released(db->db_buf)) {
1541				ASSERT(dr->dt.dl.dr_override_state ==
1542				    DR_OVERRIDDEN);
1543				arc_release(db->db_buf, db);
1544			}
1545			dr->dt.dl.dr_data = buf;
1546			VERIFY(arc_buf_remove_ref(db->db_buf, db));
1547		} else if (dr == NULL || dr->dt.dl.dr_data != db->db_buf) {
1548			arc_release(db->db_buf, db);
1549			VERIFY(arc_buf_remove_ref(db->db_buf, db));
1550		}
1551		db->db_buf = NULL;
1552	}
1553	ASSERT(db->db_buf == NULL);
1554	dbuf_set_data(db, buf);
1555	db->db_state = DB_FILL;
1556	mutex_exit(&db->db_mtx);
1557	(void) dbuf_dirty(db, tx);
1558	dmu_buf_fill_done(&db->db, tx);
1559}
1560
1561/*
1562 * "Clear" the contents of this dbuf.  This will mark the dbuf
1563 * EVICTING and clear *most* of its references.  Unfortunately,
1564 * when we are not holding the dn_dbufs_mtx, we can't clear the
1565 * entry in the dn_dbufs list.  We have to wait until dbuf_destroy()
1566 * in this case.  For callers from the DMU we will usually see:
1567 *	dbuf_clear()->arc_clear_callback()->dbuf_do_evict()->dbuf_destroy()
1568 * For the arc callback, we will usually see:
1569 *	dbuf_do_evict()->dbuf_clear();dbuf_destroy()
1570 * Sometimes, though, we will get a mix of these two:
1571 *	DMU: dbuf_clear()->arc_clear_callback()
1572 *	ARC: dbuf_do_evict()->dbuf_destroy()
1573 *
1574 * This routine will dissociate the dbuf from the arc, by calling
1575 * arc_clear_callback(), but will not evict the data from the ARC.
1576 */
1577void
1578dbuf_clear(dmu_buf_impl_t *db)
1579{
1580	dnode_t *dn;
1581	dmu_buf_impl_t *parent = db->db_parent;
1582	dmu_buf_impl_t *dndb;
1583	boolean_t dbuf_gone = B_FALSE;
1584
1585	ASSERT(MUTEX_HELD(&db->db_mtx));
1586	ASSERT(refcount_is_zero(&db->db_holds));
1587
1588	dbuf_evict_user(db);
1589
1590	if (db->db_state == DB_CACHED) {
1591		ASSERT(db->db.db_data != NULL);
1592		if (db->db_blkid == DMU_BONUS_BLKID) {
1593			zio_buf_free(db->db.db_data, DN_MAX_BONUSLEN);
1594			arc_space_return(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
1595		}
1596		db->db.db_data = NULL;
1597		db->db_state = DB_UNCACHED;
1598	}
1599
1600	ASSERT(db->db_state == DB_UNCACHED || db->db_state == DB_NOFILL);
1601	ASSERT(db->db_data_pending == NULL);
1602
1603	db->db_state = DB_EVICTING;
1604	db->db_blkptr = NULL;
1605
1606	DB_DNODE_ENTER(db);
1607	dn = DB_DNODE(db);
1608	dndb = dn->dn_dbuf;
1609	if (db->db_blkid != DMU_BONUS_BLKID && MUTEX_HELD(&dn->dn_dbufs_mtx)) {
1610		avl_remove(&dn->dn_dbufs, db);
1611		atomic_dec_32(&dn->dn_dbufs_count);
1612		membar_producer();
1613		DB_DNODE_EXIT(db);
1614		/*
1615		 * Decrementing the dbuf count means that the hold corresponding
1616		 * to the removed dbuf is no longer discounted in dnode_move(),
1617		 * so the dnode cannot be moved until after we release the hold.
1618		 * The membar_producer() ensures visibility of the decremented
1619		 * value in dnode_move(), since DB_DNODE_EXIT doesn't actually
1620		 * release any lock.
1621		 */
1622		dnode_rele(dn, db);
1623		db->db_dnode_handle = NULL;
1624	} else {
1625		DB_DNODE_EXIT(db);
1626	}
1627
1628	if (db->db_buf)
1629		dbuf_gone = arc_clear_callback(db->db_buf);
1630
1631	if (!dbuf_gone)
1632		mutex_exit(&db->db_mtx);
1633
1634	/*
1635	 * If this dbuf is referenced from an indirect dbuf,
1636	 * decrement the ref count on the indirect dbuf.
1637	 */
1638	if (parent && parent != dndb)
1639		dbuf_rele(parent, db);
1640}
1641
1642static int
1643dbuf_findbp(dnode_t *dn, int level, uint64_t blkid, int fail_sparse,
1644    dmu_buf_impl_t **parentp, blkptr_t **bpp)
1645{
1646	int nlevels, epbs;
1647
1648	*parentp = NULL;
1649	*bpp = NULL;
1650
1651	ASSERT(blkid != DMU_BONUS_BLKID);
1652
1653	if (blkid == DMU_SPILL_BLKID) {
1654		mutex_enter(&dn->dn_mtx);
1655		if (dn->dn_have_spill &&
1656		    (dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR))
1657			*bpp = &dn->dn_phys->dn_spill;
1658		else
1659			*bpp = NULL;
1660		dbuf_add_ref(dn->dn_dbuf, NULL);
1661		*parentp = dn->dn_dbuf;
1662		mutex_exit(&dn->dn_mtx);
1663		return (0);
1664	}
1665
1666	if (dn->dn_phys->dn_nlevels == 0)
1667		nlevels = 1;
1668	else
1669		nlevels = dn->dn_phys->dn_nlevels;
1670
1671	epbs = dn->dn_indblkshift - SPA_BLKPTRSHIFT;
1672
1673	ASSERT3U(level * epbs, <, 64);
1674	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1675	if (level >= nlevels ||
1676	    (blkid > (dn->dn_phys->dn_maxblkid >> (level * epbs)))) {
1677		/* the buffer has no parent yet */
1678		return (SET_ERROR(ENOENT));
1679	} else if (level < nlevels-1) {
1680		/* this block is referenced from an indirect block */
1681		int err = dbuf_hold_impl(dn, level+1,
1682		    blkid >> epbs, fail_sparse, NULL, parentp);
1683		if (err)
1684			return (err);
1685		err = dbuf_read(*parentp, NULL,
1686		    (DB_RF_HAVESTRUCT | DB_RF_NOPREFETCH | DB_RF_CANFAIL));
1687		if (err) {
1688			dbuf_rele(*parentp, NULL);
1689			*parentp = NULL;
1690			return (err);
1691		}
1692		*bpp = ((blkptr_t *)(*parentp)->db.db_data) +
1693		    (blkid & ((1ULL << epbs) - 1));
1694		return (0);
1695	} else {
1696		/* the block is referenced from the dnode */
1697		ASSERT3U(level, ==, nlevels-1);
1698		ASSERT(dn->dn_phys->dn_nblkptr == 0 ||
1699		    blkid < dn->dn_phys->dn_nblkptr);
1700		if (dn->dn_dbuf) {
1701			dbuf_add_ref(dn->dn_dbuf, NULL);
1702			*parentp = dn->dn_dbuf;
1703		}
1704		*bpp = &dn->dn_phys->dn_blkptr[blkid];
1705		return (0);
1706	}
1707}
1708
1709static dmu_buf_impl_t *
1710dbuf_create(dnode_t *dn, uint8_t level, uint64_t blkid,
1711    dmu_buf_impl_t *parent, blkptr_t *blkptr)
1712{
1713	objset_t *os = dn->dn_objset;
1714	dmu_buf_impl_t *db, *odb;
1715
1716	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1717	ASSERT(dn->dn_type != DMU_OT_NONE);
1718
1719	db = kmem_cache_alloc(dbuf_cache, KM_SLEEP);
1720
1721	db->db_objset = os;
1722	db->db.db_object = dn->dn_object;
1723	db->db_level = level;
1724	db->db_blkid = blkid;
1725	db->db_last_dirty = NULL;
1726	db->db_dirtycnt = 0;
1727	db->db_dnode_handle = dn->dn_handle;
1728	db->db_parent = parent;
1729	db->db_blkptr = blkptr;
1730
1731	db->db_user_ptr = NULL;
1732	db->db_evict_func = NULL;
1733	db->db_immediate_evict = 0;
1734	db->db_freed_in_flight = 0;
1735
1736	if (blkid == DMU_BONUS_BLKID) {
1737		ASSERT3P(parent, ==, dn->dn_dbuf);
1738		db->db.db_size = DN_MAX_BONUSLEN -
1739		    (dn->dn_nblkptr-1) * sizeof (blkptr_t);
1740		ASSERT3U(db->db.db_size, >=, dn->dn_bonuslen);
1741		db->db.db_offset = DMU_BONUS_BLKID;
1742		db->db_state = DB_UNCACHED;
1743		/* the bonus dbuf is not placed in the hash table */
1744		arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
1745		return (db);
1746	} else if (blkid == DMU_SPILL_BLKID) {
1747		db->db.db_size = (blkptr != NULL) ?
1748		    BP_GET_LSIZE(blkptr) : SPA_MINBLOCKSIZE;
1749		db->db.db_offset = 0;
1750	} else {
1751		int blocksize =
1752		    db->db_level ? 1 << dn->dn_indblkshift : dn->dn_datablksz;
1753		db->db.db_size = blocksize;
1754		db->db.db_offset = db->db_blkid * blocksize;
1755	}
1756
1757	/*
1758	 * Hold the dn_dbufs_mtx while we get the new dbuf
1759	 * in the hash table *and* added to the dbufs list.
1760	 * This prevents a possible deadlock with someone
1761	 * trying to look up this dbuf before its added to the
1762	 * dn_dbufs list.
1763	 */
1764	mutex_enter(&dn->dn_dbufs_mtx);
1765	db->db_state = DB_EVICTING;
1766	if ((odb = dbuf_hash_insert(db)) != NULL) {
1767		/* someone else inserted it first */
1768		kmem_cache_free(dbuf_cache, db);
1769		mutex_exit(&dn->dn_dbufs_mtx);
1770		return (odb);
1771	}
1772	avl_add(&dn->dn_dbufs, db);
1773	if (db->db_level == 0 && db->db_blkid >=
1774	    dn->dn_unlisted_l0_blkid)
1775		dn->dn_unlisted_l0_blkid = db->db_blkid + 1;
1776	db->db_state = DB_UNCACHED;
1777	mutex_exit(&dn->dn_dbufs_mtx);
1778	arc_space_consume(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
1779
1780	if (parent && parent != dn->dn_dbuf)
1781		dbuf_add_ref(parent, db);
1782
1783	ASSERT(dn->dn_object == DMU_META_DNODE_OBJECT ||
1784	    refcount_count(&dn->dn_holds) > 0);
1785	(void) refcount_add(&dn->dn_holds, db);
1786	atomic_inc_32(&dn->dn_dbufs_count);
1787
1788	dprintf_dbuf(db, "db=%p\n", db);
1789
1790	return (db);
1791}
1792
1793static int
1794dbuf_do_evict(void *private)
1795{
1796	dmu_buf_impl_t *db = private;
1797
1798	if (!MUTEX_HELD(&db->db_mtx))
1799		mutex_enter(&db->db_mtx);
1800
1801	ASSERT(refcount_is_zero(&db->db_holds));
1802
1803	if (db->db_state != DB_EVICTING) {
1804		ASSERT(db->db_state == DB_CACHED);
1805		DBUF_VERIFY(db);
1806		db->db_buf = NULL;
1807		dbuf_evict(db);
1808	} else {
1809		mutex_exit(&db->db_mtx);
1810		dbuf_destroy(db);
1811	}
1812	return (0);
1813}
1814
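/*
 * Final teardown of a dbuf: remove it from the dnode's dn_dbufs list and
 * from the hash table (bonus buffers are in neither), drop the dnode hold
 * that the dbuf represented, and return the structure to the kmem cache.
 * The dbuf must have no remaining holds and no associated data.
 */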
1815static void
1816dbuf_destroy(dmu_buf_impl_t *db)
1817{
1818	ASSERT(refcount_is_zero(&db->db_holds));
1819
1820	if (db->db_blkid != DMU_BONUS_BLKID) {
1821		/*
1822		 * If this dbuf is still on the dn_dbufs list,
1823		 * remove it from that list.
1824		 */
1825		if (db->db_dnode_handle != NULL) {
1826			dnode_t *dn;
1827
1828			DB_DNODE_ENTER(db);
1829			dn = DB_DNODE(db);
1830			mutex_enter(&dn->dn_dbufs_mtx);
1831			avl_remove(&dn->dn_dbufs, db);
1832			atomic_dec_32(&dn->dn_dbufs_count);
1833			mutex_exit(&dn->dn_dbufs_mtx);
1834			DB_DNODE_EXIT(db);
1835			/*
1836			 * Decrementing the dbuf count means that the hold
1837			 * corresponding to the removed dbuf is no longer
1838			 * discounted in dnode_move(), so the dnode cannot be
1839			 * moved until after we release the hold.
1840			 */
1841			dnode_rele(dn, db);
1842			db->db_dnode_handle = NULL;
1843		}
1844		dbuf_hash_remove(db);
1845	}
1846	db->db_parent = NULL;
1847	db->db_buf = NULL;
1848
1849	ASSERT(db->db.db_data == NULL);
1850	ASSERT(db->db_hash_next == NULL);
1851	ASSERT(db->db_blkptr == NULL);
1852	ASSERT(db->db_data_pending == NULL);
1853
1854	kmem_cache_free(dbuf_cache, db);
1855	arc_space_return(sizeof (dmu_buf_impl_t), ARC_SPACE_OTHER);
1856}
1857
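/*
 * Issue an asynchronous, speculative read of level-0 block 'blkid' unless
 * the block has been freed or a dbuf for it already exists.  The read goes
 * straight to the ARC (no dbuf is created for the target block), so a
 * later dbuf_read() of the same block can be satisfied from the cache.
 * The caller must hold dn_struct_rwlock.
 */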
1858void
1859dbuf_prefetch(dnode_t *dn, uint64_t blkid, zio_priority_t prio)
1860{
1861	dmu_buf_impl_t *db = NULL;
1862	blkptr_t *bp = NULL;
1863
1864	ASSERT(blkid != DMU_BONUS_BLKID);
1865	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1866
1867	if (dnode_block_freed(dn, blkid))
1868		return;
1869
1870	/* dbuf_find() returns with db_mtx held */
1871	if ((db = dbuf_find(dn->dn_objset, dn->dn_object, 0, blkid)) != NULL) {
1872		/*
1873		 * This dbuf is already in the cache.  We assume that
1874		 * it is already CACHED, or else about to be either
1875		 * read or filled.
1876		 */
1877		mutex_exit(&db->db_mtx);
1878		return;
1879	}
1880
1881	if (dbuf_findbp(dn, 0, blkid, TRUE, &db, &bp) == 0) {
1882		if (bp && !BP_IS_HOLE(bp) && !BP_IS_EMBEDDED(bp)) {
1883			dsl_dataset_t *ds = dn->dn_objset->os_dsl_dataset;
1884			arc_flags_t aflags =
1885			    ARC_FLAG_NOWAIT | ARC_FLAG_PREFETCH;
1886			zbookmark_phys_t zb;
1887
1888			SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
1889			    dn->dn_object, 0, blkid);
1890
1891			(void) arc_read(NULL, dn->dn_objset->os_spa,
1892			    bp, NULL, NULL, prio,
1893			    ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
1894			    &aflags, &zb);
1895		}
1896		if (db)
1897			dbuf_rele(db, NULL);
1898	}
1899}
1900
1901/*
1902 * Returns with db_holds incremented, and db_mtx not held.
1903 * Note: dn_struct_rwlock must be held.
1904 */
1905int
1906dbuf_hold_impl(dnode_t *dn, uint8_t level, uint64_t blkid, int fail_sparse,
1907    void *tag, dmu_buf_impl_t **dbp)
1908{
1909	dmu_buf_impl_t *db, *parent = NULL;
1910
1911	ASSERT(blkid != DMU_BONUS_BLKID);
1912	ASSERT(RW_LOCK_HELD(&dn->dn_struct_rwlock));
1913	ASSERT3U(dn->dn_nlevels, >, level);
1914
1915	*dbp = NULL;
1916top:
1917	/* dbuf_find() returns with db_mtx held */
1918	db = dbuf_find(dn->dn_objset, dn->dn_object, level, blkid);
1919
1920	if (db == NULL) {
1921		blkptr_t *bp = NULL;
1922		int err;
1923
1924		ASSERT3P(parent, ==, NULL);
1925		err = dbuf_findbp(dn, level, blkid, fail_sparse, &parent, &bp);
1926		if (fail_sparse) {
1927			if (err == 0 && bp && BP_IS_HOLE(bp))
1928				err = SET_ERROR(ENOENT);
1929			if (err) {
1930				if (parent)
1931					dbuf_rele(parent, NULL);
1932				return (err);
1933			}
1934		}
1935		if (err && err != ENOENT)
1936			return (err);
1937		db = dbuf_create(dn, level, blkid, parent, bp);
1938	}
1939
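	/*
	 * If the only reference to this dbuf's data is the ARC itself (no
	 * active holds), re-establish our reference on the ARC buffer.  If
	 * the buffer's data has already been freed out from under us, clear
	 * the dbuf and retry the lookup from the top.
	 */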
1940	if (db->db_buf && refcount_is_zero(&db->db_holds)) {
1941		arc_buf_add_ref(db->db_buf, db);
1942		if (db->db_buf->b_data == NULL) {
1943			dbuf_clear(db);
1944			if (parent) {
1945				dbuf_rele(parent, NULL);
1946				parent = NULL;
1947			}
1948			goto top;
1949		}
1950		ASSERT3P(db->db.db_data, ==, db->db_buf->b_data);
1951	}
1952
1953	ASSERT(db->db_buf == NULL || arc_referenced(db->db_buf));
1954
1955	/*
1956	 * If this buffer is currently syncing out, and we are
1957	 * still referencing it from db_data, we need to make a copy
1958	 * of it in case we decide we want to dirty it again in this txg.
1959	 */
1960	if (db->db_level == 0 && db->db_blkid != DMU_BONUS_BLKID &&
1961	    dn->dn_object != DMU_META_DNODE_OBJECT &&
1962	    db->db_state == DB_CACHED && db->db_data_pending) {
1963		dbuf_dirty_record_t *dr = db->db_data_pending;
1964
1965		if (dr->dt.dl.dr_data == db->db_buf) {
1966			arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
1967
1968			dbuf_set_data(db,
1969			    arc_buf_alloc(dn->dn_objset->os_spa,
1970			    db->db.db_size, db, type));
1971			bcopy(dr->dt.dl.dr_data->b_data, db->db.db_data,
1972			    db->db.db_size);
1973		}
1974	}
1975
1976	(void) refcount_add(&db->db_holds, tag);
1977	DBUF_VERIFY(db);
1978	mutex_exit(&db->db_mtx);
1979
1980	/* NOTE: we can't rele the parent until after we drop the db_mtx */
1981	if (parent)
1982		dbuf_rele(parent, NULL);
1983
1984	ASSERT3P(DB_DNODE(db), ==, dn);
1985	ASSERT3U(db->db_blkid, ==, blkid);
1986	ASSERT3U(db->db_level, ==, level);
1987	*dbp = db;
1988
1989	return (0);
1990}
1991
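/*
 * Convenience wrappers around dbuf_hold_impl() for level-0 (dbuf_hold) and
 * arbitrary-level (dbuf_hold_level) blocks; sparse blocks are not treated
 * as errors.  A sketch of typical usage, assuming the caller already has a
 * hold on the dnode:
 *
 *	rw_enter(&dn->dn_struct_rwlock, RW_READER);
 *	db = dbuf_hold(dn, blkid, FTAG);
 *	rw_exit(&dn->dn_struct_rwlock);
 *	if (db != NULL) {
 *		... dbuf_read(db, ...), use db->db.db_data ...
 *		dbuf_rele(db, FTAG);
 *	}
 */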
1992dmu_buf_impl_t *
1993dbuf_hold(dnode_t *dn, uint64_t blkid, void *tag)
1994{
1995	dmu_buf_impl_t *db;
1996	int err = dbuf_hold_impl(dn, 0, blkid, FALSE, tag, &db);
1997	return (err ? NULL : db);
1998}
1999
2000dmu_buf_impl_t *
2001dbuf_hold_level(dnode_t *dn, int level, uint64_t blkid, void *tag)
2002{
2003	dmu_buf_impl_t *db;
2004	int err = dbuf_hold_impl(dn, level, blkid, FALSE, tag, &db);
2005	return (err ? NULL : db);
2006}
2007
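/*
 * Create the dnode's bonus dbuf.  The caller must hold dn_struct_rwlock as
 * writer, and the dnode must not already have a bonus dbuf.
 */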
2008void
2009dbuf_create_bonus(dnode_t *dn)
2010{
2011	ASSERT(RW_WRITE_HELD(&dn->dn_struct_rwlock));
2012
2013	ASSERT(dn->dn_bonus == NULL);
2014	dn->dn_bonus = dbuf_create(dn, 0, DMU_BONUS_BLKID, dn->dn_dbuf, NULL);
2015}
2016
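/*
 * Change the block size of a spill-block dbuf, rounding the requested size
 * up to a multiple of SPA_MINBLOCKSIZE.  Returns ENOTSUP if the dbuf is
 * not the spill block.
 */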
2017int
2018dbuf_spill_set_blksz(dmu_buf_t *db_fake, uint64_t blksz, dmu_tx_t *tx)
2019{
2020	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2021	dnode_t *dn;
2022
2023	if (db->db_blkid != DMU_SPILL_BLKID)
2024		return (SET_ERROR(ENOTSUP));
2025	if (blksz == 0)
2026		blksz = SPA_MINBLOCKSIZE;
2027	ASSERT3U(blksz, <=, spa_maxblocksize(dmu_objset_spa(db->db_objset)));
2028	blksz = P2ROUNDUP(blksz, SPA_MINBLOCKSIZE);
2029
2030	DB_DNODE_ENTER(db);
2031	dn = DB_DNODE(db);
2032	rw_enter(&dn->dn_struct_rwlock, RW_WRITER);
2033	dbuf_new_size(db, blksz, tx);
2034	rw_exit(&dn->dn_struct_rwlock);
2035	DB_DNODE_EXIT(db);
2036
2037	return (0);
2038}
2039
2040void
2041dbuf_rm_spill(dnode_t *dn, dmu_tx_t *tx)
2042{
2043	dbuf_free_range(dn, DMU_SPILL_BLKID, DMU_SPILL_BLKID, tx);
2044}
2045
2046#pragma weak dmu_buf_add_ref = dbuf_add_ref
2047void
2048dbuf_add_ref(dmu_buf_impl_t *db, void *tag)
2049{
2050	int64_t holds = refcount_add(&db->db_holds, tag);
2051	ASSERT(holds > 1);
2052}
2053
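/*
 * Attempt to add a hold to a dbuf that the caller references but does not
 * currently hold.  The hold is added only if the hash-table lookup finds
 * the very same dbuf and it still has holds beyond its dirty records;
 * otherwise B_FALSE is returned and no hold is taken.
 */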
2054#pragma weak dmu_buf_try_add_ref = dbuf_try_add_ref
2055boolean_t
2056dbuf_try_add_ref(dmu_buf_t *db_fake, objset_t *os, uint64_t obj, uint64_t blkid,
2057    void *tag)
2058{
2059	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2060	dmu_buf_impl_t *found_db;
2061	boolean_t result = B_FALSE;
2062
2063	if (db->db_blkid == DMU_BONUS_BLKID)
2064		found_db = dbuf_find_bonus(os, obj);
2065	else
2066		found_db = dbuf_find(os, obj, 0, blkid);
2067
2068	if (found_db != NULL) {
2069		if (db == found_db && dbuf_refcount(db) > db->db_dirtycnt) {
2070			(void) refcount_add(&db->db_holds, tag);
2071			result = B_TRUE;
2072		}
2073		mutex_exit(&db->db_mtx);
2074	}
2075	return (result);
2076}
2077
2078/*
2079 * If you call dbuf_rele() you had better not be referencing the dnode handle
2080 * unless you have some other direct or indirect hold on the dnode. (An indirect
2081 * hold is a hold on one of the dnode's dbufs, including the bonus buffer.)
2082 * Without that, the dbuf_rele() could lead to a dnode_rele() followed by the
2083 * dnode's parent dbuf evicting its dnode handles.
2084 */
2085void
2086dbuf_rele(dmu_buf_impl_t *db, void *tag)
2087{
2088	mutex_enter(&db->db_mtx);
2089	dbuf_rele_and_unlock(db, tag);
2090}
2091
2092void
2093dmu_buf_rele(dmu_buf_t *db, void *tag)
2094{
2095	dbuf_rele((dmu_buf_impl_t *)db, tag);
2096}
2097
2098/*
2099 * dbuf_rele() for an already-locked dbuf.  This is necessary to allow
2100 * db_dirtycnt and db_holds to be updated atomically.
2101 */
2102void
2103dbuf_rele_and_unlock(dmu_buf_impl_t *db, void *tag)
2104{
2105	int64_t holds;
2106
2107	ASSERT(MUTEX_HELD(&db->db_mtx));
2108	DBUF_VERIFY(db);
2109
2110	/*
2111	 * Remove the reference to the dbuf before removing its hold on the
2112	 * dnode so we can guarantee in dnode_move() that a referenced bonus
2113	 * buffer has a corresponding dnode hold.
2114	 */
2115	holds = refcount_remove(&db->db_holds, tag);
2116	ASSERT(holds >= 0);
2117
2118	/*
2119	 * We can't freeze indirects if there is a possibility that they
2120	 * may be modified in the current syncing context.
2121	 */
2122	if (db->db_buf && holds == (db->db_level == 0 ? db->db_dirtycnt : 0))
2123		arc_buf_freeze(db->db_buf);
2124
2125	if (holds == db->db_dirtycnt &&
2126	    db->db_level == 0 && db->db_immediate_evict)
2127		dbuf_evict_user(db);
2128
2129	if (holds == 0) {
2130		if (db->db_blkid == DMU_BONUS_BLKID) {
2131			dnode_t *dn;
2132
2133			/*
2134			 * If the dnode moves here, we cannot cross this
2135			 * barrier until the move completes.
2136			 */
2137			DB_DNODE_ENTER(db);
2138
2139			dn = DB_DNODE(db);
2140			atomic_dec_32(&dn->dn_dbufs_count);
2141
2142			/*
2143			 * Decrementing the dbuf count means that the bonus
2144			 * buffer's dnode hold is no longer discounted in
2145			 * dnode_move(). The dnode cannot move until after
2146			 * the dnode_rele_and_unlock() below.
2147			 */
2148			DB_DNODE_EXIT(db);
2149
2150			/*
2151			 * Do not reference db after its lock is dropped.
2152			 * Another thread may evict it.
2153			 */
2154			mutex_exit(&db->db_mtx);
2155
2156			/*
2157			 * If the dnode has been freed, evict the bonus
2158			 * buffer immediately.	The data in the bonus
2159			 * buffer is no longer relevant and this prevents
2160			 * a stale bonus buffer from being associated
2161			 * with this dnode_t should the dnode_t be reused
2162			 * prior to being destroyed.
2163			 */
2164			mutex_enter(&dn->dn_mtx);
2165			if (dn->dn_type == DMU_OT_NONE ||
2166			    dn->dn_free_txg != 0) {
2167				/*
2168				 * Drop dn_mtx.  It is a leaf lock and
2169				 * cannot be held when dnode_evict_bonus()
2170				 * acquires other locks in order to
2171				 * perform the eviction.
2172				 *
2173				 * Freed dnodes cannot be reused until the
2174				 * last hold is released.  Since this bonus
2175				 * buffer has a hold, the dnode will remain
2176				 * in the free state, even without dn_mtx
2177				 * held, until the dnode_rele_and_unlock()
2178				 * below.
2179				 */
2180				mutex_exit(&dn->dn_mtx);
2181				dnode_evict_bonus(dn);
2182				mutex_enter(&dn->dn_mtx);
2183			}
2184			dnode_rele_and_unlock(dn, db);
2185		} else if (db->db_buf == NULL) {
2186			/*
2187			 * This is a special case: we never associated this
2188			 * dbuf with any data allocated from the ARC.
2189			 */
2190			ASSERT(db->db_state == DB_UNCACHED ||
2191			    db->db_state == DB_NOFILL);
2192			dbuf_evict(db);
2193		} else if (arc_released(db->db_buf)) {
2194			arc_buf_t *buf = db->db_buf;
2195			/*
2196			 * This dbuf has anonymous data associated with it.
2197			 */
2198			dbuf_set_data(db, NULL);
2199			VERIFY(arc_buf_remove_ref(buf, db));
2200			dbuf_evict(db);
2201		} else {
2202			VERIFY(!arc_buf_remove_ref(db->db_buf, db));
2203
2204			/*
2205			 * A dbuf will be eligible for eviction if either the
2206			 * 'primarycache' property is set or a duplicate
2207			 * copy of this buffer is already cached in the arc.
2208			 *
2209			 * In the case of the 'primarycache' property, a
2210			 * buffer is considered for eviction if it matches
2211			 * the criteria set in the property.
2212			 *
2213			 * To decide if our buffer is considered a
2214			 * duplicate, we must call into the arc to determine
2215			 * if multiple buffers are referencing the same
2216			 * block on-disk. If so, then we simply evict
2217			 * ourselves.
2218			 */
2219			if (!DBUF_IS_CACHEABLE(db)) {
2220				if (db->db_blkptr != NULL &&
2221				    !BP_IS_HOLE(db->db_blkptr) &&
2222				    !BP_IS_EMBEDDED(db->db_blkptr)) {
2223					spa_t *spa =
2224					    dmu_objset_spa(db->db_objset);
2225					blkptr_t bp = *db->db_blkptr;
2226					dbuf_clear(db);
2227					arc_freed(spa, &bp);
2228				} else {
2229					dbuf_clear(db);
2230				}
2231			} else if (arc_buf_eviction_needed(db->db_buf)) {
2232				dbuf_clear(db);
2233			} else {
2234				mutex_exit(&db->db_mtx);
2235			}
2236		}
2237	} else {
2238		mutex_exit(&db->db_mtx);
2239	}
2240}
2241
2242#pragma weak dmu_buf_refcount = dbuf_refcount
2243uint64_t
2244dbuf_refcount(dmu_buf_impl_t *db)
2245{
2246	return (refcount_count(&db->db_holds));
2247}
2248
2249void *
2250dmu_buf_set_user(dmu_buf_t *db_fake, void *user_ptr,
2251    dmu_buf_evict_func_t *evict_func)
2252{
2253	return (dmu_buf_update_user(db_fake, NULL, user_ptr, evict_func));
2254}
2255
2256void *
2257dmu_buf_set_user_ie(dmu_buf_t *db_fake, void *user_ptr,
2258    dmu_buf_evict_func_t *evict_func)
2259{
2260	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2261
2262	db->db_immediate_evict = TRUE;
2263	return (dmu_buf_update_user(db_fake, NULL, user_ptr, evict_func));
2264}
2265
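/*
 * Compare-and-swap for the dbuf's user data: if the current user pointer
 * equals old_user_ptr, install user_ptr and evict_func and return
 * old_user_ptr; otherwise leave the dbuf unchanged and return the current
 * user pointer.  Only level-0 dbufs may carry user data.
 */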
2266void *
2267dmu_buf_update_user(dmu_buf_t *db_fake, void *old_user_ptr, void *user_ptr,
2268    dmu_buf_evict_func_t *evict_func)
2269{
2270	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2271	ASSERT(db->db_level == 0);
2272
2273	ASSERT((user_ptr == NULL) == (evict_func == NULL));
2274
2275	mutex_enter(&db->db_mtx);
2276
2277	if (db->db_user_ptr == old_user_ptr) {
2278		db->db_user_ptr = user_ptr;
2279		db->db_evict_func = evict_func;
2280	} else {
2281		old_user_ptr = db->db_user_ptr;
2282	}
2283
2284	mutex_exit(&db->db_mtx);
2285	return (old_user_ptr);
2286}
2287
2288void *
2289dmu_buf_get_user(dmu_buf_t *db_fake)
2290{
2291	dmu_buf_impl_t *db = (dmu_buf_impl_t *)db_fake;
2292	ASSERT(!refcount_is_zero(&db->db_holds));
2293
2294	return (db->db_user_ptr);
2295}
2296
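/*
 * Returns B_TRUE if freeing the block currently pointed to by this dbuf
 * would actually release space, as determined by
 * dsl_dataset_block_freeable() on the block's birth txg.
 */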
2297boolean_t
2298dmu_buf_freeable(dmu_buf_t *dbuf)
2299{
2300	boolean_t res = B_FALSE;
2301	dmu_buf_impl_t *db = (dmu_buf_impl_t *)dbuf;
2302
2303	if (db->db_blkptr)
2304		res = dsl_dataset_block_freeable(db->db_objset->os_dsl_dataset,
2305		    db->db_blkptr, db->db_blkptr->blk_birth);
2306
2307	return (res);
2308}
2309
2310blkptr_t *
2311dmu_buf_get_blkptr(dmu_buf_t *db)
2312{
2313	dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
2314	return (dbi->db_blkptr);
2315}
2316
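/*
 * Ensure db_blkptr points at the dbuf's slot in its parent: the dnode's
 * blkptr array, the dnode's spill pointer, or the parent indirect block's
 * data.  This fills in db_blkptr (and db_parent, if necessary) for dbufs
 * that were created before their parent block pointer was available.
 */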
2317static void
2318dbuf_check_blkptr(dnode_t *dn, dmu_buf_impl_t *db)
2319{
2320	/* ASSERT(dmu_tx_is_syncing(tx)) */
2321	ASSERT(MUTEX_HELD(&db->db_mtx));
2322
2323	if (db->db_blkptr != NULL)
2324		return;
2325
2326	if (db->db_blkid == DMU_SPILL_BLKID) {
2327		db->db_blkptr = &dn->dn_phys->dn_spill;
2328		BP_ZERO(db->db_blkptr);
2329		return;
2330	}
2331	if (db->db_level == dn->dn_phys->dn_nlevels-1) {
2332		/*
2333		 * This buffer was allocated at a time when there were
2334		 * no blkptrs available from the dnode, or it was
2335		 * inappropriate to hook it in (i.e., nlevels mismatch).
2336		 */
2337		ASSERT(db->db_blkid < dn->dn_phys->dn_nblkptr);
2338		ASSERT(db->db_parent == NULL);
2339		db->db_parent = dn->dn_dbuf;
2340		db->db_blkptr = &dn->dn_phys->dn_blkptr[db->db_blkid];
2341		DBUF_VERIFY(db);
2342	} else {
2343		dmu_buf_impl_t *parent = db->db_parent;
2344		int epbs = dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
2345
2346		ASSERT(dn->dn_phys->dn_nlevels > 1);
2347		if (parent == NULL) {
2348			mutex_exit(&db->db_mtx);
2349			rw_enter(&dn->dn_struct_rwlock, RW_READER);
2350			(void) dbuf_hold_impl(dn, db->db_level+1,
2351			    db->db_blkid >> epbs, FALSE, db, &parent);
2352			rw_exit(&dn->dn_struct_rwlock);
2353			mutex_enter(&db->db_mtx);
2354			db->db_parent = parent;
2355		}
2356		db->db_blkptr = (blkptr_t *)parent->db.db_data +
2357		    (db->db_blkid & ((1ULL << epbs) - 1));
2358		DBUF_VERIFY(db);
2359	}
2360}
2361
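/*
 * Sync an indirect block's dirty record: make sure the block is cached,
 * hook up its block pointer, issue the write for this block, and then
 * recursively sync the dirty children one level down before kicking off
 * the zio.
 */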
2362static void
2363dbuf_sync_indirect(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
2364{
2365	dmu_buf_impl_t *db = dr->dr_dbuf;
2366	dnode_t *dn;
2367	zio_t *zio;
2368
2369	ASSERT(dmu_tx_is_syncing(tx));
2370
2371	dprintf_dbuf_bp(db, db->db_blkptr, "blkptr=%p", db->db_blkptr);
2372
2373	mutex_enter(&db->db_mtx);
2374
2375	ASSERT(db->db_level > 0);
2376	DBUF_VERIFY(db);
2377
2378	/* Read the block if it hasn't been read yet. */
2379	if (db->db_buf == NULL) {
2380		mutex_exit(&db->db_mtx);
2381		(void) dbuf_read(db, NULL, DB_RF_MUST_SUCCEED);
2382		mutex_enter(&db->db_mtx);
2383	}
2384	ASSERT3U(db->db_state, ==, DB_CACHED);
2385	ASSERT(db->db_buf != NULL);
2386
2387	DB_DNODE_ENTER(db);
2388	dn = DB_DNODE(db);
2389	/* Indirect block size must match what the dnode thinks it is. */
2390	ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
2391	dbuf_check_blkptr(dn, db);
2392	DB_DNODE_EXIT(db);
2393
2394	/* Provide the pending dirty record to child dbufs */
2395	db->db_data_pending = dr;
2396
2397	mutex_exit(&db->db_mtx);
2398	dbuf_write(dr, db->db_buf, tx);
2399
2400	zio = dr->dr_zio;
2401	mutex_enter(&dr->dt.di.dr_mtx);
2402	dbuf_sync_list(&dr->dt.di.dr_children, db->db_level - 1, tx);
2403	ASSERT(list_head(&dr->dt.di.dr_children) == NULL);
2404	mutex_exit(&dr->dt.di.dr_mtx);
2405	zio_nowait(zio);
2406}
2407
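/*
 * Sync a level-0 dirty record.  Bonus buffers are simply copied into the
 * dnode phys and their dirty record is retired here; all other buffers are
 * handed to dbuf_write(), with a private copy of the data made first if
 * the open context may still modify the buffer.
 */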
2408static void
2409dbuf_sync_leaf(dbuf_dirty_record_t *dr, dmu_tx_t *tx)
2410{
2411	arc_buf_t **datap = &dr->dt.dl.dr_data;
2412	dmu_buf_impl_t *db = dr->dr_dbuf;
2413	dnode_t *dn;
2414	objset_t *os;
2415	uint64_t txg = tx->tx_txg;
2416
2417	ASSERT(dmu_tx_is_syncing(tx));
2418
2419	dprintf_dbuf_bp(db, db->db_blkptr, "blkptr=%p", db->db_blkptr);
2420
2421	mutex_enter(&db->db_mtx);
2422	/*
2423	 * To be synced, we must be dirtied.  But we
2424	 * might have been freed after the dirty.
2425	 */
2426	if (db->db_state == DB_UNCACHED) {
2427		/* This buffer has been freed since it was dirtied */
2428		ASSERT(db->db.db_data == NULL);
2429	} else if (db->db_state == DB_FILL) {
2430		/* This buffer was freed and is now being re-filled */
2431		ASSERT(db->db.db_data != dr->dt.dl.dr_data);
2432	} else {
2433		ASSERT(db->db_state == DB_CACHED || db->db_state == DB_NOFILL);
2434	}
2435	DBUF_VERIFY(db);
2436
2437	DB_DNODE_ENTER(db);
2438	dn = DB_DNODE(db);
2439
2440	if (db->db_blkid == DMU_SPILL_BLKID) {
2441		mutex_enter(&dn->dn_mtx);
2442		dn->dn_phys->dn_flags |= DNODE_FLAG_SPILL_BLKPTR;
2443		mutex_exit(&dn->dn_mtx);
2444	}
2445
2446	/*
2447	 * If this is a bonus buffer, simply copy the bonus data into the
2448	 * dnode.  It will be written out when the dnode is synced (and it
2449	 * will be synced, since it must have been dirty for dbuf_sync to
2450	 * be called).
2451	 */
2452	if (db->db_blkid == DMU_BONUS_BLKID) {
2453		dbuf_dirty_record_t **drp;
2454
2455		ASSERT(*datap != NULL);
2456		ASSERT0(db->db_level);
2457		ASSERT3U(dn->dn_phys->dn_bonuslen, <=, DN_MAX_BONUSLEN);
2458		bcopy(*datap, DN_BONUS(dn->dn_phys), dn->dn_phys->dn_bonuslen);
2459		DB_DNODE_EXIT(db);
2460
2461		if (*datap != db->db.db_data) {
2462			zio_buf_free(*datap, DN_MAX_BONUSLEN);
2463			arc_space_return(DN_MAX_BONUSLEN, ARC_SPACE_OTHER);
2464		}
2465		db->db_data_pending = NULL;
2466		drp = &db->db_last_dirty;
2467		while (*drp != dr)
2468			drp = &(*drp)->dr_next;
2469		ASSERT(dr->dr_next == NULL);
2470		ASSERT(dr->dr_dbuf == db);
2471		*drp = dr->dr_next;
2472		if (dr->dr_dbuf->db_level != 0) {
2473			list_destroy(&dr->dt.di.dr_children);
2474			mutex_destroy(&dr->dt.di.dr_mtx);
2475		}
2476		kmem_free(dr, sizeof (dbuf_dirty_record_t));
2477		ASSERT(db->db_dirtycnt > 0);
2478		db->db_dirtycnt -= 1;
2479		dbuf_rele_and_unlock(db, (void *)(uintptr_t)txg);
2480		return;
2481	}
2482
2483	os = dn->dn_objset;
2484
2485	/*
2486	 * This function may have dropped the db_mtx lock allowing a dmu_sync
2487	 * operation to sneak in. As a result, we need to ensure that we
2488	 * don't check the dr_override_state until we have returned from
2489	 * dbuf_check_blkptr.
2490	 */
2491	dbuf_check_blkptr(dn, db);
2492
2493	/*
2494	 * If this buffer is in the middle of an immediate write,
2495	 * wait for the synchronous IO to complete.
2496	 */
2497	while (dr->dt.dl.dr_override_state == DR_IN_DMU_SYNC) {
2498		ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
2499		cv_wait(&db->db_changed, &db->db_mtx);
2500		ASSERT(dr->dt.dl.dr_override_state != DR_NOT_OVERRIDDEN);
2501	}
2502
2503	if (db->db_state != DB_NOFILL &&
2504	    dn->dn_object != DMU_META_DNODE_OBJECT &&
2505	    refcount_count(&db->db_holds) > 1 &&
2506	    dr->dt.dl.dr_override_state != DR_OVERRIDDEN &&
2507	    *datap == db->db_buf) {
2508		/*
2509		 * If this buffer is currently "in use" (i.e., there
2510		 * are active holds and db_data still references it),
2511		 * then make a copy before we start the write so that
2512		 * any modifications from the open txg will not leak
2513		 * into this write.
2514		 *
2515		 * NOTE: this copy does not need to be made for
2516		 * objects only modified in the syncing context (e.g.
2517		 * DMU_OT_DNODE blocks).
2518		 */
2519		int blksz = arc_buf_size(*datap);
2520		arc_buf_contents_t type = DBUF_GET_BUFC_TYPE(db);
2521		*datap = arc_buf_alloc(os->os_spa, blksz, db, type);
2522		bcopy(db->db.db_data, (*datap)->b_data, blksz);
2523	}
2524	db->db_data_pending = dr;
2525
2526	mutex_exit(&db->db_mtx);
2527
2528	dbuf_write(dr, *datap, tx);
2529
2530	ASSERT(!list_link_active(&dr->dr_dirty_node));
2531	if (dn->dn_object == DMU_META_DNODE_OBJECT) {
2532		list_insert_tail(&dn->dn_dirty_records[txg&TXG_MASK], dr);
2533		DB_DNODE_EXIT(db);
2534	} else {
2535		/*
2536		 * Although zio_nowait() does not "wait for an IO", it does
2537		 * initiate the IO.  If this is an empty write, it seems plausible
2538		 * that the IO could actually be completed before the nowait
2539		 * returns. We need to DB_DNODE_EXIT() first in case
2540		 * zio_nowait() invalidates the dbuf.
2541		 */
2542		DB_DNODE_EXIT(db);
2543		zio_nowait(dr->dr_zio);
2544	}
2545}
2546
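/*
 * Sync every dirty record on 'list' that belongs to the given level,
 * stopping early if we encounter a record whose zio has already been
 * initialized (which only happens for the meta-dnode; see below).
 */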
2547void
2548dbuf_sync_list(list_t *list, int level, dmu_tx_t *tx)
2549{
2550	dbuf_dirty_record_t *dr;
2551
2552	while ((dr = list_head(list)) != NULL) {
2553		if (dr->dr_zio != NULL) {
2554			/*
2555			 * If we find an already initialized zio then we
2556			 * are processing the meta-dnode, and we have finished.
2557			 * The dbufs for all dnodes are put back on the list
2558			 * during processing, so that we can zio_wait()
2559			 * these IOs after initiating all child IOs.
2560			 */
2561			ASSERT3U(dr->dr_dbuf->db.db_object, ==,
2562			    DMU_META_DNODE_OBJECT);
2563			break;
2564		}
2565		if (dr->dr_dbuf->db_blkid != DMU_BONUS_BLKID &&
2566		    dr->dr_dbuf->db_blkid != DMU_SPILL_BLKID) {
2567			VERIFY3U(dr->dr_dbuf->db_level, ==, level);
2568		}
2569		list_remove(list, dr);
2570		if (dr->dr_dbuf->db_level > 0)
2571			dbuf_sync_indirect(dr, tx);
2572		else
2573			dbuf_sync_leaf(dr, tx);
2574	}
2575}
2576
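/*
 * zio "ready" callback for a dbuf write: charge the change in on-disk
 * space usage to the dnode and recompute the block pointer's fill count
 * from the data that was just written.
 */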
2577/* ARGSUSED */
2578static void
2579dbuf_write_ready(zio_t *zio, arc_buf_t *buf, void *vdb)
2580{
2581	dmu_buf_impl_t *db = vdb;
2582	dnode_t *dn;
2583	blkptr_t *bp = zio->io_bp;
2584	blkptr_t *bp_orig = &zio->io_bp_orig;
2585	spa_t *spa = zio->io_spa;
2586	int64_t delta;
2587	uint64_t fill = 0;
2588	int i;
2589
2590	ASSERT3P(db->db_blkptr, ==, bp);
2591
2592	DB_DNODE_ENTER(db);
2593	dn = DB_DNODE(db);
2594	delta = bp_get_dsize_sync(spa, bp) - bp_get_dsize_sync(spa, bp_orig);
2595	dnode_diduse_space(dn, delta - zio->io_prev_space_delta);
2596	zio->io_prev_space_delta = delta;
2597
2598	if (bp->blk_birth != 0) {
2599		ASSERT((db->db_blkid != DMU_SPILL_BLKID &&
2600		    BP_GET_TYPE(bp) == dn->dn_type) ||
2601		    (db->db_blkid == DMU_SPILL_BLKID &&
2602		    BP_GET_TYPE(bp) == dn->dn_bonustype) ||
2603		    BP_IS_EMBEDDED(bp));
2604		ASSERT(BP_GET_LEVEL(bp) == db->db_level);
2605	}
2606
2607	mutex_enter(&db->db_mtx);
2608
2609#ifdef ZFS_DEBUG
2610	if (db->db_blkid == DMU_SPILL_BLKID) {
2611		ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
2612		ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
2613		    db->db_blkptr == &dn->dn_phys->dn_spill);
2614	}
2615#endif
2616
2617	if (db->db_level == 0) {
2618		mutex_enter(&dn->dn_mtx);
2619		if (db->db_blkid > dn->dn_phys->dn_maxblkid &&
2620		    db->db_blkid != DMU_SPILL_BLKID)
2621			dn->dn_phys->dn_maxblkid = db->db_blkid;
2622		mutex_exit(&dn->dn_mtx);
2623
2624		if (dn->dn_type == DMU_OT_DNODE) {
2625			dnode_phys_t *dnp = db->db.db_data;
2626			for (i = db->db.db_size >> DNODE_SHIFT; i > 0;
2627			    i--, dnp++) {
2628				if (dnp->dn_type != DMU_OT_NONE)
2629					fill++;
2630			}
2631		} else {
2632			if (BP_IS_HOLE(bp)) {
2633				fill = 0;
2634			} else {
2635				fill = 1;
2636			}
2637		}
2638	} else {
2639		blkptr_t *ibp = db->db.db_data;
2640		ASSERT3U(db->db.db_size, ==, 1<<dn->dn_phys->dn_indblkshift);
2641		for (i = db->db.db_size >> SPA_BLKPTRSHIFT; i > 0; i--, ibp++) {
2642			if (BP_IS_HOLE(ibp))
2643				continue;
2644			fill += BP_GET_FILL(ibp);
2645		}
2646	}
2647	DB_DNODE_EXIT(db);
2648
2649	if (!BP_IS_EMBEDDED(bp))
2650		bp->blk_fill = fill;
2651
2652	mutex_exit(&db->db_mtx);
2653}
2654
2655/*
2656 * The SPA will call this callback several times for each zio - once
2657 * for every physical child i/o (zio->io_phys_children times).  This
2658 * allows the DMU to monitor the progress of each logical i/o.  For example,
2659 * there may be 2 copies of an indirect block, or many fragments of a RAID-Z
2660 * block.  There may be a long delay before all copies/fragments are completed,
2661 * so this callback allows us to retire dirty space gradually, as the physical
2662 * i/os complete.
2663 */
2664/* ARGSUSED */
2665static void
2666dbuf_write_physdone(zio_t *zio, arc_buf_t *buf, void *arg)
2667{
2668	dmu_buf_impl_t *db = arg;
2669	objset_t *os = db->db_objset;
2670	dsl_pool_t *dp = dmu_objset_pool(os);
2671	dbuf_dirty_record_t *dr;
2672	int delta = 0;
2673
2674	dr = db->db_data_pending;
2675	ASSERT3U(dr->dr_txg, ==, zio->io_txg);
2676
2677	/*
2678	 * The callback will be called io_phys_children times.  Retire one
2679	 * portion of our dirty space each time we are called.  Any rounding
2680	 * error will be cleaned up by dsl_pool_sync()'s call to
2681	 * dsl_pool_undirty_space().
2682	 */
2683	delta = dr->dr_accounted / zio->io_phys_children;
2684	dsl_pool_undirty_space(dp, delta, zio->io_txg);
2685}
2686
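/*
 * zio "done" callback for a dbuf write: perform dataset block accounting
 * (unless this was a nopwrite/rewrite), unlink and free the dirty record,
 * and release the dbuf hold that is tagged with this txg.
 */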
2687/* ARGSUSED */
2688static void
2689dbuf_write_done(zio_t *zio, arc_buf_t *buf, void *vdb)
2690{
2691	dmu_buf_impl_t *db = vdb;
2692	blkptr_t *bp_orig = &zio->io_bp_orig;
2693	blkptr_t *bp = db->db_blkptr;
2694	objset_t *os = db->db_objset;
2695	dmu_tx_t *tx = os->os_synctx;
2696	dbuf_dirty_record_t **drp, *dr;
2697
2698	ASSERT0(zio->io_error);
2699	ASSERT(db->db_blkptr == bp);
2700
2701	/*
2702	 * For nopwrites and rewrites we ensure that the bp matches our
2703	 * original and bypass all the accounting.
2704	 */
2705	if (zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)) {
2706		ASSERT(BP_EQUAL(bp, bp_orig));
2707	} else {
2708		dsl_dataset_t *ds = os->os_dsl_dataset;
2709		(void) dsl_dataset_block_kill(ds, bp_orig, tx, B_TRUE);
2710		dsl_dataset_block_born(ds, bp, tx);
2711	}
2712
2713	mutex_enter(&db->db_mtx);
2714
2715	DBUF_VERIFY(db);
2716
2717	drp = &db->db_last_dirty;
2718	while ((dr = *drp) != db->db_data_pending)
2719		drp = &dr->dr_next;
2720	ASSERT(!list_link_active(&dr->dr_dirty_node));
2721	ASSERT(dr->dr_dbuf == db);
2722	ASSERT(dr->dr_next == NULL);
2723	*drp = dr->dr_next;
2724
2725#ifdef ZFS_DEBUG
2726	if (db->db_blkid == DMU_SPILL_BLKID) {
2727		dnode_t *dn;
2728
2729		DB_DNODE_ENTER(db);
2730		dn = DB_DNODE(db);
2731		ASSERT(dn->dn_phys->dn_flags & DNODE_FLAG_SPILL_BLKPTR);
2732		ASSERT(!(BP_IS_HOLE(db->db_blkptr)) &&
2733		    db->db_blkptr == &dn->dn_phys->dn_spill);
2734		DB_DNODE_EXIT(db);
2735	}
2736#endif
2737
2738	if (db->db_level == 0) {
2739		ASSERT(db->db_blkid != DMU_BONUS_BLKID);
2740		ASSERT(dr->dt.dl.dr_override_state == DR_NOT_OVERRIDDEN);
2741		if (db->db_state != DB_NOFILL) {
2742			if (dr->dt.dl.dr_data != db->db_buf)
2743				VERIFY(arc_buf_remove_ref(dr->dt.dl.dr_data,
2744				    db));
2745			else if (!arc_released(db->db_buf))
2746				arc_set_callback(db->db_buf, dbuf_do_evict, db);
2747		}
2748	} else {
2749		dnode_t *dn;
2750
2751		DB_DNODE_ENTER(db);
2752		dn = DB_DNODE(db);
2753		ASSERT(list_head(&dr->dt.di.dr_children) == NULL);
2754		ASSERT3U(db->db.db_size, ==, 1 << dn->dn_phys->dn_indblkshift);
2755		if (!BP_IS_HOLE(db->db_blkptr)) {
2756			int epbs =
2757			    dn->dn_phys->dn_indblkshift - SPA_BLKPTRSHIFT;
2758			ASSERT3U(db->db_blkid, <=,
2759			    dn->dn_phys->dn_maxblkid >> (db->db_level * epbs));
2760			ASSERT3U(BP_GET_LSIZE(db->db_blkptr), ==,
2761			    db->db.db_size);
2762			if (!arc_released(db->db_buf))
2763				arc_set_callback(db->db_buf, dbuf_do_evict, db);
2764		}
2765		DB_DNODE_EXIT(db);
2766		mutex_destroy(&dr->dt.di.dr_mtx);
2767		list_destroy(&dr->dt.di.dr_children);
2768	}
2769	kmem_free(dr, sizeof (dbuf_dirty_record_t));
2770
2771	cv_broadcast(&db->db_changed);
2772	ASSERT(db->db_dirtycnt > 0);
2773	db->db_dirtycnt -= 1;
2774	db->db_data_pending = NULL;
2775	dbuf_rele_and_unlock(db, (void *)(uintptr_t)tx->tx_txg);
2776}
2777
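/*
 * zio callback wrappers for writes that have no associated arc_buf_t
 * (NOFILL and override writes).  The nofill variants forward directly to
 * the common ready/done handlers above; dbuf_write_override_done() also
 * frees the block written in open context if the final block pointer
 * ended up differing from it.
 */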
2778static void
2779dbuf_write_nofill_ready(zio_t *zio)
2780{
2781	dbuf_write_ready(zio, NULL, zio->io_private);
2782}
2783
2784static void
2785dbuf_write_nofill_done(zio_t *zio)
2786{
2787	dbuf_write_done(zio, NULL, zio->io_private);
2788}
2789
2790static void
2791dbuf_write_override_ready(zio_t *zio)
2792{
2793	dbuf_dirty_record_t *dr = zio->io_private;
2794	dmu_buf_impl_t *db = dr->dr_dbuf;
2795
2796	dbuf_write_ready(zio, NULL, db);
2797}
2798
2799static void
2800dbuf_write_override_done(zio_t *zio)
2801{
2802	dbuf_dirty_record_t *dr = zio->io_private;
2803	dmu_buf_impl_t *db = dr->dr_dbuf;
2804	blkptr_t *obp = &dr->dt.dl.dr_overridden_by;
2805
2806	mutex_enter(&db->db_mtx);
2807	if (!BP_EQUAL(zio->io_bp, obp)) {
2808		if (!BP_IS_HOLE(obp))
2809			dsl_free(spa_get_dsl(zio->io_spa), zio->io_txg, obp);
2810		arc_release(dr->dt.dl.dr_data, db);
2811	}
2812	mutex_exit(&db->db_mtx);
2813
2814	dbuf_write_done(zio, NULL, db);
2815}
2816
2817/* Issue I/O to commit a dirty buffer to disk. */
2818static void
2819dbuf_write(dbuf_dirty_record_t *dr, arc_buf_t *data, dmu_tx_t *tx)
2820{
2821	dmu_buf_impl_t *db = dr->dr_dbuf;
2822	dnode_t *dn;
2823	objset_t *os;
2824	dmu_buf_impl_t *parent = db->db_parent;
2825	uint64_t txg = tx->tx_txg;
2826	zbookmark_phys_t zb;
2827	zio_prop_t zp;
2828	zio_t *zio;
2829	int wp_flag = 0;
2830
2831	DB_DNODE_ENTER(db);
2832	dn = DB_DNODE(db);
2833	os = dn->dn_objset;
2834
2835	if (db->db_state != DB_NOFILL) {
2836		if (db->db_level > 0 || dn->dn_type == DMU_OT_DNODE) {
2837			/*
2838			 * Private object buffers are released here rather
2839			 * than in dbuf_dirty() since they are only modified
2840			 * in the syncing context and we don't want the
2841			 * overhead of making multiple copies of the data.
2842			 */
2843			if (BP_IS_HOLE(db->db_blkptr)) {
2844				arc_buf_thaw(data);
2845			} else {
2846				dbuf_release_bp(db);
2847			}
2848		}
2849	}
2850
2851	if (parent != dn->dn_dbuf) {
2852		/* Our parent is an indirect block. */
2853		/* We have a dirty parent that has been scheduled for write. */
2854		ASSERT(parent && parent->db_data_pending);
2855		/* Our parent's buffer is one level closer to the dnode. */
2856		ASSERT(db->db_level == parent->db_level-1);
2857		/*
2858		 * We're about to modify our parent's db_data by modifying
2859		 * our block pointer, so the parent must be released.
2860		 */
2861		ASSERT(arc_released(parent->db_buf));
2862		zio = parent->db_data_pending->dr_zio;
2863	} else {
2864		/* Our parent is the dnode itself. */
2865		ASSERT((db->db_level == dn->dn_phys->dn_nlevels-1 &&
2866		    db->db_blkid != DMU_SPILL_BLKID) ||
2867		    (db->db_blkid == DMU_SPILL_BLKID && db->db_level == 0));
2868		if (db->db_blkid != DMU_SPILL_BLKID)
2869			ASSERT3P(db->db_blkptr, ==,
2870			    &dn->dn_phys->dn_blkptr[db->db_blkid]);
2871		zio = dn->dn_zio;
2872	}
2873
2874	ASSERT(db->db_level == 0 || data == db->db_buf);
2875	ASSERT3U(db->db_blkptr->blk_birth, <=, txg);
2876	ASSERT(zio);
2877
2878	SET_BOOKMARK(&zb, os->os_dsl_dataset ?
2879	    os->os_dsl_dataset->ds_object : DMU_META_OBJSET,
2880	    db->db.db_object, db->db_level, db->db_blkid);
2881
2882	if (db->db_blkid == DMU_SPILL_BLKID)
2883		wp_flag = WP_SPILL;
2884	wp_flag |= (db->db_state == DB_NOFILL) ? WP_NOFILL : 0;
2885
2886	dmu_write_policy(os, dn, db->db_level, wp_flag, &zp);
2887	DB_DNODE_EXIT(db);
2888
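	/*
	 * Issue the write in one of three ways: as an override write whose
	 * block pointer was already produced in open context (dmu_sync() or
	 * an embedded-data write), as a NOFILL write that carries no data,
	 * or as a normal write through the ARC.
	 */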
2889	if (db->db_level == 0 &&
2890	    dr->dt.dl.dr_override_state == DR_OVERRIDDEN) {
2891		/*
2892		 * The BP for this block has been provided by open context
2893		 * (by dmu_sync() or dmu_buf_write_embedded()).
2894		 */
2895		void *contents = (data != NULL) ? data->b_data : NULL;
2896
2897		dr->dr_zio = zio_write(zio, os->os_spa, txg,
2898		    db->db_blkptr, contents, db->db.db_size, &zp,
2899		    dbuf_write_override_ready, NULL, dbuf_write_override_done,
2900		    dr, ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
2901		mutex_enter(&db->db_mtx);
2902		dr->dt.dl.dr_override_state = DR_NOT_OVERRIDDEN;
2903		zio_write_override(dr->dr_zio, &dr->dt.dl.dr_overridden_by,
2904		    dr->dt.dl.dr_copies, dr->dt.dl.dr_nopwrite);
2905		mutex_exit(&db->db_mtx);
2906	} else if (db->db_state == DB_NOFILL) {
2907		ASSERT(zp.zp_checksum == ZIO_CHECKSUM_OFF ||
2908		    zp.zp_checksum == ZIO_CHECKSUM_NOPARITY);
2909		dr->dr_zio = zio_write(zio, os->os_spa, txg,
2910		    db->db_blkptr, NULL, db->db.db_size, &zp,
2911		    dbuf_write_nofill_ready, NULL, dbuf_write_nofill_done, db,
2912		    ZIO_PRIORITY_ASYNC_WRITE,
2913		    ZIO_FLAG_MUSTSUCCEED | ZIO_FLAG_NODATA, &zb);
2914	} else {
2915		ASSERT(arc_released(data));
2916		dr->dr_zio = arc_write(zio, os->os_spa, txg,
2917		    db->db_blkptr, data, DBUF_IS_L2CACHEABLE(db),
2918		    DBUF_IS_L2COMPRESSIBLE(db), &zp, dbuf_write_ready,
2919		    dbuf_write_physdone, dbuf_write_done, db,
2920		    ZIO_PRIORITY_ASYNC_WRITE, ZIO_FLAG_MUSTSUCCEED, &zb);
2921	}
2922}
2923