dmu_send.c revision 307122
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26 * Copyright (c) 2012, Martin Matuska <mm@FreeBSD.org>. All rights reserved.
27 * Copyright 2014 HybridCluster. All rights reserved.
28 * Copyright 2016 RackTop Systems.
29 * Copyright (c) 2014 Integros [integros.com]
30 */
31
32#include <sys/dmu.h>
33#include <sys/dmu_impl.h>
34#include <sys/dmu_tx.h>
35#include <sys/dbuf.h>
36#include <sys/dnode.h>
37#include <sys/zfs_context.h>
38#include <sys/dmu_objset.h>
39#include <sys/dmu_traverse.h>
40#include <sys/dsl_dataset.h>
41#include <sys/dsl_dir.h>
42#include <sys/dsl_prop.h>
43#include <sys/dsl_pool.h>
44#include <sys/dsl_synctask.h>
45#include <sys/zfs_ioctl.h>
46#include <sys/zap.h>
47#include <sys/zio_checksum.h>
48#include <sys/zfs_znode.h>
49#include <zfs_fletcher.h>
50#include <sys/avl.h>
51#include <sys/ddt.h>
52#include <sys/zfs_onexit.h>
53#include <sys/dmu_send.h>
54#include <sys/dsl_destroy.h>
55#include <sys/blkptr.h>
56#include <sys/dsl_bookmark.h>
57#include <sys/zfeature.h>
58#include <sys/bqueue.h>
59
60#ifdef __FreeBSD__
61#undef dump_write
62#define dump_write dmu_dump_write
63#endif
64
65/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
66int zfs_send_corrupt_data = B_FALSE;
67int zfs_send_queue_length = 16 * 1024 * 1024;
68int zfs_recv_queue_length = 16 * 1024 * 1024;
69/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
70int zfs_send_set_freerecords_bit = B_TRUE;
71
72#ifdef _KERNEL
73TUNABLE_INT("vfs.zfs.send_set_freerecords_bit", &zfs_send_set_freerecords_bit);
74#endif
75
76static char *dmu_recv_tag = "dmu_recv_tag";
77const char *recv_clone_name = "%recv";
78
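/*
 * BP_SPAN computes how many bytes of an object are covered by a single
 * block pointer at the given indirection level: the data block size
 * (datablkszsec << SPA_MINBLOCKSHIFT) multiplied by a fan-out of
 * 1 << (indblkshift - SPA_BLKPTRSHIFT) block pointers per indirect level.
 */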
79#define	BP_SPAN(datablkszsec, indblkshift, level) \
80	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
81	(level) * (indblkshift - SPA_BLKPTRSHIFT)))
82
83static void byteswap_record(dmu_replay_record_t *drr);
84
85struct send_thread_arg {
86	bqueue_t	q;
87	dsl_dataset_t	*ds;		/* Dataset to traverse */
88	uint64_t	fromtxg;	/* Traverse from this txg */
89	int		flags;		/* flags to pass to traverse_dataset */
90	int		error_code;
91	boolean_t	cancel;
92	zbookmark_phys_t resume;
93};
94
95struct send_block_record {
96	boolean_t		eos_marker; /* Marks the end of the stream */
97	blkptr_t		bp;
98	zbookmark_phys_t	zb;
99	uint8_t			indblkshift;
100	uint16_t		datablkszsec;
101	bqueue_node_t		ln;
102};
103
104static int
105dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
106{
107	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
108	struct uio auio;
109	struct iovec aiov;
110
111	/*
112	 * The code does not rely on this (len being a multiple of 8).  We keep
113	 * this assertion because of the corresponding assertion in
114	 * receive_read().  Keeping this assertion ensures that we do not
115	 * inadvertently break backwards compatibility (causing the assertion
116	 * in receive_read() to trigger on old software).
117	 *
118	 * Removing the assertions could be rolled into a new feature that uses
119	 * data that isn't 8-byte aligned; if the assertions were removed, a
120	 * feature flag would have to be added.
121	 */
122
123	ASSERT0(len % 8);
124
125	aiov.iov_base = buf;
126	aiov.iov_len = len;
127	auio.uio_iov = &aiov;
128	auio.uio_iovcnt = 1;
129	auio.uio_resid = len;
130	auio.uio_segflg = UIO_SYSSPACE;
131	auio.uio_rw = UIO_WRITE;
132	auio.uio_offset = (off_t)-1;
133	auio.uio_td = dsp->dsa_td;
134#ifdef _KERNEL
135	if (dsp->dsa_fp->f_type == DTYPE_VNODE)
136		bwillwrite();
137	dsp->dsa_err = fo_write(dsp->dsa_fp, &auio, dsp->dsa_td->td_ucred, 0,
138	    dsp->dsa_td);
139#else
140	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
141	dsp->dsa_err = EOPNOTSUPP;
142#endif
143	mutex_enter(&ds->ds_sendstream_lock);
144	*dsp->dsa_off += len;
145	mutex_exit(&ds->ds_sendstream_lock);
146
147	return (dsp->dsa_err);
148}
149
150/*
151 * For all record types except BEGIN, fill in the checksum (overlaid in
152 * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
153 * up to the start of the checksum itself.
154 */
155static int
156dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
157{
158	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
159	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
160	fletcher_4_incremental_native(dsp->dsa_drr,
161	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
162	    &dsp->dsa_zc);
163	if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
164		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
165		    drr_checksum.drr_checksum));
166		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
167	}
168	fletcher_4_incremental_native(&dsp->dsa_drr->
169	    drr_u.drr_checksum.drr_checksum,
170	    sizeof (zio_cksum_t), &dsp->dsa_zc);
171	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
172		return (SET_ERROR(EINTR));
173	if (payload_len != 0) {
174		fletcher_4_incremental_native(payload, payload_len,
175		    &dsp->dsa_zc);
176		if (dump_bytes(dsp, payload, payload_len) != 0)
177			return (SET_ERROR(EINTR));
178	}
179	return (0);
180}
181
182/*
183 * Fill in the drr_free struct, or perform aggregation if the previous record is
184 * also a free record, and the two are adjacent.
185 *
186 * Note that we send free records even for a full send, because we want to be
187 * able to receive a full send as a clone, which requires a list of all the free
188 * and freeobject records that were generated on the source.
189 */
190static int
191dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
192    uint64_t length)
193{
194	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
195
196	/*
197	 * When we receive a free record, dbuf_free_range() assumes
198	 * that the receiving system doesn't have any dbufs in the range
199	 * being freed.  This is always true because there is a one-record
200	 * constraint: we only send one WRITE record for any given
201	 * object,offset.  We know that the one-record constraint is
202	 * true because we always send data in increasing order by
203	 * object,offset.
204	 *
205	 * If the increasing-order constraint ever changes, we should find
206	 * another way to assert that the one-record constraint is still
207	 * satisfied.
208	 */
209	ASSERT(object > dsp->dsa_last_data_object ||
210	    (object == dsp->dsa_last_data_object &&
211	    offset > dsp->dsa_last_data_offset));
212
213	if (length != -1ULL && offset + length < offset)
214		length = -1ULL;
215
216	/*
217	 * If there is a pending op, but it's not PENDING_FREE, push it out,
218	 * since free block aggregation can only be done for blocks of the
219	 * same type (i.e., DRR_FREE records can only be aggregated with
220	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
221	 * aggregated with other DRR_FREEOBJECTS records).
222	 */
223	if (dsp->dsa_pending_op != PENDING_NONE &&
224	    dsp->dsa_pending_op != PENDING_FREE) {
225		if (dump_record(dsp, NULL, 0) != 0)
226			return (SET_ERROR(EINTR));
227		dsp->dsa_pending_op = PENDING_NONE;
228	}
229
230	if (dsp->dsa_pending_op == PENDING_FREE) {
231		/*
232		 * There should never be a PENDING_FREE if length is -1
233		 * (because dump_dnode is the only place where this
234		 * function is called with a -1, and only after flushing
235		 * any pending record).
236		 */
237		ASSERT(length != -1ULL);
238		/*
239		 * Check to see whether this free block can be aggregated
240		 * with the pending one.
241		 */
242		if (drrf->drr_object == object && drrf->drr_offset +
243		    drrf->drr_length == offset) {
244			drrf->drr_length += length;
245			return (0);
246		} else {
247			/* not a continuation.  Push out pending record */
248			if (dump_record(dsp, NULL, 0) != 0)
249				return (SET_ERROR(EINTR));
250			dsp->dsa_pending_op = PENDING_NONE;
251		}
252	}
253	/* create a FREE record and make it pending */
254	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
255	dsp->dsa_drr->drr_type = DRR_FREE;
256	drrf->drr_object = object;
257	drrf->drr_offset = offset;
258	drrf->drr_length = length;
259	drrf->drr_toguid = dsp->dsa_toguid;
260	if (length == -1ULL) {
261		if (dump_record(dsp, NULL, 0) != 0)
262			return (SET_ERROR(EINTR));
263	} else {
264		dsp->dsa_pending_op = PENDING_FREE;
265	}
266
267	return (0);
268}
269
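/*
 * Emit a DRR_WRITE record for one block of a regular object, followed by
 * the block data itself.  Any pending FREE/FREEOBJECTS aggregation is
 * flushed first, since records of different types cannot be combined.
 */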
270static int
271dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
272    uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
273{
274	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
275
276	/*
277	 * We send data in increasing object, offset order.
278	 * See comment in dump_free() for details.
279	 */
280	ASSERT(object > dsp->dsa_last_data_object ||
281	    (object == dsp->dsa_last_data_object &&
282	    offset > dsp->dsa_last_data_offset));
283	dsp->dsa_last_data_object = object;
284	dsp->dsa_last_data_offset = offset + blksz - 1;
285
286	/*
287	 * If there is any kind of pending aggregation (currently either
288	 * a grouping of free objects or free blocks), push it out to
289	 * the stream, since aggregation can't be done across operations
290	 * of different types.
291	 */
292	if (dsp->dsa_pending_op != PENDING_NONE) {
293		if (dump_record(dsp, NULL, 0) != 0)
294			return (SET_ERROR(EINTR));
295		dsp->dsa_pending_op = PENDING_NONE;
296	}
297	/* write a WRITE record */
298	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
299	dsp->dsa_drr->drr_type = DRR_WRITE;
300	drrw->drr_object = object;
301	drrw->drr_type = type;
302	drrw->drr_offset = offset;
303	drrw->drr_length = blksz;
304	drrw->drr_toguid = dsp->dsa_toguid;
305	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
306		/*
307		 * There's no pre-computed checksum for partial-block
308		 * writes or embedded BP's, so (like
309		 * fletcher4-checksummed blocks) userland will have to
310		 * compute a dedup-capable checksum itself.
311		 */
312		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
313	} else {
314		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
315		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
316		    ZCHECKSUM_FLAG_DEDUP)
317			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
318		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
319		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
320		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
321		drrw->drr_key.ddk_cksum = bp->blk_cksum;
322	}
323
324	if (dump_record(dsp, data, blksz) != 0)
325		return (SET_ERROR(EINTR));
326	return (0);
327}
328
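/*
 * Emit a DRR_WRITE_EMBEDDED record for a block whose data is stored
 * entirely within the block pointer (BP_IS_EMBEDDED).  The compressed
 * payload is decoded from the bp and padded out to an 8-byte boundary.
 */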
329static int
330dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
331    int blksz, const blkptr_t *bp)
332{
333	char buf[BPE_PAYLOAD_SIZE];
334	struct drr_write_embedded *drrw =
335	    &(dsp->dsa_drr->drr_u.drr_write_embedded);
336
337	if (dsp->dsa_pending_op != PENDING_NONE) {
338		if (dump_record(dsp, NULL, 0) != 0)
339			return (EINTR);
340		dsp->dsa_pending_op = PENDING_NONE;
341	}
342
343	ASSERT(BP_IS_EMBEDDED(bp));
344
345	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
346	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
347	drrw->drr_object = object;
348	drrw->drr_offset = offset;
349	drrw->drr_length = blksz;
350	drrw->drr_toguid = dsp->dsa_toguid;
351	drrw->drr_compression = BP_GET_COMPRESS(bp);
352	drrw->drr_etype = BPE_GET_ETYPE(bp);
353	drrw->drr_lsize = BPE_GET_LSIZE(bp);
354	drrw->drr_psize = BPE_GET_PSIZE(bp);
355
356	decode_embedded_bp_compressed(bp, buf);
357
358	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
359		return (EINTR);
360	return (0);
361}
362
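/*
 * Emit a DRR_SPILL record carrying the contents of an object's spill
 * block (typically overflow system-attribute data).
 */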
363static int
364dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
365{
366	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
367
368	if (dsp->dsa_pending_op != PENDING_NONE) {
369		if (dump_record(dsp, NULL, 0) != 0)
370			return (SET_ERROR(EINTR));
371		dsp->dsa_pending_op = PENDING_NONE;
372	}
373
374	/* write a SPILL record */
375	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
376	dsp->dsa_drr->drr_type = DRR_SPILL;
377	drrs->drr_object = object;
378	drrs->drr_length = blksz;
379	drrs->drr_toguid = dsp->dsa_toguid;
380
381	if (dump_record(dsp, data, blksz) != 0)
382		return (SET_ERROR(EINTR));
383	return (0);
384}
385
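/*
 * Emit a DRR_FREEOBJECTS record for "numobjs" objects starting at
 * "firstobj", aggregating it with a pending FREEOBJECTS record when the
 * two ranges are adjacent.
 */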
386static int
387dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
388{
389	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
390
391	/*
392	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
393	 * push it out, since free block aggregation can only be done for
394	 * blocks of the same type (i.e., DRR_FREE records can only be
395	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
396	 * can only be aggregated with other DRR_FREEOBJECTS records).
397	 */
398	if (dsp->dsa_pending_op != PENDING_NONE &&
399	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
400		if (dump_record(dsp, NULL, 0) != 0)
401			return (SET_ERROR(EINTR));
402		dsp->dsa_pending_op = PENDING_NONE;
403	}
404	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
405		/*
406		 * See whether this free object array can be aggregated
407		 * with the pending one.
408		 */
409		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
410			drrfo->drr_numobjs += numobjs;
411			return (0);
412		} else {
413			/* can't be aggregated.  Push out pending record */
414			if (dump_record(dsp, NULL, 0) != 0)
415				return (SET_ERROR(EINTR));
416			dsp->dsa_pending_op = PENDING_NONE;
417		}
418	}
419
420	/* write a FREEOBJECTS record */
421	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
422	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
423	drrfo->drr_firstobj = firstobj;
424	drrfo->drr_numobjs = numobjs;
425	drrfo->drr_toguid = dsp->dsa_toguid;
426
427	dsp->dsa_pending_op = PENDING_FREEOBJECTS;
428
429	return (0);
430}
431
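/*
 * Emit a DRR_OBJECT record describing one dnode, followed by its bonus
 * buffer.  Freed dnodes become DRR_FREEOBJECTS records, and any space
 * beyond the object's last block is released with a trailing free record.
 */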
432static int
433dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
434{
435	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
436
437	if (object < dsp->dsa_resume_object) {
438		/*
439		 * Note: when resuming, we will visit all the dnodes in
440		 * the block of dnodes that we are resuming from.  In
441		 * this case it's unnecessary to send the dnodes prior to
442		 * the one we are resuming from.  We should be at most one
443		 * block's worth of dnodes behind the resume point.
444		 */
445		ASSERT3U(dsp->dsa_resume_object - object, <,
446		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
447		return (0);
448	}
449
450	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
451		return (dump_freeobjects(dsp, object, 1));
452
453	if (dsp->dsa_pending_op != PENDING_NONE) {
454		if (dump_record(dsp, NULL, 0) != 0)
455			return (SET_ERROR(EINTR));
456		dsp->dsa_pending_op = PENDING_NONE;
457	}
458
459	/* write an OBJECT record */
460	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
461	dsp->dsa_drr->drr_type = DRR_OBJECT;
462	drro->drr_object = object;
463	drro->drr_type = dnp->dn_type;
464	drro->drr_bonustype = dnp->dn_bonustype;
465	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
466	drro->drr_bonuslen = dnp->dn_bonuslen;
467	drro->drr_checksumtype = dnp->dn_checksum;
468	drro->drr_compress = dnp->dn_compress;
469	drro->drr_toguid = dsp->dsa_toguid;
470
471	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
472	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
473		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
474
475	if (dump_record(dsp, DN_BONUS(dnp),
476	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
477		return (SET_ERROR(EINTR));
478	}
479
480	/* Free anything past the end of the file. */
481	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
482	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
483		return (SET_ERROR(EINTR));
484	if (dsp->dsa_err != 0)
485		return (SET_ERROR(EINTR));
486	return (0);
487}
488
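/*
 * Decide whether a block pointer may be sent as a DRR_WRITE_EMBEDDED
 * record, based on its embedded type, its compression function, and the
 * feature flags negotiated for this stream.
 */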
489static boolean_t
490backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
491{
492	if (!BP_IS_EMBEDDED(bp))
493		return (B_FALSE);
494
495	/*
496	 * Compression function must be legacy, or explicitly enabled.
497	 */
498	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
499	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
500		return (B_FALSE);
501
502	/*
503	 * Embed type must be explicitly enabled.
504	 */
505	switch (BPE_GET_ETYPE(bp)) {
506	case BP_EMBEDDED_TYPE_DATA:
507		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
508			return (B_TRUE);
509		break;
510	default:
511		return (B_FALSE);
512	}
513	return (B_FALSE);
514}
515
516/*
517 * This is the callback function for traverse_dataset; it runs in the worker
518 * thread and enqueues each block it visits for dmu_send_impl to process.
519 */
520/*ARGSUSED*/
521static int
522send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
523    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
524{
525	struct send_thread_arg *sta = arg;
526	struct send_block_record *record;
527	uint64_t record_size;
528	int err = 0;
529
530	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
531	    zb->zb_object >= sta->resume.zb_object);
532
533	if (sta->cancel)
534		return (SET_ERROR(EINTR));
535
536	if (bp == NULL) {
537		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
538		return (0);
539	} else if (zb->zb_level < 0) {
540		return (0);
541	}
542
543	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
544	record->eos_marker = B_FALSE;
545	record->bp = *bp;
546	record->zb = *zb;
547	record->indblkshift = dnp->dn_indblkshift;
548	record->datablkszsec = dnp->dn_datablkszsec;
549	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
550	bqueue_enqueue(&sta->q, record, record_size);
551
552	return (err);
553}
554
555/*
556 * This function kicks off the dataset traversal.  It also handles setting the
557 * error code of the thread in case something goes wrong, and pushes the End of
558 * Stream record when the traverse_dataset call has finished.  If there is no
559 * dataset to traverse, the thread immediately pushes the End of Stream marker.
560 */
561static void
562send_traverse_thread(void *arg)
563{
564	struct send_thread_arg *st_arg = arg;
565	int err;
566	struct send_block_record *data;
567
568	if (st_arg->ds != NULL) {
569		err = traverse_dataset_resume(st_arg->ds,
570		    st_arg->fromtxg, &st_arg->resume,
571		    st_arg->flags, send_cb, st_arg);
572
573		if (err != EINTR)
574			st_arg->error_code = err;
575	}
576	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
577	data->eos_marker = B_TRUE;
578	bqueue_enqueue(&st_arg->q, data, 1);
579	thread_exit();
580}
581
582/*
583 * This function actually handles figuring out what kind of record needs to be
584 * dumped, reading the data (which has hopefully been prefetched), and calling
585 * the appropriate helper function.
586 */
587static int
588do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
589{
590	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
591	const blkptr_t *bp = &data->bp;
592	const zbookmark_phys_t *zb = &data->zb;
593	uint8_t indblkshift = data->indblkshift;
594	uint16_t dblkszsec = data->datablkszsec;
595	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
596	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
597	int err = 0;
598
599	ASSERT3U(zb->zb_level, >=, 0);
600
601	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
602	    zb->zb_object >= dsa->dsa_resume_object);
603
604	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
605	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
606		return (0);
607	} else if (BP_IS_HOLE(bp) &&
608	    zb->zb_object == DMU_META_DNODE_OBJECT) {
609		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
610		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
611		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
612	} else if (BP_IS_HOLE(bp)) {
613		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
614		uint64_t offset = zb->zb_blkid * span;
615		err = dump_free(dsa, zb->zb_object, offset, span);
616	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
617		return (0);
618	} else if (type == DMU_OT_DNODE) {
619		int blksz = BP_GET_LSIZE(bp);
620		arc_flags_t aflags = ARC_FLAG_WAIT;
621		arc_buf_t *abuf;
622
623		ASSERT0(zb->zb_level);
624
625		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
626		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
627		    &aflags, zb) != 0)
628			return (SET_ERROR(EIO));
629
630		dnode_phys_t *blk = abuf->b_data;
631		uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
632		for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
633			err = dump_dnode(dsa, dnobj + i, blk + i);
634			if (err != 0)
635				break;
636		}
637		(void) arc_buf_remove_ref(abuf, &abuf);
638	} else if (type == DMU_OT_SA) {
639		arc_flags_t aflags = ARC_FLAG_WAIT;
640		arc_buf_t *abuf;
641		int blksz = BP_GET_LSIZE(bp);
642
643		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
644		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
645		    &aflags, zb) != 0)
646			return (SET_ERROR(EIO));
647
648		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
649		(void) arc_buf_remove_ref(abuf, &abuf);
650	} else if (backup_do_embed(dsa, bp)) {
651		/* it's an embedded level-0 block of a regular object */
652		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
653		ASSERT0(zb->zb_level);
654		err = dump_write_embedded(dsa, zb->zb_object,
655		    zb->zb_blkid * blksz, blksz, bp);
656	} else {
657		/* it's a level-0 block of a regular object */
658		arc_flags_t aflags = ARC_FLAG_WAIT;
659		arc_buf_t *abuf;
660		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
661		uint64_t offset;
662
663		ASSERT0(zb->zb_level);
664		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
665		    (zb->zb_object == dsa->dsa_resume_object &&
666		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
667
668		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
669		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
670		    &aflags, zb) != 0) {
671			if (zfs_send_corrupt_data) {
672				/* Send a block filled with 0x"zfs badd bloc" */
673				abuf = arc_buf_alloc(spa, blksz, &abuf,
674				    ARC_BUFC_DATA);
675				uint64_t *ptr;
676				for (ptr = abuf->b_data;
677				    (char *)ptr < (char *)abuf->b_data + blksz;
678				    ptr++)
679					*ptr = 0x2f5baddb10cULL;
680			} else {
681				return (SET_ERROR(EIO));
682			}
683		}
684
685		offset = zb->zb_blkid * blksz;
686
687		if (!(dsa->dsa_featureflags &
688		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
689		    blksz > SPA_OLD_MAXBLOCKSIZE) {
690			char *buf = abuf->b_data;
691			while (blksz > 0 && err == 0) {
692				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
693				err = dump_write(dsa, type, zb->zb_object,
694				    offset, n, NULL, buf);
695				offset += n;
696				buf += n;
697				blksz -= n;
698			}
699		} else {
700			err = dump_write(dsa, type, zb->zb_object,
701			    offset, blksz, bp, abuf->b_data);
702		}
703		(void) arc_buf_remove_ref(abuf, &abuf);
704	}
705
706	ASSERT(err == 0 || err == EINTR);
707	return (err);
708}
709
710/*
711 * Pop the new data off the queue, and free the old data.
712 */
713static struct send_block_record *
714get_next_record(bqueue_t *bq, struct send_block_record *data)
715{
716	struct send_block_record *tmp = bqueue_dequeue(bq);
717	kmem_free(data, sizeof (*data));
718	return (tmp);
719}
720
721/*
722 * Actually do the bulk of the work in a zfs send.
723 *
724 * Note: Releases dp using the specified tag.
725 */
726static int
727dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
728    zfs_bookmark_phys_t *ancestor_zb,
729    boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
730    uint64_t resumeobj, uint64_t resumeoff,
731#ifdef illumos
732    vnode_t *vp, offset_t *off)
733#else
734    struct file *fp, offset_t *off)
735#endif
736{
737	objset_t *os;
738	dmu_replay_record_t *drr;
739	dmu_sendarg_t *dsp;
740	int err;
741	uint64_t fromtxg = 0;
742	uint64_t featureflags = 0;
743	struct send_thread_arg to_arg = { 0 };
744
745	err = dmu_objset_from_ds(to_ds, &os);
746	if (err != 0) {
747		dsl_pool_rele(dp, tag);
748		return (err);
749	}
750
751	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
752	drr->drr_type = DRR_BEGIN;
753	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
754	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
755	    DMU_SUBSTREAM);
756
757#ifdef _KERNEL
758	if (dmu_objset_type(os) == DMU_OST_ZFS) {
759		uint64_t version;
760		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
761			kmem_free(drr, sizeof (dmu_replay_record_t));
762			dsl_pool_rele(dp, tag);
763			return (SET_ERROR(EINVAL));
764		}
765		if (version >= ZPL_VERSION_SA) {
766			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
767		}
768	}
769#endif
770
771	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
772		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
773	if (embedok &&
774	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
775		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
776		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
777			featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
778	}
779
780	if (resumeobj != 0 || resumeoff != 0) {
781		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
782	}
783
784	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
785	    featureflags);
786
787	drr->drr_u.drr_begin.drr_creation_time =
788	    dsl_dataset_phys(to_ds)->ds_creation_time;
789	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
790	if (is_clone)
791		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
792	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
793	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
794		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
795	if (zfs_send_set_freerecords_bit)
796		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
797
798	if (ancestor_zb != NULL) {
799		drr->drr_u.drr_begin.drr_fromguid =
800		    ancestor_zb->zbm_guid;
801		fromtxg = ancestor_zb->zbm_creation_txg;
802	}
803	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
804	if (!to_ds->ds_is_snapshot) {
805		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
806		    sizeof (drr->drr_u.drr_begin.drr_toname));
807	}
808
809	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
810
811	dsp->dsa_drr = drr;
812	dsp->dsa_outfd = outfd;
813	dsp->dsa_proc = curproc;
814	dsp->dsa_td = curthread;
815	dsp->dsa_fp = fp;
816	dsp->dsa_os = os;
817	dsp->dsa_off = off;
818	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
819	dsp->dsa_pending_op = PENDING_NONE;
820	dsp->dsa_featureflags = featureflags;
821	dsp->dsa_resume_object = resumeobj;
822	dsp->dsa_resume_offset = resumeoff;
823
824	mutex_enter(&to_ds->ds_sendstream_lock);
825	list_insert_head(&to_ds->ds_sendstreams, dsp);
826	mutex_exit(&to_ds->ds_sendstream_lock);
827
828	dsl_dataset_long_hold(to_ds, FTAG);
829	dsl_pool_rele(dp, tag);
830
831	void *payload = NULL;
832	size_t payload_len = 0;
833	if (resumeobj != 0 || resumeoff != 0) {
834		dmu_object_info_t to_doi;
835		err = dmu_object_info(os, resumeobj, &to_doi);
836		if (err != 0)
837			goto out;
838		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
839		    resumeoff / to_doi.doi_data_block_size);
840
841		nvlist_t *nvl = fnvlist_alloc();
842		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
843		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
844		payload = fnvlist_pack(nvl, &payload_len);
845		drr->drr_payloadlen = payload_len;
846		fnvlist_free(nvl);
847	}
848
849	err = dump_record(dsp, payload, payload_len);
850	fnvlist_pack_free(payload, payload_len);
851	if (err != 0) {
852		err = dsp->dsa_err;
853		goto out;
854	}
855
856	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
857	    offsetof(struct send_block_record, ln));
858	to_arg.error_code = 0;
859	to_arg.cancel = B_FALSE;
860	to_arg.ds = to_ds;
861	to_arg.fromtxg = fromtxg;
862	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
863	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, &p0,
864	    TS_RUN, minclsyspri);
865
866	struct send_block_record *to_data;
867	to_data = bqueue_dequeue(&to_arg.q);
868
869	while (!to_data->eos_marker && err == 0) {
870		err = do_dump(dsp, to_data);
871		to_data = get_next_record(&to_arg.q, to_data);
872		if (issig(JUSTLOOKING) && issig(FORREAL))
873			err = EINTR;
874	}
875
876	if (err != 0) {
877		to_arg.cancel = B_TRUE;
878		while (!to_data->eos_marker) {
879			to_data = get_next_record(&to_arg.q, to_data);
880		}
881	}
882	kmem_free(to_data, sizeof (*to_data));
883
884	bqueue_destroy(&to_arg.q);
885
886	if (err == 0 && to_arg.error_code != 0)
887		err = to_arg.error_code;
888
889	if (err != 0)
890		goto out;
891
892	if (dsp->dsa_pending_op != PENDING_NONE)
893		if (dump_record(dsp, NULL, 0) != 0)
894			err = SET_ERROR(EINTR);
895
896	if (err != 0) {
897		if (err == EINTR && dsp->dsa_err != 0)
898			err = dsp->dsa_err;
899		goto out;
900	}
901
902	bzero(drr, sizeof (dmu_replay_record_t));
903	drr->drr_type = DRR_END;
904	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
905	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
906
907	if (dump_record(dsp, NULL, 0) != 0)
908		err = dsp->dsa_err;
909
910out:
911	mutex_enter(&to_ds->ds_sendstream_lock);
912	list_remove(&to_ds->ds_sendstreams, dsp);
913	mutex_exit(&to_ds->ds_sendstream_lock);
914
915	kmem_free(drr, sizeof (dmu_replay_record_t));
916	kmem_free(dsp, sizeof (dmu_sendarg_t));
917
918	dsl_dataset_long_rele(to_ds, FTAG);
919
920	return (err);
921}
922
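/*
 * Generate a send stream for the snapshot given by object number
 * "tosnap", optionally incremental from the snapshot "fromsnap", both
 * looked up in "pool".  The real work is done by dmu_send_impl().
 */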
923int
924dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
925    boolean_t embedok, boolean_t large_block_ok,
926#ifdef illumos
927    int outfd, vnode_t *vp, offset_t *off)
928#else
929    int outfd, struct file *fp, offset_t *off)
930#endif
931{
932	dsl_pool_t *dp;
933	dsl_dataset_t *ds;
934	dsl_dataset_t *fromds = NULL;
935	int err;
936
937	err = dsl_pool_hold(pool, FTAG, &dp);
938	if (err != 0)
939		return (err);
940
941	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
942	if (err != 0) {
943		dsl_pool_rele(dp, FTAG);
944		return (err);
945	}
946
947	if (fromsnap != 0) {
948		zfs_bookmark_phys_t zb;
949		boolean_t is_clone;
950
951		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
952		if (err != 0) {
953			dsl_dataset_rele(ds, FTAG);
954			dsl_pool_rele(dp, FTAG);
955			return (err);
956		}
957		if (!dsl_dataset_is_before(ds, fromds, 0))
958			err = SET_ERROR(EXDEV);
959		zb.zbm_creation_time =
960		    dsl_dataset_phys(fromds)->ds_creation_time;
961		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
962		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
963		is_clone = (fromds->ds_dir != ds->ds_dir);
964		dsl_dataset_rele(fromds, FTAG);
965		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
966		    embedok, large_block_ok, outfd, 0, 0, fp, off);
967	} else {
968		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
969		    embedok, large_block_ok, outfd, 0, 0, fp, off);
970	}
971	dsl_dataset_rele(ds, FTAG);
972	return (err);
973}
974
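/*
 * Generate a send stream for "tosnap" given by name; "fromsnap" may name
 * an earlier snapshot or a bookmark.  If tosnap is a filesystem or volume
 * rather than a snapshot, the dataset is owned for the duration of the
 * send so that it cannot change underneath us.
 */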
975int
976dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
977    boolean_t large_block_ok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
978#ifdef illumos
979    vnode_t *vp, offset_t *off)
980#else
981    struct file *fp, offset_t *off)
982#endif
983{
984	dsl_pool_t *dp;
985	dsl_dataset_t *ds;
986	int err;
987	boolean_t owned = B_FALSE;
988
989	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
990		return (SET_ERROR(EINVAL));
991
992	err = dsl_pool_hold(tosnap, FTAG, &dp);
993	if (err != 0)
994		return (err);
995
996	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
997		/*
998		 * We are sending a filesystem or volume.  Ensure
999		 * that it doesn't change by owning the dataset.
1000		 */
1001		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
1002		owned = B_TRUE;
1003	} else {
1004		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
1005	}
1006	if (err != 0) {
1007		dsl_pool_rele(dp, FTAG);
1008		return (err);
1009	}
1010
1011	if (fromsnap != NULL) {
1012		zfs_bookmark_phys_t zb;
1013		boolean_t is_clone = B_FALSE;
1014		int fsnamelen = strchr(tosnap, '@') - tosnap;
1015
1016		/*
1017		 * If the fromsnap is in a different filesystem, then
1018		 * mark the send stream as a clone.
1019		 */
1020		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
1021		    (fromsnap[fsnamelen] != '@' &&
1022		    fromsnap[fsnamelen] != '#')) {
1023			is_clone = B_TRUE;
1024		}
1025
1026		if (strchr(fromsnap, '@')) {
1027			dsl_dataset_t *fromds;
1028			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
1029			if (err == 0) {
1030				if (!dsl_dataset_is_before(ds, fromds, 0))
1031					err = SET_ERROR(EXDEV);
1032				zb.zbm_creation_time =
1033				    dsl_dataset_phys(fromds)->ds_creation_time;
1034				zb.zbm_creation_txg =
1035				    dsl_dataset_phys(fromds)->ds_creation_txg;
1036				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1037				is_clone = (ds->ds_dir != fromds->ds_dir);
1038				dsl_dataset_rele(fromds, FTAG);
1039			}
1040		} else {
1041			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1042		}
1043		if (err != 0) {
1044			dsl_dataset_rele(ds, FTAG);
1045			dsl_pool_rele(dp, FTAG);
1046			return (err);
1047		}
1048		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1049		    embedok, large_block_ok,
1050		    outfd, resumeobj, resumeoff, fp, off);
1051	} else {
1052		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1053		    embedok, large_block_ok,
1054		    outfd, resumeobj, resumeoff, fp, off);
1055	}
1056	if (owned)
1057		dsl_dataset_disown(ds, FTAG);
1058	else
1059		dsl_dataset_rele(ds, FTAG);
1060	return (err);
1061}
1062
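/*
 * Turn an uncompressed-data size into a stream size estimate: subtract
 * the approximate space consumed by indirect blocks and add the
 * per-block dmu_replay_record_t overhead.
 */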
1063static int
1064dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
1065    uint64_t *sizep)
1066{
1067	int err;
1068	/*
1069	 * Assume that space (both on-disk and in-stream) is dominated by
1070	 * data.  We will adjust for indirect blocks and the copies property,
1071	 * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
1072	 */
1073
1074	/*
1075	 * Subtract out approximate space used by indirect blocks.
1076	 * Assume most space is used by data blocks (non-indirect, non-dnode).
1077	 * Assume all blocks are recordsize.  Assume that ditto blocks and
1078	 * internal fragmentation cancel out any savings from compression.
1079	 *
1080	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
1081	 * block, which we observe in practice.
1082	 */
1083	uint64_t recordsize;
1084	err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
1085	if (err != 0)
1086		return (err);
1087	size -= size / recordsize * sizeof (blkptr_t);
1088
1089	/* Add in the space for the record associated with each block. */
1090	size += size / recordsize * sizeof (dmu_replay_record_t);
1091
1092	*sizep = size;
1093
1094	return (0);
1095}
1096
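/*
 * Estimate the size of a send stream from snapshot "fromds" (NULL for a
 * full send) to snapshot "ds", based on the uncompressed size of the
 * data that changed between them.
 */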
1097int
1098dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
1099{
1100	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1101	int err;
1102	uint64_t size;
1103
1104	ASSERT(dsl_pool_config_held(dp));
1105
1106	/* tosnap must be a snapshot */
1107	if (!ds->ds_is_snapshot)
1108		return (SET_ERROR(EINVAL));
1109
1110	/* fromsnap, if provided, must be a snapshot */
1111	if (fromds != NULL && !fromds->ds_is_snapshot)
1112		return (SET_ERROR(EINVAL));
1113
1114	/*
1115	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
1116	 * or the origin's fs.
1117	 */
1118	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1119		return (SET_ERROR(EXDEV));
1120
1121	/* Get uncompressed size estimate of changed data. */
1122	if (fromds == NULL) {
1123		size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1124	} else {
1125		uint64_t used, comp;
1126		err = dsl_dataset_space_written(fromds, ds,
1127		    &used, &comp, &size);
1128		if (err != 0)
1129			return (err);
1130	}
1131
1132	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1133	return (err);
1134}
1135
1136/*
1137 * Simple callback used to traverse the blocks of a snapshot and sum their
1138 * uncompressed size.
1139 */
1140/* ARGSUSED */
1141static int
1142dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1143    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1144{
1145	uint64_t *spaceptr = arg;
1146	if (bp != NULL && !BP_IS_HOLE(bp)) {
1147		*spaceptr += BP_GET_UCSIZE(bp);
1148	}
1149	return (0);
1150}
1151
1152/*
1153 * Given a destination snapshot and a TXG, calculate the approximate size of a
1154 * send stream sent from that TXG. from_txg may be zero, indicating that the
1155 * whole snapshot will be sent.
1156 */
1157int
1158dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1159    uint64_t *sizep)
1160{
1161	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1162	int err;
1163	uint64_t size = 0;
1164
1165	ASSERT(dsl_pool_config_held(dp));
1166
1167	/* tosnap must be a snapshot */
1168	if (!dsl_dataset_is_snapshot(ds))
1169		return (SET_ERROR(EINVAL));
1170
1171	/* verify that from_txg is before the provided snapshot was taken */
1172	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1173		return (SET_ERROR(EXDEV));
1174	}
1175
1176	/*
1177	 * traverse the blocks of the snapshot with birth times after
1178	 * from_txg, summing their uncompressed size
1179	 */
1180	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1181	    dmu_calculate_send_traversal, &size);
1182	if (err)
1183		return (err);
1184
1185	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1186	return (err);
1187}
1188
1189typedef struct dmu_recv_begin_arg {
1190	const char *drba_origin;
1191	dmu_recv_cookie_t *drba_cookie;
1192	cred_t *drba_cred;
1193	uint64_t drba_snapobj;
1194} dmu_recv_begin_arg_t;
1195
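/*
 * Validate a receive into an existing filesystem: the temporary %recv
 * clone and the target snapshot must not already exist, the snapshot
 * limit must not be exceeded, and an incremental stream must match an
 * existing snapshot with the stream's fromguid.
 */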
1196static int
1197recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
1198    uint64_t fromguid)
1199{
1200	uint64_t val;
1201	int error;
1202	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1203
1204	/* temporary clone name must not exist */
1205	error = zap_lookup(dp->dp_meta_objset,
1206	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
1207	    8, 1, &val);
1208	if (error != ENOENT)
1209		return (error == 0 ? EBUSY : error);
1210
1211	/* new snapshot name must not exist */
1212	error = zap_lookup(dp->dp_meta_objset,
1213	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
1214	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
1215	if (error != ENOENT)
1216		return (error == 0 ? EEXIST : error);
1217
1218	/*
1219	 * Check snapshot limit before receiving. We'll recheck again at the
1220	 * end, but might as well abort before receiving if we're already over
1221	 * the limit.
1222	 *
1223	 * Note that we do not check the file system limit with
1224	 * dsl_dir_fscount_check because the temporary %clones don't count
1225	 * against that limit.
1226	 */
1227	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
1228	    NULL, drba->drba_cred);
1229	if (error != 0)
1230		return (error);
1231
1232	if (fromguid != 0) {
1233		dsl_dataset_t *snap;
1234		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
1235
1236		/* Find snapshot in this dir that matches fromguid. */
1237		while (obj != 0) {
1238			error = dsl_dataset_hold_obj(dp, obj, FTAG,
1239			    &snap);
1240			if (error != 0)
1241				return (SET_ERROR(ENODEV));
1242			if (snap->ds_dir != ds->ds_dir) {
1243				dsl_dataset_rele(snap, FTAG);
1244				return (SET_ERROR(ENODEV));
1245			}
1246			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
1247				break;
1248			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
1249			dsl_dataset_rele(snap, FTAG);
1250		}
1251		if (obj == 0)
1252			return (SET_ERROR(ENODEV));
1253
1254		if (drba->drba_cookie->drc_force) {
1255			drba->drba_snapobj = obj;
1256		} else {
1257			/*
1258			 * If we are not forcing, there must be no
1259			 * changes since fromsnap.
1260			 */
1261			if (dsl_dataset_modified_since_snap(ds, snap)) {
1262				dsl_dataset_rele(snap, FTAG);
1263				return (SET_ERROR(ETXTBSY));
1264			}
1265			drba->drba_snapobj = ds->ds_prev->ds_object;
1266		}
1267
1268		dsl_dataset_rele(snap, FTAG);
1269	} else {
1270		/* if full, then must be forced */
1271		if (!drba->drba_cookie->drc_force)
1272			return (SET_ERROR(EEXIST));
1273		/* start from $ORIGIN@$ORIGIN, if supported */
1274		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
1275		    dp->dp_origin_snap->ds_object : 0;
1276	}
1277
1278	return (0);
1279
1280}
1281
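/*
 * Check half of the dmu_recv_begin() sync task: validate the stream
 * header and its feature flags against the pool, then check the target
 * filesystem (or, for a new filesystem or clone, its parent and origin).
 */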
1282static int
1283dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
1284{
1285	dmu_recv_begin_arg_t *drba = arg;
1286	dsl_pool_t *dp = dmu_tx_pool(tx);
1287	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1288	uint64_t fromguid = drrb->drr_fromguid;
1289	int flags = drrb->drr_flags;
1290	int error;
1291	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1292	dsl_dataset_t *ds;
1293	const char *tofs = drba->drba_cookie->drc_tofs;
1294
1295	/* already checked */
1296	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1297	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
1298
1299	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1300	    DMU_COMPOUNDSTREAM ||
1301	    drrb->drr_type >= DMU_OST_NUMTYPES ||
1302	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
1303		return (SET_ERROR(EINVAL));
1304
1305	/* Verify pool version supports SA if SA_SPILL feature set */
1306	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1307	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
1308		return (SET_ERROR(ENOTSUP));
1309
1310	if (drba->drba_cookie->drc_resumable &&
1311	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
1312		return (SET_ERROR(ENOTSUP));
1313
1314	/*
1315	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1316	 * record to a plain WRITE record, so the pool must have the
1317	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1318	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1319	 */
1320	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1321	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1322		return (SET_ERROR(ENOTSUP));
1323	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
1324	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1325		return (SET_ERROR(ENOTSUP));
1326
1327	/*
1328	 * The receiving code doesn't know how to translate large blocks
1329	 * to smaller ones, so the pool must have the LARGE_BLOCKS
1330	 * feature enabled if the stream has LARGE_BLOCKS.
1331	 */
1332	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
1333	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
1334		return (SET_ERROR(ENOTSUP));
1335
1336	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1337	if (error == 0) {
1338		/* target fs already exists; recv into temp clone */
1339
1340		/* Can't recv a clone into an existing fs */
1341		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
1342			dsl_dataset_rele(ds, FTAG);
1343			return (SET_ERROR(EINVAL));
1344		}
1345
1346		error = recv_begin_check_existing_impl(drba, ds, fromguid);
1347		dsl_dataset_rele(ds, FTAG);
1348	} else if (error == ENOENT) {
1349		/* target fs does not exist; must be a full backup or clone */
1350		char buf[ZFS_MAX_DATASET_NAME_LEN];
1351
1352		/*
1353		 * If it's a non-clone incremental, we are missing the
1354		 * target fs, so fail the recv.
1355		 */
1356		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
1357		    drba->drba_origin))
1358			return (SET_ERROR(ENOENT));
1359
1360		/*
1361		 * If we're receiving a full send as a clone, and it doesn't
1362		 * contain all the necessary free records and freeobject
1363		 * records, reject it.
1364		 */
1365		if (fromguid == 0 && drba->drba_origin &&
1366		    !(flags & DRR_FLAG_FREERECORDS))
1367			return (SET_ERROR(EINVAL));
1368
1369		/* Open the parent of tofs */
1370		ASSERT3U(strlen(tofs), <, sizeof (buf));
1371		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
1372		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
1373		if (error != 0)
1374			return (error);
1375
1376		/*
1377		 * Check filesystem and snapshot limits before receiving. We'll
1378		 * recheck snapshot limits again at the end (we create the
1379		 * filesystems and increment those counts during begin_sync).
1380		 */
1381		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1382		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
1383		if (error != 0) {
1384			dsl_dataset_rele(ds, FTAG);
1385			return (error);
1386		}
1387
1388		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1389		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
1390		if (error != 0) {
1391			dsl_dataset_rele(ds, FTAG);
1392			return (error);
1393		}
1394
1395		if (drba->drba_origin != NULL) {
1396			dsl_dataset_t *origin;
1397			error = dsl_dataset_hold(dp, drba->drba_origin,
1398			    FTAG, &origin);
1399			if (error != 0) {
1400				dsl_dataset_rele(ds, FTAG);
1401				return (error);
1402			}
1403			if (!origin->ds_is_snapshot) {
1404				dsl_dataset_rele(origin, FTAG);
1405				dsl_dataset_rele(ds, FTAG);
1406				return (SET_ERROR(EINVAL));
1407			}
1408			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
1409			    fromguid != 0) {
1410				dsl_dataset_rele(origin, FTAG);
1411				dsl_dataset_rele(ds, FTAG);
1412				return (SET_ERROR(ENODEV));
1413			}
1414			dsl_dataset_rele(origin, FTAG);
1415		}
1416		dsl_dataset_rele(ds, FTAG);
1417		error = 0;
1418	}
1419	return (error);
1420}
1421
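/*
 * Sync half of dmu_recv_begin(): create the temporary %recv clone or the
 * new filesystem, mark it inconsistent for the duration of the receive,
 * and record the resume state in its ZAP if the receive is resumable.
 */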
1422static void
1423dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
1424{
1425	dmu_recv_begin_arg_t *drba = arg;
1426	dsl_pool_t *dp = dmu_tx_pool(tx);
1427	objset_t *mos = dp->dp_meta_objset;
1428	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1429	const char *tofs = drba->drba_cookie->drc_tofs;
1430	dsl_dataset_t *ds, *newds;
1431	uint64_t dsobj;
1432	int error;
1433	uint64_t crflags = 0;
1434
1435	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
1436		crflags |= DS_FLAG_CI_DATASET;
1437
1438	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1439	if (error == 0) {
1440		/* create temporary clone */
1441		dsl_dataset_t *snap = NULL;
1442		if (drba->drba_snapobj != 0) {
1443			VERIFY0(dsl_dataset_hold_obj(dp,
1444			    drba->drba_snapobj, FTAG, &snap));
1445		}
1446		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
1447		    snap, crflags, drba->drba_cred, tx);
1448		if (drba->drba_snapobj != 0)
1449			dsl_dataset_rele(snap, FTAG);
1450		dsl_dataset_rele(ds, FTAG);
1451	} else {
1452		dsl_dir_t *dd;
1453		const char *tail;
1454		dsl_dataset_t *origin = NULL;
1455
1456		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
1457
1458		if (drba->drba_origin != NULL) {
1459			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
1460			    FTAG, &origin));
1461		}
1462
1463		/* Create new dataset. */
1464		dsobj = dsl_dataset_create_sync(dd,
1465		    strrchr(tofs, '/') + 1,
1466		    origin, crflags, drba->drba_cred, tx);
1467		if (origin != NULL)
1468			dsl_dataset_rele(origin, FTAG);
1469		dsl_dir_rele(dd, FTAG);
1470		drba->drba_cookie->drc_newfs = B_TRUE;
1471	}
1472	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));
1473
1474	if (drba->drba_cookie->drc_resumable) {
1475		dsl_dataset_zapify(newds, tx);
1476		if (drrb->drr_fromguid != 0) {
1477			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
1478			    8, 1, &drrb->drr_fromguid, tx));
1479		}
1480		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
1481		    8, 1, &drrb->drr_toguid, tx));
1482		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
1483		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
1484		uint64_t one = 1;
1485		uint64_t zero = 0;
1486		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
1487		    8, 1, &one, tx));
1488		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
1489		    8, 1, &zero, tx));
1490		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
1491		    8, 1, &zero, tx));
1492		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
1493		    DMU_BACKUP_FEATURE_EMBED_DATA) {
1494			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
1495			    8, 1, &one, tx));
1496		}
1497	}
1498
1499	dmu_buf_will_dirty(newds->ds_dbuf, tx);
1500	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
1501
1502	/*
1503	 * If we actually created a non-clone, we need to create the
1504	 * objset in our new dataset.
1505	 */
1506	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
1507		(void) dmu_objset_create_impl(dp->dp_spa,
1508		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
1509	}
1510
1511	drba->drba_cookie->drc_ds = newds;
1512
1513	spa_history_log_internal_ds(newds, "receive", tx, "");
1514}
1515
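/*
 * Check half of the resume case of dmu_recv_begin(): the partially
 * received dataset must exist, be inconsistent and unowned, and its
 * stored resume ZAP attributes must match the toguid and fromguid of the
 * stream being resumed.
 */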
1516static int
1517dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
1518{
1519	dmu_recv_begin_arg_t *drba = arg;
1520	dsl_pool_t *dp = dmu_tx_pool(tx);
1521	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1522	int error;
1523	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1524	dsl_dataset_t *ds;
1525	const char *tofs = drba->drba_cookie->drc_tofs;
1526
1527	/* already checked */
1528	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1529	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
1530
1531	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1532	    DMU_COMPOUNDSTREAM ||
1533	    drrb->drr_type >= DMU_OST_NUMTYPES)
1534		return (SET_ERROR(EINVAL));
1535
1536	/* Verify pool version supports SA if SA_SPILL feature set */
1537	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1538	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
1539		return (SET_ERROR(ENOTSUP));
1540
1541	/*
1542	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1543	 * record to a plain WRITE record, so the pool must have the
1544	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1545	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1546	 */
1547	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1548	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1549		return (SET_ERROR(ENOTSUP));
1550	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
1551	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1552		return (SET_ERROR(ENOTSUP));
1553
1554	/* 6 extra bytes for /%recv */
1555	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
1556
1557	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
1558	    tofs, recv_clone_name);
1559
1560	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1561		/* %recv does not exist; continue in tofs */
1562		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1563		if (error != 0)
1564			return (error);
1565	}
1566
1567	/* check that ds is marked inconsistent */
1568	if (!DS_IS_INCONSISTENT(ds)) {
1569		dsl_dataset_rele(ds, FTAG);
1570		return (SET_ERROR(EINVAL));
1571	}
1572
1573	/* check that there is resuming data, and that the toguid matches */
1574	if (!dsl_dataset_is_zapified(ds)) {
1575		dsl_dataset_rele(ds, FTAG);
1576		return (SET_ERROR(EINVAL));
1577	}
1578	uint64_t val;
1579	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
1580	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
1581	if (error != 0 || drrb->drr_toguid != val) {
1582		dsl_dataset_rele(ds, FTAG);
1583		return (SET_ERROR(EINVAL));
1584	}
1585
1586	/*
1587	 * Check if the receive is still running.  If so, it will be owned.
1588	 * Note that nothing else can own the dataset (e.g. after the receive
1589	 * fails) because it will be marked inconsistent.
1590	 */
1591	if (dsl_dataset_has_owner(ds)) {
1592		dsl_dataset_rele(ds, FTAG);
1593		return (SET_ERROR(EBUSY));
1594	}
1595
1596	/* There should not be any snapshots of this fs yet. */
1597	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
1598		dsl_dataset_rele(ds, FTAG);
1599		return (SET_ERROR(EINVAL));
1600	}
1601
1602	/*
1603	 * Note: resume point will be checked when we process the first WRITE
1604	 * record.
1605	 */
1606
1607	/* check that the origin matches */
1608	val = 0;
1609	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
1610	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
1611	if (drrb->drr_fromguid != val) {
1612		dsl_dataset_rele(ds, FTAG);
1613		return (SET_ERROR(EINVAL));
1614	}
1615
1616	dsl_dataset_rele(ds, FTAG);
1617	return (0);
1618}
1619
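/*
 * Sync half of the resume case: temporarily clear the inconsistent flag
 * so the dataset can be owned by dmu_recv_tag, then set it again while
 * the receive is in progress.
 */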
1620static void
1621dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
1622{
1623	dmu_recv_begin_arg_t *drba = arg;
1624	dsl_pool_t *dp = dmu_tx_pool(tx);
1625	const char *tofs = drba->drba_cookie->drc_tofs;
1626	dsl_dataset_t *ds;
1627	uint64_t dsobj;
1628	/* 6 extra bytes for /%recv */
1629	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
1630
1631	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
1632	    tofs, recv_clone_name);
1633
1634	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1635		/* %recv does not exist; continue in tofs */
1636		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
1637		drba->drba_cookie->drc_newfs = B_TRUE;
1638	}
1639
1640	/* clear the inconsistent flag so that we can own it */
1641	ASSERT(DS_IS_INCONSISTENT(ds));
1642	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1643	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
1644	dsobj = ds->ds_object;
1645	dsl_dataset_rele(ds, FTAG);
1646
1647	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));
1648
1649	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1650	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
1651
1652	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
1653
1654	drba->drba_cookie->drc_ds = ds;
1655
1656	spa_history_log_internal_ds(ds, "resume receive", tx, "");
1657}
1658
1659/*
1660 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1661 * succeeds; otherwise we will leak the holds on the datasets.
1662 */
1663int
1664dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
1665    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
1666{
1667	dmu_recv_begin_arg_t drba = { 0 };
1668
1669	bzero(drc, sizeof (dmu_recv_cookie_t));
1670	drc->drc_drr_begin = drr_begin;
1671	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
1672	drc->drc_tosnap = tosnap;
1673	drc->drc_tofs = tofs;
1674	drc->drc_force = force;
1675	drc->drc_resumable = resumable;
1676	drc->drc_cred = CRED();
1677
1678	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1679		drc->drc_byteswap = B_TRUE;
1680		fletcher_4_incremental_byteswap(drr_begin,
1681		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
1682		byteswap_record(drr_begin);
1683	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
1684		fletcher_4_incremental_native(drr_begin,
1685		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
1686	} else {
1687		return (SET_ERROR(EINVAL));
1688	}
1689
1690	drba.drba_origin = origin;
1691	drba.drba_cookie = drc;
1692	drba.drba_cred = CRED();
1693
1694	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
1695	    DMU_BACKUP_FEATURE_RESUMING) {
1696		return (dsl_sync_task(tofs,
1697		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
1698		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1699	} else  {
1700		return (dsl_sync_task(tofs,
1701		    dmu_recv_begin_check, dmu_recv_begin_sync,
1702		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1703	}
1704}
1705
1706struct receive_record_arg {
1707	dmu_replay_record_t header;
1708	void *payload; /* Pointer to a buffer containing the payload */
1709	/*
1710	 * If the record is a write, pointer to the arc_buf_t containing the
1711	 * payload.
1712	 */
1713	arc_buf_t *write_buf;
1714	int payload_size;
1715	uint64_t bytes_read; /* bytes read from stream when record created */
1716	boolean_t eos_marker; /* Marks the end of the stream */
1717	bqueue_node_t node;
1718};
1719
1720struct receive_writer_arg {
1721	objset_t *os;
1722	boolean_t byteswap;
1723	bqueue_t q;
1724
1725	/*
1726	 * These three args are used to signal to the main thread that we're
1727	 * done.
1728	 */
1729	kmutex_t mutex;
1730	kcondvar_t cv;
1731	boolean_t done;
1732
1733	int err;
1734	/* A map from guid to dataset to help handle dedup'd streams. */
1735	avl_tree_t *guid_to_ds_map;
1736	boolean_t resumable;
1737	uint64_t last_object, last_offset;
1738	uint64_t bytes_read; /* bytes read when current record created */
1739};
1740
1741struct objlist {
1742	list_t list; /* List of struct receive_objnode. */
1743	/*
1744	 * Last object looked up. Used to assert that objects are being looked
1745	 * up in ascending order.
1746	 */
1747	uint64_t last_lookup;
1748};
1749
1750struct receive_objnode {
1751	list_node_t node;
1752	uint64_t object;
1753};
1754
1755struct receive_arg {
1756	objset_t *os;
1757	kthread_t *td;
1758	struct file *fp;
1759	uint64_t voff; /* The current offset in the stream */
1760	uint64_t bytes_read;
1761	/*
1762	 * A record that has had its payload read in, but hasn't yet been handed
1763	 * off to the worker thread.
1764	 */
1765	struct receive_record_arg *rrd;
1766	/* A record that has had its header read in, but not its payload. */
1767	struct receive_record_arg *next_rrd;
1768	zio_cksum_t cksum;
1769	zio_cksum_t prev_cksum;
1770	int err;
1771	boolean_t byteswap;
1772	/* Sorted list of objects not to issue prefetches for. */
1773	struct objlist ignore_objlist;
1774};
1775
1776typedef struct guid_map_entry {
1777	uint64_t	guid;
1778	dsl_dataset_t	*gme_ds;
1779	avl_node_t	avlnode;
1780} guid_map_entry_t;
1781
1782static int
1783guid_compare(const void *arg1, const void *arg2)
1784{
1785	const guid_map_entry_t *gmep1 = arg1;
1786	const guid_map_entry_t *gmep2 = arg2;
1787
1788	if (gmep1->guid < gmep2->guid)
1789		return (-1);
1790	else if (gmep1->guid > gmep2->guid)
1791		return (1);
1792	return (0);
1793}
1794
1795static void
1796free_guid_map_onexit(void *arg)
1797{
1798	avl_tree_t *ca = arg;
1799	void *cookie = NULL;
1800	guid_map_entry_t *gmep;
1801
1802	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
1803		dsl_dataset_long_rele(gmep->gme_ds, gmep);
1804		dsl_dataset_rele(gmep->gme_ds, gmep);
1805		kmem_free(gmep, sizeof (guid_map_entry_t));
1806	}
1807	avl_destroy(ca);
1808	kmem_free(ca, sizeof (avl_tree_t));
1809}
1810
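/*
 * Read len bytes from the stream's file at offset off into buf using a
 * kernel uio.  *resid is set to the number of bytes that could not be
 * read; receive_read() uses it to detect a short read (end of stream).
 */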
1811static int
1812restore_bytes(struct receive_arg *ra, void *buf, int len, off_t off, ssize_t *resid)
1813{
1814	struct uio auio;
1815	struct iovec aiov;
1816	int error;
1817
1818	aiov.iov_base = buf;
1819	aiov.iov_len = len;
1820	auio.uio_iov = &aiov;
1821	auio.uio_iovcnt = 1;
1822	auio.uio_resid = len;
1823	auio.uio_segflg = UIO_SYSSPACE;
1824	auio.uio_rw = UIO_READ;
1825	auio.uio_offset = off;
1826	auio.uio_td = ra->td;
1827#ifdef _KERNEL
1828	error = fo_read(ra->fp, &auio, ra->td->td_ucred, FOF_OFFSET, ra->td);
1829#else
1830	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
1831	error = EOPNOTSUPP;
1832#endif
1833	*resid = auio.uio_resid;
1834	return (error);
1835}
1836
1837static int
1838receive_read(struct receive_arg *ra, int len, void *buf)
1839{
1840	int done = 0;
1841
1842	/*
1843	 * The code doesn't rely on this (lengths being multiples of 8).  See
1844	 * comment in dump_bytes.
1845	 */
1846	ASSERT0(len % 8);
1847
1848	while (done < len) {
1849		ssize_t resid;
1850
1851		ra->err = restore_bytes(ra, buf + done,
1852		    len - done, ra->voff, &resid);
1853
1854		if (resid == len - done) {
1855			/*
1856			 * Note: ECKSUM indicates that the receive
1857			 * was interrupted and can potentially be resumed.
1858			 */
1859			ra->err = SET_ERROR(ECKSUM);
1860		}
1861		ra->voff += len - done - resid;
1862		done = len - resid;
1863		if (ra->err != 0)
1864			return (ra->err);
1865	}
1866
1867	ra->bytes_read += len;
1868
1869	ASSERT3U(done, ==, len);
1870	return (0);
1871}
1872
1873static void
1874byteswap_record(dmu_replay_record_t *drr)
1875{
1876#define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
1877#define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
1878	drr->drr_type = BSWAP_32(drr->drr_type);
1879	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
1880
1881	switch (drr->drr_type) {
1882	case DRR_BEGIN:
1883		DO64(drr_begin.drr_magic);
1884		DO64(drr_begin.drr_versioninfo);
1885		DO64(drr_begin.drr_creation_time);
1886		DO32(drr_begin.drr_type);
1887		DO32(drr_begin.drr_flags);
1888		DO64(drr_begin.drr_toguid);
1889		DO64(drr_begin.drr_fromguid);
1890		break;
1891	case DRR_OBJECT:
1892		DO64(drr_object.drr_object);
1893		DO32(drr_object.drr_type);
1894		DO32(drr_object.drr_bonustype);
1895		DO32(drr_object.drr_blksz);
1896		DO32(drr_object.drr_bonuslen);
1897		DO64(drr_object.drr_toguid);
1898		break;
1899	case DRR_FREEOBJECTS:
1900		DO64(drr_freeobjects.drr_firstobj);
1901		DO64(drr_freeobjects.drr_numobjs);
1902		DO64(drr_freeobjects.drr_toguid);
1903		break;
1904	case DRR_WRITE:
1905		DO64(drr_write.drr_object);
1906		DO32(drr_write.drr_type);
1907		DO64(drr_write.drr_offset);
1908		DO64(drr_write.drr_length);
1909		DO64(drr_write.drr_toguid);
1910		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
1911		DO64(drr_write.drr_key.ddk_prop);
1912		break;
1913	case DRR_WRITE_BYREF:
1914		DO64(drr_write_byref.drr_object);
1915		DO64(drr_write_byref.drr_offset);
1916		DO64(drr_write_byref.drr_length);
1917		DO64(drr_write_byref.drr_toguid);
1918		DO64(drr_write_byref.drr_refguid);
1919		DO64(drr_write_byref.drr_refobject);
1920		DO64(drr_write_byref.drr_refoffset);
1921		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
1922		    drr_key.ddk_cksum);
1923		DO64(drr_write_byref.drr_key.ddk_prop);
1924		break;
1925	case DRR_WRITE_EMBEDDED:
1926		DO64(drr_write_embedded.drr_object);
1927		DO64(drr_write_embedded.drr_offset);
1928		DO64(drr_write_embedded.drr_length);
1929		DO64(drr_write_embedded.drr_toguid);
1930		DO32(drr_write_embedded.drr_lsize);
1931		DO32(drr_write_embedded.drr_psize);
1932		break;
1933	case DRR_FREE:
1934		DO64(drr_free.drr_object);
1935		DO64(drr_free.drr_offset);
1936		DO64(drr_free.drr_length);
1937		DO64(drr_free.drr_toguid);
1938		break;
1939	case DRR_SPILL:
1940		DO64(drr_spill.drr_object);
1941		DO64(drr_spill.drr_length);
1942		DO64(drr_spill.drr_toguid);
1943		break;
1944	case DRR_END:
1945		DO64(drr_end.drr_toguid);
1946		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
1947		break;
1948	}
1949
1950	if (drr->drr_type != DRR_BEGIN) {
1951		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
1952	}
1953
1954#undef DO64
1955#undef DO32
1956}
1957
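/*
 * Work out how many block pointers a dnode with the given bonus type and
 * size has.  An SA bonus dnode is treated as having a single block
 * pointer; otherwise the bonus buffer grows into the space of unused
 * block pointers, so the count is one plus however many 128-byte
 * (1 << SPA_BLKPTRSHIFT) slots the bonus leaves free.  For example, with
 * the traditional 512-byte dnode (DN_MAX_BONUSLEN == 320), a 64-byte
 * bonus leaves 256 spare bytes, i.e. two extra slots, for three block
 * pointers in total.
 */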
1958static inline uint8_t
1959deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
1960{
1961	if (bonus_type == DMU_OT_SA) {
1962		return (1);
1963	} else {
1964		return (1 +
1965		    ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
1966	}
1967}
1968
1969static void
1970save_resume_state(struct receive_writer_arg *rwa,
1971    uint64_t object, uint64_t offset, dmu_tx_t *tx)
1972{
1973	int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
1974
1975	if (!rwa->resumable)
1976		return;
1977
1978	/*
1979	 * We use ds_resume_bytes[] != 0 to indicate that we need to
1980	 * update this on disk, so it must not be 0.
1981	 */
1982	ASSERT(rwa->bytes_read != 0);
1983
1984	/*
1985	 * We only resume from write records, which have a valid
1986	 * (non-meta-dnode) object number.
1987	 */
1988	ASSERT(object != 0);
1989
1990	/*
1991	 * For resuming to work correctly, we must receive records in order,
1992	 * sorted by object,offset.  This is checked by the callers, but
1993	 * assert it here for good measure.
1994	 */
1995	ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
1996	ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
1997	    offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
1998	ASSERT3U(rwa->bytes_read, >=,
1999	    rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
2000
2001	rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
2002	rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
2003	rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
2004}
2005
2006static int
2007receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
2008    void *data)
2009{
2010	dmu_object_info_t doi;
2011	dmu_tx_t *tx;
2012	uint64_t object;
2013	int err;
2014
2015	if (drro->drr_type == DMU_OT_NONE ||
2016	    !DMU_OT_IS_VALID(drro->drr_type) ||
2017	    !DMU_OT_IS_VALID(drro->drr_bonustype) ||
2018	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
2019	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
2020	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
2021	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
2022	    drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
2023	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
2024		return (SET_ERROR(EINVAL));
2025	}
2026
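	/*
	 * ENOENT just means the object doesn't exist yet, in which case we
	 * claim a brand new object below; any other failure is fatal.
	 */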
2027	err = dmu_object_info(rwa->os, drro->drr_object, &doi);
2028
2029	if (err != 0 && err != ENOENT)
2030		return (SET_ERROR(EINVAL));
2031	object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;
2032
2033	/*
2034	 * If we are losing blkptrs or changing the block size this must
2035	 * be a new file instance.  We must clear out the previous file
2036	 * contents before we can change this type of metadata in the dnode.
2037	 */
2038	if (err == 0) {
2039		int nblkptr;
2040
2041		nblkptr = deduce_nblkptr(drro->drr_bonustype,
2042		    drro->drr_bonuslen);
2043
2044		if (drro->drr_blksz != doi.doi_data_block_size ||
2045		    nblkptr < doi.doi_nblkptr) {
2046			err = dmu_free_long_range(rwa->os, drro->drr_object,
2047			    0, DMU_OBJECT_END);
2048			if (err != 0)
2049				return (SET_ERROR(EINVAL));
2050		}
2051	}
2052
2053	tx = dmu_tx_create(rwa->os);
2054	dmu_tx_hold_bonus(tx, object);
2055	err = dmu_tx_assign(tx, TXG_WAIT);
2056	if (err != 0) {
2057		dmu_tx_abort(tx);
2058		return (err);
2059	}
2060
2061	if (object == DMU_NEW_OBJECT) {
2062		/* currently free, want to be allocated */
2063		err = dmu_object_claim(rwa->os, drro->drr_object,
2064		    drro->drr_type, drro->drr_blksz,
2065		    drro->drr_bonustype, drro->drr_bonuslen, tx);
2066	} else if (drro->drr_type != doi.doi_type ||
2067	    drro->drr_blksz != doi.doi_data_block_size ||
2068	    drro->drr_bonustype != doi.doi_bonus_type ||
2069	    drro->drr_bonuslen != doi.doi_bonus_size) {
2070		/* currently allocated, but with different properties */
2071		err = dmu_object_reclaim(rwa->os, drro->drr_object,
2072		    drro->drr_type, drro->drr_blksz,
2073		    drro->drr_bonustype, drro->drr_bonuslen, tx);
2074	}
2075	if (err != 0) {
2076		dmu_tx_commit(tx);
2077		return (SET_ERROR(EINVAL));
2078	}
2079
2080	dmu_object_set_checksum(rwa->os, drro->drr_object,
2081	    drro->drr_checksumtype, tx);
2082	dmu_object_set_compress(rwa->os, drro->drr_object,
2083	    drro->drr_compress, tx);
2084
2085	if (data != NULL) {
2086		dmu_buf_t *db;
2087
2088		VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
2089		dmu_buf_will_dirty(db, tx);
2090
2091		ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
2092		bcopy(data, db->db_data, drro->drr_bonuslen);
2093		if (rwa->byteswap) {
2094			dmu_object_byteswap_t byteswap =
2095			    DMU_OT_BYTESWAP(drro->drr_bonustype);
2096			dmu_ot_byteswap[byteswap].ob_func(db->db_data,
2097			    drro->drr_bonuslen);
2098		}
2099		dmu_buf_rele(db, FTAG);
2100	}
2101	dmu_tx_commit(tx);
2102
2103	return (0);
2104}
2105
2106/* ARGSUSED */
2107static int
2108receive_freeobjects(struct receive_writer_arg *rwa,
2109    struct drr_freeobjects *drrfo)
2110{
2111	uint64_t obj;
2112	int next_err = 0;
2113
2114	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
2115		return (SET_ERROR(EINVAL));
2116
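	/*
	 * Walk the objects in [firstobj, firstobj + numobjs) and free each
	 * allocated one.  dmu_object_next() returns ESRCH when it runs off
	 * the end of the objset, which is not an error here.
	 */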
2117	for (obj = drrfo->drr_firstobj;
2118	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
2119	    next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
2120		int err;
2121
2122		if (dmu_object_info(rwa->os, obj, NULL) != 0)
2123			continue;
2124
2125		err = dmu_free_long_object(rwa->os, obj);
2126		if (err != 0)
2127			return (err);
2128	}
2129	if (next_err != ESRCH)
2130		return (next_err);
2131	return (0);
2132}
2133
2134static int
2135receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
2136    arc_buf_t *abuf)
2137{
2138	dmu_tx_t *tx;
2139	int err;
2140
2141	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
2142	    !DMU_OT_IS_VALID(drrw->drr_type))
2143		return (SET_ERROR(EINVAL));
2144
2145	/*
2146	 * For resuming to work, records must be in increasing order
2147	 * by (object, offset).
2148	 */
2149	if (drrw->drr_object < rwa->last_object ||
2150	    (drrw->drr_object == rwa->last_object &&
2151	    drrw->drr_offset < rwa->last_offset)) {
2152		return (SET_ERROR(EINVAL));
2153	}
2154	rwa->last_object = drrw->drr_object;
2155	rwa->last_offset = drrw->drr_offset;
2156
2157	if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
2158		return (SET_ERROR(EINVAL));
2159
2160	tx = dmu_tx_create(rwa->os);
2161
2162	dmu_tx_hold_write(tx, drrw->drr_object,
2163	    drrw->drr_offset, drrw->drr_length);
2164	err = dmu_tx_assign(tx, TXG_WAIT);
2165	if (err != 0) {
2166		dmu_tx_abort(tx);
2167		return (err);
2168	}
2169	if (rwa->byteswap) {
2170		dmu_object_byteswap_t byteswap =
2171		    DMU_OT_BYTESWAP(drrw->drr_type);
2172		dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
2173		    drrw->drr_length);
2174	}
2175
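	/*
	 * Hand the loaned arc buffer straight to the dbuf covering this
	 * offset; on success it is consumed, so the caller must not return
	 * it to the ARC (see receive_process_record()).
	 */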
2176	dmu_buf_t *bonus;
2177	if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
2178		return (SET_ERROR(EINVAL));
2179	dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
2180
2181	/*
2182	 * Note: If the receive fails, we want the resume stream to start
2183	 * with the same record that we last successfully received (as opposed
2184	 * to the next record), so that we can verify that we are
2185	 * resuming from the correct location.
2186	 */
2187	save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
2188	dmu_tx_commit(tx);
2189	dmu_buf_rele(bonus, FTAG);
2190
2191	return (0);
2192}
2193
2194/*
2195 * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
2196 * streams to refer to a copy of the data that is already on the
2197 * system because it came in earlier in the stream.  This function
2198 * finds the earlier copy of the data, and uses that copy instead of
2199 * data from the stream to fulfill this write.
2200 */
2201static int
2202receive_write_byref(struct receive_writer_arg *rwa,
2203    struct drr_write_byref *drrwbr)
2204{
2205	dmu_tx_t *tx;
2206	int err;
2207	guid_map_entry_t gmesrch;
2208	guid_map_entry_t *gmep;
2209	avl_index_t where;
2210	objset_t *ref_os = NULL;
2211	dmu_buf_t *dbp;
2212
2213	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
2214		return (SET_ERROR(EINVAL));
2215
2216	/*
2217	 * If the GUID of the referenced dataset is different from the
2218	 * GUID of the target dataset, find the referenced dataset.
2219	 */
2220	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
2221		gmesrch.guid = drrwbr->drr_refguid;
2222		if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
2223		    &where)) == NULL) {
2224			return (SET_ERROR(EINVAL));
2225		}
2226		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
2227			return (SET_ERROR(EINVAL));
2228	} else {
2229		ref_os = rwa->os;
2230	}
2231
2232	err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
2233	    drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
2234	if (err != 0)
2235		return (err);
2236
2237	tx = dmu_tx_create(rwa->os);
2238
2239	dmu_tx_hold_write(tx, drrwbr->drr_object,
2240	    drrwbr->drr_offset, drrwbr->drr_length);
2241	err = dmu_tx_assign(tx, TXG_WAIT);
2242	if (err != 0) {
2243		dmu_tx_abort(tx);
2244		return (err);
2245	}
2246	dmu_write(rwa->os, drrwbr->drr_object,
2247	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
2248	dmu_buf_rele(dbp, FTAG);
2249
2250	/* See comment in receive_write(). */
2251	save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
2252	dmu_tx_commit(tx);
2253	return (0);
2254}
2255
2256static int
2257receive_write_embedded(struct receive_writer_arg *rwa,
2258    struct drr_write_embedded *drrwe, void *data)
2259{
2260	dmu_tx_t *tx;
2261	int err;
2262
2263	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
2264		return (SET_ERROR(EINVAL));
2265
2266	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
2267		return (SET_ERROR(EINVAL));
2268
2269	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
2270		return (SET_ERROR(EINVAL));
2271	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
2272		return (SET_ERROR(EINVAL));
2273
2274	tx = dmu_tx_create(rwa->os);
2275
2276	dmu_tx_hold_write(tx, drrwe->drr_object,
2277	    drrwe->drr_offset, drrwe->drr_length);
2278	err = dmu_tx_assign(tx, TXG_WAIT);
2279	if (err != 0) {
2280		dmu_tx_abort(tx);
2281		return (err);
2282	}
2283
2284	dmu_write_embedded(rwa->os, drrwe->drr_object,
2285	    drrwe->drr_offset, data, drrwe->drr_etype,
2286	    drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
2287	    rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
2288
2289	/* See comment in receive_write(). */
2290	save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
2291	dmu_tx_commit(tx);
2292	return (0);
2293}
2294
2295static int
2296receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
2297    void *data)
2298{
2299	dmu_tx_t *tx;
2300	dmu_buf_t *db, *db_spill;
2301	int err;
2302
2303	if (drrs->drr_length < SPA_MINBLOCKSIZE ||
2304	    drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
2305		return (SET_ERROR(EINVAL));
2306
2307	if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
2308		return (SET_ERROR(EINVAL));
2309
2310	VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
2311	if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
2312		dmu_buf_rele(db, FTAG);
2313		return (err);
2314	}
2315
2316	tx = dmu_tx_create(rwa->os);
2317
2318	dmu_tx_hold_spill(tx, db->db_object);
2319
2320	err = dmu_tx_assign(tx, TXG_WAIT);
2321	if (err != 0) {
2322		dmu_buf_rele(db, FTAG);
2323		dmu_buf_rele(db_spill, FTAG);
2324		dmu_tx_abort(tx);
2325		return (err);
2326	}
2327	dmu_buf_will_dirty(db_spill, tx);
2328
2329	if (db_spill->db_size < drrs->drr_length)
2330		VERIFY0(dbuf_spill_set_blksz(db_spill,
2331		    drrs->drr_length, tx));
2332	bcopy(data, db_spill->db_data, drrs->drr_length);
2333
2334	dmu_buf_rele(db, FTAG);
2335	dmu_buf_rele(db_spill, FTAG);
2336
2337	dmu_tx_commit(tx);
2338	return (0);
2339}
2340
2341/* ARGSUSED */
2342static int
2343receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
2344{
2345	int err;
2346
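	/*
	 * A length of -1ULL means "free from drr_offset to the end of the
	 * object", so the overflow check below only applies to bounded
	 * frees.
	 */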
2347	if (drrf->drr_length != -1ULL &&
2348	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
2349		return (SET_ERROR(EINVAL));
2350
2351	if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
2352		return (SET_ERROR(EINVAL));
2353
2354	err = dmu_free_long_range(rwa->os, drrf->drr_object,
2355	    drrf->drr_offset, drrf->drr_length);
2356
2357	return (err);
2358}
2359
2360/* used to destroy the drc_ds on error */
2361static void
2362dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
2363{
2364	if (drc->drc_resumable) {
2365		/* wait for our resume state to be written to disk */
2366		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
2367		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2368	} else {
2369		char name[ZFS_MAX_DATASET_NAME_LEN];
2370		dsl_dataset_name(drc->drc_ds, name);
2371		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2372		(void) dsl_destroy_head(name);
2373	}
2374}
2375
2376static void
2377receive_cksum(struct receive_arg *ra, int len, void *buf)
2378{
2379	if (ra->byteswap) {
2380		fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
2381	} else {
2382		fletcher_4_incremental_native(buf, len, &ra->cksum);
2383	}
2384}
2385
2386/*
2387 * Read the payload into a buffer of size len, and update the current record's
2388 * payload field.
2389 * Allocate ra->next_rrd and read the next record's header into
2390 * ra->next_rrd->header.
2391 * Verify checksum of payload and next record.
2392 */
2393static int
2394receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
2395{
2396	int err;
2397
2398	if (len != 0) {
2399		ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2400		err = receive_read(ra, len, buf);
2401		if (err != 0)
2402			return (err);
2403		receive_cksum(ra, len, buf);
2404
2405		/* note: rrd is NULL when reading the begin record's payload */
2406		if (ra->rrd != NULL) {
2407			ra->rrd->payload = buf;
2408			ra->rrd->payload_size = len;
2409			ra->rrd->bytes_read = ra->bytes_read;
2410		}
2411	}
2412
2413	ra->prev_cksum = ra->cksum;
2414
2415	ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2416	err = receive_read(ra, sizeof (ra->next_rrd->header),
2417	    &ra->next_rrd->header);
2418	ra->next_rrd->bytes_read = ra->bytes_read;
2419	if (err != 0) {
2420		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2421		ra->next_rrd = NULL;
2422		return (err);
2423	}
2424	if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
2425		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2426		ra->next_rrd = NULL;
2427		return (SET_ERROR(EINVAL));
2428	}
2429
2430	/*
2431	 * Note: checksum is of everything up to but not including the
2432	 * checksum itself.
2433	 */
2434	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2435	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2436	receive_cksum(ra,
2437	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2438	    &ra->next_rrd->header);
2439
2440	zio_cksum_t cksum_orig =
2441	    ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2442	zio_cksum_t *cksump =
2443	    &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2444
2445	if (ra->byteswap)
2446		byteswap_record(&ra->next_rrd->header);
2447
2448	if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2449	    !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
2450		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2451		ra->next_rrd = NULL;
2452		return (SET_ERROR(ECKSUM));
2453	}
2454
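	/*
	 * Fold the record's checksum field itself (as read off the wire,
	 * before any byteswapping) into the running checksum, keeping it
	 * in sync with the stream checksum the sender computed.
	 */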
2455	receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
2456
2457	return (0);
2458}
2459
2460static void
2461objlist_create(struct objlist *list)
2462{
2463	list_create(&list->list, sizeof (struct receive_objnode),
2464	    offsetof(struct receive_objnode, node));
2465	list->last_lookup = 0;
2466}
2467
2468static void
2469objlist_destroy(struct objlist *list)
2470{
2471	for (struct receive_objnode *n = list_remove_head(&list->list);
2472	    n != NULL; n = list_remove_head(&list->list)) {
2473		kmem_free(n, sizeof (*n));
2474	}
2475	list_destroy(&list->list);
2476}
2477
2478/*
2479 * This function looks through the objlist to see if the specified object number
2480 * is contained in the objlist.  In the process, it will remove all object
2481 * numbers in the list that are smaller than the specified object number.  Thus,
2482 * any lookup of an object number smaller than a previously looked up object
2483 * number will always return false; therefore, all lookups should be done in
2484 * ascending order.
2485 */
2486static boolean_t
2487objlist_exists(struct objlist *list, uint64_t object)
2488{
2489	struct receive_objnode *node = list_head(&list->list);
2490	ASSERT3U(object, >=, list->last_lookup);
2491	list->last_lookup = object;
2492	while (node != NULL && node->object < object) {
2493		VERIFY3P(node, ==, list_remove_head(&list->list));
2494		kmem_free(node, sizeof (*node));
2495		node = list_head(&list->list);
2496	}
2497	return (node != NULL && node->object == object);
2498}
2499
2500/*
2501 * The objlist is a list of object numbers stored in ascending order.  However,
2502 * the insertion of new object numbers does not seek out the correct location to
2503 * store a new object number; instead, it appends it to the list for simplicity.
2504 * Thus, any users must take care to only insert new object numbers in ascending
2505 * order.
2506 */
2507static void
2508objlist_insert(struct objlist *list, uint64_t object)
2509{
2510	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
2511	node->object = object;
2512#ifdef ZFS_DEBUG
2513	struct receive_objnode *last_object = list_tail(&list->list);
2514	uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
2515	ASSERT3U(node->object, >, last_objnum);
2516#endif
2517	list_insert_tail(&list->list, node);
2518}
2519
2520/*
2521 * Issue the prefetch reads for any necessary indirect blocks.
2522 *
2523 * We use the object ignore list to tell us whether or not to issue prefetches
2524 * for a given object.  We do this for both correctness (in case the blocksize
2525 * of an object has changed) and performance (if the object doesn't exist, don't
2526 * needlessly try to issue prefetches).  We also trim the list as we go through
2527 * the stream to prevent it from growing to an unbounded size.
2528 *
2529 * The object numbers within will always be in sorted order, and any write
2530 * records we see will also be in sorted order, but they're not sorted with
2531 * respect to each other (i.e. we can get several object records before
2532 * receiving each object's write records).  As a result, once we've reached a
2533 * given object number, we can safely remove any reference to lower object
2534 * numbers in the ignore list. In practice, we receive up to 32 object records
2535 * before receiving write records, so the list can have up to 32 nodes in it.
2536 */
2537/* ARGSUSED */
2538static void
2539receive_read_prefetch(struct receive_arg *ra,
2540    uint64_t object, uint64_t offset, uint64_t length)
2541{
2542	if (!objlist_exists(&ra->ignore_objlist, object)) {
2543		dmu_prefetch(ra->os, object, 1, offset, length,
2544		    ZIO_PRIORITY_SYNC_READ);
2545	}
2546}
2547
2548/*
2549 * Read records off the stream, issuing any necessary prefetches.
2550 */
2551static int
2552receive_read_record(struct receive_arg *ra)
2553{
2554	int err;
2555
2556	switch (ra->rrd->header.drr_type) {
2557	case DRR_OBJECT:
2558	{
2559		struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
2560		uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
2561		void *buf = kmem_zalloc(size, KM_SLEEP);
2562		dmu_object_info_t doi;
2563		err = receive_read_payload_and_next_header(ra, size, buf);
2564		if (err != 0) {
2565			kmem_free(buf, size);
2566			return (err);
2567		}
2568		err = dmu_object_info(ra->os, drro->drr_object, &doi);
2569		/*
2570		 * See receive_read_prefetch() for an explanation of why we're
2571		 * storing this object in the ignore_objlist.
2572		 */
2573		if (err == ENOENT ||
2574		    (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
2575			objlist_insert(&ra->ignore_objlist, drro->drr_object);
2576			err = 0;
2577		}
2578		return (err);
2579	}
2580	case DRR_FREEOBJECTS:
2581	{
2582		err = receive_read_payload_and_next_header(ra, 0, NULL);
2583		return (err);
2584	}
2585	case DRR_WRITE:
2586	{
2587		struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
2588		arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os),
2589		    drrw->drr_length);
2590
2591		err = receive_read_payload_and_next_header(ra,
2592		    drrw->drr_length, abuf->b_data);
2593		if (err != 0) {
2594			dmu_return_arcbuf(abuf);
2595			return (err);
2596		}
2597		ra->rrd->write_buf = abuf;
2598		receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
2599		    drrw->drr_length);
2600		return (err);
2601	}
2602	case DRR_WRITE_BYREF:
2603	{
2604		struct drr_write_byref *drrwb =
2605		    &ra->rrd->header.drr_u.drr_write_byref;
2606		err = receive_read_payload_and_next_header(ra, 0, NULL);
2607		receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
2608		    drrwb->drr_length);
2609		return (err);
2610	}
2611	case DRR_WRITE_EMBEDDED:
2612	{
2613		struct drr_write_embedded *drrwe =
2614		    &ra->rrd->header.drr_u.drr_write_embedded;
2615		uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2616		void *buf = kmem_zalloc(size, KM_SLEEP);
2617
2618		err = receive_read_payload_and_next_header(ra, size, buf);
2619		if (err != 0) {
2620			kmem_free(buf, size);
2621			return (err);
2622		}
2623
2624		receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2625		    drrwe->drr_length);
2626		return (err);
2627	}
2628	case DRR_FREE:
2629	{
2630		/*
2631		 * It might be beneficial to prefetch indirect blocks here, but
2632		 * we don't really have the data to decide for sure.
2633		 */
2634		err = receive_read_payload_and_next_header(ra, 0, NULL);
2635		return (err);
2636	}
2637	case DRR_END:
2638	{
2639		struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2640		if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
2641			return (SET_ERROR(ECKSUM));
2642		return (0);
2643	}
2644	case DRR_SPILL:
2645	{
2646		struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2647		void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
2648		err = receive_read_payload_and_next_header(ra, drrs->drr_length,
2649		    buf);
2650		if (err != 0)
2651			kmem_free(buf, drrs->drr_length);
2652		return (err);
2653	}
2654	default:
2655		return (SET_ERROR(EINVAL));
2656	}
2657}
2658
2659/*
2660 * Commit a single record from the stream to the pool.
2661 */
2662static int
2663receive_process_record(struct receive_writer_arg *rwa,
2664    struct receive_record_arg *rrd)
2665{
2666	int err;
2667
2668	/* Processing in order, therefore bytes_read should be increasing. */
2669	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
2670	rwa->bytes_read = rrd->bytes_read;
2671
2672	switch (rrd->header.drr_type) {
2673	case DRR_OBJECT:
2674	{
2675		struct drr_object *drro = &rrd->header.drr_u.drr_object;
2676		err = receive_object(rwa, drro, rrd->payload);
2677		kmem_free(rrd->payload, rrd->payload_size);
2678		rrd->payload = NULL;
2679		return (err);
2680	}
2681	case DRR_FREEOBJECTS:
2682	{
2683		struct drr_freeobjects *drrfo =
2684		    &rrd->header.drr_u.drr_freeobjects;
2685		return (receive_freeobjects(rwa, drrfo));
2686	}
2687	case DRR_WRITE:
2688	{
2689		struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2690		err = receive_write(rwa, drrw, rrd->write_buf);
2691		/* if receive_write() is successful, it consumes the arc_buf */
2692		if (err != 0)
2693			dmu_return_arcbuf(rrd->write_buf);
2694		rrd->write_buf = NULL;
2695		rrd->payload = NULL;
2696		return (err);
2697	}
2698	case DRR_WRITE_BYREF:
2699	{
2700		struct drr_write_byref *drrwbr =
2701		    &rrd->header.drr_u.drr_write_byref;
2702		return (receive_write_byref(rwa, drrwbr));
2703	}
2704	case DRR_WRITE_EMBEDDED:
2705	{
2706		struct drr_write_embedded *drrwe =
2707		    &rrd->header.drr_u.drr_write_embedded;
2708		err = receive_write_embedded(rwa, drrwe, rrd->payload);
2709		kmem_free(rrd->payload, rrd->payload_size);
2710		rrd->payload = NULL;
2711		return (err);
2712	}
2713	case DRR_FREE:
2714	{
2715		struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2716		return (receive_free(rwa, drrf));
2717	}
2718	case DRR_SPILL:
2719	{
2720		struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2721		err = receive_spill(rwa, drrs, rrd->payload);
2722		kmem_free(rrd->payload, rrd->payload_size);
2723		rrd->payload = NULL;
2724		return (err);
2725	}
2726	default:
2727		return (SET_ERROR(EINVAL));
2728	}
2729}
2730
2731/*
2732 * dmu_recv_stream's worker thread; pull records off the queue, and then call
2733 * receive_process_record  When we're done, signal the main thread and exit.
2734 * receive_process_record().  When we're done, signal the main thread and exit.
2735static void
2736receive_writer_thread(void *arg)
2737{
2738	struct receive_writer_arg *rwa = arg;
2739	struct receive_record_arg *rrd;
2740	for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2741	    rrd = bqueue_dequeue(&rwa->q)) {
2742		/*
2743		 * If there's an error, the main thread will stop putting things
2744		 * on the queue, but we need to clear everything in it before we
2745		 * can exit.
2746		 */
2747		if (rwa->err == 0) {
2748			rwa->err = receive_process_record(rwa, rrd);
2749		} else if (rrd->write_buf != NULL) {
2750			dmu_return_arcbuf(rrd->write_buf);
2751			rrd->write_buf = NULL;
2752			rrd->payload = NULL;
2753		} else if (rrd->payload != NULL) {
2754			kmem_free(rrd->payload, rrd->payload_size);
2755			rrd->payload = NULL;
2756		}
2757		kmem_free(rrd, sizeof (*rrd));
2758	}
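	/* Free the end-of-stream marker record that terminated the loop. */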
2759	kmem_free(rrd, sizeof (*rrd));
2760	mutex_enter(&rwa->mutex);
2761	rwa->done = B_TRUE;
2762	cv_signal(&rwa->cv);
2763	mutex_exit(&rwa->mutex);
2764	thread_exit();
2765}
2766
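/*
 * Check that the resume point named in the begin record's payload
 * (resume_object/resume_offset) matches the resume state recorded in this
 * dataset's ZAP; if not, the stream does not pick up where the previous
 * receive left off and must be rejected.
 */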
2767static int
2768resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
2769{
2770	uint64_t val;
2771	objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
2772	uint64_t dsobj = dmu_objset_id(ra->os);
2773	uint64_t resume_obj, resume_off;
2774
2775	if (nvlist_lookup_uint64(begin_nvl,
2776	    "resume_object", &resume_obj) != 0 ||
2777	    nvlist_lookup_uint64(begin_nvl,
2778	    "resume_offset", &resume_off) != 0) {
2779		return (SET_ERROR(EINVAL));
2780	}
2781	VERIFY0(zap_lookup(mos, dsobj,
2782	    DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
2783	if (resume_obj != val)
2784		return (SET_ERROR(EINVAL));
2785	VERIFY0(zap_lookup(mos, dsobj,
2786	    DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
2787	if (resume_off != val)
2788		return (SET_ERROR(EINVAL));
2789
2790	return (0);
2791}
2792
2793/*
2794 * Read in the stream's records, one by one, and apply them to the pool.  There
2795 * are two threads involved; the thread that calls this function will spin up a
2796 * worker thread, read the records off the stream one by one, and issue
2797 * prefetches for any necessary indirect blocks.  It will then push the records
2798 * onto an internal blocking queue.  The worker thread will pull the records off
2799 * the queue, and actually write the data into the DMU.  This way, the worker
2800 * thread doesn't have to wait for reads to complete, since everything it needs
2801 * (the indirect blocks) will be prefetched.
2802 *
2803 * NB: callers *must* call dmu_recv_end() if this succeeds.
2804 */
2805int
2806dmu_recv_stream(dmu_recv_cookie_t *drc, struct file *fp, offset_t *voffp,
2807    int cleanup_fd, uint64_t *action_handlep)
2808{
2809	int err = 0;
2810	struct receive_arg ra = { 0 };
2811	struct receive_writer_arg rwa = { 0 };
2812	int featureflags;
2813	nvlist_t *begin_nvl = NULL;
2814
2815	ra.byteswap = drc->drc_byteswap;
2816	ra.cksum = drc->drc_cksum;
2817	ra.td = curthread;
2818	ra.fp = fp;
2819	ra.voff = *voffp;
2820
2821	if (dsl_dataset_is_zapified(drc->drc_ds)) {
2822		(void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
2823		    drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
2824		    sizeof (ra.bytes_read), 1, &ra.bytes_read);
2825	}
2826
2827	objlist_create(&ra.ignore_objlist);
2828
2829	/* these were verified in dmu_recv_begin */
2830	ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2831	    DMU_SUBSTREAM);
2832	ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2833
2834	/*
2835	 * Open the objset we are modifying.
2836	 */
2837	VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
2838
2839	ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2840
2841	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2842
2843	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
2844	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2845		minor_t minor;
2846
2847		if (cleanup_fd == -1) {
2848			err = SET_ERROR(EBADF);
2849			goto out;
2850		}
2851		err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2852		if (err != 0) {
2853			cleanup_fd = -1;
2854			goto out;
2855		}
2856
2857		if (*action_handlep == 0) {
2858			rwa.guid_to_ds_map =
2859			    kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2860			avl_create(rwa.guid_to_ds_map, guid_compare,
2861			    sizeof (guid_map_entry_t),
2862			    offsetof(guid_map_entry_t, avlnode));
2863			err = zfs_onexit_add_cb(minor,
2864			    free_guid_map_onexit, rwa.guid_to_ds_map,
2865			    action_handlep);
2866			if (err != 0)
2867				goto out;
2868		} else {
2869			err = zfs_onexit_cb_data(minor, *action_handlep,
2870			    (void **)&rwa.guid_to_ds_map);
2871			if (err != 0)
2872				goto out;
2873		}
2874
2875		drc->drc_guid_to_ds_map = rwa.guid_to_ds_map;
2876	}
2877
2878	uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
2879	void *payload = NULL;
2880	if (payloadlen != 0)
2881		payload = kmem_alloc(payloadlen, KM_SLEEP);
2882
2883	err = receive_read_payload_and_next_header(&ra, payloadlen, payload);
2884	if (err != 0) {
2885		if (payloadlen != 0)
2886			kmem_free(payload, payloadlen);
2887		goto out;
2888	}
2889	if (payloadlen != 0) {
2890		err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
2891		kmem_free(payload, payloadlen);
2892		if (err != 0)
2893			goto out;
2894	}
2895
2896	if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
2897		err = resume_check(&ra, begin_nvl);
2898		if (err != 0)
2899			goto out;
2900	}
2901
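	/*
	 * Set up the writer state and the blocking queue, then start the
	 * writer thread; from here on, records flow from the reader through
	 * the queue to the writer.
	 */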
2902	(void) bqueue_init(&rwa.q, zfs_recv_queue_length,
2903	    offsetof(struct receive_record_arg, node));
2904	cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL);
2905	mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL);
2906	rwa.os = ra.os;
2907	rwa.byteswap = drc->drc_byteswap;
2908	rwa.resumable = drc->drc_resumable;
2909
2910	(void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, &p0,
2911	    TS_RUN, minclsyspri);
2912	/*
2913	 * We're reading rwa.err without locks, which is safe since we are the
2914	 * only reader, and the worker thread is the only writer.  It's ok if we
2915	 * miss a write for an iteration or two of the loop, since the writer
2916	 * thread will keep freeing records we send it until we send it an eos
2917	 * marker.
2918	 *
2919	 * We can leave this loop in 3 ways:  First, if rwa.err is
2920	 * non-zero.  In that case, the writer thread will free the rrd we just
2921	 * pushed.  Second, if we're interrupted; in that case, either it's the
2922	 * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
2923	 * has been handed off to the writer thread who will free it.  Finally,
2924	 * if receive_read_record fails or we're at the end of the stream, then
2925	 * we free ra.rrd and exit.
2926	 */
2927	while (rwa.err == 0) {
2928		if (issig(JUSTLOOKING) && issig(FORREAL)) {
2929			err = SET_ERROR(EINTR);
2930			break;
2931		}
2932
2933		ASSERT3P(ra.rrd, ==, NULL);
2934		ra.rrd = ra.next_rrd;
2935		ra.next_rrd = NULL;
2936		/* Allocates and loads header into ra.next_rrd */
2937		err = receive_read_record(&ra);
2938
2939		if (ra.rrd->header.drr_type == DRR_END || err != 0) {
2940			kmem_free(ra.rrd, sizeof (*ra.rrd));
2941			ra.rrd = NULL;
2942			break;
2943		}
2944
2945		bqueue_enqueue(&rwa.q, ra.rrd,
2946		    sizeof (struct receive_record_arg) + ra.rrd->payload_size);
2947		ra.rrd = NULL;
2948	}
2949	if (ra.next_rrd == NULL)
2950		ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP);
2951	ra.next_rrd->eos_marker = B_TRUE;
2952	bqueue_enqueue(&rwa.q, ra.next_rrd, 1);
2953
2954	mutex_enter(&rwa.mutex);
2955	while (!rwa.done) {
2956		cv_wait(&rwa.cv, &rwa.mutex);
2957	}
2958	mutex_exit(&rwa.mutex);
2959
2960	cv_destroy(&rwa.cv);
2961	mutex_destroy(&rwa.mutex);
2962	bqueue_destroy(&rwa.q);
2963	if (err == 0)
2964		err = rwa.err;
2965
2966out:
2967	nvlist_free(begin_nvl);
2968	if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
2969		zfs_onexit_fd_rele(cleanup_fd);
2970
2971	if (err != 0) {
2972		/*
2973		 * Clean up references.  If the receive is not resumable,
2974		 * destroy what we created, so we don't leave the dataset in
2975		 * an inconsistent state.
2976		 */
2977		dmu_recv_cleanup_ds(drc);
2978	}
2979
2980	*voffp = ra.voff;
2981	objlist_destroy(&ra.ignore_objlist);
2982	return (err);
2983}
2984
2985static int
2986dmu_recv_end_check(void *arg, dmu_tx_t *tx)
2987{
2988	dmu_recv_cookie_t *drc = arg;
2989	dsl_pool_t *dp = dmu_tx_pool(tx);
2990	int error;
2991
2992	ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
2993
2994	if (!drc->drc_newfs) {
2995		dsl_dataset_t *origin_head;
2996
2997		error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
2998		if (error != 0)
2999			return (error);
3000		if (drc->drc_force) {
3001			/*
3002			 * We will destroy any snapshots in tofs (i.e. before
3003			 * origin_head) that are after the origin (which is
3004			 * the snap before drc_ds, because drc_ds can not
3005			 * the snap before drc_ds, because drc_ds cannot
3006			 */
3007			uint64_t obj;
3008
3009			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3010			while (obj !=
3011			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3012				dsl_dataset_t *snap;
3013				error = dsl_dataset_hold_obj(dp, obj, FTAG,
3014				    &snap);
3015				if (error != 0)
3016					break;
3017				if (snap->ds_dir != origin_head->ds_dir)
3018					error = SET_ERROR(EINVAL);
3019				if (error == 0)  {
3020					error = dsl_destroy_snapshot_check_impl(
3021					    snap, B_FALSE);
3022				}
3023				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
3024				dsl_dataset_rele(snap, FTAG);
3025				if (error != 0)
3026					break;
3027			}
3028			if (error != 0) {
3029				dsl_dataset_rele(origin_head, FTAG);
3030				return (error);
3031			}
3032		}
3033		error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
3034		    origin_head, drc->drc_force, drc->drc_owner, tx);
3035		if (error != 0) {
3036			dsl_dataset_rele(origin_head, FTAG);
3037			return (error);
3038		}
3039		error = dsl_dataset_snapshot_check_impl(origin_head,
3040		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3041		dsl_dataset_rele(origin_head, FTAG);
3042		if (error != 0)
3043			return (error);
3044
3045		error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
3046	} else {
3047		error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
3048		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3049	}
3050	return (error);
3051}
3052
3053static void
3054dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
3055{
3056	dmu_recv_cookie_t *drc = arg;
3057	dsl_pool_t *dp = dmu_tx_pool(tx);
3058
3059	spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
3060	    tx, "snap=%s", drc->drc_tosnap);
3061
3062	if (!drc->drc_newfs) {
3063		dsl_dataset_t *origin_head;
3064
3065		VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
3066		    &origin_head));
3067
3068		if (drc->drc_force) {
3069			/*
3070			 * Destroy any snapshots of drc_tofs (origin_head)
3071			 * after the origin (the snap before drc_ds).
3072			 */
3073			uint64_t obj;
3074
3075			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3076			while (obj !=
3077			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3078				dsl_dataset_t *snap;
3079				VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
3080				    &snap));
3081				ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
3082				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
3083				dsl_destroy_snapshot_sync_impl(snap,
3084				    B_FALSE, tx);
3085				dsl_dataset_rele(snap, FTAG);
3086			}
3087		}
3088		VERIFY3P(drc->drc_ds->ds_prev, ==,
3089		    origin_head->ds_prev);
3090
3091		dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
3092		    origin_head, tx);
3093		dsl_dataset_snapshot_sync_impl(origin_head,
3094		    drc->drc_tosnap, tx);
3095
3096		/* set snapshot's creation time and guid */
3097		dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
3098		dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
3099		    drc->drc_drrb->drr_creation_time;
3100		dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
3101		    drc->drc_drrb->drr_toguid;
3102		dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
3103		    ~DS_FLAG_INCONSISTENT;
3104
3105		dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
3106		dsl_dataset_phys(origin_head)->ds_flags &=
3107		    ~DS_FLAG_INCONSISTENT;
3108
3109		dsl_dataset_rele(origin_head, FTAG);
3110		dsl_destroy_head_sync_impl(drc->drc_ds, tx);
3111
3112		if (drc->drc_owner != NULL)
3113			VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
3114	} else {
3115		dsl_dataset_t *ds = drc->drc_ds;
3116
3117		dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
3118
3119		/* set snapshot's creation time and guid */
3120		dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
3121		dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
3122		    drc->drc_drrb->drr_creation_time;
3123		dsl_dataset_phys(ds->ds_prev)->ds_guid =
3124		    drc->drc_drrb->drr_toguid;
3125		dsl_dataset_phys(ds->ds_prev)->ds_flags &=
3126		    ~DS_FLAG_INCONSISTENT;
3127
3128		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3129		dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
3130		if (dsl_dataset_has_resume_receive_state(ds)) {
3131			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3132			    DS_FIELD_RESUME_FROMGUID, tx);
3133			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3134			    DS_FIELD_RESUME_OBJECT, tx);
3135			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3136			    DS_FIELD_RESUME_OFFSET, tx);
3137			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3138			    DS_FIELD_RESUME_BYTES, tx);
3139			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3140			    DS_FIELD_RESUME_TOGUID, tx);
3141			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3142			    DS_FIELD_RESUME_TONAME, tx);
3143		}
3144	}
3145	drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
3146	/*
3147	 * Release the hold from dmu_recv_begin.  This must be done before
3148	 * we return to open context, so that when we free the dataset's dnode,
3149	 * we can evict its bonus buffer.
3150	 */
3151	dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
3152	drc->drc_ds = NULL;
3153}
3154
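/*
 * Hold the snapshot created by this receive and enter it into the dedup
 * guid map, so that later WRITE_BYREF records (see receive_write_byref())
 * can find its blocks by GUID.
 */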
3155static int
3156add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
3157{
3158	dsl_pool_t *dp;
3159	dsl_dataset_t *snapds;
3160	guid_map_entry_t *gmep;
3161	int err;
3162
3163	ASSERT(guid_map != NULL);
3164
3165	err = dsl_pool_hold(name, FTAG, &dp);
3166	if (err != 0)
3167		return (err);
3168	gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
3169	err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
3170	if (err == 0) {
3171		gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
3172		gmep->gme_ds = snapds;
3173		avl_add(guid_map, gmep);
3174		dsl_dataset_long_hold(snapds, gmep);
3175	} else
3176		kmem_free(gmep, sizeof (*gmep));
3177
3178	dsl_pool_rele(dp, FTAG);
3179	return (err);
3180}
3181
3182static int dmu_recv_end_modified_blocks = 3;
3183
3184static int
3185dmu_recv_existing_end(dmu_recv_cookie_t *drc)
3186{
3187	int error;
3188
3189#ifdef _KERNEL
3190	/*
3191	 * We will be destroying the ds; make sure its origin is unmounted if
3192	 * necessary.
3193	 */
3194	char name[ZFS_MAX_DATASET_NAME_LEN];
3195	dsl_dataset_name(drc->drc_ds, name);
3196	zfs_destroy_unmount_origin(name);
3197#endif
3198
3199	error = dsl_sync_task(drc->drc_tofs,
3200	    dmu_recv_end_check, dmu_recv_end_sync, drc,
3201	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
3202
3203	if (error != 0)
3204		dmu_recv_cleanup_ds(drc);
3205	return (error);
3206}
3207
3208static int
3209dmu_recv_new_end(dmu_recv_cookie_t *drc)
3210{
3211	int error;
3212
3213	error = dsl_sync_task(drc->drc_tofs,
3214	    dmu_recv_end_check, dmu_recv_end_sync, drc,
3215	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
3216
3217	if (error != 0) {
3218		dmu_recv_cleanup_ds(drc);
3219	} else if (drc->drc_guid_to_ds_map != NULL) {
3220		(void) add_ds_to_guidmap(drc->drc_tofs,
3221		    drc->drc_guid_to_ds_map,
3222		    drc->drc_newsnapobj);
3223	}
3224	return (error);
3225}
3226
3227int
3228dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
3229{
3230	drc->drc_owner = owner;
3231
3232	if (drc->drc_newfs)
3233		return (dmu_recv_new_end(drc));
3234	else
3235		return (dmu_recv_existing_end(drc));
3236}
3237
3238/*
3239 * Return TRUE if this objset is currently being received into.
3240 */
3241boolean_t
3242dmu_objset_is_receiving(objset_t *os)
3243{
3244	return (os->os_dsl_dataset != NULL &&
3245	    os->os_dsl_dataset->ds_owner == dmu_recv_tag);
3246}
3247