/* primary.c revision 209183 */
1204076Spjd/*- 2204076Spjd * Copyright (c) 2009 The FreeBSD Foundation 3204076Spjd * All rights reserved. 4204076Spjd * 5204076Spjd * This software was developed by Pawel Jakub Dawidek under sponsorship from 6204076Spjd * the FreeBSD Foundation. 7204076Spjd * 8204076Spjd * Redistribution and use in source and binary forms, with or without 9204076Spjd * modification, are permitted provided that the following conditions 10204076Spjd * are met: 11204076Spjd * 1. Redistributions of source code must retain the above copyright 12204076Spjd * notice, this list of conditions and the following disclaimer. 13204076Spjd * 2. Redistributions in binary form must reproduce the above copyright 14204076Spjd * notice, this list of conditions and the following disclaimer in the 15204076Spjd * documentation and/or other materials provided with the distribution. 16204076Spjd * 17204076Spjd * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18204076Spjd * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19204076Spjd * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20204076Spjd * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21204076Spjd * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22204076Spjd * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23204076Spjd * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24204076Spjd * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25204076Spjd * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26204076Spjd * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27204076Spjd * SUCH DAMAGE. 
28204076Spjd */ 29204076Spjd 30204076Spjd#include <sys/cdefs.h> 31204076Spjd__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 209183 2010-06-14 21:44:20Z pjd $"); 32204076Spjd 33204076Spjd#include <sys/types.h> 34204076Spjd#include <sys/time.h> 35204076Spjd#include <sys/bio.h> 36204076Spjd#include <sys/disk.h> 37204076Spjd#include <sys/refcount.h> 38204076Spjd#include <sys/stat.h> 39204076Spjd 40204076Spjd#include <geom/gate/g_gate.h> 41204076Spjd 42204076Spjd#include <assert.h> 43204076Spjd#include <err.h> 44204076Spjd#include <errno.h> 45204076Spjd#include <fcntl.h> 46204076Spjd#include <libgeom.h> 47204076Spjd#include <pthread.h> 48204076Spjd#include <stdint.h> 49204076Spjd#include <stdio.h> 50204076Spjd#include <string.h> 51204076Spjd#include <sysexits.h> 52204076Spjd#include <unistd.h> 53204076Spjd 54204076Spjd#include <activemap.h> 55204076Spjd#include <nv.h> 56204076Spjd#include <rangelock.h> 57204076Spjd 58204076Spjd#include "control.h" 59204076Spjd#include "hast.h" 60204076Spjd#include "hast_proto.h" 61204076Spjd#include "hastd.h" 62204076Spjd#include "metadata.h" 63204076Spjd#include "proto.h" 64204076Spjd#include "pjdlog.h" 65204076Spjd#include "subr.h" 66204076Spjd#include "synch.h" 67204076Spjd 68204076Spjdstruct hio { 69204076Spjd /* 70204076Spjd * Number of components we are still waiting for. 71204076Spjd * When this field goes to 0, we can send the request back to the 72204076Spjd * kernel. Each component has to decrease this counter by one 73204076Spjd * even on failure. 74204076Spjd */ 75204076Spjd unsigned int hio_countdown; 76204076Spjd /* 77204076Spjd * Each component has a place to store its own error. 78204076Spjd * Once the request is handled by all components we can decide if the 79204076Spjd * request overall is successful or not. 80204076Spjd */ 81204076Spjd int *hio_errors; 82204076Spjd /* 83204076Spjd * Structure used to comunicate with GEOM Gate class. 
84204076Spjd */ 85204076Spjd struct g_gate_ctl_io hio_ggio; 86204076Spjd TAILQ_ENTRY(hio) *hio_next; 87204076Spjd}; 88204076Spjd#define hio_free_next hio_next[0] 89204076Spjd#define hio_done_next hio_next[0] 90204076Spjd 91204076Spjd/* 92204076Spjd * Free list holds unused structures. When free list is empty, we have to wait 93204076Spjd * until some in-progress requests are freed. 94204076Spjd */ 95204076Spjdstatic TAILQ_HEAD(, hio) hio_free_list; 96204076Spjdstatic pthread_mutex_t hio_free_list_lock; 97204076Spjdstatic pthread_cond_t hio_free_list_cond; 98204076Spjd/* 99204076Spjd * There is one send list for every component. One requests is placed on all 100204076Spjd * send lists - each component gets the same request, but each component is 101204076Spjd * responsible for managing his own send list. 102204076Spjd */ 103204076Spjdstatic TAILQ_HEAD(, hio) *hio_send_list; 104204076Spjdstatic pthread_mutex_t *hio_send_list_lock; 105204076Spjdstatic pthread_cond_t *hio_send_list_cond; 106204076Spjd/* 107204076Spjd * There is one recv list for every component, although local components don't 108204076Spjd * use recv lists as local requests are done synchronously. 109204076Spjd */ 110204076Spjdstatic TAILQ_HEAD(, hio) *hio_recv_list; 111204076Spjdstatic pthread_mutex_t *hio_recv_list_lock; 112204076Spjdstatic pthread_cond_t *hio_recv_list_cond; 113204076Spjd/* 114204076Spjd * Request is placed on done list by the slowest component (the one that 115204076Spjd * decreased hio_countdown from 1 to 0). 116204076Spjd */ 117204076Spjdstatic TAILQ_HEAD(, hio) hio_done_list; 118204076Spjdstatic pthread_mutex_t hio_done_list_lock; 119204076Spjdstatic pthread_cond_t hio_done_list_cond; 120204076Spjd/* 121204076Spjd * Structure below are for interaction with sync thread. 
122204076Spjd */ 123204076Spjdstatic bool sync_inprogress; 124204076Spjdstatic pthread_mutex_t sync_lock; 125204076Spjdstatic pthread_cond_t sync_cond; 126204076Spjd/* 127204076Spjd * The lock below allows to synchornize access to remote connections. 128204076Spjd */ 129204076Spjdstatic pthread_rwlock_t *hio_remote_lock; 130204076Spjdstatic pthread_mutex_t hio_guard_lock; 131204076Spjdstatic pthread_cond_t hio_guard_cond; 132204076Spjd 133204076Spjd/* 134204076Spjd * Lock to synchronize metadata updates. Also synchronize access to 135204076Spjd * hr_primary_localcnt and hr_primary_remotecnt fields. 136204076Spjd */ 137204076Spjdstatic pthread_mutex_t metadata_lock; 138204076Spjd 139204076Spjd/* 140204076Spjd * Maximum number of outstanding I/O requests. 141204076Spjd */ 142204076Spjd#define HAST_HIO_MAX 256 143204076Spjd/* 144204076Spjd * Number of components. At this point there are only two components: local 145204076Spjd * and remote, but in the future it might be possible to use multiple local 146204076Spjd * and remote components. 147204076Spjd */ 148204076Spjd#define HAST_NCOMPONENTS 2 149204076Spjd/* 150204076Spjd * Number of seconds to sleep before next reconnect try. 
151204076Spjd */ 152204076Spjd#define RECONNECT_SLEEP 5 153204076Spjd 154204076Spjd#define ISCONNECTED(res, no) \ 155204076Spjd ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 156204076Spjd 157204076Spjd#define QUEUE_INSERT1(hio, name, ncomp) do { \ 158204076Spjd bool _wakeup; \ 159204076Spjd \ 160204076Spjd mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 161204076Spjd _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 162204076Spjd TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 163204076Spjd hio_next[(ncomp)]); \ 164204076Spjd mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 165204076Spjd if (_wakeup) \ 166204076Spjd cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 167204076Spjd} while (0) 168204076Spjd#define QUEUE_INSERT2(hio, name) do { \ 169204076Spjd bool _wakeup; \ 170204076Spjd \ 171204076Spjd mtx_lock(&hio_##name##_list_lock); \ 172204076Spjd _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 173204076Spjd TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 174204076Spjd mtx_unlock(&hio_##name##_list_lock); \ 175204076Spjd if (_wakeup) \ 176204076Spjd cv_signal(&hio_##name##_list_cond); \ 177204076Spjd} while (0) 178204076Spjd#define QUEUE_TAKE1(hio, name, ncomp) do { \ 179204076Spjd mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 180204076Spjd while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL) { \ 181204076Spjd cv_wait(&hio_##name##_list_cond[(ncomp)], \ 182204076Spjd &hio_##name##_list_lock[(ncomp)]); \ 183204076Spjd } \ 184204076Spjd TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 185204076Spjd hio_next[(ncomp)]); \ 186204076Spjd mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 187204076Spjd} while (0) 188204076Spjd#define QUEUE_TAKE2(hio, name) do { \ 189204076Spjd mtx_lock(&hio_##name##_list_lock); \ 190204076Spjd while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 191204076Spjd cv_wait(&hio_##name##_list_cond, \ 192204076Spjd &hio_##name##_list_lock); \ 193204076Spjd } \ 194204076Spjd 
TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 195204076Spjd mtx_unlock(&hio_##name##_list_lock); \ 196204076Spjd} while (0) 197204076Spjd 198209183Spjd#define SYNCREQ(hio) do { \ 199209183Spjd (hio)->hio_ggio.gctl_unit = -1; \ 200209183Spjd (hio)->hio_ggio.gctl_seq = 1; \ 201209183Spjd} while (0) 202204076Spjd#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 203204076Spjd#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 204204076Spjd#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 205204076Spjd 206204076Spjdstatic struct hast_resource *gres; 207204076Spjd 208204076Spjdstatic pthread_mutex_t range_lock; 209204076Spjdstatic struct rangelocks *range_regular; 210204076Spjdstatic bool range_regular_wait; 211204076Spjdstatic pthread_cond_t range_regular_cond; 212204076Spjdstatic struct rangelocks *range_sync; 213204076Spjdstatic bool range_sync_wait; 214204076Spjdstatic pthread_cond_t range_sync_cond; 215204076Spjd 216204076Spjdstatic void *ggate_recv_thread(void *arg); 217204076Spjdstatic void *local_send_thread(void *arg); 218204076Spjdstatic void *remote_send_thread(void *arg); 219204076Spjdstatic void *remote_recv_thread(void *arg); 220204076Spjdstatic void *ggate_send_thread(void *arg); 221204076Spjdstatic void *sync_thread(void *arg); 222204076Spjdstatic void *guard_thread(void *arg); 223204076Spjd 224204076Spjdstatic void sighandler(int sig); 225204076Spjd 226204076Spjdstatic void 227204076Spjdcleanup(struct hast_resource *res) 228204076Spjd{ 229204076Spjd int rerrno; 230204076Spjd 231204076Spjd /* Remember errno. */ 232204076Spjd rerrno = errno; 233204076Spjd 234204076Spjd /* 235204076Spjd * Close descriptor to /dev/hast/<name> 236204076Spjd * to work-around race in the kernel. 237204076Spjd */ 238204076Spjd close(res->hr_localfd); 239204076Spjd 240204076Spjd /* Destroy ggate provider if we created one. 
*/ 241204076Spjd if (res->hr_ggateunit >= 0) { 242204076Spjd struct g_gate_ctl_destroy ggiod; 243204076Spjd 244204076Spjd ggiod.gctl_version = G_GATE_VERSION; 245204076Spjd ggiod.gctl_unit = res->hr_ggateunit; 246204076Spjd ggiod.gctl_force = 1; 247204076Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 248204076Spjd pjdlog_warning("Unable to destroy hast/%s device", 249204076Spjd res->hr_provname); 250204076Spjd } 251204076Spjd res->hr_ggateunit = -1; 252204076Spjd } 253204076Spjd 254204076Spjd /* Restore errno. */ 255204076Spjd errno = rerrno; 256204076Spjd} 257204076Spjd 258204076Spjdstatic void 259204076Spjdprimary_exit(int exitcode, const char *fmt, ...) 260204076Spjd{ 261204076Spjd va_list ap; 262204076Spjd 263204076Spjd assert(exitcode != EX_OK); 264204076Spjd va_start(ap, fmt); 265204076Spjd pjdlogv_errno(LOG_ERR, fmt, ap); 266204076Spjd va_end(ap); 267204076Spjd cleanup(gres); 268204076Spjd exit(exitcode); 269204076Spjd} 270204076Spjd 271204076Spjdstatic void 272204076Spjdprimary_exitx(int exitcode, const char *fmt, ...) 273204076Spjd{ 274204076Spjd va_list ap; 275204076Spjd 276204076Spjd va_start(ap, fmt); 277204076Spjd pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 278204076Spjd va_end(ap); 279204076Spjd cleanup(gres); 280204076Spjd exit(exitcode); 281204076Spjd} 282204076Spjd 283204076Spjdstatic int 284204076Spjdhast_activemap_flush(struct hast_resource *res) 285204076Spjd{ 286204076Spjd const unsigned char *buf; 287204076Spjd size_t size; 288204076Spjd 289204076Spjd buf = activemap_bitmap(res->hr_amp, &size); 290204076Spjd assert(buf != NULL); 291204076Spjd assert((size % res->hr_local_sectorsize) == 0); 292204076Spjd if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 293204076Spjd (ssize_t)size) { 294204076Spjd KEEP_ERRNO(pjdlog_errno(LOG_ERR, 295204076Spjd "Unable to flush activemap to disk")); 296204076Spjd return (-1); 297204076Spjd } 298204076Spjd return (0); 299204076Spjd} 300204076Spjd 301204076Spjdstatic void 302204076Spjdinit_environment(struct hast_resource *res __unused) 303204076Spjd{ 304204076Spjd struct hio *hio; 305204076Spjd unsigned int ii, ncomps; 306204076Spjd 307204076Spjd /* 308204076Spjd * In the future it might be per-resource value. 309204076Spjd */ 310204076Spjd ncomps = HAST_NCOMPONENTS; 311204076Spjd 312204076Spjd /* 313204076Spjd * Allocate memory needed by lists. 
314204076Spjd */ 315204076Spjd hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 316204076Spjd if (hio_send_list == NULL) { 317204076Spjd primary_exitx(EX_TEMPFAIL, 318204076Spjd "Unable to allocate %zu bytes of memory for send lists.", 319204076Spjd sizeof(hio_send_list[0]) * ncomps); 320204076Spjd } 321204076Spjd hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 322204076Spjd if (hio_send_list_lock == NULL) { 323204076Spjd primary_exitx(EX_TEMPFAIL, 324204076Spjd "Unable to allocate %zu bytes of memory for send list locks.", 325204076Spjd sizeof(hio_send_list_lock[0]) * ncomps); 326204076Spjd } 327204076Spjd hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 328204076Spjd if (hio_send_list_cond == NULL) { 329204076Spjd primary_exitx(EX_TEMPFAIL, 330204076Spjd "Unable to allocate %zu bytes of memory for send list condition variables.", 331204076Spjd sizeof(hio_send_list_cond[0]) * ncomps); 332204076Spjd } 333204076Spjd hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 334204076Spjd if (hio_recv_list == NULL) { 335204076Spjd primary_exitx(EX_TEMPFAIL, 336204076Spjd "Unable to allocate %zu bytes of memory for recv lists.", 337204076Spjd sizeof(hio_recv_list[0]) * ncomps); 338204076Spjd } 339204076Spjd hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 340204076Spjd if (hio_recv_list_lock == NULL) { 341204076Spjd primary_exitx(EX_TEMPFAIL, 342204076Spjd "Unable to allocate %zu bytes of memory for recv list locks.", 343204076Spjd sizeof(hio_recv_list_lock[0]) * ncomps); 344204076Spjd } 345204076Spjd hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 346204076Spjd if (hio_recv_list_cond == NULL) { 347204076Spjd primary_exitx(EX_TEMPFAIL, 348204076Spjd "Unable to allocate %zu bytes of memory for recv list condition variables.", 349204076Spjd sizeof(hio_recv_list_cond[0]) * ncomps); 350204076Spjd } 351204076Spjd hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 
352204076Spjd if (hio_remote_lock == NULL) { 353204076Spjd primary_exitx(EX_TEMPFAIL, 354204076Spjd "Unable to allocate %zu bytes of memory for remote connections locks.", 355204076Spjd sizeof(hio_remote_lock[0]) * ncomps); 356204076Spjd } 357204076Spjd 358204076Spjd /* 359204076Spjd * Initialize lists, their locks and theirs condition variables. 360204076Spjd */ 361204076Spjd TAILQ_INIT(&hio_free_list); 362204076Spjd mtx_init(&hio_free_list_lock); 363204076Spjd cv_init(&hio_free_list_cond); 364204076Spjd for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 365204076Spjd TAILQ_INIT(&hio_send_list[ii]); 366204076Spjd mtx_init(&hio_send_list_lock[ii]); 367204076Spjd cv_init(&hio_send_list_cond[ii]); 368204076Spjd TAILQ_INIT(&hio_recv_list[ii]); 369204076Spjd mtx_init(&hio_recv_list_lock[ii]); 370204076Spjd cv_init(&hio_recv_list_cond[ii]); 371204076Spjd rw_init(&hio_remote_lock[ii]); 372204076Spjd } 373204076Spjd TAILQ_INIT(&hio_done_list); 374204076Spjd mtx_init(&hio_done_list_lock); 375204076Spjd cv_init(&hio_done_list_cond); 376204076Spjd mtx_init(&hio_guard_lock); 377204076Spjd cv_init(&hio_guard_cond); 378204076Spjd mtx_init(&metadata_lock); 379204076Spjd 380204076Spjd /* 381204076Spjd * Allocate requests pool and initialize requests. 
382204076Spjd */ 383204076Spjd for (ii = 0; ii < HAST_HIO_MAX; ii++) { 384204076Spjd hio = malloc(sizeof(*hio)); 385204076Spjd if (hio == NULL) { 386204076Spjd primary_exitx(EX_TEMPFAIL, 387204076Spjd "Unable to allocate %zu bytes of memory for hio request.", 388204076Spjd sizeof(*hio)); 389204076Spjd } 390204076Spjd hio->hio_countdown = 0; 391204076Spjd hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 392204076Spjd if (hio->hio_errors == NULL) { 393204076Spjd primary_exitx(EX_TEMPFAIL, 394204076Spjd "Unable allocate %zu bytes of memory for hio errors.", 395204076Spjd sizeof(hio->hio_errors[0]) * ncomps); 396204076Spjd } 397204076Spjd hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 398204076Spjd if (hio->hio_next == NULL) { 399204076Spjd primary_exitx(EX_TEMPFAIL, 400204076Spjd "Unable allocate %zu bytes of memory for hio_next field.", 401204076Spjd sizeof(hio->hio_next[0]) * ncomps); 402204076Spjd } 403204076Spjd hio->hio_ggio.gctl_version = G_GATE_VERSION; 404204076Spjd hio->hio_ggio.gctl_data = malloc(MAXPHYS); 405204076Spjd if (hio->hio_ggio.gctl_data == NULL) { 406204076Spjd primary_exitx(EX_TEMPFAIL, 407204076Spjd "Unable to allocate %zu bytes of memory for gctl_data.", 408204076Spjd MAXPHYS); 409204076Spjd } 410204076Spjd hio->hio_ggio.gctl_length = MAXPHYS; 411204076Spjd hio->hio_ggio.gctl_error = 0; 412204076Spjd TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 413204076Spjd } 414204076Spjd 415204076Spjd /* 416204076Spjd * Turn on signals handling. 
417204076Spjd */ 418204076Spjd signal(SIGINT, sighandler); 419204076Spjd signal(SIGTERM, sighandler); 420204076Spjd} 421204076Spjd 422204076Spjdstatic void 423204076Spjdinit_local(struct hast_resource *res) 424204076Spjd{ 425204076Spjd unsigned char *buf; 426204076Spjd size_t mapsize; 427204076Spjd 428204076Spjd if (metadata_read(res, true) < 0) 429204076Spjd exit(EX_NOINPUT); 430204076Spjd mtx_init(&res->hr_amp_lock); 431204076Spjd if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 432204076Spjd res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 433204076Spjd primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 434204076Spjd } 435204076Spjd mtx_init(&range_lock); 436204076Spjd cv_init(&range_regular_cond); 437204076Spjd if (rangelock_init(&range_regular) < 0) 438204076Spjd primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 439204076Spjd cv_init(&range_sync_cond); 440204076Spjd if (rangelock_init(&range_sync) < 0) 441204076Spjd primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 442204076Spjd mapsize = activemap_ondisk_size(res->hr_amp); 443204076Spjd buf = calloc(1, mapsize); 444204076Spjd if (buf == NULL) { 445204076Spjd primary_exitx(EX_TEMPFAIL, 446204076Spjd "Unable to allocate buffer for activemap."); 447204076Spjd } 448204076Spjd if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 449204076Spjd (ssize_t)mapsize) { 450204076Spjd primary_exit(EX_NOINPUT, "Unable to read activemap"); 451204076Spjd } 452204076Spjd activemap_copyin(res->hr_amp, buf, mapsize); 453209181Spjd free(buf); 454204076Spjd if (res->hr_resuid != 0) 455204076Spjd return; 456204076Spjd /* 457204076Spjd * We're using provider for the first time, so we have to generate 458204076Spjd * resource unique identifier and initialize local and remote counts. 
459204076Spjd */ 460204076Spjd arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 461204076Spjd res->hr_primary_localcnt = 1; 462204076Spjd res->hr_primary_remotecnt = 0; 463204076Spjd if (metadata_write(res) < 0) 464204076Spjd exit(EX_NOINPUT); 465204076Spjd} 466204076Spjd 467205738Spjdstatic bool 468205738Spjdinit_remote(struct hast_resource *res, struct proto_conn **inp, 469205738Spjd struct proto_conn **outp) 470204076Spjd{ 471205738Spjd struct proto_conn *in, *out; 472204076Spjd struct nv *nvout, *nvin; 473204076Spjd const unsigned char *token; 474204076Spjd unsigned char *map; 475204076Spjd const char *errmsg; 476204076Spjd int32_t extentsize; 477204076Spjd int64_t datasize; 478204076Spjd uint32_t mapsize; 479204076Spjd size_t size; 480204076Spjd 481205738Spjd assert((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 482205738Spjd 483205738Spjd in = out = NULL; 484205738Spjd 485204076Spjd /* Prepare outgoing connection with remote node. */ 486205738Spjd if (proto_client(res->hr_remoteaddr, &out) < 0) { 487207347Spjd primary_exit(EX_TEMPFAIL, "Unable to create connection to %s", 488204076Spjd res->hr_remoteaddr); 489204076Spjd } 490204076Spjd /* Try to connect, but accept failure. */ 491205738Spjd if (proto_connect(out) < 0) { 492204076Spjd pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 493204076Spjd res->hr_remoteaddr); 494204076Spjd goto close; 495204076Spjd } 496207371Spjd /* Error in setting timeout is not critical, but why should it fail? */ 497207371Spjd if (proto_timeout(out, res->hr_timeout) < 0) 498207371Spjd pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 499204076Spjd /* 500204076Spjd * First handshake step. 501204076Spjd * Setup outgoing connection with remote node. 
502204076Spjd */ 503204076Spjd nvout = nv_alloc(); 504204076Spjd nv_add_string(nvout, res->hr_name, "resource"); 505204076Spjd if (nv_error(nvout) != 0) { 506204076Spjd pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 507204076Spjd "Unable to allocate header for connection with %s", 508204076Spjd res->hr_remoteaddr); 509204076Spjd nv_free(nvout); 510204076Spjd goto close; 511204076Spjd } 512205738Spjd if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 513204076Spjd pjdlog_errno(LOG_WARNING, 514204076Spjd "Unable to send handshake header to %s", 515204076Spjd res->hr_remoteaddr); 516204076Spjd nv_free(nvout); 517204076Spjd goto close; 518204076Spjd } 519204076Spjd nv_free(nvout); 520205738Spjd if (hast_proto_recv_hdr(out, &nvin) < 0) { 521204076Spjd pjdlog_errno(LOG_WARNING, 522204076Spjd "Unable to receive handshake header from %s", 523204076Spjd res->hr_remoteaddr); 524204076Spjd goto close; 525204076Spjd } 526204076Spjd errmsg = nv_get_string(nvin, "errmsg"); 527204076Spjd if (errmsg != NULL) { 528204076Spjd pjdlog_warning("%s", errmsg); 529204076Spjd nv_free(nvin); 530204076Spjd goto close; 531204076Spjd } 532204076Spjd token = nv_get_uint8_array(nvin, &size, "token"); 533204076Spjd if (token == NULL) { 534204076Spjd pjdlog_warning("Handshake header from %s has no 'token' field.", 535204076Spjd res->hr_remoteaddr); 536204076Spjd nv_free(nvin); 537204076Spjd goto close; 538204076Spjd } 539204076Spjd if (size != sizeof(res->hr_token)) { 540204076Spjd pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 541204076Spjd res->hr_remoteaddr, size, sizeof(res->hr_token)); 542204076Spjd nv_free(nvin); 543204076Spjd goto close; 544204076Spjd } 545204076Spjd bcopy(token, res->hr_token, sizeof(res->hr_token)); 546204076Spjd nv_free(nvin); 547204076Spjd 548204076Spjd /* 549204076Spjd * Second handshake step. 550204076Spjd * Setup incoming connection with remote node. 
551204076Spjd */ 552205738Spjd if (proto_client(res->hr_remoteaddr, &in) < 0) { 553204076Spjd pjdlog_errno(LOG_WARNING, "Unable to create connection to %s", 554204076Spjd res->hr_remoteaddr); 555204076Spjd } 556204076Spjd /* Try to connect, but accept failure. */ 557205738Spjd if (proto_connect(in) < 0) { 558204076Spjd pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 559204076Spjd res->hr_remoteaddr); 560204076Spjd goto close; 561204076Spjd } 562207371Spjd /* Error in setting timeout is not critical, but why should it fail? */ 563207371Spjd if (proto_timeout(in, res->hr_timeout) < 0) 564207371Spjd pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 565204076Spjd nvout = nv_alloc(); 566204076Spjd nv_add_string(nvout, res->hr_name, "resource"); 567204076Spjd nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 568204076Spjd "token"); 569204076Spjd nv_add_uint64(nvout, res->hr_resuid, "resuid"); 570204076Spjd nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 571204076Spjd nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 572204076Spjd if (nv_error(nvout) != 0) { 573204076Spjd pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 574204076Spjd "Unable to allocate header for connection with %s", 575204076Spjd res->hr_remoteaddr); 576204076Spjd nv_free(nvout); 577204076Spjd goto close; 578204076Spjd } 579205738Spjd if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 580204076Spjd pjdlog_errno(LOG_WARNING, 581204076Spjd "Unable to send handshake header to %s", 582204076Spjd res->hr_remoteaddr); 583204076Spjd nv_free(nvout); 584204076Spjd goto close; 585204076Spjd } 586204076Spjd nv_free(nvout); 587205738Spjd if (hast_proto_recv_hdr(out, &nvin) < 0) { 588204076Spjd pjdlog_errno(LOG_WARNING, 589204076Spjd "Unable to receive handshake header from %s", 590204076Spjd res->hr_remoteaddr); 591204076Spjd goto close; 592204076Spjd } 593204076Spjd errmsg = nv_get_string(nvin, "errmsg"); 594204076Spjd if (errmsg != NULL) { 595204076Spjd 
pjdlog_warning("%s", errmsg); 596204076Spjd nv_free(nvin); 597204076Spjd goto close; 598204076Spjd } 599204076Spjd datasize = nv_get_int64(nvin, "datasize"); 600204076Spjd if (datasize != res->hr_datasize) { 601204076Spjd pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 602204076Spjd (intmax_t)res->hr_datasize, (intmax_t)datasize); 603204076Spjd nv_free(nvin); 604204076Spjd goto close; 605204076Spjd } 606204076Spjd extentsize = nv_get_int32(nvin, "extentsize"); 607204076Spjd if (extentsize != res->hr_extentsize) { 608204076Spjd pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 609204076Spjd (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 610204076Spjd nv_free(nvin); 611204076Spjd goto close; 612204076Spjd } 613204076Spjd res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 614204076Spjd res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 615204076Spjd res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 616204076Spjd map = NULL; 617204076Spjd mapsize = nv_get_uint32(nvin, "mapsize"); 618204076Spjd if (mapsize > 0) { 619204076Spjd map = malloc(mapsize); 620204076Spjd if (map == NULL) { 621204076Spjd pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 622204076Spjd (uintmax_t)mapsize); 623204076Spjd nv_free(nvin); 624204076Spjd goto close; 625204076Spjd } 626204076Spjd /* 627204076Spjd * Remote node have some dirty extents on its own, lets 628204076Spjd * download its activemap. 629204076Spjd */ 630205738Spjd if (hast_proto_recv_data(res, out, nvin, map, 631204076Spjd mapsize) < 0) { 632204076Spjd pjdlog_errno(LOG_ERR, 633204076Spjd "Unable to receive remote activemap"); 634204076Spjd nv_free(nvin); 635204076Spjd free(map); 636204076Spjd goto close; 637204076Spjd } 638204076Spjd /* 639204076Spjd * Merge local and remote bitmaps. 
640204076Spjd */ 641204076Spjd activemap_merge(res->hr_amp, map, mapsize); 642204076Spjd free(map); 643204076Spjd /* 644204076Spjd * Now that we merged bitmaps from both nodes, flush it to the 645204076Spjd * disk before we start to synchronize. 646204076Spjd */ 647204076Spjd (void)hast_activemap_flush(res); 648204076Spjd } 649204076Spjd pjdlog_info("Connected to %s.", res->hr_remoteaddr); 650205738Spjd if (inp != NULL && outp != NULL) { 651205738Spjd *inp = in; 652205738Spjd *outp = out; 653205738Spjd } else { 654205738Spjd res->hr_remotein = in; 655205738Spjd res->hr_remoteout = out; 656205738Spjd } 657205738Spjd return (true); 658205738Spjdclose: 659205738Spjd proto_close(out); 660205738Spjd if (in != NULL) 661205738Spjd proto_close(in); 662205738Spjd return (false); 663205738Spjd} 664205738Spjd 665205738Spjdstatic void 666205738Spjdsync_start(void) 667205738Spjd{ 668205738Spjd 669204076Spjd mtx_lock(&sync_lock); 670204076Spjd sync_inprogress = true; 671204076Spjd mtx_unlock(&sync_lock); 672204076Spjd cv_signal(&sync_cond); 673204076Spjd} 674204076Spjd 675204076Spjdstatic void 676204076Spjdinit_ggate(struct hast_resource *res) 677204076Spjd{ 678204076Spjd struct g_gate_ctl_create ggiocreate; 679204076Spjd struct g_gate_ctl_cancel ggiocancel; 680204076Spjd 681204076Spjd /* 682204076Spjd * We communicate with ggate via /dev/ggctl. Open it. 683204076Spjd */ 684204076Spjd res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 685204076Spjd if (res->hr_ggatefd < 0) 686204076Spjd primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 687204076Spjd /* 688204076Spjd * Create provider before trying to connect, as connection failure 689204076Spjd * is not critical, but may take some time. 
690204076Spjd */ 691204076Spjd ggiocreate.gctl_version = G_GATE_VERSION; 692204076Spjd ggiocreate.gctl_mediasize = res->hr_datasize; 693204076Spjd ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 694204076Spjd ggiocreate.gctl_flags = 0; 695206669Spjd ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 696204076Spjd ggiocreate.gctl_timeout = 0; 697204076Spjd ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 698204076Spjd snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 699204076Spjd res->hr_provname); 700204076Spjd bzero(ggiocreate.gctl_info, sizeof(ggiocreate.gctl_info)); 701204076Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 702204076Spjd pjdlog_info("Device hast/%s created.", res->hr_provname); 703204076Spjd res->hr_ggateunit = ggiocreate.gctl_unit; 704204076Spjd return; 705204076Spjd } 706204076Spjd if (errno != EEXIST) { 707204076Spjd primary_exit(EX_OSERR, "Unable to create hast/%s device", 708204076Spjd res->hr_provname); 709204076Spjd } 710204076Spjd pjdlog_debug(1, 711204076Spjd "Device hast/%s already exists, we will try to take it over.", 712204076Spjd res->hr_provname); 713204076Spjd /* 714204076Spjd * If we received EEXIST, we assume that the process who created the 715204076Spjd * provider died and didn't clean up. In that case we will start from 716204076Spjd * where he left of. 
717204076Spjd */ 718204076Spjd ggiocancel.gctl_version = G_GATE_VERSION; 719204076Spjd ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 720204076Spjd snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 721204076Spjd res->hr_provname); 722204076Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 723204076Spjd pjdlog_info("Device hast/%s recovered.", res->hr_provname); 724204076Spjd res->hr_ggateunit = ggiocancel.gctl_unit; 725204076Spjd return; 726204076Spjd } 727204076Spjd primary_exit(EX_OSERR, "Unable to take over hast/%s device", 728204076Spjd res->hr_provname); 729204076Spjd} 730204076Spjd 731204076Spjdvoid 732204076Spjdhastd_primary(struct hast_resource *res) 733204076Spjd{ 734204076Spjd pthread_t td; 735204076Spjd pid_t pid; 736204076Spjd int error; 737204076Spjd 738204076Spjd gres = res; 739204076Spjd 740204076Spjd /* 741204076Spjd * Create communication channel between parent and child. 742204076Spjd */ 743204076Spjd if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 744204076Spjd KEEP_ERRNO((void)pidfile_remove(pfh)); 745204076Spjd primary_exit(EX_OSERR, 746204076Spjd "Unable to create control sockets between parent and child"); 747204076Spjd } 748204076Spjd 749204076Spjd pid = fork(); 750204076Spjd if (pid < 0) { 751204076Spjd KEEP_ERRNO((void)pidfile_remove(pfh)); 752207347Spjd primary_exit(EX_TEMPFAIL, "Unable to fork"); 753204076Spjd } 754204076Spjd 755204076Spjd if (pid > 0) { 756204076Spjd /* This is parent. 
*/ 757204076Spjd res->hr_workerpid = pid; 758204076Spjd return; 759204076Spjd } 760204076Spjd (void)pidfile_close(pfh); 761204076Spjd 762204076Spjd setproctitle("%s (primary)", res->hr_name); 763204076Spjd 764204076Spjd init_local(res); 765205738Spjd if (init_remote(res, NULL, NULL)) 766205738Spjd sync_start(); 767204076Spjd init_ggate(res); 768204076Spjd init_environment(res); 769204076Spjd error = pthread_create(&td, NULL, ggate_recv_thread, res); 770204076Spjd assert(error == 0); 771204076Spjd error = pthread_create(&td, NULL, local_send_thread, res); 772204076Spjd assert(error == 0); 773204076Spjd error = pthread_create(&td, NULL, remote_send_thread, res); 774204076Spjd assert(error == 0); 775204076Spjd error = pthread_create(&td, NULL, remote_recv_thread, res); 776204076Spjd assert(error == 0); 777204076Spjd error = pthread_create(&td, NULL, ggate_send_thread, res); 778204076Spjd assert(error == 0); 779204076Spjd error = pthread_create(&td, NULL, sync_thread, res); 780204076Spjd assert(error == 0); 781204076Spjd error = pthread_create(&td, NULL, ctrl_thread, res); 782204076Spjd assert(error == 0); 783204076Spjd (void)guard_thread(res); 784204076Spjd} 785204076Spjd 786204076Spjdstatic void 787204076Spjdreqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 
788204076Spjd{ 789204076Spjd char msg[1024]; 790204076Spjd va_list ap; 791204076Spjd int len; 792204076Spjd 793204076Spjd va_start(ap, fmt); 794204076Spjd len = vsnprintf(msg, sizeof(msg), fmt, ap); 795204076Spjd va_end(ap); 796204076Spjd if ((size_t)len < sizeof(msg)) { 797204076Spjd switch (ggio->gctl_cmd) { 798204076Spjd case BIO_READ: 799204076Spjd (void)snprintf(msg + len, sizeof(msg) - len, 800204076Spjd "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 801204076Spjd (uintmax_t)ggio->gctl_length); 802204076Spjd break; 803204076Spjd case BIO_DELETE: 804204076Spjd (void)snprintf(msg + len, sizeof(msg) - len, 805204076Spjd "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 806204076Spjd (uintmax_t)ggio->gctl_length); 807204076Spjd break; 808204076Spjd case BIO_FLUSH: 809204076Spjd (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 810204076Spjd break; 811204076Spjd case BIO_WRITE: 812204076Spjd (void)snprintf(msg + len, sizeof(msg) - len, 813204076Spjd "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 814204076Spjd (uintmax_t)ggio->gctl_length); 815204076Spjd break; 816204076Spjd default: 817204076Spjd (void)snprintf(msg + len, sizeof(msg) - len, 818204076Spjd "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 819204076Spjd break; 820204076Spjd } 821204076Spjd } 822204076Spjd pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 823204076Spjd} 824204076Spjd 825204076Spjdstatic void 826204076Spjdremote_close(struct hast_resource *res, int ncomp) 827204076Spjd{ 828204076Spjd 829204076Spjd rw_wlock(&hio_remote_lock[ncomp]); 830204076Spjd /* 831204076Spjd * A race is possible between dropping rlock and acquiring wlock - 832204076Spjd * another thread can close connection in-between. 
833204076Spjd */ 834204076Spjd if (!ISCONNECTED(res, ncomp)) { 835204076Spjd assert(res->hr_remotein == NULL); 836204076Spjd assert(res->hr_remoteout == NULL); 837204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 838204076Spjd return; 839204076Spjd } 840204076Spjd 841204076Spjd assert(res->hr_remotein != NULL); 842204076Spjd assert(res->hr_remoteout != NULL); 843204076Spjd 844204076Spjd pjdlog_debug(2, "Closing old incoming connection to %s.", 845204076Spjd res->hr_remoteaddr); 846204076Spjd proto_close(res->hr_remotein); 847204076Spjd res->hr_remotein = NULL; 848204076Spjd pjdlog_debug(2, "Closing old outgoing connection to %s.", 849204076Spjd res->hr_remoteaddr); 850204076Spjd proto_close(res->hr_remoteout); 851204076Spjd res->hr_remoteout = NULL; 852204076Spjd 853204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 854204076Spjd 855204076Spjd /* 856204076Spjd * Stop synchronization if in-progress. 857204076Spjd */ 858204076Spjd mtx_lock(&sync_lock); 859204076Spjd if (sync_inprogress) 860204076Spjd sync_inprogress = false; 861204076Spjd mtx_unlock(&sync_lock); 862204076Spjd 863204076Spjd /* 864204076Spjd * Wake up guard thread, so it can immediately start reconnect. 
865204076Spjd */ 866204076Spjd mtx_lock(&hio_guard_lock); 867204076Spjd cv_signal(&hio_guard_cond); 868204076Spjd mtx_unlock(&hio_guard_lock); 869204076Spjd} 870204076Spjd 871204076Spjd/* 872204076Spjd * Thread receives ggate I/O requests from the kernel and passes them to 873204076Spjd * appropriate threads: 874204076Spjd * WRITE - always goes to both local_send and remote_send threads 875204076Spjd * READ (when the block is up-to-date on local component) - 876204076Spjd * only local_send thread 877204076Spjd * READ (when the block isn't up-to-date on local component) - 878204076Spjd * only remote_send thread 879204076Spjd * DELETE - always goes to both local_send and remote_send threads 880204076Spjd * FLUSH - always goes to both local_send and remote_send threads 881204076Spjd */ 882204076Spjdstatic void * 883204076Spjdggate_recv_thread(void *arg) 884204076Spjd{ 885204076Spjd struct hast_resource *res = arg; 886204076Spjd struct g_gate_ctl_io *ggio; 887204076Spjd struct hio *hio; 888204076Spjd unsigned int ii, ncomp, ncomps; 889204076Spjd int error; 890204076Spjd 891204076Spjd ncomps = HAST_NCOMPONENTS; 892204076Spjd 893204076Spjd for (;;) { 894204076Spjd pjdlog_debug(2, "ggate_recv: Taking free request."); 895204076Spjd QUEUE_TAKE2(hio, free); 896204076Spjd pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 897204076Spjd ggio = &hio->hio_ggio; 898204076Spjd ggio->gctl_unit = res->hr_ggateunit; 899204076Spjd ggio->gctl_length = MAXPHYS; 900204076Spjd ggio->gctl_error = 0; 901204076Spjd pjdlog_debug(2, 902204076Spjd "ggate_recv: (%p) Waiting for request from the kernel.", 903204076Spjd hio); 904204076Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 905204076Spjd if (sigexit_received) 906204076Spjd pthread_exit(NULL); 907204076Spjd primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 908204076Spjd } 909204076Spjd error = ggio->gctl_error; 910204076Spjd switch (error) { 911204076Spjd case 0: 912204076Spjd break; 913204076Spjd case ECANCELED: 
914204076Spjd /* Exit gracefully. */ 915204076Spjd if (!sigexit_received) { 916204076Spjd pjdlog_debug(2, 917204076Spjd "ggate_recv: (%p) Received cancel from the kernel.", 918204076Spjd hio); 919204076Spjd pjdlog_info("Received cancel from the kernel, exiting."); 920204076Spjd } 921204076Spjd pthread_exit(NULL); 922204076Spjd case ENOMEM: 923204076Spjd /* 924204076Spjd * Buffer too small? Impossible, we allocate MAXPHYS 925204076Spjd * bytes - request can't be bigger than that. 926204076Spjd */ 927204076Spjd /* FALLTHROUGH */ 928204076Spjd case ENXIO: 929204076Spjd default: 930204076Spjd primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 931204076Spjd strerror(error)); 932204076Spjd } 933204076Spjd for (ii = 0; ii < ncomps; ii++) 934204076Spjd hio->hio_errors[ii] = EINVAL; 935204076Spjd reqlog(LOG_DEBUG, 2, ggio, 936204076Spjd "ggate_recv: (%p) Request received from the kernel: ", 937204076Spjd hio); 938204076Spjd /* 939204076Spjd * Inform all components about new write request. 940204076Spjd * For read request prefer local component unless the given 941204076Spjd * range is out-of-date, then use remote component. 942204076Spjd */ 943204076Spjd switch (ggio->gctl_cmd) { 944204076Spjd case BIO_READ: 945204076Spjd pjdlog_debug(2, 946204076Spjd "ggate_recv: (%p) Moving request to the send queue.", 947204076Spjd hio); 948204076Spjd refcount_init(&hio->hio_countdown, 1); 949204076Spjd mtx_lock(&metadata_lock); 950204076Spjd if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 951204076Spjd res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 952204076Spjd /* 953204076Spjd * This range is up-to-date on local component, 954204076Spjd * so handle request locally. 955204076Spjd */ 956204076Spjd /* Local component is 0 for now. 
*/ 957204076Spjd ncomp = 0; 958204076Spjd } else /* if (res->hr_syncsrc == 959204076Spjd HAST_SYNCSRC_SECONDARY) */ { 960204076Spjd assert(res->hr_syncsrc == 961204076Spjd HAST_SYNCSRC_SECONDARY); 962204076Spjd /* 963204076Spjd * This range is out-of-date on local component, 964204076Spjd * so send request to the remote node. 965204076Spjd */ 966204076Spjd /* Remote component is 1 for now. */ 967204076Spjd ncomp = 1; 968204076Spjd } 969204076Spjd mtx_unlock(&metadata_lock); 970204076Spjd QUEUE_INSERT1(hio, send, ncomp); 971204076Spjd break; 972204076Spjd case BIO_WRITE: 973204076Spjd for (;;) { 974204076Spjd mtx_lock(&range_lock); 975204076Spjd if (rangelock_islocked(range_sync, 976204076Spjd ggio->gctl_offset, ggio->gctl_length)) { 977204076Spjd pjdlog_debug(2, 978204076Spjd "regular: Range offset=%jd length=%zu locked.", 979204076Spjd (intmax_t)ggio->gctl_offset, 980204076Spjd (size_t)ggio->gctl_length); 981204076Spjd range_regular_wait = true; 982204076Spjd cv_wait(&range_regular_cond, &range_lock); 983204076Spjd range_regular_wait = false; 984204076Spjd mtx_unlock(&range_lock); 985204076Spjd continue; 986204076Spjd } 987204076Spjd if (rangelock_add(range_regular, 988204076Spjd ggio->gctl_offset, ggio->gctl_length) < 0) { 989204076Spjd mtx_unlock(&range_lock); 990204076Spjd pjdlog_debug(2, 991204076Spjd "regular: Range offset=%jd length=%zu is already locked, waiting.", 992204076Spjd (intmax_t)ggio->gctl_offset, 993204076Spjd (size_t)ggio->gctl_length); 994204076Spjd sleep(1); 995204076Spjd continue; 996204076Spjd } 997204076Spjd mtx_unlock(&range_lock); 998204076Spjd break; 999204076Spjd } 1000204076Spjd mtx_lock(&res->hr_amp_lock); 1001204076Spjd if (activemap_write_start(res->hr_amp, 1002204076Spjd ggio->gctl_offset, ggio->gctl_length)) { 1003204076Spjd (void)hast_activemap_flush(res); 1004204076Spjd } 1005204076Spjd mtx_unlock(&res->hr_amp_lock); 1006204076Spjd /* FALLTHROUGH */ 1007204076Spjd case BIO_DELETE: 1008204076Spjd case BIO_FLUSH: 1009204076Spjd 
pjdlog_debug(2, 1010204076Spjd "ggate_recv: (%p) Moving request to the send queues.", 1011204076Spjd hio); 1012204076Spjd refcount_init(&hio->hio_countdown, ncomps); 1013204076Spjd for (ii = 0; ii < ncomps; ii++) 1014204076Spjd QUEUE_INSERT1(hio, send, ii); 1015204076Spjd break; 1016204076Spjd } 1017204076Spjd } 1018204076Spjd /* NOTREACHED */ 1019204076Spjd return (NULL); 1020204076Spjd} 1021204076Spjd 1022204076Spjd/* 1023204076Spjd * Thread reads from or writes to local component. 1024204076Spjd * If local read fails, it redirects it to remote_send thread. 1025204076Spjd */ 1026204076Spjdstatic void * 1027204076Spjdlocal_send_thread(void *arg) 1028204076Spjd{ 1029204076Spjd struct hast_resource *res = arg; 1030204076Spjd struct g_gate_ctl_io *ggio; 1031204076Spjd struct hio *hio; 1032204076Spjd unsigned int ncomp, rncomp; 1033204076Spjd ssize_t ret; 1034204076Spjd 1035204076Spjd /* Local component is 0 for now. */ 1036204076Spjd ncomp = 0; 1037204076Spjd /* Remote component is 1 for now. */ 1038204076Spjd rncomp = 1; 1039204076Spjd 1040204076Spjd for (;;) { 1041204076Spjd pjdlog_debug(2, "local_send: Taking request."); 1042204076Spjd QUEUE_TAKE1(hio, send, ncomp); 1043204076Spjd pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1044204076Spjd ggio = &hio->hio_ggio; 1045204076Spjd switch (ggio->gctl_cmd) { 1046204076Spjd case BIO_READ: 1047204076Spjd ret = pread(res->hr_localfd, ggio->gctl_data, 1048204076Spjd ggio->gctl_length, 1049204076Spjd ggio->gctl_offset + res->hr_localoff); 1050204076Spjd if (ret == ggio->gctl_length) 1051204076Spjd hio->hio_errors[ncomp] = 0; 1052204076Spjd else { 1053204076Spjd /* 1054204076Spjd * If READ failed, try to read from remote node. 
1055204076Spjd */ 1056204076Spjd QUEUE_INSERT1(hio, send, rncomp); 1057204076Spjd continue; 1058204076Spjd } 1059204076Spjd break; 1060204076Spjd case BIO_WRITE: 1061204076Spjd ret = pwrite(res->hr_localfd, ggio->gctl_data, 1062204076Spjd ggio->gctl_length, 1063204076Spjd ggio->gctl_offset + res->hr_localoff); 1064204076Spjd if (ret < 0) 1065204076Spjd hio->hio_errors[ncomp] = errno; 1066204076Spjd else if (ret != ggio->gctl_length) 1067204076Spjd hio->hio_errors[ncomp] = EIO; 1068204076Spjd else 1069204076Spjd hio->hio_errors[ncomp] = 0; 1070204076Spjd break; 1071204076Spjd case BIO_DELETE: 1072204076Spjd ret = g_delete(res->hr_localfd, 1073204076Spjd ggio->gctl_offset + res->hr_localoff, 1074204076Spjd ggio->gctl_length); 1075204076Spjd if (ret < 0) 1076204076Spjd hio->hio_errors[ncomp] = errno; 1077204076Spjd else 1078204076Spjd hio->hio_errors[ncomp] = 0; 1079204076Spjd break; 1080204076Spjd case BIO_FLUSH: 1081204076Spjd ret = g_flush(res->hr_localfd); 1082204076Spjd if (ret < 0) 1083204076Spjd hio->hio_errors[ncomp] = errno; 1084204076Spjd else 1085204076Spjd hio->hio_errors[ncomp] = 0; 1086204076Spjd break; 1087204076Spjd } 1088204076Spjd if (refcount_release(&hio->hio_countdown)) { 1089204076Spjd if (ISSYNCREQ(hio)) { 1090204076Spjd mtx_lock(&sync_lock); 1091204076Spjd SYNCREQDONE(hio); 1092204076Spjd mtx_unlock(&sync_lock); 1093204076Spjd cv_signal(&sync_cond); 1094204076Spjd } else { 1095204076Spjd pjdlog_debug(2, 1096204076Spjd "local_send: (%p) Moving request to the done queue.", 1097204076Spjd hio); 1098204076Spjd QUEUE_INSERT2(hio, done); 1099204076Spjd } 1100204076Spjd } 1101204076Spjd } 1102204076Spjd /* NOTREACHED */ 1103204076Spjd return (NULL); 1104204076Spjd} 1105204076Spjd 1106204076Spjd/* 1107204076Spjd * Thread sends request to secondary node. 
1108204076Spjd */ 1109204076Spjdstatic void * 1110204076Spjdremote_send_thread(void *arg) 1111204076Spjd{ 1112204076Spjd struct hast_resource *res = arg; 1113204076Spjd struct g_gate_ctl_io *ggio; 1114204076Spjd struct hio *hio; 1115204076Spjd struct nv *nv; 1116204076Spjd unsigned int ncomp; 1117204076Spjd bool wakeup; 1118204076Spjd uint64_t offset, length; 1119204076Spjd uint8_t cmd; 1120204076Spjd void *data; 1121204076Spjd 1122204076Spjd /* Remote component is 1 for now. */ 1123204076Spjd ncomp = 1; 1124204076Spjd 1125204076Spjd for (;;) { 1126204076Spjd pjdlog_debug(2, "remote_send: Taking request."); 1127204076Spjd QUEUE_TAKE1(hio, send, ncomp); 1128204076Spjd pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1129204076Spjd ggio = &hio->hio_ggio; 1130204076Spjd switch (ggio->gctl_cmd) { 1131204076Spjd case BIO_READ: 1132204076Spjd cmd = HIO_READ; 1133204076Spjd data = NULL; 1134204076Spjd offset = ggio->gctl_offset; 1135204076Spjd length = ggio->gctl_length; 1136204076Spjd break; 1137204076Spjd case BIO_WRITE: 1138204076Spjd cmd = HIO_WRITE; 1139204076Spjd data = ggio->gctl_data; 1140204076Spjd offset = ggio->gctl_offset; 1141204076Spjd length = ggio->gctl_length; 1142204076Spjd break; 1143204076Spjd case BIO_DELETE: 1144204076Spjd cmd = HIO_DELETE; 1145204076Spjd data = NULL; 1146204076Spjd offset = ggio->gctl_offset; 1147204076Spjd length = ggio->gctl_length; 1148204076Spjd break; 1149204076Spjd case BIO_FLUSH: 1150204076Spjd cmd = HIO_FLUSH; 1151204076Spjd data = NULL; 1152204076Spjd offset = 0; 1153204076Spjd length = 0; 1154204076Spjd break; 1155204076Spjd default: 1156204076Spjd assert(!"invalid condition"); 1157204076Spjd abort(); 1158204076Spjd } 1159204076Spjd nv = nv_alloc(); 1160204076Spjd nv_add_uint8(nv, cmd, "cmd"); 1161204076Spjd nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1162204076Spjd nv_add_uint64(nv, offset, "offset"); 1163204076Spjd nv_add_uint64(nv, length, "length"); 1164204076Spjd if (nv_error(nv) != 0) { 1165204076Spjd 
hio->hio_errors[ncomp] = nv_error(nv); 1166204076Spjd pjdlog_debug(2, 1167204076Spjd "remote_send: (%p) Unable to prepare header to send.", 1168204076Spjd hio); 1169204076Spjd reqlog(LOG_ERR, 0, ggio, 1170204076Spjd "Unable to prepare header to send (%s): ", 1171204076Spjd strerror(nv_error(nv))); 1172204076Spjd /* Move failed request immediately to the done queue. */ 1173204076Spjd goto done_queue; 1174204076Spjd } 1175204076Spjd pjdlog_debug(2, 1176204076Spjd "remote_send: (%p) Moving request to the recv queue.", 1177204076Spjd hio); 1178204076Spjd /* 1179204076Spjd * Protect connection from disappearing. 1180204076Spjd */ 1181204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1182204076Spjd if (!ISCONNECTED(res, ncomp)) { 1183204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1184204076Spjd hio->hio_errors[ncomp] = ENOTCONN; 1185204076Spjd goto done_queue; 1186204076Spjd } 1187204076Spjd /* 1188204076Spjd * Move the request to recv queue before sending it, because 1189204076Spjd * in different order we can get reply before we move request 1190204076Spjd * to recv queue. 1191204076Spjd */ 1192204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1193204076Spjd wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1194204076Spjd TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1195204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1196204076Spjd if (hast_proto_send(res, res->hr_remoteout, nv, data, 1197204076Spjd data != NULL ? length : 0) < 0) { 1198204076Spjd hio->hio_errors[ncomp] = errno; 1199204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1200204076Spjd remote_close(res, ncomp); 1201204076Spjd pjdlog_debug(2, 1202204076Spjd "remote_send: (%p) Unable to send request.", hio); 1203204076Spjd reqlog(LOG_ERR, 0, ggio, 1204204076Spjd "Unable to send request (%s): ", 1205204076Spjd strerror(hio->hio_errors[ncomp])); 1206204076Spjd /* 1207204076Spjd * Take request back from the receive queue and move 1208204076Spjd * it immediately to the done queue. 
1209204076Spjd */ 1210204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1211204076Spjd TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1212204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1213204076Spjd goto done_queue; 1214204076Spjd } 1215204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1216204076Spjd nv_free(nv); 1217204076Spjd if (wakeup) 1218204076Spjd cv_signal(&hio_recv_list_cond[ncomp]); 1219204076Spjd continue; 1220204076Spjddone_queue: 1221204076Spjd nv_free(nv); 1222204076Spjd if (ISSYNCREQ(hio)) { 1223204076Spjd if (!refcount_release(&hio->hio_countdown)) 1224204076Spjd continue; 1225204076Spjd mtx_lock(&sync_lock); 1226204076Spjd SYNCREQDONE(hio); 1227204076Spjd mtx_unlock(&sync_lock); 1228204076Spjd cv_signal(&sync_cond); 1229204076Spjd continue; 1230204076Spjd } 1231204076Spjd if (ggio->gctl_cmd == BIO_WRITE) { 1232204076Spjd mtx_lock(&res->hr_amp_lock); 1233204076Spjd if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1234204076Spjd ggio->gctl_length)) { 1235204076Spjd (void)hast_activemap_flush(res); 1236204076Spjd } 1237204076Spjd mtx_unlock(&res->hr_amp_lock); 1238204076Spjd } 1239204076Spjd if (!refcount_release(&hio->hio_countdown)) 1240204076Spjd continue; 1241204076Spjd pjdlog_debug(2, 1242204076Spjd "remote_send: (%p) Moving request to the done queue.", 1243204076Spjd hio); 1244204076Spjd QUEUE_INSERT2(hio, done); 1245204076Spjd } 1246204076Spjd /* NOTREACHED */ 1247204076Spjd return (NULL); 1248204076Spjd} 1249204076Spjd 1250204076Spjd/* 1251204076Spjd * Thread receives answer from secondary node and passes it to ggate_send 1252204076Spjd * thread. 
1253204076Spjd */ 1254204076Spjdstatic void * 1255204076Spjdremote_recv_thread(void *arg) 1256204076Spjd{ 1257204076Spjd struct hast_resource *res = arg; 1258204076Spjd struct g_gate_ctl_io *ggio; 1259204076Spjd struct hio *hio; 1260204076Spjd struct nv *nv; 1261204076Spjd unsigned int ncomp; 1262204076Spjd uint64_t seq; 1263204076Spjd int error; 1264204076Spjd 1265204076Spjd /* Remote component is 1 for now. */ 1266204076Spjd ncomp = 1; 1267204076Spjd 1268204076Spjd for (;;) { 1269204076Spjd /* Wait until there is anything to receive. */ 1270204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1271204076Spjd while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1272204076Spjd pjdlog_debug(2, "remote_recv: No requests, waiting."); 1273204076Spjd cv_wait(&hio_recv_list_cond[ncomp], 1274204076Spjd &hio_recv_list_lock[ncomp]); 1275204076Spjd } 1276204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1277204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1278204076Spjd if (!ISCONNECTED(res, ncomp)) { 1279204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1280204076Spjd /* 1281204076Spjd * Connection is dead, so move all pending requests to 1282204076Spjd * the done queue (one-by-one). 
1283204076Spjd */ 1284204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1285204076Spjd hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1286204076Spjd assert(hio != NULL); 1287204076Spjd TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1288204076Spjd hio_next[ncomp]); 1289204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1290204076Spjd goto done_queue; 1291204076Spjd } 1292204076Spjd if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1293204076Spjd pjdlog_errno(LOG_ERR, 1294204076Spjd "Unable to receive reply header"); 1295204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1296204076Spjd remote_close(res, ncomp); 1297204076Spjd continue; 1298204076Spjd } 1299204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1300204076Spjd seq = nv_get_uint64(nv, "seq"); 1301204076Spjd if (seq == 0) { 1302204076Spjd pjdlog_error("Header contains no 'seq' field."); 1303204076Spjd nv_free(nv); 1304204076Spjd continue; 1305204076Spjd } 1306204076Spjd mtx_lock(&hio_recv_list_lock[ncomp]); 1307204076Spjd TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1308204076Spjd if (hio->hio_ggio.gctl_seq == seq) { 1309204076Spjd TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1310204076Spjd hio_next[ncomp]); 1311204076Spjd break; 1312204076Spjd } 1313204076Spjd } 1314204076Spjd mtx_unlock(&hio_recv_list_lock[ncomp]); 1315204076Spjd if (hio == NULL) { 1316204076Spjd pjdlog_error("Found no request matching received 'seq' field (%ju).", 1317204076Spjd (uintmax_t)seq); 1318204076Spjd nv_free(nv); 1319204076Spjd continue; 1320204076Spjd } 1321204076Spjd error = nv_get_int16(nv, "error"); 1322204076Spjd if (error != 0) { 1323204076Spjd /* Request failed on remote side. 
*/ 1324204076Spjd hio->hio_errors[ncomp] = 0; 1325204076Spjd nv_free(nv); 1326204076Spjd goto done_queue; 1327204076Spjd } 1328204076Spjd ggio = &hio->hio_ggio; 1329204076Spjd switch (ggio->gctl_cmd) { 1330204076Spjd case BIO_READ: 1331204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1332204076Spjd if (!ISCONNECTED(res, ncomp)) { 1333204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1334204076Spjd nv_free(nv); 1335204076Spjd goto done_queue; 1336204076Spjd } 1337204076Spjd if (hast_proto_recv_data(res, res->hr_remotein, nv, 1338204076Spjd ggio->gctl_data, ggio->gctl_length) < 0) { 1339204076Spjd hio->hio_errors[ncomp] = errno; 1340204076Spjd pjdlog_errno(LOG_ERR, 1341204076Spjd "Unable to receive reply data"); 1342204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1343204076Spjd nv_free(nv); 1344204076Spjd remote_close(res, ncomp); 1345204076Spjd goto done_queue; 1346204076Spjd } 1347204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1348204076Spjd break; 1349204076Spjd case BIO_WRITE: 1350204076Spjd case BIO_DELETE: 1351204076Spjd case BIO_FLUSH: 1352204076Spjd break; 1353204076Spjd default: 1354204076Spjd assert(!"invalid condition"); 1355204076Spjd abort(); 1356204076Spjd } 1357204076Spjd hio->hio_errors[ncomp] = 0; 1358204076Spjd nv_free(nv); 1359204076Spjddone_queue: 1360204076Spjd if (refcount_release(&hio->hio_countdown)) { 1361204076Spjd if (ISSYNCREQ(hio)) { 1362204076Spjd mtx_lock(&sync_lock); 1363204076Spjd SYNCREQDONE(hio); 1364204076Spjd mtx_unlock(&sync_lock); 1365204076Spjd cv_signal(&sync_cond); 1366204076Spjd } else { 1367204076Spjd pjdlog_debug(2, 1368204076Spjd "remote_recv: (%p) Moving request to the done queue.", 1369204076Spjd hio); 1370204076Spjd QUEUE_INSERT2(hio, done); 1371204076Spjd } 1372204076Spjd } 1373204076Spjd } 1374204076Spjd /* NOTREACHED */ 1375204076Spjd return (NULL); 1376204076Spjd} 1377204076Spjd 1378204076Spjd/* 1379204076Spjd * Thread sends answer to the kernel. 
1380204076Spjd */ 1381204076Spjdstatic void * 1382204076Spjdggate_send_thread(void *arg) 1383204076Spjd{ 1384204076Spjd struct hast_resource *res = arg; 1385204076Spjd struct g_gate_ctl_io *ggio; 1386204076Spjd struct hio *hio; 1387204076Spjd unsigned int ii, ncomp, ncomps; 1388204076Spjd 1389204076Spjd ncomps = HAST_NCOMPONENTS; 1390204076Spjd 1391204076Spjd for (;;) { 1392204076Spjd pjdlog_debug(2, "ggate_send: Taking request."); 1393204076Spjd QUEUE_TAKE2(hio, done); 1394204076Spjd pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1395204076Spjd ggio = &hio->hio_ggio; 1396204076Spjd for (ii = 0; ii < ncomps; ii++) { 1397204076Spjd if (hio->hio_errors[ii] == 0) { 1398204076Spjd /* 1399204076Spjd * One successful request is enough to declare 1400204076Spjd * success. 1401204076Spjd */ 1402204076Spjd ggio->gctl_error = 0; 1403204076Spjd break; 1404204076Spjd } 1405204076Spjd } 1406204076Spjd if (ii == ncomps) { 1407204076Spjd /* 1408204076Spjd * None of the requests were successful. 1409204076Spjd * Use first error. 1410204076Spjd */ 1411204076Spjd ggio->gctl_error = hio->hio_errors[0]; 1412204076Spjd } 1413204076Spjd if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1414204076Spjd mtx_lock(&res->hr_amp_lock); 1415204076Spjd activemap_write_complete(res->hr_amp, 1416204076Spjd ggio->gctl_offset, ggio->gctl_length); 1417204076Spjd mtx_unlock(&res->hr_amp_lock); 1418204076Spjd } 1419204076Spjd if (ggio->gctl_cmd == BIO_WRITE) { 1420204076Spjd /* 1421204076Spjd * Unlock range we locked. 1422204076Spjd */ 1423204076Spjd mtx_lock(&range_lock); 1424204076Spjd rangelock_del(range_regular, ggio->gctl_offset, 1425204076Spjd ggio->gctl_length); 1426204076Spjd if (range_sync_wait) 1427204076Spjd cv_signal(&range_sync_cond); 1428204076Spjd mtx_unlock(&range_lock); 1429204076Spjd /* 1430204076Spjd * Bump local count if this is first write after 1431204076Spjd * connection failure with remote node. 
1432204076Spjd */ 1433204076Spjd ncomp = 1; 1434204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1435204076Spjd if (!ISCONNECTED(res, ncomp)) { 1436204076Spjd mtx_lock(&metadata_lock); 1437204076Spjd if (res->hr_primary_localcnt == 1438204076Spjd res->hr_secondary_remotecnt) { 1439204076Spjd res->hr_primary_localcnt++; 1440204076Spjd pjdlog_debug(1, 1441204076Spjd "Increasing localcnt to %ju.", 1442204076Spjd (uintmax_t)res->hr_primary_localcnt); 1443204076Spjd (void)metadata_write(res); 1444204076Spjd } 1445204076Spjd mtx_unlock(&metadata_lock); 1446204076Spjd } 1447204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1448204076Spjd } 1449204076Spjd if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1450204076Spjd primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1451204076Spjd pjdlog_debug(2, 1452204076Spjd "ggate_send: (%p) Moving request to the free queue.", hio); 1453204076Spjd QUEUE_INSERT2(hio, free); 1454204076Spjd } 1455204076Spjd /* NOTREACHED */ 1456204076Spjd return (NULL); 1457204076Spjd} 1458204076Spjd 1459204076Spjd/* 1460204076Spjd * Thread synchronize local and remote components. 1461204076Spjd */ 1462204076Spjdstatic void * 1463204076Spjdsync_thread(void *arg __unused) 1464204076Spjd{ 1465204076Spjd struct hast_resource *res = arg; 1466204076Spjd struct hio *hio; 1467204076Spjd struct g_gate_ctl_io *ggio; 1468204076Spjd unsigned int ii, ncomp, ncomps; 1469204076Spjd off_t offset, length, synced; 1470204076Spjd bool dorewind; 1471204076Spjd int syncext; 1472204076Spjd 1473204076Spjd ncomps = HAST_NCOMPONENTS; 1474204076Spjd dorewind = true; 1475204076Spjd synced = 0; 1476204076Spjd 1477204076Spjd for (;;) { 1478204076Spjd mtx_lock(&sync_lock); 1479204076Spjd while (!sync_inprogress) { 1480204076Spjd dorewind = true; 1481204076Spjd synced = 0; 1482204076Spjd cv_wait(&sync_cond, &sync_lock); 1483204076Spjd } 1484204076Spjd mtx_unlock(&sync_lock); 1485204076Spjd /* 1486204076Spjd * Obtain offset at which we should synchronize. 
1487204076Spjd * Rewind synchronization if needed. 1488204076Spjd */ 1489204076Spjd mtx_lock(&res->hr_amp_lock); 1490204076Spjd if (dorewind) 1491204076Spjd activemap_sync_rewind(res->hr_amp); 1492204076Spjd offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1493204076Spjd if (syncext != -1) { 1494204076Spjd /* 1495204076Spjd * We synchronized entire syncext extent, we can mark 1496204076Spjd * it as clean now. 1497204076Spjd */ 1498204076Spjd if (activemap_extent_complete(res->hr_amp, syncext)) 1499204076Spjd (void)hast_activemap_flush(res); 1500204076Spjd } 1501204076Spjd mtx_unlock(&res->hr_amp_lock); 1502204076Spjd if (dorewind) { 1503204076Spjd dorewind = false; 1504204076Spjd if (offset < 0) 1505204076Spjd pjdlog_info("Nodes are in sync."); 1506204076Spjd else { 1507204076Spjd pjdlog_info("Synchronization started. %ju bytes to go.", 1508204076Spjd (uintmax_t)(res->hr_extentsize * 1509204076Spjd activemap_ndirty(res->hr_amp))); 1510204076Spjd } 1511204076Spjd } 1512204076Spjd if (offset < 0) { 1513204076Spjd mtx_lock(&sync_lock); 1514204076Spjd sync_inprogress = false; 1515204076Spjd mtx_unlock(&sync_lock); 1516204076Spjd pjdlog_debug(1, "Nothing to synchronize."); 1517204076Spjd /* 1518204076Spjd * Synchronization complete, make both localcnt and 1519204076Spjd * remotecnt equal. 1520204076Spjd */ 1521204076Spjd ncomp = 1; 1522204076Spjd rw_rlock(&hio_remote_lock[ncomp]); 1523204076Spjd if (ISCONNECTED(res, ncomp)) { 1524204076Spjd if (synced > 0) { 1525204076Spjd pjdlog_info("Synchronization complete. 
" 1526204076Spjd "%jd bytes synchronized.", 1527204076Spjd (intmax_t)synced); 1528204076Spjd } 1529204076Spjd mtx_lock(&metadata_lock); 1530204076Spjd res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1531204076Spjd res->hr_primary_localcnt = 1532204076Spjd res->hr_secondary_localcnt; 1533204076Spjd res->hr_primary_remotecnt = 1534204076Spjd res->hr_secondary_remotecnt; 1535204076Spjd pjdlog_debug(1, 1536204076Spjd "Setting localcnt to %ju and remotecnt to %ju.", 1537204076Spjd (uintmax_t)res->hr_primary_localcnt, 1538204076Spjd (uintmax_t)res->hr_secondary_localcnt); 1539204076Spjd (void)metadata_write(res); 1540204076Spjd mtx_unlock(&metadata_lock); 1541204076Spjd } else if (synced > 0) { 1542204076Spjd pjdlog_info("Synchronization interrupted. " 1543204076Spjd "%jd bytes synchronized so far.", 1544204076Spjd (intmax_t)synced); 1545204076Spjd } 1546204076Spjd rw_unlock(&hio_remote_lock[ncomp]); 1547204076Spjd continue; 1548204076Spjd } 1549204076Spjd pjdlog_debug(2, "sync: Taking free request."); 1550204076Spjd QUEUE_TAKE2(hio, free); 1551204076Spjd pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1552204076Spjd /* 1553204076Spjd * Lock the range we are going to synchronize. We don't want 1554204076Spjd * race where someone writes between our read and write. 
1555204076Spjd */ 1556204076Spjd for (;;) { 1557204076Spjd mtx_lock(&range_lock); 1558204076Spjd if (rangelock_islocked(range_regular, offset, length)) { 1559204076Spjd pjdlog_debug(2, 1560204076Spjd "sync: Range offset=%jd length=%jd locked.", 1561204076Spjd (intmax_t)offset, (intmax_t)length); 1562204076Spjd range_sync_wait = true; 1563204076Spjd cv_wait(&range_sync_cond, &range_lock); 1564204076Spjd range_sync_wait = false; 1565204076Spjd mtx_unlock(&range_lock); 1566204076Spjd continue; 1567204076Spjd } 1568204076Spjd if (rangelock_add(range_sync, offset, length) < 0) { 1569204076Spjd mtx_unlock(&range_lock); 1570204076Spjd pjdlog_debug(2, 1571204076Spjd "sync: Range offset=%jd length=%jd is already locked, waiting.", 1572204076Spjd (intmax_t)offset, (intmax_t)length); 1573204076Spjd sleep(1); 1574204076Spjd continue; 1575204076Spjd } 1576204076Spjd mtx_unlock(&range_lock); 1577204076Spjd break; 1578204076Spjd } 1579204076Spjd /* 1580204076Spjd * First read the data from synchronization source. 1581204076Spjd */ 1582204076Spjd SYNCREQ(hio); 1583204076Spjd ggio = &hio->hio_ggio; 1584204076Spjd ggio->gctl_cmd = BIO_READ; 1585204076Spjd ggio->gctl_offset = offset; 1586204076Spjd ggio->gctl_length = length; 1587204076Spjd ggio->gctl_error = 0; 1588204076Spjd for (ii = 0; ii < ncomps; ii++) 1589204076Spjd hio->hio_errors[ii] = EINVAL; 1590204076Spjd reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1591204076Spjd hio); 1592204076Spjd pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1593204076Spjd hio); 1594204076Spjd mtx_lock(&metadata_lock); 1595204076Spjd if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1596204076Spjd /* 1597204076Spjd * This range is up-to-date on local component, 1598204076Spjd * so handle request locally. 1599204076Spjd */ 1600204076Spjd /* Local component is 0 for now. 
*/ 1601204076Spjd ncomp = 0; 1602204076Spjd } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1603204076Spjd assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1604204076Spjd /* 1605204076Spjd * This range is out-of-date on local component, 1606204076Spjd * so send request to the remote node. 1607204076Spjd */ 1608204076Spjd /* Remote component is 1 for now. */ 1609204076Spjd ncomp = 1; 1610204076Spjd } 1611204076Spjd mtx_unlock(&metadata_lock); 1612204076Spjd refcount_init(&hio->hio_countdown, 1); 1613204076Spjd QUEUE_INSERT1(hio, send, ncomp); 1614204076Spjd 1615204076Spjd /* 1616204076Spjd * Let's wait for READ to finish. 1617204076Spjd */ 1618204076Spjd mtx_lock(&sync_lock); 1619204076Spjd while (!ISSYNCREQDONE(hio)) 1620204076Spjd cv_wait(&sync_cond, &sync_lock); 1621204076Spjd mtx_unlock(&sync_lock); 1622204076Spjd 1623204076Spjd if (hio->hio_errors[ncomp] != 0) { 1624204076Spjd pjdlog_error("Unable to read synchronization data: %s.", 1625204076Spjd strerror(hio->hio_errors[ncomp])); 1626204076Spjd goto free_queue; 1627204076Spjd } 1628204076Spjd 1629204076Spjd /* 1630204076Spjd * We read the data from synchronization source, now write it 1631204076Spjd * to synchronization target. 1632204076Spjd */ 1633204076Spjd SYNCREQ(hio); 1634204076Spjd ggio->gctl_cmd = BIO_WRITE; 1635204076Spjd for (ii = 0; ii < ncomps; ii++) 1636204076Spjd hio->hio_errors[ii] = EINVAL; 1637204076Spjd reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1638204076Spjd hio); 1639204076Spjd pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1640204076Spjd hio); 1641204076Spjd mtx_lock(&metadata_lock); 1642204076Spjd if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1643204076Spjd /* 1644204076Spjd * This range is up-to-date on local component, 1645204076Spjd * so we update remote component. 1646204076Spjd */ 1647204076Spjd /* Remote component is 1 for now. 
*/ 1648204076Spjd ncomp = 1; 1649204076Spjd } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1650204076Spjd assert(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1651204076Spjd /* 1652204076Spjd * This range is out-of-date on local component, 1653204076Spjd * so we update it. 1654204076Spjd */ 1655204076Spjd /* Local component is 0 for now. */ 1656204076Spjd ncomp = 0; 1657204076Spjd } 1658204076Spjd mtx_unlock(&metadata_lock); 1659204076Spjd 1660204076Spjd pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1661204076Spjd hio); 1662204076Spjd refcount_init(&hio->hio_countdown, 1); 1663204076Spjd QUEUE_INSERT1(hio, send, ncomp); 1664204076Spjd 1665204076Spjd /* 1666204076Spjd * Let's wait for WRITE to finish. 1667204076Spjd */ 1668204076Spjd mtx_lock(&sync_lock); 1669204076Spjd while (!ISSYNCREQDONE(hio)) 1670204076Spjd cv_wait(&sync_cond, &sync_lock); 1671204076Spjd mtx_unlock(&sync_lock); 1672204076Spjd 1673204076Spjd if (hio->hio_errors[ncomp] != 0) { 1674204076Spjd pjdlog_error("Unable to write synchronization data: %s.", 1675204076Spjd strerror(hio->hio_errors[ncomp])); 1676204076Spjd goto free_queue; 1677204076Spjd } 1678204076Spjdfree_queue: 1679204076Spjd mtx_lock(&range_lock); 1680204076Spjd rangelock_del(range_sync, offset, length); 1681204076Spjd if (range_regular_wait) 1682204076Spjd cv_signal(&range_regular_cond); 1683204076Spjd mtx_unlock(&range_lock); 1684204076Spjd 1685204076Spjd synced += length; 1686204076Spjd 1687204076Spjd pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1688204076Spjd hio); 1689204076Spjd QUEUE_INSERT2(hio, free); 1690204076Spjd } 1691204076Spjd /* NOTREACHED */ 1692204076Spjd return (NULL); 1693204076Spjd} 1694204076Spjd 1695204076Spjdstatic void 1696204076Spjdsighandler(int sig) 1697204076Spjd{ 1698204076Spjd bool unlock; 1699204076Spjd 1700204076Spjd switch (sig) { 1701204076Spjd case SIGINT: 1702204076Spjd case SIGTERM: 1703204076Spjd sigexit_received = true; 1704204076Spjd break; 
1705204076Spjd default: 1706204076Spjd assert(!"invalid condition"); 1707204076Spjd } 1708204076Spjd /* 1709204076Spjd * XXX: Racy, but if we cannot obtain hio_guard_lock here, we don't 1710204076Spjd * want to risk deadlock. 1711204076Spjd */ 1712204076Spjd unlock = mtx_trylock(&hio_guard_lock); 1713204076Spjd cv_signal(&hio_guard_cond); 1714204076Spjd if (unlock) 1715204076Spjd mtx_unlock(&hio_guard_lock); 1716204076Spjd} 1717204076Spjd 1718204076Spjd/* 1719204076Spjd * Thread guards remote connections and reconnects when needed, handles 1720204076Spjd * signals, etc. 1721204076Spjd */ 1722204076Spjdstatic void * 1723204076Spjdguard_thread(void *arg) 1724204076Spjd{ 1725204076Spjd struct hast_resource *res = arg; 1726205738Spjd struct proto_conn *in, *out; 1727204076Spjd unsigned int ii, ncomps; 1728204076Spjd int timeout; 1729204076Spjd 1730204076Spjd ncomps = HAST_NCOMPONENTS; 1731204076Spjd /* The is only one remote component for now. */ 1732204076Spjd#define ISREMOTE(no) ((no) == 1) 1733204076Spjd 1734204076Spjd for (;;) { 1735204076Spjd if (sigexit_received) { 1736204076Spjd primary_exitx(EX_OK, 1737204076Spjd "Termination signal received, exiting."); 1738204076Spjd } 1739204076Spjd /* 1740204076Spjd * If all the connection will be fine, we will sleep until 1741204076Spjd * someone wakes us up. 1742204076Spjd * If any of the connections will be broken and we won't be 1743204076Spjd * able to connect, we will sleep only for RECONNECT_SLEEP 1744204076Spjd * seconds so we can retry soon. 
1745204076Spjd */ 1746204076Spjd timeout = 0; 1747204076Spjd pjdlog_debug(2, "remote_guard: Checking connections."); 1748204076Spjd mtx_lock(&hio_guard_lock); 1749204076Spjd for (ii = 0; ii < ncomps; ii++) { 1750204076Spjd if (!ISREMOTE(ii)) 1751204076Spjd continue; 1752204076Spjd rw_rlock(&hio_remote_lock[ii]); 1753204076Spjd if (ISCONNECTED(res, ii)) { 1754204076Spjd assert(res->hr_remotein != NULL); 1755204076Spjd assert(res->hr_remoteout != NULL); 1756204076Spjd rw_unlock(&hio_remote_lock[ii]); 1757204076Spjd pjdlog_debug(2, 1758204076Spjd "remote_guard: Connection to %s is ok.", 1759204076Spjd res->hr_remoteaddr); 1760204076Spjd } else { 1761204076Spjd assert(res->hr_remotein == NULL); 1762204076Spjd assert(res->hr_remoteout == NULL); 1763204076Spjd /* 1764204076Spjd * Upgrade the lock. It doesn't have to be 1765204076Spjd * atomic as no other thread can change 1766204076Spjd * connection status from disconnected to 1767204076Spjd * connected. 1768204076Spjd */ 1769204076Spjd rw_unlock(&hio_remote_lock[ii]); 1770204076Spjd pjdlog_debug(2, 1771204076Spjd "remote_guard: Reconnecting to %s.", 1772204076Spjd res->hr_remoteaddr); 1773205738Spjd in = out = NULL; 1774205738Spjd if (init_remote(res, &in, &out)) { 1775205738Spjd rw_wlock(&hio_remote_lock[ii]); 1776205738Spjd assert(res->hr_remotein == NULL); 1777205738Spjd assert(res->hr_remoteout == NULL); 1778205738Spjd assert(in != NULL && out != NULL); 1779205738Spjd res->hr_remotein = in; 1780205738Spjd res->hr_remoteout = out; 1781205738Spjd rw_unlock(&hio_remote_lock[ii]); 1782204076Spjd pjdlog_info("Successfully reconnected to %s.", 1783204076Spjd res->hr_remoteaddr); 1784205738Spjd sync_start(); 1785204076Spjd } else { 1786204076Spjd /* Both connections should be NULL. 
*/ 1787204076Spjd assert(res->hr_remotein == NULL); 1788204076Spjd assert(res->hr_remoteout == NULL); 1789205738Spjd assert(in == NULL && out == NULL); 1790204076Spjd pjdlog_debug(2, 1791204076Spjd "remote_guard: Reconnect to %s failed.", 1792204076Spjd res->hr_remoteaddr); 1793204076Spjd timeout = RECONNECT_SLEEP; 1794204076Spjd } 1795204076Spjd } 1796204076Spjd } 1797204076Spjd (void)cv_timedwait(&hio_guard_cond, &hio_guard_lock, timeout); 1798204076Spjd mtx_unlock(&hio_guard_lock); 1799204076Spjd } 1800204076Spjd#undef ISREMOTE 1801204076Spjd /* NOTREACHED */ 1802204076Spjd return (NULL); 1803204076Spjd} 1804