dmu_send.c revision 307266
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright (c) 2012, Martin Matuska <mm@FreeBSD.org>. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2014 Integros [integros.com]
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>

#ifdef __FreeBSD__
#undef dump_write
#define	dump_write dmu_dump_write
#endif

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
int zfs_send_corrupt_data = B_FALSE;
int zfs_send_queue_length = 16 * 1024 * 1024;
int zfs_recv_queue_length = 16 * 1024 * 1024;
/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
int zfs_send_set_freerecords_bit = B_TRUE;

#ifdef _KERNEL
TUNABLE_INT("vfs.zfs.send_set_freerecords_bit", &zfs_send_set_freerecords_bit);
#endif

static char *dmu_recv_tag = "dmu_recv_tag";
const char *recv_clone_name = "%recv";

#define	BP_SPAN(datablkszsec, indblkshift, level) \
	(((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
	(level) * (indblkshift - SPA_BLKPTRSHIFT)))
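/*
 * Worked example of BP_SPAN() (illustrative, assuming a 128K data block,
 * i.e. datablkszsec = 256, and indblkshift = 17, so each 128K indirect
 * block holds 1024 blkptrs since SPA_BLKPTRSHIFT = 7):
 *
 *	BP_SPAN(256, 17, 0) = 256 << 9		= 128K
 *	BP_SPAN(256, 17, 1) = 256 << (9 + 10)	= 128M
 *	BP_SPAN(256, 17, 2) = 256 << (9 + 20)	= 128G
 *
 * Each additional indirect level multiplies the span covered by one
 * block pointer by 2^(indblkshift - SPA_BLKPTRSHIFT) = 1024.
 */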
static void byteswap_record(dmu_replay_record_t *drr);

struct send_thread_arg {
	bqueue_t q;
	dsl_dataset_t *ds;		/* Dataset to traverse */
	uint64_t fromtxg;		/* Traverse from this txg */
	int flags;			/* flags to pass to traverse_dataset */
	int error_code;
	boolean_t cancel;
	zbookmark_phys_t resume;
};

struct send_block_record {
	boolean_t eos_marker;		/* Marks the end of the stream */
	blkptr_t bp;
	zbookmark_phys_t zb;
	uint8_t indblkshift;
	uint16_t datablkszsec;
	bqueue_node_t ln;
};

static int
dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
	struct uio auio;
	struct iovec aiov;

	/*
	 * The code does not rely on this (len being a multiple of 8). We keep
	 * this assertion because of the corresponding assertion in
	 * receive_read(). Keeping this assertion ensures that we do not
	 * inadvertently break backwards compatibility (causing the assertion
	 * in receive_read() to trigger on old software).
	 *
	 * Removing the assertions could be rolled into a new feature that uses
	 * data that isn't 8-byte aligned; if the assertions were removed, a
	 * feature flag would have to be added.
	 */

	ASSERT0(len % 8);

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_WRITE;
	auio.uio_offset = (off_t)-1;
	auio.uio_td = dsp->dsa_td;
#ifdef _KERNEL
	if (dsp->dsa_fp->f_type == DTYPE_VNODE)
		bwillwrite();
	dsp->dsa_err = fo_write(dsp->dsa_fp, &auio, dsp->dsa_td->td_ucred, 0,
	    dsp->dsa_td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	dsp->dsa_err = EOPNOTSUPP;
#endif
	mutex_enter(&ds->ds_sendstream_lock);
	*dsp->dsa_off += len;
	mutex_exit(&ds->ds_sendstream_lock);

	return (dsp->dsa_err);
}

/*
 * For all record types except BEGIN, fill in the checksum (overlaid in
 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything
 * up to the start of the checksum itself.
 */
static int
dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
{
	ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
	fletcher_4_incremental_native(dsp->dsa_drr,
	    offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
	    &dsp->dsa_zc);
	if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
		ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
		    drr_checksum.drr_checksum));
		dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
	}
	fletcher_4_incremental_native(&dsp->dsa_drr->
	    drr_u.drr_checksum.drr_checksum,
	    sizeof (zio_cksum_t), &dsp->dsa_zc);
	if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
		return (SET_ERROR(EINTR));
	if (payload_len != 0) {
		fletcher_4_incremental_native(payload, payload_len,
		    &dsp->dsa_zc);
		if (dump_bytes(dsp, payload, payload_len) != 0)
			return (SET_ERROR(EINTR));
	}
	return (0);
}
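/*
 * Sketch of the on-stream layout dump_record() produces (illustrative;
 * sizes follow the assertion above, with zio_cksum_t being 4 x 64 bits):
 *
 *	+---------------------------------------+
 *	| header fields (drr_type, drr_u union) |  checksummed first
 *	+---------------------------------------+
 *	| drr_checksum (zio_cksum_t, 32 bytes)  |  then the checksum itself
 *	+---------------------------------------+
 *	| optional payload (payload_len bytes)  |  then the payload
 *	+---------------------------------------+
 *
 * Because dsa_zc accumulates across records, a receiver can verify the
 * entire stream prefix at any record boundary.
 */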
/*
 * Fill in the drr_free struct, or perform aggregation if the previous record is
 * also a free record, and the two are adjacent.
 *
 * Note that we send free records even for a full send, because we want to be
 * able to receive a full send as a clone, which requires a list of all the free
 * and freeobject records that were generated on the source.
 */
static int
dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    uint64_t length)
{
	struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);

	/*
	 * When we receive a free record, dbuf_free_range() assumes
	 * that the receiving system doesn't have any dbufs in the range
	 * being freed. This is always true because there is a one-record
	 * constraint: we only send one WRITE record for any given
	 * object,offset. We know that the one-record constraint is
	 * true because we always send data in increasing order by
	 * object,offset.
	 *
	 * If the increasing-order constraint ever changes, we should find
	 * another way to assert that the one-record constraint is still
	 * satisfied.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));

	if (length != -1ULL && offset + length < offset)
		length = -1ULL;

	/*
	 * If there is a pending op, but it's not PENDING_FREE, push it out,
	 * since free block aggregation can only be done for blocks of the
	 * same type (i.e., DRR_FREE records can only be aggregated with
	 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
	 * aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	if (dsp->dsa_pending_op == PENDING_FREE) {
		/*
		 * There should never be a PENDING_FREE if length is -1
		 * (because dump_dnode is the only place where this
		 * function is called with a -1, and only after flushing
		 * any pending record).
		 */
		ASSERT(length != -1ULL);
		/*
		 * Check to see whether this free block can be aggregated
		 * with the pending one.
		 */
		if (drrf->drr_object == object && drrf->drr_offset +
		    drrf->drr_length == offset) {
			drrf->drr_length += length;
			return (0);
		} else {
			/* not a continuation. Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}
	/* create a FREE record and make it pending */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREE;
	drrf->drr_object = object;
	drrf->drr_offset = offset;
	drrf->drr_length = length;
	drrf->drr_toguid = dsp->dsa_toguid;
	if (length == -1ULL) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
	} else {
		dsp->dsa_pending_op = PENDING_FREE;
	}

	return (0);
}
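/*
 * Worked example of the aggregation above (illustrative object number
 * and offsets, assuming 128K blocks):
 *
 *	dump_free(dsp, 5, 0, 131072);		creates the pending record
 *	dump_free(dsp, 5, 131072, 131072);	adjacent: extends it to 256K
 *	dump_free(dsp, 5, 524288, 131072);	gap: pushes the pending
 *						record out, starts a new one
 *
 * Only two DRR_FREE records reach the stream: {obj 5, off 0, len 256K}
 * and {obj 5, off 512K, len 128K}.
 */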
static int
dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
    uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
{
	struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);

	/*
	 * We send data in increasing object, offset order.
	 * See comment in dump_free() for details.
	 */
	ASSERT(object > dsp->dsa_last_data_object ||
	    (object == dsp->dsa_last_data_object &&
	    offset > dsp->dsa_last_data_offset));
	dsp->dsa_last_data_object = object;
	dsp->dsa_last_data_offset = offset + blksz - 1;

	/*
	 * If there is any kind of pending aggregation (currently either
	 * a grouping of free objects or free blocks), push it out to
	 * the stream, since aggregation can't be done across operations
	 * of different types.
	 */
	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	/* write a WRITE record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE;
	drrw->drr_object = object;
	drrw->drr_type = type;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	if (bp == NULL || BP_IS_EMBEDDED(bp)) {
		/*
		 * There's no pre-computed checksum for partial-block
		 * writes or embedded BP's, so (like
		 * fletcher4-checksummed blocks) userland will have to
		 * compute a dedup-capable checksum itself.
		 */
		drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
	} else {
		drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
		if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
		    ZCHECKSUM_FLAG_DEDUP)
			drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
		DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
		DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
		DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
		drrw->drr_key.ddk_cksum = bp->blk_cksum;
	}

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
    int blksz, const blkptr_t *bp)
{
	char buf[BPE_PAYLOAD_SIZE];
	struct drr_write_embedded *drrw =
	    &(dsp->dsa_drr->drr_u.drr_write_embedded);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (EINTR);
		dsp->dsa_pending_op = PENDING_NONE;
	}

	ASSERT(BP_IS_EMBEDDED(bp));

	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
	drrw->drr_object = object;
	drrw->drr_offset = offset;
	drrw->drr_length = blksz;
	drrw->drr_toguid = dsp->dsa_toguid;
	drrw->drr_compression = BP_GET_COMPRESS(bp);
	drrw->drr_etype = BPE_GET_ETYPE(bp);
	drrw->drr_lsize = BPE_GET_LSIZE(bp);
	drrw->drr_psize = BPE_GET_PSIZE(bp);

	decode_embedded_bp_compressed(bp, buf);

	if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
		return (EINTR);
	return (0);
}

static int
dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
{
	struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write a SPILL record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_SPILL;
	drrs->drr_object = object;
	drrs->drr_length = blksz;
	drrs->drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, data, blksz) != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static int
dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
{
	struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);

	/*
	 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
	 * push it out, since free block aggregation can only be done for
	 * blocks of the same type (i.e., DRR_FREE records can only be
	 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
	 * can only be aggregated with other DRR_FREEOBJECTS records).
	 */
	if (dsp->dsa_pending_op != PENDING_NONE &&
	    dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}
	if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
		/*
		 * See whether this free object array can be aggregated
		 * with the pending one.
		 */
		if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
			drrfo->drr_numobjs += numobjs;
			return (0);
		} else {
			/* can't be aggregated. Push out pending record */
			if (dump_record(dsp, NULL, 0) != 0)
				return (SET_ERROR(EINTR));
			dsp->dsa_pending_op = PENDING_NONE;
		}
	}

	/* write a FREEOBJECTS record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
	drrfo->drr_firstobj = firstobj;
	drrfo->drr_numobjs = numobjs;
	drrfo->drr_toguid = dsp->dsa_toguid;

	dsp->dsa_pending_op = PENDING_FREEOBJECTS;

	return (0);
}
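/*
 * Example of the P2ROUNDUP() above (illustrative): an embedded block
 * whose compressed payload is 61 bytes is sent with
 * P2ROUNDUP(61, 8) = 64 bytes on the stream, keeping every record
 * 8-byte aligned (see the ASSERT0(len % 8) in dump_bytes()); the
 * receiver uses drr_psize to know how many of those bytes are real.
 */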
static int
dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
{
	struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);

	if (object < dsp->dsa_resume_object) {
		/*
		 * Note: when resuming, we will visit all the dnodes in
		 * the block of dnodes that we are resuming from. In
		 * this case it's unnecessary to send the dnodes prior to
		 * the one we are resuming from. We should be at most one
		 * block's worth of dnodes behind the resume point.
		 */
		ASSERT3U(dsp->dsa_resume_object - object, <,
		    1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
		return (0);
	}

	if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
		return (dump_freeobjects(dsp, object, 1));

	if (dsp->dsa_pending_op != PENDING_NONE) {
		if (dump_record(dsp, NULL, 0) != 0)
			return (SET_ERROR(EINTR));
		dsp->dsa_pending_op = PENDING_NONE;
	}

	/* write an OBJECT record */
	bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
	dsp->dsa_drr->drr_type = DRR_OBJECT;
	drro->drr_object = object;
	drro->drr_type = dnp->dn_type;
	drro->drr_bonustype = dnp->dn_bonustype;
	drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	drro->drr_bonuslen = dnp->dn_bonuslen;
	drro->drr_checksumtype = dnp->dn_checksum;
	drro->drr_compress = dnp->dn_compress;
	drro->drr_toguid = dsp->dsa_toguid;

	if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
		drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;

	if (dump_record(dsp, DN_BONUS(dnp),
	    P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
		return (SET_ERROR(EINTR));
	}

	/* Free anything past the end of the file. */
	if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
	    (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
		return (SET_ERROR(EINTR));
	if (dsp->dsa_err != 0)
		return (SET_ERROR(EINTR));
	return (0);
}

static boolean_t
backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
{
	if (!BP_IS_EMBEDDED(bp))
		return (B_FALSE);

	/*
	 * Compression function must be legacy, or explicitly enabled.
	 */
	if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
	    !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
		return (B_FALSE);

	/*
	 * Embed type must be explicitly enabled.
	 */
	switch (BPE_GET_ETYPE(bp)) {
	case BP_EMBEDDED_TYPE_DATA:
		if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
			return (B_TRUE);
		break;
	default:
		return (B_FALSE);
	}
	return (B_FALSE);
}
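/*
 * Arithmetic behind the resume assertion in dump_dnode() (illustrative,
 * assuming the usual DNODE_BLOCK_SHIFT = 14 and DNODE_SHIFT = 9):
 * dnodes are packed 1 << (14 - 9) = 32 per 16K metadnode block, so when
 * resuming we can revisit at most the 31 dnodes that precede the resume
 * object within its block, and dsa_resume_object - object is bounded
 * by 32.
 */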
/*
 * This is the callback function to traverse_dataset that acts as the worker
 * thread for dmu_send_impl.
 */
/*ARGSUSED*/
static int
send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
{
	struct send_thread_arg *sta = arg;
	struct send_block_record *record;
	uint64_t record_size;
	int err = 0;

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= sta->resume.zb_object);

	if (sta->cancel)
		return (SET_ERROR(EINTR));

	if (bp == NULL) {
		ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
		return (0);
	} else if (zb->zb_level < 0) {
		return (0);
	}

	record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
	record->eos_marker = B_FALSE;
	record->bp = *bp;
	record->zb = *zb;
	record->indblkshift = dnp->dn_indblkshift;
	record->datablkszsec = dnp->dn_datablkszsec;
	record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
	bqueue_enqueue(&sta->q, record, record_size);

	return (err);
}

/*
 * This function kicks off the traverse_dataset. It also handles setting the
 * error code of the thread in case something goes wrong, and pushes the End of
 * Stream record when the traverse_dataset call has finished. If there is no
 * dataset to traverse, the thread immediately pushes an End of Stream marker.
 */
static void
send_traverse_thread(void *arg)
{
	struct send_thread_arg *st_arg = arg;
	int err;
	struct send_block_record *data;

	if (st_arg->ds != NULL) {
		err = traverse_dataset_resume(st_arg->ds,
		    st_arg->fromtxg, &st_arg->resume,
		    st_arg->flags, send_cb, st_arg);

		if (err != EINTR)
			st_arg->error_code = err;
	}
	data = kmem_zalloc(sizeof (*data), KM_SLEEP);
	data->eos_marker = B_TRUE;
	bqueue_enqueue(&st_arg->q, data, 1);
	thread_exit();
}
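/*
 * Minimal sketch of the producer/consumer pattern used here (hedged;
 * variable names are illustrative). The traverse thread is the
 * producer; the main send thread is the consumer:
 *
 *	bqueue_init(&q, zfs_send_queue_length,
 *	    offsetof(struct send_block_record, ln));
 *	producer:  bqueue_enqueue(&q, record, record_size);
 *	consumer:  record = bqueue_dequeue(&q);
 *	bqueue_destroy(&q);
 *
 * bqueue_enqueue() blocks once the queued bytes exceed the limit, so
 * the prefetching traversal can run ahead of the (slower) stream writes
 * without unbounded memory use; the eos_marker record is the shutdown
 * signal.
 */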
/*
 * This function actually handles figuring out what kind of record needs to be
 * dumped, reading the data (which has hopefully been prefetched), and calling
 * the appropriate helper function.
 */
static int
do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
{
	dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
	const blkptr_t *bp = &data->bp;
	const zbookmark_phys_t *zb = &data->zb;
	uint8_t indblkshift = data->indblkshift;
	uint16_t dblkszsec = data->datablkszsec;
	spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
	dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
	int err = 0;

	ASSERT3U(zb->zb_level, >=, 0);

	ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
	    zb->zb_object >= dsa->dsa_resume_object);

	if (zb->zb_object != DMU_META_DNODE_OBJECT &&
	    DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
		return (0);
	} else if (BP_IS_HOLE(bp) &&
	    zb->zb_object == DMU_META_DNODE_OBJECT) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
		err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
	} else if (BP_IS_HOLE(bp)) {
		uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
		uint64_t offset = zb->zb_blkid * span;
		err = dump_free(dsa, zb->zb_object, offset, span);
	} else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
		return (0);
	} else if (type == DMU_OT_DNODE) {
		int blksz = BP_GET_LSIZE(bp);
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;

		ASSERT0(zb->zb_level);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		dnode_phys_t *blk = abuf->b_data;
		uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
		for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
			err = dump_dnode(dsa, dnobj + i, blk + i);
			if (err != 0)
				break;
		}
		arc_buf_destroy(abuf, &abuf);
	} else if (type == DMU_OT_SA) {
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = BP_GET_LSIZE(bp);

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0)
			return (SET_ERROR(EIO));

		err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
		arc_buf_destroy(abuf, &abuf);
	} else if (backup_do_embed(dsa, bp)) {
		/* it's an embedded level-0 block of a regular object */
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		ASSERT0(zb->zb_level);
		err = dump_write_embedded(dsa, zb->zb_object,
		    zb->zb_blkid * blksz, blksz, bp);
	} else {
		/* it's a level-0 block of a regular object */
		arc_flags_t aflags = ARC_FLAG_WAIT;
		arc_buf_t *abuf;
		int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
		uint64_t offset;

		ASSERT0(zb->zb_level);
		ASSERT(zb->zb_object > dsa->dsa_resume_object ||
		    (zb->zb_object == dsa->dsa_resume_object &&
		    zb->zb_blkid * blksz >= dsa->dsa_resume_offset));

		if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
		    ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
		    &aflags, zb) != 0) {
			if (zfs_send_corrupt_data) {
				/* Send a block filled with 0x"zfs badd bloc" */
				abuf = arc_alloc_buf(spa, blksz, &abuf,
				    ARC_BUFC_DATA);
				uint64_t *ptr;
				for (ptr = abuf->b_data;
				    (char *)ptr < (char *)abuf->b_data + blksz;
				    ptr++)
					*ptr = 0x2f5baddb10cULL;
			} else {
				return (SET_ERROR(EIO));
			}
		}

		offset = zb->zb_blkid * blksz;

		if (!(dsa->dsa_featureflags &
		    DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
		    blksz > SPA_OLD_MAXBLOCKSIZE) {
			char *buf = abuf->b_data;
			while (blksz > 0 && err == 0) {
				int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
				err = dump_write(dsa, type, zb->zb_object,
				    offset, n, NULL, buf);
				offset += n;
				buf += n;
				blksz -= n;
			}
		} else {
			err = dump_write(dsa, type, zb->zb_object,
			    offset, blksz, bp, abuf->b_data);
		}
		arc_buf_destroy(abuf, &abuf);
	}

	ASSERT(err == 0 || err == EINTR);
	return (err);
}
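/*
 * Worked example of the large-block fallback above (illustrative):
 * sending a 1M block to a stream without DMU_BACKUP_FEATURE_LARGE_BLOCKS
 * emits 1M / SPA_OLD_MAXBLOCKSIZE (128K) = 8 DRR_WRITE records, each
 * with bp == NULL, so dump_write() marks them ZIO_CHECKSUM_OFF (no
 * precomputed per-block checksum applies to a partial block).
 */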
/*
 * Pop the new data off the queue, and free the old data.
 */
static struct send_block_record *
get_next_record(bqueue_t *bq, struct send_block_record *data)
{
	struct send_block_record *tmp = bqueue_dequeue(bq);
	kmem_free(data, sizeof (*data));
	return (tmp);
}

/*
 * Actually do the bulk of the work in a zfs send.
 *
 * Note: Releases dp using the specified tag.
 */
static int
dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
    zfs_bookmark_phys_t *ancestor_zb,
    boolean_t is_clone, boolean_t embedok, boolean_t large_block_ok, int outfd,
    uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	objset_t *os;
	dmu_replay_record_t *drr;
	dmu_sendarg_t *dsp;
	int err;
	uint64_t fromtxg = 0;
	uint64_t featureflags = 0;
	struct send_thread_arg to_arg = { 0 };

	err = dmu_objset_from_ds(to_ds, &os);
	if (err != 0) {
		dsl_pool_rele(dp, tag);
		return (err);
	}

	drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
	drr->drr_type = DRR_BEGIN;
	drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
	DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
	    DMU_SUBSTREAM);

#ifdef _KERNEL
	if (dmu_objset_type(os) == DMU_OST_ZFS) {
		uint64_t version;
		if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
			kmem_free(drr, sizeof (dmu_replay_record_t));
			dsl_pool_rele(dp, tag);
			return (SET_ERROR(EINVAL));
		}
		if (version >= ZPL_VERSION_SA) {
			featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
		}
	}
#endif

	if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
		featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
	if (embedok &&
	    spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
		featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
		if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
			featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
	}

	if (resumeobj != 0 || resumeoff != 0) {
		featureflags |= DMU_BACKUP_FEATURE_RESUMING;
	}

	DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
	    featureflags);

	drr->drr_u.drr_begin.drr_creation_time =
	    dsl_dataset_phys(to_ds)->ds_creation_time;
	drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
	if (is_clone)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
	drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
	if (zfs_send_set_freerecords_bit)
		drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;

	if (ancestor_zb != NULL) {
		drr->drr_u.drr_begin.drr_fromguid =
		    ancestor_zb->zbm_guid;
		fromtxg = ancestor_zb->zbm_creation_txg;
	}
	dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
	if (!to_ds->ds_is_snapshot) {
		(void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
		    sizeof (drr->drr_u.drr_begin.drr_toname));
	}

	dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);

	dsp->dsa_drr = drr;
	dsp->dsa_outfd = outfd;
	dsp->dsa_proc = curproc;
	dsp->dsa_td = curthread;
	dsp->dsa_fp = fp;
	dsp->dsa_os = os;
	dsp->dsa_off = off;
	dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
	dsp->dsa_pending_op = PENDING_NONE;
	dsp->dsa_featureflags = featureflags;
	dsp->dsa_resume_object = resumeobj;
	dsp->dsa_resume_offset = resumeoff;

	mutex_enter(&to_ds->ds_sendstream_lock);
	list_insert_head(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	dsl_dataset_long_hold(to_ds, FTAG);
	dsl_pool_rele(dp, tag);

	void *payload = NULL;
	size_t payload_len = 0;
	if (resumeobj != 0 || resumeoff != 0) {
		dmu_object_info_t to_doi;
		err = dmu_object_info(os, resumeobj, &to_doi);
		if (err != 0)
			goto out;
		SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
		    resumeoff / to_doi.doi_data_block_size);

		nvlist_t *nvl = fnvlist_alloc();
		fnvlist_add_uint64(nvl, "resume_object", resumeobj);
		fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
		payload = fnvlist_pack(nvl, &payload_len);
		drr->drr_payloadlen = payload_len;
		fnvlist_free(nvl);
	}

	err = dump_record(dsp, payload, payload_len);
	fnvlist_pack_free(payload, payload_len);
	if (err != 0) {
		err = dsp->dsa_err;
		goto out;
	}

	err = bqueue_init(&to_arg.q, zfs_send_queue_length,
	    offsetof(struct send_block_record, ln));
	to_arg.error_code = 0;
	to_arg.cancel = B_FALSE;
	to_arg.ds = to_ds;
	to_arg.fromtxg = fromtxg;
	to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
	(void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, &p0,
	    TS_RUN, minclsyspri);

	struct send_block_record *to_data;
	to_data = bqueue_dequeue(&to_arg.q);

	while (!to_data->eos_marker && err == 0) {
		err = do_dump(dsp, to_data);
		to_data = get_next_record(&to_arg.q, to_data);
		if (issig(JUSTLOOKING) && issig(FORREAL))
			err = EINTR;
	}

	if (err != 0) {
		to_arg.cancel = B_TRUE;
		while (!to_data->eos_marker) {
			to_data = get_next_record(&to_arg.q, to_data);
		}
	}
	kmem_free(to_data, sizeof (*to_data));

	bqueue_destroy(&to_arg.q);

	if (err == 0 && to_arg.error_code != 0)
		err = to_arg.error_code;

	if (err != 0)
		goto out;

	if (dsp->dsa_pending_op != PENDING_NONE)
		if (dump_record(dsp, NULL, 0) != 0)
			err = SET_ERROR(EINTR);

	if (err != 0) {
		if (err == EINTR && dsp->dsa_err != 0)
			err = dsp->dsa_err;
		goto out;
	}

	bzero(drr, sizeof (dmu_replay_record_t));
	drr->drr_type = DRR_END;
	drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
	drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;

	if (dump_record(dsp, NULL, 0) != 0)
		err = dsp->dsa_err;

out:
	mutex_enter(&to_ds->ds_sendstream_lock);
	list_remove(&to_ds->ds_sendstreams, dsp);
	mutex_exit(&to_ds->ds_sendstream_lock);

	kmem_free(drr, sizeof (dmu_replay_record_t));
	kmem_free(dsp, sizeof (dmu_sendarg_t));

	dsl_dataset_long_rele(to_ds, FTAG);

	return (err);
}
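/*
 * Sketch of the BEGIN payload written above for a resumed send (hedged;
 * the values are illustrative). A resume token naming object 193 at
 * offset 1M yields a packed nvlist payload built as:
 *
 *	nvlist_t *nvl = fnvlist_alloc();
 *	fnvlist_add_uint64(nvl, "resume_object", 193);
 *	fnvlist_add_uint64(nvl, "resume_offset", 1048576);
 *
 * and the traversal restarts at the bookmark (to_ds->ds_object, 193,
 * level 0, 1048576 / data_block_size), so the receiver can verify that
 * the stream lines up with its saved resume state.
 */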
int
dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
    boolean_t embedok, boolean_t large_block_ok,
#ifdef illumos
    int outfd, vnode_t *vp, offset_t *off)
#else
    int outfd, struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	dsl_dataset_t *fromds = NULL;
	int err;

	err = dsl_pool_hold(pool, FTAG, &dp);
	if (err != 0)
		return (err);

	err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != 0) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone;

		err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		if (!dsl_dataset_is_before(ds, fromds, 0))
			err = SET_ERROR(EXDEV);
		zb.zbm_creation_time =
		    dsl_dataset_phys(fromds)->ds_creation_time;
		zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
		is_clone = (fromds->ds_dir != ds->ds_dir);
		dsl_dataset_rele(fromds, FTAG);
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok, outfd, 0, 0, fp, off);
	}
	dsl_dataset_rele(ds, FTAG);
	return (err);
}
int
dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
    boolean_t large_block_ok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
#ifdef illumos
    vnode_t *vp, offset_t *off)
#else
    struct file *fp, offset_t *off)
#endif
{
	dsl_pool_t *dp;
	dsl_dataset_t *ds;
	int err;
	boolean_t owned = B_FALSE;

	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (SET_ERROR(EINVAL));

	err = dsl_pool_hold(tosnap, FTAG, &dp);
	if (err != 0)
		return (err);

	if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
		/*
		 * We are sending a filesystem or volume. Ensure
		 * that it doesn't change by owning the dataset.
		 */
		err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
		owned = B_TRUE;
	} else {
		err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
	}
	if (err != 0) {
		dsl_pool_rele(dp, FTAG);
		return (err);
	}

	if (fromsnap != NULL) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone = B_FALSE;
		int fsnamelen = strchr(tosnap, '@') - tosnap;

		/*
		 * If the fromsnap is in a different filesystem, then
		 * mark the send stream as a clone.
		 */
		if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
		    (fromsnap[fsnamelen] != '@' &&
		    fromsnap[fsnamelen] != '#')) {
			is_clone = B_TRUE;
		}

		if (strchr(fromsnap, '@')) {
			dsl_dataset_t *fromds;
			err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
			if (err == 0) {
				if (!dsl_dataset_is_before(ds, fromds, 0))
					err = SET_ERROR(EXDEV);
				zb.zbm_creation_time =
				    dsl_dataset_phys(fromds)->ds_creation_time;
				zb.zbm_creation_txg =
				    dsl_dataset_phys(fromds)->ds_creation_txg;
				zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
				is_clone = (ds->ds_dir != fromds->ds_dir);
				dsl_dataset_rele(fromds, FTAG);
			}
		} else {
			err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
		}
		if (err != 0) {
			dsl_dataset_rele(ds, FTAG);
			dsl_pool_rele(dp, FTAG);
			return (err);
		}
		err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	} else {
		err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
		    embedok, large_block_ok,
		    outfd, resumeobj, resumeoff, fp, off);
	}
	if (owned)
		dsl_dataset_disown(ds, FTAG);
	else
		dsl_dataset_rele(ds, FTAG);
	return (err);
}
static int
dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
    uint64_t *sizep)
{
	int err;
	/*
	 * Assume that space (both on-disk and in-stream) is dominated by
	 * data. We will adjust for indirect blocks and the copies property,
	 * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
	 */

	/*
	 * Subtract out approximate space used by indirect blocks.
	 * Assume most space is used by data blocks (non-indirect, non-dnode).
	 * Assume all blocks are recordsize. Assume ditto blocks and
	 * internal fragmentation counteract compression.
	 *
	 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
	 * block, which we observe in practice.
	 */
	uint64_t recordsize;
	err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
	if (err != 0)
		return (err);
	size -= size / recordsize * sizeof (blkptr_t);

	/* Add in the space for the record associated with each block. */
	size += size / recordsize * sizeof (dmu_replay_record_t);

	*sizep = size;

	return (0);
}

int
dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!ds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/* fromsnap, if provided, must be a snapshot */
	if (fromds != NULL && !fromds->ds_is_snapshot)
		return (SET_ERROR(EINVAL));

	/*
	 * fromsnap must be an earlier snapshot from the same fs as tosnap,
	 * or the origin's fs.
	 */
	if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
		return (SET_ERROR(EXDEV));

	/* Get uncompressed size estimate of changed data. */
	if (fromds == NULL) {
		size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
	} else {
		uint64_t used, comp;
		err = dsl_dataset_space_written(fromds, ds,
		    &used, &comp, &size);
		if (err != 0)
			return (err);
	}

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}

/*
 * Simple callback used to traverse the blocks of a snapshot and sum their
 * uncompressed size
 */
/* ARGSUSED */
static int
dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
    const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
{
	uint64_t *spaceptr = arg;
	if (bp != NULL && !BP_IS_HOLE(bp)) {
		*spaceptr += BP_GET_UCSIZE(bp);
	}
	return (0);
}

/*
 * Given a destination snapshot and a TXG, calculate the approximate size of a
 * send stream sent from that TXG. from_txg may be zero, indicating that the
 * whole snapshot will be sent.
 */
int
dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
    uint64_t *sizep)
{
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	int err;
	uint64_t size = 0;

	ASSERT(dsl_pool_config_held(dp));

	/* tosnap must be a snapshot */
	if (!dsl_dataset_is_snapshot(ds))
		return (SET_ERROR(EINVAL));

	/* verify that from_txg is before the provided snapshot was taken */
	if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
		return (SET_ERROR(EXDEV));
	}

	/*
	 * traverse the blocks of the snapshot with birth times after
	 * from_txg, summing their uncompressed size
	 */
	err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
	    dmu_calculate_send_traversal, &size);
	if (err)
		return (err);

	err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
	return (err);
}
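/*
 * Worked example of the adjustment above (illustrative): with 1G of
 * changed data and a 128K recordsize there are 8192 blocks, so the
 * estimate subtracts 8192 * sizeof (blkptr_t) = 8192 * 128 = 1M of
 * indirect-block space and adds 8192 * sizeof (dmu_replay_record_t)
 * of record headers (a few hundred bytes per record, so roughly 2M);
 * the net correction is well under 1% of the stream size.
 */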
typedef struct dmu_recv_begin_arg {
	const char *drba_origin;
	dmu_recv_cookie_t *drba_cookie;
	cred_t *drba_cred;
	uint64_t drba_snapobj;
} dmu_recv_begin_arg_t;

static int
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
    uint64_t fromguid)
{
	uint64_t val;
	int error;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;

	/* temporary clone name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
	    8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EBUSY : error);

	/* new snapshot name must not exist */
	error = zap_lookup(dp->dp_meta_objset,
	    dsl_dataset_phys(ds)->ds_snapnames_zapobj,
	    drba->drba_cookie->drc_tosnap, 8, 1, &val);
	if (error != ENOENT)
		return (error == 0 ? EEXIST : error);

	/*
	 * Check snapshot limit before receiving. We'll recheck again at the
	 * end, but might as well abort before receiving if we're already over
	 * the limit.
	 *
	 * Note that we do not check the file system limit with
	 * dsl_dir_fscount_check because the temporary %clones don't count
	 * against that limit.
	 */
	error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
	    NULL, drba->drba_cred);
	if (error != 0)
		return (error);

	if (fromguid != 0) {
		dsl_dataset_t *snap;
		uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;

		/* Find snapshot in this dir that matches fromguid. */
		while (obj != 0) {
			error = dsl_dataset_hold_obj(dp, obj, FTAG,
			    &snap);
			if (error != 0)
				return (SET_ERROR(ENODEV));
			if (snap->ds_dir != ds->ds_dir) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ENODEV));
			}
			if (dsl_dataset_phys(snap)->ds_guid == fromguid)
				break;
			obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
			dsl_dataset_rele(snap, FTAG);
		}
		if (obj == 0)
			return (SET_ERROR(ENODEV));

		if (drba->drba_cookie->drc_force) {
			drba->drba_snapobj = obj;
		} else {
			/*
			 * If we are not forcing, there must be no
			 * changes since fromsnap.
			 */
			if (dsl_dataset_modified_since_snap(ds, snap)) {
				dsl_dataset_rele(snap, FTAG);
				return (SET_ERROR(ETXTBSY));
			}
			drba->drba_snapobj = ds->ds_prev->ds_object;
		}

		dsl_dataset_rele(snap, FTAG);
	} else {
		/* if full, then must be forced */
		if (!drba->drba_cookie->drc_force)
			return (SET_ERROR(EEXIST));
		/* start from $ORIGIN@$ORIGIN, if supported */
		drba->drba_snapobj = dp->dp_origin_snap != NULL ?
		    dp->dp_origin_snap->ds_object : 0;
	}

	return (0);
}
static int
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	uint64_t fromguid = drrb->drr_fromguid;
	int flags = drrb->drr_flags;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES ||
	    ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	if (drba->drba_cookie->drc_resumable &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records. Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate large blocks
	 * to smaller ones, so the pool must have the LARGE_BLOCKS
	 * feature enabled if the stream has LARGE_BLOCKS.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
		return (SET_ERROR(ENOTSUP));

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* target fs already exists; recv into temp clone */

		/* Can't recv a clone into an existing fs */
		if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
			dsl_dataset_rele(ds, FTAG);
			return (SET_ERROR(EINVAL));
		}

		error = recv_begin_check_existing_impl(drba, ds, fromguid);
		dsl_dataset_rele(ds, FTAG);
	} else if (error == ENOENT) {
		/* target fs does not exist; must be a full backup or clone */
		char buf[ZFS_MAX_DATASET_NAME_LEN];

		/*
		 * If it's a non-clone incremental, we are missing the
		 * target fs, so fail the recv.
		 */
		if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
		    drba->drba_origin))
			return (SET_ERROR(ENOENT));

		/*
		 * If we're receiving a full send as a clone, and it doesn't
		 * contain all the necessary free records and freeobject
		 * records, reject it.
		 */
		if (fromguid == 0 && drba->drba_origin &&
		    !(flags & DRR_FLAG_FREERECORDS))
			return (SET_ERROR(EINVAL));

		/* Open the parent of tofs */
		ASSERT3U(strlen(tofs), <, sizeof (buf));
		(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
		error = dsl_dataset_hold(dp, buf, FTAG, &ds);
		if (error != 0)
			return (error);

		/*
		 * Check filesystem and snapshot limits before receiving. We'll
		 * recheck snapshot limits again at the end (we create the
		 * filesystems and increment those counts during begin_sync).
		 */
		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
		    ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
		if (error != 0) {
			dsl_dataset_rele(ds, FTAG);
			return (error);
		}

		if (drba->drba_origin != NULL) {
			dsl_dataset_t *origin;
			error = dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin);
			if (error != 0) {
				dsl_dataset_rele(ds, FTAG);
				return (error);
			}
			if (!origin->ds_is_snapshot) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(EINVAL));
			}
			if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
			    fromguid != 0) {
				dsl_dataset_rele(origin, FTAG);
				dsl_dataset_rele(ds, FTAG);
				return (SET_ERROR(ENODEV));
			}
			dsl_dataset_rele(origin, FTAG);
		}
		dsl_dataset_rele(ds, FTAG);
		error = 0;
	}
	return (error);
}
static void
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	objset_t *mos = dp->dp_meta_objset;
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds, *newds;
	uint64_t dsobj;
	int error;
	uint64_t crflags = 0;

	if (drrb->drr_flags & DRR_FLAG_CI_DATA)
		crflags |= DS_FLAG_CI_DATASET;

	error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
	if (error == 0) {
		/* create temporary clone */
		dsl_dataset_t *snap = NULL;
		if (drba->drba_snapobj != 0) {
			VERIFY0(dsl_dataset_hold_obj(dp,
			    drba->drba_snapobj, FTAG, &snap));
		}
		dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
		    snap, crflags, drba->drba_cred, tx);
		if (drba->drba_snapobj != 0)
			dsl_dataset_rele(snap, FTAG);
		dsl_dataset_rele(ds, FTAG);
	} else {
		dsl_dir_t *dd;
		const char *tail;
		dsl_dataset_t *origin = NULL;

		VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));

		if (drba->drba_origin != NULL) {
			VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
			    FTAG, &origin));
		}

		/* Create new dataset. */
		dsobj = dsl_dataset_create_sync(dd,
		    strrchr(tofs, '/') + 1,
		    origin, crflags, drba->drba_cred, tx);
		if (origin != NULL)
			dsl_dataset_rele(origin, FTAG);
		dsl_dir_rele(dd, FTAG);
		drba->drba_cookie->drc_newfs = B_TRUE;
	}
	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));

	if (drba->drba_cookie->drc_resumable) {
		dsl_dataset_zapify(newds, tx);
		if (drrb->drr_fromguid != 0) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
			    8, 1, &drrb->drr_fromguid, tx));
		}
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
		    8, 1, &drrb->drr_toguid, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
		    1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
		uint64_t one = 1;
		uint64_t zero = 0;
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
		    8, 1, &one, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
		    8, 1, &zero, tx));
		VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
		    8, 1, &zero, tx));
		if (DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo) &
		    DMU_BACKUP_FEATURE_EMBED_DATA) {
			VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
			    8, 1, &one, tx));
		}
	}

	dmu_buf_will_dirty(newds->ds_dbuf, tx);
	dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;

	/*
	 * If we actually created a non-clone, we need to create the
	 * objset in our new dataset.
	 */
	if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
		(void) dmu_objset_create_impl(dp->dp_spa,
		    newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
	}

	drba->drba_cookie->drc_ds = newds;

	spa_history_log_internal_ds(newds, "receive", tx, "");
}
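/*
 * Sketch of how the resume state initialized above is read back later
 * (hedged; a minimal illustration using the same ZAP fields). A tool
 * reconstructing a resume token would look up, on the zapified dataset:
 *
 *	uint64_t obj, off;
 *	(void) zap_lookup(mos, dsobj, DS_FIELD_RESUME_OBJECT,
 *	    8, 1, &obj);
 *	(void) zap_lookup(mos, dsobj, DS_FIELD_RESUME_OFFSET,
 *	    8, 1, &off);
 *
 * OBJECT starts at 1 and OFFSET/BYTES at 0; save_resume_state() (below)
 * advances the in-core copies as write records are applied in each txg.
 */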
static int
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
	int error;
	uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
	dsl_dataset_t *ds;
	const char *tofs = drba->drba_cookie->drc_tofs;

	/* already checked */
	ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
	ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);

	if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
	    DMU_COMPOUNDSTREAM ||
	    drrb->drr_type >= DMU_OST_NUMTYPES)
		return (SET_ERROR(EINVAL));

	/* Verify pool version supports SA if SA_SPILL feature set */
	if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
	    spa_version(dp->dp_spa) < SPA_VERSION_SA)
		return (SET_ERROR(ENOTSUP));

	/*
	 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
	 * record to a plain WRITE record, so the pool must have the
	 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
	 * records. Same with WRITE_EMBEDDED records that use LZ4 compression.
	 */
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
		return (SET_ERROR(ENOTSUP));
	if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
	    !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
		return (SET_ERROR(ENOTSUP));

	/* 6 extra bytes for /%recv */
	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
		if (error != 0)
			return (error);
	}

	/* check that ds is marked inconsistent */
	if (!DS_IS_INCONSISTENT(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/* check that there is resuming data, and that the toguid matches */
	if (!dsl_dataset_is_zapified(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}
	uint64_t val;
	error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
	if (error != 0 || drrb->drr_toguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Check if the receive is still running. If so, it will be owned.
	 * Note that nothing else can own the dataset (e.g. after the receive
	 * fails) because it will be marked inconsistent.
	 */
	if (dsl_dataset_has_owner(ds)) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EBUSY));
	}

	/* There should not be any snapshots of this fs yet. */
	if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	/*
	 * Note: resume point will be checked when we process the first WRITE
	 * record.
	 */

	/* check that the origin matches */
	val = 0;
	(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
	    DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
	if (drrb->drr_fromguid != val) {
		dsl_dataset_rele(ds, FTAG);
		return (SET_ERROR(EINVAL));
	}

	dsl_dataset_rele(ds, FTAG);
	return (0);
}
static void
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_begin_arg_t *drba = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	const char *tofs = drba->drba_cookie->drc_tofs;
	dsl_dataset_t *ds;
	uint64_t dsobj;
	/* 6 extra bytes for /%recv */
	char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];

	(void) snprintf(recvname, sizeof (recvname), "%s/%s",
	    tofs, recv_clone_name);

	if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
		/* %recv does not exist; continue in tofs */
		VERIFY0(dsl_dataset_hold(dp, tofs, FTAG, &ds));
		drba->drba_cookie->drc_newfs = B_TRUE;
	}

	/* clear the inconsistent flag so that we can own it */
	ASSERT(DS_IS_INCONSISTENT(ds));
	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
	dsobj = ds->ds_object;
	dsl_dataset_rele(ds, FTAG);

	VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));

	dmu_buf_will_dirty(ds->ds_dbuf, tx);
	dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;

	ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));

	drba->drba_cookie->drc_ds = ds;

	spa_history_log_internal_ds(ds, "resume receive", tx, "");
}
/*
 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
 * succeeds; otherwise we will leak the holds on the datasets.
 */
int
dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)
{
	dmu_recv_begin_arg_t drba = { 0 };

	bzero(drc, sizeof (dmu_recv_cookie_t));
	drc->drc_drr_begin = drr_begin;
	drc->drc_drrb = &drr_begin->drr_u.drr_begin;
	drc->drc_tosnap = tosnap;
	drc->drc_tofs = tofs;
	drc->drc_force = force;
	drc->drc_resumable = resumable;
	drc->drc_cred = CRED();

	if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
		drc->drc_byteswap = B_TRUE;
		fletcher_4_incremental_byteswap(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
		byteswap_record(drr_begin);
	} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
		fletcher_4_incremental_native(drr_begin,
		    sizeof (dmu_replay_record_t), &drc->drc_cksum);
	} else {
		return (SET_ERROR(EINVAL));
	}

	drba.drba_origin = origin;
	drba.drba_cookie = drc;
	drba.drba_cred = CRED();

	if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
	    DMU_BACKUP_FEATURE_RESUMING) {
		return (dsl_sync_task(tofs,
		    dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	} else {
		return (dsl_sync_task(tofs,
		    dmu_recv_begin_check, dmu_recv_begin_sync,
		    &drba, 5, ZFS_SPACE_CHECK_NORMAL));
	}
}
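/*
 * Illustration of the endianness check above (hedged to the usual
 * definition of DMU_BACKUP_MAGIC, 0x2F5bacbacULL): a stream written on
 * an opposite-endian machine arrives with
 *
 *	drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)
 *
 * so every multi-byte field of every record must be byteswapped
 * (byteswap_record()), and the running checksum must be computed with
 * fletcher_4_incremental_byteswap() instead of the _native variant so
 * that it matches the checksum the sender embedded in the stream.
 */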
struct receive_record_arg {
	dmu_replay_record_t header;
	void *payload; /* Pointer to a buffer containing the payload */
	/*
	 * If the record is a write, pointer to the arc_buf_t containing the
	 * payload.
	 */
	arc_buf_t *write_buf;
	int payload_size;
	uint64_t bytes_read; /* bytes read from stream when record created */
	boolean_t eos_marker; /* Marks the end of the stream */
	bqueue_node_t node;
};

struct receive_writer_arg {
	objset_t *os;
	boolean_t byteswap;
	bqueue_t q;

	/*
	 * These three args are used to signal to the main thread that we're
	 * done.
	 */
	kmutex_t mutex;
	kcondvar_t cv;
	boolean_t done;

	int err;
	/* A map from guid to dataset to help handle dedup'd streams. */
	avl_tree_t *guid_to_ds_map;
	boolean_t resumable;
	uint64_t last_object, last_offset;
	uint64_t bytes_read; /* bytes read when current record created */
};

struct objlist {
	list_t list; /* List of struct receive_objnode. */
	/*
	 * Last object looked up. Used to assert that objects are being looked
	 * up in ascending order.
	 */
	uint64_t last_lookup;
};

struct receive_objnode {
	list_node_t node;
	uint64_t object;
};

struct receive_arg {
	objset_t *os;
	kthread_t *td;
	struct file *fp;
	uint64_t voff; /* The current offset in the stream */
	uint64_t bytes_read;
	/*
	 * A record that has had its payload read in, but hasn't yet been handed
	 * off to the worker thread.
	 */
	struct receive_record_arg *rrd;
	/* A record that has had its header read in, but not its payload. */
	struct receive_record_arg *next_rrd;
	zio_cksum_t cksum;
	zio_cksum_t prev_cksum;
	int err;
	boolean_t byteswap;
	/* Sorted list of objects not to issue prefetches for. */
	struct objlist ignore_objlist;
};

typedef struct guid_map_entry {
	uint64_t guid;
	dsl_dataset_t *gme_ds;
	avl_node_t avlnode;
} guid_map_entry_t;

static int
guid_compare(const void *arg1, const void *arg2)
{
	const guid_map_entry_t *gmep1 = arg1;
	const guid_map_entry_t *gmep2 = arg2;

	if (gmep1->guid < gmep2->guid)
		return (-1);
	else if (gmep1->guid > gmep2->guid)
		return (1);
	return (0);
}

static void
free_guid_map_onexit(void *arg)
{
	avl_tree_t *ca = arg;
	void *cookie = NULL;
	guid_map_entry_t *gmep;

	while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
		dsl_dataset_long_rele(gmep->gme_ds, gmep);
		dsl_dataset_rele(gmep->gme_ds, gmep);
		kmem_free(gmep, sizeof (guid_map_entry_t));
	}
	avl_destroy(ca);
	kmem_free(ca, sizeof (avl_tree_t));
}

static int
restore_bytes(struct receive_arg *ra, void *buf, int len, off_t off,
    ssize_t *resid)
{
	struct uio auio;
	struct iovec aiov;
	int error;

	aiov.iov_base = buf;
	aiov.iov_len = len;
	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_resid = len;
	auio.uio_segflg = UIO_SYSSPACE;
	auio.uio_rw = UIO_READ;
	auio.uio_offset = off;
	auio.uio_td = ra->td;
#ifdef _KERNEL
	error = fo_read(ra->fp, &auio, ra->td->td_ucred, FOF_OFFSET, ra->td);
#else
	fprintf(stderr, "%s: returning EOPNOTSUPP\n", __func__);
	error = EOPNOTSUPP;
#endif
	*resid = auio.uio_resid;
	return (error);
}
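/*
 * Minimal sketch of how the guid_to_ds_map above is typically consulted
 * for WRITE_BYREF records (hedged; the lookup shown is illustrative,
 * using the standard avl_find() interface and the guid_compare()
 * ordering defined above):
 *
 *	guid_map_entry_t gmesrch, *gmep;
 *	avl_index_t where;
 *
 *	gmesrch.guid = drr_refguid;
 *	gmep = avl_find(guid_to_ds_map, &gmesrch, &where);
 *	if (gmep == NULL)
 *		return (SET_ERROR(EINVAL));	(unknown reference guid)
 *
 * i.e., the record names an earlier dataset by guid, and the map
 * resolves that guid to the held dataset containing the referenced
 * block.
 */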
1845 */ 1846 ASSERT0(len % 8); 1847 1848 while (done < len) { 1849 ssize_t resid; 1850 1851 ra->err = restore_bytes(ra, buf + done, 1852 len - done, ra->voff, &resid); 1853 1854 if (resid == len - done) { 1855 /* 1856 * Note: ECKSUM indicates that the receive 1857 * was interrupted and can potentially be resumed. 1858 */ 1859 ra->err = SET_ERROR(ECKSUM); 1860 } 1861 ra->voff += len - done - resid; 1862 done = len - resid; 1863 if (ra->err != 0) 1864 return (ra->err); 1865 } 1866 1867 ra->bytes_read += len; 1868 1869 ASSERT3U(done, ==, len); 1870 return (0); 1871} 1872 1873static void 1874byteswap_record(dmu_replay_record_t *drr) 1875{ 1876#define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X)) 1877#define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X)) 1878 drr->drr_type = BSWAP_32(drr->drr_type); 1879 drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen); 1880 1881 switch (drr->drr_type) { 1882 case DRR_BEGIN: 1883 DO64(drr_begin.drr_magic); 1884 DO64(drr_begin.drr_versioninfo); 1885 DO64(drr_begin.drr_creation_time); 1886 DO32(drr_begin.drr_type); 1887 DO32(drr_begin.drr_flags); 1888 DO64(drr_begin.drr_toguid); 1889 DO64(drr_begin.drr_fromguid); 1890 break; 1891 case DRR_OBJECT: 1892 DO64(drr_object.drr_object); 1893 DO32(drr_object.drr_type); 1894 DO32(drr_object.drr_bonustype); 1895 DO32(drr_object.drr_blksz); 1896 DO32(drr_object.drr_bonuslen); 1897 DO64(drr_object.drr_toguid); 1898 break; 1899 case DRR_FREEOBJECTS: 1900 DO64(drr_freeobjects.drr_firstobj); 1901 DO64(drr_freeobjects.drr_numobjs); 1902 DO64(drr_freeobjects.drr_toguid); 1903 break; 1904 case DRR_WRITE: 1905 DO64(drr_write.drr_object); 1906 DO32(drr_write.drr_type); 1907 DO64(drr_write.drr_offset); 1908 DO64(drr_write.drr_length); 1909 DO64(drr_write.drr_toguid); 1910 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum); 1911 DO64(drr_write.drr_key.ddk_prop); 1912 break; 1913 case DRR_WRITE_BYREF: 1914 DO64(drr_write_byref.drr_object); 1915 DO64(drr_write_byref.drr_offset); 1916 DO64(drr_write_byref.drr_length); 1917 DO64(drr_write_byref.drr_toguid); 1918 DO64(drr_write_byref.drr_refguid); 1919 DO64(drr_write_byref.drr_refobject); 1920 DO64(drr_write_byref.drr_refoffset); 1921 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref. 
1922 drr_key.ddk_cksum); 1923 DO64(drr_write_byref.drr_key.ddk_prop); 1924 break; 1925 case DRR_WRITE_EMBEDDED: 1926 DO64(drr_write_embedded.drr_object); 1927 DO64(drr_write_embedded.drr_offset); 1928 DO64(drr_write_embedded.drr_length); 1929 DO64(drr_write_embedded.drr_toguid); 1930 DO32(drr_write_embedded.drr_lsize); 1931 DO32(drr_write_embedded.drr_psize); 1932 break; 1933 case DRR_FREE: 1934 DO64(drr_free.drr_object); 1935 DO64(drr_free.drr_offset); 1936 DO64(drr_free.drr_length); 1937 DO64(drr_free.drr_toguid); 1938 break; 1939 case DRR_SPILL: 1940 DO64(drr_spill.drr_object); 1941 DO64(drr_spill.drr_length); 1942 DO64(drr_spill.drr_toguid); 1943 break; 1944 case DRR_END: 1945 DO64(drr_end.drr_toguid); 1946 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum); 1947 break; 1948 } 1949 1950 if (drr->drr_type != DRR_BEGIN) { 1951 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum); 1952 } 1953 1954#undef DO64 1955#undef DO32 1956} 1957 1958static inline uint8_t 1959deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size) 1960{ 1961 if (bonus_type == DMU_OT_SA) { 1962 return (1); 1963 } else { 1964 return (1 + 1965 ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT)); 1966 } 1967} 1968 1969static void 1970save_resume_state(struct receive_writer_arg *rwa, 1971 uint64_t object, uint64_t offset, dmu_tx_t *tx) 1972{ 1973 int txgoff = dmu_tx_get_txg(tx) & TXG_MASK; 1974 1975 if (!rwa->resumable) 1976 return; 1977 1978 /* 1979 * We use ds_resume_bytes[] != 0 to indicate that we need to 1980 * update this on disk, so it must not be 0. 1981 */ 1982 ASSERT(rwa->bytes_read != 0); 1983 1984 /* 1985 * We only resume from write records, which have a valid 1986 * (non-meta-dnode) object number. 1987 */ 1988 ASSERT(object != 0); 1989 1990 /* 1991 * For resuming to work correctly, we must receive records in order, 1992 * sorted by object,offset. This is checked by the callers, but 1993 * assert it here for good measure. 1994 */ 1995 ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]); 1996 ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] || 1997 offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]); 1998 ASSERT3U(rwa->bytes_read, >=, 1999 rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]); 2000 2001 rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object; 2002 rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset; 2003 rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read; 2004} 2005 2006static int 2007receive_object(struct receive_writer_arg *rwa, struct drr_object *drro, 2008 void *data) 2009{ 2010 dmu_object_info_t doi; 2011 dmu_tx_t *tx; 2012 uint64_t object; 2013 int err; 2014 2015 if (drro->drr_type == DMU_OT_NONE || 2016 !DMU_OT_IS_VALID(drro->drr_type) || 2017 !DMU_OT_IS_VALID(drro->drr_bonustype) || 2018 drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS || 2019 drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS || 2020 P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) || 2021 drro->drr_blksz < SPA_MINBLOCKSIZE || 2022 drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) || 2023 drro->drr_bonuslen > DN_MAX_BONUSLEN) { 2024 return (SET_ERROR(EINVAL)); 2025 } 2026 2027 err = dmu_object_info(rwa->os, drro->drr_object, &doi); 2028 2029 if (err != 0 && err != ENOENT) 2030 return (SET_ERROR(EINVAL)); 2031 object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT; 2032 2033 /* 2034 * If we are losing blkptrs or changing the block size this must 2035 * be a new file instance. 
We must clear out the previous file 2036 * contents before we can change this type of metadata in the dnode. 2037 */ 2038 if (err == 0) { 2039 int nblkptr; 2040 2041 nblkptr = deduce_nblkptr(drro->drr_bonustype, 2042 drro->drr_bonuslen); 2043 2044 if (drro->drr_blksz != doi.doi_data_block_size || 2045 nblkptr < doi.doi_nblkptr) { 2046 err = dmu_free_long_range(rwa->os, drro->drr_object, 2047 0, DMU_OBJECT_END); 2048 if (err != 0) 2049 return (SET_ERROR(EINVAL)); 2050 } 2051 } 2052 2053 tx = dmu_tx_create(rwa->os); 2054 dmu_tx_hold_bonus(tx, object); 2055 err = dmu_tx_assign(tx, TXG_WAIT); 2056 if (err != 0) { 2057 dmu_tx_abort(tx); 2058 return (err); 2059 } 2060 2061 if (object == DMU_NEW_OBJECT) { 2062 /* currently free, want to be allocated */ 2063 err = dmu_object_claim(rwa->os, drro->drr_object, 2064 drro->drr_type, drro->drr_blksz, 2065 drro->drr_bonustype, drro->drr_bonuslen, tx); 2066 } else if (drro->drr_type != doi.doi_type || 2067 drro->drr_blksz != doi.doi_data_block_size || 2068 drro->drr_bonustype != doi.doi_bonus_type || 2069 drro->drr_bonuslen != doi.doi_bonus_size) { 2070 /* currently allocated, but with different properties */ 2071 err = dmu_object_reclaim(rwa->os, drro->drr_object, 2072 drro->drr_type, drro->drr_blksz, 2073 drro->drr_bonustype, drro->drr_bonuslen, tx); 2074 } 2075 if (err != 0) { 2076 dmu_tx_commit(tx); 2077 return (SET_ERROR(EINVAL)); 2078 } 2079 2080 dmu_object_set_checksum(rwa->os, drro->drr_object, 2081 drro->drr_checksumtype, tx); 2082 dmu_object_set_compress(rwa->os, drro->drr_object, 2083 drro->drr_compress, tx); 2084 2085 if (data != NULL) { 2086 dmu_buf_t *db; 2087 2088 VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db)); 2089 dmu_buf_will_dirty(db, tx); 2090 2091 ASSERT3U(db->db_size, >=, drro->drr_bonuslen); 2092 bcopy(data, db->db_data, drro->drr_bonuslen); 2093 if (rwa->byteswap) { 2094 dmu_object_byteswap_t byteswap = 2095 DMU_OT_BYTESWAP(drro->drr_bonustype); 2096 dmu_ot_byteswap[byteswap].ob_func(db->db_data, 2097 drro->drr_bonuslen); 2098 } 2099 dmu_buf_rele(db, FTAG); 2100 } 2101 dmu_tx_commit(tx); 2102 2103 return (0); 2104} 2105 2106/* ARGSUSED */ 2107static int 2108receive_freeobjects(struct receive_writer_arg *rwa, 2109 struct drr_freeobjects *drrfo) 2110{ 2111 uint64_t obj; 2112 int next_err = 0; 2113 2114 if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj) 2115 return (SET_ERROR(EINVAL)); 2116 2117 for (obj = drrfo->drr_firstobj; 2118 obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0; 2119 next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) { 2120 int err; 2121 2122 if (dmu_object_info(rwa->os, obj, NULL) != 0) 2123 continue; 2124 2125 err = dmu_free_long_object(rwa->os, obj); 2126 if (err != 0) 2127 return (err); 2128 } 2129 if (next_err != ESRCH) 2130 return (next_err); 2131 return (0); 2132} 2133 2134static int 2135receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw, 2136 arc_buf_t *abuf) 2137{ 2138 dmu_tx_t *tx; 2139 int err; 2140 2141 if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset || 2142 !DMU_OT_IS_VALID(drrw->drr_type)) 2143 return (SET_ERROR(EINVAL)); 2144 2145 /* 2146 * For resuming to work, records must be in increasing order 2147 * by (object, offset). 
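 * For example (illustrative): records for (object 5, offset 0), (object 5, offset 131072), and then (object 7, offset 0) are accepted, while a record for object 5 arriving after one for object 7 fails the check below with EINVAL.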
2148 */ 2149 if (drrw->drr_object < rwa->last_object || 2150 (drrw->drr_object == rwa->last_object && 2151 drrw->drr_offset < rwa->last_offset)) { 2152 return (SET_ERROR(EINVAL)); 2153 } 2154 rwa->last_object = drrw->drr_object; 2155 rwa->last_offset = drrw->drr_offset; 2156 2157 if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0) 2158 return (SET_ERROR(EINVAL)); 2159 2160 tx = dmu_tx_create(rwa->os); 2161 2162 dmu_tx_hold_write(tx, drrw->drr_object, 2163 drrw->drr_offset, drrw->drr_length); 2164 err = dmu_tx_assign(tx, TXG_WAIT); 2165 if (err != 0) { 2166 dmu_tx_abort(tx); 2167 return (err); 2168 } 2169 if (rwa->byteswap) { 2170 dmu_object_byteswap_t byteswap = 2171 DMU_OT_BYTESWAP(drrw->drr_type); 2172 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data, 2173 drrw->drr_length); 2174 } 2175 2176 dmu_buf_t *bonus; 2177 if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0) { 2178 /* the tx is already assigned, so it must be committed, not aborted */ dmu_tx_commit(tx); return (SET_ERROR(EINVAL)); } 2179 dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx); 2180 2181 /* 2182 * Note: If the receive fails, we want the resume stream to start 2183 * with the same record that we last successfully received (as opposed 2184 * to the next record), so that we can verify that we are 2185 * resuming from the correct location. 2186 */ 2187 save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx); 2188 dmu_tx_commit(tx); 2189 dmu_buf_rele(bonus, FTAG); 2190 2191 return (0); 2192 } 2193 2194 /* 2195 * Handle a DRR_WRITE_BYREF record. This record is used in dedup'ed 2196 * streams to refer to a copy of the data that is already on the 2197 * system because it came in earlier in the stream. This function 2198 * finds the earlier copy of the data, and uses that copy instead of 2199 * data from the stream to fulfill this write. 2200 */ 2201 static int 2202 receive_write_byref(struct receive_writer_arg *rwa, 2203 struct drr_write_byref *drrwbr) 2204 { 2205 dmu_tx_t *tx; 2206 int err; 2207 guid_map_entry_t gmesrch; 2208 guid_map_entry_t *gmep; 2209 avl_index_t where; 2210 objset_t *ref_os = NULL; 2211 dmu_buf_t *dbp; 2212 2213 if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset) 2214 return (SET_ERROR(EINVAL)); 2215 2216 /* 2217 * If the GUID of the referenced dataset is different from the 2218 * GUID of the target dataset, find the referenced dataset. 2219 */ 2220 if (drrwbr->drr_toguid != drrwbr->drr_refguid) { 2221 gmesrch.guid = drrwbr->drr_refguid; 2222 if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch, 2223 &where)) == NULL) { 2224 return (SET_ERROR(EINVAL)); 2225 } 2226 if (dmu_objset_from_ds(gmep->gme_ds, &ref_os)) 2227 return (SET_ERROR(EINVAL)); 2228 } else { 2229 ref_os = rwa->os; 2230 } 2231 2232 err = dmu_buf_hold(ref_os, drrwbr->drr_refobject, 2233 drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH); 2234 if (err != 0) 2235 return (err); 2236 2237 tx = dmu_tx_create(rwa->os); 2238 2239 dmu_tx_hold_write(tx, drrwbr->drr_object, 2240 drrwbr->drr_offset, drrwbr->drr_length); 2241 err = dmu_tx_assign(tx, TXG_WAIT); 2242 if (err != 0) { 2243 dmu_tx_abort(tx); 2244 return (err); 2245 } 2246 dmu_write(rwa->os, drrwbr->drr_object, 2247 drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx); 2248 dmu_buf_rele(dbp, FTAG); 2249 2250 /* See comment in receive_write.
*/ 2251 save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx); 2252 dmu_tx_commit(tx); 2253 return (0); 2254 } 2255 2256 static int 2257 receive_write_embedded(struct receive_writer_arg *rwa, 2258 struct drr_write_embedded *drrwe, void *data) 2259 { 2260 dmu_tx_t *tx; 2261 int err; 2262 2263 if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset) 2264 return (SET_ERROR(EINVAL)); 2265 2266 if (drrwe->drr_psize > BPE_PAYLOAD_SIZE) 2267 return (SET_ERROR(EINVAL)); 2268 2269 if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES) 2270 return (SET_ERROR(EINVAL)); 2271 if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS) 2272 return (SET_ERROR(EINVAL)); 2273 2274 tx = dmu_tx_create(rwa->os); 2275 2276 dmu_tx_hold_write(tx, drrwe->drr_object, 2277 drrwe->drr_offset, drrwe->drr_length); 2278 err = dmu_tx_assign(tx, TXG_WAIT); 2279 if (err != 0) { 2280 dmu_tx_abort(tx); 2281 return (err); 2282 } 2283 2284 dmu_write_embedded(rwa->os, drrwe->drr_object, 2285 drrwe->drr_offset, data, drrwe->drr_etype, 2286 drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize, 2287 rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx); 2288 2289 /* See comment in receive_write. */ 2290 save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx); 2291 dmu_tx_commit(tx); 2292 return (0); 2293 } 2294 2295 static int 2296 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs, 2297 void *data) 2298 { 2299 dmu_tx_t *tx; 2300 dmu_buf_t *db, *db_spill; 2301 int err; 2302 2303 if (drrs->drr_length < SPA_MINBLOCKSIZE || 2304 drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os))) 2305 return (SET_ERROR(EINVAL)); 2306 2307 if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0) 2308 return (SET_ERROR(EINVAL)); 2309 2310 VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db)); 2311 if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) { 2312 dmu_buf_rele(db, FTAG); 2313 return (err); 2314 } 2315 2316 tx = dmu_tx_create(rwa->os); 2317 2318 dmu_tx_hold_spill(tx, db->db_object); 2319 2320 err = dmu_tx_assign(tx, TXG_WAIT); 2321 if (err != 0) { 2322 dmu_buf_rele(db, FTAG); 2323 dmu_buf_rele(db_spill, FTAG); 2324 dmu_tx_abort(tx); 2325 return (err); 2326 } 2327 dmu_buf_will_dirty(db_spill, tx); 2328 2329 if (db_spill->db_size < drrs->drr_length) 2330 VERIFY(0 == dbuf_spill_set_blksz(db_spill, 2331 drrs->drr_length, tx)); 2332 bcopy(data, db_spill->db_data, drrs->drr_length); 2333 2334 dmu_buf_rele(db, FTAG); 2335 dmu_buf_rele(db_spill, FTAG); 2336 2337 dmu_tx_commit(tx); 2338 return (0); 2339 } 2340 2341 /* ARGSUSED */ 2342 static int 2343 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf) 2344 { 2345 int err; 2346 2347 if (drrf->drr_length != -1ULL && 2348 drrf->drr_offset + drrf->drr_length < drrf->drr_offset) 2349 return (SET_ERROR(EINVAL)); 2350 2351 if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0) 2352 return (SET_ERROR(EINVAL)); 2353 2354 err = dmu_free_long_range(rwa->os, drrf->drr_object, 2355 drrf->drr_offset, drrf->drr_length); 2356 2357 return (err); 2358 } 2359 2360 /* used to destroy the drc_ds on error */ 2361 static void 2362 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc) 2363 { 2364 if (drc->drc_resumable) { 2365 /* wait for our resume state to be written to disk */ 2366 txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0); 2367 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 2368 } else { 2369 char name[ZFS_MAX_DATASET_NAME_LEN]; 2370 dsl_dataset_name(drc->drc_ds, name); 2371 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 2372 (void) dsl_destroy_head(name); 2373 } 2374 } 2375 2376 static void
2377receive_cksum(struct receive_arg *ra, int len, void *buf) 2378{ 2379 if (ra->byteswap) { 2380 fletcher_4_incremental_byteswap(buf, len, &ra->cksum); 2381 } else { 2382 fletcher_4_incremental_native(buf, len, &ra->cksum); 2383 } 2384} 2385 2386/* 2387 * Read the payload into a buffer of size len, and update the current record's 2388 * payload field. 2389 * Allocate ra->next_rrd and read the next record's header into 2390 * ra->next_rrd->header. 2391 * Verify checksum of payload and next record. 2392 */ 2393static int 2394receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf) 2395{ 2396 int err; 2397 2398 if (len != 0) { 2399 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE); 2400 err = receive_read(ra, len, buf); 2401 if (err != 0) 2402 return (err); 2403 receive_cksum(ra, len, buf); 2404 2405 /* note: rrd is NULL when reading the begin record's payload */ 2406 if (ra->rrd != NULL) { 2407 ra->rrd->payload = buf; 2408 ra->rrd->payload_size = len; 2409 ra->rrd->bytes_read = ra->bytes_read; 2410 } 2411 } 2412 2413 ra->prev_cksum = ra->cksum; 2414 2415 ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP); 2416 err = receive_read(ra, sizeof (ra->next_rrd->header), 2417 &ra->next_rrd->header); 2418 ra->next_rrd->bytes_read = ra->bytes_read; 2419 if (err != 0) { 2420 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2421 ra->next_rrd = NULL; 2422 return (err); 2423 } 2424 if (ra->next_rrd->header.drr_type == DRR_BEGIN) { 2425 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2426 ra->next_rrd = NULL; 2427 return (SET_ERROR(EINVAL)); 2428 } 2429 2430 /* 2431 * Note: checksum is of everything up to but not including the 2432 * checksum itself. 2433 */ 2434 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 2435 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t)); 2436 receive_cksum(ra, 2437 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), 2438 &ra->next_rrd->header); 2439 2440 zio_cksum_t cksum_orig = 2441 ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; 2442 zio_cksum_t *cksump = 2443 &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum; 2444 2445 if (ra->byteswap) 2446 byteswap_record(&ra->next_rrd->header); 2447 2448 if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) && 2449 !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) { 2450 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd)); 2451 ra->next_rrd = NULL; 2452 return (SET_ERROR(ECKSUM)); 2453 } 2454 2455 receive_cksum(ra, sizeof (cksum_orig), &cksum_orig); 2456 2457 return (0); 2458} 2459 2460static void 2461objlist_create(struct objlist *list) 2462{ 2463 list_create(&list->list, sizeof (struct receive_objnode), 2464 offsetof(struct receive_objnode, node)); 2465 list->last_lookup = 0; 2466} 2467 2468static void 2469objlist_destroy(struct objlist *list) 2470{ 2471 for (struct receive_objnode *n = list_remove_head(&list->list); 2472 n != NULL; n = list_remove_head(&list->list)) { 2473 kmem_free(n, sizeof (*n)); 2474 } 2475 list_destroy(&list->list); 2476} 2477 2478/* 2479 * This function looks through the objlist to see if the specified object number 2480 * is contained in the objlist. In the process, it will remove all object 2481 * numbers in the list that are smaller than the specified object number. Thus, 2482 * any lookup of an object number smaller than a previously looked up object 2483 * number will always return false; therefore, all lookups should be done in 2484 * ascending order. 
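 * For example (illustrative): if the list holds {3, 5, 8}, looking up 4 removes 3 and returns B_FALSE, a subsequent lookup of 5 returns B_TRUE, and looking up 4 again would violate the ascending-order contract (and trip the ASSERT below).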
2485 */ 2486 static boolean_t 2487 objlist_exists(struct objlist *list, uint64_t object) 2488 { 2489 struct receive_objnode *node = list_head(&list->list); 2490 ASSERT3U(object, >=, list->last_lookup); 2491 list->last_lookup = object; 2492 while (node != NULL && node->object < object) { 2493 VERIFY3P(node, ==, list_remove_head(&list->list)); 2494 kmem_free(node, sizeof (*node)); 2495 node = list_head(&list->list); 2496 } 2497 return (node != NULL && node->object == object); 2498 } 2499 2500 /* 2501 * The objlist is a list of object numbers stored in ascending order. However, 2502 * the insertion of new object numbers does not search for the correct location 2503 * in the list; for simplicity, a new object number is always appended to the 2504 * tail. Thus, callers must take care to insert new object numbers only in 2505 * ascending order. 2506 */ 2507 static void 2508 objlist_insert(struct objlist *list, uint64_t object) 2509 { 2510 struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP); 2511 node->object = object; 2512 #ifdef ZFS_DEBUG 2513 struct receive_objnode *last_object = list_tail(&list->list); 2514 uint64_t last_objnum = (last_object != NULL ? last_object->object : 0); 2515 ASSERT3U(node->object, >, last_objnum); 2516 #endif 2517 list_insert_tail(&list->list, node); 2518 } 2519 2520 /* 2521 * Issue the prefetch reads for any necessary indirect blocks. 2522 * 2523 * We use the object ignore list to tell us whether or not to issue prefetches 2524 * for a given object. We do this for both correctness (in case the blocksize 2525 * of an object has changed) and performance (if the object doesn't exist, don't 2526 * needlessly try to issue prefetches). We also trim the list as we go through 2527 * the stream to prevent it from growing to an unbounded size. 2528 * 2529 * The object numbers within will always be in sorted order, and any write 2530 * records we see will also be in sorted order, but they're not sorted with 2531 * respect to each other (i.e. we can get several object records before 2532 * receiving each object's write records). As a result, once we've reached a 2533 * given object number, we can safely remove any reference to lower object 2534 * numbers in the ignore list. In practice, we receive up to 32 object records 2535 * before receiving write records, so the list can have up to 32 nodes in it. 2536 */ 2537 /* ARGSUSED */ 2538 static void 2539 receive_read_prefetch(struct receive_arg *ra, 2540 uint64_t object, uint64_t offset, uint64_t length) 2541 { 2542 if (!objlist_exists(&ra->ignore_objlist, object)) { 2543 dmu_prefetch(ra->os, object, 1, offset, length, 2544 ZIO_PRIORITY_SYNC_READ); 2545 } 2546 } 2547 2548 /* 2549 * Read records off the stream, issuing any necessary prefetches. 2550 */ 2551 static int 2552 receive_read_record(struct receive_arg *ra) 2553 { 2554 int err; 2555 2556 switch (ra->rrd->header.drr_type) { 2557 case DRR_OBJECT: 2558 { 2559 struct drr_object *drro = &ra->rrd->header.drr_u.drr_object; 2560 uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8); 2561 void *buf = kmem_zalloc(size, KM_SLEEP); 2562 dmu_object_info_t doi; 2563 err = receive_read_payload_and_next_header(ra, size, buf); 2564 if (err != 0) { 2565 kmem_free(buf, size); 2566 return (err); 2567 } 2568 err = dmu_object_info(ra->os, drro->drr_object, &doi); 2569 /* 2570 * See receive_read_prefetch for an explanation of why we're 2571 * storing this object in the ignore_objlist.
2572 */ 2573 if (err == ENOENT || 2574 (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) { 2575 objlist_insert(&ra->ignore_objlist, drro->drr_object); 2576 err = 0; 2577 } 2578 return (err); 2579 } 2580 case DRR_FREEOBJECTS: 2581 { 2582 err = receive_read_payload_and_next_header(ra, 0, NULL); 2583 return (err); 2584 } 2585 case DRR_WRITE: 2586 { 2587 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write; 2588 arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os), 2589 drrw->drr_length); 2590 2591 err = receive_read_payload_and_next_header(ra, 2592 drrw->drr_length, abuf->b_data); 2593 if (err != 0) { 2594 dmu_return_arcbuf(abuf); 2595 return (err); 2596 } 2597 ra->rrd->write_buf = abuf; 2598 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset, 2599 drrw->drr_length); 2600 return (err); 2601 } 2602 case DRR_WRITE_BYREF: 2603 { 2604 struct drr_write_byref *drrwb = 2605 &ra->rrd->header.drr_u.drr_write_byref; 2606 err = receive_read_payload_and_next_header(ra, 0, NULL); 2607 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset, 2608 drrwb->drr_length); 2609 return (err); 2610 } 2611 case DRR_WRITE_EMBEDDED: 2612 { 2613 struct drr_write_embedded *drrwe = 2614 &ra->rrd->header.drr_u.drr_write_embedded; 2615 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8); 2616 void *buf = kmem_zalloc(size, KM_SLEEP); 2617 2618 err = receive_read_payload_and_next_header(ra, size, buf); 2619 if (err != 0) { 2620 kmem_free(buf, size); 2621 return (err); 2622 } 2623 2624 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset, 2625 drrwe->drr_length); 2626 return (err); 2627 } 2628 case DRR_FREE: 2629 { 2630 /* 2631 * It might be beneficial to prefetch indirect blocks here, but 2632 * we don't really have the data to decide for sure. 2633 */ 2634 err = receive_read_payload_and_next_header(ra, 0, NULL); 2635 return (err); 2636 } 2637 case DRR_END: 2638 { 2639 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end; 2640 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum)) 2641 return (SET_ERROR(ECKSUM)); 2642 return (0); 2643 } 2644 case DRR_SPILL: 2645 { 2646 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill; 2647 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP); 2648 err = receive_read_payload_and_next_header(ra, drrs->drr_length, 2649 buf); 2650 if (err != 0) 2651 kmem_free(buf, drrs->drr_length); 2652 return (err); 2653 } 2654 default: 2655 return (SET_ERROR(EINVAL)); 2656 } 2657} 2658 2659/* 2660 * Commit the records to the pool. 2661 */ 2662static int 2663receive_process_record(struct receive_writer_arg *rwa, 2664 struct receive_record_arg *rrd) 2665{ 2666 int err; 2667 2668 /* Processing in order, therefore bytes_read should be increasing. 
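(rrd->bytes_read was recorded by the reader thread when the record was created; see receive_read_payload_and_next_header().)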
*/ 2669 ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read); 2670 rwa->bytes_read = rrd->bytes_read; 2671 2672 switch (rrd->header.drr_type) { 2673 case DRR_OBJECT: 2674 { 2675 struct drr_object *drro = &rrd->header.drr_u.drr_object; 2676 err = receive_object(rwa, drro, rrd->payload); 2677 kmem_free(rrd->payload, rrd->payload_size); 2678 rrd->payload = NULL; 2679 return (err); 2680 } 2681 case DRR_FREEOBJECTS: 2682 { 2683 struct drr_freeobjects *drrfo = 2684 &rrd->header.drr_u.drr_freeobjects; 2685 return (receive_freeobjects(rwa, drrfo)); 2686 } 2687 case DRR_WRITE: 2688 { 2689 struct drr_write *drrw = &rrd->header.drr_u.drr_write; 2690 err = receive_write(rwa, drrw, rrd->write_buf); 2691 /* if receive_write() is successful, it consumes the arc_buf */ 2692 if (err != 0) 2693 dmu_return_arcbuf(rrd->write_buf); 2694 rrd->write_buf = NULL; 2695 rrd->payload = NULL; 2696 return (err); 2697 } 2698 case DRR_WRITE_BYREF: 2699 { 2700 struct drr_write_byref *drrwbr = 2701 &rrd->header.drr_u.drr_write_byref; 2702 return (receive_write_byref(rwa, drrwbr)); 2703 } 2704 case DRR_WRITE_EMBEDDED: 2705 { 2706 struct drr_write_embedded *drrwe = 2707 &rrd->header.drr_u.drr_write_embedded; 2708 err = receive_write_embedded(rwa, drrwe, rrd->payload); 2709 kmem_free(rrd->payload, rrd->payload_size); 2710 rrd->payload = NULL; 2711 return (err); 2712 } 2713 case DRR_FREE: 2714 { 2715 struct drr_free *drrf = &rrd->header.drr_u.drr_free; 2716 return (receive_free(rwa, drrf)); 2717 } 2718 case DRR_SPILL: 2719 { 2720 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill; 2721 err = receive_spill(rwa, drrs, rrd->payload); 2722 kmem_free(rrd->payload, rrd->payload_size); 2723 rrd->payload = NULL; 2724 return (err); 2725 } 2726 default: 2727 return (SET_ERROR(EINVAL)); 2728 } 2729 } 2730 2731 /* 2732 * dmu_recv_stream's worker thread; pull records off the queue, and then call 2733 * receive_process_record. When we're done, signal the main thread and exit. 2734 */ 2735 static void 2736 receive_writer_thread(void *arg) 2737 { 2738 struct receive_writer_arg *rwa = arg; 2739 struct receive_record_arg *rrd; 2740 for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker; 2741 rrd = bqueue_dequeue(&rwa->q)) { 2742 /* 2743 * If there's an error, the main thread will stop putting things 2744 * on the queue, but we need to clear everything in it before we 2745 * can exit.
2746 */ 2747 if (rwa->err == 0) { 2748 rwa->err = receive_process_record(rwa, rrd); 2749 } else if (rrd->write_buf != NULL) { 2750 dmu_return_arcbuf(rrd->write_buf); 2751 rrd->write_buf = NULL; 2752 rrd->payload = NULL; 2753 } else if (rrd->payload != NULL) { 2754 kmem_free(rrd->payload, rrd->payload_size); 2755 rrd->payload = NULL; 2756 } 2757 kmem_free(rrd, sizeof (*rrd)); 2758 } 2759 kmem_free(rrd, sizeof (*rrd)); 2760 mutex_enter(&rwa->mutex); 2761 rwa->done = B_TRUE; 2762 cv_signal(&rwa->cv); 2763 mutex_exit(&rwa->mutex); 2764 thread_exit(); 2765} 2766 2767static int 2768resume_check(struct receive_arg *ra, nvlist_t *begin_nvl) 2769{ 2770 uint64_t val; 2771 objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset; 2772 uint64_t dsobj = dmu_objset_id(ra->os); 2773 uint64_t resume_obj, resume_off; 2774 2775 if (nvlist_lookup_uint64(begin_nvl, 2776 "resume_object", &resume_obj) != 0 || 2777 nvlist_lookup_uint64(begin_nvl, 2778 "resume_offset", &resume_off) != 0) { 2779 return (SET_ERROR(EINVAL)); 2780 } 2781 VERIFY0(zap_lookup(mos, dsobj, 2782 DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val)); 2783 if (resume_obj != val) 2784 return (SET_ERROR(EINVAL)); 2785 VERIFY0(zap_lookup(mos, dsobj, 2786 DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val)); 2787 if (resume_off != val) 2788 return (SET_ERROR(EINVAL)); 2789 2790 return (0); 2791} 2792 2793/* 2794 * Read in the stream's records, one by one, and apply them to the pool. There 2795 * are two threads involved; the thread that calls this function will spin up a 2796 * worker thread, read the records off the stream one by one, and issue 2797 * prefetches for any necessary indirect blocks. It will then push the records 2798 * onto an internal blocking queue. The worker thread will pull the records off 2799 * the queue, and actually write the data into the DMU. This way, the worker 2800 * thread doesn't have to wait for reads to complete, since everything it needs 2801 * (the indirect blocks) will be prefetched. 2802 * 2803 * NB: callers *must* call dmu_recv_end() if this succeeds. 2804 */ 2805int 2806dmu_recv_stream(dmu_recv_cookie_t *drc, struct file *fp, offset_t *voffp, 2807 int cleanup_fd, uint64_t *action_handlep) 2808{ 2809 int err = 0; 2810 struct receive_arg ra = { 0 }; 2811 struct receive_writer_arg rwa = { 0 }; 2812 int featureflags; 2813 nvlist_t *begin_nvl = NULL; 2814 2815 ra.byteswap = drc->drc_byteswap; 2816 ra.cksum = drc->drc_cksum; 2817 ra.td = curthread; 2818 ra.fp = fp; 2819 ra.voff = *voffp; 2820 2821 if (dsl_dataset_is_zapified(drc->drc_ds)) { 2822 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset, 2823 drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES, 2824 sizeof (ra.bytes_read), 1, &ra.bytes_read); 2825 } 2826 2827 objlist_create(&ra.ignore_objlist); 2828 2829 /* these were verified in dmu_recv_begin */ 2830 ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==, 2831 DMU_SUBSTREAM); 2832 ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES); 2833 2834 /* 2835 * Open the objset we are modifying. 
2836 */ 2837 VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os)); 2838 2839 ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT); 2840 2841 featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo); 2842 2843 /* if this stream is dedup'ed, set up the avl tree for guid mapping */ 2844 if (featureflags & DMU_BACKUP_FEATURE_DEDUP) { 2845 minor_t minor; 2846 2847 if (cleanup_fd == -1) { 2848 ra.err = SET_ERROR(EBADF); 2849 goto out; 2850 } 2851 ra.err = zfs_onexit_fd_hold(cleanup_fd, &minor); 2852 if (ra.err != 0) { 2853 cleanup_fd = -1; 2854 goto out; 2855 } 2856 2857 if (*action_handlep == 0) { 2858 rwa.guid_to_ds_map = 2859 kmem_alloc(sizeof (avl_tree_t), KM_SLEEP); 2860 avl_create(rwa.guid_to_ds_map, guid_compare, 2861 sizeof (guid_map_entry_t), 2862 offsetof(guid_map_entry_t, avlnode)); 2863 err = zfs_onexit_add_cb(minor, 2864 free_guid_map_onexit, rwa.guid_to_ds_map, 2865 action_handlep); 2866 if (err != 0) 2867 goto out; 2868 } else { 2869 err = zfs_onexit_cb_data(minor, *action_handlep, 2870 (void **)&rwa.guid_to_ds_map); 2871 if (err != 0) 2872 goto out; 2873 } 2874 2875 drc->drc_guid_to_ds_map = rwa.guid_to_ds_map; 2876 } 2877 2878 uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen; 2879 void *payload = NULL; 2880 if (payloadlen != 0) 2881 payload = kmem_alloc(payloadlen, KM_SLEEP); 2882 2883 err = receive_read_payload_and_next_header(&ra, payloadlen, payload); 2884 if (err != 0) { 2885 if (payloadlen != 0) 2886 kmem_free(payload, payloadlen); 2887 goto out; 2888 } 2889 if (payloadlen != 0) { 2890 err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP); 2891 kmem_free(payload, payloadlen); 2892 if (err != 0) 2893 goto out; 2894 } 2895 2896 if (featureflags & DMU_BACKUP_FEATURE_RESUMING) { 2897 err = resume_check(&ra, begin_nvl); 2898 if (err != 0) 2899 goto out; 2900 } 2901 2902 (void) bqueue_init(&rwa.q, zfs_recv_queue_length, 2903 offsetof(struct receive_record_arg, node)); 2904 cv_init(&rwa.cv, NULL, CV_DEFAULT, NULL); 2905 mutex_init(&rwa.mutex, NULL, MUTEX_DEFAULT, NULL); 2906 rwa.os = ra.os; 2907 rwa.byteswap = drc->drc_byteswap; 2908 rwa.resumable = drc->drc_resumable; 2909 2910 (void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, &p0, 2911 TS_RUN, minclsyspri); 2912 /* 2913 * We're reading rwa.err without locks, which is safe since we are the 2914 * only reader, and the worker thread is the only writer. It's ok if we 2915 * miss a write for an iteration or two of the loop, since the writer 2916 * thread will keep freeing records we send it until we send it an eos 2917 * marker. 2918 * 2919 * We can leave this loop in 3 ways: First, if rwa.err is 2920 * non-zero. In that case, the writer thread will free the rrd we just 2921 * pushed. Second, if we're interrupted; in that case, either it's the 2922 * first loop and ra.rrd was never allocated, or it's later, and ra.rrd 2923 * has been handed off to the writer thread who will free it. Finally, 2924 * if receive_read_record fails or we're at the end of the stream, then 2925 * we free ra.rrd and exit.
2926 */ 2927 while (rwa.err == 0) { 2928 if (issig(JUSTLOOKING) && issig(FORREAL)) { 2929 err = SET_ERROR(EINTR); 2930 break; 2931 } 2932 2933 ASSERT3P(ra.rrd, ==, NULL); 2934 ra.rrd = ra.next_rrd; 2935 ra.next_rrd = NULL; 2936 /* Allocates and loads header into ra.next_rrd */ 2937 err = receive_read_record(&ra); 2938 2939 if (ra.rrd->header.drr_type == DRR_END || err != 0) { 2940 kmem_free(ra.rrd, sizeof (*ra.rrd)); 2941 ra.rrd = NULL; 2942 break; 2943 } 2944 2945 bqueue_enqueue(&rwa.q, ra.rrd, 2946 sizeof (struct receive_record_arg) + ra.rrd->payload_size); 2947 ra.rrd = NULL; 2948 } 2949 if (ra.next_rrd == NULL) 2950 ra.next_rrd = kmem_zalloc(sizeof (*ra.next_rrd), KM_SLEEP); 2951 ra.next_rrd->eos_marker = B_TRUE; 2952 bqueue_enqueue(&rwa.q, ra.next_rrd, 1); 2953 2954 mutex_enter(&rwa.mutex); 2955 while (!rwa.done) { 2956 cv_wait(&rwa.cv, &rwa.mutex); 2957 } 2958 mutex_exit(&rwa.mutex); 2959 2960 cv_destroy(&rwa.cv); 2961 mutex_destroy(&rwa.mutex); 2962 bqueue_destroy(&rwa.q); 2963 if (err == 0) 2964 err = rwa.err; 2965 2966out: 2967 nvlist_free(begin_nvl); 2968 if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1)) 2969 zfs_onexit_fd_rele(cleanup_fd); 2970 2971 if (err != 0) { 2972 /* 2973 * Clean up references. If receive is not resumable, 2974 * destroy what we created, so we don't leave it in 2975 * the inconsistent state. 2976 */ 2977 dmu_recv_cleanup_ds(drc); 2978 } 2979 2980 *voffp = ra.voff; 2981 objlist_destroy(&ra.ignore_objlist); 2982 return (err); 2983} 2984 2985static int 2986dmu_recv_end_check(void *arg, dmu_tx_t *tx) 2987{ 2988 dmu_recv_cookie_t *drc = arg; 2989 dsl_pool_t *dp = dmu_tx_pool(tx); 2990 int error; 2991 2992 ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag); 2993 2994 if (!drc->drc_newfs) { 2995 dsl_dataset_t *origin_head; 2996 2997 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head); 2998 if (error != 0) 2999 return (error); 3000 if (drc->drc_force) { 3001 /* 3002 * We will destroy any snapshots in tofs (i.e. before 3003 * origin_head) that are after the origin (which is 3004 * the snap before drc_ds, because drc_ds can not 3005 * have any snaps of its own). 
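 * For example (illustrative): if tofs has snapshots @a, @b, and @c, and the stream being received is an incremental from @a (so @a is the origin of the %recv clone), then @b and @c are the snapshots checked for destruction here.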
3006 */ 3007 uint64_t obj; 3008 3009 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 3010 while (obj != 3011 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 3012 dsl_dataset_t *snap; 3013 error = dsl_dataset_hold_obj(dp, obj, FTAG, 3014 &snap); 3015 if (error != 0) 3016 break; 3017 if (snap->ds_dir != origin_head->ds_dir) 3018 error = SET_ERROR(EINVAL); 3019 if (error == 0) { 3020 error = dsl_destroy_snapshot_check_impl( 3021 snap, B_FALSE); 3022 } 3023 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 3024 dsl_dataset_rele(snap, FTAG); 3025 if (error != 0) 3026 break; 3027 } 3028 if (error != 0) { 3029 dsl_dataset_rele(origin_head, FTAG); 3030 return (error); 3031 } 3032 } 3033 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds, 3034 origin_head, drc->drc_force, drc->drc_owner, tx); 3035 if (error != 0) { 3036 dsl_dataset_rele(origin_head, FTAG); 3037 return (error); 3038 } 3039 error = dsl_dataset_snapshot_check_impl(origin_head, 3040 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3041 dsl_dataset_rele(origin_head, FTAG); 3042 if (error != 0) 3043 return (error); 3044 3045 error = dsl_destroy_head_check_impl(drc->drc_ds, 1); 3046 } else { 3047 error = dsl_dataset_snapshot_check_impl(drc->drc_ds, 3048 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred); 3049 } 3050 return (error); 3051} 3052 3053static void 3054dmu_recv_end_sync(void *arg, dmu_tx_t *tx) 3055{ 3056 dmu_recv_cookie_t *drc = arg; 3057 dsl_pool_t *dp = dmu_tx_pool(tx); 3058 3059 spa_history_log_internal_ds(drc->drc_ds, "finish receiving", 3060 tx, "snap=%s", drc->drc_tosnap); 3061 3062 if (!drc->drc_newfs) { 3063 dsl_dataset_t *origin_head; 3064 3065 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG, 3066 &origin_head)); 3067 3068 if (drc->drc_force) { 3069 /* 3070 * Destroy any snapshots of drc_tofs (origin_head) 3071 * after the origin (the snap before drc_ds). 
3072 */ 3073 uint64_t obj; 3074 3075 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj; 3076 while (obj != 3077 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) { 3078 dsl_dataset_t *snap; 3079 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG, 3080 &snap)); 3081 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir); 3082 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj; 3083 dsl_destroy_snapshot_sync_impl(snap, 3084 B_FALSE, tx); 3085 dsl_dataset_rele(snap, FTAG); 3086 } 3087 } 3088 VERIFY3P(drc->drc_ds->ds_prev, ==, 3089 origin_head->ds_prev); 3090 3091 dsl_dataset_clone_swap_sync_impl(drc->drc_ds, 3092 origin_head, tx); 3093 dsl_dataset_snapshot_sync_impl(origin_head, 3094 drc->drc_tosnap, tx); 3095 3096 /* set snapshot's creation time and guid */ 3097 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx); 3098 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time = 3099 drc->drc_drrb->drr_creation_time; 3100 dsl_dataset_phys(origin_head->ds_prev)->ds_guid = 3101 drc->drc_drrb->drr_toguid; 3102 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &= 3103 ~DS_FLAG_INCONSISTENT; 3104 3105 dmu_buf_will_dirty(origin_head->ds_dbuf, tx); 3106 dsl_dataset_phys(origin_head)->ds_flags &= 3107 ~DS_FLAG_INCONSISTENT; 3108 3109 dsl_dataset_rele(origin_head, FTAG); 3110 dsl_destroy_head_sync_impl(drc->drc_ds, tx); 3111 3112 if (drc->drc_owner != NULL) 3113 VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner); 3114 } else { 3115 dsl_dataset_t *ds = drc->drc_ds; 3116 3117 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx); 3118 3119 /* set snapshot's creation time and guid */ 3120 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx); 3121 dsl_dataset_phys(ds->ds_prev)->ds_creation_time = 3122 drc->drc_drrb->drr_creation_time; 3123 dsl_dataset_phys(ds->ds_prev)->ds_guid = 3124 drc->drc_drrb->drr_toguid; 3125 dsl_dataset_phys(ds->ds_prev)->ds_flags &= 3126 ~DS_FLAG_INCONSISTENT; 3127 3128 dmu_buf_will_dirty(ds->ds_dbuf, tx); 3129 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT; 3130 if (dsl_dataset_has_resume_receive_state(ds)) { 3131 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3132 DS_FIELD_RESUME_FROMGUID, tx); 3133 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3134 DS_FIELD_RESUME_OBJECT, tx); 3135 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3136 DS_FIELD_RESUME_OFFSET, tx); 3137 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3138 DS_FIELD_RESUME_BYTES, tx); 3139 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3140 DS_FIELD_RESUME_TOGUID, tx); 3141 (void) zap_remove(dp->dp_meta_objset, ds->ds_object, 3142 DS_FIELD_RESUME_TONAME, tx); 3143 } 3144 } 3145 drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj; 3146 /* 3147 * Release the hold from dmu_recv_begin. This must be done before 3148 * we return to open context, so that when we free the dataset's dnode, 3149 * we can evict its bonus buffer. 
3150 */ 3151 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag); 3152 drc->drc_ds = NULL; 3153} 3154 3155static int 3156add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj) 3157{ 3158 dsl_pool_t *dp; 3159 dsl_dataset_t *snapds; 3160 guid_map_entry_t *gmep; 3161 int err; 3162 3163 ASSERT(guid_map != NULL); 3164 3165 err = dsl_pool_hold(name, FTAG, &dp); 3166 if (err != 0) 3167 return (err); 3168 gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP); 3169 err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds); 3170 if (err == 0) { 3171 gmep->guid = dsl_dataset_phys(snapds)->ds_guid; 3172 gmep->gme_ds = snapds; 3173 avl_add(guid_map, gmep); 3174 dsl_dataset_long_hold(snapds, gmep); 3175 } else 3176 kmem_free(gmep, sizeof (*gmep)); 3177 3178 dsl_pool_rele(dp, FTAG); 3179 return (err); 3180} 3181 3182static int dmu_recv_end_modified_blocks = 3; 3183 3184static int 3185dmu_recv_existing_end(dmu_recv_cookie_t *drc) 3186{ 3187 int error; 3188 3189#ifdef _KERNEL 3190 /* 3191 * We will be destroying the ds; make sure its origin is unmounted if 3192 * necessary. 3193 */ 3194 char name[ZFS_MAX_DATASET_NAME_LEN]; 3195 dsl_dataset_name(drc->drc_ds, name); 3196 zfs_destroy_unmount_origin(name); 3197#endif 3198 3199 error = dsl_sync_task(drc->drc_tofs, 3200 dmu_recv_end_check, dmu_recv_end_sync, drc, 3201 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3202 3203 if (error != 0) 3204 dmu_recv_cleanup_ds(drc); 3205 return (error); 3206} 3207 3208static int 3209dmu_recv_new_end(dmu_recv_cookie_t *drc) 3210{ 3211 int error; 3212 3213 error = dsl_sync_task(drc->drc_tofs, 3214 dmu_recv_end_check, dmu_recv_end_sync, drc, 3215 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL); 3216 3217 if (error != 0) { 3218 dmu_recv_cleanup_ds(drc); 3219 } else if (drc->drc_guid_to_ds_map != NULL) { 3220 (void) add_ds_to_guidmap(drc->drc_tofs, 3221 drc->drc_guid_to_ds_map, 3222 drc->drc_newsnapobj); 3223 } 3224 return (error); 3225} 3226 3227int 3228dmu_recv_end(dmu_recv_cookie_t *drc, void *owner) 3229{ 3230 drc->drc_owner = owner; 3231 3232 if (drc->drc_newfs) 3233 return (dmu_recv_new_end(drc)); 3234 else 3235 return (dmu_recv_existing_end(drc)); 3236} 3237 3238/* 3239 * Return TRUE if this objset is currently being received into. 3240 */ 3241boolean_t 3242dmu_objset_is_receiving(objset_t *os) 3243{ 3244 return (os->os_dsl_dataset != NULL && 3245 os->os_dsl_dataset->ds_owner == dmu_recv_tag); 3246} 3247
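/*
 * Illustrative usage sketch (not part of the original file): per the NB
 * comments on dmu_recv_begin() and dmu_recv_stream() above, a successful
 * dmu_recv_begin() must be followed by dmu_recv_stream(), and a successful
 * dmu_recv_stream() by dmu_recv_end(). Variable names below are
 * hypothetical.
 *
 *	dmu_recv_cookie_t drc;
 *	int err = dmu_recv_begin(tofs, tosnap, drr_begin, force,
 *	    resumable, origin, &drc);
 *	if (err == 0) {
 *		err = dmu_recv_stream(&drc, fp, &voff, cleanup_fd,
 *		    &action_handle);
 *		if (err == 0)
 *			err = dmu_recv_end(&drc, owner);
 *	}
 */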