dmu_send.c revision 297112
1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26 * Copyright (c) 2012, Martin Matuska <mm@FreeBSD.org>. All rights reserved.
27 * Copyright 2014 HybridCluster. All rights reserved.
28 * Copyright 2016 RackTop Systems.
29 * Copyright (c) 2014 Integros [integros.com]
30 */
31
32#include <sys/dmu.h>
33#include <sys/dmu_impl.h>
34#include <sys/dmu_tx.h>
35#include <sys/dbuf.h>
36#include <sys/dnode.h>
37#include <sys/zfs_context.h>
38#include <sys/dmu_objset.h>
39#include <sys/dmu_traverse.h>
40#include <sys/dsl_dataset.h>
41#include <sys/dsl_dir.h>
42#include <sys/dsl_prop.h>
43#include <sys/dsl_pool.h>
44#include <sys/dsl_synctask.h>
45#include <sys/zfs_ioctl.h>
46#include <sys/zap.h>
47#include <sys/zio_checksum.h>
48#include <sys/zfs_znode.h>
49#include <zfs_fletcher.h>
50#include <sys/avl.h>
51#include <sys/ddt.h>
52#include <sys/zfs_onexit.h>
53#include <sys/dmu_send.h>
54#include <sys/dsl_destroy.h>
55#include <sys/blkptr.h>
56#include <sys/dsl_bookmark.h>
57#include <sys/zfeature.h>
58#include <sys/bqueue.h>
59
60#ifdef __FreeBSD__
61#undef dump_write
62#define dump_write dmu_dump_write
63#endif
64
65/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
66int zfs_send_corrupt_data = B_FALSE;
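/*
 * Maximum amount of in-flight data, in bytes, allowed in the send and
 * receive record queues (see bqueue_init() below).
 */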
67int zfs_send_queue_length = 16 * 1024 * 1024;
68int zfs_recv_queue_length = 16 * 1024 * 1024;
69/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
70int zfs_send_set_freerecords_bit = B_TRUE;
71
72#ifdef _KERNEL
73TUNABLE_INT("vfs.zfs.send_set_freerecords_bit", &zfs_send_set_freerecords_bit);
74#endif
75
76static char *dmu_recv_tag = "dmu_recv_tag";
77const char *recv_clone_name = "%recv";
78
79#define	BP_SPAN(datablkszsec, indblkshift, level) \
80	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
81	(level) * (indblkshift - SPA_BLKPTRSHIFT)))
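/*
 * For example, with 128K data blocks (datablkszsec == 256) and an indirect
 * block shift of 17 (128K indirect blocks holding 1024 128-byte block
 * pointers each), BP_SPAN evaluates to:
 *
 *	level 0:  256 << 9		== 128K  (one data block)
 *	level 1:  256 << (9 + 1 * 10)	== 128M  (1024 data blocks)
 *	level 2:  256 << (9 + 2 * 10)	== 128G
 */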
82
83static void byteswap_record(dmu_replay_record_t *drr);
84
85struct send_thread_arg {
86	bqueue_t	q;
87	dsl_dataset_t	*ds;		/* Dataset to traverse */
88	uint64_t	fromtxg;	/* Traverse from this txg */
89	int		flags;		/* flags to pass to traverse_dataset */
90	int		error_code;
91	boolean_t	cancel;
92	zbookmark_phys_t resume;
93};
94
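/*
 * One block pointer discovered by the traversal (producer) thread and handed
 * to the main sending thread through the bqueue in send_thread_arg.
 */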
95struct send_block_record {
96	boolean_t		eos_marker; /* Marks the end of the stream */
97	blkptr_t		bp;
98	zbookmark_phys_t	zb;
99	uint8_t			indblkshift;
100	uint16_t		datablkszsec;
101	bqueue_node_t		ln;
102};
103
104static int
105dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
106{
107	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
108	struct uio auio;
109	struct iovec aiov;
110	ASSERT0(len % 8);
111
112	aiov.iov_base = buf;
113	aiov.iov_len = len;
114	auio.uio_iov = &aiov;
115	auio.uio_iovcnt = 1;
116	auio.uio_resid = len;
117	auio.uio_segflg = UIO_SYSSPACE;
118	auio.uio_rw = UIO_WRITE;
119	auio.uio_offset = (off_t)-1;
120	auio.uio_td = dsp->dsa_td;
121#ifdef _KERNEL
122	if (dsp->dsa_fp->f_type == DTYPE_VNODE)
123		bwillwrite();
124	dsp->dsa_err = fo_write(dsp->dsa_fp, &auio, dsp->dsa_td->td_ucred, 0,
125	    dsp->dsa_td);
126#else
127	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
128	dsp->dsa_err = EOPNOTSUPP;
129#endif
130	mutex_enter(&ds->ds_sendstream_lock);
131	*dsp->dsa_off += len;
132	mutex_exit(&ds->ds_sendstream_lock);
133
134	return (dsp->dsa_err);
135}
136
137/*
138 * For all record types except BEGIN, fill in the checksum (overlaid in
139 * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
140 * up to the start of the checksum itself.
141 */
142static int
143dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
144{
145	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
146	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
147	fletcher_4_incremental_native(dsp->dsa_drr,
148	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
149	    &dsp->dsa_zc);
150	if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
151		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
152		    drr_checksum.drr_checksum));
153		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
154	}
155	fletcher_4_incremental_native(&dsp->dsa_drr->
156	    drr_u.drr_checksum.drr_checksum,
157	    sizeof (zio_cksum_t), &dsp->dsa_zc);
158	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
159		return (SET_ERROR(EINTR));
160	if (payload_len != 0) {
161		fletcher_4_incremental_native(payload, payload_len,
162		    &dsp->dsa_zc);
163		if (dump_bytes(dsp, payload, payload_len) != 0)
164			return (SET_ERROR(EINTR));
165	}
166	return (0);
167}
168
169/*
170 * Fill in the drr_free struct, or perform aggregation if the previous record is
171 * also a free record, and the two are adjacent.
172 *
173 * Note that we send free records even for a full send, because we want to be
174 * able to receive a full send as a clone, which requires a list of all the free
175 * and freeobject records that were generated on the source.
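 *
 * Adjacent frees of the same object are aggregated into a single pending
 * DRR_FREE record: for example, freeing (object 5, offset 0, length 128K)
 * and then (object 5, offset 128K, length 128K) yields one record covering
 * 256K.  A length of -1ULL means "free from offset to the end of the
 * object"; such a record is pushed out immediately instead of being left
 * pending.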
176 */
177static int
178dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
179    uint64_t length)
180{
181	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
182
183	/*
184	 * When we receive a free record, dbuf_free_range() assumes
185	 * that the receiving system doesn't have any dbufs in the range
186	 * being freed.  This is always true because there is a one-record
187	 * constraint: we only send one WRITE record for any given
188	 * object,offset.  We know that the one-record constraint is
189	 * true because we always send data in increasing order by
190	 * object,offset.
191	 *
192	 * If the increasing-order constraint ever changes, we should find
193	 * another way to assert that the one-record constraint is still
194	 * satisfied.
195	 */
196	ASSERT(object > dsp->dsa_last_data_object ||
197	    (object == dsp->dsa_last_data_object &&
198	    offset > dsp->dsa_last_data_offset));
199
200	if (length != -1ULL && offset + length < offset)
201		length = -1ULL;
202
203	/*
204	 * If there is a pending op, but it's not PENDING_FREE, push it out,
205	 * since free block aggregation can only be done for blocks of the
206	 * same type (i.e., DRR_FREE records can only be aggregated with
207	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
208	 * aggregated with other DRR_FREEOBJECTS records).
209	 */
210	if (dsp->dsa_pending_op != PENDING_NONE &&
211	    dsp->dsa_pending_op != PENDING_FREE) {
212		if (dump_record(dsp, NULL, 0) != 0)
213			return (SET_ERROR(EINTR));
214		dsp->dsa_pending_op = PENDING_NONE;
215	}
216
217	if (dsp->dsa_pending_op == PENDING_FREE) {
218		/*
219		 * There should never be a PENDING_FREE if length is -1
220		 * (because dump_dnode is the only place where this
221		 * function is called with a -1, and only after flushing
222		 * any pending record).
223		 */
224		ASSERT(length != -1ULL);
225		/*
226		 * Check to see whether this free block can be aggregated
227	 * with the pending one.
228		 */
229		if (drrf->drr_object == object && drrf->drr_offset +
230		    drrf->drr_length == offset) {
231			drrf->drr_length += length;
232			return (0);
233		} else {
234			/* not a continuation.  Push out pending record */
235			if (dump_record(dsp, NULL, 0) != 0)
236				return (SET_ERROR(EINTR));
237			dsp->dsa_pending_op = PENDING_NONE;
238		}
239	}
240	/* create a FREE record and make it pending */
241	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
242	dsp->dsa_drr->drr_type = DRR_FREE;
243	drrf->drr_object = object;
244	drrf->drr_offset = offset;
245	drrf->drr_length = length;
246	drrf->drr_toguid = dsp->dsa_toguid;
247	if (length == -1ULL) {
248		if (dump_record(dsp, NULL, 0) != 0)
249			return (SET_ERROR(EINTR));
250	} else {
251		dsp->dsa_pending_op = PENDING_FREE;
252	}
253
254	return (0);
255}
256
257static int
258dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
259    uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
260{
261	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
262
263	/*
264	 * We send data in increasing object, offset order.
265	 * See comment in dump_free() for details.
266	 */
267	ASSERT(object > dsp->dsa_last_data_object ||
268	    (object == dsp->dsa_last_data_object &&
269	    offset > dsp->dsa_last_data_offset));
270	dsp->dsa_last_data_object = object;
271	dsp->dsa_last_data_offset = offset + blksz - 1;
272
273	/*
274	 * If there is any kind of pending aggregation (currently either
275	 * a grouping of free objects or free blocks), push it out to
276	 * the stream, since aggregation can't be done across operations
277	 * of different types.
278	 */
279	if (dsp->dsa_pending_op != PENDING_NONE) {
280		if (dump_record(dsp, NULL, 0) != 0)
281			return (SET_ERROR(EINTR));
282		dsp->dsa_pending_op = PENDING_NONE;
283	}
284	/* write a WRITE record */
285	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
286	dsp->dsa_drr->drr_type = DRR_WRITE;
287	drrw->drr_object = object;
288	drrw->drr_type = type;
289	drrw->drr_offset = offset;
290	drrw->drr_length = blksz;
291	drrw->drr_toguid = dsp->dsa_toguid;
292	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
293		/*
294		 * There's no pre-computed checksum for partial-block
295		 * writes or embedded BP's, so (like
296	 * fletcher4-checksummed blocks) userland will have to
297		 * compute a dedup-capable checksum itself.
298		 */
299		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
300	} else {
301		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
302		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
303		    ZCHECKSUM_FLAG_DEDUP)
304			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
305		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
306		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
307		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
308		drrw->drr_key.ddk_cksum = bp->blk_cksum;
309	}
310
311	if (dump_record(dsp, data, blksz) != 0)
312		return (SET_ERROR(EINTR));
313	return (0);
314}
315
316static int
317dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
318    int blksz, const blkptr_t *bp)
319{
320	char buf[BPE_PAYLOAD_SIZE];
321	struct drr_write_embedded *drrw =
322	    &(dsp->dsa_drr->drr_u.drr_write_embedded);
323
324	if (dsp->dsa_pending_op != PENDING_NONE) {
325		if (dump_record(dsp, NULL, 0) != 0)
326			return (EINTR);
327		dsp->dsa_pending_op = PENDING_NONE;
328	}
329
330	ASSERT(BP_IS_EMBEDDED(bp));
331
332	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
333	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
334	drrw->drr_object = object;
335	drrw->drr_offset = offset;
336	drrw->drr_length = blksz;
337	drrw->drr_toguid = dsp->dsa_toguid;
338	drrw->drr_compression = BP_GET_COMPRESS(bp);
339	drrw->drr_etype = BPE_GET_ETYPE(bp);
340	drrw->drr_lsize = BPE_GET_LSIZE(bp);
341	drrw->drr_psize = BPE_GET_PSIZE(bp);
342
343	decode_embedded_bp_compressed(bp, buf);
344
345	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
346		return (EINTR);
347	return (0);
348}
349
350static int
351dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
352{
353	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
354
355	if (dsp->dsa_pending_op != PENDING_NONE) {
356		if (dump_record(dsp, NULL, 0) != 0)
357			return (SET_ERROR(EINTR));
358		dsp->dsa_pending_op = PENDING_NONE;
359	}
360
361	/* write a SPILL record */
362	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
363	dsp->dsa_drr->drr_type = DRR_SPILL;
364	drrs->drr_object = object;
365	drrs->drr_length = blksz;
366	drrs->drr_toguid = dsp->dsa_toguid;
367
368	if (dump_record(dsp, data, blksz) != 0)
369		return (SET_ERROR(EINTR));
370	return (0);
371}
372
373static int
374dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
375{
376	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
377
378	/*
379	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
380	 * push it out, since free block aggregation can only be done for
381	 * blocks of the same type (i.e., DRR_FREE records can only be
382	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
383	 * can only be aggregated with other DRR_FREEOBJECTS records).
384	 */
385	if (dsp->dsa_pending_op != PENDING_NONE &&
386	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
387		if (dump_record(dsp, NULL, 0) != 0)
388			return (SET_ERROR(EINTR));
389		dsp->dsa_pending_op = PENDING_NONE;
390	}
391	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
392		/*
393		 * See whether this free object array can be aggregated
394	 * with the pending one.
395		 */
396		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
397			drrfo->drr_numobjs += numobjs;
398			return (0);
399		} else {
400			/* can't be aggregated.  Push out pending record */
401			if (dump_record(dsp, NULL, 0) != 0)
402				return (SET_ERROR(EINTR));
403			dsp->dsa_pending_op = PENDING_NONE;
404		}
405	}
406
407	/* write a FREEOBJECTS record */
408	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
409	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
410	drrfo->drr_firstobj = firstobj;
411	drrfo->drr_numobjs = numobjs;
412	drrfo->drr_toguid = dsp->dsa_toguid;
413
414	dsp->dsa_pending_op = PENDING_FREEOBJECTS;
415
416	return (0);
417}
418
419static int
420dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
421{
422	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
423
424	if (object < dsp->dsa_resume_object) {
425		/*
426		 * Note: when resuming, we will visit all the dnodes in
427		 * the block of dnodes that we are resuming from.  In
428		 * this case it's unnecessary to send the dnodes prior to
429		 * the one we are resuming from.  We should be at most one
430		 * block's worth of dnodes behind the resume point.
431		 */
432		ASSERT3U(dsp->dsa_resume_object - object, <,
433		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
434		return (0);
435	}
436
437	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
438		return (dump_freeobjects(dsp, object, 1));
439
440	if (dsp->dsa_pending_op != PENDING_NONE) {
441		if (dump_record(dsp, NULL, 0) != 0)
442			return (SET_ERROR(EINTR));
443		dsp->dsa_pending_op = PENDING_NONE;
444	}
445
446	/* write an OBJECT record */
447	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
448	dsp->dsa_drr->drr_type = DRR_OBJECT;
449	drro->drr_object = object;
450	drro->drr_type = dnp->dn_type;
451	drro->drr_bonustype = dnp->dn_bonustype;
452	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
453	drro->drr_bonuslen = dnp->dn_bonuslen;
454	drro->drr_checksumtype = dnp->dn_checksum;
455	drro->drr_compress = dnp->dn_compress;
456	drro->drr_toguid = dsp->dsa_toguid;
457
458	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
459	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
460		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
461
462	if (dump_record(dsp, DN_BONUS(dnp),
463	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
464		return (SET_ERROR(EINTR));
465	}
466
467	/* Free anything past the end of the file. */
468	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
469	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
470		return (SET_ERROR(EINTR));
471	if (dsp->dsa_err != 0)
472		return (SET_ERROR(EINTR));
473	return (0);
474}
475
476static boolean_t
477backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
478{
479	if (!BP_IS_EMBEDDED(bp))
480		return (B_FALSE);
481
482	/*
483	 * Compression function must be legacy, or explicitly enabled.
484	 */
485	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
486	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
487		return (B_FALSE);
488
489	/*
490	 * Embed type must be explicitly enabled.
491	 */
492	switch (BPE_GET_ETYPE(bp)) {
493	case BP_EMBEDDED_TYPE_DATA:
494		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
495			return (B_TRUE);
496		break;
497	default:
498		return (B_FALSE);
499	}
500	return (B_FALSE);
501}
502
503/*
504 * This is the callback function passed to traverse_dataset; it runs in the
505 * worker thread that performs the traversal for dmu_send_impl.
506 */
507/*ARGSUSED*/
508static int
509send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
510    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
511{
512	struct send_thread_arg *sta = arg;
513	struct send_block_record *record;
514	uint64_t record_size;
515	int err = 0;
516
517	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
518	    zb->zb_object >= sta->resume.zb_object);
519
520	if (sta->cancel)
521		return (SET_ERROR(EINTR));
522
523	if (bp == NULL) {
524		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
525		return (0);
526	} else if (zb->zb_level < 0) {
527		return (0);
528	}
529
530	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
531	record->eos_marker = B_FALSE;
532	record->bp = *bp;
533	record->zb = *zb;
534	record->indblkshift = dnp->dn_indblkshift;
535	record->datablkszsec = dnp->dn_datablkszsec;
536	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
537	bqueue_enqueue(&sta->q, record, record_size);
538
539	return (err);
540}
541
542/*
543 * This function kicks off the traverse_dataset.  It also handles setting the
544 * error code of the thread in case something goes wrong, and pushes the End of
545 * Stream record when the traverse_dataset call has finished.  If there is no
546 * dataset to traverse, the thread immediately pushes the End of Stream marker.
547 */
548static void
549send_traverse_thread(void *arg)
550{
551	struct send_thread_arg *st_arg = arg;
552	int err;
553	struct send_block_record *data;
554
555	if (st_arg->ds != NULL) {
556		err = traverse_dataset_resume(st_arg->ds,
557		    st_arg->fromtxg, &st_arg->resume,
558		    st_arg->flags, send_cb, st_arg);
559
560		if (err != EINTR)
561			st_arg->error_code = err;
562	}
563	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
564	data->eos_marker = B_TRUE;
565	bqueue_enqueue(&st_arg->q, data, 1);
566	thread_exit();
567}
568
569/*
570 * This function actually handles figuring out what kind of record needs to be
571 * dumped, reading the data (which has hopefully been prefetched), and calling
572 * the appropriate helper function.
573 */
574static int
575do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
576{
577	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
578	const blkptr_t *bp = &data->bp;
579	const zbookmark_phys_t *zb = &data->zb;
580	uint8_t indblkshift = data->indblkshift;
581	uint16_t dblkszsec = data->datablkszsec;
582	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
583	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
584	int err = 0;
585
586	ASSERT3U(zb->zb_level, >=, 0);
587
588	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
589	    zb->zb_object >= dsa->dsa_resume_object);
590
591	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
592	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
593		return (0);
594	} else if (BP_IS_HOLE(bp) &&
595	    zb->zb_object == DMU_META_DNODE_OBJECT) {
596		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
597		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
598		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
599	} else if (BP_IS_HOLE(bp)) {
600		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
601		uint64_t offset = zb->zb_blkid * span;
602		err = dump_free(dsa, zb->zb_object, offset, span);
603	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
604		return (0);
605	} else if (type == DMU_OT_DNODE) {
606		int blksz = BP_GET_LSIZE(bp);
607		arc_flags_t aflags = ARC_FLAG_WAIT;
608		arc_buf_t *abuf;
609
610		ASSERT0(zb->zb_level);
611
612		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
613		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
614		    &aflags, zb) != 0)
615			return (SET_ERROR(EIO));
616
617		dnode_phys_t *blk = abuf->b_data;
618		uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
619		for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
620			err = dump_dnode(dsa, dnobj + i, blk + i);
621			if (err != 0)
622				break;
623		}
624		(void) arc_buf_remove_ref(abuf, &abuf);
625	} else if (type == DMU_OT_SA) {
626		arc_flags_t aflags = ARC_FLAG_WAIT;
627		arc_buf_t *abuf;
628		int blksz = BP_GET_LSIZE(bp);
629
630		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
631		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
632		    &aflags, zb) != 0)
633			return (SET_ERROR(EIO));
634
635		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
636		(void) arc_buf_remove_ref(abuf, &abuf);
637	} else if (backup_do_embed(dsa, bp)) {
638		/* it's an embedded level-0 block of a regular object */
639		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
640		ASSERT0(zb->zb_level);
641		err = dump_write_embedded(dsa, zb->zb_object,
642		    zb->zb_blkid * blksz, blksz, bp);
643	} else {
644		/* it's a level-0 block of a regular object */
645		arc_flags_t aflags = ARC_FLAG_WAIT;
646		arc_buf_t *abuf;
647		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
648		uint64_t offset;
649
650		ASSERT0(zb->zb_level);
651		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
652		    (zb->zb_object == dsa->dsa_resume_object &&
653		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
654
655		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
656		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
657		    &aflags, zb) != 0) {
658			if (zfs_send_corrupt_data) {
659				/* Send a block filled with 0x"zfs badd bloc" */
660				abuf = arc_buf_alloc(spa, blksz, &abuf,
661				    ARC_BUFC_DATA);
662				uint64_t *ptr;
663				for (ptr = abuf->b_data;
664				    (char *)ptr < (char *)abuf->b_data + blksz;
665				    ptr++)
666					*ptr = 0x2f5baddb10cULL;
667			} else {
668				return (SET_ERROR(EIO));
669			}
670		}
671
672		offset = zb->zb_blkid * blksz;
673
674		if (!(dsa->dsa_featureflags &
675		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
676		    blksz > SPA_OLD_MAXBLOCKSIZE) {
677			char *buf = abuf->b_data;
678			while (blksz > 0 && err == 0) {
679				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
680				err = dump_write(dsa, type, zb->zb_object,
681				    offset, n, NULL, buf);
682				offset += n;
683				buf += n;
684				blksz -= n;
685			}
686		} else {
687			err = dump_write(dsa, type, zb->zb_object,
688			    offset, blksz, bp, abuf->b_data);
689		}
690		(void) arc_buf_remove_ref(abuf, &abuf);
691	}
692
693	ASSERT(err == 0 || err == EINTR);
694	return (err);
695}
696
697/*
698 * Pop the new data off the queue, and free the old data.
699 */
700static struct send_block_record *
701get_next_record(bqueue_t *bq, struct send_block_record *data)
702{
703	struct send_block_record *tmp = bqueue_dequeue(bq);
704	kmem_free(data, sizeof (*data));
705	return (tmp);
706}
707
708/*
709 * Actually do the bulk of the work in a zfs send.
710 *
711 * Note: Releases dp using the specified tag.
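 *
 * The flow is: emit the DRR_BEGIN record (carrying a resume nvlist payload
 * when resumeobj/resumeoff are set), spawn a traversal thread that feeds
 * send_block_records through a bqueue, drain that queue via do_dump(),
 * flush any pending aggregated record, and emit the closing DRR_END record
 * with the running checksum.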
712 */
713static int
714dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
715    zfs_bookmark_phys_t *ancestor_zb,
716    boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
717    uint64_t resumeobj, uint64_t resumeoff,
718#ifdef illumos
719    vnode_t *vp, offset_t *off)
720#else
721    struct file *fp, offset_t *off)
722#endif
723{
724	objset_t *os;
725	dmu_replay_record_t *drr;
726	dmu_sendarg_t *dsp;
727	int err;
728	uint64_t fromtxg = 0;
729	uint64_t featureflags = 0;
730	struct send_thread_arg to_arg = { 0 };
731
732	err = dmu_objset_from_ds(to_ds, &os);
733	if (err != 0) {
734		dsl_pool_rele(dp, tag);
735		return (err);
736	}
737
738	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
739	drr->drr_type = DRR_BEGIN;
740	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
741	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
742	    DMU_SUBSTREAM);
743
744#ifdef _KERNEL
745	if (dmu_objset_type(os) == DMU_OST_ZFS) {
746		uint64_t version;
747		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
748			kmem_free(drr, sizeof (dmu_replay_record_t));
749			dsl_pool_rele(dp, tag);
750			return (SET_ERROR(EINVAL));
751		}
752		if (version >= ZPL_VERSION_SA) {
753			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
754		}
755	}
756#endif
757
758	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
759		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
760	if (embedok &&
761	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
762		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
763		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
764			featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
765	}
766
767	if (resumeobj != 0 || resumeoff != 0) {
768		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
769	}
770
771	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
772	    featureflags);
773
774	drr->drr_u.drr_begin.drr_creation_time =
775	    dsl_dataset_phys(to_ds)->ds_creation_time;
776	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
777	if (is_clone)
778		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
779	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
780	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
781		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
782	if (zfs_send_set_freerecords_bit)
783		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
784
785	if (ancestor_zb != NULL) {
786		drr->drr_u.drr_begin.drr_fromguid =
787		    ancestor_zb->zbm_guid;
788		fromtxg = ancestor_zb->zbm_creation_txg;
789	}
790	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
791	if (!to_ds->ds_is_snapshot) {
792		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
793		    sizeof (drr->drr_u.drr_begin.drr_toname));
794	}
795
796	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
797
798	dsp->dsa_drr = drr;
799	dsp->dsa_outfd = outfd;
800	dsp->dsa_proc = curproc;
801	dsp->dsa_td = curthread;
802	dsp->dsa_fp = fp;
803	dsp->dsa_os = os;
804	dsp->dsa_off = off;
805	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
806	dsp->dsa_pending_op = PENDING_NONE;
807	dsp->dsa_featureflags = featureflags;
808	dsp->dsa_resume_object = resumeobj;
809	dsp->dsa_resume_offset = resumeoff;
810
811	mutex_enter(&to_ds->ds_sendstream_lock);
812	list_insert_head(&to_ds->ds_sendstreams, dsp);
813	mutex_exit(&to_ds->ds_sendstream_lock);
814
815	dsl_dataset_long_hold(to_ds, FTAG);
816	dsl_pool_rele(dp, tag);
817
818	void *payload = NULL;
819	size_t payload_len = 0;
820	if (resumeobj != 0 || resumeoff != 0) {
821		dmu_object_info_t to_doi;
822		err = dmu_object_info(os, resumeobj, &to_doi);
823		if (err != 0)
824			goto out;
825		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
826		    resumeoff / to_doi.doi_data_block_size);
827
828		nvlist_t *nvl = fnvlist_alloc();
829		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
830		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
831		payload = fnvlist_pack(nvl, &payload_len);
832		drr->drr_payloadlen = payload_len;
833		fnvlist_free(nvl);
834	}
835
836	err = dump_record(dsp, payload, payload_len);
837	fnvlist_pack_free(payload, payload_len);
838	if (err != 0) {
839		err = dsp->dsa_err;
840		goto out;
841	}
842
843	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
844	    offsetof(struct send_block_record, ln));
845	to_arg.error_code = 0;
846	to_arg.cancel = B_FALSE;
847	to_arg.ds = to_ds;
848	to_arg.fromtxg = fromtxg;
849	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
850	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, &p0,
851	    TS_RUN, minclsyspri);
852
853	struct send_block_record *to_data;
854	to_data = bqueue_dequeue(&to_arg.q);
855
856	while (!to_data->eos_marker && err == 0) {
857		err = do_dump(dsp, to_data);
858		to_data = get_next_record(&to_arg.q, to_data);
859		if (issig(JUSTLOOKING) && issig(FORREAL))
860			err = EINTR;
861	}
862
863	if (err != 0) {
864		to_arg.cancel = B_TRUE;
865		while (!to_data->eos_marker) {
866			to_data = get_next_record(&to_arg.q, to_data);
867		}
868	}
869	kmem_free(to_data, sizeof (*to_data));
870
871	bqueue_destroy(&to_arg.q);
872
873	if (err == 0 && to_arg.error_code != 0)
874		err = to_arg.error_code;
875
876	if (err != 0)
877		goto out;
878
879	if (dsp->dsa_pending_op != PENDING_NONE)
880		if (dump_record(dsp, NULL, 0) != 0)
881			err = SET_ERROR(EINTR);
882
883	if (err != 0) {
884		if (err == EINTR && dsp->dsa_err != 0)
885			err = dsp->dsa_err;
886		goto out;
887	}
888
889	bzero(drr, sizeof (dmu_replay_record_t));
890	drr->drr_type = DRR_END;
891	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
892	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
893
894	if (dump_record(dsp, NULL, 0) != 0)
895		err = dsp->dsa_err;
896
897out:
898	mutex_enter(&to_ds->ds_sendstream_lock);
899	list_remove(&to_ds->ds_sendstreams, dsp);
900	mutex_exit(&to_ds->ds_sendstream_lock);
901
902	kmem_free(drr, sizeof (dmu_replay_record_t));
903	kmem_free(dsp, sizeof (dmu_sendarg_t));
904
905	dsl_dataset_long_rele(to_ds, FTAG);
906
907	return (err);
908}
909
910int
911dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
912    boolean_t embedok, boolean_t large_block_ok,
913#ifdef illumos
914    int outfd, vnode_t *vp, offset_t *off)
915#else
916    int outfd, struct file *fp, offset_t *off)
917#endif
918{
919	dsl_pool_t *dp;
920	dsl_dataset_t *ds;
921	dsl_dataset_t *fromds = NULL;
922	int err;
923
924	err = dsl_pool_hold(pool, FTAG, &dp);
925	if (err != 0)
926		return (err);
927
928	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
929	if (err != 0) {
930		dsl_pool_rele(dp, FTAG);
931		return (err);
932	}
933
934	if (fromsnap != 0) {
935		zfs_bookmark_phys_t zb;
936		boolean_t is_clone;
937
938		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
939		if (err != 0) {
940			dsl_dataset_rele(ds, FTAG);
941			dsl_pool_rele(dp, FTAG);
942			return (err);
943		}
944		if (!dsl_dataset_is_before(ds, fromds, 0))
945			err = SET_ERROR(EXDEV);
946		zb.zbm_creation_time =
947		    dsl_dataset_phys(fromds)->ds_creation_time;
948		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
949		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
950		is_clone = (fromds->ds_dir != ds->ds_dir);
951		dsl_dataset_rele(fromds, FTAG);
952		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
953		    embedok, large_block_ok, outfd, 0, 0, fp, off);
954	} else {
955		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
956		    embedok, large_block_ok, outfd, 0, 0, fp, off);
957	}
958	dsl_dataset_rele(ds, FTAG);
959	return (err);
960}
961
962int
963dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
964    boolean_t large_block_ok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
965#ifdef illumos
966    vnode_t *vp, offset_t *off)
967#else
968    struct file *fp, offset_t *off)
969#endif
970{
971	dsl_pool_t *dp;
972	dsl_dataset_t *ds;
973	int err;
974	boolean_t owned = B_FALSE;
975
976	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
977		return (SET_ERROR(EINVAL));
978
979	err = dsl_pool_hold(tosnap, FTAG, &dp);
980	if (err != 0)
981		return (err);
982
983	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
984		/*
985		 * We are sending a filesystem or volume.  Ensure
986		 * that it doesn't change by owning the dataset.
987		 */
988		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
989		owned = B_TRUE;
990	} else {
991		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
992	}
993	if (err != 0) {
994		dsl_pool_rele(dp, FTAG);
995		return (err);
996	}
997
998	if (fromsnap != NULL) {
999		zfs_bookmark_phys_t zb;
1000		boolean_t is_clone = B_FALSE;
1001		int fsnamelen = strchr(tosnap, '@') - tosnap;
1002
1003		/*
1004		 * If the fromsnap is in a different filesystem, then
1005		 * mark the send stream as a clone.
1006		 */
1007		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
1008		    (fromsnap[fsnamelen] != '@' &&
1009		    fromsnap[fsnamelen] != '#')) {
1010			is_clone = B_TRUE;
1011		}
1012
1013		if (strchr(fromsnap, '@')) {
1014			dsl_dataset_t *fromds;
1015			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
1016			if (err == 0) {
1017				if (!dsl_dataset_is_before(ds, fromds, 0))
1018					err = SET_ERROR(EXDEV);
1019				zb.zbm_creation_time =
1020				    dsl_dataset_phys(fromds)->ds_creation_time;
1021				zb.zbm_creation_txg =
1022				    dsl_dataset_phys(fromds)->ds_creation_txg;
1023				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1024				is_clone = (ds->ds_dir != fromds->ds_dir);
1025				dsl_dataset_rele(fromds, FTAG);
1026			}
1027		} else {
1028			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1029		}
1030		if (err != 0) {
1031			dsl_dataset_rele(ds, FTAG);
1032			dsl_pool_rele(dp, FTAG);
1033			return (err);
1034		}
1035		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1036		    embedok, large_block_ok,
1037		    outfd, resumeobj, resumeoff, fp, off);
1038	} else {
1039		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1040		    embedok, large_block_ok,
1041		    outfd, resumeobj, resumeoff, fp, off);
1042	}
1043	if (owned)
1044		dsl_dataset_disown(ds, FTAG);
1045	else
1046		dsl_dataset_rele(ds, FTAG);
1047	return (err);
1048}
1049
1050static int
1051dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
1052    uint64_t *sizep)
1053{
1054	int err;
1055	/*
1056	 * Assume that space (both on-disk and in-stream) is dominated by
1057	 * data.  We will adjust for indirect blocks and the copies property,
1058	 * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
1059	 */
1060
1061	/*
1062	 * Subtract out approximate space used by indirect blocks.
1063	 * Assume most space is used by data blocks (non-indirect, non-dnode).
1064	 * Assume all blocks are recordsize.  Assume ditto blocks and
1065	 * internal fragmentation counteract compression.
1066	 *
1067	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
1068	 * block, which we observe in practice.
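	 *
	 * For example, 1 GiB of changed data at recordsize=128K is roughly
	 * 8192 blocks, so we subtract 8192 * sizeof (blkptr_t) = 1 MiB of
	 * indirect-block space and then add back 8192 *
	 * sizeof (dmu_replay_record_t) of per-record stream overhead.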
1069	 */
1070	uint64_t recordsize;
1071	err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
1072	if (err != 0)
1073		return (err);
1074	size -= size / recordsize * sizeof (blkptr_t);
1075
1076	/* Add in the space for the record associated with each block. */
1077	size += size / recordsize * sizeof (dmu_replay_record_t);
1078
1079	*sizep = size;
1080
1081	return (0);
1082}
1083
1084int
1085dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
1086{
1087	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1088	int err;
1089	uint64_t size;
1090
1091	ASSERT(dsl_pool_config_held(dp));
1092
1093	/* tosnap must be a snapshot */
1094	if (!ds->ds_is_snapshot)
1095		return (SET_ERROR(EINVAL));
1096
1097	/* fromsnap, if provided, must be a snapshot */
1098	if (fromds != NULL && !fromds->ds_is_snapshot)
1099		return (SET_ERROR(EINVAL));
1100
1101	/*
1102	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
1103	 * or the origin's fs.
1104	 */
1105	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1106		return (SET_ERROR(EXDEV));
1107
1108	/* Get uncompressed size estimate of changed data. */
1109	if (fromds == NULL) {
1110		size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1111	} else {
1112		uint64_t used, comp;
1113		err = dsl_dataset_space_written(fromds, ds,
1114		    &used, &comp, &size);
1115		if (err != 0)
1116			return (err);
1117	}
1118
1119	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1120	return (err);
1121}
1122
1123/*
1124 * Simple callback used to traverse the blocks of a snapshot and sum their
1125 * uncompressed size
1126 */
1127/* ARGSUSED */
1128static int
1129dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1130    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1131{
1132	uint64_t *spaceptr = arg;
1133	if (bp != NULL && !BP_IS_HOLE(bp)) {
1134		*spaceptr += BP_GET_UCSIZE(bp);
1135	}
1136	return (0);
1137}
1138
1139/*
1140 * Given a destination snapshot and a TXG, calculate the approximate size of a
1141 * send stream sent from that TXG. from_txg may be zero, indicating that the
1142 * whole snapshot will be sent.
1143 */
1144int
1145dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1146    uint64_t *sizep)
1147{
1148	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1149	int err;
1150	uint64_t size = 0;
1151
1152	ASSERT(dsl_pool_config_held(dp));
1153
1154	/* tosnap must be a snapshot */
1155	if (!dsl_dataset_is_snapshot(ds))
1156		return (SET_ERROR(EINVAL));
1157
1158	/* verify that from_txg is before the provided snapshot was taken */
1159	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1160		return (SET_ERROR(EXDEV));
1161	}
1162
1163	/*
1164	 * traverse the blocks of the snapshot with birth times after
1165	 * from_txg, summing their uncompressed size
1166	 */
1167	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1168	    dmu_calculate_send_traversal, &size);
1169	if (err)
1170		return (err);
1171
1172	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1173	return (err);
1174}
1175
1176typedef struct dmu_recv_begin_arg {
1177	const char *drba_origin;
1178	dmu_recv_cookie_t *drba_cookie;
1179	cred_t *drba_cred;
1180	uint64_t drba_snapobj;
1181} dmu_recv_begin_arg_t;
1182
1183static int
1184recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
1185    uint64_t fromguid)
1186{
1187	uint64_t val;
1188	int error;
1189	dsl_pool_t *dp = ds->ds_dir->dd_pool;
1190
1191	/* temporary clone name must not exist */
1192	error = zap_lookup(dp->dp_meta_objset,
1193	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
1194	    8, 1, &val);
1195	if (error != ENOENT)
1196		return (error == 0 ? EBUSY : error);
1197
1198	/* new snapshot name must not exist */
1199	error = zap_lookup(dp->dp_meta_objset,
1200	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
1201	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
1202	if (error != ENOENT)
1203		return (error == 0 ? EEXIST : error);
1204
1205	/*
1206	 * Check snapshot limit before receiving. We'll recheck again at the
1207	 * end, but might as well abort before receiving if we're already over
1208	 * the limit.
1209	 *
1210	 * Note that we do not check the file system limit with
1211	 * dsl_dir_fscount_check because the temporary %clones don't count
1212	 * against that limit.
1213	 */
1214	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
1215	    NULL, drba->drba_cred);
1216	if (error != 0)
1217		return (error);
1218
1219	if (fromguid != 0) {
1220		dsl_dataset_t *snap;
1221		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
1222
1223		/* Find snapshot in this dir that matches fromguid. */
1224		while (obj != 0) {
1225			error = dsl_dataset_hold_obj(dp, obj, FTAG,
1226			    &snap);
1227			if (error != 0)
1228				return (SET_ERROR(ENODEV));
1229			if (snap->ds_dir != ds->ds_dir) {
1230				dsl_dataset_rele(snap, FTAG);
1231				return (SET_ERROR(ENODEV));
1232			}
1233			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
1234				break;
1235			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
1236			dsl_dataset_rele(snap, FTAG);
1237		}
1238		if (obj == 0)
1239			return (SET_ERROR(ENODEV));
1240
1241		if (drba->drba_cookie->drc_force) {
1242			drba->drba_snapobj = obj;
1243		} else {
1244			/*
1245			 * If we are not forcing, there must be no
1246			 * changes since fromsnap.
1247			 */
1248			if (dsl_dataset_modified_since_snap(ds, snap)) {
1249				dsl_dataset_rele(snap, FTAG);
1250				return (SET_ERROR(ETXTBSY));
1251			}
1252			drba->drba_snapobj = ds->ds_prev->ds_object;
1253		}
1254
1255		dsl_dataset_rele(snap, FTAG);
1256	} else {
1257		/* if full, then must be forced */
1258		if (!drba->drba_cookie->drc_force)
1259			return (SET_ERROR(EEXIST));
1260		/* start from $ORIGIN@$ORIGIN, if supported */
1261		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
1262		    dp->dp_origin_snap->ds_object : 0;
1263	}
1264
1265	return (0);
1266
1267}
1268
1269static int
1270dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
1271{
1272	dmu_recv_begin_arg_t *drba = arg;
1273	dsl_pool_t *dp = dmu_tx_pool(tx);
1274	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1275	uint64_t fromguid = drrb->drr_fromguid;
1276	int flags = drrb->drr_flags;
1277	int error;
1278	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1279	dsl_dataset_t *ds;
1280	const char *tofs = drba->drba_cookie->drc_tofs;
1281
1282	/* already checked */
1283	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1284	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
1285
1286	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1287	    DMU_COMPOUNDSTREAM ||
1288	    drrb->drr_type >= DMU_OST_NUMTYPES ||
1289	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
1290		return (SET_ERROR(EINVAL));
1291
1292	/* Verify pool version supports SA if SA_SPILL feature set */
1293	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1294	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
1295		return (SET_ERROR(ENOTSUP));
1296
1297	if (drba->drba_cookie->drc_resumable &&
1298	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
1299		return (SET_ERROR(ENOTSUP));
1300
1301	/*
1302	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1303	 * record to a plain WRITE record, so the pool must have the
1304	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1305	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1306	 */
1307	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1308	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1309		return (SET_ERROR(ENOTSUP));
1310	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
1311	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1312		return (SET_ERROR(ENOTSUP));
1313
1314	/*
1315	 * The receiving code doesn't know how to translate large blocks
1316	 * to smaller ones, so the pool must have the LARGE_BLOCKS
1317	 * feature enabled if the stream has LARGE_BLOCKS.
1318	 */
1319	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
1320	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
1321		return (SET_ERROR(ENOTSUP));
1322
1323	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1324	if (error == 0) {
1325		/* target fs already exists; recv into temp clone */
1326
1327		/* Can't recv a clone into an existing fs */
1328		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
1329			dsl_dataset_rele(ds, FTAG);
1330			return (SET_ERROR(EINVAL));
1331		}
1332
1333		error = recv_begin_check_existing_impl(drba, ds, fromguid);
1334		dsl_dataset_rele(ds, FTAG);
1335	} else if (error == ENOENT) {
1336		/* target fs does not exist; must be a full backup or clone */
1337		char buf[MAXNAMELEN];
1338
1339		/*
1340		 * If it's a non-clone incremental, we are missing the
1341		 * target fs, so fail the recv.
1342		 */
1343		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
1344		    drba->drba_origin))
1345			return (SET_ERROR(ENOENT));
1346
1347		/*
1348		 * If we're receiving a full send as a clone, and it doesn't
1349		 * contain all the necessary free records and freeobject
1350		 * records, reject it.
1351		 */
1352		if (fromguid == 0 && drba->drba_origin &&
1353		    !(flags & DRR_FLAG_FREERECORDS))
1354			return (SET_ERROR(EINVAL));
1355
1356		/* Open the parent of tofs */
1357		ASSERT3U(strlen(tofs), <, MAXNAMELEN);
1358		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
1359		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
1360		if (error != 0)
1361			return (error);
1362
1363		/*
1364		 * Check filesystem and snapshot limits before receiving. We'll
1365		 * recheck snapshot limits again at the end (we create the
1366		 * filesystems and increment those counts during begin_sync).
1367		 */
1368		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1369		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
1370		if (error != 0) {
1371			dsl_dataset_rele(ds, FTAG);
1372			return (error);
1373		}
1374
1375		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1376		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
1377		if (error != 0) {
1378			dsl_dataset_rele(ds, FTAG);
1379			return (error);
1380		}
1381
1382		if (drba->drba_origin != NULL) {
1383			dsl_dataset_t *origin;
1384			error = dsl_dataset_hold(dp, drba->drba_origin,
1385			    FTAG, &origin);
1386			if (error != 0) {
1387				dsl_dataset_rele(ds, FTAG);
1388				return (error);
1389			}
1390			if (!origin->ds_is_snapshot) {
1391				dsl_dataset_rele(origin, FTAG);
1392				dsl_dataset_rele(ds, FTAG);
1393				return (SET_ERROR(EINVAL));
1394			}
1395			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
1396			    fromguid != 0) {
1397				dsl_dataset_rele(origin, FTAG);
1398				dsl_dataset_rele(ds, FTAG);
1399				return (SET_ERROR(ENODEV));
1400			}
1401			dsl_dataset_rele(origin, FTAG);
1402		}
1403		dsl_dataset_rele(ds, FTAG);
1404		error = 0;
1405	}
1406	return (error);
1407}
1408
1409static void
1410dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
1411{
1412	dmu_recv_begin_arg_t *drba = arg;
1413	dsl_pool_t *dp = dmu_tx_pool(tx);
1414	objset_t *mos = dp->dp_meta_objset;
1415	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1416	const char *tofs = drba->drba_cookie->drc_tofs;
1417	dsl_dataset_t *ds, *newds;
1418	uint64_t dsobj;
1419	int error;
1420	uint64_t crflags = 0;
1421
1422	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
1423		crflags |= DS_FLAG_CI_DATASET;
1424
1425	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1426	if (error == 0) {
1427		/* create temporary clone */
1428		dsl_dataset_t *snap = NULL;
1429		if (drba->drba_snapobj != 0) {
1430			VERIFY0(dsl_dataset_hold_obj(dp,
1431			    drba->drba_snapobj, FTAG, &snap));
1432		}
1433		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
1434		    snap, crflags, drba->drba_cred, tx);
1435		if (drba->drba_snapobj != 0)
1436			dsl_dataset_rele(snap, FTAG);
1437		dsl_dataset_rele(ds, FTAG);
1438	} else {
1439		dsl_dir_t *dd;
1440		const char *tail;
1441		dsl_dataset_t *origin = NULL;
1442
1443		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
1444
1445		if (drba->drba_origin != NULL) {
1446			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
1447			    FTAG, &origin));
1448		}
1449
1450		/* Create new dataset. */
1451		dsobj = dsl_dataset_create_sync(dd,
1452		    strrchr(tofs, '/') + 1,
1453		    origin, crflags, drba->drba_cred, tx);
1454		if (origin != NULL)
1455			dsl_dataset_rele(origin, FTAG);
1456		dsl_dir_rele(dd, FTAG);
1457		drba->drba_cookie->drc_newfs = B_TRUE;
1458	}
1459	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));
1460
1461	if (drba->drba_cookie->drc_resumable) {
1462		dsl_dataset_zapify(newds, tx);
1463		if (drrb->drr_fromguid != 0) {
1464			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
1465			    8, 1, &drrb->drr_fromguid, tx));
1466		}
1467		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
1468		    8, 1, &drrb->drr_toguid, tx));
1469		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
1470		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
1471		uint64_t one = 1;
1472		uint64_t zero = 0;
1473		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
1474		    8, 1, &one, tx));
1475		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
1476		    8, 1, &zero, tx));
1477		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
1478		    8, 1, &zero, tx));
1479		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
1480		    DMU_BACKUP_FEATURE_EMBED_DATA) {
1481			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
1482			    8, 1, &one, tx));
1483		}
1484	}
1485
1486	dmu_buf_will_dirty(newds->ds_dbuf, tx);
1487	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
1488
1489	/*
1490	 * If we actually created a non-clone, we need to create the
1491	 * objset in our new dataset.
1492	 */
1493	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
1494		(void) dmu_objset_create_impl(dp->dp_spa,
1495		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
1496	}
1497
1498	drba->drba_cookie->drc_ds = newds;
1499
1500	spa_history_log_internal_ds(newds, "receive", tx, "");
1501}
1502
1503static int
1504dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
1505{
1506	dmu_recv_begin_arg_t *drba = arg;
1507	dsl_pool_t *dp = dmu_tx_pool(tx);
1508	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1509	int error;
1510	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1511	dsl_dataset_t *ds;
1512	const char *tofs = drba->drba_cookie->drc_tofs;
1513
1514	/* already checked */
1515	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1516	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
1517
1518	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1519	    DMU_COMPOUNDSTREAM ||
1520	    drrb->drr_type >= DMU_OST_NUMTYPES)
1521		return (SET_ERROR(EINVAL));
1522
1523	/* Verify pool version supports SA if SA_SPILL feature set */
1524	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1525	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
1526		return (SET_ERROR(ENOTSUP));
1527
1528	/*
1529	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1530	 * record to a plain WRITE record, so the pool must have the
1531	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1532	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
1533	 */
1534	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1535	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1536		return (SET_ERROR(ENOTSUP));
1537	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
1538	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1539		return (SET_ERROR(ENOTSUP));
1540
1541	char recvname[ZFS_MAXNAMELEN];
1542
1543	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
1544	    tofs, recv_clone_name);
1545
1546	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1547		/* %recv does not exist; continue in tofs */
1548		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1549		if (error != 0)
1550			return (error);
1551	}
1552
1553	/* check that ds is marked inconsistent */
1554	if (!DS_IS_INCONSISTENT(ds)) {
1555		dsl_dataset_rele(ds, FTAG);
1556		return (SET_ERROR(EINVAL));
1557	}
1558
1559	/* check that there is resuming data, and that the toguid matches */
1560	if (!dsl_dataset_is_zapified(ds)) {
1561		dsl_dataset_rele(ds, FTAG);
1562		return (SET_ERROR(EINVAL));
1563	}
1564	uint64_t val;
1565	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
1566	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
1567	if (error != 0 || drrb->drr_toguid != val) {
1568		dsl_dataset_rele(ds, FTAG);
1569		return (SET_ERROR(EINVAL));
1570	}
1571
1572	/*
1573	 * Check if the receive is still running.  If so, it will be owned.
1574	 * Note that nothing else can own the dataset (e.g. after the receive
1575	 * fails) because it will be marked inconsistent.
1576	 */
1577	if (dsl_dataset_has_owner(ds)) {
1578		dsl_dataset_rele(ds, FTAG);
1579		return (SET_ERROR(EBUSY));
1580	}
1581
1582	/* There should not be any snapshots of this fs yet. */
1583	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
1584		dsl_dataset_rele(ds, FTAG);
1585		return (SET_ERROR(EINVAL));
1586	}
1587
1588	/*
1589	 * Note: resume point will be checked when we process the first WRITE
1590	 * record.
1591	 */
1592
1593	/* check that the origin matches */
1594	val = 0;
1595	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
1596	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
1597	if (drrb->drr_fromguid != val) {
1598		dsl_dataset_rele(ds, FTAG);
1599		return (SET_ERROR(EINVAL));
1600	}
1601
1602	dsl_dataset_rele(ds, FTAG);
1603	return (0);
1604}
1605
1606static void
1607dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
1608{
1609	dmu_recv_begin_arg_t *drba = arg;
1610	dsl_pool_t *dp = dmu_tx_pool(tx);
1611	const char *tofs = drba->drba_cookie->drc_tofs;
1612	dsl_dataset_t *ds;
1613	uint64_t dsobj;
1614	char recvname[ZFS_MAXNAMELEN];
1615
1616	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
1617	    tofs, recv_clone_name);
1618
1619	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1620		/* %recv does not exist; continue in tofs */
1621		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
1622		drba->drba_cookie->drc_newfs = B_TRUE;
1623	}
1624
1625	/* clear the inconsistent flag so that we can own it */
1626	ASSERT(DS_IS_INCONSISTENT(ds));
1627	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1628	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
1629	dsobj = ds->ds_object;
1630	dsl_dataset_rele(ds, FTAG);
1631
1632	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));
1633
1634	dmu_buf_will_dirty(ds->ds_dbuf, tx);
1635	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
1636
1637	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
1638
1639	drba->drba_cookie->drc_ds = ds;
1640
1641	spa_history_log_internal_ds(ds, "resume receive", tx, "");
1642}
1643
1644/*
1645 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1646 * succeeds; otherwise we will leak the holds on the datasets.
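 *
 * dmu_recv_begin() only validates the DRR_BEGIN record and, via a sync task,
 * creates (or, for a resuming receive, re-owns) the dataset that the stream
 * will be applied to; the remaining records are consumed by dmu_recv_stream().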
1647 */
1648int
1649dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
1650    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
1651{
1652	dmu_recv_begin_arg_t drba = { 0 };
1653
1654	bzero(drc, sizeof (dmu_recv_cookie_t));
1655	drc->drc_drr_begin = drr_begin;
1656	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
1657	drc->drc_tosnap = tosnap;
1658	drc->drc_tofs = tofs;
1659	drc->drc_force = force;
1660	drc->drc_resumable = resumable;
1661	drc->drc_cred = CRED();
1662
1663	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1664		drc->drc_byteswap = B_TRUE;
1665		fletcher_4_incremental_byteswap(drr_begin,
1666		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
1667		byteswap_record(drr_begin);
1668	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
1669		fletcher_4_incremental_native(drr_begin,
1670		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
1671	} else {
1672		return (SET_ERROR(EINVAL));
1673	}
1674
1675	drba.drba_origin = origin;
1676	drba.drba_cookie = drc;
1677	drba.drba_cred = CRED();
1678
1679	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
1680	    DMU_BACKUP_FEATURE_RESUMING) {
1681		return (dsl_sync_task(tofs,
1682		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
1683		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1684	} else  {
1685		return (dsl_sync_task(tofs,
1686		    dmu_recv_begin_check, dmu_recv_begin_sync,
1687		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1688	}
1689}
1690
1691struct receive_record_arg {
1692	dmu_replay_record_t header;
1693	void *payload; /* Pointer to a buffer containing the payload */
1694	/*
1695	 * If the record is a write, pointer to the arc_buf_t containing the
1696	 * payload.
1697	 */
1698	arc_buf_t *write_buf;
1699	int payload_size;
1700	uint64_t bytes_read; /* bytes read from stream when record created */
1701	boolean_t eos_marker; /* Marks the end of the stream */
1702	bqueue_node_t node;
1703};
1704
1705struct receive_writer_arg {
1706	objset_t *os;
1707	boolean_t byteswap;
1708	bqueue_t q;
1709
1710	/*
1711	 * These three args are used to signal to the main thread that we're
1712	 * done.
1713	 */
1714	kmutex_t mutex;
1715	kcondvar_t cv;
1716	boolean_t done;
1717
1718	int err;
1719	/* A map from guid to dataset to help handle dedup'd streams. */
1720	avl_tree_t *guid_to_ds_map;
1721	boolean_t resumable;
1722	uint64_t last_object, last_offset;
1723	uint64_t bytes_read; /* bytes read when current record created */
1724};
1725
1726struct objlist {
1727	list_t list; /* List of struct receive_objnode. */
1728	/*
1729	 * Last object looked up. Used to assert that objects are being looked
1730	 * up in ascending order.
1731	 */
1732	uint64_t last_lookup;
1733};
1734
1735struct receive_objnode {
1736	list_node_t node;
1737	uint64_t object;
1738};
1739
1740struct receive_arg  {
1741	objset_t *os;
1742	kthread_t *td;
1743	struct file *fp;
1744	uint64_t voff; /* The current offset in the stream */
1745	uint64_t bytes_read;
1746	/*
1747	 * A record that has had its payload read in, but hasn't yet been handed
1748	 * off to the worker thread.
1749	 */
1750	struct receive_record_arg *rrd;
1751	/* A record that has had its header read in, but not its payload. */
1752	struct receive_record_arg *next_rrd;
1753	zio_cksum_t cksum;
1754	zio_cksum_t prev_cksum;
1755	int err;
1756	boolean_t byteswap;
1757	/* Sorted list of objects not to issue prefetches for. */
1758	struct objlist ignore_objlist;
1759};
1760
1761typedef struct guid_map_entry {
1762	uint64_t	guid;
1763	dsl_dataset_t	*gme_ds;
1764	avl_node_t	avlnode;
1765} guid_map_entry_t;
1766
1767static int
1768guid_compare(const void *arg1, const void *arg2)
1769{
1770	const guid_map_entry_t *gmep1 = arg1;
1771	const guid_map_entry_t *gmep2 = arg2;
1772
1773	if (gmep1->guid < gmep2->guid)
1774		return (-1);
1775	else if (gmep1->guid > gmep2->guid)
1776		return (1);
1777	return (0);
1778}
1779
1780static void
1781free_guid_map_onexit(void *arg)
1782{
1783	avl_tree_t *ca = arg;
1784	void *cookie = NULL;
1785	guid_map_entry_t *gmep;
1786
1787	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
1788		dsl_dataset_long_rele(gmep->gme_ds, gmep);
1789		dsl_dataset_rele(gmep->gme_ds, gmep);
1790		kmem_free(gmep, sizeof (guid_map_entry_t));
1791	}
1792	avl_destroy(ca);
1793	kmem_free(ca, sizeof (avl_tree_t));
1794}
1795
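/*
 * Illustrative sketch (editor's addition): how the guid-to-dataset map used
 * for dedup'ed streams is built and queried.  The tree is keyed by snapshot
 * guid via guid_compare(); entries hold long holds on their datasets, which
 * free_guid_map_onexit() above releases when the cleanup fd is closed.
 */
#if 0
static void
guid_map_sketch(avl_tree_t *map, guid_map_entry_t *gmep, uint64_t refguid)
{
	guid_map_entry_t gmesrch;
	avl_index_t where;

	/* One-time setup, as done in dmu_recv_stream() for a new map. */
	avl_create(map, guid_compare, sizeof (guid_map_entry_t),
	    offsetof(guid_map_entry_t, avlnode));

	/* Register a received snapshot, as add_ds_to_guidmap() does. */
	avl_add(map, gmep);

	/* A later DRR_WRITE_BYREF record looks up its reference guid. */
	gmesrch.guid = refguid;
	if (avl_find(map, &gmesrch, &where) == NULL) {
		/* refguid was never received; the stream is invalid */
	}
}
#endif
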
1796static int
1797restore_bytes(struct receive_arg *ra, void *buf, int len, off_t off, ssize_t *resid)
1798{
1799	struct uio auio;
1800	struct iovec aiov;
1801	int error;
1802
1803	aiov.iov_base = buf;
1804	aiov.iov_len = len;
1805	auio.uio_iov = &aiov;
1806	auio.uio_iovcnt = 1;
1807	auio.uio_resid = len;
1808	auio.uio_segflg = UIO_SYSSPACE;
1809	auio.uio_rw = UIO_READ;
1810	auio.uio_offset = off;
1811	auio.uio_td = ra->td;
1812#ifdef _KERNEL
1813	error = fo_read(ra->fp, &auio, ra->td->td_ucred, FOF_OFFSET, ra->td);
1814#else
1815	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
1816	error = EOPNOTSUPP;
1817#endif
1818	*resid = auio.uio_resid;
1819	return (error);
1820}
1821
1822static int
1823receive_read(struct receive_arg *ra, int len, void *buf)
1824{
1825	int done = 0;
1826
1827	/* some records need 8-byte alignment, so everything must be aligned */
1828	ASSERT0(len % 8);
1829
1830	while (done < len) {
1831		ssize_t resid;
1832
1833		ra->err = restore_bytes(ra, buf + done,
1834		    len - done, ra->voff, &resid);
1835
1836		if (resid == len - done) {
1837			/*
1838			 * Note: ECKSUM indicates that the receive
1839			 * was interrupted and can potentially be resumed.
1840			 */
1841			ra->err = SET_ERROR(ECKSUM);
1842		}
1843		ra->voff += len - done - resid;
1844		done = len - resid;
1845		if (ra->err != 0)
1846			return (ra->err);
1847	}
1848
1849	ra->bytes_read += len;
1850
1851	ASSERT3U(done, ==, len);
1852	return (0);
1853}
1854
1855static void
1856byteswap_record(dmu_replay_record_t *drr)
1857{
1858#define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
1859#define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
1860	drr->drr_type = BSWAP_32(drr->drr_type);
1861	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
1862
1863	switch (drr->drr_type) {
1864	case DRR_BEGIN:
1865		DO64(drr_begin.drr_magic);
1866		DO64(drr_begin.drr_versioninfo);
1867		DO64(drr_begin.drr_creation_time);
1868		DO32(drr_begin.drr_type);
1869		DO32(drr_begin.drr_flags);
1870		DO64(drr_begin.drr_toguid);
1871		DO64(drr_begin.drr_fromguid);
1872		break;
1873	case DRR_OBJECT:
1874		DO64(drr_object.drr_object);
1875		DO32(drr_object.drr_type);
1876		DO32(drr_object.drr_bonustype);
1877		DO32(drr_object.drr_blksz);
1878		DO32(drr_object.drr_bonuslen);
1879		DO64(drr_object.drr_toguid);
1880		break;
1881	case DRR_FREEOBJECTS:
1882		DO64(drr_freeobjects.drr_firstobj);
1883		DO64(drr_freeobjects.drr_numobjs);
1884		DO64(drr_freeobjects.drr_toguid);
1885		break;
1886	case DRR_WRITE:
1887		DO64(drr_write.drr_object);
1888		DO32(drr_write.drr_type);
1889		DO64(drr_write.drr_offset);
1890		DO64(drr_write.drr_length);
1891		DO64(drr_write.drr_toguid);
1892		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
1893		DO64(drr_write.drr_key.ddk_prop);
1894		break;
1895	case DRR_WRITE_BYREF:
1896		DO64(drr_write_byref.drr_object);
1897		DO64(drr_write_byref.drr_offset);
1898		DO64(drr_write_byref.drr_length);
1899		DO64(drr_write_byref.drr_toguid);
1900		DO64(drr_write_byref.drr_refguid);
1901		DO64(drr_write_byref.drr_refobject);
1902		DO64(drr_write_byref.drr_refoffset);
1903		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
1904		    drr_key.ddk_cksum);
1905		DO64(drr_write_byref.drr_key.ddk_prop);
1906		break;
1907	case DRR_WRITE_EMBEDDED:
1908		DO64(drr_write_embedded.drr_object);
1909		DO64(drr_write_embedded.drr_offset);
1910		DO64(drr_write_embedded.drr_length);
1911		DO64(drr_write_embedded.drr_toguid);
1912		DO32(drr_write_embedded.drr_lsize);
1913		DO32(drr_write_embedded.drr_psize);
1914		break;
1915	case DRR_FREE:
1916		DO64(drr_free.drr_object);
1917		DO64(drr_free.drr_offset);
1918		DO64(drr_free.drr_length);
1919		DO64(drr_free.drr_toguid);
1920		break;
1921	case DRR_SPILL:
1922		DO64(drr_spill.drr_object);
1923		DO64(drr_spill.drr_length);
1924		DO64(drr_spill.drr_toguid);
1925		break;
1926	case DRR_END:
1927		DO64(drr_end.drr_toguid);
1928		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
1929		break;
1930	}
1931
1932	if (drr->drr_type != DRR_BEGIN) {
1933		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
1934	}
1935
1936#undef DO64
1937#undef DO32
1938}
1939
1940static inline uint8_t
1941deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
1942{
1943	if (bonus_type == DMU_OT_SA) {
1944		return (1);
1945	} else {
1946		return (1 +
1947		    ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
1948	}
1949}
1950
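/*
 * Worked example (editor's note): with the classic 512-byte dnode,
 * DN_MAX_BONUSLEN is 320 bytes and a blkptr_t is 128 bytes
 * (SPA_BLKPTRSHIFT == 7).  A non-SA bonus of 64 bytes therefore leaves
 * 256 spare bytes, and deduce_nblkptr() returns 1 + (256 >> 7) == 3
 * block pointers, while an SA bonus always implies a single block pointer.
 */
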
1951static void
1952save_resume_state(struct receive_writer_arg *rwa,
1953    uint64_t object, uint64_t offset, dmu_tx_t *tx)
1954{
1955	int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
1956
1957	if (!rwa->resumable)
1958		return;
1959
1960	/*
1961	 * We use ds_resume_bytes[] != 0 to indicate that we need to
1962	 * update this on disk, so it must not be 0.
1963	 */
1964	ASSERT(rwa->bytes_read != 0);
1965
1966	/*
1967	 * We only resume from write records, which have a valid
1968	 * (non-meta-dnode) object number.
1969	 */
1970	ASSERT(object != 0);
1971
1972	/*
1973	 * For resuming to work correctly, we must receive records in order,
1974	 * sorted by object,offset.  This is checked by the callers, but
1975	 * assert it here for good measure.
1976	 */
1977	ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
1978	ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
1979	    offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
1980	ASSERT3U(rwa->bytes_read, >=,
1981	    rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
1982
1983	rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
1984	rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
1985	rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
1986}
1987
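/*
 * Editor's note: the (object, offset, bytes_read) triple saved above is kept
 * in one of the dataset's per-txg resume slots (indexed by txg & TXG_MASK).
 * When that txg syncs, the dataset sync path is expected to persist the slot
 * as the DS_FIELD_RESUME_OBJECT/OFFSET/BYTES zap entries that resume_check()
 * and dmu_recv_stream() consult when the receive is later resumed.
 */
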
1988static int
1989receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
1990    void *data)
1991{
1992	dmu_object_info_t doi;
1993	dmu_tx_t *tx;
1994	uint64_t object;
1995	int err;
1996
1997	if (drro->drr_type == DMU_OT_NONE ||
1998	    !DMU_OT_IS_VALID(drro->drr_type) ||
1999	    !DMU_OT_IS_VALID(drro->drr_bonustype) ||
2000	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
2001	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
2002	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
2003	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
2004	    drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
2005	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
2006		return (SET_ERROR(EINVAL));
2007	}
2008
2009	err = dmu_object_info(rwa->os, drro->drr_object, &doi);
2010
2011	if (err != 0 && err != ENOENT)
2012		return (SET_ERROR(EINVAL));
2013	object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;
2014
2015	/*
2016	 * If we are losing blkptrs or changing the block size this must
2017	 * be a new file instance.  We must clear out the previous file
2018	 * contents before we can change this type of metadata in the dnode.
2019	 */
2020	if (err == 0) {
2021		int nblkptr;
2022
2023		nblkptr = deduce_nblkptr(drro->drr_bonustype,
2024		    drro->drr_bonuslen);
2025
2026		if (drro->drr_blksz != doi.doi_data_block_size ||
2027		    nblkptr < doi.doi_nblkptr) {
2028			err = dmu_free_long_range(rwa->os, drro->drr_object,
2029			    0, DMU_OBJECT_END);
2030			if (err != 0)
2031				return (SET_ERROR(EINVAL));
2032		}
2033	}
2034
2035	tx = dmu_tx_create(rwa->os);
2036	dmu_tx_hold_bonus(tx, object);
2037	err = dmu_tx_assign(tx, TXG_WAIT);
2038	if (err != 0) {
2039		dmu_tx_abort(tx);
2040		return (err);
2041	}
2042
2043	if (object == DMU_NEW_OBJECT) {
2044		/* currently free, want to be allocated */
2045		err = dmu_object_claim(rwa->os, drro->drr_object,
2046		    drro->drr_type, drro->drr_blksz,
2047		    drro->drr_bonustype, drro->drr_bonuslen, tx);
2048	} else if (drro->drr_type != doi.doi_type ||
2049	    drro->drr_blksz != doi.doi_data_block_size ||
2050	    drro->drr_bonustype != doi.doi_bonus_type ||
2051	    drro->drr_bonuslen != doi.doi_bonus_size) {
2052		/* currently allocated, but with different properties */
2053		err = dmu_object_reclaim(rwa->os, drro->drr_object,
2054		    drro->drr_type, drro->drr_blksz,
2055		    drro->drr_bonustype, drro->drr_bonuslen, tx);
2056	}
2057	if (err != 0) {
2058		dmu_tx_commit(tx);
2059		return (SET_ERROR(EINVAL));
2060	}
2061
2062	dmu_object_set_checksum(rwa->os, drro->drr_object,
2063	    drro->drr_checksumtype, tx);
2064	dmu_object_set_compress(rwa->os, drro->drr_object,
2065	    drro->drr_compress, tx);
2066
2067	if (data != NULL) {
2068		dmu_buf_t *db;
2069
2070		VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
2071		dmu_buf_will_dirty(db, tx);
2072
2073		ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
2074		bcopy(data, db->db_data, drro->drr_bonuslen);
2075		if (rwa->byteswap) {
2076			dmu_object_byteswap_t byteswap =
2077			    DMU_OT_BYTESWAP(drro->drr_bonustype);
2078			dmu_ot_byteswap[byteswap].ob_func(db->db_data,
2079			    drro->drr_bonuslen);
2080		}
2081		dmu_buf_rele(db, FTAG);
2082	}
2083	dmu_tx_commit(tx);
2084
2085	return (0);
2086}
2087
2088/* ARGSUSED */
2089static int
2090receive_freeobjects(struct receive_writer_arg *rwa,
2091    struct drr_freeobjects *drrfo)
2092{
2093	uint64_t obj;
2094	int next_err = 0;
2095
2096	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
2097		return (SET_ERROR(EINVAL));
2098
2099	for (obj = drrfo->drr_firstobj;
2100	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
2101	    next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
2102		int err;
2103
2104		if (dmu_object_info(rwa->os, obj, NULL) != 0)
2105			continue;
2106
2107		err = dmu_free_long_object(rwa->os, obj);
2108		if (err != 0)
2109			return (err);
2110	}
2111	if (next_err != ESRCH)
2112		return (next_err);
2113	return (0);
2114}
2115
2116static int
2117receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
2118    arc_buf_t *abuf)
2119{
2120	dmu_tx_t *tx;
2121	int err;
2122
2123	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
2124	    !DMU_OT_IS_VALID(drrw->drr_type))
2125		return (SET_ERROR(EINVAL));
2126
2127	/*
2128	 * For resuming to work, records must be in increasing order
2129	 * by (object, offset).
2130	 */
2131	if (drrw->drr_object < rwa->last_object ||
2132	    (drrw->drr_object == rwa->last_object &&
2133	    drrw->drr_offset < rwa->last_offset)) {
2134		return (SET_ERROR(EINVAL));
2135	}
2136	rwa->last_object = drrw->drr_object;
2137	rwa->last_offset = drrw->drr_offset;
2138
2139	if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
2140		return (SET_ERROR(EINVAL));
2141
2142	tx = dmu_tx_create(rwa->os);
2143
2144	dmu_tx_hold_write(tx, drrw->drr_object,
2145	    drrw->drr_offset, drrw->drr_length);
2146	err = dmu_tx_assign(tx, TXG_WAIT);
2147	if (err != 0) {
2148		dmu_tx_abort(tx);
2149		return (err);
2150	}
2151	if (rwa->byteswap) {
2152		dmu_object_byteswap_t byteswap =
2153		    DMU_OT_BYTESWAP(drrw->drr_type);
2154		dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
2155		    drrw->drr_length);
2156	}
2157
2158	dmu_buf_t *bonus;
2159	if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
2160		return (SET_ERROR(EINVAL));
2161	dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
2162
2163	/*
2164	 * Note: If the receive fails, we want the resume stream to start
2165	 * with the same record that we last successfully received (as opposed
2166	 * to the next record), so that we can verify that we are
2167	 * resuming from the correct location.
2168	 */
2169	save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
2170	dmu_tx_commit(tx);
2171	dmu_buf_rele(bonus, FTAG);
2172
2173	return (0);
2174}
2175
2176/*
2177 * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
2178 * streams to refer to a copy of the data that is already on the
2179 * system because it came in earlier in the stream.  This function
2180 * finds the earlier copy of the data, and uses that copy instead of
2181 * data from the stream to fulfill this write.
2182 */
2183static int
2184receive_write_byref(struct receive_writer_arg *rwa,
2185    struct drr_write_byref *drrwbr)
2186{
2187	dmu_tx_t *tx;
2188	int err;
2189	guid_map_entry_t gmesrch;
2190	guid_map_entry_t *gmep;
2191	avl_index_t where;
2192	objset_t *ref_os = NULL;
2193	dmu_buf_t *dbp;
2194
2195	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
2196		return (SET_ERROR(EINVAL));
2197
2198	/*
2199	 * If the GUID of the referenced dataset is different from the
2200	 * GUID of the target dataset, find the referenced dataset.
2201	 */
2202	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
2203		gmesrch.guid = drrwbr->drr_refguid;
2204		if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
2205		    &where)) == NULL) {
2206			return (SET_ERROR(EINVAL));
2207		}
2208		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
2209			return (SET_ERROR(EINVAL));
2210	} else {
2211		ref_os = rwa->os;
2212	}
2213
2214	err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
2215	    drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
2216	if (err != 0)
2217		return (err);
2218
2219	tx = dmu_tx_create(rwa->os);
2220
2221	dmu_tx_hold_write(tx, drrwbr->drr_object,
2222	    drrwbr->drr_offset, drrwbr->drr_length);
2223	err = dmu_tx_assign(tx, TXG_WAIT);
2224	if (err != 0) {
2225		dmu_tx_abort(tx);
2226		return (err);
2227	}
2228	dmu_write(rwa->os, drrwbr->drr_object,
2229	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
2230	dmu_buf_rele(dbp, FTAG);
2231
2232	/* See comment in receive_write. */
2233	save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
2234	dmu_tx_commit(tx);
2235	return (0);
2236}
2237
2238static int
2239receive_write_embedded(struct receive_writer_arg *rwa,
2240    struct drr_write_embedded *drrwe, void *data)
2241{
2242	dmu_tx_t *tx;
2243	int err;
2244
2245	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
2246		return (SET_ERROR(EINVAL));
2247
2248	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
2249		return (SET_ERROR(EINVAL));
2250
2251	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
2252		return (SET_ERROR(EINVAL));
2253	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
2254		return (SET_ERROR(EINVAL));
2255
2256	tx = dmu_tx_create(rwa->os);
2257
2258	dmu_tx_hold_write(tx, drrwe->drr_object,
2259	    drrwe->drr_offset, drrwe->drr_length);
2260	err = dmu_tx_assign(tx, TXG_WAIT);
2261	if (err != 0) {
2262		dmu_tx_abort(tx);
2263		return (err);
2264	}
2265
2266	dmu_write_embedded(rwa->os, drrwe->drr_object,
2267	    drrwe->drr_offset, data, drrwe->drr_etype,
2268	    drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
2269	    rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
2270
2271	/* See comment in receive_write. */
2272	save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
2273	dmu_tx_commit(tx);
2274	return (0);
2275}
2276
2277static int
2278receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
2279    void *data)
2280{
2281	dmu_tx_t *tx;
2282	dmu_buf_t *db, *db_spill;
2283	int err;
2284
2285	if (drrs->drr_length < SPA_MINBLOCKSIZE ||
2286	    drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
2287		return (SET_ERROR(EINVAL));
2288
2289	if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
2290		return (SET_ERROR(EINVAL));
2291
2292	VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
2293	if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
2294		dmu_buf_rele(db, FTAG);
2295		return (err);
2296	}
2297
2298	tx = dmu_tx_create(rwa->os);
2299
2300	dmu_tx_hold_spill(tx, db->db_object);
2301
2302	err = dmu_tx_assign(tx, TXG_WAIT);
2303	if (err != 0) {
2304		dmu_buf_rele(db, FTAG);
2305		dmu_buf_rele(db_spill, FTAG);
2306		dmu_tx_abort(tx);
2307		return (err);
2308	}
2309	dmu_buf_will_dirty(db_spill, tx);
2310
2311	if (db_spill->db_size < drrs->drr_length)
2312		VERIFY0(dbuf_spill_set_blksz(db_spill,
2313		    drrs->drr_length, tx));
2314	bcopy(data, db_spill->db_data, drrs->drr_length);
2315
2316	dmu_buf_rele(db, FTAG);
2317	dmu_buf_rele(db_spill, FTAG);
2318
2319	dmu_tx_commit(tx);
2320	return (0);
2321}
2322
2323/* ARGSUSED */
2324static int
2325receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
2326{
2327	int err;
2328
2329	if (drrf->drr_length != -1ULL &&
2330	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
2331		return (SET_ERROR(EINVAL));
2332
2333	if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
2334		return (SET_ERROR(EINVAL));
2335
2336	err = dmu_free_long_range(rwa->os, drrf->drr_object,
2337	    drrf->drr_offset, drrf->drr_length);
2338
2339	return (err);
2340}
2341
2342/* used to destroy the drc_ds on error */
2343static void
2344dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
2345{
2346	if (drc->drc_resumable) {
2347		/* wait for our resume state to be written to disk */
2348		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
2349		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2350	} else {
2351		char name[MAXNAMELEN];
2352		dsl_dataset_name(drc->drc_ds, name);
2353		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2354		(void) dsl_destroy_head(name);
2355	}
2356}
2357
2358static void
2359receive_cksum(struct receive_arg *ra, int len, void *buf)
2360{
2361	if (ra->byteswap) {
2362		fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
2363	} else {
2364		fletcher_4_incremental_native(buf, len, &ra->cksum);
2365	}
2366}
2367
2368/*
2369 * Read the payload into a buffer of size len, and update the current record's
2370 * payload field.
2371 * Allocate ra->next_rrd and read the next record's header into
2372 * ra->next_rrd->header.
2373 * Verify checksum of payload and next record.
2374 */
2375static int
2376receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
2377{
2378	int err;
2379
2380	if (len != 0) {
2381		ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2382		err = receive_read(ra, len, buf);
2383		if (err != 0)
2384			return (err);
2385		receive_cksum(ra, len, buf);
2386
2387		/* note: rrd is NULL when reading the begin record's payload */
2388		if (ra->rrd != NULL) {
2389			ra->rrd->payload = buf;
2390			ra->rrd->payload_size = len;
2391			ra->rrd->bytes_read = ra->bytes_read;
2392		}
2393	}
2394
2395	ra->prev_cksum = ra->cksum;
2396
2397	ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2398	err = receive_read(ra, sizeof (ra->next_rrd->header),
2399	    &ra->next_rrd->header);
2400	ra->next_rrd->bytes_read = ra->bytes_read;
2401	if (err != 0) {
2402		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2403		ra->next_rrd = NULL;
2404		return (err);
2405	}
2406	if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
2407		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2408		ra->next_rrd = NULL;
2409		return (SET_ERROR(EINVAL));
2410	}
2411
2412	/*
2413	 * Note: checksum is of everything up to but not including the
2414	 * checksum itself.
2415	 */
2416	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2417	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2418	receive_cksum(ra,
2419	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2420	    &ra->next_rrd->header);
2421
2422	zio_cksum_t cksum_orig =
2423	    ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2424	zio_cksum_t *cksump =
2425	    &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2426
2427	if (ra->byteswap)
2428		byteswap_record(&ra->next_rrd->header);
2429
2430	if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2431	    !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
2432		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2433		ra->next_rrd = NULL;
2434		return (SET_ERROR(ECKSUM));
2435	}
2436
2437	receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
2438
2439	return (0);
2440}
2441
2442static void
2443objlist_create(struct objlist *list)
2444{
2445	list_create(&list->list, sizeof (struct receive_objnode),
2446	    offsetof(struct receive_objnode, node));
2447	list->last_lookup = 0;
2448}
2449
2450static void
2451objlist_destroy(struct objlist *list)
2452{
2453	for (struct receive_objnode *n = list_remove_head(&list->list);
2454	    n != NULL; n = list_remove_head(&list->list)) {
2455		kmem_free(n, sizeof (*n));
2456	}
2457	list_destroy(&list->list);
2458}
2459
2460/*
2461 * This function looks through the objlist to see if the specified object number
2462 * is contained in the objlist.  In the process, it will remove all object
2463 * numbers in the list that are smaller than the specified object number.  Thus,
2464 * any lookup of an object number smaller than a previously looked up object
2465 * number will always return false; therefore, all lookups should be done in
2466 * ascending order.
2467 */
2468static boolean_t
2469objlist_exists(struct objlist *list, uint64_t object)
2470{
2471	struct receive_objnode *node = list_head(&list->list);
2472	ASSERT3U(object, >=, list->last_lookup);
2473	list->last_lookup = object;
2474	while (node != NULL && node->object < object) {
2475		VERIFY3P(node, ==, list_remove_head(&list->list));
2476		kmem_free(node, sizeof (*node));
2477		node = list_head(&list->list);
2478	}
2479	return (node != NULL && node->object == object);
2480}
2481
2482/*
2483 * The objlist is a list of object numbers stored in ascending order.  However,
2484 * the insertion of new object numbers does not seek out the correct location to
2485 * store a new object number; instead, it appends it to the list for simplicity.
2486 * Thus, any users must take care to only insert new object numbers in ascending
2487 * order.
2488 */
2489static void
2490objlist_insert(struct objlist *list, uint64_t object)
2491{
2492	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
2493	node->object = object;
2494#ifdef ZFS_DEBUG
2495	struct receive_objnode *last_object = list_tail(&list->list);
2496	uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
2497	ASSERT3U(node->object, >, last_objnum);
2498#endif
2499	list_insert_tail(&list->list, node);
2500}
2501
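/*
 * Illustrative sketch (editor's addition): the intended usage pattern for
 * the objlist helpers above.  Inserts and lookups must both proceed in
 * ascending object order, which is what lets objlist_exists() free entries
 * for objects it has already passed.
 */
#if 0
static void
objlist_sketch(void)
{
	struct objlist list;

	objlist_create(&list);
	objlist_insert(&list, 5);
	objlist_insert(&list, 9);		/* inserts must be ascending */

	(void) objlist_exists(&list, 5);	/* B_TRUE; node 5 is kept */
	(void) objlist_exists(&list, 7);	/* B_FALSE; node 5 is freed */
	(void) objlist_exists(&list, 9);	/* B_TRUE */

	objlist_destroy(&list);
}
#endif
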
2502/*
2503 * Issue the prefetch reads for any necessary indirect blocks.
2504 *
2505 * We use the object ignore list to tell us whether or not to issue prefetches
2506 * for a given object.  We do this for both correctness (in case the blocksize
2507 * of an object has changed) and performance (if the object doesn't exist, don't
2508 * needlessly try to issue prefetches).  We also trim the list as we go through
2509 * the stream to prevent it from growing to an unbounded size.
2510 *
2511 * The object numbers within will always be in sorted order, and any write
2512 * records we see will also be in sorted order, but they're not sorted with
2513 * respect to each other (i.e. we can get several object records before
2514 * receiving each object's write records).  As a result, once we've reached a
2515 * given object number, we can safely remove any reference to lower object
2516 * numbers in the ignore list. In practice, we receive up to 32 object records
2517 * before receiving write records, so the list can have up to 32 nodes in it.
2518 */
2519/* ARGSUSED */
2520static void
2521receive_read_prefetch(struct receive_arg *ra,
2522    uint64_t object, uint64_t offset, uint64_t length)
2523{
2524	if (!objlist_exists(&ra->ignore_objlist, object)) {
2525		dmu_prefetch(ra->os, object, 1, offset, length,
2526		    ZIO_PRIORITY_SYNC_READ);
2527	}
2528}
2529
2530/*
2531 * Read records off the stream, issuing any necessary prefetches.
2532 */
2533static int
2534receive_read_record(struct receive_arg *ra)
2535{
2536	int err;
2537
2538	switch (ra->rrd->header.drr_type) {
2539	case DRR_OBJECT:
2540	{
2541		struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
2542		uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
2543		void *buf = kmem_zalloc(size, KM_SLEEP);
2544		dmu_object_info_t doi;
2545		err = receive_read_payload_and_next_header(ra, size, buf);
2546		if (err != 0) {
2547			kmem_free(buf, size);
2548			return (err);
2549		}
2550		err = dmu_object_info(ra->os, drro->drr_object, &doi);
2551		/*
2552		 * See receive_read_prefetch for an explanation of why we're
2553		 * storing this object in the ignore_objlist.
2554		 */
2555		if (err == ENOENT ||
2556		    (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
2557			objlist_insert(&ra->ignore_objlist, drro->drr_object);
2558			err = 0;
2559		}
2560		return (err);
2561	}
2562	case DRR_FREEOBJECTS:
2563	{
2564		err = receive_read_payload_and_next_header(ra, 0, NULL);
2565		return (err);
2566	}
2567	case DRR_WRITE:
2568	{
2569		struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
2570		arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os),
2571		    drrw->drr_length);
2572
2573		err = receive_read_payload_and_next_header(ra,
2574		    drrw->drr_length, abuf->b_data);
2575		if (err != 0) {
2576			dmu_return_arcbuf(abuf);
2577			return (err);
2578		}
2579		ra->rrd->write_buf = abuf;
2580		receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
2581		    drrw->drr_length);
2582		return (err);
2583	}
2584	case DRR_WRITE_BYREF:
2585	{
2586		struct drr_write_byref *drrwb =
2587		    &ra->rrd->header.drr_u.drr_write_byref;
2588		err = receive_read_payload_and_next_header(ra, 0, NULL);
2589		receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
2590		    drrwb->drr_length);
2591		return (err);
2592	}
2593	case DRR_WRITE_EMBEDDED:
2594	{
2595		struct drr_write_embedded *drrwe =
2596		    &ra->rrd->header.drr_u.drr_write_embedded;
2597		uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2598		void *buf = kmem_zalloc(size, KM_SLEEP);
2599
2600		err = receive_read_payload_and_next_header(ra, size, buf);
2601		if (err != 0) {
2602			kmem_free(buf, size);
2603			return (err);
2604		}
2605
2606		receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2607		    drrwe->drr_length);
2608		return (err);
2609	}
2610	case DRR_FREE:
2611	{
2612		/*
2613		 * It might be beneficial to prefetch indirect blocks here, but
2614		 * we don't really have the data to decide for sure.
2615		 */
2616		err = receive_read_payload_and_next_header(ra, 0, NULL);
2617		return (err);
2618	}
2619	case DRR_END:
2620	{
2621		struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2622		if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
2623			return (SET_ERROR(ECKSUM));
2624		return (0);
2625	}
2626	case DRR_SPILL:
2627	{
2628		struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2629		void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
2630		err = receive_read_payload_and_next_header(ra, drrs->drr_length,
2631		    buf);
2632		if (err != 0)
2633			kmem_free(buf, drrs->drr_length);
2634		return (err);
2635	}
2636	default:
2637		return (SET_ERROR(EINVAL));
2638	}
2639}
2640
2641/*
2642 * Commit the records to the pool.
2643 */
2644static int
2645receive_process_record(struct receive_writer_arg *rwa,
2646    struct receive_record_arg *rrd)
2647{
2648	int err;
2649
2650	/* Processing in order, therefore bytes_read should be increasing. */
2651	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
2652	rwa->bytes_read = rrd->bytes_read;
2653
2654	switch (rrd->header.drr_type) {
2655	case DRR_OBJECT:
2656	{
2657		struct drr_object *drro = &rrd->header.drr_u.drr_object;
2658		err = receive_object(rwa, drro, rrd->payload);
2659		kmem_free(rrd->payload, rrd->payload_size);
2660		rrd->payload = NULL;
2661		return (err);
2662	}
2663	case DRR_FREEOBJECTS:
2664	{
2665		struct drr_freeobjects *drrfo =
2666		    &rrd->header.drr_u.drr_freeobjects;
2667		return (receive_freeobjects(rwa, drrfo));
2668	}
2669	case DRR_WRITE:
2670	{
2671		struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2672		err = receive_write(rwa, drrw, rrd->write_buf);
2673		/* if receive_write() is successful, it consumes the arc_buf */
2674		if (err != 0)
2675			dmu_return_arcbuf(rrd->write_buf);
2676		rrd->write_buf = NULL;
2677		rrd->payload = NULL;
2678		return (err);
2679	}
2680	case DRR_WRITE_BYREF:
2681	{
2682		struct drr_write_byref *drrwbr =
2683		    &rrd->header.drr_u.drr_write_byref;
2684		return (receive_write_byref(rwa, drrwbr));
2685	}
2686	case DRR_WRITE_EMBEDDED:
2687	{
2688		struct drr_write_embedded *drrwe =
2689		    &rrd->header.drr_u.drr_write_embedded;
2690		err = receive_write_embedded(rwa, drrwe, rrd->payload);
2691		kmem_free(rrd->payload, rrd->payload_size);
2692		rrd->payload = NULL;
2693		return (err);
2694	}
2695	case DRR_FREE:
2696	{
2697		struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2698		return (receive_free(rwa, drrf));
2699	}
2700	case DRR_SPILL:
2701	{
2702		struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2703		err = receive_spill(rwa, drrs, rrd->payload);
2704		kmem_free(rrd->payload, rrd->payload_size);
2705		rrd->payload = NULL;
2706		return (err);
2707	}
2708	default:
2709		return (SET_ERROR(EINVAL));
2710	}
2711}
2712
2713/*
2714 * dmu_recv_stream's worker thread; pull records off the queue, and then call
2715 * receive_process_record.  When we're done, signal the main thread and exit.
2716 */
2717static void
2718receive_writer_thread(void *arg)
2719{
2720	struct receive_writer_arg *rwa = arg;
2721	struct receive_record_arg *rrd;
2722	for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2723	    rrd = bqueue_dequeue(&rwa->q)) {
2724		/*
2725		 * If there's an error, the main thread will stop putting things
2726		 * on the queue, but we need to clear everything in it before we
2727		 * can exit.
2728		 */
2729		if (rwa->err == 0) {
2730			rwa->err = receive_process_record(rwa, rrd);
2731		} else if (rrd->write_buf != NULL) {
2732			dmu_return_arcbuf(rrd->write_buf);
2733			rrd->write_buf = NULL;
2734			rrd->payload = NULL;
2735		} else if (rrd->payload != NULL) {
2736			kmem_free(rrd->payload, rrd->payload_size);
2737			rrd->payload = NULL;
2738		}
2739		kmem_free(rrd, sizeof (*rrd));
2740	}
2741	kmem_free(rrd, sizeof (*rrd));
2742	mutex_enter(&rwa->mutex);
2743	rwa->done = B_TRUE;
2744	cv_signal(&rwa->cv);
2745	mutex_exit(&rwa->mutex);
2746	thread_exit();
2747}
2748
2749static int
2750resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
2751{
2752	uint64_t val;
2753	objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
2754	uint64_t dsobj = dmu_objset_id(ra->os);
2755	uint64_t resume_obj, resume_off;
2756
2757	if (nvlist_lookup_uint64(begin_nvl,
2758	    "resume_object", &resume_obj) != 0 ||
2759	    nvlist_lookup_uint64(begin_nvl,
2760	    "resume_offset", &resume_off) != 0) {
2761		return (SET_ERROR(EINVAL));
2762	}
2763	VERIFY0(zap_lookup(mos, dsobj,
2764	    DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
2765	if (resume_obj != val)
2766		return (SET_ERROR(EINVAL));
2767	VERIFY0(zap_lookup(mos, dsobj,
2768	    DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
2769	if (resume_off != val)
2770		return (SET_ERROR(EINVAL));
2771
2772	return (0);
2773}
2774
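/*
 * Illustrative sketch (editor's addition): the shape of the DRR_BEGIN
 * payload nvlist that resume_check() above expects from a resuming sender.
 * The values must match the DS_FIELD_RESUME_OBJECT and
 * DS_FIELD_RESUME_OFFSET entries recorded in the target dataset's zap.
 */
#if 0
static nvlist_t *
resume_begin_payload_sketch(uint64_t resume_obj, uint64_t resume_off)
{
	nvlist_t *nvl = fnvlist_alloc();

	fnvlist_add_uint64(nvl, "resume_object", resume_obj);
	fnvlist_add_uint64(nvl, "resume_offset", resume_off);
	return (nvl);
}
#endif
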
2775/*
2776 * Read in the stream's records, one by one, and apply them to the pool.  There
2777 * are two threads involved; the thread that calls this function will spin up a
2778 * worker thread, read the records off the stream one by one, and issue
2779 * prefetches for any necessary indirect blocks.  It will then push the records
2780 * onto an internal blocking queue.  The worker thread will pull the records off
2781 * the queue, and actually write the data into the DMU.  This way, the worker
2782 * thread doesn't have to wait for reads to complete, since everything it needs
2783 * (the indirect blocks) will be prefetched.
2784 *
2785 * NB: callers *must* call dmu_recv_end() if this succeeds.
2786 */
2787int
2788dmu_recv_stream(dmu_recv_cookie_t *drc, struct file *fp, offset_t *voffp,
2789    int cleanup_fd, uint64_t *action_handlep)
2790{
2791	int err = 0;
2792	struct receive_arg ra = { 0 };
2793	struct receive_writer_arg rwa = { 0 };
2794	int featureflags;
2795	nvlist_t *begin_nvl = NULL;
2796
2797	ra.byteswap = drc->drc_byteswap;
2798	ra.cksum = drc->drc_cksum;
2799	ra.td = curthread;
2800	ra.fp = fp;
2801	ra.voff = *voffp;
2802
2803	if (dsl_dataset_is_zapified(drc->drc_ds)) {
2804		(void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
2805		    drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
2806		    sizeof (ra.bytes_read), 1, &ra.bytes_read);
2807	}
2808
2809	objlist_create(&ra.ignore_objlist);
2810
2811	/* these were verified in dmu_recv_begin */
2812	ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2813	    DMU_SUBSTREAM);
2814	ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2815
2816	/*
2817	 * Open the objset we are modifying.
2818	 */
2819	VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
2820
2821	ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2822
2823	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2824
2825	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
2826	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2827		minor_t minor;
2828
2829		if (cleanup_fd == -1) {
2830			err = SET_ERROR(EBADF);
2831			goto out;
2832		}
2833		err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2834		if (err != 0) {
2835			cleanup_fd = -1;
2836			goto out;
2837		}
2838
2839		if (*action_handlep == 0) {
2840			rwa.guid_to_ds_map =
2841			    kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2842			avl_create(rwa.guid_to_ds_map, guid_compare,
2843			    sizeof (guid_map_entry_t),
2844			    offsetof(guid_map_entry_t, avlnode));
2845			err = zfs_onexit_add_cb(minor,
2846			    free_guid_map_onexit, rwa.guid_to_ds_map,
2847			    action_handlep);
2848			if (err != 0)
2849				goto out;
2850		} else {
2851			err = zfs_onexit_cb_data(minor, *action_handlep,
2852			    (void **)&rwa.guid_to_ds_map);
2853			if (err != 0)
2854				goto out;
2855		}
2856
2857		drc->drc_guid_to_ds_map = rwa.guid_to_ds_map;
2858	}
2859
2860	uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
2861	void *payload = NULL;
2862	if (payloadlen != 0)
2863		payload = kmem_alloc(payloadlen, KM_SLEEP);
2864
2865	err = receive_read_payload_and_next_header(&ra, payloadlen, payload);
2866	if (err != 0) {
2867		if (payloadlen != 0)
2868			kmem_free(payload, payloadlen);
2869		goto out;
2870	}
2871	if (payloadlen != 0) {
2872		err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
2873		kmem_free(payload, payloadlen);
2874		if (err != 0)
2875			goto out;
2876	}
2877
2878	if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
2879		err = resume_check(&ra, begin_nvl);
2880		if (err != 0)
2881			goto out;
2882	}
2883
2884	(void) bqueue_init(&rwa.q, zfs_recv_queue_length,
2885	    offsetof(struct receive_record_arg, node));
2886	cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL);
2887	mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL);
2888	rwa.os = ra.os;
2889	rwa.byteswap = drc->drc_byteswap;
2890	rwa.resumable = drc->drc_resumable;
2891
2892	(void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, &p0,
2893	    TS_RUN, minclsyspri);
2894	/*
2895	 * We're reading rwa.err without locks, which is safe since we are the
2896	 * only reader, and the worker thread is the only writer.  It's ok if we
2897	 * miss a write for an iteration or two of the loop, since the writer
2898	 * thread will keep freeing records we send it until we send it an eos
2899	 * marker.
2900	 *
2901	 * We can leave this loop in 3 ways:  First, if rwa.err is
2902	 * non-zero.  In that case, the writer thread will free the rrd we just
2903	 * pushed.  Second, if we're interrupted; in that case, either it's the
2904	 * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
2905	 * has been handed off to the writer thread who will free it.  Finally,
2906	 * if receive_read_record fails or we're at the end of the stream, then
2907	 * we free ra.rrd and exit.
2908	 */
2909	while (rwa.err == 0) {
2910		if (issig(JUSTLOOKING) && issig(FORREAL)) {
2911			err = SET_ERROR(EINTR);
2912			break;
2913		}
2914
2915		ASSERT3P(ra.rrd, ==, NULL);
2916		ra.rrd = ra.next_rrd;
2917		ra.next_rrd = NULL;
2918		/* Allocates and loads header into ra.next_rrd */
2919		err = receive_read_record(&ra);
2920
2921		if (ra.rrd->header.drr_type == DRR_END || err != 0) {
2922			kmem_free(ra.rrd, sizeof (*ra.rrd));
2923			ra.rrd = NULL;
2924			break;
2925		}
2926
2927		bqueue_enqueue(&rwa.q, ra.rrd,
2928		    sizeof (struct receive_record_arg) + ra.rrd->payload_size);
2929		ra.rrd = NULL;
2930	}
2931	if (ra.next_rrd == NULL)
2932		ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP);
2933	ra.next_rrd->eos_marker = B_TRUE;
2934	bqueue_enqueue(&rwa.q, ra.next_rrd, 1);
2935
2936	mutex_enter(&rwa.mutex);
2937	while (!rwa.done) {
2938		cv_wait(&rwa.cv, &rwa.mutex);
2939	}
2940	mutex_exit(&rwa.mutex);
2941
2942	cv_destroy(&rwa.cv);
2943	mutex_destroy(&rwa.mutex);
2944	bqueue_destroy(&rwa.q);
2945	if (err == 0)
2946		err = rwa.err;
2947
2948out:
2949	nvlist_free(begin_nvl);
2950	if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
2951		zfs_onexit_fd_rele(cleanup_fd);
2952
2953	if (err != 0) {
2954		/*
2955		 * Clean up references.  If the receive is not resumable,
2956		 * destroy what we created, so we don't leave it in an
2957		 * inconsistent state.
2958		 */
2959		dmu_recv_cleanup_ds(drc);
2960	}
2961
2962	*voffp = ra.voff;
2963	objlist_destroy(&ra.ignore_objlist);
2964	return (err);
2965}
2966
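/*
 * Illustrative sketch (editor's addition): the producer/consumer handoff
 * that dmu_recv_stream() and receive_writer_thread() above implement with a
 * bqueue.  The item type and thread bodies here are hypothetical; the real
 * code passes struct receive_record_arg and weights each entry by its
 * payload size.
 */
#if 0
struct sketch_item {
	boolean_t eos_marker;
	bqueue_node_t node;
};

static void
sketch_consumer(void *arg)
{
	bqueue_t *q = arg;
	struct sketch_item *item;

	/* Drain the queue until the producer's end-of-stream marker. */
	for (item = bqueue_dequeue(q); !item->eos_marker;
	    item = bqueue_dequeue(q)) {
		/* process the item, then free it */
		kmem_free(item, sizeof (*item));
	}
	kmem_free(item, sizeof (*item));
	thread_exit();
}

static void
sketch_producer(bqueue_t *q)
{
	struct sketch_item *item;

	(void) bqueue_init(q, zfs_recv_queue_length,
	    offsetof(struct sketch_item, node));
	(void) thread_create(NULL, 0, sketch_consumer, q, 0, &p0,
	    TS_RUN, minclsyspri);

	item = kmem_zalloc(sizeof (*item), KM_SLEEP);
	bqueue_enqueue(q, item, sizeof (*item));	/* weight: item size */

	item = kmem_zalloc(sizeof (*item), KM_SLEEP);
	item->eos_marker = B_TRUE;
	bqueue_enqueue(q, item, 1);	/* tells the consumer to exit */
	/* wait for the consumer (e.g. on a cv), then bqueue_destroy(q) */
}
#endif
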
2967static int
2968dmu_recv_end_check(void *arg, dmu_tx_t *tx)
2969{
2970	dmu_recv_cookie_t *drc = arg;
2971	dsl_pool_t *dp = dmu_tx_pool(tx);
2972	int error;
2973
2974	ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
2975
2976	if (!drc->drc_newfs) {
2977		dsl_dataset_t *origin_head;
2978
2979		error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
2980		if (error != 0)
2981			return (error);
2982		if (drc->drc_force) {
2983			/*
2984			 * We will destroy any snapshots in tofs (i.e. before
2985			 * origin_head) that are after the origin (which is
2986			 * the snap before drc_ds, because drc_ds cannot
2987			 * have any snaps of its own).
2988			 */
2989			uint64_t obj;
2990
2991			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2992			while (obj !=
2993			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2994				dsl_dataset_t *snap;
2995				error = dsl_dataset_hold_obj(dp, obj, FTAG,
2996				    &snap);
2997				if (error != 0)
2998					break;
2999				if (snap->ds_dir != origin_head->ds_dir)
3000					error = SET_ERROR(EINVAL);
3001				if (error == 0) {
3002					error = dsl_destroy_snapshot_check_impl(
3003					    snap, B_FALSE);
3004				}
3005				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
3006				dsl_dataset_rele(snap, FTAG);
3007				if (error != 0)
3008					break;
3009			}
3010			if (error != 0) {
3011				dsl_dataset_rele(origin_head, FTAG);
3012				return (error);
3013			}
3014		}
3015		error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
3016		    origin_head, drc->drc_force, drc->drc_owner, tx);
3017		if (error != 0) {
3018			dsl_dataset_rele(origin_head, FTAG);
3019			return (error);
3020		}
3021		error = dsl_dataset_snapshot_check_impl(origin_head,
3022		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3023		dsl_dataset_rele(origin_head, FTAG);
3024		if (error != 0)
3025			return (error);
3026
3027		error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
3028	} else {
3029		error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
3030		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3031	}
3032	return (error);
3033}
3034
3035static void
3036dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
3037{
3038	dmu_recv_cookie_t *drc = arg;
3039	dsl_pool_t *dp = dmu_tx_pool(tx);
3040
3041	spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
3042	    tx, "snap=%s", drc->drc_tosnap);
3043
3044	if (!drc->drc_newfs) {
3045		dsl_dataset_t *origin_head;
3046
3047		VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
3048		    &origin_head));
3049
3050		if (drc->drc_force) {
3051			/*
3052			 * Destroy any snapshots of drc_tofs (origin_head)
3053			 * after the origin (the snap before drc_ds).
3054			 */
3055			uint64_t obj;
3056
3057			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3058			while (obj !=
3059			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3060				dsl_dataset_t *snap;
3061				VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
3062				    &snap));
3063				ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
3064				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
3065				dsl_destroy_snapshot_sync_impl(snap,
3066				    B_FALSE, tx);
3067				dsl_dataset_rele(snap, FTAG);
3068			}
3069		}
3070		VERIFY3P(drc->drc_ds->ds_prev, ==,
3071		    origin_head->ds_prev);
3072
3073		dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
3074		    origin_head, tx);
3075		dsl_dataset_snapshot_sync_impl(origin_head,
3076		    drc->drc_tosnap, tx);
3077
3078		/* set snapshot's creation time and guid */
3079		dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
3080		dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
3081		    drc->drc_drrb->drr_creation_time;
3082		dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
3083		    drc->drc_drrb->drr_toguid;
3084		dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
3085		    ~DS_FLAG_INCONSISTENT;
3086
3087		dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
3088		dsl_dataset_phys(origin_head)->ds_flags &=
3089		    ~DS_FLAG_INCONSISTENT;
3090
3091		dsl_dataset_rele(origin_head, FTAG);
3092		dsl_destroy_head_sync_impl(drc->drc_ds, tx);
3093
3094		if (drc->drc_owner != NULL)
3095			VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
3096	} else {
3097		dsl_dataset_t *ds = drc->drc_ds;
3098
3099		dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
3100
3101		/* set snapshot's creation time and guid */
3102		dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
3103		dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
3104		    drc->drc_drrb->drr_creation_time;
3105		dsl_dataset_phys(ds->ds_prev)->ds_guid =
3106		    drc->drc_drrb->drr_toguid;
3107		dsl_dataset_phys(ds->ds_prev)->ds_flags &=
3108		    ~DS_FLAG_INCONSISTENT;
3109
3110		dmu_buf_will_dirty(ds->ds_dbuf, tx);
3111		dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
3112		if (dsl_dataset_has_resume_receive_state(ds)) {
3113			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3114			    DS_FIELD_RESUME_FROMGUID, tx);
3115			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3116			    DS_FIELD_RESUME_OBJECT, tx);
3117			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3118			    DS_FIELD_RESUME_OFFSET, tx);
3119			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3120			    DS_FIELD_RESUME_BYTES, tx);
3121			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3122			    DS_FIELD_RESUME_TOGUID, tx);
3123			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
3124			    DS_FIELD_RESUME_TONAME, tx);
3125		}
3126	}
3127	drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
3128	/*
3129	 * Release the hold from dmu_recv_begin.  This must be done before
3130	 * we return to open context, so that when we free the dataset's dnode,
3131	 * we can evict its bonus buffer.
3132	 */
3133	dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
3134	drc->drc_ds = NULL;
3135}
3136
3137static int
3138add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
3139{
3140	dsl_pool_t *dp;
3141	dsl_dataset_t *snapds;
3142	guid_map_entry_t *gmep;
3143	int err;
3144
3145	ASSERT(guid_map != NULL);
3146
3147	err = dsl_pool_hold(name, FTAG, &dp);
3148	if (err != 0)
3149		return (err);
3150	gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
3151	err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
3152	if (err == 0) {
3153		gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
3154		gmep->gme_ds = snapds;
3155		avl_add(guid_map, gmep);
3156		dsl_dataset_long_hold(snapds, gmep);
3157	} else
3158		kmem_free(gmep, sizeof (*gmep));
3159
3160	dsl_pool_rele(dp, FTAG);
3161	return (err);
3162}
3163
3164static int dmu_recv_end_modified_blocks = 3;
3165
3166static int
3167dmu_recv_existing_end(dmu_recv_cookie_t *drc)
3168{
3169	int error;
3170	char name[MAXNAMELEN];
3171
3172#ifdef _KERNEL
3173	/*
3174	 * We will be destroying the ds; make sure its origin is unmounted if
3175	 * necessary.
3176	 */
3177	dsl_dataset_name(drc->drc_ds, name);
3178	zfs_destroy_unmount_origin(name);
3179#endif
3180
3181	error = dsl_sync_task(drc->drc_tofs,
3182	    dmu_recv_end_check, dmu_recv_end_sync, drc,
3183	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
3184
3185	if (error != 0)
3186		dmu_recv_cleanup_ds(drc);
3187	return (error);
3188}
3189
3190static int
3191dmu_recv_new_end(dmu_recv_cookie_t *drc)
3192{
3193	int error;
3194
3195	error = dsl_sync_task(drc->drc_tofs,
3196	    dmu_recv_end_check, dmu_recv_end_sync, drc,
3197	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
3198
3199	if (error != 0) {
3200		dmu_recv_cleanup_ds(drc);
3201	} else if (drc->drc_guid_to_ds_map != NULL) {
3202		(void) add_ds_to_guidmap(drc->drc_tofs,
3203		    drc->drc_guid_to_ds_map,
3204		    drc->drc_newsnapobj);
3205	}
3206	return (error);
3207}
3208
3209int
3210dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
3211{
3212	drc->drc_owner = owner;
3213
3214	if (drc->drc_newfs)
3215		return (dmu_recv_new_end(drc));
3216	else
3217		return (dmu_recv_existing_end(drc));
3218}
3219
3220/*
3221 * Return TRUE if this objset is currently being received into.
3222 */
3223boolean_t
3224dmu_objset_is_receiving(objset_t *os)
3225{
3226	return (os->os_dsl_dataset != NULL &&
3227	    os->os_dsl_dataset->ds_owner == dmu_recv_tag);
3228}
3229