dmu_send.c revision 297112
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright (c) 2012, Martin Matuska <mm@FreeBSD.org>. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2014 Integros [integros.com]
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>

#ifdef __FreeBSD__
#undef dump_write
#define	dump_write dmu_dump_write
#endif

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
int zfs_send_queue_length = 16 * 1024 * 1024;
int zfs_recv_queue_length = 16 * 1024 * 1024;
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;

#ifdef _KERNEL
TUNABLE_INT("vfs.zfs.send_set_freerecords_bit", &zfs_send_set_freerecords_bit);
#endif

static char *dmu_recv_tag = "dmu_recv_tag";
const char *recv_clone_name = "%recv";

#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))

static void byteswap_record(dmu_replay_record_t *drr);

struct send_thread_arg {
	bqueue_t q;
	dsl_dataset_t *ds;	/* Dataset to traverse */
	uint64_t fromtxg;	/* Traverse from this txg */
	int flags;		/* flags to pass to traverse_dataset */
	int error_code;
	boolean_t cancel;
	zbookmark_phys_t resume;
};

struct send_block_record {
	boolean_t eos_marker;	/* Marks the end of the stream */
	blkptr_t bp;
	zbookmark_phys_t zb;
	uint8_t indblkshift;
	uint16_t datablkszsec;
	bqueue_node_t ln;
};
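/*
 * Illustrative note (not part of the original source): BP_SPAN() computes
 * how many bytes of file data one block pointer at a given indirect level
 * covers.  For example, with a 128K data block (datablkszsec = 256, since
 * SPA_MINBLOCKSHIFT is 9) and indblkshift = 17 (128K indirect blocks holding
 * 1024 128-byte block pointers, SPA_BLKPTRSHIFT = 7):
 *
 *	BP_SPAN(256, 17, 0) == 256 << 9          == 128K  (one data block)
 *	BP_SPAN(256, 17, 1) == 256 << (9 + 10)   == 128M  (1024 data blocks)
 *	BP_SPAN(256, 17, 2) == 256 << (9 + 20)   == 128G
 *
 * Each additional level multiplies the span by the number of block
 * pointers that fit in one indirect block.
 */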

static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	struct uio auio;
	struct iovec aiov;
	ASSERT0(len % 8);

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_offset = (off_t)-1;
	auio.uio_td = dsp->dsa_td;
#ifdef _KERNEL
	if (dsp->dsa_fp->f_type == DTYPE_VNODE)
		bwillwrite();
	dsp->dsa_err = fo_write(dsp->dsa_fp, &auio, dsp->dsa_td->td_ucred, 0,
	    dsp->dsa_td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	dsp->dsa_err = EOPNOTSUPP;
#endif
	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += len;
	mutex_exit(&ds->ds_sendstream_lock);

	return (dsp->dsa_err);
}

/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}
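/*
 * Illustrative sketch (not part of the original source): a receiver can
 * verify the running checksum the same way dump_record() maintains it.
 * The record's own checksum field is excluded from the region it covers,
 * so the stream-wide fletcher-4 state is fed the record in two pieces:
 *
 *	zio_cksum_t zc = { 0 };			// running stream checksum
 *	size_t ckoff = offsetof(dmu_replay_record_t,
 *	    drr_u.drr_checksum.drr_checksum);
 *	fletcher_4_incremental_native(drr, ckoff, &zc);
 *	// zc now equals the value the sender stored in the record
 *	// (for non-BEGIN records); compare, then fold in the rest:
 *	fletcher_4_incremental_native((char *)drr + ckoff,
 *	    sizeof (zio_cksum_t), &zc);
 */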

/*
 * Fill in the drr_free struct, or perform aggregation if the previous record
 * is also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the
 * free and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed.  This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset.  We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	if (length != -1ULL && offset + length < offset)
		length = -1ULL;

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is -1
		 * (because dump_dnode is the only place where this
		 * function is called with a -1, and only after flushing
		 * any pending record).
		 */
		ASSERT(length != -1ULL);
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == -1ULL) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}
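/*
 * Illustrative example (not part of the original source) of the
 * aggregation above.  Two adjacent holes in the same object coalesce
 * into a single DRR_FREE record on the wire:
 *
 *	dump_free(dsp, 5, 0 * 131072, 131072);	// record becomes pending
 *	dump_free(dsp, 5, 1 * 131072, 131072);	// extends pending record
 *	// ... one DRR_FREE {object 5, offset 0, length 262144} is
 *	// eventually emitted when a non-adjacent or non-FREE op arrives.
 */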

static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
    uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
{
	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));
	dsp->dsa_last_data_object = object;
	dsp->dsa_last_data_offset = offset + blksz - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
		/*
		 * There's no pre-computed checksum for partial-block
		 * writes or embedded BP's, so (like
		 * fletcher4-checksummed blocks) userland will have to
		 * compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP)
			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dsp->dsa_drr->drr_u.drr_write_embedded);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (EINTR);
		dsp->dsa_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (EINTR);
	return (0);
}
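/*
 * Illustrative note (not part of the original source): every payload in
 * the stream must stay 8-byte aligned (dump_bytes() asserts len % 8 == 0),
 * so the embedded payload is padded up with P2ROUNDUP.  For example, a
 * 51-byte compressed embedded payload goes out as:
 *
 *	P2ROUNDUP(51, 8) == 56	// 5 trailing pad bytes on the wire
 *
 * The receiver uses drr_psize (51) to know how much of that is real data.
 */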

static int
dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
{
	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated.  Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}

static int
dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);

	if (object < dsp->dsa_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from.  In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from.  We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dsp->dsa_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dsp, object, 1));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dsp->dsa_toguid;

	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	if (dump_record(dsp, DN_BONUS(dnp),
	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
		return (SET_ERROR(EINTR));
	}

	/* Free anything past the end of the file. */
	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
		return (SET_ERROR(EINTR));
	if (dsp->dsa_err != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}

/*
 * This is the callback function to traverse_dataset that acts as the worker
 * thread for dmu_send_impl.
 */
/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_block_record *record;
	uint64_t record_size;
	int err = 0;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);

	if (sta->cancel)
		return (SET_ERROR(EINTR));

	if (bp == NULL) {
		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
		return (0);
	} else if (zb->zb_level < 0) {
		return (0);
	}

	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
	record->eos_marker = B_FALSE;
	record->bp = *bp;
	record->zb = *zb;
	record->indblkshift = dnp->dn_indblkshift;
	record->datablkszsec = dnp->dn_datablkszsec;
	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	bqueue_enqueue(&sta->q, record, record_size);

	return (err);
}
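/*
 * Illustrative sketch (not part of the original source) of the
 * producer/consumer shape used here.  send_cb() runs in the traversal
 * thread and enqueues one record per block, weighted by the data block
 * size so the bounded queue (zfs_send_queue_length bytes) throttles the
 * producer; the main thread drains it until the end-of-stream marker:
 *
 *	struct send_block_record *rec = bqueue_dequeue(&sta->q);
 *	while (!rec->eos_marker) {
 *		// ... process rec (see do_dump() below) ...
 *		rec = get_next_record(&sta->q, rec);
 *	}
 *	kmem_free(rec, sizeof (*rec));
 */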

/*
 * This function kicks off the traverse_dataset.  It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End
 * of Stream record when the traverse_dataset call has finished.  If there is
 * no dataset to traverse, the thread immediately pushes the End of Stream
 * marker.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err;
	struct send_block_record *data;

	if (st_arg->ds != NULL) {
		err = traverse_dataset_resume(st_arg->ds,
		    st_arg->fromtxg, &st_arg->resume,
		    st_arg->flags, send_cb, st_arg);

		if (err != EINTR)
			st_arg->error_code = err;
	}
	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
	data->eos_marker = B_TRUE;
	bqueue_enqueue(&st_arg->q, data, 1);
	thread_exit();
}

/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, reading the data (which has hopefully been prefetched), and calling
 * the appropriate helper function.
 */
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
	const blkptr_t *bp = &data->bp;
	const zbookmark_phys_t *zb = &data->zb;
	uint8_t indblkshift = data->indblkshift;
	uint16_t dblkszsec = data->datablkszsec;
	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	int err = 0;

	ASSERT3U(zb->zb_level, >=, 0);

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= dsa->dsa_resume_object);

	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
		return (0);
	} else if (BP_IS_HOLE(bp) &&
	    zb->zb_object == DMU_META_DNODE_OBJECT) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
	} else if (BP_IS_HOLE(bp)) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t offset = zb->zb_blkid * span;
		err = dump_free(dsa, zb->zb_object, offset, span);
	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
		return (0);
	} else if (type == DMU_OT_DNODE) {
		int blksz = BP_GET_LSIZE(bp);
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;

		ASSERT0(zb->zb_level);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		dnode_phys_t *blk = abuf->b_data;
		uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
		for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
			err = dump_dnode(dsa, dnobj + i, blk + i);
			if (err != 0)
				break;
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (type == DMU_OT_SA) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = BP_GET_LSIZE(bp);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
		(void) arc_buf_remove_ref(abuf, &abuf);
	} else if (backup_do_embed(dsa, bp)) {
		/* it's an embedded level-0 block of a regular object */
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		ASSERT0(zb->zb_level);
		err = dump_write_embedded(dsa, zb->zb_object,
		    zb->zb_blkid * blksz, blksz, bp);
	} else {
		/* it's a level-0 block of a regular object */
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		uint64_t offset;

		ASSERT0(zb->zb_level);
		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
		    (zb->zb_object == dsa->dsa_resume_object &&
		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0) {
			if (zfs_send_corrupt_data) {
				/* Send a block filled with 0x"zfs badd bloc" */
				abuf = arc_buf_alloc(spa, blksz, &abuf,
				    ARC_BUFC_DATA);
				uint64_t *ptr;
				for (ptr = abuf->b_data;
				    (char *)ptr < (char *)abuf->b_data + blksz;
				    ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		offset = zb->zb_blkid * blksz;

		if (!(dsa->dsa_featureflags &
		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
		    blksz > SPA_OLD_MAXBLOCKSIZE) {
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object,
			    offset, blksz, bp, abuf->b_data);
		}
		(void) arc_buf_remove_ref(abuf, &abuf);
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}
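/*
 * Illustrative note (not part of the original source): when the receiver
 * can't be assumed to support LARGE_BLOCKS, do_dump() splits a large
 * block into SPA_OLD_MAXBLOCKSIZE (128K) chunks.  For example, a 1M block
 * becomes eight DRR_WRITE records of 128K each; bp is passed as NULL for
 * those partial writes, so dump_write() marks them ZIO_CHECKSUM_OFF
 * rather than reusing the block pointer's whole-block checksum.
 */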

/*
 * Pop the new data off the queue, and free the old data.
 */
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}

/*
 * Actually do the bulk of the work in a zfs send.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb,
    boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
    uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendarg_t *dsp;
	int err;
	uint64_t fromtxg = 0;
	uint64_t featureflags = 0;
	struct send_thread_arg to_arg = { 0 };

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
	drr->drr_type = DRR_BEGIN;
	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
	    DMU_SUBSTREAM);

#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
			kmem_free(drr, sizeof (dmu_replay_record_t));
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		if (version >= ZPL_VERSION_SA) {
			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
		}
	}
#endif

	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	if (embedok &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
			featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
	}

	if (resumeobj != 0 || resumeoff != 0) {
		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
	    featureflags);

	drr->drr_u.drr_begin.drr_creation_time =
	    dsl_dataset_phys(to_ds)->ds_creation_time;
	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
	if (is_clone)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;

	if (ancestor_zb != NULL) {
		drr->drr_u.drr_begin.drr_fromguid =
		    ancestor_zb->zbm_guid;
		fromtxg = ancestor_zb->zbm_creation_txg;
	}
	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
	if (!to_ds->ds_is_snapshot) {
		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
		    sizeof (drr->drr_u.drr_begin.drr_toname));
	}

	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);

	dsp->dsa_drr = drr;
	dsp->dsa_outfd = outfd;
	dsp->dsa_proc = curproc;
	dsp->dsa_td = curthread;
	dsp->dsa_fp = fp;
	dsp->dsa_os = os;
	dsp->dsa_off = off;
	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsp->dsa_pending_op = PENDING_NONE;
	dsp->dsa_featureflags = featureflags;
	dsp->dsa_resume_object = resumeobj;
	dsp->dsa_resume_offset = resumeoff;

	mutex_enter(&to_ds->ds_sendstream_lock);
	list_insert_head(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	dsl_dataset_long_hold(to_ds, FTAG);
	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	if (resumeobj != 0 || resumeoff != 0) {
		dmu_object_info_t to_doi;
		err = dmu_object_info(os, resumeobj, &to_doi);
		if (err != 0)
			goto out;
		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
		    resumeoff / to_doi.doi_data_block_size);

		nvlist_t *nvl = fnvlist_alloc();
		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
		fnvlist_free(nvl);
	}

	err = dump_record(dsp, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsp->dsa_err;
		goto out;
	}

	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
	    offsetof(struct send_block_record, ln));
	to_arg.error_code = 0;
	to_arg.cancel = B_FALSE;
	to_arg.ds = to_ds;
	to_arg.fromtxg = fromtxg;
	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, &p0,
	    TS_RUN, minclsyspri);

	struct send_block_record *to_data;
	to_data = bqueue_dequeue(&to_arg.q);

	while (!to_data->eos_marker && err == 0) {
		err = do_dump(dsp, to_data);
		to_data = get_next_record(&to_arg.q, to_data);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = EINTR;
	}

	if (err != 0) {
		to_arg.cancel = B_TRUE;
		while (!to_data->eos_marker) {
			to_data = get_next_record(&to_arg.q, to_data);
		}
	}
	kmem_free(to_data, sizeof (*to_data));

	bqueue_destroy(&to_arg.q);

	if (err == 0 && to_arg.error_code != 0)
		err = to_arg.error_code;

	if (err != 0)
		goto out;

	if (dsp->dsa_pending_op != PENDING_NONE)
		if (dump_record(dsp, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsp->dsa_err != 0)
			err = dsp->dsa_err;
		goto out;
	}

	bzero(drr, sizeof (dmu_replay_record_t));
	drr->drr_type = DRR_END;
	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, NULL, 0) != 0)
		err = dsp->dsa_err;

out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dsp, sizeof (dmu_sendarg_t));

	dsl_dataset_long_rele(to_ds, FTAG);

	return (err);
}
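/*
 * Illustrative note (not part of the original source): a resuming send
 * advertises DMU_BACKUP_FEATURE_RESUMING and attaches the resume point to
 * the BEGIN record as a packed nvlist payload, conceptually:
 *
 *	BEGIN { drr_payloadlen = N }
 *	payload nvlist: { "resume_object" = <obj>, "resume_offset" = <off> }
 *
 * The receiver checks this against the ds_resume_* state it saved (see
 * save_resume_state() below) before accepting more records.
 */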

int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok,
#ifdef illumos
    int outfd, vnode_t *vp, offset_t *off)
#else
    int outfd, struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	dsl_dataset_t *fromds = NULL;
	int err;

	err = dsl_pool_hold(pool, FTAG, &dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone;

		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		if (!dsl_dataset_is_before(ds, fromds, 0))
			err = SET_ERROR(EXDEV);
		zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;
		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		is_clone = (fromds->ds_dir != ds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	}
	dsl_dataset_rele(ds, FTAG);
	return (err);
}

int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, int outfd, uint64_t resumeobj,
    uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;
	boolean_t owned = B_FALSE;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume.  Ensure
		 * that it doesn't change by owning the dataset.
		 */
		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
	}
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone = B_FALSE;
		int fsnamelen = strchr(tosnap, '@') - tosnap;

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@')) {
			dsl_dataset_t *fromds;
			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
			if (err == 0) {
				if (!dsl_dataset_is_before(ds, fromds, 0))
					err = SET_ERROR(EXDEV);
				zb.zbm_creation_time =
				    dsl_dataset_phys(fromds)->ds_creation_time;
				zb.zbm_creation_txg =
				    dsl_dataset_phys(fromds)->ds_creation_txg;
				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
				is_clone = (ds->ds_dir != fromds->ds_dir);
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
		}
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	}
	if (owned)
		dsl_dataset_disown(ds, FTAG);
	else
		dsl_dataset_rele(ds, FTAG);
	return (err);
}

static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
    uint64_t *sizep)
{
	int err;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data.  We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (e.g., dnodes and DRR_OBJECT
	 * records).
	 */

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume all blocks are recordsize.  Assume ditto blocks and
	 * internal fragmentation cancel out compression.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block, which we observe in practice.
	 */
	uint64_t recordsize;
	err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
	if (err != 0)
		return (err);
	size -= size / recordsize * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += size / recordsize * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}

int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* fromsnap, if provided, must be a snapshot */
	if (fromds != NULL && !fromds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/*
	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
	 * or the origin's fs.
	 */
	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
		return (SET_ERROR(EXDEV));

	/* Get uncompressed size estimate of changed data. */
	if (fromds == NULL) {
		size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
	} else {
		uint64_t used, comp;
		err = dsl_dataset_space_written(fromds, ds,
		    &used, &comp, &size);
		if (err != 0)
			return (err);
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}

/*
 * Simple callback used to traverse the blocks of a snapshot and sum their
 * uncompressed size.
 */
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
	uint64_t *spaceptr = arg;
	if (bp != NULL && !BP_IS_HOLE(bp)) {
		*spaceptr += BP_GET_UCSIZE(bp);
	}
	return (0);
}
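/*
 * Worked example (not part of the original source) of the adjustment
 * above, assuming 1 GiB of changed data and recordsize = 128K, i.e.
 * 8192 blocks.  With a 128-byte blkptr_t:
 *
 *	indirect overhead subtracted: 8192 * 128   == 1 MiB
 *	record headers added:         8192 * sizeof (dmu_replay_record_t)
 *	                              (a few hundred bytes per record,
 *	                              so on the order of a few MiB)
 *
 * Both corrections are small relative to the data itself, which is why
 * treating the stream as data-dominated is a reasonable estimate.
 */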

/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
 * send stream sent from that TXG.  from_txg may be zero, indicating that the
 * whole snapshot will be sent.
 */
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size = 0;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!dsl_dataset_is_snapshot(ds))
		return (SET_ERROR(EINVAL));

	/* verify that from_txg is before the provided snapshot was taken */
	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
		return (SET_ERROR(EXDEV));
	}

	/*
	 * traverse the blocks of the snapshot with birth times after
	 * from_txg, summing their uncompressed size
	 */
	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
	    dmu_calculate_send_traversal, &size);
	if (err)
		return (err);

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}

typedef struct dmu_recv_begin_arg {
	const char *drba_origin;
	dmu_recv_cookie_t *drba_cookie;
	cred_t *drba_cred;
	uint64_t drba_snapobj;
} dmu_recv_begin_arg_t;

static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
    uint64_t fromguid)
{
	uint64_t val;
	int error;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* temporary clone name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
	    8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EBUSY : error);

	/* new snapshot name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EEXIST : error);

	/*
	 * Check snapshot limit before receiving.  We'll recheck again at the
	 * end, but might as well abort before receiving if we're already over
	 * the limit.
	 *
	 * Note that we do not check the file system limit with
	 * dsl_dir_fscount_check because the temporary %clones don't count
	 * against that limit.
	 */
	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
	    NULL, drba->drba_cred);
	if (error != 0)
		return (error);

	if (fromguid != 0) {
		dsl_dataset_t *snap;
		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;

		/* Find snapshot in this dir that matches fromguid. */
		while (obj != 0) {
			error = dsl_dataset_hold_obj(dp, obj, FTAG,
			    &snap);
			if (error != 0)
				return (SET_ERROR(ENODEV));
			if (snap->ds_dir != ds->ds_dir) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ENODEV));
			}
			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
				break;
			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
			dsl_dataset_rele(snap, FTAG);
		}
		if (obj == 0)
			return (SET_ERROR(ENODEV));

		if (drba->drba_cookie->drc_force) {
			drba->drba_snapobj = obj;
		} else {
			/*
			 * If we are not forcing, there must be no
			 * changes since fromsnap.
			 */
			if (dsl_dataset_modified_since_snap(ds, snap)) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ETXTBSY));
			}
			drba->drba_snapobj = ds->ds_prev->ds_object;
		}

		dsl_dataset_rele(snap, FTAG);
	} else {
		/* if full, then must be forced */
		if (!drba->drba_cookie->drc_force)
			return (SET_ERROR(EEXIST));
		/* start from $ORIGIN@$ORIGIN, if supported */
		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
		    dp->dp_origin_snap->ds_object : 0;
	}

	return (0);

}

static int
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	uint64_t fromguid = drrb->drr_fromguid;
	int flags = drrb->drr_flags;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES ||
	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	if (drba->drba_cookie->drc_resumable &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate large blocks
	 * to smaller ones, so the pool must have the LARGE_BLOCKS
	 * feature enabled if the stream has LARGE_BLOCKS.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
		return (SET_ERROR(ENOTSUP));

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* target fs already exists; recv into temp clone */

		/* Can't recv a clone into an existing fs */
		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
			dsl_dataset_rele(ds, FTAG);
			return (SET_ERROR(EINVAL));
		}

		error = recv_begin_check_existing_impl(drba, ds, fromguid);
		dsl_dataset_rele(ds, FTAG);
	} else if (error == ENOENT) {
		/* target fs does not exist; must be a full backup or clone */
		char buf[MAXNAMELEN];

		/*
		 * If it's a non-clone incremental, we are missing the
		 * target fs, so fail the recv.
		 */
		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
		    drba->drba_origin))
			return (SET_ERROR(ENOENT));

		/*
		 * If we're receiving a full send as a clone, and it doesn't
		 * contain all the necessary free records and freeobject
		 * records, reject it.
		 */
		if (fromguid == 0 && drba->drba_origin &&
		    !(flags & DRR_FLAG_FREERECORDS))
			return (SET_ERROR(EINVAL));

		/* Open the parent of tofs */
		ASSERT3U(strlen(tofs), <, MAXNAMELEN);
		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
		if (error != 0)
			return (error);

		/*
		 * Check filesystem and snapshot limits before receiving.  We'll
		 * recheck snapshot limits again at the end (we create the
		 * filesystems and increment those counts during begin_sync).
		 */
		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		if (drba->drba_origin != NULL) {
			dsl_dataset_t *origin;
			error = dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin);
			if (error != 0) {
				dsl_dataset_rele(ds, FTAG);
				return (error);
			}
			if (!origin->ds_is_snapshot) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(EINVAL));
			}
			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
			    fromguid != 0) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(ENODEV));
			}
			dsl_dataset_rele(origin, FTAG);
		}
		dsl_dataset_rele(ds, FTAG);
		error = 0;
	}
	return (error);
}

static void
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	objset_t *mos = dp->dp_meta_objset;
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds, *newds;
	uint64_t dsobj;
	int error;
	uint64_t crflags = 0;

	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
		crflags |= DS_FLAG_CI_DATASET;

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* create temporary clone */
		dsl_dataset_t *snap = NULL;
		if (drba->drba_snapobj != 0) {
			VERIFY0(dsl_dataset_hold_obj(dp,
			    drba->drba_snapobj, FTAG, &snap));
		}
		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
		    snap, crflags, drba->drba_cred, tx);
		if (drba->drba_snapobj != 0)
			dsl_dataset_rele(snap, FTAG);
		dsl_dataset_rele(ds, FTAG);
	} else {
		dsl_dir_t *dd;
		const char *tail;
		dsl_dataset_t *origin = NULL;

		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));

		if (drba->drba_origin != NULL) {
			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin));
		}

		/* Create new dataset. */
		dsobj = dsl_dataset_create_sync(dd,
		    strrchr(tofs, '/') + 1,
		    origin, crflags, drba->drba_cred, tx);
		if (origin != NULL)
			dsl_dataset_rele(origin, FTAG);
		dsl_dir_rele(dd, FTAG);
		drba->drba_cookie->drc_newfs = B_TRUE;
	}
	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));

	if (drba->drba_cookie->drc_resumable) {
		dsl_dataset_zapify(newds, tx);
		if (drrb->drr_fromguid != 0) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
			    8, 1, &drrb->drr_fromguid, tx));
		}
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
		    8, 1, &drrb->drr_toguid, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
		uint64_t one = 1;
		uint64_t zero = 0;
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
		    8, 1, &one, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
		    8, 1, &zero, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
		    8, 1, &zero, tx));
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_EMBED_DATA) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
			    8, 1, &one, tx));
		}
	}

	dmu_buf_will_dirty(newds->ds_dbuf, tx);
	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;

	/*
	 * If we actually created a non-clone, we need to create the
	 * objset in our new dataset.
	 */
	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
		(void) dmu_objset_create_impl(dp->dp_spa,
		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
	}

	drba->drba_cookie->drc_ds = newds;

	spa_history_log_internal_ds(newds, "receive", tx, "");
}

static int
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES)
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	char recvname[ZFS_MAXNAMELEN];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
		if (error != 0)
			return (error);
	}

	/* check that ds is marked inconsistent */
	if (!DS_IS_INCONSISTENT(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/* check that there is resuming data, and that the toguid matches */
	if (!dsl_dataset_is_zapified(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}
	uint64_t val;
	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
	if (error != 0 || drrb->drr_toguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Check if the receive is still running.  If so, it will be owned.
	 * Note that nothing else can own the dataset (e.g. after the receive
	 * fails) because it will be marked inconsistent.
	 */
	if (dsl_dataset_has_owner(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EBUSY));
	}

	/* There should not be any snapshots of this fs yet. */
	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: resume point will be checked when we process the first WRITE
	 * record.
	 */

	/* check that the origin matches */
	val = 0;
	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
	if (drrb->drr_fromguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	dsl_dataset_rele(ds, FTAG);
	return (0);
}

static void
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	char recvname[ZFS_MAXNAMELEN];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
		drba->drba_cookie->drc_newfs = B_TRUE;
	}

	/* clear the inconsistent flag so that we can own it */
	ASSERT(DS_IS_INCONSISTENT(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
	dsobj = ds->ds_object;
	dsl_dataset_rele(ds, FTAG);

	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));

	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;

	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));

	drba->drba_cookie->drc_ds = ds;

	spa_history_log_internal_ds(ds, "resume receive", tx, "");
}

/*
 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
 * succeeds; otherwise we will leak the holds on the datasets.
 */
int
dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
{
	dmu_recv_begin_arg_t drba = { 0 };

	bzero(drc, sizeof (dmu_recv_cookie_t));
	drc->drc_drr_begin = drr_begin;
	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
	drc->drc_tosnap = tosnap;
	drc->drc_tofs = tofs;
	drc->drc_force = force;
	drc->drc_resumable = resumable;
	drc->drc_cred = CRED();

	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
		drc->drc_byteswap = B_TRUE;
		fletcher_4_incremental_byteswap(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
		byteswap_record(drr_begin);
	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
		fletcher_4_incremental_native(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
	} else {
		return (SET_ERROR(EINVAL));
	}

	drba.drba_origin = origin;
	drba.drba_cookie = drc;
	drba.drba_cred = CRED();

	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
	    DMU_BACKUP_FEATURE_RESUMING) {
		return (dsl_sync_task(tofs,
		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	} else {
		return (dsl_sync_task(tofs,
		    dmu_recv_begin_check, dmu_recv_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	}
}
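/*
 * Illustrative note (not part of the original source): endianness is
 * detected from the BEGIN record's magic number.  If the sender had the
 * opposite byte order, the magic arrives byte-swapped, so:
 *
 *	if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC))
 *		// foreign-endian stream: checksum with the byteswap
 *		// variant, then byteswap_record() every record on arrival
 *	else if (drrb->drr_magic == DMU_BACKUP_MAGIC)
 *		// native-endian stream
 *	else
 *		// not a replay stream at all -> EINVAL
 */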

struct receive_record_arg {
	dmu_replay_record_t header;
	void *payload;	/* Pointer to a buffer containing the payload */
	/*
	 * If the record is a write, pointer to the arc_buf_t containing the
	 * payload.
	 */
	arc_buf_t *write_buf;
	int payload_size;
	uint64_t bytes_read;	/* bytes read from stream when record created */
	boolean_t eos_marker;	/* Marks the end of the stream */
	bqueue_node_t node;
};

struct receive_writer_arg {
	objset_t *os;
	boolean_t byteswap;
	bqueue_t q;

	/*
	 * These three args are used to signal to the main thread that we're
	 * done.
	 */
	kmutex_t mutex;
	kcondvar_t cv;
	boolean_t done;

	int err;
	/* A map from guid to dataset to help handle dedup'd streams. */
	avl_tree_t *guid_to_ds_map;
	boolean_t resumable;
	uint64_t last_object, last_offset;
	uint64_t bytes_read;	/* bytes read when current record created */
};

struct objlist {
	list_t list;	/* List of struct receive_objnode. */
	/*
	 * Last object looked up.  Used to assert that objects are being looked
	 * up in ascending order.
	 */
	uint64_t last_lookup;
};

struct receive_objnode {
	list_node_t node;
	uint64_t object;
};

struct receive_arg {
	objset_t *os;
	kthread_t *td;
	struct file *fp;
	uint64_t voff;	/* The current offset in the stream */
	uint64_t bytes_read;
	/*
	 * A record that has had its payload read in, but hasn't yet been
	 * handed off to the worker thread.
	 */
	struct receive_record_arg *rrd;
	/* A record that has had its header read in, but not its payload. */
	struct receive_record_arg *next_rrd;
	zio_cksum_t cksum;
	zio_cksum_t prev_cksum;
	int err;
	boolean_t byteswap;
	/* Sorted list of objects not to issue prefetches for. */
	struct objlist ignore_objlist;
};

typedef struct guid_map_entry {
	uint64_t guid;
	dsl_dataset_t *gme_ds;
	avl_node_t avlnode;
} guid_map_entry_t;

static int
guid_compare(const void *arg1, const void *arg2)
{
	const guid_map_entry_t *gmep1 = arg1;
	const guid_map_entry_t *gmep2 = arg2;

	if (gmep1->guid < gmep2->guid)
		return (-1);
	else if (gmep1->guid > gmep2->guid)
		return (1);
	return (0);
}

static void
free_guid_map_onexit(void *arg)
{
	avl_tree_t *ca = arg;
	void *cookie = NULL;
	guid_map_entry_t *gmep;

	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
		dsl_dataset_long_rele(gmep->gme_ds, gmep);
		dsl_dataset_rele(gmep->gme_ds, gmep);
		kmem_free(gmep, sizeof (guid_map_entry_t));
	}
	avl_destroy(ca);
	kmem_free(ca, sizeof (avl_tree_t));
}

static int
restore_bytes(struct receive_arg *ra, void *buf, int len, off_t off,
    ssize_t *resid)
{
	struct uio auio;
	struct iovec aiov;
	int error;

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_READ;
	auio.uio_offset = off;
	auio.uio_td = ra->td;
#ifdef _KERNEL
	error = fo_read(ra->fp, &auio, ra->td->td_ucred, FOF_OFFSET, ra->td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	error = EOPNOTSUPP;
#endif
	*resid = auio.uio_resid;
	return (error);
}
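/*
 * Illustrative sketch (not part of the original source): the
 * guid_to_ds_map above is a standard AVL tree keyed by snapshot guid,
 * letting WRITE_BYREF records in dedup'd streams locate the dataset
 * that holds the referenced block:
 *
 *	guid_map_entry_t gmesrch, *gmep;
 *	avl_index_t where;
 *	gmesrch.guid = drrwbr->drr_refguid;
 *	gmep = avl_find(guid_to_ds_map, &gmesrch, &where);
 *	// gmep == NULL -> the referenced snapshot was never seen,
 *	// so the byref record cannot be resolved.
 */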
resid; 1832 1833 ra->err = restore_bytes(ra, buf + done, 1834 len - done, ra->voff, &resid); 1835 1836 if (resid == len - done) { 1837 /* 1838 * Note: ECKSUM indicates that the receive 1839 * was interrupted and can potentially be resumed. 1840 */ 1841 ra->err = SET_ERROR(ECKSUM); 1842 } 1843 ra->voff += len - done - resid; 1844 done = len - resid; 1845 if (ra->err != 0) 1846 return (ra->err); 1847 } 1848 1849 ra->bytes_read += len; 1850 1851 ASSERT3U(done, ==, len); 1852 return (0); 1853} 1854 1855static void 1856byteswap_record(dmu_replay_record_t *drr) 1857{ 1858#define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X)) 1859#define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X)) 1860 drr->drr_type = BSWAP_32(drr->drr_type); 1861 drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen); 1862 1863 switch (drr->drr_type) { 1864 case DRR_BEGIN: 1865 DO64(drr_begin.drr_magic); 1866 DO64(drr_begin.drr_versioninfo); 1867 DO64(drr_begin.drr_creation_time); 1868 DO32(drr_begin.drr_type); 1869 DO32(drr_begin.drr_flags); 1870 DO64(drr_begin.drr_toguid); 1871 DO64(drr_begin.drr_fromguid); 1872 break; 1873 case DRR_OBJECT: 1874 DO64(drr_object.drr_object); 1875 DO32(drr_object.drr_type); 1876 DO32(drr_object.drr_bonustype); 1877 DO32(drr_object.drr_blksz); 1878 DO32(drr_object.drr_bonuslen); 1879 DO64(drr_object.drr_toguid); 1880 break; 1881 case DRR_FREEOBJECTS: 1882 DO64(drr_freeobjects.drr_firstobj); 1883 DO64(drr_freeobjects.drr_numobjs); 1884 DO64(drr_freeobjects.drr_toguid); 1885 break; 1886 case DRR_WRITE: 1887 DO64(drr_write.drr_object); 1888 DO32(drr_write.drr_type); 1889 DO64(drr_write.drr_offset); 1890 DO64(drr_write.drr_length); 1891 DO64(drr_write.drr_toguid); 1892 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum); 1893 DO64(drr_write.drr_key.ddk_prop); 1894 break; 1895 case DRR_WRITE_BYREF: 1896 DO64(drr_write_byref.drr_object); 1897 DO64(drr_write_byref.drr_offset); 1898 DO64(drr_write_byref.drr_length); 1899 DO64(drr_write_byref.drr_toguid); 1900 DO64(drr_write_byref.drr_refguid); 1901 DO64(drr_write_byref.drr_refobject); 1902 DO64(drr_write_byref.drr_refoffset); 1903 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref. 
1904 drr_key.ddk_cksum); 1905 DO64(drr_write_byref.drr_key.ddk_prop); 1906 break; 1907 case DRR_WRITE_EMBEDDED: 1908 DO64(drr_write_embedded.drr_object); 1909 DO64(drr_write_embedded.drr_offset); 1910 DO64(drr_write_embedded.drr_length); 1911 DO64(drr_write_embedded.drr_toguid); 1912 DO32(drr_write_embedded.drr_lsize); 1913 DO32(drr_write_embedded.drr_psize); 1914 break; 1915 case DRR_FREE: 1916 DO64(drr_free.drr_object); 1917 DO64(drr_free.drr_offset); 1918 DO64(drr_free.drr_length); 1919 DO64(drr_free.drr_toguid); 1920 break; 1921 case DRR_SPILL: 1922 DO64(drr_spill.drr_object); 1923 DO64(drr_spill.drr_length); 1924 DO64(drr_spill.drr_toguid); 1925 break; 1926 case DRR_END: 1927 DO64(drr_end.drr_toguid); 1928 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum); 1929 break; 1930 } 1931 1932 if (drr->drr_type != DRR_BEGIN) { 1933 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum); 1934 } 1935 1936#undef DO64 1937#undef DO32 1938} 1939 1940static inline uint8_t 1941deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size) 1942{ 1943 if (bonus_type == DMU_OT_SA) { 1944 return (1); 1945 } else { 1946 return (1 + 1947 ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT)); 1948 } 1949} 1950 1951static void 1952save_resume_state(struct receive_writer_arg *rwa, 1953 uint64_t object, uint64_t offset, dmu_tx_t *tx) 1954{ 1955 int txgoff = dmu_tx_get_txg(tx) & TXG_MASK; 1956 1957 if (!rwa->resumable) 1958 return; 1959 1960 /* 1961 * We use ds_resume_bytes[] != 0 to indicate that we need to 1962 * update this on disk, so it must not be 0. 1963 */ 1964 ASSERT(rwa->bytes_read != 0); 1965 1966 /* 1967 * We only resume from write records, which have a valid 1968 * (non-meta-dnode) object number. 1969 */ 1970 ASSERT(object != 0); 1971 1972 /* 1973 * For resuming to work correctly, we must receive records in order, 1974 * sorted by object,offset. This is checked by the callers, but 1975 * assert it here for good measure. 1976 */ 1977 ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]); 1978 ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] || 1979 offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]); 1980 ASSERT3U(rwa->bytes_read, >=, 1981 rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]); 1982 1983 rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object; 1984 rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset; 1985 rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read; 1986} 1987 1988static int 1989receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, 1990 void *data) 1991{ 1992 dmu_object_info_t doi; 1993 dmu_tx_t *tx; 1994 uint64_t object; 1995 int err; 1996 1997 if (drro->drr_type == DMU_OT_NONE || 1998 !DMU_OT_IS_VALID(drro->drr_type) || 1999 !DMU_OT_IS_VALID(drro->drr_bonustype) || 2000 drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS || 2001 drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS || 2002 P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) || 2003 drro->drr_blksz < SPA_MINBLOCKSIZE || 2004 drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) || 2005 drro->drr_bonuslen > DN_MAX_BONUSLEN) { 2006 return (SET_ERROR(EINVAL)); 2007 } 2008 2009 err = dmu_object_info(rwa->os, drro->drr_object, &doi); 2010 2011 if (err != 0 && err != ENOENT) 2012 return (SET_ERROR(EINVAL)); 2013 object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT; 2014 2015 /* 2016 * If we are losing blkptrs or changing the block size this must 2017 * be a new file instance. 
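 * (For example, with 512-byte dnodes DN_MAX_BONUSLEN is 320 bytes and a
 * block pointer is 128 bytes, so deduce_nblkptr() above yields 1 blkptr
 * for an object using its full bonus buffer but 3 for a 64-byte bonus
 * buffer; a stream that grows drr_bonuslen can therefore push nblkptr
 * below the doi_nblkptr already in the dnode.)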
We must clear out the previous file 2018 * contents before we can change this type of metadata in the dnode. 2019 */ 2020 if (err == 0) { 2021 int nblkptr; 2022 2023 nblkptr = deduce_nblkptr(drro->drr_bonustype, 2024 drro->drr_bonuslen); 2025 2026 if (drro->drr_blksz != doi.doi_data_block_size || 2027 nblkptr < doi.doi_nblkptr) { 2028 err = dmu_free_long_range(rwa->os, drro->drr_object, 2029 0, DMU_OBJECT_END); 2030 if (err != 0) 2031 return (SET_ERROR(EINVAL)); 2032 } 2033 } 2034 2035 tx = dmu_tx_create(rwa->os); 2036 dmu_tx_hold_bonus(tx, object); 2037 err = dmu_tx_assign(tx, TXG_WAIT); 2038 if (err != 0) { 2039 dmu_tx_abort(tx); 2040 return (err); 2041 } 2042 2043 if (object == DMU_NEW_OBJECT) { 2044 /* currently free, want to be allocated */ 2045 err = dmu_object_claim(rwa->os, drro->drr_object, 2046 drro->drr_type, drro->drr_blksz, 2047 drro->drr_bonustype, drro->drr_bonuslen, tx); 2048 } else if (drro->drr_type != doi.doi_type || 2049 drro->drr_blksz != doi.doi_data_block_size || 2050 drro->drr_bonustype != doi.doi_bonus_type || 2051 drro->drr_bonuslen != doi.doi_bonus_size) { 2052 /* currently allocated, but with different properties */ 2053 err = dmu_object_reclaim(rwa->os, drro->drr_object, 2054 drro->drr_type, drro->drr_blksz, 2055 drro->drr_bonustype, drro->drr_bonuslen, tx); 2056 } 2057 if (err != 0) { 2058 dmu_tx_commit(tx); 2059 return (SET_ERROR(EINVAL)); 2060 } 2061 2062 dmu_object_set_checksum(rwa->os, drro->drr_object, 2063 drro->drr_checksumtype, tx); 2064 dmu_object_set_compress(rwa->os, drro->drr_object, 2065 drro->drr_compress, tx); 2066 2067 if (data != NULL) { 2068 dmu_buf_t *db; 2069 2070 VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db)); 2071 dmu_buf_will_dirty(db, tx); 2072 2073 ASSERT3U(db->db_size, >=, drro->drr_bonuslen); 2074 bcopy(data, db->db_data, drro->drr_bonuslen); 2075 if (rwa->byteswap) { 2076 dmu_object_byteswap_t byteswap = 2077 DMU_OT_BYTESWAP(drro->drr_bonustype); 2078 dmu_ot_byteswap[byteswap].ob_func(db->db_data, 2079 drro->drr_bonuslen); 2080 } 2081 dmu_buf_rele(db, FTAG); 2082 } 2083 dmu_tx_commit(tx); 2084 2085 return (0); 2086} 2087 2088/* ARGSUSED */ 2089static int 2090receive_freeobjects(struct receive_writer_arg *rwa, 2091 struct drr_freeobjects *drrfo) 2092{ 2093 uint64_t obj; 2094 int next_err = 0; 2095 2096 if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj) 2097 return (SET_ERROR(EINVAL)); 2098 2099 for (obj = drrfo->drr_firstobj; 2100 obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0; 2101 next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) { 2102 int err; 2103 2104 if (dmu_object_info(rwa->os, obj, NULL) != 0) 2105 continue; 2106 2107 err = dmu_free_long_object(rwa->os, obj); 2108 if (err != 0) 2109 return (err); 2110 } 2111 if (next_err != ESRCH) 2112 return (next_err); 2113 return (0); 2114} 2115 2116static int 2117receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, 2118 arc_buf_t *abuf) 2119{ 2120 dmu_tx_t *tx; 2121 int err; 2122 2123 if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset || 2124 !DMU_OT_IS_VALID(drrw->drr_type)) 2125 return (SET_ERROR(EINVAL)); 2126 2127 /* 2128 * For resuming to work, records must be in increasing order 2129 * by (object, offset). 
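 * For example, writes to (object 5, offset 0), (5, 131072), (7, 0) are
 * accepted in that order, while (5, 131072) followed by (5, 0), or
 * (7, 0) followed by (5, 0), fails the check below with EINVAL.  (The
 * object numbers and offsets are illustrative.)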
2130  */
2131 	if (drrw->drr_object < rwa->last_object ||
2132 	    (drrw->drr_object == rwa->last_object &&
2133 	    drrw->drr_offset < rwa->last_offset)) {
2134 		return (SET_ERROR(EINVAL));
2135 	}
2136 	rwa->last_object = drrw->drr_object;
2137 	rwa->last_offset = drrw->drr_offset;
2138
2139 	if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
2140 		return (SET_ERROR(EINVAL));
2141
2142 	tx = dmu_tx_create(rwa->os);
2143
2144 	dmu_tx_hold_write(tx, drrw->drr_object,
2145 	    drrw->drr_offset, drrw->drr_length);
2146 	err = dmu_tx_assign(tx, TXG_WAIT);
2147 	if (err != 0) {
2148 		dmu_tx_abort(tx);
2149 		return (err);
2150 	}
2151 	if (rwa->byteswap) {
2152 		dmu_object_byteswap_t byteswap =
2153 		    DMU_OT_BYTESWAP(drrw->drr_type);
2154 		dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
2155 		    drrw->drr_length);
2156 	}
2157
2158 	dmu_buf_t *bonus;
2159 	if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0) {
 		/* an assigned tx must still be committed on this error path */
 		dmu_tx_commit(tx);
2160 		return (SET_ERROR(EINVAL));
 	}
2161 	dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
2162
2163 	/*
2164 	 * Note: If the receive fails, we want the resume stream to start
2165 	 * with the same record that we last successfully received (as opposed
2166 	 * to the next record), so that we can verify that we are
2167 	 * resuming from the correct location.
2168 	 */
2169 	save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
2170 	dmu_tx_commit(tx);
2171 	dmu_buf_rele(bonus, FTAG);
2172
2173 	return (0);
2174 }
2175
2176 /*
2177  * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
2178  * streams to refer to a copy of the data that is already on the
2179  * system because it came in earlier in the stream.  This function
2180  * finds the earlier copy of the data, and uses that copy instead of
2181  * data from the stream to fulfill this write.
2182  */
2183 static int
2184 receive_write_byref(struct receive_writer_arg *rwa,
2185     struct drr_write_byref *drrwbr)
2186 {
2187 	dmu_tx_t *tx;
2188 	int err;
2189 	guid_map_entry_t gmesrch;
2190 	guid_map_entry_t *gmep;
2191 	avl_index_t where;
2192 	objset_t *ref_os = NULL;
2193 	dmu_buf_t *dbp;
2194
2195 	if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
2196 		return (SET_ERROR(EINVAL));
2197
2198 	/*
2199 	 * If the GUID of the referenced dataset is different from the
2200 	 * GUID of the target dataset, find the referenced dataset.
2201 	 */
2202 	if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
2203 		gmesrch.guid = drrwbr->drr_refguid;
2204 		if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
2205 		    &where)) == NULL) {
2206 			return (SET_ERROR(EINVAL));
2207 		}
2208 		if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
2209 			return (SET_ERROR(EINVAL));
2210 	} else {
2211 		ref_os = rwa->os;
2212 	}
2213
2214 	err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
2215 	    drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
2216 	if (err != 0)
2217 		return (err);
2218
2219 	tx = dmu_tx_create(rwa->os);
2220
2221 	dmu_tx_hold_write(tx, drrwbr->drr_object,
2222 	    drrwbr->drr_offset, drrwbr->drr_length);
2223 	err = dmu_tx_assign(tx, TXG_WAIT);
2224 	if (err != 0) {
2225 		dmu_tx_abort(tx);
2226 		return (err);
2227 	}
2228 	dmu_write(rwa->os, drrwbr->drr_object,
2229 	    drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
2230 	dmu_buf_rele(dbp, FTAG);
2231
2232 	/* See comment in receive_write. */
2233 	save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
2234 	dmu_tx_commit(tx);
2235 	return (0);
2236 }
2237
2238 static int
2239 receive_write_embedded(struct receive_writer_arg *rwa,
2240     struct drr_write_embedded *drrwe, void *data)
2241 {
2242 	dmu_tx_t *tx;
2243 	int err;
2244
2245 	if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
2246 		return (SET_ERROR(EINVAL));
2247
2248 	if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
2249 		return (SET_ERROR(EINVAL));
2250
2251 	if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
2252 		return (SET_ERROR(EINVAL));
2253 	if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
2254 		return (SET_ERROR(EINVAL));
2255
2256 	tx = dmu_tx_create(rwa->os);
2257
2258 	dmu_tx_hold_write(tx, drrwe->drr_object,
2259 	    drrwe->drr_offset, drrwe->drr_length);
2260 	err = dmu_tx_assign(tx, TXG_WAIT);
2261 	if (err != 0) {
2262 		dmu_tx_abort(tx);
2263 		return (err);
2264 	}
2265
2266 	dmu_write_embedded(rwa->os, drrwe->drr_object,
2267 	    drrwe->drr_offset, data, drrwe->drr_etype,
2268 	    drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
2269 	    rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
2270
2271 	/* See comment in receive_write. */
2272 	save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
2273 	dmu_tx_commit(tx);
2274 	return (0);
2275 }
2276
2277 static int
2278 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
2279     void *data)
2280 {
2281 	dmu_tx_t *tx;
2282 	dmu_buf_t *db, *db_spill;
2283 	int err;
2284
2285 	if (drrs->drr_length < SPA_MINBLOCKSIZE ||
2286 	    drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
2287 		return (SET_ERROR(EINVAL));
2288
2289 	if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
2290 		return (SET_ERROR(EINVAL));
2291
2292 	VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
2293 	if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
2294 		dmu_buf_rele(db, FTAG);
2295 		return (err);
2296 	}
2297
2298 	tx = dmu_tx_create(rwa->os);
2299
2300 	dmu_tx_hold_spill(tx, db->db_object);
2301
2302 	err = dmu_tx_assign(tx, TXG_WAIT);
2303 	if (err != 0) {
2304 		dmu_buf_rele(db, FTAG);
2305 		dmu_buf_rele(db_spill, FTAG);
2306 		dmu_tx_abort(tx);
2307 		return (err);
2308 	}
2309 	dmu_buf_will_dirty(db_spill, tx);
2310
2311 	if (db_spill->db_size < drrs->drr_length)
2312 		VERIFY(0 == dbuf_spill_set_blksz(db_spill,
2313 		    drrs->drr_length, tx));
2314 	bcopy(data, db_spill->db_data, drrs->drr_length);
2315
2316 	dmu_buf_rele(db, FTAG);
2317 	dmu_buf_rele(db_spill, FTAG);
2318
2319 	dmu_tx_commit(tx);
2320 	return (0);
2321 }
2322
2323 /* ARGSUSED */
2324 static int
2325 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
2326 {
2327 	int err;
2328
2329 	if (drrf->drr_length != -1ULL &&
2330 	    drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
2331 		return (SET_ERROR(EINVAL));
2332
2333 	if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
2334 		return (SET_ERROR(EINVAL));
2335
2336 	err = dmu_free_long_range(rwa->os, drrf->drr_object,
2337 	    drrf->drr_offset, drrf->drr_length);
2338
2339 	return (err);
2340 }
2341
2342 /* used to destroy the drc_ds on error */
2343 static void
2344 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
2345 {
2346 	if (drc->drc_resumable) {
2347 		/* wait for our resume state to be written to disk */
2348 		txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
2349 		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2350 	} else {
2351 		char name[MAXNAMELEN];
2352 		dsl_dataset_name(drc->drc_ds, name);
2353 		dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2354 		(void) dsl_destroy_head(name);
2355 	}
2356 }
2357
2358 static void
2359 receive_cksum(struct receive_arg 
*ra, int len, void *buf) 2360{ 2361 if (ra->byteswap) { 2362 fletcher_4_incremental_byteswap(buf, len, &ra->cksum); 2363 } else { 2364 fletcher_4_incremental_native(buf, len, &ra->cksum); 2365 } 2366} 2367 2368/* 2369 * Read the payload into a buffer of size len, and update the current record's 2370 * payload field. 2371 * Allocate ra->next_rrd and read the next record's header into 2372 * ra->next_rrd->header. 2373 * Verify checksum of payload and next record. 2374 */ 2375static int 2376receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf) 2377{ 2378 int err; 2379 2380 if (len != 0) { 2381 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE); 2382 err = receive_read(ra, len, buf); 2383 if (err != 0) 2384 return (err); 2385 receive_cksum(ra, len, buf); 2386 2387 /* note: rrd is NULL when reading the begin record's payload */ 2388 if (ra->rrd != NULL) { 2389 ra->rrd->payload = buf; 2390 ra->rrd->payload_size = len; 2391 ra->rrd->bytes_read = ra->bytes_read; 2392 } 2393 } 2394 2395 ra->prev_cksum = ra->cksum; 2396 2397 ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP); 2398 err = receive_read(ra, sizeof (ra->next_rrd->header), 2399 &ra->next_rrd->header); 2400 ra->next_rrd->bytes_read = ra->bytes_read; 2401 if (err != 0) { 2402 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2403 ra->next_rrd = NULL; 2404 return (err); 2405 } 2406 if (ra->next_rrd->header.drr_type == DRR_BEGIN) { 2407 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2408 ra->next_rrd = NULL; 2409 return (SET_ERROR(EINVAL)); 2410 } 2411 2412 /* 2413 * Note: checksum is of everything up to but not including the 2414 * checksum itself. 2415 */ 2416 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 2417 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t)); 2418 receive_cksum(ra, 2419 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 2420 &ra->next_rrd->header); 2421 2422 zio_cksum_t cksum_orig = 2423 ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; 2424 zio_cksum_t *cksump = 2425 &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; 2426 2427 if (ra->byteswap) 2428 byteswap_record(&ra->next_rrd->header); 2429 2430 if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) && 2431 !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) { 2432 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2433 ra->next_rrd = NULL; 2434 return (SET_ERROR(ECKSUM)); 2435 } 2436 2437 receive_cksum(ra, sizeof (cksum_orig), &cksum_orig); 2438 2439 return (0); 2440} 2441 2442static void 2443objlist_create(struct objlist *list) 2444{ 2445 list_create(&list->list, sizeof (struct receive_objnode), 2446 offsetof(struct receive_objnode, node)); 2447 list->last_lookup = 0; 2448} 2449 2450static void 2451objlist_destroy(struct objlist *list) 2452{ 2453 for (struct receive_objnode *n = list_remove_head(&list->list); 2454 n != NULL; n = list_remove_head(&list->list)) { 2455 kmem_free(n, sizeof (*n)); 2456 } 2457 list_destroy(&list->list); 2458} 2459 2460/* 2461 * This function looks through the objlist to see if the specified object number 2462 * is contained in the objlist. In the process, it will remove all object 2463 * numbers in the list that are smaller than the specified object number. Thus, 2464 * any lookup of an object number smaller than a previously looked up object 2465 * number will always return false; therefore, all lookups should be done in 2466 * ascending order. 
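 *
 * An illustrative trace (values are hypothetical), starting from an
 * empty list:
 *
 *	objlist_insert(&list, 5);
 *	objlist_insert(&list, 9);
 *	objlist_exists(&list, 5);	B_TRUE; the list is still {5, 9}
 *	objlist_exists(&list, 7);	B_FALSE; 5 is trimmed, leaving {9}
 *	objlist_exists(&list, 9);	B_TRUE
 *	objlist_exists(&list, 6);	would trip the ascending-order ASSERT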
2467  */
2468 static boolean_t
2469 objlist_exists(struct objlist *list, uint64_t object)
2470 {
2471 	struct receive_objnode *node = list_head(&list->list);
2472 	ASSERT3U(object, >=, list->last_lookup);
2473 	list->last_lookup = object;
2474 	while (node != NULL && node->object < object) {
2475 		VERIFY3P(node, ==, list_remove_head(&list->list));
2476 		kmem_free(node, sizeof (*node));
2477 		node = list_head(&list->list);
2478 	}
2479 	return (node != NULL && node->object == object);
2480 }
2481
2482 /*
2483  * The objlist is a list of object numbers stored in ascending order. However,
2484  * the insertion of new object numbers does not seek out the correct location to
2485  * store a new object number; instead, it appends it to the list for simplicity.
2486  * Thus, any users must take care to only insert new object numbers in ascending
2487  * order.
2488  */
2489 static void
2490 objlist_insert(struct objlist *list, uint64_t object)
2491 {
2492 	struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
2493 	node->object = object;
2494 #ifdef ZFS_DEBUG
2495 	struct receive_objnode *last_object = list_tail(&list->list);
2496 	uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
2497 	ASSERT3U(node->object, >, last_objnum);
2498 #endif
2499 	list_insert_tail(&list->list, node);
2500 }
2501
2502 /*
2503  * Issue the prefetch reads for any necessary indirect blocks.
2504  *
2505  * We use the object ignore list to tell us whether or not to issue prefetches
2506  * for a given object. We do this for both correctness (in case the blocksize
2507  * of an object has changed) and performance (if the object doesn't exist, don't
2508  * needlessly try to issue prefetches). We also trim the list as we go through
2509  * the stream to prevent it from growing to an unbounded size.
2510  *
2511  * The object numbers within will always be in sorted order, and any write
2512  * records we see will also be in sorted order, but they're not sorted with
2513  * respect to each other (i.e. we can get several object records before
2514  * receiving each object's write records). As a result, once we've reached a
2515  * given object number, we can safely remove any reference to lower object
2516  * numbers in the ignore list. In practice, we receive up to 32 object records
2517  * before receiving write records, so the list can have up to 32 nodes in it.
2518  */
2519 /* ARGSUSED */
2520 static void
2521 receive_read_prefetch(struct receive_arg *ra,
2522     uint64_t object, uint64_t offset, uint64_t length)
2523 {
2524 	if (!objlist_exists(&ra->ignore_objlist, object)) {
2525 		dmu_prefetch(ra->os, object, 1, offset, length,
2526 		    ZIO_PRIORITY_SYNC_READ);
2527 	}
2528 }
2529
2530 /*
2531  * Read records off the stream, issuing any necessary prefetches.
2532  */
2533 static int
2534 receive_read_record(struct receive_arg *ra)
2535 {
2536 	int err;
2537
2538 	switch (ra->rrd->header.drr_type) {
2539 	case DRR_OBJECT:
2540 	{
2541 		struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
2542 		uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
2543 		void *buf = kmem_zalloc(size, KM_SLEEP);
2544 		dmu_object_info_t doi;
2545 		err = receive_read_payload_and_next_header(ra, size, buf);
2546 		if (err != 0) {
2547 			kmem_free(buf, size);
2548 			return (err);
2549 		}
2550 		err = dmu_object_info(ra->os, drro->drr_object, &doi);
2551 		/*
2552 		 * See receive_read_prefetch for an explanation why we're
2553 		 * storing this object in the ignore_objlist.
2554 */ 2555 if (err == ENOENT || 2556 (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) { 2557 objlist_insert(&ra->ignore_objlist, drro->drr_object); 2558 err = 0; 2559 } 2560 return (err); 2561 } 2562 case DRR_FREEOBJECTS: 2563 { 2564 err = receive_read_payload_and_next_header(ra, 0, NULL); 2565 return (err); 2566 } 2567 case DRR_WRITE: 2568 { 2569 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write; 2570 arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os), 2571 drrw->drr_length); 2572 2573 err = receive_read_payload_and_next_header(ra, 2574 drrw->drr_length, abuf->b_data); 2575 if (err != 0) { 2576 dmu_return_arcbuf(abuf); 2577 return (err); 2578 } 2579 ra->rrd->write_buf = abuf; 2580 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset, 2581 drrw->drr_length); 2582 return (err); 2583 } 2584 case DRR_WRITE_BYREF: 2585 { 2586 struct drr_write_byref *drrwb = 2587 &ra->rrd->header.drr_u.drr_write_byref; 2588 err = receive_read_payload_and_next_header(ra, 0, NULL); 2589 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset, 2590 drrwb->drr_length); 2591 return (err); 2592 } 2593 case DRR_WRITE_EMBEDDED: 2594 { 2595 struct drr_write_embedded *drrwe = 2596 &ra->rrd->header.drr_u.drr_write_embedded; 2597 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8); 2598 void *buf = kmem_zalloc(size, KM_SLEEP); 2599 2600 err = receive_read_payload_and_next_header(ra, size, buf); 2601 if (err != 0) { 2602 kmem_free(buf, size); 2603 return (err); 2604 } 2605 2606 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset, 2607 drrwe->drr_length); 2608 return (err); 2609 } 2610 case DRR_FREE: 2611 { 2612 /* 2613 * It might be beneficial to prefetch indirect blocks here, but 2614 * we don't really have the data to decide for sure. 2615 */ 2616 err = receive_read_payload_and_next_header(ra, 0, NULL); 2617 return (err); 2618 } 2619 case DRR_END: 2620 { 2621 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end; 2622 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum)) 2623 return (SET_ERROR(ECKSUM)); 2624 return (0); 2625 } 2626 case DRR_SPILL: 2627 { 2628 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill; 2629 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP); 2630 err = receive_read_payload_and_next_header(ra, drrs->drr_length, 2631 buf); 2632 if (err != 0) 2633 kmem_free(buf, drrs->drr_length); 2634 return (err); 2635 } 2636 default: 2637 return (SET_ERROR(EINVAL)); 2638 } 2639} 2640 2641/* 2642 * Commit the records to the pool. 2643 */ 2644static int 2645receive_process_record(struct receive_writer_arg *rwa, 2646 struct receive_record_arg *rrd) 2647{ 2648 int err; 2649 2650 /* Processing in order, therefore bytes_read should be increasing. 
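 * rwa->bytes_read feeds save_resume_state(), which stores it in the
 * dataset's ds_resume_bytes[], so a regression here would corrupt the
 * on-disk resume state.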
 */
2651 	ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
2652 	rwa->bytes_read = rrd->bytes_read;
2653
2654 	switch (rrd->header.drr_type) {
2655 	case DRR_OBJECT:
2656 	{
2657 		struct drr_object *drro = &rrd->header.drr_u.drr_object;
2658 		err = receive_object(rwa, drro, rrd->payload);
2659 		kmem_free(rrd->payload, rrd->payload_size);
2660 		rrd->payload = NULL;
2661 		return (err);
2662 	}
2663 	case DRR_FREEOBJECTS:
2664 	{
2665 		struct drr_freeobjects *drrfo =
2666 		    &rrd->header.drr_u.drr_freeobjects;
2667 		return (receive_freeobjects(rwa, drrfo));
2668 	}
2669 	case DRR_WRITE:
2670 	{
2671 		struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2672 		err = receive_write(rwa, drrw, rrd->write_buf);
2673 		/* if receive_write() is successful, it consumes the arc_buf */
2674 		if (err != 0)
2675 			dmu_return_arcbuf(rrd->write_buf);
2676 		rrd->write_buf = NULL;
2677 		rrd->payload = NULL;
2678 		return (err);
2679 	}
2680 	case DRR_WRITE_BYREF:
2681 	{
2682 		struct drr_write_byref *drrwbr =
2683 		    &rrd->header.drr_u.drr_write_byref;
2684 		return (receive_write_byref(rwa, drrwbr));
2685 	}
2686 	case DRR_WRITE_EMBEDDED:
2687 	{
2688 		struct drr_write_embedded *drrwe =
2689 		    &rrd->header.drr_u.drr_write_embedded;
2690 		err = receive_write_embedded(rwa, drrwe, rrd->payload);
2691 		kmem_free(rrd->payload, rrd->payload_size);
2692 		rrd->payload = NULL;
2693 		return (err);
2694 	}
2695 	case DRR_FREE:
2696 	{
2697 		struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2698 		return (receive_free(rwa, drrf));
2699 	}
2700 	case DRR_SPILL:
2701 	{
2702 		struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2703 		err = receive_spill(rwa, drrs, rrd->payload);
2704 		kmem_free(rrd->payload, rrd->payload_size);
2705 		rrd->payload = NULL;
2706 		return (err);
2707 	}
2708 	default:
2709 		return (SET_ERROR(EINVAL));
2710 	}
2711 }
2712
2713 /*
2714  * dmu_recv_stream's worker thread; pull records off the queue, and then call
2715  * receive_process_record().  When we're done, signal the main thread and exit.
2716  */
2717 static void
2718 receive_writer_thread(void *arg)
2719 {
2720 	struct receive_writer_arg *rwa = arg;
2721 	struct receive_record_arg *rrd;
2722 	for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2723 	    rrd = bqueue_dequeue(&rwa->q)) {
2724 		/*
2725 		 * If there's an error, the main thread will stop putting things
2726 		 * on the queue, but we need to clear everything in it before we
2727 		 * can exit.
2728 */ 2729 if (rwa->err == 0) { 2730 rwa->err = receive_process_record(rwa, rrd); 2731 } else if (rrd->write_buf != NULL) { 2732 dmu_return_arcbuf(rrd->write_buf); 2733 rrd->write_buf = NULL; 2734 rrd->payload = NULL; 2735 } else if (rrd->payload != NULL) { 2736 kmem_free(rrd->payload, rrd->payload_size); 2737 rrd->payload = NULL; 2738 } 2739 kmem_free(rrd, sizeof (*rrd)); 2740 } 2741 kmem_free(rrd, sizeof (*rrd)); 2742 mutex_enter(&rwa->mutex); 2743 rwa->done = B_TRUE; 2744 cv_signal(&rwa->cv); 2745 mutex_exit(&rwa->mutex); 2746 thread_exit(); 2747} 2748 2749static int 2750resume_check(struct receive_arg *ra, nvlist_t *begin_nvl) 2751{ 2752 uint64_t val; 2753 objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset; 2754 uint64_t dsobj = dmu_objset_id(ra->os); 2755 uint64_t resume_obj, resume_off; 2756 2757 if (nvlist_lookup_uint64(begin_nvl, 2758 "resume_object", &resume_obj) != 0 || 2759 nvlist_lookup_uint64(begin_nvl, 2760 "resume_offset", &resume_off) != 0) { 2761 return (SET_ERROR(EINVAL)); 2762 } 2763 VERIFY0(zap_lookup(mos, dsobj, 2764 DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val)); 2765 if (resume_obj != val) 2766 return (SET_ERROR(EINVAL)); 2767 VERIFY0(zap_lookup(mos, dsobj, 2768 DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val)); 2769 if (resume_off != val) 2770 return (SET_ERROR(EINVAL)); 2771 2772 return (0); 2773} 2774 2775/* 2776 * Read in the stream's records, one by one, and apply them to the pool. There 2777 * are two threads involved; the thread that calls this function will spin up a 2778 * worker thread, read the records off the stream one by one, and issue 2779 * prefetches for any necessary indirect blocks. It will then push the records 2780 * onto an internal blocking queue. The worker thread will pull the records off 2781 * the queue, and actually write the data into the DMU. This way, the worker 2782 * thread doesn't have to wait for reads to complete, since everything it needs 2783 * (the indirect blocks) will be prefetched. 2784 * 2785 * NB: callers *must* call dmu_recv_end() if this succeeds. 2786 */ 2787int 2788dmu_recv_stream(dmu_recv_cookie_t *drc, struct file *fp, offset_t *voffp, 2789 int cleanup_fd, uint64_t *action_handlep) 2790{ 2791 int err = 0; 2792 struct receive_arg ra = { 0 }; 2793 struct receive_writer_arg rwa = { 0 }; 2794 int featureflags; 2795 nvlist_t *begin_nvl = NULL; 2796 2797 ra.byteswap = drc->drc_byteswap; 2798 ra.cksum = drc->drc_cksum; 2799 ra.td = curthread; 2800 ra.fp = fp; 2801 ra.voff = *voffp; 2802 2803 if (dsl_dataset_is_zapified(drc->drc_ds)) { 2804 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset, 2805 drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES, 2806 sizeof (ra.bytes_read), 1, &ra.bytes_read); 2807 } 2808 2809 objlist_create(&ra.ignore_objlist); 2810 2811 /* these were verified in dmu_recv_begin */ 2812 ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==, 2813 DMU_SUBSTREAM); 2814 ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES); 2815 2816 /* 2817 * Open the objset we are modifying. 
2818  */
2819 	VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
2820
2821 	ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2822
2823 	featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2824
2825 	/* if this stream is dedup'ed, set up the avl tree for guid mapping */
2826 	if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2827 		minor_t minor;
2828
2829 		if (cleanup_fd == -1) {
2830 			err = SET_ERROR(EBADF);
2831 			goto out;
2832 		}
2833 		err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2834 		if (err != 0) {
2835 			cleanup_fd = -1;
2836 			goto out;
2837 		}
2838
2839 		if (*action_handlep == 0) {
2840 			rwa.guid_to_ds_map =
2841 			    kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2842 			avl_create(rwa.guid_to_ds_map, guid_compare,
2843 			    sizeof (guid_map_entry_t),
2844 			    offsetof(guid_map_entry_t, avlnode));
2845 			err = zfs_onexit_add_cb(minor,
2846 			    free_guid_map_onexit, rwa.guid_to_ds_map,
2847 			    action_handlep);
2848 			if (err != 0)
2849 				goto out;
2850 		} else {
2851 			err = zfs_onexit_cb_data(minor, *action_handlep,
2852 			    (void **)&rwa.guid_to_ds_map);
2853 			if (err != 0)
2854 				goto out;
2855 		}
2856
2857 		drc->drc_guid_to_ds_map = rwa.guid_to_ds_map;
2858 	}
2859
2860 	uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
2861 	void *payload = NULL;
2862 	if (payloadlen != 0)
2863 		payload = kmem_alloc(payloadlen, KM_SLEEP);
2864
2865 	err = receive_read_payload_and_next_header(&ra, payloadlen, payload);
2866 	if (err != 0) {
2867 		if (payloadlen != 0)
2868 			kmem_free(payload, payloadlen);
2869 		goto out;
2870 	}
2871 	if (payloadlen != 0) {
2872 		err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
2873 		kmem_free(payload, payloadlen);
2874 		if (err != 0)
2875 			goto out;
2876 	}
2877
2878 	if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
2879 		err = resume_check(&ra, begin_nvl);
2880 		if (err != 0)
2881 			goto out;
2882 	}
2883
2884 	(void) bqueue_init(&rwa.q, zfs_recv_queue_length,
2885 	    offsetof(struct receive_record_arg, node));
2886 	cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL);
2887 	mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL);
2888 	rwa.os = ra.os;
2889 	rwa.byteswap = drc->drc_byteswap;
2890 	rwa.resumable = drc->drc_resumable;
2891
2892 	(void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, &p0,
2893 	    TS_RUN, minclsyspri);
2894 	/*
2895 	 * We're reading rwa.err without locks, which is safe since we are the
2896 	 * only reader, and the worker thread is the only writer. It's ok if we
2897 	 * miss a write for an iteration or two of the loop, since the writer
2898 	 * thread will keep freeing records we send it until we send it an eos
2899 	 * marker.
2900 	 *
2901 	 * We can leave this loop in 3 ways: First, if rwa.err is
2902 	 * non-zero. In that case, the writer thread will free the rrd we just
2903 	 * pushed. Second, if we're interrupted; in that case, either it's the
2904 	 * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
2905 	 * has been handed off to the writer thread who will free it. Finally,
2906 	 * if receive_read_record fails or we're at the end of the stream, then
2907 	 * we free ra.rrd and exit.
2908 */ 2909 while (rwa.err == 0) { 2910 if (issig(JUSTLOOKING) && issig(FORREAL)) { 2911 err = SET_ERROR(EINTR); 2912 break; 2913 } 2914 2915 ASSERT3P(ra.rrd, ==, NULL); 2916 ra.rrd = ra.next_rrd; 2917 ra.next_rrd = NULL; 2918 /* Allocates and loads header into ra.next_rrd */ 2919 err = receive_read_record(&ra); 2920 2921 if (ra.rrd->header.drr_type == DRR_END || err != 0) { 2922 kmem_free(ra.rrd, sizeof (*ra.rrd)); 2923 ra.rrd = NULL; 2924 break; 2925 } 2926 2927 bqueue_enqueue(&rwa.q, ra.rrd, 2928 sizeof (struct receive_record_arg) + ra.rrd->payload_size); 2929 ra.rrd = NULL; 2930 } 2931 if (ra.next_rrd == NULL) 2932 ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP); 2933 ra.next_rrd->eos_marker = B_TRUE; 2934 bqueue_enqueue(&rwa.q, ra.next_rrd, 1); 2935 2936 mutex_enter(&rwa.mutex); 2937 while (!rwa.done) { 2938 cv_wait(&rwa.cv, &rwa.mutex); 2939 } 2940 mutex_exit(&rwa.mutex); 2941 2942 cv_destroy(&rwa.cv); 2943 mutex_destroy(&rwa.mutex); 2944 bqueue_destroy(&rwa.q); 2945 if (err == 0) 2946 err = rwa.err; 2947 2948out: 2949 nvlist_free(begin_nvl); 2950 if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1)) 2951 zfs_onexit_fd_rele(cleanup_fd); 2952 2953 if (err != 0) { 2954 /* 2955 * Clean up references. If receive is not resumable, 2956 * destroy what we created, so we don't leave it in 2957 * the inconsistent state. 2958 */ 2959 dmu_recv_cleanup_ds(drc); 2960 } 2961 2962 *voffp = ra.voff; 2963 objlist_destroy(&ra.ignore_objlist); 2964 return (err); 2965} 2966 2967static int 2968dmu_recv_end_check(void *arg, dmu_tx_t *tx) 2969{ 2970 dmu_recv_cookie_t *drc = arg; 2971 dsl_pool_t *dp = dmu_tx_pool(tx); 2972 int error; 2973 2974 ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag); 2975 2976 if (!drc->drc_newfs) { 2977 dsl_dataset_t *origin_head; 2978 2979 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head); 2980 if (error != 0) 2981 return (error); 2982 if (drc->drc_force) { 2983 /* 2984 * We will destroy any snapshots in tofs (i.e. before 2985 * origin_head) that are after the origin (which is 2986 * the snap before drc_ds, because drc_ds can not 2987 * have any snaps of its own). 
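 * For example, if tofs has snapshots @a, @b, @c (oldest to newest) and
 * the stream is an incremental from @a, the loop below verifies that @b
 * and @c can be destroyed; the sync function then destroys them before
 * the clone swap.  (The snapshot names are illustrative.)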
2988 */ 2989 uint64_t obj; 2990 2991 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 2992 while (obj != 2993 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 2994 dsl_dataset_t *snap; 2995 error = dsl_dataset_hold_obj(dp, obj, FTAG, 2996 &snap); 2997 if (error != 0) 2998 break; 2999 if (snap->ds_dir != origin_head->ds_dir) 3000 error = SET_ERROR(EINVAL); 3001 if (error == 0) { 3002 error = dsl_destroy_snapshot_check_impl( 3003 snap, B_FALSE); 3004 } 3005 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 3006 dsl_dataset_rele(snap, FTAG); 3007 if (error != 0) 3008 break; 3009 } 3010 if (error != 0) { 3011 dsl_dataset_rele(origin_head, FTAG); 3012 return (error); 3013 } 3014 } 3015 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds, 3016 origin_head, drc->drc_force, drc->drc_owner, tx); 3017 if (error != 0) { 3018 dsl_dataset_rele(origin_head, FTAG); 3019 return (error); 3020 } 3021 error = dsl_dataset_snapshot_check_impl(origin_head, 3022 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3023 dsl_dataset_rele(origin_head, FTAG); 3024 if (error != 0) 3025 return (error); 3026 3027 error = dsl_destroy_head_check_impl(drc->drc_ds, 1); 3028 } else { 3029 error = dsl_dataset_snapshot_check_impl(drc->drc_ds, 3030 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3031 } 3032 return (error); 3033} 3034 3035static void 3036dmu_recv_end_sync(void *arg, dmu_tx_t *tx) 3037{ 3038 dmu_recv_cookie_t *drc = arg; 3039 dsl_pool_t *dp = dmu_tx_pool(tx); 3040 3041 spa_history_log_internal_ds(drc->drc_ds, "finish receiving", 3042 tx, "snap=%s", drc->drc_tosnap); 3043 3044 if (!drc->drc_newfs) { 3045 dsl_dataset_t *origin_head; 3046 3047 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG, 3048 &origin_head)); 3049 3050 if (drc->drc_force) { 3051 /* 3052 * Destroy any snapshots of drc_tofs (origin_head) 3053 * after the origin (the snap before drc_ds). 
3054 */ 3055 uint64_t obj; 3056 3057 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 3058 while (obj != 3059 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 3060 dsl_dataset_t *snap; 3061 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, 3062 &snap)); 3063 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir); 3064 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 3065 dsl_destroy_snapshot_sync_impl(snap, 3066 B_FALSE, tx); 3067 dsl_dataset_rele(snap, FTAG); 3068 } 3069 } 3070 VERIFY3P(drc->drc_ds->ds_prev, ==, 3071 origin_head->ds_prev); 3072 3073 dsl_dataset_clone_swap_sync_impl(drc->drc_ds, 3074 origin_head, tx); 3075 dsl_dataset_snapshot_sync_impl(origin_head, 3076 drc->drc_tosnap, tx); 3077 3078 /* set snapshot's creation time and guid */ 3079 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx); 3080 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time = 3081 drc->drc_drrb->drr_creation_time; 3082 dsl_dataset_phys(origin_head->ds_prev)->ds_guid = 3083 drc->drc_drrb->drr_toguid; 3084 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &= 3085 ~DS_FLAG_INCONSISTENT; 3086 3087 dmu_buf_will_dirty(origin_head->ds_dbuf, tx); 3088 dsl_dataset_phys(origin_head)->ds_flags &= 3089 ~DS_FLAG_INCONSISTENT; 3090 3091 dsl_dataset_rele(origin_head, FTAG); 3092 dsl_destroy_head_sync_impl(drc->drc_ds, tx); 3093 3094 if (drc->drc_owner != NULL) 3095 VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner); 3096 } else { 3097 dsl_dataset_t *ds = drc->drc_ds; 3098 3099 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx); 3100 3101 /* set snapshot's creation time and guid */ 3102 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx); 3103 dsl_dataset_phys(ds->ds_prev)->ds_creation_time = 3104 drc->drc_drrb->drr_creation_time; 3105 dsl_dataset_phys(ds->ds_prev)->ds_guid = 3106 drc->drc_drrb->drr_toguid; 3107 dsl_dataset_phys(ds->ds_prev)->ds_flags &= 3108 ~DS_FLAG_INCONSISTENT; 3109 3110 dmu_buf_will_dirty(ds->ds_dbuf, tx); 3111 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT; 3112 if (dsl_dataset_has_resume_receive_state(ds)) { 3113 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3114 DS_FIELD_RESUME_FROMGUID, tx); 3115 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3116 DS_FIELD_RESUME_OBJECT, tx); 3117 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3118 DS_FIELD_RESUME_OFFSET, tx); 3119 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3120 DS_FIELD_RESUME_BYTES, tx); 3121 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3122 DS_FIELD_RESUME_TOGUID, tx); 3123 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3124 DS_FIELD_RESUME_TONAME, tx); 3125 } 3126 } 3127 drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj; 3128 /* 3129 * Release the hold from dmu_recv_begin. This must be done before 3130 * we return to open context, so that when we free the dataset's dnode, 3131 * we can evict its bonus buffer. 
3132 */ 3133 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 3134 drc->drc_ds = NULL; 3135} 3136 3137static int 3138add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj) 3139{ 3140 dsl_pool_t *dp; 3141 dsl_dataset_t *snapds; 3142 guid_map_entry_t *gmep; 3143 int err; 3144 3145 ASSERT(guid_map != NULL); 3146 3147 err = dsl_pool_hold(name, FTAG, &dp); 3148 if (err != 0) 3149 return (err); 3150 gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP); 3151 err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds); 3152 if (err == 0) { 3153 gmep->guid = dsl_dataset_phys(snapds)->ds_guid; 3154 gmep->gme_ds = snapds; 3155 avl_add(guid_map, gmep); 3156 dsl_dataset_long_hold(snapds, gmep); 3157 } else 3158 kmem_free(gmep, sizeof (*gmep)); 3159 3160 dsl_pool_rele(dp, FTAG); 3161 return (err); 3162} 3163 3164static int dmu_recv_end_modified_blocks = 3; 3165 3166static int 3167dmu_recv_existing_end(dmu_recv_cookie_t *drc) 3168{ 3169 int error; 3170 char name[MAXNAMELEN]; 3171 3172#ifdef _KERNEL 3173 /* 3174 * We will be destroying the ds; make sure its origin is unmounted if 3175 * necessary. 3176 */ 3177 dsl_dataset_name(drc->drc_ds, name); 3178 zfs_destroy_unmount_origin(name); 3179#endif 3180 3181 error = dsl_sync_task(drc->drc_tofs, 3182 dmu_recv_end_check, dmu_recv_end_sync, drc, 3183 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3184 3185 if (error != 0) 3186 dmu_recv_cleanup_ds(drc); 3187 return (error); 3188} 3189 3190static int 3191dmu_recv_new_end(dmu_recv_cookie_t *drc) 3192{ 3193 int error; 3194 3195 error = dsl_sync_task(drc->drc_tofs, 3196 dmu_recv_end_check, dmu_recv_end_sync, drc, 3197 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3198 3199 if (error != 0) { 3200 dmu_recv_cleanup_ds(drc); 3201 } else if (drc->drc_guid_to_ds_map != NULL) { 3202 (void) add_ds_to_guidmap(drc->drc_tofs, 3203 drc->drc_guid_to_ds_map, 3204 drc->drc_newsnapobj); 3205 } 3206 return (error); 3207} 3208 3209int 3210dmu_recv_end(dmu_recv_cookie_t *drc, void *owner) 3211{ 3212 drc->drc_owner = owner; 3213 3214 if (drc->drc_newfs) 3215 return (dmu_recv_new_end(drc)); 3216 else 3217 return (dmu_recv_existing_end(drc)); 3218} 3219 3220/* 3221 * Return TRUE if this objset is currently being received into. 3222 */ 3223boolean_t 3224dmu_objset_is_receiving(objset_t *os) 3225{ 3226 return (os->os_dsl_dataset != NULL && 3227 os->os_dsl_dataset->ds_owner == dmu_recv_tag); 3228} 3229
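
/*
 * An illustrative sketch (not code from this file) of how a caller such
 * as zfs_ioc_recv() is expected to drive the receive entry points above;
 * the local variable names here are hypothetical:
 *
 *	dmu_recv_cookie_t drc;
 *	int err;
 *
 *	err = dmu_recv_begin(tofs, tosnap, drr_begin, force,
 *	    resumable, origin, &drc);
 *	if (err == 0) {
 *		err = dmu_recv_stream(&drc, fp, &voff, cleanup_fd,
 *		    &action_handle);
 *		if (err == 0)
 *			err = dmu_recv_end(&drc, owner);
 *	}
 *
 * As the comments above note, dmu_recv_stream() must follow a successful
 * dmu_recv_begin() or the dataset holds are leaked, and dmu_recv_end()
 * must follow a successful dmu_recv_stream().
 */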