dmu_objset.c revision 290756
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
 * Copyright (c) 2013 by Saso Kiselkov. All rights reserved.
 * Copyright (c) 2013, Joyent, Inc. All rights reserved.
 * Copyright (c) 2014 Spectra Logic Corporation, All rights reserved.
 * Copyright 2015 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2015, STRATO AG, Inc. All rights reserved.
 */

/* Portions Copyright 2010 Robert Milkowski */

#include <sys/cred.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/dsl_deleg.h>
#include <sys/dnode.h>
#include <sys/dbuf.h>
#include <sys/zvol.h>
#include <sys/dmu_tx.h>
#include <sys/zap.h>
#include <sys/zil.h>
#include <sys/dmu_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/sa.h>
#include <sys/zfs_onexit.h>
#include <sys/dsl_destroy.h>
#include <sys/vdev.h>

/*
 * Needed to close a window in dnode_move() that allows the objset to be freed
 * before it can be safely accessed.
 */
krwlock_t os_lock;

/*
 * Tunable to override the maximum number of threads for the parallelization
 * of dmu_objset_find_dp, needed to speed up the import of pools with many
 * datasets.
 * Default is 4 times the number of leaf vdevs.
 */
int dmu_find_threads = 0;

static void dmu_objset_find_dp_cb(void *arg);

void
dmu_objset_init(void)
{
	rw_init(&os_lock, NULL, RW_DEFAULT, NULL);
}

void
dmu_objset_fini(void)
{
	rw_destroy(&os_lock);
}

spa_t *
dmu_objset_spa(objset_t *os)
{
	return (os->os_spa);
}

zilog_t *
dmu_objset_zil(objset_t *os)
{
	return (os->os_zil);
}

dsl_pool_t *
dmu_objset_pool(objset_t *os)
{
	dsl_dataset_t *ds;

	if ((ds = os->os_dsl_dataset) != NULL && ds->ds_dir)
		return (ds->ds_dir->dd_pool);
	else
		return (spa_get_dsl(os->os_spa));
}

dsl_dataset_t *
dmu_objset_ds(objset_t *os)
{
	return (os->os_dsl_dataset);
}

dmu_objset_type_t
dmu_objset_type(objset_t *os)
{
	return (os->os_phys->os_type);
}

void
dmu_objset_name(objset_t *os, char *buf)
{
	dsl_dataset_name(os->os_dsl_dataset, buf);
}

uint64_t
dmu_objset_id(objset_t *os)
{
	dsl_dataset_t *ds = os->os_dsl_dataset;

	return (ds ? ds->ds_object : 0);
}

zfs_sync_type_t
dmu_objset_syncprop(objset_t *os)
{
	return (os->os_sync);
}

zfs_logbias_op_t
dmu_objset_logbias(objset_t *os)
{
	return (os->os_logbias);
}

static void
checksum_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	/*
	 * Inheritance should have been done by now.
	 */
	ASSERT(newval != ZIO_CHECKSUM_INHERIT);

	os->os_checksum = zio_checksum_select(newval, ZIO_CHECKSUM_ON_VALUE);
}

static void
compression_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	/*
	 * Inheritance and range checking should have been done by now.
	 */
	ASSERT(newval != ZIO_COMPRESS_INHERIT);

	os->os_compress = zio_compress_select(os->os_spa, newval,
	    ZIO_COMPRESS_ON);
}

static void
copies_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	/*
	 * Inheritance and range checking should have been done by now.
	 */
	ASSERT(newval > 0);
	ASSERT(newval <= spa_max_replication(os->os_spa));

	os->os_copies = newval;
}

static void
dedup_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;
	spa_t *spa = os->os_spa;
	enum zio_checksum checksum;

	/*
	 * Inheritance should have been done by now.
	 */
	ASSERT(newval != ZIO_CHECKSUM_INHERIT);

	checksum = zio_checksum_dedup_select(spa, newval, ZIO_CHECKSUM_OFF);

	os->os_dedup_checksum = checksum & ZIO_CHECKSUM_MASK;
	os->os_dedup_verify = !!(checksum & ZIO_CHECKSUM_VERIFY);
}

static void
primary_cache_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	/*
	 * Inheritance and range checking should have been done by now.
	 */
	ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
	    newval == ZFS_CACHE_METADATA);

	os->os_primary_cache = newval;
}

static void
secondary_cache_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	/*
	 * Inheritance and range checking should have been done by now.
	 */
	ASSERT(newval == ZFS_CACHE_ALL || newval == ZFS_CACHE_NONE ||
	    newval == ZFS_CACHE_METADATA);

	os->os_secondary_cache = newval;
}

static void
sync_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	/*
	 * Inheritance and range checking should have been done by now.
	 */
	ASSERT(newval == ZFS_SYNC_STANDARD || newval == ZFS_SYNC_ALWAYS ||
	    newval == ZFS_SYNC_DISABLED);

	os->os_sync = newval;
	if (os->os_zil)
		zil_set_sync(os->os_zil, newval);
}

static void
redundant_metadata_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	/*
	 * Inheritance and range checking should have been done by now.
	 */
	ASSERT(newval == ZFS_REDUNDANT_METADATA_ALL ||
	    newval == ZFS_REDUNDANT_METADATA_MOST);

	os->os_redundant_metadata = newval;
}

static void
logbias_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	ASSERT(newval == ZFS_LOGBIAS_LATENCY ||
	    newval == ZFS_LOGBIAS_THROUGHPUT);
	os->os_logbias = newval;
	if (os->os_zil)
		zil_set_logbias(os->os_zil, newval);
}

static void
recordsize_changed_cb(void *arg, uint64_t newval)
{
	objset_t *os = arg;

	os->os_recordsize = newval;
}

void
dmu_objset_byteswap(void *buf, size_t size)
{
	objset_phys_t *osp = buf;

	ASSERT(size == OBJSET_OLD_PHYS_SIZE || size == sizeof (objset_phys_t));
	dnode_byteswap(&osp->os_meta_dnode);
	byteswap_uint64_array(&osp->os_zil_header, sizeof (zil_header_t));
	osp->os_type = BSWAP_64(osp->os_type);
	osp->os_flags = BSWAP_64(osp->os_flags);
	if (size == sizeof (objset_phys_t)) {
		dnode_byteswap(&osp->os_userused_dnode);
		dnode_byteswap(&osp->os_groupused_dnode);
	}
}

int
dmu_objset_open_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
    objset_t **osp)
{
	objset_t *os;
	int i, err;

	ASSERT(ds == NULL || MUTEX_HELD(&ds->ds_opening_lock));

	os = kmem_zalloc(sizeof (objset_t), KM_SLEEP);
	os->os_dsl_dataset = ds;
	os->os_spa = spa;
	os->os_rootbp = bp;
	if (!BP_IS_HOLE(os->os_rootbp)) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		zbookmark_phys_t zb;
		SET_BOOKMARK(&zb, ds ? ds->ds_object : DMU_META_OBJSET,
		    ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);

		if (DMU_OS_IS_L2CACHEABLE(os))
			aflags |= ARC_FLAG_L2CACHE;
		if (DMU_OS_IS_L2COMPRESSIBLE(os))
			aflags |= ARC_FLAG_L2COMPRESS;

		dprintf_bp(os->os_rootbp, "reading %s", "");
		err = arc_read(NULL, spa, os->os_rootbp,
		    arc_getbuf_func, &os->os_phys_buf,
		    ZIO_PRIORITY_SYNC_READ, ZIO_FLAG_CANFAIL, &aflags, &zb);
		if (err != 0) {
			kmem_free(os, sizeof (objset_t));
			/* convert checksum errors into IO errors */
			if (err == ECKSUM)
				err = SET_ERROR(EIO);
			return (err);
		}

		/* Increase the blocksize if we are permitted. */
		if (spa_version(spa) >= SPA_VERSION_USERSPACE &&
		    arc_buf_size(os->os_phys_buf) < sizeof (objset_phys_t)) {
			arc_buf_t *buf = arc_buf_alloc(spa,
			    sizeof (objset_phys_t), &os->os_phys_buf,
			    ARC_BUFC_METADATA);
			bzero(buf->b_data, sizeof (objset_phys_t));
			bcopy(os->os_phys_buf->b_data, buf->b_data,
			    arc_buf_size(os->os_phys_buf));
			(void) arc_buf_remove_ref(os->os_phys_buf,
			    &os->os_phys_buf);
			os->os_phys_buf = buf;
		}

		os->os_phys = os->os_phys_buf->b_data;
		os->os_flags = os->os_phys->os_flags;
	} else {
		int size = spa_version(spa) >= SPA_VERSION_USERSPACE ?
		    sizeof (objset_phys_t) : OBJSET_OLD_PHYS_SIZE;
		os->os_phys_buf = arc_buf_alloc(spa, size,
		    &os->os_phys_buf, ARC_BUFC_METADATA);
		os->os_phys = os->os_phys_buf->b_data;
		bzero(os->os_phys, size);
	}

	/*
	 * Note: the changed_cb will be called once before the register
	 * func returns, thus changing the checksum/compression from the
	 * default (fletcher2/off).  Snapshots don't need to know about
	 * checksum/compression/copies.
	 */
	if (ds != NULL) {
		boolean_t needlock = B_FALSE;

		/*
		 * Note: it's valid to open the objset if the dataset is
		 * long-held, in which case the pool_config lock will not
		 * be held.
		 */
		if (!dsl_pool_config_held(dmu_objset_pool(os))) {
			needlock = B_TRUE;
			dsl_pool_config_enter(dmu_objset_pool(os), FTAG);
		}
		err = dsl_prop_register(ds,
		    zfs_prop_to_name(ZFS_PROP_PRIMARYCACHE),
		    primary_cache_changed_cb, os);
		if (err == 0) {
			err = dsl_prop_register(ds,
			    zfs_prop_to_name(ZFS_PROP_SECONDARYCACHE),
			    secondary_cache_changed_cb, os);
		}
		if (!ds->ds_is_snapshot) {
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(ZFS_PROP_CHECKSUM),
				    checksum_changed_cb, os);
			}
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(ZFS_PROP_COMPRESSION),
				    compression_changed_cb, os);
			}
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(ZFS_PROP_COPIES),
				    copies_changed_cb, os);
			}
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(ZFS_PROP_DEDUP),
				    dedup_changed_cb, os);
			}
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(ZFS_PROP_LOGBIAS),
				    logbias_changed_cb, os);
			}
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(ZFS_PROP_SYNC),
				    sync_changed_cb, os);
			}
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(
				    ZFS_PROP_REDUNDANT_METADATA),
				    redundant_metadata_changed_cb, os);
			}
			if (err == 0) {
				err = dsl_prop_register(ds,
				    zfs_prop_to_name(ZFS_PROP_RECORDSIZE),
				    recordsize_changed_cb, os);
			}
		}
		if (needlock)
			dsl_pool_config_exit(dmu_objset_pool(os), FTAG);
		if (err != 0) {
			VERIFY(arc_buf_remove_ref(os->os_phys_buf,
			    &os->os_phys_buf));
			kmem_free(os, sizeof (objset_t));
			return (err);
		}
	} else {
		/* It's the meta-objset. */
		os->os_checksum = ZIO_CHECKSUM_FLETCHER_4;
		os->os_compress = ZIO_COMPRESS_ON;
		os->os_copies = spa_max_replication(spa);
		os->os_dedup_checksum = ZIO_CHECKSUM_OFF;
		os->os_dedup_verify = B_FALSE;
		os->os_logbias = ZFS_LOGBIAS_LATENCY;
		os->os_sync = ZFS_SYNC_STANDARD;
		os->os_primary_cache = ZFS_CACHE_ALL;
		os->os_secondary_cache = ZFS_CACHE_ALL;
	}

	if (ds == NULL || !ds->ds_is_snapshot)
		os->os_zil_header = os->os_phys->os_zil_header;
	os->os_zil = zil_alloc(os, &os->os_zil_header);

	for (i = 0; i < TXG_SIZE; i++) {
		list_create(&os->os_dirty_dnodes[i], sizeof (dnode_t),
		    offsetof(dnode_t, dn_dirty_link[i]));
		list_create(&os->os_free_dnodes[i], sizeof (dnode_t),
		    offsetof(dnode_t, dn_dirty_link[i]));
	}
	list_create(&os->os_dnodes, sizeof (dnode_t),
	    offsetof(dnode_t, dn_link));
	list_create(&os->os_downgraded_dbufs, sizeof (dmu_buf_impl_t),
	    offsetof(dmu_buf_impl_t, db_link));

	mutex_init(&os->os_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&os->os_obj_lock, NULL, MUTEX_DEFAULT, NULL);
	mutex_init(&os->os_user_ptr_lock, NULL, MUTEX_DEFAULT, NULL);

	dnode_special_open(os, &os->os_phys->os_meta_dnode,
	    DMU_META_DNODE_OBJECT, &os->os_meta_dnode);
	if (arc_buf_size(os->os_phys_buf) >= sizeof (objset_phys_t)) {
		dnode_special_open(os, &os->os_phys->os_userused_dnode,
		    DMU_USERUSED_OBJECT, &os->os_userused_dnode);
		dnode_special_open(os, &os->os_phys->os_groupused_dnode,
		    DMU_GROUPUSED_OBJECT, &os->os_groupused_dnode);
	}

	*osp = os;
	return (0);
}

int
dmu_objset_from_ds(dsl_dataset_t *ds, objset_t **osp)
{
	int err = 0;

	/*
	 * We shouldn't be doing anything with dsl_dataset_t's unless the
	 * pool_config lock is held, or the dataset is long-held.
	 */
	ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool) ||
	    dsl_dataset_long_held(ds));

	mutex_enter(&ds->ds_opening_lock);
	if (ds->ds_objset == NULL) {
		objset_t *os;
		err = dmu_objset_open_impl(dsl_dataset_get_spa(ds),
		    ds, dsl_dataset_get_blkptr(ds), &os);

		if (err == 0) {
			mutex_enter(&ds->ds_lock);
			ASSERT(ds->ds_objset == NULL);
			ds->ds_objset = os;
			mutex_exit(&ds->ds_lock);
		}
	}
	*osp = ds->ds_objset;
	mutex_exit(&ds->ds_opening_lock);
	return (err);
}

/*
 * Holds the pool while the objset is held.  Therefore only one objset
 * can be held at a time.
 */
int
dmu_objset_hold(const char *name, void *tag, objset_t **osp)
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;

	err = dsl_pool_hold(name, tag, &dp);
	if (err != 0)
		return (err);
	err = dsl_dataset_hold(dp, name, tag, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	err = dmu_objset_from_ds(ds, osp);
	if (err != 0) {
		dsl_dataset_rele(ds, tag);
		dsl_pool_rele(dp, tag);
	}

	return (err);
}
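
/*
 * Illustrative sketch (not part of the original source): the typical
 * hold/release pairing for short-term, read-only access.  The dataset
 * name "tank/fs" and the surrounding context are assumptions.
 *
 *	objset_t *os;
 *	if (dmu_objset_hold("tank/fs", FTAG, &os) == 0) {
 *		(void) dmu_objset_type(os);
 *		dmu_objset_rele(os, FTAG);
 *	}
 */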

static int
dmu_objset_own_impl(dsl_dataset_t *ds, dmu_objset_type_t type,
    boolean_t readonly, void *tag, objset_t **osp)
{
	int err;

	err = dmu_objset_from_ds(ds, osp);
	if (err != 0) {
		dsl_dataset_disown(ds, tag);
	} else if (type != DMU_OST_ANY && type != (*osp)->os_phys->os_type) {
		dsl_dataset_disown(ds, tag);
		return (SET_ERROR(EINVAL));
	} else if (!readonly && dsl_dataset_is_snapshot(ds)) {
		dsl_dataset_disown(ds, tag);
		return (SET_ERROR(EROFS));
	}
	return (err);
}

/*
 * dsl_pool must not be held when this is called.
 * Upon successful return, there will be a longhold on the dataset,
 * and the dsl_pool will not be held.
 */
int
dmu_objset_own(const char *name, dmu_objset_type_t type,
    boolean_t readonly, void *tag, objset_t **osp)
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;

	err = dsl_pool_hold(name, FTAG, &dp);
	if (err != 0)
		return (err);
	err = dsl_dataset_own(dp, name, tag, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}
	err = dmu_objset_own_impl(ds, type, readonly, tag, osp);
	dsl_pool_rele(dp, FTAG);

	return (err);
}
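
/*
 * Illustrative sketch (assumptions, not from the original source):
 * unlike dmu_objset_hold(), owning an objset takes a long-term,
 * exclusive hold and must be undone with dmu_objset_disown().
 *
 *	objset_t *os;
 *	if (dmu_objset_own("tank/fs", DMU_OST_ZFS, B_FALSE, FTAG, &os) == 0) {
 *		... long-running work on os ...
 *		dmu_objset_disown(os, FTAG);
 *	}
 */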

int
dmu_objset_own_obj(dsl_pool_t *dp, uint64_t obj, dmu_objset_type_t type,
    boolean_t readonly, void *tag, objset_t **osp)
{
	dsl_dataset_t *ds;
	int err;

	err = dsl_dataset_own_obj(dp, obj, tag, &ds);
	if (err != 0)
		return (err);

	return (dmu_objset_own_impl(ds, type, readonly, tag, osp));
}

void
dmu_objset_rele(objset_t *os, void *tag)
{
	dsl_pool_t *dp = dmu_objset_pool(os);
	dsl_dataset_rele(os->os_dsl_dataset, tag);
	dsl_pool_rele(dp, tag);
}

/*
 * When we are called, os MUST refer to an objset associated with a dataset
 * that is owned by 'tag'; that is, is held and long held by 'tag' and ds_owner
 * == tag.  We will then release and reacquire ownership of the dataset while
 * holding the pool config_rwlock so that no intervening namespace or
 * ownership changes can occur.
 *
 * This exists solely to accommodate zfs_ioc_userspace_upgrade()'s desire to
 * release the hold on its dataset and acquire a new one on the dataset of the
 * same name so that it can be partially torn down and reconstructed.
 */
void
dmu_objset_refresh_ownership(objset_t *os, void *tag)
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds, *newds;
	char name[MAXNAMELEN];

	ds = os->os_dsl_dataset;
	VERIFY3P(ds, !=, NULL);
	VERIFY3P(ds->ds_owner, ==, tag);
	VERIFY(dsl_dataset_long_held(ds));

	dsl_dataset_name(ds, name);
	dp = dmu_objset_pool(os);
	dsl_pool_config_enter(dp, FTAG);
	dmu_objset_disown(os, tag);
	VERIFY0(dsl_dataset_own(dp, name, tag, &newds));
	VERIFY3P(newds, ==, os->os_dsl_dataset);
	dsl_pool_config_exit(dp, FTAG);
}

void
dmu_objset_disown(objset_t *os, void *tag)
{
	dsl_dataset_disown(os->os_dsl_dataset, tag);
}

void
dmu_objset_evict_dbufs(objset_t *os)
{
	dnode_t dn_marker;
	dnode_t *dn;

	mutex_enter(&os->os_lock);
	dn = list_head(&os->os_dnodes);
	while (dn != NULL) {
		/*
		 * Skip dnodes without holds.  We have to do this dance
		 * because dnode_add_ref() only works if there is already a
		 * hold.  If the dnode has no holds, then it has no dbufs.
		 */
		if (dnode_add_ref(dn, FTAG)) {
			list_insert_after(&os->os_dnodes, dn, &dn_marker);
			mutex_exit(&os->os_lock);

			dnode_evict_dbufs(dn);
			dnode_rele(dn, FTAG);

			mutex_enter(&os->os_lock);
			dn = list_next(&os->os_dnodes, &dn_marker);
			list_remove(&os->os_dnodes, &dn_marker);
		} else {
			dn = list_next(&os->os_dnodes, dn);
		}
	}
	mutex_exit(&os->os_lock);

	if (DMU_USERUSED_DNODE(os) != NULL) {
		dnode_evict_dbufs(DMU_GROUPUSED_DNODE(os));
		dnode_evict_dbufs(DMU_USERUSED_DNODE(os));
	}
	dnode_evict_dbufs(DMU_META_DNODE(os));
}

/*
 * Objset eviction processing is split into two pieces.
 * The first marks the objset as evicting, evicts any dbufs that
 * have a refcount of zero, and then queues up the objset for the
 * second phase of eviction.  Once os->os_dnodes has been cleared by
 * dnode_buf_pageout()->dnode_destroy(), the second phase is executed.
 * The second phase closes the special dnodes, dequeues the objset from
 * the list of those undergoing eviction, and finally frees the objset.
 *
 * NOTE: Due to asynchronous eviction processing (invocation of
 *       dnode_buf_pageout()), it is possible for the meta dnode for the
 *       objset to have no holds even though os->os_dnodes is not empty.
 */
void
dmu_objset_evict(objset_t *os)
{
	dsl_dataset_t *ds = os->os_dsl_dataset;

	for (int t = 0; t < TXG_SIZE; t++)
		ASSERT(!dmu_objset_is_dirty(os, t));

	if (ds)
		dsl_prop_unregister_all(ds, os);

	if (os->os_sa)
		sa_tear_down(os);

	dmu_objset_evict_dbufs(os);

	mutex_enter(&os->os_lock);
	spa_evicting_os_register(os->os_spa, os);
	if (list_is_empty(&os->os_dnodes)) {
		mutex_exit(&os->os_lock);
		dmu_objset_evict_done(os);
	} else {
		mutex_exit(&os->os_lock);
	}
}

void
dmu_objset_evict_done(objset_t *os)
{
	ASSERT3P(list_head(&os->os_dnodes), ==, NULL);

	dnode_special_close(&os->os_meta_dnode);
	if (DMU_USERUSED_DNODE(os)) {
		dnode_special_close(&os->os_userused_dnode);
		dnode_special_close(&os->os_groupused_dnode);
	}
	zil_free(os->os_zil);

	VERIFY(arc_buf_remove_ref(os->os_phys_buf, &os->os_phys_buf));

	/*
	 * This is a barrier to prevent the objset from going away in
	 * dnode_move() until we can safely ensure that the objset is still in
	 * use. We consider the objset valid before the barrier and invalid
	 * after the barrier.
	 */
	rw_enter(&os_lock, RW_READER);
	rw_exit(&os_lock);

	mutex_destroy(&os->os_lock);
	mutex_destroy(&os->os_obj_lock);
	mutex_destroy(&os->os_user_ptr_lock);
	spa_evicting_os_deregister(os->os_spa, os);
	kmem_free(os, sizeof (objset_t));
}

timestruc_t
dmu_objset_snap_cmtime(objset_t *os)
{
	return (dsl_dir_snap_cmtime(os->os_dsl_dataset->ds_dir));
}

/* called from dsl for meta-objset */
objset_t *
dmu_objset_create_impl(spa_t *spa, dsl_dataset_t *ds, blkptr_t *bp,
    dmu_objset_type_t type, dmu_tx_t *tx)
{
	objset_t *os;
	dnode_t *mdn;

	ASSERT(dmu_tx_is_syncing(tx));

	if (ds != NULL)
		VERIFY0(dmu_objset_from_ds(ds, &os));
	else
		VERIFY0(dmu_objset_open_impl(spa, NULL, bp, &os));

	mdn = DMU_META_DNODE(os);

	dnode_allocate(mdn, DMU_OT_DNODE, 1 << DNODE_BLOCK_SHIFT,
	    DN_MAX_INDBLKSHIFT, DMU_OT_NONE, 0, tx);

	/*
	 * We don't want to have to increase the meta-dnode's nlevels
	 * later, because then we could do it in quiescing context while
	 * we are also accessing it in open context.
	 *
	 * This precaution is not necessary for the MOS (ds == NULL),
	 * because the MOS is only updated in syncing context.
	 * This is most fortunate: the MOS is the only objset that
	 * needs to be synced multiple times as spa_sync() iterates
	 * to convergence, so minimizing its dn_nlevels matters.
	 */
	if (ds != NULL) {
		int levels = 1;

		/*
		 * Determine the number of levels necessary for the meta-dnode
		 * to contain DN_MAX_OBJECT dnodes.
		 */
		while ((uint64_t)mdn->dn_nblkptr << (mdn->dn_datablkshift +
		    (levels - 1) * (mdn->dn_indblkshift - SPA_BLKPTRSHIFT)) <
		    DN_MAX_OBJECT * sizeof (dnode_phys_t))
			levels++;

		mdn->dn_next_nlevels[tx->tx_txg & TXG_MASK] =
		    mdn->dn_nlevels = levels;
	}

	ASSERT(type != DMU_OST_NONE);
	ASSERT(type != DMU_OST_ANY);
	ASSERT(type < DMU_OST_NUMTYPES);
	os->os_phys->os_type = type;
	if (dmu_objset_userused_enabled(os)) {
		os->os_phys->os_flags |= OBJSET_FLAG_USERACCOUNTING_COMPLETE;
		os->os_flags = os->os_phys->os_flags;
	}

	dsl_dataset_dirty(ds, tx);

	return (os);
}

typedef struct dmu_objset_create_arg {
	const char *doca_name;
	cred_t *doca_cred;
	void (*doca_userfunc)(objset_t *os, void *arg,
	    cred_t *cr, dmu_tx_t *tx);
	void *doca_userarg;
	dmu_objset_type_t doca_type;
	uint64_t doca_flags;
} dmu_objset_create_arg_t;

/*ARGSUSED*/
static int
dmu_objset_create_check(void *arg, dmu_tx_t *tx)
{
	dmu_objset_create_arg_t *doca = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	dsl_dir_t *pdd;
	const char *tail;
	int error;

	if (strchr(doca->doca_name, '@') != NULL)
		return (SET_ERROR(EINVAL));

	error = dsl_dir_hold(dp, doca->doca_name, FTAG, &pdd, &tail);
	if (error != 0)
		return (error);
	if (tail == NULL) {
		dsl_dir_rele(pdd, FTAG);
		return (SET_ERROR(EEXIST));
	}
	error = dsl_fs_ss_limit_check(pdd, 1, ZFS_PROP_FILESYSTEM_LIMIT, NULL,
	    doca->doca_cred);
	dsl_dir_rele(pdd, FTAG);

	return (error);
}

static void
dmu_objset_create_sync(void *arg, dmu_tx_t *tx)
{
	dmu_objset_create_arg_t *doca = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	dsl_dir_t *pdd;
	const char *tail;
	dsl_dataset_t *ds;
	uint64_t obj;
	blkptr_t *bp;
	objset_t *os;

	VERIFY0(dsl_dir_hold(dp, doca->doca_name, FTAG, &pdd, &tail));

	obj = dsl_dataset_create_sync(pdd, tail, NULL, doca->doca_flags,
	    doca->doca_cred, tx);

	VERIFY0(dsl_dataset_hold_obj(pdd->dd_pool, obj, FTAG, &ds));
	bp = dsl_dataset_get_blkptr(ds);
	os = dmu_objset_create_impl(pdd->dd_pool->dp_spa,
	    ds, bp, doca->doca_type, tx);

	if (doca->doca_userfunc != NULL) {
		doca->doca_userfunc(os, doca->doca_userarg,
		    doca->doca_cred, tx);
	}

	spa_history_log_internal_ds(ds, "create", tx, "");
	dsl_dataset_rele(ds, FTAG);
	dsl_dir_rele(pdd, FTAG);
}

int
dmu_objset_create(const char *name, dmu_objset_type_t type, uint64_t flags,
    void (*func)(objset_t *os, void *arg, cred_t *cr, dmu_tx_t *tx), void *arg)
{
	dmu_objset_create_arg_t doca;

	doca.doca_name = name;
	doca.doca_cred = CRED();
	doca.doca_flags = flags;
	doca.doca_userfunc = func;
	doca.doca_userarg = arg;
	doca.doca_type = type;

	return (dsl_sync_task(name,
	    dmu_objset_create_check, dmu_objset_create_sync, &doca,
	    5, ZFS_SPACE_CHECK_NORMAL));
}
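
/*
 * Illustrative sketch (the dataset name is an assumption, not from this
 * file): creating a plain ZFS-type objset with no creation callback.
 *
 *	int err = dmu_objset_create("tank/newfs", DMU_OST_ZFS, 0,
 *	    NULL, NULL);
 */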

typedef struct dmu_objset_clone_arg {
	const char *doca_clone;
	const char *doca_origin;
	cred_t *doca_cred;
} dmu_objset_clone_arg_t;

/*ARGSUSED*/
static int
dmu_objset_clone_check(void *arg, dmu_tx_t *tx)
{
	dmu_objset_clone_arg_t *doca = arg;
	dsl_dir_t *pdd;
	const char *tail;
	int error;
	dsl_dataset_t *origin;
	dsl_pool_t *dp = dmu_tx_pool(tx);

	if (strchr(doca->doca_clone, '@') != NULL)
		return (SET_ERROR(EINVAL));

	error = dsl_dir_hold(dp, doca->doca_clone, FTAG, &pdd, &tail);
	if (error != 0)
		return (error);
	if (tail == NULL) {
		dsl_dir_rele(pdd, FTAG);
		return (SET_ERROR(EEXIST));
	}

	error = dsl_fs_ss_limit_check(pdd, 1, ZFS_PROP_FILESYSTEM_LIMIT, NULL,
	    doca->doca_cred);
	if (error != 0) {
		dsl_dir_rele(pdd, FTAG);
		return (SET_ERROR(EDQUOT));
	}
	dsl_dir_rele(pdd, FTAG);

	error = dsl_dataset_hold(dp, doca->doca_origin, FTAG, &origin);
	if (error != 0)
		return (error);

	/* You can only clone snapshots, not the head datasets. */
	if (!origin->ds_is_snapshot) {
		dsl_dataset_rele(origin, FTAG);
		return (SET_ERROR(EINVAL));
	}
	dsl_dataset_rele(origin, FTAG);

	return (0);
}

static void
dmu_objset_clone_sync(void *arg, dmu_tx_t *tx)
{
	dmu_objset_clone_arg_t *doca = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	dsl_dir_t *pdd;
	const char *tail;
	dsl_dataset_t *origin, *ds;
	uint64_t obj;
	char namebuf[MAXNAMELEN];

	VERIFY0(dsl_dir_hold(dp, doca->doca_clone, FTAG, &pdd, &tail));
	VERIFY0(dsl_dataset_hold(dp, doca->doca_origin, FTAG, &origin));

	obj = dsl_dataset_create_sync(pdd, tail, origin, 0,
	    doca->doca_cred, tx);

	VERIFY0(dsl_dataset_hold_obj(pdd->dd_pool, obj, FTAG, &ds));
	dsl_dataset_name(origin, namebuf);
	spa_history_log_internal_ds(ds, "clone", tx,
	    "origin=%s (%llu)", namebuf, origin->ds_object);
	dsl_dataset_rele(ds, FTAG);
	dsl_dataset_rele(origin, FTAG);
	dsl_dir_rele(pdd, FTAG);
}

int
dmu_objset_clone(const char *clone, const char *origin)
{
	dmu_objset_clone_arg_t doca;

	doca.doca_clone = clone;
	doca.doca_origin = origin;
	doca.doca_cred = CRED();

	return (dsl_sync_task(clone,
	    dmu_objset_clone_check, dmu_objset_clone_sync, &doca,
	    5, ZFS_SPACE_CHECK_NORMAL));
}
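
/*
 * Illustrative sketch (names are assumptions): cloning a snapshot into
 * a new head dataset.  The origin must name a snapshot, per the check
 * above.
 *
 *	int err = dmu_objset_clone("tank/copy", "tank/fs@snap");
 */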

int
dmu_objset_snapshot_one(const char *fsname, const char *snapname)
{
	int err;
	char *longsnap = kmem_asprintf("%s@%s", fsname, snapname);
	nvlist_t *snaps = fnvlist_alloc();

	fnvlist_add_boolean(snaps, longsnap);
	strfree(longsnap);
	err = dsl_dataset_snapshot(snaps, NULL, NULL);
	fnvlist_free(snaps);
	return (err);
}

static void
dmu_objset_sync_dnodes(list_t *list, list_t *newlist, dmu_tx_t *tx)
{
	dnode_t *dn;

	while (dn = list_head(list)) {
		ASSERT(dn->dn_object != DMU_META_DNODE_OBJECT);
		ASSERT(dn->dn_dbuf->db_data_pending);
		/*
		 * Initialize dn_zio outside dnode_sync() because the
		 * meta-dnode needs to set it outside dnode_sync().
		 */
		dn->dn_zio = dn->dn_dbuf->db_data_pending->dr_zio;
		ASSERT(dn->dn_zio);

		ASSERT3U(dn->dn_nlevels, <=, DN_MAX_LEVELS);
		list_remove(list, dn);

		if (newlist) {
			(void) dnode_add_ref(dn, newlist);
			list_insert_tail(newlist, dn);
		}

		dnode_sync(dn, tx);
	}
}

/* ARGSUSED */
static void
dmu_objset_write_ready(zio_t *zio, arc_buf_t *abuf, void *arg)
{
	blkptr_t *bp = zio->io_bp;
	objset_t *os = arg;
	dnode_phys_t *dnp = &os->os_phys->os_meta_dnode;

	ASSERT(!BP_IS_EMBEDDED(bp));
	ASSERT3P(bp, ==, os->os_rootbp);
	ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_OBJSET);
	ASSERT0(BP_GET_LEVEL(bp));

	/*
	 * Update rootbp fill count: it should be the number of objects
	 * allocated in the object set (not counting the "special"
	 * objects that are stored in the objset_phys_t -- the meta
	 * dnode and user/group accounting objects).
	 */
	bp->blk_fill = 0;
	for (int i = 0; i < dnp->dn_nblkptr; i++)
		bp->blk_fill += BP_GET_FILL(&dnp->dn_blkptr[i]);
}

/* ARGSUSED */
static void
dmu_objset_write_done(zio_t *zio, arc_buf_t *abuf, void *arg)
{
	blkptr_t *bp = zio->io_bp;
	blkptr_t *bp_orig = &zio->io_bp_orig;
	objset_t *os = arg;

	if (zio->io_flags & ZIO_FLAG_IO_REWRITE) {
		ASSERT(BP_EQUAL(bp, bp_orig));
	} else {
		dsl_dataset_t *ds = os->os_dsl_dataset;
		dmu_tx_t *tx = os->os_synctx;

		(void) dsl_dataset_block_kill(ds, bp_orig, tx, B_TRUE);
		dsl_dataset_block_born(ds, bp, tx);
	}
}

/* called from dsl */
void
dmu_objset_sync(objset_t *os, zio_t *pio, dmu_tx_t *tx)
{
	int txgoff;
	zbookmark_phys_t zb;
	zio_prop_t zp;
	zio_t *zio;
	list_t *list;
	list_t *newlist = NULL;
	dbuf_dirty_record_t *dr;

	dprintf_ds(os->os_dsl_dataset, "txg=%llu\n", tx->tx_txg);

	ASSERT(dmu_tx_is_syncing(tx));
	/* XXX the write_done callback should really give us the tx... */
	os->os_synctx = tx;

	if (os->os_dsl_dataset == NULL) {
		/*
		 * This is the MOS.  If we have upgraded,
		 * spa_max_replication() could change, so reset
		 * os_copies here.
		 */
		os->os_copies = spa_max_replication(os->os_spa);
	}

	/*
	 * Create the root block IO
	 */
	SET_BOOKMARK(&zb, os->os_dsl_dataset ?
	    os->os_dsl_dataset->ds_object : DMU_META_OBJSET,
	    ZB_ROOT_OBJECT, ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
	arc_release(os->os_phys_buf, &os->os_phys_buf);

	dmu_write_policy(os, NULL, 0, 0, &zp);

	zio = arc_write(pio, os->os_spa, tx->tx_txg,
	    os->os_rootbp, os->os_phys_buf, DMU_OS_IS_L2CACHEABLE(os),
	    DMU_OS_IS_L2COMPRESSIBLE(os), &zp, dmu_objset_write_ready,
	    NULL, dmu_objset_write_done, os, ZIO_PRIORITY_ASYNC_WRITE,
	    ZIO_FLAG_MUSTSUCCEED, &zb);

	/*
	 * Sync special dnodes - the parent IO for the sync is the root block
	 */
	DMU_META_DNODE(os)->dn_zio = zio;
	dnode_sync(DMU_META_DNODE(os), tx);

	os->os_phys->os_flags = os->os_flags;

	if (DMU_USERUSED_DNODE(os) &&
	    DMU_USERUSED_DNODE(os)->dn_type != DMU_OT_NONE) {
		DMU_USERUSED_DNODE(os)->dn_zio = zio;
		dnode_sync(DMU_USERUSED_DNODE(os), tx);
		DMU_GROUPUSED_DNODE(os)->dn_zio = zio;
		dnode_sync(DMU_GROUPUSED_DNODE(os), tx);
	}

	txgoff = tx->tx_txg & TXG_MASK;

	if (dmu_objset_userused_enabled(os)) {
		newlist = &os->os_synced_dnodes;
		/*
		 * We must create the list here because it uses the
		 * dn_dirty_link[] of this txg.
		 */
		list_create(newlist, sizeof (dnode_t),
		    offsetof(dnode_t, dn_dirty_link[txgoff]));
	}

	dmu_objset_sync_dnodes(&os->os_free_dnodes[txgoff], newlist, tx);
	dmu_objset_sync_dnodes(&os->os_dirty_dnodes[txgoff], newlist, tx);

	list = &DMU_META_DNODE(os)->dn_dirty_records[txgoff];
	while (dr = list_head(list)) {
		ASSERT0(dr->dr_dbuf->db_level);
		list_remove(list, dr);
		if (dr->dr_zio)
			zio_nowait(dr->dr_zio);
	}
	/*
	 * Free intent log blocks up to this tx.
	 */
	zil_sync(os->os_zil, tx);
	os->os_phys->os_zil_header = os->os_zil_header;
	zio_nowait(zio);
}

boolean_t
dmu_objset_is_dirty(objset_t *os, uint64_t txg)
{
	return (!list_is_empty(&os->os_dirty_dnodes[txg & TXG_MASK]) ||
	    !list_is_empty(&os->os_free_dnodes[txg & TXG_MASK]));
}

static objset_used_cb_t *used_cbs[DMU_OST_NUMTYPES];

void
dmu_objset_register_type(dmu_objset_type_t ost, objset_used_cb_t *cb)
{
	used_cbs[ost] = cb;
}

boolean_t
dmu_objset_userused_enabled(objset_t *os)
{
	return (spa_version(os->os_spa) >= SPA_VERSION_USERSPACE &&
	    used_cbs[os->os_phys->os_type] != NULL &&
	    DMU_USERUSED_DNODE(os) != NULL);
}

static void
do_userquota_update(objset_t *os, uint64_t used, uint64_t flags,
    uint64_t user, uint64_t group, boolean_t subtract, dmu_tx_t *tx)
{
	if ((flags & DNODE_FLAG_USERUSED_ACCOUNTED)) {
		int64_t delta = DNODE_SIZE + used;
		if (subtract)
			delta = -delta;
		VERIFY3U(0, ==, zap_increment_int(os, DMU_USERUSED_OBJECT,
		    user, delta, tx));
		VERIFY3U(0, ==, zap_increment_int(os, DMU_GROUPUSED_OBJECT,
		    group, delta, tx));
	}
}

void
dmu_objset_do_userquota_updates(objset_t *os, dmu_tx_t *tx)
{
	dnode_t *dn;
	list_t *list = &os->os_synced_dnodes;

	ASSERT(list_head(list) == NULL || dmu_objset_userused_enabled(os));

	while (dn = list_head(list)) {
		int flags;
		ASSERT(!DMU_OBJECT_IS_SPECIAL(dn->dn_object));
		ASSERT(dn->dn_phys->dn_type == DMU_OT_NONE ||
		    dn->dn_phys->dn_flags &
		    DNODE_FLAG_USERUSED_ACCOUNTED);

		/* Allocate the user/groupused objects if necessary. */
		if (DMU_USERUSED_DNODE(os)->dn_type == DMU_OT_NONE) {
			VERIFY(0 == zap_create_claim(os,
			    DMU_USERUSED_OBJECT,
			    DMU_OT_USERGROUP_USED, DMU_OT_NONE, 0, tx));
			VERIFY(0 == zap_create_claim(os,
			    DMU_GROUPUSED_OBJECT,
			    DMU_OT_USERGROUP_USED, DMU_OT_NONE, 0, tx));
		}

		/*
		 * We intentionally modify the zap object even if the
		 * net delta is zero.  Otherwise
		 * the block of the zap obj could be shared between
		 * datasets but need to be different between them after
		 * a bprewrite.
		 */

		flags = dn->dn_id_flags;
		ASSERT(flags);
		if (flags & DN_ID_OLD_EXIST) {
			do_userquota_update(os, dn->dn_oldused, dn->dn_oldflags,
			    dn->dn_olduid, dn->dn_oldgid, B_TRUE, tx);
		}
		if (flags & DN_ID_NEW_EXIST) {
			do_userquota_update(os, DN_USED_BYTES(dn->dn_phys),
			    dn->dn_phys->dn_flags, dn->dn_newuid,
			    dn->dn_newgid, B_FALSE, tx);
		}

		mutex_enter(&dn->dn_mtx);
		dn->dn_oldused = 0;
		dn->dn_oldflags = 0;
		if (dn->dn_id_flags & DN_ID_NEW_EXIST) {
			dn->dn_olduid = dn->dn_newuid;
			dn->dn_oldgid = dn->dn_newgid;
			dn->dn_id_flags |= DN_ID_OLD_EXIST;
			if (dn->dn_bonuslen == 0)
				dn->dn_id_flags |= DN_ID_CHKED_SPILL;
			else
				dn->dn_id_flags |= DN_ID_CHKED_BONUS;
		}
		dn->dn_id_flags &= ~(DN_ID_NEW_EXIST);
		mutex_exit(&dn->dn_mtx);

		list_remove(list, dn);
		dnode_rele(dn, list);
	}
}

/*
 * Returns a pointer to data to find uid/gid from
 *
 * If a dirty record for transaction group that is syncing can't
 * be found then NULL is returned.  In the NULL case it is assumed
 * the uid/gid aren't changing.
 */
static void *
dmu_objset_userquota_find_data(dmu_buf_impl_t *db, dmu_tx_t *tx)
{
	dbuf_dirty_record_t *dr, **drp;
	void *data;

	if (db->db_dirtycnt == 0)
		return (db->db.db_data);  /* Nothing is changing */

	for (drp = &db->db_last_dirty; (dr = *drp) != NULL; drp = &dr->dr_next)
		if (dr->dr_txg == tx->tx_txg)
			break;

	if (dr == NULL) {
		data = NULL;
	} else {
		dnode_t *dn;

		DB_DNODE_ENTER(dr->dr_dbuf);
		dn = DB_DNODE(dr->dr_dbuf);

		if (dn->dn_bonuslen == 0 &&
		    dr->dr_dbuf->db_blkid == DMU_SPILL_BLKID)
			data = dr->dt.dl.dr_data->b_data;
		else
			data = dr->dt.dl.dr_data;

		DB_DNODE_EXIT(dr->dr_dbuf);
	}

	return (data);
}

void
dmu_objset_userquota_get_ids(dnode_t *dn, boolean_t before, dmu_tx_t *tx)
{
	objset_t *os = dn->dn_objset;
	void *data = NULL;
	dmu_buf_impl_t *db = NULL;
	uint64_t *user = NULL;
	uint64_t *group = NULL;
	int flags = dn->dn_id_flags;
	int error;
	boolean_t have_spill = B_FALSE;

	if (!dmu_objset_userused_enabled(dn->dn_objset))
		return;

	if (before && (flags & (DN_ID_CHKED_BONUS|DN_ID_OLD_EXIST|
	    DN_ID_CHKED_SPILL)))
		return;

	if (before && dn->dn_bonuslen != 0)
		data = DN_BONUS(dn->dn_phys);
	else if (!before && dn->dn_bonuslen != 0) {
		if (dn->dn_bonus) {
			db = dn->dn_bonus;
			mutex_enter(&db->db_mtx);
			data = dmu_objset_userquota_find_data(db, tx);
		} else {
			data = DN_BONUS(dn->dn_phys);
		}
	} else if (dn->dn_bonuslen == 0 && dn->dn_bonustype == DMU_OT_SA) {
			int rf = 0;

			if (RW_WRITE_HELD(&dn->dn_struct_rwlock))
				rf |= DB_RF_HAVESTRUCT;
			error = dmu_spill_hold_by_dnode(dn,
			    rf | DB_RF_MUST_SUCCEED,
			    FTAG, (dmu_buf_t **)&db);
			ASSERT(error == 0);
			mutex_enter(&db->db_mtx);
			data = (before) ? db->db.db_data :
			    dmu_objset_userquota_find_data(db, tx);
			have_spill = B_TRUE;
	} else {
		mutex_enter(&dn->dn_mtx);
		dn->dn_id_flags |= DN_ID_CHKED_BONUS;
		mutex_exit(&dn->dn_mtx);
		return;
	}

	if (before) {
		ASSERT(data);
		user = &dn->dn_olduid;
		group = &dn->dn_oldgid;
	} else if (data) {
		user = &dn->dn_newuid;
		group = &dn->dn_newgid;
	}

	/*
	 * Must always call the callback in case the object
	 * type has changed and that type isn't an object type to track
	 */
	error = used_cbs[os->os_phys->os_type](dn->dn_bonustype, data,
	    user, group);

	/*
	 * Preserve existing uid/gid when the callback can't determine
	 * what the new uid/gid are and the callback returned EEXIST.
	 * The EEXIST error tells us to just use the existing uid/gid.
	 * If we don't know what the old values are then just assign
	 * them to 0, since that is a new file being created.
	 */
	if (!before && data == NULL && error == EEXIST) {
		if (flags & DN_ID_OLD_EXIST) {
			dn->dn_newuid = dn->dn_olduid;
			dn->dn_newgid = dn->dn_oldgid;
		} else {
			dn->dn_newuid = 0;
			dn->dn_newgid = 0;
		}
		error = 0;
	}

	if (db)
		mutex_exit(&db->db_mtx);

	mutex_enter(&dn->dn_mtx);
	if (error == 0 && before)
		dn->dn_id_flags |= DN_ID_OLD_EXIST;
	if (error == 0 && !before)
		dn->dn_id_flags |= DN_ID_NEW_EXIST;

	if (have_spill) {
		dn->dn_id_flags |= DN_ID_CHKED_SPILL;
	} else {
		dn->dn_id_flags |= DN_ID_CHKED_BONUS;
	}
	mutex_exit(&dn->dn_mtx);
	if (have_spill)
		dmu_buf_rele((dmu_buf_t *)db, FTAG);
}

boolean_t
dmu_objset_userspace_present(objset_t *os)
{
	return (os->os_phys->os_flags &
	    OBJSET_FLAG_USERACCOUNTING_COMPLETE);
}

int
dmu_objset_userspace_upgrade(objset_t *os)
{
	uint64_t obj;
	int err = 0;

	if (dmu_objset_userspace_present(os))
		return (0);
	if (!dmu_objset_userused_enabled(os))
		return (SET_ERROR(ENOTSUP));
	if (dmu_objset_is_snapshot(os))
		return (SET_ERROR(EINVAL));

	/*
	 * We simply need to mark every object dirty, so that it will be
	 * synced out and now accounted.  If this is called
	 * concurrently, or if we already did some work before crashing,
	 * that's fine, since we track each object's accounted state
	 * independently.
	 */

	for (obj = 0; err == 0; err = dmu_object_next(os, &obj, FALSE, 0)) {
		dmu_tx_t *tx;
		dmu_buf_t *db;
		int objerr;

		if (issig(JUSTLOOKING) && issig(FORREAL))
			return (SET_ERROR(EINTR));

		objerr = dmu_bonus_hold(os, obj, FTAG, &db);
		if (objerr != 0)
			continue;
		tx = dmu_tx_create(os);
		dmu_tx_hold_bonus(tx, obj);
		objerr = dmu_tx_assign(tx, TXG_WAIT);
		if (objerr != 0) {
			dmu_tx_abort(tx);
			continue;
		}
		dmu_buf_will_dirty(db, tx);
		dmu_buf_rele(db, FTAG);
		dmu_tx_commit(tx);
	}

	os->os_flags |= OBJSET_FLAG_USERACCOUNTING_COMPLETE;
	txg_wait_synced(dmu_objset_pool(os), 0);
	return (0);
}

void
dmu_objset_space(objset_t *os, uint64_t *refdbytesp, uint64_t *availbytesp,
    uint64_t *usedobjsp, uint64_t *availobjsp)
{
	dsl_dataset_space(os->os_dsl_dataset, refdbytesp, availbytesp,
	    usedobjsp, availobjsp);
}

uint64_t
dmu_objset_fsid_guid(objset_t *os)
{
	return (dsl_dataset_fsid_guid(os->os_dsl_dataset));
}

void
dmu_objset_fast_stat(objset_t *os, dmu_objset_stats_t *stat)
{
	stat->dds_type = os->os_phys->os_type;
	if (os->os_dsl_dataset)
		dsl_dataset_fast_stat(os->os_dsl_dataset, stat);
}

void
dmu_objset_stats(objset_t *os, nvlist_t *nv)
{
	ASSERT(os->os_dsl_dataset ||
	    os->os_phys->os_type == DMU_OST_META);

	if (os->os_dsl_dataset != NULL)
		dsl_dataset_stats(os->os_dsl_dataset, nv);

	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_TYPE,
	    os->os_phys->os_type);
	dsl_prop_nvlist_add_uint64(nv, ZFS_PROP_USERACCOUNTING,
	    dmu_objset_userspace_present(os));
}

int
dmu_objset_is_snapshot(objset_t *os)
{
	if (os->os_dsl_dataset != NULL)
		return (os->os_dsl_dataset->ds_is_snapshot);
	else
		return (B_FALSE);
}

int
dmu_snapshot_realname(objset_t *os, char *name, char *real, int maxlen,
    boolean_t *conflict)
{
	dsl_dataset_t *ds = os->os_dsl_dataset;
	uint64_t ignored;

	if (dsl_dataset_phys(ds)->ds_snapnames_zapobj == 0)
		return (SET_ERROR(ENOENT));

	return (zap_lookup_norm(ds->ds_dir->dd_pool->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj, name, 8, 1, &ignored,
	    MT_FIRST, real, maxlen, conflict));
}

int
dmu_snapshot_list_next(objset_t *os, int namelen, char *name,
    uint64_t *idp, uint64_t *offp, boolean_t *case_conflict)
{
	dsl_dataset_t *ds = os->os_dsl_dataset;
	zap_cursor_t cursor;
	zap_attribute_t attr;

	ASSERT(dsl_pool_config_held(dmu_objset_pool(os)));

	if (dsl_dataset_phys(ds)->ds_snapnames_zapobj == 0)
		return (SET_ERROR(ENOENT));

	zap_cursor_init_serialized(&cursor,
	    ds->ds_dir->dd_pool->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj, *offp);

	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
		zap_cursor_fini(&cursor);
		return (SET_ERROR(ENOENT));
	}

	if (strlen(attr.za_name) + 1 > namelen) {
		zap_cursor_fini(&cursor);
		return (SET_ERROR(ENAMETOOLONG));
	}

	(void) strcpy(name, attr.za_name);
	if (idp)
		*idp = attr.za_first_integer;
	if (case_conflict)
		*case_conflict = attr.za_normalization_conflict;
	zap_cursor_advance(&cursor);
	*offp = zap_cursor_serialize(&cursor);
	zap_cursor_fini(&cursor);

	return (0);
}
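
/*
 * Illustrative sketch (not part of the original source): walking every
 * snapshot name of an objset.  The caller must hold the pool config
 * lock, per the ASSERT above.
 *
 *	uint64_t cookie = 0;
 *	char snapname[MAXNAMELEN];
 *	while (dmu_snapshot_list_next(os, sizeof (snapname), snapname,
 *	    NULL, &cookie, NULL) == 0)
 *		... use snapname ...
 */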

int
dmu_dir_list_next(objset_t *os, int namelen, char *name,
    uint64_t *idp, uint64_t *offp)
{
	dsl_dir_t *dd = os->os_dsl_dataset->ds_dir;
	zap_cursor_t cursor;
	zap_attribute_t attr;

	/* there is no next dir on a snapshot! */
	if (os->os_dsl_dataset->ds_object !=
	    dsl_dir_phys(dd)->dd_head_dataset_obj)
		return (SET_ERROR(ENOENT));

	zap_cursor_init_serialized(&cursor,
	    dd->dd_pool->dp_meta_objset,
	    dsl_dir_phys(dd)->dd_child_dir_zapobj, *offp);

	if (zap_cursor_retrieve(&cursor, &attr) != 0) {
		zap_cursor_fini(&cursor);
		return (SET_ERROR(ENOENT));
	}

	if (strlen(attr.za_name) + 1 > namelen) {
		zap_cursor_fini(&cursor);
		return (SET_ERROR(ENAMETOOLONG));
	}

	(void) strcpy(name, attr.za_name);
	if (idp)
		*idp = attr.za_first_integer;
	zap_cursor_advance(&cursor);
	*offp = zap_cursor_serialize(&cursor);
	zap_cursor_fini(&cursor);

	return (0);
}

typedef struct dmu_objset_find_ctx {
	taskq_t		*dc_tq;
	dsl_pool_t	*dc_dp;
	uint64_t	dc_ddobj;
	int		(*dc_func)(dsl_pool_t *, dsl_dataset_t *, void *);
	void		*dc_arg;
	int		dc_flags;
	kmutex_t	*dc_error_lock;
	int		*dc_error;
} dmu_objset_find_ctx_t;

static void
dmu_objset_find_dp_impl(dmu_objset_find_ctx_t *dcp)
{
	dsl_pool_t *dp = dcp->dc_dp;
	dmu_objset_find_ctx_t *child_dcp;
	dsl_dir_t *dd;
	dsl_dataset_t *ds;
	zap_cursor_t zc;
	zap_attribute_t *attr;
	uint64_t thisobj;
	int err = 0;

	/* don't process if there already was an error */
	if (*dcp->dc_error != 0)
		goto out;

	err = dsl_dir_hold_obj(dp, dcp->dc_ddobj, NULL, FTAG, &dd);
	if (err != 0)
		goto out;

	/* Don't visit hidden ($MOS & $ORIGIN) objsets. */
	if (dd->dd_myname[0] == '$') {
		dsl_dir_rele(dd, FTAG);
		goto out;
	}

	thisobj = dsl_dir_phys(dd)->dd_head_dataset_obj;
	attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);

	/*
	 * Iterate over all children.
	 */
	if (dcp->dc_flags & DS_FIND_CHILDREN) {
		for (zap_cursor_init(&zc, dp->dp_meta_objset,
		    dsl_dir_phys(dd)->dd_child_dir_zapobj);
		    zap_cursor_retrieve(&zc, attr) == 0;
		    (void) zap_cursor_advance(&zc)) {
			ASSERT3U(attr->za_integer_length, ==,
			    sizeof (uint64_t));
			ASSERT3U(attr->za_num_integers, ==, 1);

			child_dcp = kmem_alloc(sizeof (*child_dcp), KM_SLEEP);
			*child_dcp = *dcp;
			child_dcp->dc_ddobj = attr->za_first_integer;
			if (dcp->dc_tq != NULL)
				(void) taskq_dispatch(dcp->dc_tq,
				    dmu_objset_find_dp_cb, child_dcp, TQ_SLEEP);
			else
				dmu_objset_find_dp_impl(child_dcp);
		}
		zap_cursor_fini(&zc);
	}

	/*
	 * Iterate over all snapshots.
	 */
	if (dcp->dc_flags & DS_FIND_SNAPSHOTS) {
		dsl_dataset_t *ds;
		err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);

		if (err == 0) {
			uint64_t snapobj;

			snapobj = dsl_dataset_phys(ds)->ds_snapnames_zapobj;
			dsl_dataset_rele(ds, FTAG);

			for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
			    zap_cursor_retrieve(&zc, attr) == 0;
			    (void) zap_cursor_advance(&zc)) {
				ASSERT3U(attr->za_integer_length, ==,
				    sizeof (uint64_t));
				ASSERT3U(attr->za_num_integers, ==, 1);

				err = dsl_dataset_hold_obj(dp,
				    attr->za_first_integer, FTAG, &ds);
				if (err != 0)
					break;
				err = dcp->dc_func(dp, ds, dcp->dc_arg);
				dsl_dataset_rele(ds, FTAG);
				if (err != 0)
					break;
			}
			zap_cursor_fini(&zc);
		}
	}

	dsl_dir_rele(dd, FTAG);
	kmem_free(attr, sizeof (zap_attribute_t));

	if (err != 0)
		goto out;

	/*
	 * Apply to self.
	 */
	err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);
	if (err != 0)
		goto out;
	err = dcp->dc_func(dp, ds, dcp->dc_arg);
	dsl_dataset_rele(ds, FTAG);

out:
	if (err != 0) {
		mutex_enter(dcp->dc_error_lock);
		/* only keep first error */
		if (*dcp->dc_error == 0)
			*dcp->dc_error = err;
		mutex_exit(dcp->dc_error_lock);
	}

	kmem_free(dcp, sizeof (*dcp));
}

static void
dmu_objset_find_dp_cb(void *arg)
{
	dmu_objset_find_ctx_t *dcp = arg;
	dsl_pool_t *dp = dcp->dc_dp;

	/*
	 * We need to get a pool_config_lock here, as there are several
	 * assert(pool_config_held) calls down the stack. Getting a lock via
	 * dsl_pool_config_enter is risky, as it might be stalled by a
	 * pending writer. This would deadlock, as the write lock can
	 * only be granted when our parent thread gives up the lock.
	 * The _prio interface gives us priority over a pending writer.
	 */
	dsl_pool_config_enter_prio(dp, FTAG);

	dmu_objset_find_dp_impl(dcp);

	dsl_pool_config_exit(dp, FTAG);
}

/*
 * Find objsets under and including ddobj, call func(ds) on each.
 * The order for the enumeration is completely undefined.
 * func is called with dsl_pool_config held.
 */
int
dmu_objset_find_dp(dsl_pool_t *dp, uint64_t ddobj,
    int func(dsl_pool_t *, dsl_dataset_t *, void *), void *arg, int flags)
{
	int error = 0;
	taskq_t *tq = NULL;
	int ntasks;
	dmu_objset_find_ctx_t *dcp;
	kmutex_t err_lock;

	mutex_init(&err_lock, NULL, MUTEX_DEFAULT, NULL);
	dcp = kmem_alloc(sizeof (*dcp), KM_SLEEP);
	dcp->dc_tq = NULL;
	dcp->dc_dp = dp;
	dcp->dc_ddobj = ddobj;
	dcp->dc_func = func;
	dcp->dc_arg = arg;
	dcp->dc_flags = flags;
	dcp->dc_error_lock = &err_lock;
	dcp->dc_error = &error;

	if ((flags & DS_FIND_SERIALIZE) || dsl_pool_config_held_writer(dp)) {
		/*
		 * In case a write lock is held we can't make use of
		 * parallelism, as down the stack of the worker threads
		 * the lock is asserted via dsl_pool_config_held.
		 * In case of a read lock this is solved by getting a read
		 * lock in each worker thread, which isn't possible in case
		 * of a writer lock. So we fall back to the synchronous path
		 * here.
		 * In the future it might be possible to get some magic into
		 * dsl_pool_config_held in a way that it returns true for
		 * the worker threads so that a single lock held from this
		 * thread suffices. For now, stay single threaded.
		 */
		dmu_objset_find_dp_impl(dcp);

		return (error);
	}

	ntasks = dmu_find_threads;
	if (ntasks == 0)
		ntasks = vdev_count_leaves(dp->dp_spa) * 4;
	tq = taskq_create("dmu_objset_find", ntasks, minclsyspri, ntasks,
	    INT_MAX, 0);
	if (tq == NULL) {
		kmem_free(dcp, sizeof (*dcp));
		return (SET_ERROR(ENOMEM));
	}
	dcp->dc_tq = tq;

	/* dcp will be freed by task */
	(void) taskq_dispatch(tq, dmu_objset_find_dp_cb, dcp, TQ_SLEEP);

	/*
	 * PORTING: this code relies on the property of taskq_wait to wait
	 * until no more tasks are queued and no more tasks are active. As
	 * we always queue new tasks from within other tasks, taskq_wait
	 * reliably waits for the full recursion to finish, even though we
	 * enqueue new tasks after taskq_wait has been called.
	 * On platforms other than illumos, taskq_wait may not have this
	 * property.
	 */
	taskq_wait(tq);
	taskq_destroy(tq);
	mutex_destroy(&err_lock);

	return (error);
}
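
/*
 * Illustrative sketch (hypothetical callback, not part of this file):
 * counting every dataset and snapshot under a dsl_dir.  The callback
 * runs with the pool config lock held, possibly from a taskq thread,
 * so it must be safe to call concurrently.
 *
 *	static int
 *	count_cb(dsl_pool_t *dp, dsl_dataset_t *ds, void *arg)
 *	{
 *		atomic_inc_64((uint64_t *)arg);
 *		return (0);
 *	}
 *
 *	uint64_t count = 0;
 *	int err = dmu_objset_find_dp(dp, ddobj, count_cb, &count,
 *	    DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
 */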

/*
 * Find all objsets under name, and for each, call 'func(child_name, arg)'.
 * The dp_config_rwlock must not be held when this is called, and it
 * will not be held when the callback is called.
 * Therefore this function should only be used when the pool is not changing
 * (e.g. in syncing context), or the callback can deal with the possible races.
 */
static int
dmu_objset_find_impl(spa_t *spa, const char *name,
    int func(const char *, void *), void *arg, int flags)
{
	dsl_dir_t *dd;
	dsl_pool_t *dp = spa_get_dsl(spa);
	dsl_dataset_t *ds;
	zap_cursor_t zc;
	zap_attribute_t *attr;
	char *child;
	uint64_t thisobj;
	int err;

	dsl_pool_config_enter(dp, FTAG);

	err = dsl_dir_hold(dp, name, FTAG, &dd, NULL);
	if (err != 0) {
		dsl_pool_config_exit(dp, FTAG);
		return (err);
	}

	/* Don't visit hidden ($MOS & $ORIGIN) objsets. */
	if (dd->dd_myname[0] == '$') {
		dsl_dir_rele(dd, FTAG);
		dsl_pool_config_exit(dp, FTAG);
		return (0);
	}

	thisobj = dsl_dir_phys(dd)->dd_head_dataset_obj;
	attr = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);

	/*
	 * Iterate over all children.
	 */
	if (flags & DS_FIND_CHILDREN) {
		for (zap_cursor_init(&zc, dp->dp_meta_objset,
		    dsl_dir_phys(dd)->dd_child_dir_zapobj);
		    zap_cursor_retrieve(&zc, attr) == 0;
		    (void) zap_cursor_advance(&zc)) {
			ASSERT3U(attr->za_integer_length, ==,
			    sizeof (uint64_t));
			ASSERT3U(attr->za_num_integers, ==, 1);

			child = kmem_asprintf("%s/%s", name, attr->za_name);
			dsl_pool_config_exit(dp, FTAG);
			err = dmu_objset_find_impl(spa, child,
			    func, arg, flags);
			dsl_pool_config_enter(dp, FTAG);
			strfree(child);
			if (err != 0)
				break;
		}
		zap_cursor_fini(&zc);

		if (err != 0) {
			dsl_dir_rele(dd, FTAG);
			dsl_pool_config_exit(dp, FTAG);
			kmem_free(attr, sizeof (zap_attribute_t));
			return (err);
		}
	}

	/*
	 * Iterate over all snapshots.
	 */
	if (flags & DS_FIND_SNAPSHOTS) {
		err = dsl_dataset_hold_obj(dp, thisobj, FTAG, &ds);

		if (err == 0) {
			uint64_t snapobj;

			snapobj = dsl_dataset_phys(ds)->ds_snapnames_zapobj;
			dsl_dataset_rele(ds, FTAG);

			for (zap_cursor_init(&zc, dp->dp_meta_objset, snapobj);
			    zap_cursor_retrieve(&zc, attr) == 0;
			    (void) zap_cursor_advance(&zc)) {
				ASSERT3U(attr->za_integer_length, ==,
				    sizeof (uint64_t));
				ASSERT3U(attr->za_num_integers, ==, 1);

				child = kmem_asprintf("%s@%s",
				    name, attr->za_name);
				dsl_pool_config_exit(dp, FTAG);
				err = func(child, arg);
				dsl_pool_config_enter(dp, FTAG);
				strfree(child);
				if (err != 0)
					break;
			}
			zap_cursor_fini(&zc);
		}
	}

	dsl_dir_rele(dd, FTAG);
	kmem_free(attr, sizeof (zap_attribute_t));
	dsl_pool_config_exit(dp, FTAG);

	if (err != 0)
		return (err);

	/* Apply to self. */
	return (func(name, arg));
}

/*
 * See comment above dmu_objset_find_impl().
 */
int
dmu_objset_find(char *name, int func(const char *, void *), void *arg,
    int flags)
{
	spa_t *spa;
	int error;

	error = spa_open(name, &spa, FTAG);
	if (error != 0)
		return (error);
	error = dmu_objset_find_impl(spa, name, func, arg, flags);
	spa_close(spa, FTAG);
	return (error);
}

void
dmu_objset_set_user(objset_t *os, void *user_ptr)
{
	ASSERT(MUTEX_HELD(&os->os_user_ptr_lock));
	os->os_user_ptr = user_ptr;
}

void *
dmu_objset_get_user(objset_t *os)
{
	ASSERT(MUTEX_HELD(&os->os_user_ptr_lock));
	return (os->os_user_ptr);
}

/*
 * Determine name of filesystem, given name of snapshot.
 * buf must be at least MAXNAMELEN bytes
 */
int
dmu_fsname(const char *snapname, char *buf)
{
	char *atp = strchr(snapname, '@');
	if (atp == NULL)
		return (SET_ERROR(EINVAL));
	if (atp - snapname >= MAXNAMELEN)
		return (SET_ERROR(ENAMETOOLONG));
	(void) strlcpy(buf, snapname, atp - snapname + 1);
	return (0);
}
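
/*
 * Illustrative example (not in the original source): for snapname
 * "tank/home@yesterday", dmu_fsname() copies "tank/home" into buf and
 * returns 0; a name without an '@' returns EINVAL.
 */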
1985