dmu_send.c revision 297102
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright (c) 2012, Martin Matuska <mm@FreeBSD.org>. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>

#ifdef __FreeBSD__
#undef dump_write
#define	dump_write dmu_dump_write
#endif

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
int zfs_send_queue_length = 16 * 1024 * 1024;
int zfs_recv_queue_length = 16 * 1024 * 1024;

static char *dmu_recv_tag = "dmu_recv_tag";
const char *recv_clone_name = "%recv";

#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))

static void byteswap_record(dmu_replay_record_t *drr);

struct send_thread_arg {
	bqueue_t	q;
	dsl_dataset_t	*ds;		/* Dataset to traverse */
	uint64_t	fromtxg;	/* Traverse from this txg */
	int		flags;		/* flags to pass to traverse_dataset */
	int		error_code;
	boolean_t	cancel;
	zbookmark_phys_t resume;
};

struct send_block_record {
	boolean_t		eos_marker; /* Marks the end of the stream */
	blkptr_t		bp;
	zbookmark_phys_t	zb;
	uint8_t			indblkshift;
	uint16_t		datablkszsec;
	bqueue_node_t		ln;
};
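/*
 * BP_SPAN gives the number of bytes of the object covered by one block
 * pointer at the given indirection level.  Example: with 128KB data blocks
 * (datablkszsec = 256 sectors) and 128KB indirect blocks (indblkshift = 17),
 * each indirect block holds 2^(17 - SPA_BLKPTRSHIFT) = 1024 block pointers,
 * so at level 1, BP_SPAN = 256 << (9 + 10) = 128MB.
 *
 * The send path is a producer/consumer pipeline: send_traverse_thread()
 * walks the dataset and enqueues one send_block_record per block pointer
 * onto the bqueue above, while the main thread dequeues records and
 * converts them to stream records in do_dump().  The queue is bounded by
 * zfs_send_queue_length (16MB by default) to provide backpressure.
 */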
static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	struct uio auio;
	struct iovec aiov;
	ASSERT0(len % 8);

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_offset = (off_t)-1;
	auio.uio_td = dsp->dsa_td;
#ifdef _KERNEL
	if (dsp->dsa_fp->f_type == DTYPE_VNODE)
		bwillwrite();
	dsp->dsa_err = fo_write(dsp->dsa_fp, &auio, dsp->dsa_td->td_ucred, 0,
	    dsp->dsa_td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	dsp->dsa_err = EOPNOTSUPP;
#endif
	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += len;
	mutex_exit(&ds->ds_sendstream_lock);

	return (dsp->dsa_err);
}

/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}
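/*
 * Every byte written to the stream is also folded into dsp->dsa_zc, so the
 * checksum carried by each non-BEGIN record covers the entire stream from
 * its start up to that record's own checksum field.  A receiver can thus
 * verify the stream incrementally and pinpoint the record where corruption
 * or truncation occurred.
 */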
/*
 * Fill in the drr_free struct, or perform aggregation if the previous record
 * is also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the
 * free and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed.  This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset.  We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	if (length != -1ULL && offset + length < offset)
		length = -1ULL;

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is -1
		 * (because dump_dnode is the only place where this
		 * function is called with a -1, and only after flushing
		 * any pending record).
		 */
		ASSERT(length != -1ULL);
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == -1ULL) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}
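/*
 * Aggregation example: freeing object 5 at offset 0 for 128K and then at
 * offset 128K for another 128K yields a single pending DRR_FREE record for
 * (object 5, offset 0, length 256K).  The pending record is only pushed out
 * when a non-adjacent or differently-typed record arrives, or when length
 * is -1ULL (the free-to-end-of-object case generated by dump_dnode()).
 */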
static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
    uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
{
	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));
	dsp->dsa_last_data_object = object;
	dsp->dsa_last_data_offset = offset + blksz - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
		/*
		 * There's no pre-computed checksum for partial-block
		 * writes or embedded BP's, so (like
		 * fletcher4-checksummed blocks) userland will have to
		 * compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP)
			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dsp->dsa_drr->drr_u.drr_write_embedded);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (EINTR);
		dsp->dsa_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (EINTR);
	return (0);
}

static int
dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
{
	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}
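/*
 * Note on the WRITE records above: for full-block, non-embedded writes,
 * dump_write() copies the block's on-disk checksum into drr_key and sets
 * DRR_CHECKSUM_DEDUP when the checksum function is dedup-capable (e.g.
 * sha256).  As of this revision, stream deduplication is performed in
 * userland, which uses these keys to replace duplicate WRITE records with
 * DRR_WRITE_BYREF records.
 */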
static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}

static int
dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);

	if (object < dsp->dsa_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from.  In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from.  We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dsp->dsa_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dsp, object, 1));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dsp->dsa_toguid;

	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	if (dump_record(dsp, DN_BONUS(dnp),
	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
		return (SET_ERROR(EINTR));
	}

	/* Free anything past the end of the file. */
	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
		return (SET_ERROR(EINTR));
	if (dsp->dsa_err != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}
/*
 * This is the callback function to traverse_dataset that acts as the worker
 * thread for dmu_send_impl.
 */
/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_block_record *record;
	uint64_t record_size;
	int err = 0;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);

	if (sta->cancel)
		return (SET_ERROR(EINTR));

	if (bp == NULL) {
		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
		return (0);
	} else if (zb->zb_level < 0) {
		return (0);
	}

	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
	record->eos_marker = B_FALSE;
	record->bp = *bp;
	record->zb = *zb;
	record->indblkshift = dnp->dn_indblkshift;
	record->datablkszsec = dnp->dn_datablkszsec;
	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	bqueue_enqueue(&sta->q, record, record_size);

	return (err);
}

/*
 * This function kicks off the traverse_dataset.  It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End
 * of Stream record when the traverse_dataset call has finished.  If there is
 * no dataset to traverse, the thread immediately pushes an End of Stream
 * marker.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err;
	struct send_block_record *data;

	if (st_arg->ds != NULL) {
		err = traverse_dataset_resume(st_arg->ds,
		    st_arg->fromtxg, &st_arg->resume,
		    st_arg->flags, send_cb, st_arg);

		if (err != EINTR)
			st_arg->error_code = err;
	}
	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
	data->eos_marker = B_TRUE;
	bqueue_enqueue(&st_arg->q, data, 1);
	thread_exit();
}
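/*
 * send_cb() runs in the traversal thread and charges each queued record
 * for the size of the data block it describes, so a slow consumer stalls
 * the producer once roughly zfs_send_queue_length bytes of records are
 * outstanding.  The End of Stream record is charged a nominal weight of 1.
 */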
/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, reading the data (which has hopefully been prefetched), and calling
 * the appropriate helper function.
 */
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
	const blkptr_t *bp = &data->bp;
	const zbookmark_phys_t *zb = &data->zb;
	uint8_t indblkshift = data->indblkshift;
	uint16_t dblkszsec = data->datablkszsec;
	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	int err = 0;

	ASSERT3U(zb->zb_level, >=, 0);

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= dsa->dsa_resume_object);

	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
		return (0);
	} else if (BP_IS_HOLE(bp) &&
	    zb->zb_object == DMU_META_DNODE_OBJECT) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
	} else if (BP_IS_HOLE(bp)) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t offset = zb->zb_blkid * span;
		err = dump_free(dsa, zb->zb_object, offset, span);
	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
		return (0);
	} else if (type == DMU_OT_DNODE) {
		int blksz = BP_GET_LSIZE(bp);
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;

		ASSERT0(zb->zb_level);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		dnode_phys_t *blk = abuf->b_data;
		uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
		for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
			err = dump_dnode(dsa, dnobj + i, blk + i);
			if (err != 0)
				break;
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (type == DMU_OT_SA) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = BP_GET_LSIZE(bp);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (backup_do_embed(dsa, bp)) {
		/* it's an embedded level-0 block of a regular object */
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		ASSERT0(zb->zb_level);
		err = dump_write_embedded(dsa, zb->zb_object,
		    zb->zb_blkid * blksz, blksz, bp);
	} else {
		/* it's a level-0 block of a regular object */
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		uint64_t offset;

		ASSERT0(zb->zb_level);
		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
		    (zb->zb_object == dsa->dsa_resume_object &&
		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0) {
			if (zfs_send_corrupt_data) {
				/* Send a block filled with 0x"zfs badd bloc" */
				abuf = arc_buf_alloc(spa, blksz, &abuf,
				    ARC_BUFC_DATA);
				uint64_t *ptr;
				for (ptr = abuf->b_data;
				    (char *)ptr < (char *)abuf->b_data + blksz;
				    ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		offset = zb->zb_blkid * blksz;

		if (!(dsa->dsa_featureflags &
		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
		    blksz > SPA_OLD_MAXBLOCKSIZE) {
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object,
			    offset, blksz, bp, abuf->b_data);
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}
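/*
 * Summary of the dispatch above: holes in the meta-dnode become FREEOBJECTS
 * records and holes in ordinary objects become FREE records; DMU_OT_DNODE
 * blocks are decoded into one OBJECT record per dnode; DMU_OT_SA blocks
 * become SPILL records; embeddable blocks become WRITE_EMBEDDED records;
 * everything else becomes one or more WRITE records, split into
 * SPA_OLD_MAXBLOCKSIZE (128K) chunks when the stream was not negotiated
 * with DMU_BACKUP_FEATURE_LARGE_BLOCKS.
 */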
/*
 * Pop the new data off the queue, and free the old data.
 */
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}

/*
 * Actually do the bulk of the work in a zfs send.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb,
    boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
    uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendarg_t *dsp;
	int err;
	uint64_t fromtxg = 0;
	uint64_t featureflags = 0;
	struct send_thread_arg to_arg = { 0 };

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
	drr->drr_type = DRR_BEGIN;
	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
	    DMU_SUBSTREAM);

#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
			kmem_free(drr, sizeof (dmu_replay_record_t));
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		if (version >= ZPL_VERSION_SA) {
			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
		}
	}
#endif

	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	if (embedok &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
			featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
	}

	if (resumeobj != 0 || resumeoff != 0) {
		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
	    featureflags);

	drr->drr_u.drr_begin.drr_creation_time =
	    dsl_dataset_phys(to_ds)->ds_creation_time;
	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
	if (is_clone)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
	drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;

	if (ancestor_zb != NULL) {
		drr->drr_u.drr_begin.drr_fromguid =
		    ancestor_zb->zbm_guid;
		fromtxg = ancestor_zb->zbm_creation_txg;
	}
	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
	if (!to_ds->ds_is_snapshot) {
		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
		    sizeof (drr->drr_u.drr_begin.drr_toname));
	}

	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);

	dsp->dsa_drr = drr;
	dsp->dsa_outfd = outfd;
	dsp->dsa_proc = curproc;
	dsp->dsa_td = curthread;
	dsp->dsa_fp = fp;
	dsp->dsa_os = os;
	dsp->dsa_off = off;
	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsp->dsa_pending_op = PENDING_NONE;
	dsp->dsa_featureflags = featureflags;
	dsp->dsa_resume_object = resumeobj;
	dsp->dsa_resume_offset = resumeoff;

	mutex_enter(&to_ds->ds_sendstream_lock);
	list_insert_head(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	dsl_dataset_long_hold(to_ds, FTAG);
	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	if (resumeobj != 0 || resumeoff != 0) {
		dmu_object_info_t to_doi;
		err = dmu_object_info(os, resumeobj, &to_doi);
		if (err != 0)
			goto out;
		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
		    resumeoff / to_doi.doi_data_block_size);

		nvlist_t *nvl = fnvlist_alloc();
		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
		fnvlist_free(nvl);
	}

	err = dump_record(dsp, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsp->dsa_err;
		goto out;
	}

	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
	    offsetof(struct send_block_record, ln));
	to_arg.error_code = 0;
	to_arg.cancel = B_FALSE;
	to_arg.ds = to_ds;
	to_arg.fromtxg = fromtxg;
	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, &p0,
	    TS_RUN, minclsyspri);

	struct send_block_record *to_data;
	to_data = bqueue_dequeue(&to_arg.q);

	while (!to_data->eos_marker && err == 0) {
		err = do_dump(dsp, to_data);
		to_data = get_next_record(&to_arg.q, to_data);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = EINTR;
	}

	if (err != 0) {
		to_arg.cancel = B_TRUE;
		while (!to_data->eos_marker) {
			to_data = get_next_record(&to_arg.q, to_data);
		}
	}
	kmem_free(to_data, sizeof (*to_data));

	bqueue_destroy(&to_arg.q);

	if (err == 0 && to_arg.error_code != 0)
		err = to_arg.error_code;

	if (err != 0)
		goto out;

	if (dsp->dsa_pending_op != PENDING_NONE)
		if (dump_record(dsp, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsp->dsa_err != 0)
			err = dsp->dsa_err;
		goto out;
	}

	bzero(drr, sizeof (dmu_replay_record_t));
	drr->drr_type = DRR_END;
	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, NULL, 0) != 0)
		err = dsp->dsa_err;

out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dsp, sizeof (dmu_sendarg_t));

	dsl_dataset_long_rele(to_ds, FTAG);

	return (err);
}
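/*
 * When a send is resumed, the resume state is communicated to the receiver
 * as an nvlist packed into the payload of the BEGIN record, e.g. (values
 * illustrative):
 *
 *	resume_object = 137
 *	resume_offset = 1048576
 *
 * The receive side (dmu_recv_stream()) validates these against the
 * DS_FIELD_RESUME_* ZAP entries it recorded for the partially received
 * dataset before applying any records.
 */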
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok,
#ifdef illumos
    int outfd, vnode_t *vp, offset_t *off)
#else
    int outfd, struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	dsl_dataset_t *fromds = NULL;
	int err;

	err = dsl_pool_hold(pool, FTAG, &dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone;

		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		if (!dsl_dataset_is_before(ds, fromds, 0))
			err = SET_ERROR(EXDEV);
		zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;
		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		is_clone = (fromds->ds_dir != ds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	}
	dsl_dataset_rele(ds, FTAG);
	return (err);
}

int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;
	boolean_t owned = B_FALSE;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume.  Ensure
		 * that it doesn't change by owning the dataset.
		 */
		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
	}
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone = B_FALSE;
		int fsnamelen = strchr(tosnap, '@') - tosnap;

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@')) {
			dsl_dataset_t *fromds;
			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
			if (err == 0) {
				if (!dsl_dataset_is_before(ds, fromds, 0))
					err = SET_ERROR(EXDEV);
				zb.zbm_creation_time =
				    dsl_dataset_phys(fromds)->ds_creation_time;
				zb.zbm_creation_txg =
				    dsl_dataset_phys(fromds)->ds_creation_txg;
				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
				is_clone = (ds->ds_dir != fromds->ds_dir);
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
		}
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	}
	if (owned)
		dsl_dataset_disown(ds, FTAG);
	else
		dsl_dataset_rele(ds, FTAG);
	return (err);
}
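/*
 * Clone detection example for dmu_send(): with tosnap "pool/fs@b" and
 * fromsnap "pool/fs@a", fsnamelen is 7, the first 7 characters match, and
 * fromsnap[7] is '@', so this is an ordinary incremental.  With fromsnap
 * "pool/other@a" the prefixes differ, so the stream is marked as a clone
 * (DRR_FLAG_CLONE) and must be received as one.
 */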
static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
    uint64_t *sizep)
{
	int err;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data.  We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT
	 * records).
	 */

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume all blocks are recordsize.  Assume ditto blocks and
	 * internal fragmentation counter out compression.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block, which we observe in practice.
	 */
	uint64_t recordsize;
	err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
	if (err != 0)
		return (err);
	size -= size / recordsize * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += size / recordsize * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}

int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* fromsnap, if provided, must be a snapshot */
	if (fromds != NULL && !fromds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/*
	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
	 * or the origin's fs.
	 */
	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
		return (SET_ERROR(EXDEV));

	/* Get uncompressed size estimate of changed data. */
	if (fromds == NULL) {
		size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
	} else {
		uint64_t used, comp;
		err = dsl_dataset_space_written(fromds, ds,
		    &used, &comp, &size);
		if (err != 0)
			return (err);
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}
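/*
 * Worked example of the adjustment above: 1GB of changed data at
 * recordsize=128K is 8192 blocks.  We subtract 8192 * sizeof (blkptr_t) =
 * 8192 * 128 bytes = 1MB of indirect-block overhead, then add 8192 *
 * sizeof (dmu_replay_record_t) (roughly 300 bytes each, so about 2.5MB)
 * for the per-block stream headers, giving a net estimate slightly larger
 * than the data itself.
 */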
/*
 * Simple callback used to traverse the blocks of a snapshot and sum their
 * uncompressed size
 */
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
	uint64_t *spaceptr = arg;
	if (bp != NULL && !BP_IS_HOLE(bp)) {
		*spaceptr += BP_GET_UCSIZE(bp);
	}
	return (0);
}

/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
 * send stream sent from that TXG.  from_txg may be zero, indicating that the
 * whole snapshot will be sent.
 */
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size = 0;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!dsl_dataset_is_snapshot(ds))
		return (SET_ERROR(EINVAL));

	/* verify that from_txg is before the provided snapshot was taken */
	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
		return (SET_ERROR(EXDEV));
	}

	/*
	 * traverse the blocks of the snapshot with birth times after
	 * from_txg, summing their uncompressed size
	 */
	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
	    dmu_calculate_send_traversal, &size);
	if (err)
		return (err);

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}

typedef struct dmu_recv_begin_arg {
	const char *drba_origin;
	dmu_recv_cookie_t *drba_cookie;
	cred_t *drba_cred;
	uint64_t drba_snapobj;
} dmu_recv_begin_arg_t;
static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
    uint64_t fromguid)
{
	uint64_t val;
	int error;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* temporary clone name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
	    8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EBUSY : error);

	/* new snapshot name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EEXIST : error);

	/*
	 * Check snapshot limit before receiving.  We'll recheck again at the
	 * end, but might as well abort before receiving if we're already over
	 * the limit.
	 *
	 * Note that we do not check the file system limit with
	 * dsl_dir_fscount_check because the temporary %clones don't count
	 * against that limit.
	 */
	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
	    NULL, drba->drba_cred);
	if (error != 0)
		return (error);

	if (fromguid != 0) {
		dsl_dataset_t *snap;
		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;

		/* Find snapshot in this dir that matches fromguid. */
		while (obj != 0) {
			error = dsl_dataset_hold_obj(dp, obj, FTAG,
			    &snap);
			if (error != 0)
				return (SET_ERROR(ENODEV));
			if (snap->ds_dir != ds->ds_dir) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ENODEV));
			}
			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
				break;
			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
			dsl_dataset_rele(snap, FTAG);
		}
		if (obj == 0)
			return (SET_ERROR(ENODEV));

		if (drba->drba_cookie->drc_force) {
			drba->drba_snapobj = obj;
		} else {
			/*
			 * If we are not forcing, there must be no
			 * changes since fromsnap.
			 */
			if (dsl_dataset_modified_since_snap(ds, snap)) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ETXTBSY));
			}
			drba->drba_snapobj = ds->ds_prev->ds_object;
		}

		dsl_dataset_rele(snap, FTAG);
	} else {
		/* if full, then must be forced */
		if (!drba->drba_cookie->drc_force)
			return (SET_ERROR(EEXIST));
		/* start from $ORIGIN@$ORIGIN, if supported */
		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
		    dp->dp_origin_snap->ds_object : 0;
	}

	return (0);
}
static int
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	uint64_t fromguid = drrb->drr_fromguid;
	int flags = drrb->drr_flags;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES ||
	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	if (drba->drba_cookie->drc_resumable &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate large blocks
	 * to smaller ones, so the pool must have the LARGE_BLOCKS
	 * feature enabled if the stream has LARGE_BLOCKS.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
		return (SET_ERROR(ENOTSUP));

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* target fs already exists; recv into temp clone */

		/* Can't recv a clone into an existing fs */
		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
			dsl_dataset_rele(ds, FTAG);
			return (SET_ERROR(EINVAL));
		}

		error = recv_begin_check_existing_impl(drba, ds, fromguid);
		dsl_dataset_rele(ds, FTAG);
	} else if (error == ENOENT) {
		/* target fs does not exist; must be a full backup or clone */
		char buf[MAXNAMELEN];

		/*
		 * If it's a non-clone incremental, we are missing the
		 * target fs, so fail the recv.
		 */
		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
		    drba->drba_origin))
			return (SET_ERROR(ENOENT));

		/*
		 * If we're receiving a full send as a clone, and it doesn't
		 * contain all the necessary free records and freeobject
		 * records, reject it.
		 */
		if (fromguid == 0 && drba->drba_origin &&
		    !(flags & DRR_FLAG_FREERECORDS))
			return (SET_ERROR(EINVAL));

		/* Open the parent of tofs */
		ASSERT3U(strlen(tofs), <, MAXNAMELEN);
		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
		if (error != 0)
			return (error);

		/*
		 * Check filesystem and snapshot limits before receiving.
		 * We'll recheck snapshot limits again at the end (we create
		 * the filesystems and increment those counts during
		 * begin_sync).
		 */
		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		if (drba->drba_origin != NULL) {
			dsl_dataset_t *origin;
			error = dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin);
			if (error != 0) {
				dsl_dataset_rele(ds, FTAG);
				return (error);
			}
			if (!origin->ds_is_snapshot) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(EINVAL));
			}
			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
			    fromguid != 0) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(ENODEV));
			}
			dsl_dataset_rele(origin, FTAG);
		}
		dsl_dataset_rele(ds, FTAG);
		error = 0;
	}
	return (error);
}
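/*
 * dmu_recv_begin_check() above and dmu_recv_begin_sync() below run as a
 * dsl_sync_task pair: the check function runs once in open context and
 * again in syncing context, and must pass both times before the sync
 * function creates the temporary %recv clone (or the new dataset) that the
 * stream will be received into.
 */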
static void
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	objset_t *mos = dp->dp_meta_objset;
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds, *newds;
	uint64_t dsobj;
	int error;
	uint64_t crflags = 0;

	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
		crflags |= DS_FLAG_CI_DATASET;

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* create temporary clone */
		dsl_dataset_t *snap = NULL;
		if (drba->drba_snapobj != 0) {
			VERIFY0(dsl_dataset_hold_obj(dp,
			    drba->drba_snapobj, FTAG, &snap));
		}
		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
		    snap, crflags, drba->drba_cred, tx);
		if (drba->drba_snapobj != 0)
			dsl_dataset_rele(snap, FTAG);
		dsl_dataset_rele(ds, FTAG);
	} else {
		dsl_dir_t *dd;
		const char *tail;
		dsl_dataset_t *origin = NULL;

		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));

		if (drba->drba_origin != NULL) {
			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin));
		}

		/* Create new dataset. */
		dsobj = dsl_dataset_create_sync(dd,
		    strrchr(tofs, '/') + 1,
		    origin, crflags, drba->drba_cred, tx);
		if (origin != NULL)
			dsl_dataset_rele(origin, FTAG);
		dsl_dir_rele(dd, FTAG);
		drba->drba_cookie->drc_newfs = B_TRUE;
	}
	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));

	if (drba->drba_cookie->drc_resumable) {
		dsl_dataset_zapify(newds, tx);
		if (drrb->drr_fromguid != 0) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
			    8, 1, &drrb->drr_fromguid, tx));
		}
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
		    8, 1, &drrb->drr_toguid, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
		uint64_t one = 1;
		uint64_t zero = 0;
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
		    8, 1, &one, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
		    8, 1, &zero, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
		    8, 1, &zero, tx));
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_EMBED_DATA) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
			    8, 1, &one, tx));
		}
	}

	dmu_buf_will_dirty(newds->ds_dbuf, tx);
	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;

	/*
	 * If we actually created a non-clone, we need to create the
	 * objset in our new dataset.
	 */
	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
		(void) dmu_objset_create_impl(dp->dp_spa,
		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
	}

	drba->drba_cookie->drc_ds = newds;

	spa_history_log_internal_ds(newds, "receive", tx, "");
}
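/*
 * For a resumable receive, the ZAP entries created above
 * (DS_FIELD_RESUME_{FROMGUID,TOGUID,TONAME,OBJECT,OFFSET,BYTES}, plus
 * EMBEDOK when applicable) fully describe the in-progress receive; they
 * are what dmu_recv_resume_begin_check() consults when the stream is
 * restarted, and they are also the basis of the resume token reported to
 * userland.
 */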
static int
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES)
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	char recvname[ZFS_MAXNAMELEN];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
		if (error != 0)
			return (error);
	}

	/* check that ds is marked inconsistent */
	if (!DS_IS_INCONSISTENT(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/* check that there is resuming data, and that the toguid matches */
	if (!dsl_dataset_is_zapified(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}
	uint64_t val;
	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
	if (error != 0 || drrb->drr_toguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Check if the receive is still running.  If so, it will be owned.
	 * Note that nothing else can own the dataset (e.g. after the receive
	 * fails) because it will be marked inconsistent.
	 */
	if (dsl_dataset_has_owner(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EBUSY));
	}

	/* There should not be any snapshots of this fs yet. */
	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: resume point will be checked when we process the first WRITE
	 * record.
	 */

	/* check that the origin matches */
	val = 0;
	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
	if (drrb->drr_fromguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	dsl_dataset_rele(ds, FTAG);
	return (0);
}
static void
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	char recvname[ZFS_MAXNAMELEN];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
		drba->drba_cookie->drc_newfs = B_TRUE;
	}

	/* clear the inconsistent flag so that we can own it */
	ASSERT(DS_IS_INCONSISTENT(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
	dsobj = ds->ds_object;
	dsl_dataset_rele(ds, FTAG);

	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));

	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;

	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));

	drba->drba_cookie->drc_ds = ds;

	spa_history_log_internal_ds(ds, "resume receive", tx, "");
}

/*
 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
 * succeeds; otherwise we will leak the holds on the datasets.
 */
int
dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
{
	dmu_recv_begin_arg_t drba = { 0 };

	bzero(drc, sizeof (dmu_recv_cookie_t));
	drc->drc_drr_begin = drr_begin;
	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
	drc->drc_tosnap = tosnap;
	drc->drc_tofs = tofs;
	drc->drc_force = force;
	drc->drc_resumable = resumable;
	drc->drc_cred = CRED();

	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
		drc->drc_byteswap = B_TRUE;
		fletcher_4_incremental_byteswap(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
		byteswap_record(drr_begin);
	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
		fletcher_4_incremental_native(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
	} else {
		return (SET_ERROR(EINVAL));
	}

	drba.drba_origin = origin;
	drba.drba_cookie = drc;
	drba.drba_cred = CRED();

	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
	    DMU_BACKUP_FEATURE_RESUMING) {
		return (dsl_sync_task(tofs,
		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	} else {
		return (dsl_sync_task(tofs,
		    dmu_recv_begin_check, dmu_recv_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	}
}

struct receive_record_arg {
	dmu_replay_record_t header;
	void *payload; /* Pointer to a buffer containing the payload */
	/*
	 * If the record is a write, pointer to the arc_buf_t containing the
	 * payload.
	 */
	arc_buf_t *write_buf;
	int payload_size;
	uint64_t bytes_read; /* bytes read from stream when record created */
	boolean_t eos_marker; /* Marks the end of the stream */
	bqueue_node_t node;
};
struct receive_writer_arg {
	objset_t *os;
	boolean_t byteswap;
	bqueue_t q;

	/*
	 * These three args are used to signal to the main thread that we're
	 * done.
	 */
	kmutex_t mutex;
	kcondvar_t cv;
	boolean_t done;

	int err;
	/* A map from guid to dataset to help handle dedup'd streams. */
	avl_tree_t *guid_to_ds_map;
	boolean_t resumable;
	uint64_t last_object, last_offset;
	uint64_t bytes_read; /* bytes read when current record created */
};

struct objlist {
	list_t list; /* List of struct receive_objnode. */
	/*
	 * Last object looked up. Used to assert that objects are being looked
	 * up in ascending order.
	 */
	uint64_t last_lookup;
};

struct receive_objnode {
	list_node_t node;
	uint64_t object;
};

struct receive_arg {
	objset_t *os;
	kthread_t *td;
	struct file *fp;
	uint64_t voff; /* The current offset in the stream */
	uint64_t bytes_read;
	/*
	 * A record that has had its payload read in, but hasn't yet been
	 * handed off to the worker thread.
	 */
	struct receive_record_arg *rrd;
	/* A record that has had its header read in, but not its payload. */
	struct receive_record_arg *next_rrd;
	zio_cksum_t cksum;
	zio_cksum_t prev_cksum;
	int err;
	boolean_t byteswap;
	/* Sorted list of objects not to issue prefetches for. */
	struct objlist ignore_objlist;
};

typedef struct guid_map_entry {
	uint64_t guid;
	dsl_dataset_t *gme_ds;
	avl_node_t avlnode;
} guid_map_entry_t;

static int
guid_compare(const void *arg1, const void *arg2)
{
	const guid_map_entry_t *gmep1 = arg1;
	const guid_map_entry_t *gmep2 = arg2;

	if (gmep1->guid < gmep2->guid)
		return (-1);
	else if (gmep1->guid > gmep2->guid)
		return (1);
	return (0);
}

static void
free_guid_map_onexit(void *arg)
{
	avl_tree_t *ca = arg;
	void *cookie = NULL;
	guid_map_entry_t *gmep;

	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
		dsl_dataset_long_rele(gmep->gme_ds, gmep);
		dsl_dataset_rele(gmep->gme_ds, gmep);
		kmem_free(gmep, sizeof (guid_map_entry_t));
	}
	avl_destroy(ca);
	kmem_free(ca, sizeof (avl_tree_t));
}
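/*
 * The AVL tree managed by the functions above maps snapshot guids to
 * datasets so that DRR_WRITE_BYREF records in dedup'd streams can be
 * resolved: drr_refguid identifies the snapshot holding the referenced
 * block, and the map yields the corresponding dataset on the receiving
 * side.  The map persists across receives on the same file descriptor via
 * the zfs_onexit mechanism, with free_guid_map_onexit() as its cleanup
 * callback.
 */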

static int
receive_read(struct receive_arg *ra, int len, void *buf)
{
	int done = 0;

	/* some things will require 8-byte alignment, so everything must */
	ASSERT0(len % 8);

	while (done < len) {
		ssize_t resid;

		ra->err = restore_bytes(ra, buf + done,
		    len - done, ra->voff, &resid);

		if (resid == len - done) {
			/*
			 * Note: ECKSUM indicates that the receive
			 * was interrupted and can potentially be resumed.
			 */
			ra->err = SET_ERROR(ECKSUM);
		}
		ra->voff += len - done - resid;
		done = len - resid;
		if (ra->err != 0)
			return (ra->err);
	}

	ra->bytes_read += len;

	ASSERT3U(done, ==, len);
	return (0);
}
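
/*
 * A note on byte order: the sender emits records in its native byte
 * order, and dmu_recv_begin() detects a foreign-endian stream when the
 * BEGIN record's magic reads back as BSWAP_64(DMU_BACKUP_MAGIC).  From
 * then on every header is passed through byteswap_record() below, as in
 * receive_read_payload_and_next_header():
 *
 *	if (ra->byteswap)
 *		byteswap_record(&ra->next_rrd->header);
 *
 * Payloads are byteswapped separately by the DMU_OT_BYTESWAP() functions
 * in the receive_*() handlers, since their layout depends on object type.
 */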

static void
byteswap_record(dmu_replay_record_t *drr)
{
#define	DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
#define	DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
	drr->drr_type = BSWAP_32(drr->drr_type);
	drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);

	switch (drr->drr_type) {
	case DRR_BEGIN:
		DO64(drr_begin.drr_magic);
		DO64(drr_begin.drr_versioninfo);
		DO64(drr_begin.drr_creation_time);
		DO32(drr_begin.drr_type);
		DO32(drr_begin.drr_flags);
		DO64(drr_begin.drr_toguid);
		DO64(drr_begin.drr_fromguid);
		break;
	case DRR_OBJECT:
		DO64(drr_object.drr_object);
		DO32(drr_object.drr_type);
		DO32(drr_object.drr_bonustype);
		DO32(drr_object.drr_blksz);
		DO32(drr_object.drr_bonuslen);
		DO64(drr_object.drr_toguid);
		break;
	case DRR_FREEOBJECTS:
		DO64(drr_freeobjects.drr_firstobj);
		DO64(drr_freeobjects.drr_numobjs);
		DO64(drr_freeobjects.drr_toguid);
		break;
	case DRR_WRITE:
		DO64(drr_write.drr_object);
		DO32(drr_write.drr_type);
		DO64(drr_write.drr_offset);
		DO64(drr_write.drr_length);
		DO64(drr_write.drr_toguid);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
		DO64(drr_write.drr_key.ddk_prop);
		break;
	case DRR_WRITE_BYREF:
		DO64(drr_write_byref.drr_object);
		DO64(drr_write_byref.drr_offset);
		DO64(drr_write_byref.drr_length);
		DO64(drr_write_byref.drr_toguid);
		DO64(drr_write_byref.drr_refguid);
		DO64(drr_write_byref.drr_refobject);
		DO64(drr_write_byref.drr_refoffset);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
		    drr_key.ddk_cksum);
		DO64(drr_write_byref.drr_key.ddk_prop);
		break;
	case DRR_WRITE_EMBEDDED:
		DO64(drr_write_embedded.drr_object);
		DO64(drr_write_embedded.drr_offset);
		DO64(drr_write_embedded.drr_length);
		DO64(drr_write_embedded.drr_toguid);
		DO32(drr_write_embedded.drr_lsize);
		DO32(drr_write_embedded.drr_psize);
		break;
	case DRR_FREE:
		DO64(drr_free.drr_object);
		DO64(drr_free.drr_offset);
		DO64(drr_free.drr_length);
		DO64(drr_free.drr_toguid);
		break;
	case DRR_SPILL:
		DO64(drr_spill.drr_object);
		DO64(drr_spill.drr_length);
		DO64(drr_spill.drr_toguid);
		break;
	case DRR_END:
		DO64(drr_end.drr_toguid);
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
		break;
	}

	if (drr->drr_type != DRR_BEGIN) {
		ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
	}

#undef DO64
#undef DO32
}

static inline uint8_t
deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
{
	if (bonus_type == DMU_OT_SA) {
		return (1);
	} else {
		return (1 +
		    ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
	}
}

static void
save_resume_state(struct receive_writer_arg *rwa,
    uint64_t object, uint64_t offset, dmu_tx_t *tx)
{
	int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;

	if (!rwa->resumable)
		return;

	/*
	 * We use ds_resume_bytes[] != 0 to indicate that we need to
	 * update this on disk, so it must not be 0.
	 */
	ASSERT(rwa->bytes_read != 0);

	/*
	 * We only resume from write records, which have a valid
	 * (non-meta-dnode) object number.
	 */
	ASSERT(object != 0);

	/*
	 * For resuming to work correctly, we must receive records in order,
	 * sorted by object,offset.  This is checked by the callers, but
	 * assert it here for good measure.
	 */
	ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
	ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
	    offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
	ASSERT3U(rwa->bytes_read, >=,
	    rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);

	rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
	rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
	rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
}
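
/*
 * A worked example of the resume bookmark above (illustrative numbers):
 * if a resumable receive applies a write to object 57 at offset 0x20000
 * in a transaction assigned to txg 1034, then slot
 * txgoff = 1034 & TXG_MASK = 2 of ds_resume_{object,offset,bytes}[] is
 * updated here in memory, and the dataset sync code later persists those
 * values as the DS_FIELD_RESUME_* ZAP entries that resume_check() and
 * dmu_recv_resume_begin_check() compare against when the stream restarts.
 */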

static int
receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
    void *data)
{
	dmu_object_info_t doi;
	dmu_tx_t *tx;
	uint64_t object;
	int err;

	if (drro->drr_type == DMU_OT_NONE ||
	    !DMU_OT_IS_VALID(drro->drr_type) ||
	    !DMU_OT_IS_VALID(drro->drr_bonustype) ||
	    drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
	    drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
	    P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
	    drro->drr_blksz < SPA_MINBLOCKSIZE ||
	    drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
	    drro->drr_bonuslen > DN_MAX_BONUSLEN) {
		return (SET_ERROR(EINVAL));
	}

	err = dmu_object_info(rwa->os, drro->drr_object, &doi);

	if (err != 0 && err != ENOENT)
		return (SET_ERROR(EINVAL));
	object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;

	/*
	 * If we are losing blkptrs or changing the block size this must
	 * be a new file instance.  We must clear out the previous file
	 * contents before we can change this type of metadata in the dnode.
	 */
	if (err == 0) {
		int nblkptr;

		nblkptr = deduce_nblkptr(drro->drr_bonustype,
		    drro->drr_bonuslen);

		if (drro->drr_blksz != doi.doi_data_block_size ||
		    nblkptr < doi.doi_nblkptr) {
			err = dmu_free_long_range(rwa->os, drro->drr_object,
			    0, DMU_OBJECT_END);
			if (err != 0)
				return (SET_ERROR(EINVAL));
		}
	}

	tx = dmu_tx_create(rwa->os);
	dmu_tx_hold_bonus(tx, object);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}

	if (object == DMU_NEW_OBJECT) {
		/* currently free, want to be allocated */
		err = dmu_object_claim(rwa->os, drro->drr_object,
		    drro->drr_type, drro->drr_blksz,
		    drro->drr_bonustype, drro->drr_bonuslen, tx);
	} else if (drro->drr_type != doi.doi_type ||
	    drro->drr_blksz != doi.doi_data_block_size ||
	    drro->drr_bonustype != doi.doi_bonus_type ||
	    drro->drr_bonuslen != doi.doi_bonus_size) {
		/* currently allocated, but with different properties */
		err = dmu_object_reclaim(rwa->os, drro->drr_object,
		    drro->drr_type, drro->drr_blksz,
		    drro->drr_bonustype, drro->drr_bonuslen, tx);
	}
	if (err != 0) {
		dmu_tx_commit(tx);
		return (SET_ERROR(EINVAL));
	}

	dmu_object_set_checksum(rwa->os, drro->drr_object,
	    drro->drr_checksumtype, tx);
	dmu_object_set_compress(rwa->os, drro->drr_object,
	    drro->drr_compress, tx);

	if (data != NULL) {
		dmu_buf_t *db;

		VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
		dmu_buf_will_dirty(db, tx);

		ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
		bcopy(data, db->db_data, drro->drr_bonuslen);
		if (rwa->byteswap) {
			dmu_object_byteswap_t byteswap =
			    DMU_OT_BYTESWAP(drro->drr_bonustype);
			dmu_ot_byteswap[byteswap].ob_func(db->db_data,
			    drro->drr_bonuslen);
		}
		dmu_buf_rele(db, FTAG);
	}
	dmu_tx_commit(tx);

	return (0);
}

/* ARGSUSED */
static int
receive_freeobjects(struct receive_writer_arg *rwa,
    struct drr_freeobjects *drrfo)
{
	uint64_t obj;
	int next_err = 0;

	if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
		return (SET_ERROR(EINVAL));

	for (obj = drrfo->drr_firstobj;
	    obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
	    next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
		int err;

		if (dmu_object_info(rwa->os, obj, NULL) != 0)
			continue;

		err = dmu_free_long_object(rwa->os, obj);
		if (err != 0)
			return (err);
	}
	if (next_err != ESRCH)
		return (next_err);
	return (0);
}
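
/*
 * An illustrative sketch of the (object, offset) ordering invariant that
 * receive_write() enforces below.  Assuming last_object = 5 and
 * last_offset = 0x40000, subsequent records behave as:
 *
 *	WRITE object 5 offset 0x60000	accepted (same object, higher offset)
 *	WRITE object 9 offset 0		accepted (higher object)
 *	WRITE object 5 offset 0x20000	rejected, EINVAL (went backwards)
 *
 * The send side emits writes in this order, so a violation indicates a
 * corrupt (or crafted) stream; it would also break save_resume_state().
 */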

static int
receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
    arc_buf_t *abuf)
{
	dmu_tx_t *tx;
	int err;

	if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
	    !DMU_OT_IS_VALID(drrw->drr_type))
		return (SET_ERROR(EINVAL));

	/*
	 * For resuming to work, records must be in increasing order
	 * by (object, offset).
	 */
	if (drrw->drr_object < rwa->last_object ||
	    (drrw->drr_object == rwa->last_object &&
	    drrw->drr_offset < rwa->last_offset)) {
		return (SET_ERROR(EINVAL));
	}
	rwa->last_object = drrw->drr_object;
	rwa->last_offset = drrw->drr_offset;

	if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrw->drr_object,
	    drrw->drr_offset, drrw->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}
	if (rwa->byteswap) {
		dmu_object_byteswap_t byteswap =
		    DMU_OT_BYTESWAP(drrw->drr_type);
		dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
		    drrw->drr_length);
	}

	dmu_buf_t *bonus;
	if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
		return (SET_ERROR(EINVAL));
	dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);

	/*
	 * Note: If the receive fails, we want the resume stream to start
	 * with the same record that we last successfully received (as opposed
	 * to the next record), so that we can verify that we are
	 * resuming from the correct location.
	 */
	save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
	dmu_tx_commit(tx);
	dmu_buf_rele(bonus, FTAG);

	return (0);
}

/*
 * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
 * streams to refer to a copy of the data that is already on the
 * system because it came in earlier in the stream.  This function
 * finds the earlier copy of the data, and uses that copy instead of
 * data from the stream to fulfill this write.
 */
static int
receive_write_byref(struct receive_writer_arg *rwa,
    struct drr_write_byref *drrwbr)
{
	dmu_tx_t *tx;
	int err;
	guid_map_entry_t gmesrch;
	guid_map_entry_t *gmep;
	avl_index_t where;
	objset_t *ref_os = NULL;
	dmu_buf_t *dbp;

	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
		return (SET_ERROR(EINVAL));

	/*
	 * If the GUID of the referenced dataset is different from the
	 * GUID of the target dataset, find the referenced dataset.
	 */
	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
		gmesrch.guid = drrwbr->drr_refguid;
		if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
		    &where)) == NULL) {
			return (SET_ERROR(EINVAL));
		}
		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
			return (SET_ERROR(EINVAL));
	} else {
		ref_os = rwa->os;
	}

	err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
	    drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
	if (err != 0)
		return (err);

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrwbr->drr_object,
	    drrwbr->drr_offset, drrwbr->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_write(rwa->os, drrwbr->drr_object,
	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
	dmu_buf_rele(dbp, FTAG);

	/* See comment in receive_write. */
	save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
	dmu_tx_commit(tx);
	return (0);
}
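
/*
 * A sketch of how the guid map gets used across a dedup'ed replication
 * package (simplified; see add_ds_to_guidmap() below for the registration
 * side).  A "zfs send -RD" package may carry several substreams.  When a
 * later substream writes a block identical to one sent earlier, it emits
 * DRR_WRITE_BYREF with drr_refguid naming the dataset that already
 * received the data.  Because dmu_recv_new_end() registered that dataset
 * in guid_to_ds_map, the avl_find() above can translate refguid to a
 * dataset and copy the block locally instead of reading it off the wire
 * a second time.
 */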

static int
receive_write_embedded(struct receive_writer_arg *rwa,
    struct drr_write_embedded *drrwe, void *data)
{
	dmu_tx_t *tx;
	int err;

	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
		return (SET_ERROR(EINVAL));

	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
		return (SET_ERROR(EINVAL));
	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
		return (SET_ERROR(EINVAL));

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_write(tx, drrwe->drr_object,
	    drrwe->drr_offset, drrwe->drr_length);
	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_tx_abort(tx);
		return (err);
	}

	dmu_write_embedded(rwa->os, drrwe->drr_object,
	    drrwe->drr_offset, data, drrwe->drr_etype,
	    drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
	    rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);

	/* See comment in receive_write. */
	save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
	dmu_tx_commit(tx);
	return (0);
}

static int
receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
    void *data)
{
	dmu_tx_t *tx;
	dmu_buf_t *db, *db_spill;
	int err;

	if (drrs->drr_length < SPA_MINBLOCKSIZE ||
	    drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
		return (SET_ERROR(EINVAL));

	if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
	if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
		dmu_buf_rele(db, FTAG);
		return (err);
	}

	tx = dmu_tx_create(rwa->os);

	dmu_tx_hold_spill(tx, db->db_object);

	err = dmu_tx_assign(tx, TXG_WAIT);
	if (err != 0) {
		dmu_buf_rele(db, FTAG);
		dmu_buf_rele(db_spill, FTAG);
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_buf_will_dirty(db_spill, tx);

	if (db_spill->db_size < drrs->drr_length)
		VERIFY(0 == dbuf_spill_set_blksz(db_spill,
		    drrs->drr_length, tx));
	bcopy(data, db_spill->db_data, drrs->drr_length);

	dmu_buf_rele(db, FTAG);
	dmu_buf_rele(db_spill, FTAG);

	dmu_tx_commit(tx);
	return (0);
}

/* ARGSUSED */
static int
receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
{
	int err;

	if (drrf->drr_length != -1ULL &&
	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
		return (SET_ERROR(EINVAL));

	if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
		return (SET_ERROR(EINVAL));

	err = dmu_free_long_range(rwa->os, drrf->drr_object,
	    drrf->drr_offset, drrf->drr_length);

	return (err);
}

/* used to destroy the drc_ds on error */
static void
dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
{
	if (drc->drc_resumable) {
		/* wait for our resume state to be written to disk */
		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
	} else {
		char name[MAXNAMELEN];
		dsl_dataset_name(drc->drc_ds, name);
		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
		(void) dsl_destroy_head(name);
	}
}
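
/*
 * The branch above is what makes interrupted receives restartable.  For a
 * resumable receive, a failure keeps the partially received state (in the
 * %recv child dataset, or in the new filesystem itself); userland can
 * then read the dataset's receive_resume_token property and restart the
 * stream with "zfs send -t <token>".  A non-resumable failure destroys
 * the partial dataset so no inconsistent state survives.
 */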

static void
receive_cksum(struct receive_arg *ra, int len, void *buf)
{
	if (ra->byteswap) {
		fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
	} else {
		fletcher_4_incremental_native(buf, len, &ra->cksum);
	}
}

/*
 * Read the payload into a buffer of size len, and update the current record's
 * payload field.
 * Allocate ra->next_rrd and read the next record's header into
 * ra->next_rrd->header.
 * Verify checksum of payload and next record.
 */
static int
receive_read_payload_and_next_header(struct receive_arg *ra, int len,
    void *buf)
{
	int err;

	if (len != 0) {
		ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
		err = receive_read(ra, len, buf);
		if (err != 0)
			return (err);
		receive_cksum(ra, len, buf);

		/* note: rrd is NULL when reading the begin record's payload */
		if (ra->rrd != NULL) {
			ra->rrd->payload = buf;
			ra->rrd->payload_size = len;
			ra->rrd->bytes_read = ra->bytes_read;
		}
	}

	ra->prev_cksum = ra->cksum;

	ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
	err = receive_read(ra, sizeof (ra->next_rrd->header),
	    &ra->next_rrd->header);
	ra->next_rrd->bytes_read = ra->bytes_read;
	if (err != 0) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (err);
	}
	if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: checksum is of everything up to but not including the
	 * checksum itself.
	 */
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	receive_cksum(ra,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &ra->next_rrd->header);

	zio_cksum_t cksum_orig =
	    ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
	zio_cksum_t *cksump =
	    &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;

	if (ra->byteswap)
		byteswap_record(&ra->next_rrd->header);

	if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
	    !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
		kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
		ra->next_rrd = NULL;
		return (SET_ERROR(ECKSUM));
	}

	receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);

	return (0);
}

static void
objlist_create(struct objlist *list)
{
	list_create(&list->list, sizeof (struct receive_objnode),
	    offsetof(struct receive_objnode, node));
	list->last_lookup = 0;
}

static void
objlist_destroy(struct objlist *list)
{
	for (struct receive_objnode *n = list_remove_head(&list->list);
	    n != NULL; n = list_remove_head(&list->list)) {
		kmem_free(n, sizeof (*n));
	}
	list_destroy(&list->list);
}
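
/*
 * Usage sketch for the objlist helpers (illustrative; the only consumer
 * in this file is ra->ignore_objlist):
 *
 *	struct objlist ol;
 *	objlist_create(&ol);
 *	objlist_insert(&ol, 10);	   insertions must be ascending
 *	objlist_insert(&ol, 12);
 *	(void) objlist_exists(&ol, 10);	   B_TRUE; list unchanged
 *	(void) objlist_exists(&ol, 11);	   B_FALSE; frees the node for 10
 *	(void) objlist_exists(&ol, 10);	   would trip the ASSERT: lookups
 *					   must be ascending as well
 *	objlist_destroy(&ol);
 */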

/*
 * This function looks through the objlist to see if the specified object
 * number is contained in the objlist.  In the process, it will remove all
 * object numbers in the list that are smaller than the specified object
 * number.  Thus, any lookup of an object number smaller than a previously
 * looked up object number will always return false; therefore, all lookups
 * should be done in ascending order.
 */
static boolean_t
objlist_exists(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = list_head(&list->list);
	ASSERT3U(object, >=, list->last_lookup);
	list->last_lookup = object;
	while (node != NULL && node->object < object) {
		VERIFY3P(node, ==, list_remove_head(&list->list));
		kmem_free(node, sizeof (*node));
		node = list_head(&list->list);
	}
	return (node != NULL && node->object == object);
}

/*
 * The objlist is a list of object numbers stored in ascending order.
 * However, the insertion of new object numbers does not seek out the
 * correct location to store a new object number; instead, it appends it to
 * the list for simplicity.  Thus, any users must take care to only insert
 * new object numbers in ascending order.
 */
static void
objlist_insert(struct objlist *list, uint64_t object)
{
	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
	node->object = object;
#ifdef ZFS_DEBUG
	struct receive_objnode *last_object = list_tail(&list->list);
	uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
	ASSERT3U(node->object, >, last_objnum);
#endif
	list_insert_tail(&list->list, node);
}

/*
 * Issue the prefetch reads for any necessary indirect blocks.
 *
 * We use the object ignore list to tell us whether or not to issue prefetches
 * for a given object.  We do this for both correctness (in case the blocksize
 * of an object has changed) and performance (if the object doesn't exist,
 * don't needlessly try to issue prefetches).  We also trim the list as we go
 * through the stream to prevent it from growing to an unbounded size.
 *
 * The object numbers within will always be in sorted order, and any write
 * records we see will also be in sorted order, but they're not sorted with
 * respect to each other (i.e. we can get several object records before
 * receiving each object's write records).  As a result, once we've reached a
 * given object number, we can safely remove any reference to lower object
 * numbers in the ignore list.  In practice, we receive up to 32 object
 * records before receiving write records, so the list can have up to 32
 * nodes in it.
 */
/* ARGSUSED */
static void
receive_read_prefetch(struct receive_arg *ra,
    uint64_t object, uint64_t offset, uint64_t length)
{
	if (!objlist_exists(&ra->ignore_objlist, object)) {
		dmu_prefetch(ra->os, object, 1, offset, length,
		    ZIO_PRIORITY_SYNC_READ);
	}
}
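
/*
 * Note that the level argument of 1 in the dmu_prefetch() call above asks
 * for the level-1 indirect blocks covering [offset, offset + length), not
 * the data blocks themselves: the data arrives in the stream payload, so
 * only the indirect blocks that the write path will need to read and
 * dirty are worth fetching ahead of time.
 */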

/*
 * Read records off the stream, issuing any necessary prefetches.
 */
static int
receive_read_record(struct receive_arg *ra)
{
	int err;

	switch (ra->rrd->header.drr_type) {
	case DRR_OBJECT:
	{
		struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
		uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
		void *buf = kmem_zalloc(size, KM_SLEEP);
		dmu_object_info_t doi;
		err = receive_read_payload_and_next_header(ra, size, buf);
		if (err != 0) {
			kmem_free(buf, size);
			return (err);
		}
		err = dmu_object_info(ra->os, drro->drr_object, &doi);
		/*
		 * See receive_read_prefetch for an explanation why we're
		 * storing this object in the ignore_objlist.
		 */
		if (err == ENOENT ||
		    (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
			objlist_insert(&ra->ignore_objlist, drro->drr_object);
			err = 0;
		}
		return (err);
	}
	case DRR_FREEOBJECTS:
	{
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		return (err);
	}
	case DRR_WRITE:
	{
		struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
		arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os),
		    drrw->drr_length);

		err = receive_read_payload_and_next_header(ra,
		    drrw->drr_length, abuf->b_data);
		if (err != 0) {
			dmu_return_arcbuf(abuf);
			return (err);
		}
		ra->rrd->write_buf = abuf;
		receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
		    drrw->drr_length);
		return (err);
	}
	case DRR_WRITE_BYREF:
	{
		struct drr_write_byref *drrwb =
		    &ra->rrd->header.drr_u.drr_write_byref;
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
		    drrwb->drr_length);
		return (err);
	}
	case DRR_WRITE_EMBEDDED:
	{
		struct drr_write_embedded *drrwe =
		    &ra->rrd->header.drr_u.drr_write_embedded;
		uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
		void *buf = kmem_zalloc(size, KM_SLEEP);

		err = receive_read_payload_and_next_header(ra, size, buf);
		if (err != 0) {
			kmem_free(buf, size);
			return (err);
		}

		receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
		    drrwe->drr_length);
		return (err);
	}
	case DRR_FREE:
	{
		/*
		 * It might be beneficial to prefetch indirect blocks here, but
		 * we don't really have the data to decide for sure.
		 */
		err = receive_read_payload_and_next_header(ra, 0, NULL);
		return (err);
	}
	case DRR_END:
	{
		struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
		if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
			return (SET_ERROR(ECKSUM));
		return (0);
	}
	case DRR_SPILL:
	{
		struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
		void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
		err = receive_read_payload_and_next_header(ra, drrs->drr_length,
		    buf);
		if (err != 0)
			kmem_free(buf, drrs->drr_length);
		return (err);
	}
	default:
		return (SET_ERROR(EINVAL));
	}
}
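
/*
 * A summary of buffer ownership as records cross the queue (descriptive
 * only; the authoritative behavior is in the code below).  The read
 * thread allocates each receive_record_arg and its payload (a kmem buffer
 * or a loaned arc_buf_t).  On success, receive_process_record() frees
 * kmem payloads and receive_write() consumes the arc_buf via
 * dmu_assign_arcbuf(); on failure, receive_writer_thread() drains the
 * queue, returning arc bufs and freeing payloads itself.  The record
 * struct itself is always freed by the writer thread.
 */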

/*
 * Commit the records to the pool.
 */
static int
receive_process_record(struct receive_writer_arg *rwa,
    struct receive_record_arg *rrd)
{
	int err;

	/* Processing in order, therefore bytes_read should be increasing. */
	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
	rwa->bytes_read = rrd->bytes_read;

	switch (rrd->header.drr_type) {
	case DRR_OBJECT:
	{
		struct drr_object *drro = &rrd->header.drr_u.drr_object;
		err = receive_object(rwa, drro, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	case DRR_FREEOBJECTS:
	{
		struct drr_freeobjects *drrfo =
		    &rrd->header.drr_u.drr_freeobjects;
		return (receive_freeobjects(rwa, drrfo));
	}
	case DRR_WRITE:
	{
		struct drr_write *drrw = &rrd->header.drr_u.drr_write;
		err = receive_write(rwa, drrw, rrd->write_buf);
		/* if receive_write() is successful, it consumes the arc_buf */
		if (err != 0)
			dmu_return_arcbuf(rrd->write_buf);
		rrd->write_buf = NULL;
		rrd->payload = NULL;
		return (err);
	}
	case DRR_WRITE_BYREF:
	{
		struct drr_write_byref *drrwbr =
		    &rrd->header.drr_u.drr_write_byref;
		return (receive_write_byref(rwa, drrwbr));
	}
	case DRR_WRITE_EMBEDDED:
	{
		struct drr_write_embedded *drrwe =
		    &rrd->header.drr_u.drr_write_embedded;
		err = receive_write_embedded(rwa, drrwe, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	case DRR_FREE:
	{
		struct drr_free *drrf = &rrd->header.drr_u.drr_free;
		return (receive_free(rwa, drrf));
	}
	case DRR_SPILL:
	{
		struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
		err = receive_spill(rwa, drrs, rrd->payload);
		kmem_free(rrd->payload, rrd->payload_size);
		rrd->payload = NULL;
		return (err);
	}
	default:
		return (SET_ERROR(EINVAL));
	}
}
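
/*
 * The shutdown handshake for the worker below: the read thread always
 * terminates the queue with a zeroed record whose eos_marker is set,
 * roughly
 *
 *	rrd = kmem_zalloc(sizeof (*rrd), KM_SLEEP);
 *	rrd->eos_marker = B_TRUE;
 *	bqueue_enqueue(&rwa->q, rrd, 1);
 *
 * so the dequeue loop can use eos_marker as its sole exit condition and
 * never has to poll rwa->err or a cancel flag.
 */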
2719 */ 2720 if (rwa->err == 0) { 2721 rwa->err = receive_process_record(rwa, rrd); 2722 } else if (rrd->write_buf != NULL) { 2723 dmu_return_arcbuf(rrd->write_buf); 2724 rrd->write_buf = NULL; 2725 rrd->payload = NULL; 2726 } else if (rrd->payload != NULL) { 2727 kmem_free(rrd->payload, rrd->payload_size); 2728 rrd->payload = NULL; 2729 } 2730 kmem_free(rrd, sizeof (*rrd)); 2731 } 2732 kmem_free(rrd, sizeof (*rrd)); 2733 mutex_enter(&rwa->mutex); 2734 rwa->done = B_TRUE; 2735 cv_signal(&rwa->cv); 2736 mutex_exit(&rwa->mutex); 2737 thread_exit(); 2738} 2739 2740static int 2741resume_check(struct receive_arg *ra, nvlist_t *begin_nvl) 2742{ 2743 uint64_t val; 2744 objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset; 2745 uint64_t dsobj = dmu_objset_id(ra->os); 2746 uint64_t resume_obj, resume_off; 2747 2748 if (nvlist_lookup_uint64(begin_nvl, 2749 "resume_object", &resume_obj) != 0 || 2750 nvlist_lookup_uint64(begin_nvl, 2751 "resume_offset", &resume_off) != 0) { 2752 return (SET_ERROR(EINVAL)); 2753 } 2754 VERIFY0(zap_lookup(mos, dsobj, 2755 DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val)); 2756 if (resume_obj != val) 2757 return (SET_ERROR(EINVAL)); 2758 VERIFY0(zap_lookup(mos, dsobj, 2759 DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val)); 2760 if (resume_off != val) 2761 return (SET_ERROR(EINVAL)); 2762 2763 return (0); 2764} 2765 2766/* 2767 * Read in the stream's records, one by one, and apply them to the pool. There 2768 * are two threads involved; the thread that calls this function will spin up a 2769 * worker thread, read the records off the stream one by one, and issue 2770 * prefetches for any necessary indirect blocks. It will then push the records 2771 * onto an internal blocking queue. The worker thread will pull the records off 2772 * the queue, and actually write the data into the DMU. This way, the worker 2773 * thread doesn't have to wait for reads to complete, since everything it needs 2774 * (the indirect blocks) will be prefetched. 2775 * 2776 * NB: callers *must* call dmu_recv_end() if this succeeds. 2777 */ 2778int 2779dmu_recv_stream(dmu_recv_cookie_t *drc, struct file *fp, offset_t *voffp, 2780 int cleanup_fd, uint64_t *action_handlep) 2781{ 2782 int err = 0; 2783 struct receive_arg ra = { 0 }; 2784 struct receive_writer_arg rwa = { 0 }; 2785 int featureflags; 2786 nvlist_t *begin_nvl = NULL; 2787 2788 ra.byteswap = drc->drc_byteswap; 2789 ra.cksum = drc->drc_cksum; 2790 ra.td = curthread; 2791 ra.fp = fp; 2792 ra.voff = *voffp; 2793 2794 if (dsl_dataset_is_zapified(drc->drc_ds)) { 2795 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset, 2796 drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES, 2797 sizeof (ra.bytes_read), 1, &ra.bytes_read); 2798 } 2799 2800 objlist_create(&ra.ignore_objlist); 2801 2802 /* these were verified in dmu_recv_begin */ 2803 ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==, 2804 DMU_SUBSTREAM); 2805 ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES); 2806 2807 /* 2808 * Open the objset we are modifying. 
2809 */ 2810 VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os)); 2811 2812 ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT); 2813 2814 featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo); 2815 2816 /* if this stream is dedup'ed, set up the avl tree for guid mapping */ 2817 if (featureflags & DMU_BACKUP_FEATURE_DEDUP) { 2818 minor_t minor; 2819 2820 if (cleanup_fd == -1) { 2821 ra.err = SET_ERROR(EBADF); 2822 goto out; 2823 } 2824 ra.err = zfs_onexit_fd_hold(cleanup_fd, &minor); 2825 if (ra.err != 0) { 2826 cleanup_fd = -1; 2827 goto out; 2828 } 2829 2830 if (*action_handlep == 0) { 2831 rwa.guid_to_ds_map = 2832 kmem_alloc(sizeof (avl_tree_t), KM_SLEEP); 2833 avl_create(rwa.guid_to_ds_map, guid_compare, 2834 sizeof (guid_map_entry_t), 2835 offsetof(guid_map_entry_t, avlnode)); 2836 err = zfs_onexit_add_cb(minor, 2837 free_guid_map_onexit, rwa.guid_to_ds_map, 2838 action_handlep); 2839 if (ra.err != 0) 2840 goto out; 2841 } else { 2842 err = zfs_onexit_cb_data(minor, *action_handlep, 2843 (void **)&rwa.guid_to_ds_map); 2844 if (ra.err != 0) 2845 goto out; 2846 } 2847 2848 drc->drc_guid_to_ds_map = rwa.guid_to_ds_map; 2849 } 2850 2851 uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen; 2852 void *payload = NULL; 2853 if (payloadlen != 0) 2854 payload = kmem_alloc(payloadlen, KM_SLEEP); 2855 2856 err = receive_read_payload_and_next_header(&ra, payloadlen, payload); 2857 if (err != 0) { 2858 if (payloadlen != 0) 2859 kmem_free(payload, payloadlen); 2860 goto out; 2861 } 2862 if (payloadlen != 0) { 2863 err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP); 2864 kmem_free(payload, payloadlen); 2865 if (err != 0) 2866 goto out; 2867 } 2868 2869 if (featureflags & DMU_BACKUP_FEATURE_RESUMING) { 2870 err = resume_check(&ra, begin_nvl); 2871 if (err != 0) 2872 goto out; 2873 } 2874 2875 (void) bqueue_init(&rwa.q, zfs_recv_queue_length, 2876 offsetof(struct receive_record_arg, node)); 2877 cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL); 2878 mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL); 2879 rwa.os = ra.os; 2880 rwa.byteswap = drc->drc_byteswap; 2881 rwa.resumable = drc->drc_resumable; 2882 2883 (void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, &p0, 2884 TS_RUN, minclsyspri); 2885 /* 2886 * We're reading rwa.err without locks, which is safe since we are the 2887 * only reader, and the worker thread is the only writer. It's ok if we 2888 * miss a write for an iteration or two of the loop, since the writer 2889 * thread will keep freeing records we send it until we send it an eos 2890 * marker. 2891 * 2892 * We can leave this loop in 3 ways: First, if rwa.err is 2893 * non-zero. In that case, the writer thread will free the rrd we just 2894 * pushed. Second, if we're interrupted; in that case, either it's the 2895 * first loop and ra.rrd was never allocated, or it's later, and ra.rrd 2896 * has been handed off to the writer thread who will free it. Finally, 2897 * if receive_read_record fails or we're at the end of the stream, then 2898 * we free ra.rrd and exit. 
2899 */ 2900 while (rwa.err == 0) { 2901 if (issig(JUSTLOOKING) && issig(FORREAL)) { 2902 err = SET_ERROR(EINTR); 2903 break; 2904 } 2905 2906 ASSERT3P(ra.rrd, ==, NULL); 2907 ra.rrd = ra.next_rrd; 2908 ra.next_rrd = NULL; 2909 /* Allocates and loads header into ra.next_rrd */ 2910 err = receive_read_record(&ra); 2911 2912 if (ra.rrd->header.drr_type == DRR_END || err != 0) { 2913 kmem_free(ra.rrd, sizeof (*ra.rrd)); 2914 ra.rrd = NULL; 2915 break; 2916 } 2917 2918 bqueue_enqueue(&rwa.q, ra.rrd, 2919 sizeof (struct receive_record_arg) + ra.rrd->payload_size); 2920 ra.rrd = NULL; 2921 } 2922 if (ra.next_rrd == NULL) 2923 ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP); 2924 ra.next_rrd->eos_marker = B_TRUE; 2925 bqueue_enqueue(&rwa.q, ra.next_rrd, 1); 2926 2927 mutex_enter(&rwa.mutex); 2928 while (!rwa.done) { 2929 cv_wait(&rwa.cv, &rwa.mutex); 2930 } 2931 mutex_exit(&rwa.mutex); 2932 2933 cv_destroy(&rwa.cv); 2934 mutex_destroy(&rwa.mutex); 2935 bqueue_destroy(&rwa.q); 2936 if (err == 0) 2937 err = rwa.err; 2938 2939out: 2940 nvlist_free(begin_nvl); 2941 if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1)) 2942 zfs_onexit_fd_rele(cleanup_fd); 2943 2944 if (err != 0) { 2945 /* 2946 * Clean up references. If receive is not resumable, 2947 * destroy what we created, so we don't leave it in 2948 * the inconsistent state. 2949 */ 2950 dmu_recv_cleanup_ds(drc); 2951 } 2952 2953 *voffp = ra.voff; 2954 objlist_destroy(&ra.ignore_objlist); 2955 return (err); 2956} 2957 2958static int 2959dmu_recv_end_check(void *arg, dmu_tx_t *tx) 2960{ 2961 dmu_recv_cookie_t *drc = arg; 2962 dsl_pool_t *dp = dmu_tx_pool(tx); 2963 int error; 2964 2965 ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag); 2966 2967 if (!drc->drc_newfs) { 2968 dsl_dataset_t *origin_head; 2969 2970 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head); 2971 if (error != 0) 2972 return (error); 2973 if (drc->drc_force) { 2974 /* 2975 * We will destroy any snapshots in tofs (i.e. before 2976 * origin_head) that are after the origin (which is 2977 * the snap before drc_ds, because drc_ds can not 2978 * have any snaps of its own). 
2979 */ 2980 uint64_t obj; 2981 2982 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 2983 while (obj != 2984 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 2985 dsl_dataset_t *snap; 2986 error = dsl_dataset_hold_obj(dp, obj, FTAG, 2987 &snap); 2988 if (error != 0) 2989 break; 2990 if (snap->ds_dir != origin_head->ds_dir) 2991 error = SET_ERROR(EINVAL); 2992 if (error == 0) { 2993 error = dsl_destroy_snapshot_check_impl( 2994 snap, B_FALSE); 2995 } 2996 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 2997 dsl_dataset_rele(snap, FTAG); 2998 if (error != 0) 2999 break; 3000 } 3001 if (error != 0) { 3002 dsl_dataset_rele(origin_head, FTAG); 3003 return (error); 3004 } 3005 } 3006 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds, 3007 origin_head, drc->drc_force, drc->drc_owner, tx); 3008 if (error != 0) { 3009 dsl_dataset_rele(origin_head, FTAG); 3010 return (error); 3011 } 3012 error = dsl_dataset_snapshot_check_impl(origin_head, 3013 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3014 dsl_dataset_rele(origin_head, FTAG); 3015 if (error != 0) 3016 return (error); 3017 3018 error = dsl_destroy_head_check_impl(drc->drc_ds, 1); 3019 } else { 3020 error = dsl_dataset_snapshot_check_impl(drc->drc_ds, 3021 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3022 } 3023 return (error); 3024} 3025 3026static void 3027dmu_recv_end_sync(void *arg, dmu_tx_t *tx) 3028{ 3029 dmu_recv_cookie_t *drc = arg; 3030 dsl_pool_t *dp = dmu_tx_pool(tx); 3031 3032 spa_history_log_internal_ds(drc->drc_ds, "finish receiving", 3033 tx, "snap=%s", drc->drc_tosnap); 3034 3035 if (!drc->drc_newfs) { 3036 dsl_dataset_t *origin_head; 3037 3038 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG, 3039 &origin_head)); 3040 3041 if (drc->drc_force) { 3042 /* 3043 * Destroy any snapshots of drc_tofs (origin_head) 3044 * after the origin (the snap before drc_ds). 
3045 */ 3046 uint64_t obj; 3047 3048 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 3049 while (obj != 3050 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 3051 dsl_dataset_t *snap; 3052 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, 3053 &snap)); 3054 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir); 3055 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 3056 dsl_destroy_snapshot_sync_impl(snap, 3057 B_FALSE, tx); 3058 dsl_dataset_rele(snap, FTAG); 3059 } 3060 } 3061 VERIFY3P(drc->drc_ds->ds_prev, ==, 3062 origin_head->ds_prev); 3063 3064 dsl_dataset_clone_swap_sync_impl(drc->drc_ds, 3065 origin_head, tx); 3066 dsl_dataset_snapshot_sync_impl(origin_head, 3067 drc->drc_tosnap, tx); 3068 3069 /* set snapshot's creation time and guid */ 3070 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx); 3071 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time = 3072 drc->drc_drrb->drr_creation_time; 3073 dsl_dataset_phys(origin_head->ds_prev)->ds_guid = 3074 drc->drc_drrb->drr_toguid; 3075 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &= 3076 ~DS_FLAG_INCONSISTENT; 3077 3078 dmu_buf_will_dirty(origin_head->ds_dbuf, tx); 3079 dsl_dataset_phys(origin_head)->ds_flags &= 3080 ~DS_FLAG_INCONSISTENT; 3081 3082 dsl_dataset_rele(origin_head, FTAG); 3083 dsl_destroy_head_sync_impl(drc->drc_ds, tx); 3084 3085 if (drc->drc_owner != NULL) 3086 VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner); 3087 } else { 3088 dsl_dataset_t *ds = drc->drc_ds; 3089 3090 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx); 3091 3092 /* set snapshot's creation time and guid */ 3093 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx); 3094 dsl_dataset_phys(ds->ds_prev)->ds_creation_time = 3095 drc->drc_drrb->drr_creation_time; 3096 dsl_dataset_phys(ds->ds_prev)->ds_guid = 3097 drc->drc_drrb->drr_toguid; 3098 dsl_dataset_phys(ds->ds_prev)->ds_flags &= 3099 ~DS_FLAG_INCONSISTENT; 3100 3101 dmu_buf_will_dirty(ds->ds_dbuf, tx); 3102 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT; 3103 if (dsl_dataset_has_resume_receive_state(ds)) { 3104 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3105 DS_FIELD_RESUME_FROMGUID, tx); 3106 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3107 DS_FIELD_RESUME_OBJECT, tx); 3108 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3109 DS_FIELD_RESUME_OFFSET, tx); 3110 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3111 DS_FIELD_RESUME_BYTES, tx); 3112 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3113 DS_FIELD_RESUME_TOGUID, tx); 3114 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3115 DS_FIELD_RESUME_TONAME, tx); 3116 } 3117 } 3118 drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj; 3119 /* 3120 * Release the hold from dmu_recv_begin. This must be done before 3121 * we return to open context, so that when we free the dataset's dnode, 3122 * we can evict its bonus buffer. 
3123 */ 3124 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 3125 drc->drc_ds = NULL; 3126} 3127 3128static int 3129add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj) 3130{ 3131 dsl_pool_t *dp; 3132 dsl_dataset_t *snapds; 3133 guid_map_entry_t *gmep; 3134 int err; 3135 3136 ASSERT(guid_map != NULL); 3137 3138 err = dsl_pool_hold(name, FTAG, &dp); 3139 if (err != 0) 3140 return (err); 3141 gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP); 3142 err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds); 3143 if (err == 0) { 3144 gmep->guid = dsl_dataset_phys(snapds)->ds_guid; 3145 gmep->gme_ds = snapds; 3146 avl_add(guid_map, gmep); 3147 dsl_dataset_long_hold(snapds, gmep); 3148 } else 3149 kmem_free(gmep, sizeof (*gmep)); 3150 3151 dsl_pool_rele(dp, FTAG); 3152 return (err); 3153} 3154 3155static int dmu_recv_end_modified_blocks = 3; 3156 3157static int 3158dmu_recv_existing_end(dmu_recv_cookie_t *drc) 3159{ 3160 int error; 3161 char name[MAXNAMELEN]; 3162 3163#ifdef _KERNEL 3164 /* 3165 * We will be destroying the ds; make sure its origin is unmounted if 3166 * necessary. 3167 */ 3168 dsl_dataset_name(drc->drc_ds, name); 3169 zfs_destroy_unmount_origin(name); 3170#endif 3171 3172 error = dsl_sync_task(drc->drc_tofs, 3173 dmu_recv_end_check, dmu_recv_end_sync, drc, 3174 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3175 3176 if (error != 0) 3177 dmu_recv_cleanup_ds(drc); 3178 return (error); 3179} 3180 3181static int 3182dmu_recv_new_end(dmu_recv_cookie_t *drc) 3183{ 3184 int error; 3185 3186 error = dsl_sync_task(drc->drc_tofs, 3187 dmu_recv_end_check, dmu_recv_end_sync, drc, 3188 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3189 3190 if (error != 0) { 3191 dmu_recv_cleanup_ds(drc); 3192 } else if (drc->drc_guid_to_ds_map != NULL) { 3193 (void) add_ds_to_guidmap(drc->drc_tofs, 3194 drc->drc_guid_to_ds_map, 3195 drc->drc_newsnapobj); 3196 } 3197 return (error); 3198} 3199 3200int 3201dmu_recv_end(dmu_recv_cookie_t *drc, void *owner) 3202{ 3203 drc->drc_owner = owner; 3204 3205 if (drc->drc_newfs) 3206 return (dmu_recv_new_end(drc)); 3207 else 3208 return (dmu_recv_existing_end(drc)); 3209} 3210 3211/* 3212 * Return TRUE if this objset is currently being received into. 3213 */ 3214boolean_t 3215dmu_objset_is_receiving(objset_t *os) 3216{ 3217 return (os->os_dsl_dataset != NULL && 3218 os->os_dsl_dataset->ds_owner == dmu_recv_tag); 3219} 3220