/* primary.c revision 218218 */
1231200Smm/*- 2231200Smm * Copyright (c) 2009 The FreeBSD Foundation 3231200Smm * Copyright (c) 2010 Pawel Jakub Dawidek <pjd@FreeBSD.org> 4232153Smm * All rights reserved. 5231200Smm * 6231200Smm * This software was developed by Pawel Jakub Dawidek under sponsorship from 7231200Smm * the FreeBSD Foundation. 8231200Smm * 9231200Smm * Redistribution and use in source and binary forms, with or without 10231200Smm * modification, are permitted provided that the following conditions 11231200Smm * are met: 12231200Smm * 1. Redistributions of source code must retain the above copyright 13231200Smm * notice, this list of conditions and the following disclaimer. 14231200Smm * 2. Redistributions in binary form must reproduce the above copyright 15231200Smm * notice, this list of conditions and the following disclaimer in the 16231200Smm * documentation and/or other materials provided with the distribution. 17231200Smm * 18231200Smm * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 19231200Smm * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 20231200Smm * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 21231200Smm * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 22231200Smm * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 23231200Smm * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 24231200Smm * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 25231200Smm * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 26231200Smm * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 27231200Smm * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 28231200Smm * SUCH DAMAGE. 
29231200Smm */ 30231200Smm 31231200Smm#include <sys/cdefs.h> 32231200Smm__FBSDID("$FreeBSD: head/sbin/hastd/primary.c 218218 2011-02-03 11:39:49Z pjd $"); 33231200Smm 34231200Smm#include <sys/types.h> 35231200Smm#include <sys/time.h> 36231200Smm#include <sys/bio.h> 37231200Smm#include <sys/disk.h> 38231200Smm#include <sys/refcount.h> 39231200Smm#include <sys/stat.h> 40231200Smm 41231200Smm#include <geom/gate/g_gate.h> 42231200Smm 43231200Smm#include <err.h> 44231200Smm#include <errno.h> 45231200Smm#include <fcntl.h> 46231200Smm#include <libgeom.h> 47231200Smm#include <pthread.h> 48231200Smm#include <signal.h> 49358088Smm#include <stdint.h> 50231200Smm#include <stdio.h> 51231200Smm#include <string.h> 52231200Smm#include <sysexits.h> 53231200Smm#include <unistd.h> 54231200Smm 55231200Smm#include <activemap.h> 56231200Smm#include <nv.h> 57231200Smm#include <rangelock.h> 58231200Smm 59231200Smm#include "control.h" 60231200Smm#include "event.h" 61231200Smm#include "hast.h" 62231200Smm#include "hast_proto.h" 63231200Smm#include "hastd.h" 64231200Smm#include "hooks.h" 65231200Smm#include "metadata.h" 66231200Smm#include "proto.h" 67231200Smm#include "pjdlog.h" 68231200Smm#include "subr.h" 69231200Smm#include "synch.h" 70231200Smm 71231200Smm/* The is only one remote component for now. */ 72231200Smm#define ISREMOTE(no) ((no) == 1) 73231200Smm 74231200Smmstruct hio { 75231200Smm /* 76231200Smm * Number of components we are still waiting for. 77231200Smm * When this field goes to 0, we can send the request back to the 78231200Smm * kernel. Each component has to decrease this counter by one 79231200Smm * even on failure. 80231200Smm */ 81231200Smm unsigned int hio_countdown; 82231200Smm /* 83231200Smm * Each component has a place to store its own error. 84231200Smm * Once the request is handled by all components we can decide if the 85231200Smm * request overall is successful or not. 
86231200Smm */ 87231200Smm int *hio_errors; 88231200Smm /* 89231200Smm * Structure used to comunicate with GEOM Gate class. 90231200Smm */ 91231200Smm struct g_gate_ctl_io hio_ggio; 92231200Smm TAILQ_ENTRY(hio) *hio_next; 93231200Smm}; 94231200Smm#define hio_free_next hio_next[0] 95231200Smm#define hio_done_next hio_next[0] 96231200Smm 97231200Smm/* 98231200Smm * Free list holds unused structures. When free list is empty, we have to wait 99231200Smm * until some in-progress requests are freed. 100231200Smm */ 101231200Smmstatic TAILQ_HEAD(, hio) hio_free_list; 102231200Smmstatic pthread_mutex_t hio_free_list_lock; 103231200Smmstatic pthread_cond_t hio_free_list_cond; 104231200Smm/* 105231200Smm * There is one send list for every component. One requests is placed on all 106231200Smm * send lists - each component gets the same request, but each component is 107231200Smm * responsible for managing his own send list. 108231200Smm */ 109231200Smmstatic TAILQ_HEAD(, hio) *hio_send_list; 110231200Smmstatic pthread_mutex_t *hio_send_list_lock; 111231200Smmstatic pthread_cond_t *hio_send_list_cond; 112231200Smm/* 113231200Smm * There is one recv list for every component, although local components don't 114231200Smm * use recv lists as local requests are done synchronously. 115231200Smm */ 116231200Smmstatic TAILQ_HEAD(, hio) *hio_recv_list; 117231200Smmstatic pthread_mutex_t *hio_recv_list_lock; 118231200Smmstatic pthread_cond_t *hio_recv_list_cond; 119231200Smm/* 120231200Smm * Request is placed on done list by the slowest component (the one that 121231200Smm * decreased hio_countdown from 1 to 0). 122231200Smm */ 123311041Smmstatic TAILQ_HEAD(, hio) hio_done_list; 124231200Smmstatic pthread_mutex_t hio_done_list_lock; 125311041Smmstatic pthread_cond_t hio_done_list_cond; 126231200Smm/* 127231200Smm * Structure below are for interaction with sync thread. 
128231200Smm */ 129231200Smmstatic bool sync_inprogress; 130231200Smmstatic pthread_mutex_t sync_lock; 131231200Smmstatic pthread_cond_t sync_cond; 132231200Smm/* 133231200Smm * The lock below allows to synchornize access to remote connections. 134231200Smm */ 135231200Smmstatic pthread_rwlock_t *hio_remote_lock; 136231200Smm 137231200Smm/* 138231200Smm * Lock to synchronize metadata updates. Also synchronize access to 139231200Smm * hr_primary_localcnt and hr_primary_remotecnt fields. 140231200Smm */ 141231200Smmstatic pthread_mutex_t metadata_lock; 142231200Smm 143231200Smm/* 144231200Smm * Maximum number of outstanding I/O requests. 145231200Smm */ 146231200Smm#define HAST_HIO_MAX 256 147231200Smm/* 148231200Smm * Number of components. At this point there are only two components: local 149231200Smm * and remote, but in the future it might be possible to use multiple local 150231200Smm * and remote components. 151231200Smm */ 152231200Smm#define HAST_NCOMPONENTS 2 153231200Smm/* 154231200Smm * Number of seconds to sleep between reconnect retries or keepalive packets. 
155231200Smm */ 156231200Smm#define RETRY_SLEEP 10 157231200Smm 158231200Smm#define ISCONNECTED(res, no) \ 159231200Smm ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) 160231200Smm 161231200Smm#define QUEUE_INSERT1(hio, name, ncomp) do { \ 162231200Smm bool _wakeup; \ 163231200Smm \ 164231200Smm mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 165231200Smm _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ 166231200Smm TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ 167231200Smm hio_next[(ncomp)]); \ 168231200Smm mtx_unlock(&hio_##name##_list_lock[ncomp]); \ 169231200Smm if (_wakeup) \ 170231200Smm cv_signal(&hio_##name##_list_cond[(ncomp)]); \ 171231200Smm} while (0) 172231200Smm#define QUEUE_INSERT2(hio, name) do { \ 173231200Smm bool _wakeup; \ 174231200Smm \ 175231200Smm mtx_lock(&hio_##name##_list_lock); \ 176231200Smm _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ 177231200Smm TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ 178231200Smm mtx_unlock(&hio_##name##_list_lock); \ 179231200Smm if (_wakeup) \ 180231200Smm cv_signal(&hio_##name##_list_cond); \ 181232153Smm} while (0) 182232153Smm#define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ 183231200Smm bool _last; \ 184231200Smm \ 185231200Smm mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ 186231200Smm _last = false; \ 187231200Smm while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ 188231200Smm cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ 189231200Smm &hio_##name##_list_lock[(ncomp)], (timeout)); \ 190231200Smm if ((timeout) != 0) \ 191231200Smm _last = true; \ 192231200Smm } \ 193231200Smm if (hio != NULL) { \ 194231200Smm TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ 195231200Smm hio_next[(ncomp)]); \ 196231200Smm } \ 197231200Smm mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ 198231200Smm} while (0) 199231200Smm#define QUEUE_TAKE2(hio, name) do { \ 200231200Smm mtx_lock(&hio_##name##_list_lock); \ 201231200Smm while (((hio) = 
TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ 202231200Smm cv_wait(&hio_##name##_list_cond, \ 203231200Smm &hio_##name##_list_lock); \ 204231200Smm } \ 205231200Smm TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ 206231200Smm mtx_unlock(&hio_##name##_list_lock); \ 207231200Smm} while (0) 208231200Smm 209231200Smm#define SYNCREQ(hio) do { \ 210231200Smm (hio)->hio_ggio.gctl_unit = -1; \ 211231200Smm (hio)->hio_ggio.gctl_seq = 1; \ 212231200Smm} while (0) 213231200Smm#define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) 214231200Smm#define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) 215231200Smm#define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) 216231200Smm 217231200Smmstatic struct hast_resource *gres; 218232153Smm 219232153Smmstatic pthread_mutex_t range_lock; 220231200Smmstatic struct rangelocks *range_regular; 221232153Smmstatic bool range_regular_wait; 222232153Smmstatic pthread_cond_t range_regular_cond; 223232153Smmstatic struct rangelocks *range_sync; 224232153Smmstatic bool range_sync_wait; 225231200Smmstatic pthread_cond_t range_sync_cond; 226231200Smm 227231200Smmstatic void *ggate_recv_thread(void *arg); 228231200Smmstatic void *local_send_thread(void *arg); 229231200Smmstatic void *remote_send_thread(void *arg); 230231200Smmstatic void *remote_recv_thread(void *arg); 231231200Smmstatic void *ggate_send_thread(void *arg); 232231200Smmstatic void *sync_thread(void *arg); 233231200Smmstatic void *guard_thread(void *arg); 234231200Smm 235231200Smmstatic void 236231200Smmcleanup(struct hast_resource *res) 237231200Smm{ 238231200Smm int rerrno; 239231200Smm 240231200Smm /* Remember errno. */ 241231200Smm rerrno = errno; 242231200Smm 243231200Smm /* Destroy ggate provider if we created one. 
*/ 244231200Smm if (res->hr_ggateunit >= 0) { 245231200Smm struct g_gate_ctl_destroy ggiod; 246231200Smm 247231200Smm bzero(&ggiod, sizeof(ggiod)); 248231200Smm ggiod.gctl_version = G_GATE_VERSION; 249231200Smm ggiod.gctl_unit = res->hr_ggateunit; 250231200Smm ggiod.gctl_force = 1; 251238856Smm if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) < 0) { 252238856Smm pjdlog_errno(LOG_WARNING, 253231200Smm "Unable to destroy hast/%s device", 254231200Smm res->hr_provname); 255231200Smm } 256231200Smm res->hr_ggateunit = -1; 257231200Smm } 258231200Smm 259231200Smm /* Restore errno. */ 260231200Smm errno = rerrno; 261231200Smm} 262231200Smm 263231200Smmstatic __dead2 void 264231200Smmprimary_exit(int exitcode, const char *fmt, ...) 265238856Smm{ 266231200Smm va_list ap; 267231200Smm 268231200Smm PJDLOG_ASSERT(exitcode != EX_OK); 269231200Smm va_start(ap, fmt); 270231200Smm pjdlogv_errno(LOG_ERR, fmt, ap); 271231200Smm va_end(ap); 272231200Smm cleanup(gres); 273231200Smm exit(exitcode); 274231200Smm} 275231200Smm 276231200Smmstatic __dead2 void 277231200Smmprimary_exitx(int exitcode, const char *fmt, ...) 278231200Smm{ 279231200Smm va_list ap; 280231200Smm 281231200Smm va_start(ap, fmt); 282232153Smm pjdlogv(exitcode == EX_OK ? 
LOG_INFO : LOG_ERR, fmt, ap); 283231200Smm va_end(ap); 284231200Smm cleanup(gres); 285231200Smm exit(exitcode); 286231200Smm} 287231200Smm 288231200Smmstatic int 289231200Smmhast_activemap_flush(struct hast_resource *res) 290231200Smm{ 291231200Smm const unsigned char *buf; 292231200Smm size_t size; 293231200Smm 294231200Smm buf = activemap_bitmap(res->hr_amp, &size); 295231200Smm PJDLOG_ASSERT(buf != NULL); 296231200Smm PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); 297231200Smm if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != 298231200Smm (ssize_t)size) { 299231200Smm KEEP_ERRNO(pjdlog_errno(LOG_ERR, 300231200Smm "Unable to flush activemap to disk")); 301231200Smm return (-1); 302231200Smm } 303231200Smm return (0); 304231200Smm} 305231200Smm 306232153Smmstatic bool 307231200Smmreal_remote(const struct hast_resource *res) 308231200Smm{ 309231200Smm 310231200Smm return (strcmp(res->hr_remoteaddr, "none") != 0); 311232153Smm} 312232153Smm 313232153Smmstatic void 314232153Smminit_environment(struct hast_resource *res __unused) 315232153Smm{ 316232153Smm struct hio *hio; 317232153Smm unsigned int ii, ncomps; 318232153Smm 319232153Smm /* 320232153Smm * In the future it might be per-resource value. 321232153Smm */ 322231200Smm ncomps = HAST_NCOMPONENTS; 323232153Smm 324232153Smm /* 325231200Smm * Allocate memory needed by lists. 
326231200Smm */ 327232153Smm hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); 328232153Smm if (hio_send_list == NULL) { 329232153Smm primary_exitx(EX_TEMPFAIL, 330232153Smm "Unable to allocate %zu bytes of memory for send lists.", 331232153Smm sizeof(hio_send_list[0]) * ncomps); 332232153Smm } 333232153Smm hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); 334232153Smm if (hio_send_list_lock == NULL) { 335232153Smm primary_exitx(EX_TEMPFAIL, 336232153Smm "Unable to allocate %zu bytes of memory for send list locks.", 337232153Smm sizeof(hio_send_list_lock[0]) * ncomps); 338232153Smm } 339232153Smm hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); 340232153Smm if (hio_send_list_cond == NULL) { 341232153Smm primary_exitx(EX_TEMPFAIL, 342232153Smm "Unable to allocate %zu bytes of memory for send list condition variables.", 343342360Smm sizeof(hio_send_list_cond[0]) * ncomps); 344232153Smm } 345232153Smm hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); 346232153Smm if (hio_recv_list == NULL) { 347232153Smm primary_exitx(EX_TEMPFAIL, 348232153Smm "Unable to allocate %zu bytes of memory for recv lists.", 349232153Smm sizeof(hio_recv_list[0]) * ncomps); 350232153Smm } 351232153Smm hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); 352232153Smm if (hio_recv_list_lock == NULL) { 353232153Smm primary_exitx(EX_TEMPFAIL, 354232153Smm "Unable to allocate %zu bytes of memory for recv list locks.", 355232153Smm sizeof(hio_recv_list_lock[0]) * ncomps); 356232153Smm } 357232153Smm hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); 358232153Smm if (hio_recv_list_cond == NULL) { 359232153Smm primary_exitx(EX_TEMPFAIL, 360232153Smm "Unable to allocate %zu bytes of memory for recv list condition variables.", 361232153Smm sizeof(hio_recv_list_cond[0]) * ncomps); 362232153Smm } 363232153Smm hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); 364232153Smm if (hio_remote_lock == NULL) { 
365232153Smm primary_exitx(EX_TEMPFAIL, 366232153Smm "Unable to allocate %zu bytes of memory for remote connections locks.", 367232153Smm sizeof(hio_remote_lock[0]) * ncomps); 368231200Smm } 369231200Smm 370231200Smm /* 371232153Smm * Initialize lists, their locks and theirs condition variables. 372311041Smm */ 373232153Smm TAILQ_INIT(&hio_free_list); 374232153Smm mtx_init(&hio_free_list_lock); 375232153Smm cv_init(&hio_free_list_cond); 376232153Smm for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { 377232153Smm TAILQ_INIT(&hio_send_list[ii]); 378232153Smm mtx_init(&hio_send_list_lock[ii]); 379232153Smm cv_init(&hio_send_list_cond[ii]); 380232153Smm TAILQ_INIT(&hio_recv_list[ii]); 381232153Smm mtx_init(&hio_recv_list_lock[ii]); 382232153Smm cv_init(&hio_recv_list_cond[ii]); 383232153Smm rw_init(&hio_remote_lock[ii]); 384232153Smm } 385232153Smm TAILQ_INIT(&hio_done_list); 386232153Smm mtx_init(&hio_done_list_lock); 387231200Smm cv_init(&hio_done_list_cond); 388231200Smm mtx_init(&metadata_lock); 389231200Smm 390231200Smm /* 391231200Smm * Allocate requests pool and initialize requests. 
392231200Smm */ 393232153Smm for (ii = 0; ii < HAST_HIO_MAX; ii++) { 394232153Smm hio = malloc(sizeof(*hio)); 395231200Smm if (hio == NULL) { 396231200Smm primary_exitx(EX_TEMPFAIL, 397231200Smm "Unable to allocate %zu bytes of memory for hio request.", 398231200Smm sizeof(*hio)); 399231200Smm } 400231200Smm hio->hio_countdown = 0; 401231200Smm hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); 402231200Smm if (hio->hio_errors == NULL) { 403231200Smm primary_exitx(EX_TEMPFAIL, 404231200Smm "Unable allocate %zu bytes of memory for hio errors.", 405231200Smm sizeof(hio->hio_errors[0]) * ncomps); 406231200Smm } 407231200Smm hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); 408232153Smm if (hio->hio_next == NULL) { 409232153Smm primary_exitx(EX_TEMPFAIL, 410231200Smm "Unable allocate %zu bytes of memory for hio_next field.", 411231200Smm sizeof(hio->hio_next[0]) * ncomps); 412231200Smm } 413231200Smm hio->hio_ggio.gctl_version = G_GATE_VERSION; 414231200Smm hio->hio_ggio.gctl_data = malloc(MAXPHYS); 415231200Smm if (hio->hio_ggio.gctl_data == NULL) { 416231200Smm primary_exitx(EX_TEMPFAIL, 417231200Smm "Unable to allocate %zu bytes of memory for gctl_data.", 418231200Smm MAXPHYS); 419231200Smm } 420231200Smm hio->hio_ggio.gctl_length = MAXPHYS; 421231200Smm hio->hio_ggio.gctl_error = 0; 422231200Smm TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); 423231200Smm } 424232153Smm} 425232153Smm 426231200Smmstatic bool 427231200Smminit_resuid(struct hast_resource *res) 428231200Smm{ 429231200Smm 430231200Smm mtx_lock(&metadata_lock); 431231200Smm if (res->hr_resuid != 0) { 432231200Smm mtx_unlock(&metadata_lock); 433231200Smm return (false); 434231200Smm } else { 435231200Smm /* Initialize unique resource identifier. 
*/ 436231200Smm arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); 437231200Smm mtx_unlock(&metadata_lock); 438231200Smm if (metadata_write(res) < 0) 439231200Smm exit(EX_NOINPUT); 440231200Smm return (true); 441231200Smm } 442232153Smm} 443232153Smm 444231200Smmstatic void 445231200Smminit_local(struct hast_resource *res) 446231200Smm{ 447231200Smm unsigned char *buf; 448231200Smm size_t mapsize; 449231200Smm 450231200Smm if (metadata_read(res, true) < 0) 451231200Smm exit(EX_NOINPUT); 452231200Smm mtx_init(&res->hr_amp_lock); 453231200Smm if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, 454231200Smm res->hr_local_sectorsize, res->hr_keepdirty) < 0) { 455231200Smm primary_exit(EX_TEMPFAIL, "Unable to create activemap"); 456231200Smm } 457231200Smm mtx_init(&range_lock); 458231200Smm cv_init(&range_regular_cond); 459232153Smm if (rangelock_init(&range_regular) < 0) 460232153Smm primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); 461231200Smm cv_init(&range_sync_cond); 462231200Smm if (rangelock_init(&range_sync) < 0) 463231200Smm primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); 464231200Smm mapsize = activemap_ondisk_size(res->hr_amp); 465231200Smm buf = calloc(1, mapsize); 466231200Smm if (buf == NULL) { 467231200Smm primary_exitx(EX_TEMPFAIL, 468231200Smm "Unable to allocate buffer for activemap."); 469231200Smm } 470231200Smm if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != 471302075Smm (ssize_t)mapsize) { 472231200Smm primary_exit(EX_NOINPUT, "Unable to read activemap"); 473231200Smm } 474231200Smm activemap_copyin(res->hr_amp, buf, mapsize); 475231200Smm free(buf); 476231200Smm if (res->hr_resuid != 0) 477231200Smm return; 478231200Smm /* 479231200Smm * We're using provider for the first time. Initialize local and remote 480302075Smm * counters. We don't initialize resuid here, as we want to do it just 481231200Smm * in time. 
The reason for this is that we want to inform secondary 482313570Smm * that there were no writes yet, so there is no need to synchronize 483231200Smm * anything. 484232153Smm */ 485231200Smm res->hr_primary_localcnt = 1; 486313570Smm res->hr_primary_remotecnt = 0; 487232153Smm if (metadata_write(res) < 0) 488302075Smm exit(EX_NOINPUT); 489302075Smm} 490313570Smm 491232153Smmstatic int 492302075Smmprimary_connect(struct hast_resource *res, struct proto_conn **connp) 493302075Smm{ 494231200Smm struct proto_conn *conn; 495232153Smm int16_t val; 496231200Smm 497231200Smm val = 1; 498231200Smm if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { 499231200Smm primary_exit(EX_TEMPFAIL, 500231200Smm "Unable to send connection request to parent"); 501302075Smm } 502231200Smm if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { 503231200Smm primary_exit(EX_TEMPFAIL, 504231200Smm "Unable to receive reply to connection request from parent"); 505231200Smm } 506231200Smm if (val != 0) { 507231200Smm errno = val; 508231200Smm pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 509231200Smm res->hr_remoteaddr); 510302075Smm return (-1); 511231200Smm } 512313570Smm if (proto_connection_recv(res->hr_conn, true, &conn) < 0) { 513231200Smm primary_exit(EX_TEMPFAIL, 514232153Smm "Unable to receive connection from parent"); 515231200Smm } 516231200Smm if (proto_connect_wait(conn, HAST_TIMEOUT) < 0) { 517232153Smm pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 518302075Smm res->hr_remoteaddr); 519302075Smm proto_close(conn); 520231200Smm return (-1); 521232153Smm } 522302075Smm /* Error in setting timeout is not critical, but why should it fail? 
*/ 523302075Smm if (proto_timeout(conn, res->hr_timeout) < 0) 524231200Smm pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 525232153Smm 526231200Smm *connp = conn; 527231200Smm 528231200Smm return (0); 529231200Smm} 530231200Smm 531231200Smmstatic bool 532231200Smminit_remote(struct hast_resource *res, struct proto_conn **inp, 533231200Smm struct proto_conn **outp) 534231200Smm{ 535231200Smm struct proto_conn *in, *out; 536231200Smm struct nv *nvout, *nvin; 537231200Smm const unsigned char *token; 538358088Smm unsigned char *map; 539358088Smm const char *errmsg; 540358088Smm int32_t extentsize; 541232153Smm int64_t datasize; 542232153Smm uint32_t mapsize; 543231200Smm size_t size; 544231200Smm 545231200Smm PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); 546231200Smm PJDLOG_ASSERT(real_remote(res)); 547232153Smm 548231200Smm in = out = NULL; 549231200Smm errmsg = NULL; 550231200Smm 551232153Smm if (primary_connect(res, &out) == -1) 552232153Smm return (false); 553232153Smm 554232153Smm /* 555231200Smm * First handshake step. 556231200Smm * Setup outgoing connection with remote node. 
557231200Smm */ 558231200Smm nvout = nv_alloc(); 559231200Smm nv_add_string(nvout, res->hr_name, "resource"); 560232153Smm if (nv_error(nvout) != 0) { 561344673Smm pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 562231200Smm "Unable to allocate header for connection with %s", 563231200Smm res->hr_remoteaddr); 564231200Smm nv_free(nvout); 565231200Smm goto close; 566231200Smm } 567231200Smm if (hast_proto_send(res, out, nvout, NULL, 0) < 0) { 568231200Smm pjdlog_errno(LOG_WARNING, 569231200Smm "Unable to send handshake header to %s", 570231200Smm res->hr_remoteaddr); 571231200Smm nv_free(nvout); 572231200Smm goto close; 573231200Smm } 574231200Smm nv_free(nvout); 575231200Smm if (hast_proto_recv_hdr(out, &nvin) < 0) { 576231200Smm pjdlog_errno(LOG_WARNING, 577231200Smm "Unable to receive handshake header from %s", 578231200Smm res->hr_remoteaddr); 579231200Smm goto close; 580231200Smm } 581231200Smm errmsg = nv_get_string(nvin, "errmsg"); 582231200Smm if (errmsg != NULL) { 583231200Smm pjdlog_warning("%s", errmsg); 584231200Smm nv_free(nvin); 585231200Smm goto close; 586231200Smm } 587231200Smm token = nv_get_uint8_array(nvin, &size, "token"); 588231200Smm if (token == NULL) { 589231200Smm pjdlog_warning("Handshake header from %s has no 'token' field.", 590231200Smm res->hr_remoteaddr); 591231200Smm nv_free(nvin); 592231200Smm goto close; 593231200Smm } 594231200Smm if (size != sizeof(res->hr_token)) { 595231200Smm pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", 596231200Smm res->hr_remoteaddr, size, sizeof(res->hr_token)); 597231200Smm nv_free(nvin); 598231200Smm goto close; 599231200Smm } 600231200Smm bcopy(token, res->hr_token, sizeof(res->hr_token)); 601231200Smm nv_free(nvin); 602231200Smm 603231200Smm /* 604231200Smm * Second handshake step. 605231200Smm * Setup incoming connection with remote node. 
606231200Smm */ 607231200Smm if (primary_connect(res, &in) == -1) 608231200Smm goto close; 609231200Smm 610231200Smm nvout = nv_alloc(); 611231200Smm nv_add_string(nvout, res->hr_name, "resource"); 612231200Smm nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), 613231200Smm "token"); 614231200Smm if (res->hr_resuid == 0) { 615231200Smm /* 616231200Smm * The resuid field was not yet initialized. 617231200Smm * Because we do synchronization inside init_resuid(), it is 618231200Smm * possible that someone already initialized it, the function 619231200Smm * will return false then, but if we successfully initialized 620231200Smm * it, we will get true. True means that there were no writes 621231200Smm * to this resource yet and we want to inform secondary that 622231200Smm * synchronization is not needed by sending "virgin" argument. 623231200Smm */ 624231200Smm if (init_resuid(res)) 625231200Smm nv_add_int8(nvout, 1, "virgin"); 626231200Smm } 627231200Smm nv_add_uint64(nvout, res->hr_resuid, "resuid"); 628231200Smm nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); 629231200Smm nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); 630231200Smm if (nv_error(nvout) != 0) { 631231200Smm pjdlog_common(LOG_WARNING, 0, nv_error(nvout), 632231200Smm "Unable to allocate header for connection with %s", 633231200Smm res->hr_remoteaddr); 634231200Smm nv_free(nvout); 635231200Smm goto close; 636231200Smm } 637231200Smm if (hast_proto_send(res, in, nvout, NULL, 0) < 0) { 638231200Smm pjdlog_errno(LOG_WARNING, 639231200Smm "Unable to send handshake header to %s", 640231200Smm res->hr_remoteaddr); 641299529Smm nv_free(nvout); 642299529Smm goto close; 643299529Smm } 644231200Smm nv_free(nvout); 645231200Smm if (hast_proto_recv_hdr(out, &nvin) < 0) { 646231200Smm pjdlog_errno(LOG_WARNING, 647231200Smm "Unable to receive handshake header from %s", 648231200Smm res->hr_remoteaddr); 649231200Smm goto close; 650299529Smm } 651299529Smm errmsg = 
nv_get_string(nvin, "errmsg"); 652299529Smm if (errmsg != NULL) { 653231200Smm pjdlog_warning("%s", errmsg); 654231200Smm nv_free(nvin); 655231200Smm goto close; 656231200Smm } 657231200Smm datasize = nv_get_int64(nvin, "datasize"); 658231200Smm if (datasize != res->hr_datasize) { 659231200Smm pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", 660231200Smm (intmax_t)res->hr_datasize, (intmax_t)datasize); 661231200Smm nv_free(nvin); 662231200Smm goto close; 663231200Smm } 664231200Smm extentsize = nv_get_int32(nvin, "extentsize"); 665231200Smm if (extentsize != res->hr_extentsize) { 666231200Smm pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", 667231200Smm (ssize_t)res->hr_extentsize, (ssize_t)extentsize); 668231200Smm nv_free(nvin); 669231200Smm goto close; 670231200Smm } 671231200Smm res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); 672231200Smm res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); 673231200Smm res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); 674231200Smm map = NULL; 675231200Smm mapsize = nv_get_uint32(nvin, "mapsize"); 676231200Smm if (mapsize > 0) { 677231200Smm map = malloc(mapsize); 678231200Smm if (map == NULL) { 679231200Smm pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", 680231200Smm (uintmax_t)mapsize); 681231200Smm nv_free(nvin); 682231200Smm goto close; 683231200Smm } 684231200Smm /* 685231200Smm * Remote node have some dirty extents on its own, lets 686231200Smm * download its activemap. 687231200Smm */ 688231200Smm if (hast_proto_recv_data(res, out, nvin, map, 689231200Smm mapsize) < 0) { 690231200Smm pjdlog_errno(LOG_ERR, 691231200Smm "Unable to receive remote activemap"); 692231200Smm nv_free(nvin); 693231200Smm free(map); 694231200Smm goto close; 695231200Smm } 696231200Smm /* 697231200Smm * Merge local and remote bitmaps. 
698231200Smm */ 699231200Smm activemap_merge(res->hr_amp, map, mapsize); 700231200Smm free(map); 701231200Smm /* 702231200Smm * Now that we merged bitmaps from both nodes, flush it to the 703231200Smm * disk before we start to synchronize. 704231200Smm */ 705231200Smm (void)hast_activemap_flush(res); 706231200Smm } 707231200Smm nv_free(nvin); 708231200Smm pjdlog_info("Connected to %s.", res->hr_remoteaddr); 709231200Smm if (inp != NULL && outp != NULL) { 710231200Smm *inp = in; 711231200Smm *outp = out; 712231200Smm } else { 713231200Smm res->hr_remotein = in; 714231200Smm res->hr_remoteout = out; 715231200Smm } 716231200Smm event_send(res, EVENT_CONNECT); 717231200Smm return (true); 718231200Smmclose: 719231200Smm if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) 720231200Smm event_send(res, EVENT_SPLITBRAIN); 721231200Smm proto_close(out); 722231200Smm if (in != NULL) 723231200Smm proto_close(in); 724231200Smm return (false); 725231200Smm} 726231200Smm 727231200Smmstatic void 728231200Smmsync_start(void) 729231200Smm{ 730231200Smm 731231200Smm mtx_lock(&sync_lock); 732231200Smm sync_inprogress = true; 733231200Smm mtx_unlock(&sync_lock); 734231200Smm cv_signal(&sync_cond); 735231200Smm} 736231200Smm 737231200Smmstatic void 738231200Smmsync_stop(void) 739231200Smm{ 740231200Smm 741231200Smm mtx_lock(&sync_lock); 742231200Smm if (sync_inprogress) 743231200Smm sync_inprogress = false; 744231200Smm mtx_unlock(&sync_lock); 745231200Smm} 746231200Smm 747231200Smmstatic void 748231200Smminit_ggate(struct hast_resource *res) 749231200Smm{ 750231200Smm struct g_gate_ctl_create ggiocreate; 751231200Smm struct g_gate_ctl_cancel ggiocancel; 752231200Smm 753231200Smm /* 754231200Smm * We communicate with ggate via /dev/ggctl. Open it. 
755231200Smm */ 756 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); 757 if (res->hr_ggatefd < 0) 758 primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); 759 /* 760 * Create provider before trying to connect, as connection failure 761 * is not critical, but may take some time. 762 */ 763 bzero(&ggiocreate, sizeof(ggiocreate)); 764 ggiocreate.gctl_version = G_GATE_VERSION; 765 ggiocreate.gctl_mediasize = res->hr_datasize; 766 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; 767 ggiocreate.gctl_flags = 0; 768 ggiocreate.gctl_maxcount = G_GATE_MAX_QUEUE_SIZE; 769 ggiocreate.gctl_timeout = 0; 770 ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; 771 snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", 772 res->hr_provname); 773 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { 774 pjdlog_info("Device hast/%s created.", res->hr_provname); 775 res->hr_ggateunit = ggiocreate.gctl_unit; 776 return; 777 } 778 if (errno != EEXIST) { 779 primary_exit(EX_OSERR, "Unable to create hast/%s device", 780 res->hr_provname); 781 } 782 pjdlog_debug(1, 783 "Device hast/%s already exists, we will try to take it over.", 784 res->hr_provname); 785 /* 786 * If we received EEXIST, we assume that the process who created the 787 * provider died and didn't clean up. In that case we will start from 788 * where he left of. 
789 */ 790 bzero(&ggiocancel, sizeof(ggiocancel)); 791 ggiocancel.gctl_version = G_GATE_VERSION; 792 ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; 793 snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", 794 res->hr_provname); 795 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { 796 pjdlog_info("Device hast/%s recovered.", res->hr_provname); 797 res->hr_ggateunit = ggiocancel.gctl_unit; 798 return; 799 } 800 primary_exit(EX_OSERR, "Unable to take over hast/%s device", 801 res->hr_provname); 802} 803 804void 805hastd_primary(struct hast_resource *res) 806{ 807 pthread_t td; 808 pid_t pid; 809 int error, mode; 810 811 /* 812 * Create communication channel for sending control commands from 813 * parent to child. 814 */ 815 if (proto_client("socketpair://", &res->hr_ctrl) < 0) { 816 /* TODO: There's no need for this to be fatal error. */ 817 KEEP_ERRNO((void)pidfile_remove(pfh)); 818 pjdlog_exit(EX_OSERR, 819 "Unable to create control sockets between parent and child"); 820 } 821 /* 822 * Create communication channel for sending events from child to parent. 823 */ 824 if (proto_client("socketpair://", &res->hr_event) < 0) { 825 /* TODO: There's no need for this to be fatal error. */ 826 KEEP_ERRNO((void)pidfile_remove(pfh)); 827 pjdlog_exit(EX_OSERR, 828 "Unable to create event sockets between child and parent"); 829 } 830 /* 831 * Create communication channel for sending connection requests from 832 * child to parent. 833 */ 834 if (proto_client("socketpair://", &res->hr_conn) < 0) { 835 /* TODO: There's no need for this to be fatal error. */ 836 KEEP_ERRNO((void)pidfile_remove(pfh)); 837 pjdlog_exit(EX_OSERR, 838 "Unable to create connection sockets between child and parent"); 839 } 840 841 pid = fork(); 842 if (pid < 0) { 843 /* TODO: There's no need for this to be fatal error. */ 844 KEEP_ERRNO((void)pidfile_remove(pfh)); 845 pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); 846 } 847 848 if (pid > 0) { 849 /* This is parent. 
*/ 850 /* Declare that we are receiver. */ 851 proto_recv(res->hr_event, NULL, 0); 852 proto_recv(res->hr_conn, NULL, 0); 853 /* Declare that we are sender. */ 854 proto_send(res->hr_ctrl, NULL, 0); 855 res->hr_workerpid = pid; 856 return; 857 } 858 859 gres = res; 860 mode = pjdlog_mode_get(); 861 862 /* Declare that we are sender. */ 863 proto_send(res->hr_event, NULL, 0); 864 proto_send(res->hr_conn, NULL, 0); 865 /* Declare that we are receiver. */ 866 proto_recv(res->hr_ctrl, NULL, 0); 867 descriptors_cleanup(res); 868 869 descriptors_assert(res, mode); 870 871 pjdlog_init(mode); 872 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); 873 setproctitle("%s (primary)", res->hr_name); 874 875 init_local(res); 876 init_ggate(res); 877 init_environment(res); 878 879 if (drop_privs() != 0) { 880 cleanup(res); 881 exit(EX_CONFIG); 882 } 883 pjdlog_info("Privileges successfully dropped."); 884 885 /* 886 * Create the guard thread first, so we can handle signals from the 887 * very begining. 888 */ 889 error = pthread_create(&td, NULL, guard_thread, res); 890 PJDLOG_ASSERT(error == 0); 891 /* 892 * Create the control thread before sending any event to the parent, 893 * as we can deadlock when parent sends control request to worker, 894 * but worker has no control thread started yet, so parent waits. 895 * In the meantime worker sends an event to the parent, but parent 896 * is unable to handle the event, because it waits for control 897 * request response. 
898 */ 899 error = pthread_create(&td, NULL, ctrl_thread, res); 900 PJDLOG_ASSERT(error == 0); 901 if (real_remote(res) && init_remote(res, NULL, NULL)) 902 sync_start(); 903 error = pthread_create(&td, NULL, ggate_recv_thread, res); 904 PJDLOG_ASSERT(error == 0); 905 error = pthread_create(&td, NULL, local_send_thread, res); 906 PJDLOG_ASSERT(error == 0); 907 error = pthread_create(&td, NULL, remote_send_thread, res); 908 PJDLOG_ASSERT(error == 0); 909 error = pthread_create(&td, NULL, remote_recv_thread, res); 910 PJDLOG_ASSERT(error == 0); 911 error = pthread_create(&td, NULL, ggate_send_thread, res); 912 PJDLOG_ASSERT(error == 0); 913 (void)sync_thread(res); 914} 915 916static void 917reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) 918{ 919 char msg[1024]; 920 va_list ap; 921 int len; 922 923 va_start(ap, fmt); 924 len = vsnprintf(msg, sizeof(msg), fmt, ap); 925 va_end(ap); 926 if ((size_t)len < sizeof(msg)) { 927 switch (ggio->gctl_cmd) { 928 case BIO_READ: 929 (void)snprintf(msg + len, sizeof(msg) - len, 930 "READ(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 931 (uintmax_t)ggio->gctl_length); 932 break; 933 case BIO_DELETE: 934 (void)snprintf(msg + len, sizeof(msg) - len, 935 "DELETE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 936 (uintmax_t)ggio->gctl_length); 937 break; 938 case BIO_FLUSH: 939 (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); 940 break; 941 case BIO_WRITE: 942 (void)snprintf(msg + len, sizeof(msg) - len, 943 "WRITE(%ju, %ju).", (uintmax_t)ggio->gctl_offset, 944 (uintmax_t)ggio->gctl_length); 945 break; 946 default: 947 (void)snprintf(msg + len, sizeof(msg) - len, 948 "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); 949 break; 950 } 951 } 952 pjdlog_common(loglevel, debuglevel, -1, "%s", msg); 953} 954 955static void 956remote_close(struct hast_resource *res, int ncomp) 957{ 958 959 rw_wlock(&hio_remote_lock[ncomp]); 960 /* 961 * A race is possible between dropping rlock and acquiring wlock - 962 * 
another thread can close connection in-between. 963 */ 964 if (!ISCONNECTED(res, ncomp)) { 965 PJDLOG_ASSERT(res->hr_remotein == NULL); 966 PJDLOG_ASSERT(res->hr_remoteout == NULL); 967 rw_unlock(&hio_remote_lock[ncomp]); 968 return; 969 } 970 971 PJDLOG_ASSERT(res->hr_remotein != NULL); 972 PJDLOG_ASSERT(res->hr_remoteout != NULL); 973 974 pjdlog_debug(2, "Closing incoming connection to %s.", 975 res->hr_remoteaddr); 976 proto_close(res->hr_remotein); 977 res->hr_remotein = NULL; 978 pjdlog_debug(2, "Closing outgoing connection to %s.", 979 res->hr_remoteaddr); 980 proto_close(res->hr_remoteout); 981 res->hr_remoteout = NULL; 982 983 rw_unlock(&hio_remote_lock[ncomp]); 984 985 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); 986 987 /* 988 * Stop synchronization if in-progress. 989 */ 990 sync_stop(); 991 992 event_send(res, EVENT_DISCONNECT); 993} 994 995/* 996 * Thread receives ggate I/O requests from the kernel and passes them to 997 * appropriate threads: 998 * WRITE - always goes to both local_send and remote_send threads 999 * READ (when the block is up-to-date on local component) - 1000 * only local_send thread 1001 * READ (when the block isn't up-to-date on local component) - 1002 * only remote_send thread 1003 * DELETE - always goes to both local_send and remote_send threads 1004 * FLUSH - always goes to both local_send and remote_send threads 1005 */ 1006static void * 1007ggate_recv_thread(void *arg) 1008{ 1009 struct hast_resource *res = arg; 1010 struct g_gate_ctl_io *ggio; 1011 struct hio *hio; 1012 unsigned int ii, ncomp, ncomps; 1013 int error; 1014 1015 ncomps = HAST_NCOMPONENTS; 1016 1017 for (;;) { 1018 pjdlog_debug(2, "ggate_recv: Taking free request."); 1019 QUEUE_TAKE2(hio, free); 1020 pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); 1021 ggio = &hio->hio_ggio; 1022 ggio->gctl_unit = res->hr_ggateunit; 1023 ggio->gctl_length = MAXPHYS; 1024 ggio->gctl_error = 0; 1025 pjdlog_debug(2, 1026 "ggate_recv: (%p) Waiting for 
request from the kernel.", 1027 hio); 1028 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) < 0) { 1029 if (sigexit_received) 1030 pthread_exit(NULL); 1031 primary_exit(EX_OSERR, "G_GATE_CMD_START failed"); 1032 } 1033 error = ggio->gctl_error; 1034 switch (error) { 1035 case 0: 1036 break; 1037 case ECANCELED: 1038 /* Exit gracefully. */ 1039 if (!sigexit_received) { 1040 pjdlog_debug(2, 1041 "ggate_recv: (%p) Received cancel from the kernel.", 1042 hio); 1043 pjdlog_info("Received cancel from the kernel, exiting."); 1044 } 1045 pthread_exit(NULL); 1046 case ENOMEM: 1047 /* 1048 * Buffer too small? Impossible, we allocate MAXPHYS 1049 * bytes - request can't be bigger than that. 1050 */ 1051 /* FALLTHROUGH */ 1052 case ENXIO: 1053 default: 1054 primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", 1055 strerror(error)); 1056 } 1057 for (ii = 0; ii < ncomps; ii++) 1058 hio->hio_errors[ii] = EINVAL; 1059 reqlog(LOG_DEBUG, 2, ggio, 1060 "ggate_recv: (%p) Request received from the kernel: ", 1061 hio); 1062 /* 1063 * Inform all components about new write request. 1064 * For read request prefer local component unless the given 1065 * range is out-of-date, then use remote component. 1066 */ 1067 switch (ggio->gctl_cmd) { 1068 case BIO_READ: 1069 pjdlog_debug(2, 1070 "ggate_recv: (%p) Moving request to the send queue.", 1071 hio); 1072 refcount_init(&hio->hio_countdown, 1); 1073 mtx_lock(&metadata_lock); 1074 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || 1075 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1076 /* 1077 * This range is up-to-date on local component, 1078 * so handle request locally. 1079 */ 1080 /* Local component is 0 for now. */ 1081 ncomp = 0; 1082 } else /* if (res->hr_syncsrc == 1083 HAST_SYNCSRC_SECONDARY) */ { 1084 PJDLOG_ASSERT(res->hr_syncsrc == 1085 HAST_SYNCSRC_SECONDARY); 1086 /* 1087 * This range is out-of-date on local component, 1088 * so send request to the remote node. 1089 */ 1090 /* Remote component is 1 for now. 
*/ 1091 ncomp = 1; 1092 } 1093 mtx_unlock(&metadata_lock); 1094 QUEUE_INSERT1(hio, send, ncomp); 1095 break; 1096 case BIO_WRITE: 1097 if (res->hr_resuid == 0) { 1098 /* This is first write, initialize resuid. */ 1099 (void)init_resuid(res); 1100 } 1101 for (;;) { 1102 mtx_lock(&range_lock); 1103 if (rangelock_islocked(range_sync, 1104 ggio->gctl_offset, ggio->gctl_length)) { 1105 pjdlog_debug(2, 1106 "regular: Range offset=%jd length=%zu locked.", 1107 (intmax_t)ggio->gctl_offset, 1108 (size_t)ggio->gctl_length); 1109 range_regular_wait = true; 1110 cv_wait(&range_regular_cond, &range_lock); 1111 range_regular_wait = false; 1112 mtx_unlock(&range_lock); 1113 continue; 1114 } 1115 if (rangelock_add(range_regular, 1116 ggio->gctl_offset, ggio->gctl_length) < 0) { 1117 mtx_unlock(&range_lock); 1118 pjdlog_debug(2, 1119 "regular: Range offset=%jd length=%zu is already locked, waiting.", 1120 (intmax_t)ggio->gctl_offset, 1121 (size_t)ggio->gctl_length); 1122 sleep(1); 1123 continue; 1124 } 1125 mtx_unlock(&range_lock); 1126 break; 1127 } 1128 mtx_lock(&res->hr_amp_lock); 1129 if (activemap_write_start(res->hr_amp, 1130 ggio->gctl_offset, ggio->gctl_length)) { 1131 (void)hast_activemap_flush(res); 1132 } 1133 mtx_unlock(&res->hr_amp_lock); 1134 /* FALLTHROUGH */ 1135 case BIO_DELETE: 1136 case BIO_FLUSH: 1137 pjdlog_debug(2, 1138 "ggate_recv: (%p) Moving request to the send queues.", 1139 hio); 1140 refcount_init(&hio->hio_countdown, ncomps); 1141 for (ii = 0; ii < ncomps; ii++) 1142 QUEUE_INSERT1(hio, send, ii); 1143 break; 1144 } 1145 } 1146 /* NOTREACHED */ 1147 return (NULL); 1148} 1149 1150/* 1151 * Thread reads from or writes to local component. 1152 * If local read fails, it redirects it to remote_send thread. 1153 */ 1154static void * 1155local_send_thread(void *arg) 1156{ 1157 struct hast_resource *res = arg; 1158 struct g_gate_ctl_io *ggio; 1159 struct hio *hio; 1160 unsigned int ncomp, rncomp; 1161 ssize_t ret; 1162 1163 /* Local component is 0 for now. 
*/ 1164 ncomp = 0; 1165 /* Remote component is 1 for now. */ 1166 rncomp = 1; 1167 1168 for (;;) { 1169 pjdlog_debug(2, "local_send: Taking request."); 1170 QUEUE_TAKE1(hio, send, ncomp, 0); 1171 pjdlog_debug(2, "local_send: (%p) Got request.", hio); 1172 ggio = &hio->hio_ggio; 1173 switch (ggio->gctl_cmd) { 1174 case BIO_READ: 1175 ret = pread(res->hr_localfd, ggio->gctl_data, 1176 ggio->gctl_length, 1177 ggio->gctl_offset + res->hr_localoff); 1178 if (ret == ggio->gctl_length) 1179 hio->hio_errors[ncomp] = 0; 1180 else { 1181 /* 1182 * If READ failed, try to read from remote node. 1183 */ 1184 if (ret < 0) { 1185 reqlog(LOG_WARNING, 0, ggio, 1186 "Local request failed (%s), trying remote node. ", 1187 strerror(errno)); 1188 } else if (ret != ggio->gctl_length) { 1189 reqlog(LOG_WARNING, 0, ggio, 1190 "Local request failed (%zd != %jd), trying remote node. ", 1191 ret, (intmax_t)ggio->gctl_length); 1192 } 1193 QUEUE_INSERT1(hio, send, rncomp); 1194 continue; 1195 } 1196 break; 1197 case BIO_WRITE: 1198 ret = pwrite(res->hr_localfd, ggio->gctl_data, 1199 ggio->gctl_length, 1200 ggio->gctl_offset + res->hr_localoff); 1201 if (ret < 0) { 1202 hio->hio_errors[ncomp] = errno; 1203 reqlog(LOG_WARNING, 0, ggio, 1204 "Local request failed (%s): ", 1205 strerror(errno)); 1206 } else if (ret != ggio->gctl_length) { 1207 hio->hio_errors[ncomp] = EIO; 1208 reqlog(LOG_WARNING, 0, ggio, 1209 "Local request failed (%zd != %jd): ", 1210 ret, (intmax_t)ggio->gctl_length); 1211 } else { 1212 hio->hio_errors[ncomp] = 0; 1213 } 1214 break; 1215 case BIO_DELETE: 1216 ret = g_delete(res->hr_localfd, 1217 ggio->gctl_offset + res->hr_localoff, 1218 ggio->gctl_length); 1219 if (ret < 0) { 1220 hio->hio_errors[ncomp] = errno; 1221 reqlog(LOG_WARNING, 0, ggio, 1222 "Local request failed (%s): ", 1223 strerror(errno)); 1224 } else { 1225 hio->hio_errors[ncomp] = 0; 1226 } 1227 break; 1228 case BIO_FLUSH: 1229 ret = g_flush(res->hr_localfd); 1230 if (ret < 0) { 1231 hio->hio_errors[ncomp] = 
errno; 1232 reqlog(LOG_WARNING, 0, ggio, 1233 "Local request failed (%s): ", 1234 strerror(errno)); 1235 } else { 1236 hio->hio_errors[ncomp] = 0; 1237 } 1238 break; 1239 } 1240 if (refcount_release(&hio->hio_countdown)) { 1241 if (ISSYNCREQ(hio)) { 1242 mtx_lock(&sync_lock); 1243 SYNCREQDONE(hio); 1244 mtx_unlock(&sync_lock); 1245 cv_signal(&sync_cond); 1246 } else { 1247 pjdlog_debug(2, 1248 "local_send: (%p) Moving request to the done queue.", 1249 hio); 1250 QUEUE_INSERT2(hio, done); 1251 } 1252 } 1253 } 1254 /* NOTREACHED */ 1255 return (NULL); 1256} 1257 1258static void 1259keepalive_send(struct hast_resource *res, unsigned int ncomp) 1260{ 1261 struct nv *nv; 1262 1263 rw_rlock(&hio_remote_lock[ncomp]); 1264 1265 if (!ISCONNECTED(res, ncomp)) { 1266 rw_unlock(&hio_remote_lock[ncomp]); 1267 return; 1268 } 1269 1270 PJDLOG_ASSERT(res->hr_remotein != NULL); 1271 PJDLOG_ASSERT(res->hr_remoteout != NULL); 1272 1273 nv = nv_alloc(); 1274 nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); 1275 if (nv_error(nv) != 0) { 1276 rw_unlock(&hio_remote_lock[ncomp]); 1277 nv_free(nv); 1278 pjdlog_debug(1, 1279 "keepalive_send: Unable to prepare header to send."); 1280 return; 1281 } 1282 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) < 0) { 1283 rw_unlock(&hio_remote_lock[ncomp]); 1284 pjdlog_common(LOG_DEBUG, 1, errno, 1285 "keepalive_send: Unable to send request"); 1286 nv_free(nv); 1287 remote_close(res, ncomp); 1288 return; 1289 } 1290 1291 rw_unlock(&hio_remote_lock[ncomp]); 1292 nv_free(nv); 1293 pjdlog_debug(2, "keepalive_send: Request sent."); 1294} 1295 1296/* 1297 * Thread sends request to secondary node. 1298 */ 1299static void * 1300remote_send_thread(void *arg) 1301{ 1302 struct hast_resource *res = arg; 1303 struct g_gate_ctl_io *ggio; 1304 time_t lastcheck, now; 1305 struct hio *hio; 1306 struct nv *nv; 1307 unsigned int ncomp; 1308 bool wakeup; 1309 uint64_t offset, length; 1310 uint8_t cmd; 1311 void *data; 1312 1313 /* Remote component is 1 for now. 
*/ 1314 ncomp = 1; 1315 lastcheck = time(NULL); 1316 1317 for (;;) { 1318 pjdlog_debug(2, "remote_send: Taking request."); 1319 QUEUE_TAKE1(hio, send, ncomp, RETRY_SLEEP); 1320 if (hio == NULL) { 1321 now = time(NULL); 1322 if (lastcheck + RETRY_SLEEP <= now) { 1323 keepalive_send(res, ncomp); 1324 lastcheck = now; 1325 } 1326 continue; 1327 } 1328 pjdlog_debug(2, "remote_send: (%p) Got request.", hio); 1329 ggio = &hio->hio_ggio; 1330 switch (ggio->gctl_cmd) { 1331 case BIO_READ: 1332 cmd = HIO_READ; 1333 data = NULL; 1334 offset = ggio->gctl_offset; 1335 length = ggio->gctl_length; 1336 break; 1337 case BIO_WRITE: 1338 cmd = HIO_WRITE; 1339 data = ggio->gctl_data; 1340 offset = ggio->gctl_offset; 1341 length = ggio->gctl_length; 1342 break; 1343 case BIO_DELETE: 1344 cmd = HIO_DELETE; 1345 data = NULL; 1346 offset = ggio->gctl_offset; 1347 length = ggio->gctl_length; 1348 break; 1349 case BIO_FLUSH: 1350 cmd = HIO_FLUSH; 1351 data = NULL; 1352 offset = 0; 1353 length = 0; 1354 break; 1355 default: 1356 PJDLOG_ASSERT(!"invalid condition"); 1357 abort(); 1358 } 1359 nv = nv_alloc(); 1360 nv_add_uint8(nv, cmd, "cmd"); 1361 nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); 1362 nv_add_uint64(nv, offset, "offset"); 1363 nv_add_uint64(nv, length, "length"); 1364 if (nv_error(nv) != 0) { 1365 hio->hio_errors[ncomp] = nv_error(nv); 1366 pjdlog_debug(2, 1367 "remote_send: (%p) Unable to prepare header to send.", 1368 hio); 1369 reqlog(LOG_ERR, 0, ggio, 1370 "Unable to prepare header to send (%s): ", 1371 strerror(nv_error(nv))); 1372 /* Move failed request immediately to the done queue. */ 1373 goto done_queue; 1374 } 1375 pjdlog_debug(2, 1376 "remote_send: (%p) Moving request to the recv queue.", 1377 hio); 1378 /* 1379 * Protect connection from disappearing. 
1380 */ 1381 rw_rlock(&hio_remote_lock[ncomp]); 1382 if (!ISCONNECTED(res, ncomp)) { 1383 rw_unlock(&hio_remote_lock[ncomp]); 1384 hio->hio_errors[ncomp] = ENOTCONN; 1385 goto done_queue; 1386 } 1387 /* 1388 * Move the request to recv queue before sending it, because 1389 * in different order we can get reply before we move request 1390 * to recv queue. 1391 */ 1392 mtx_lock(&hio_recv_list_lock[ncomp]); 1393 wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); 1394 TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1395 mtx_unlock(&hio_recv_list_lock[ncomp]); 1396 if (hast_proto_send(res, res->hr_remoteout, nv, data, 1397 data != NULL ? length : 0) < 0) { 1398 hio->hio_errors[ncomp] = errno; 1399 rw_unlock(&hio_remote_lock[ncomp]); 1400 pjdlog_debug(2, 1401 "remote_send: (%p) Unable to send request.", hio); 1402 reqlog(LOG_ERR, 0, ggio, 1403 "Unable to send request (%s): ", 1404 strerror(hio->hio_errors[ncomp])); 1405 remote_close(res, ncomp); 1406 /* 1407 * Take request back from the receive queue and move 1408 * it immediately to the done queue. 
1409 */ 1410 mtx_lock(&hio_recv_list_lock[ncomp]); 1411 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); 1412 mtx_unlock(&hio_recv_list_lock[ncomp]); 1413 goto done_queue; 1414 } 1415 rw_unlock(&hio_remote_lock[ncomp]); 1416 nv_free(nv); 1417 if (wakeup) 1418 cv_signal(&hio_recv_list_cond[ncomp]); 1419 continue; 1420done_queue: 1421 nv_free(nv); 1422 if (ISSYNCREQ(hio)) { 1423 if (!refcount_release(&hio->hio_countdown)) 1424 continue; 1425 mtx_lock(&sync_lock); 1426 SYNCREQDONE(hio); 1427 mtx_unlock(&sync_lock); 1428 cv_signal(&sync_cond); 1429 continue; 1430 } 1431 if (ggio->gctl_cmd == BIO_WRITE) { 1432 mtx_lock(&res->hr_amp_lock); 1433 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, 1434 ggio->gctl_length)) { 1435 (void)hast_activemap_flush(res); 1436 } 1437 mtx_unlock(&res->hr_amp_lock); 1438 } 1439 if (!refcount_release(&hio->hio_countdown)) 1440 continue; 1441 pjdlog_debug(2, 1442 "remote_send: (%p) Moving request to the done queue.", 1443 hio); 1444 QUEUE_INSERT2(hio, done); 1445 } 1446 /* NOTREACHED */ 1447 return (NULL); 1448} 1449 1450/* 1451 * Thread receives answer from secondary node and passes it to ggate_send 1452 * thread. 1453 */ 1454static void * 1455remote_recv_thread(void *arg) 1456{ 1457 struct hast_resource *res = arg; 1458 struct g_gate_ctl_io *ggio; 1459 struct hio *hio; 1460 struct nv *nv; 1461 unsigned int ncomp; 1462 uint64_t seq; 1463 int error; 1464 1465 /* Remote component is 1 for now. */ 1466 ncomp = 1; 1467 1468 for (;;) { 1469 /* Wait until there is anything to receive. 
*/ 1470 mtx_lock(&hio_recv_list_lock[ncomp]); 1471 while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { 1472 pjdlog_debug(2, "remote_recv: No requests, waiting."); 1473 cv_wait(&hio_recv_list_cond[ncomp], 1474 &hio_recv_list_lock[ncomp]); 1475 } 1476 mtx_unlock(&hio_recv_list_lock[ncomp]); 1477 rw_rlock(&hio_remote_lock[ncomp]); 1478 if (!ISCONNECTED(res, ncomp)) { 1479 rw_unlock(&hio_remote_lock[ncomp]); 1480 /* 1481 * Connection is dead, so move all pending requests to 1482 * the done queue (one-by-one). 1483 */ 1484 mtx_lock(&hio_recv_list_lock[ncomp]); 1485 hio = TAILQ_FIRST(&hio_recv_list[ncomp]); 1486 PJDLOG_ASSERT(hio != NULL); 1487 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1488 hio_next[ncomp]); 1489 mtx_unlock(&hio_recv_list_lock[ncomp]); 1490 goto done_queue; 1491 } 1492 if (hast_proto_recv_hdr(res->hr_remotein, &nv) < 0) { 1493 pjdlog_errno(LOG_ERR, 1494 "Unable to receive reply header"); 1495 rw_unlock(&hio_remote_lock[ncomp]); 1496 remote_close(res, ncomp); 1497 continue; 1498 } 1499 rw_unlock(&hio_remote_lock[ncomp]); 1500 seq = nv_get_uint64(nv, "seq"); 1501 if (seq == 0) { 1502 pjdlog_error("Header contains no 'seq' field."); 1503 nv_free(nv); 1504 continue; 1505 } 1506 mtx_lock(&hio_recv_list_lock[ncomp]); 1507 TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { 1508 if (hio->hio_ggio.gctl_seq == seq) { 1509 TAILQ_REMOVE(&hio_recv_list[ncomp], hio, 1510 hio_next[ncomp]); 1511 break; 1512 } 1513 } 1514 mtx_unlock(&hio_recv_list_lock[ncomp]); 1515 if (hio == NULL) { 1516 pjdlog_error("Found no request matching received 'seq' field (%ju).", 1517 (uintmax_t)seq); 1518 nv_free(nv); 1519 continue; 1520 } 1521 error = nv_get_int16(nv, "error"); 1522 if (error != 0) { 1523 /* Request failed on remote side. 
*/ 1524 hio->hio_errors[ncomp] = error; 1525 reqlog(LOG_WARNING, 0, &hio->hio_ggio, 1526 "Remote request failed (%s): ", strerror(error)); 1527 nv_free(nv); 1528 goto done_queue; 1529 } 1530 ggio = &hio->hio_ggio; 1531 switch (ggio->gctl_cmd) { 1532 case BIO_READ: 1533 rw_rlock(&hio_remote_lock[ncomp]); 1534 if (!ISCONNECTED(res, ncomp)) { 1535 rw_unlock(&hio_remote_lock[ncomp]); 1536 nv_free(nv); 1537 goto done_queue; 1538 } 1539 if (hast_proto_recv_data(res, res->hr_remotein, nv, 1540 ggio->gctl_data, ggio->gctl_length) < 0) { 1541 hio->hio_errors[ncomp] = errno; 1542 pjdlog_errno(LOG_ERR, 1543 "Unable to receive reply data"); 1544 rw_unlock(&hio_remote_lock[ncomp]); 1545 nv_free(nv); 1546 remote_close(res, ncomp); 1547 goto done_queue; 1548 } 1549 rw_unlock(&hio_remote_lock[ncomp]); 1550 break; 1551 case BIO_WRITE: 1552 case BIO_DELETE: 1553 case BIO_FLUSH: 1554 break; 1555 default: 1556 PJDLOG_ASSERT(!"invalid condition"); 1557 abort(); 1558 } 1559 hio->hio_errors[ncomp] = 0; 1560 nv_free(nv); 1561done_queue: 1562 if (refcount_release(&hio->hio_countdown)) { 1563 if (ISSYNCREQ(hio)) { 1564 mtx_lock(&sync_lock); 1565 SYNCREQDONE(hio); 1566 mtx_unlock(&sync_lock); 1567 cv_signal(&sync_cond); 1568 } else { 1569 pjdlog_debug(2, 1570 "remote_recv: (%p) Moving request to the done queue.", 1571 hio); 1572 QUEUE_INSERT2(hio, done); 1573 } 1574 } 1575 } 1576 /* NOTREACHED */ 1577 return (NULL); 1578} 1579 1580/* 1581 * Thread sends answer to the kernel. 
1582 */ 1583static void * 1584ggate_send_thread(void *arg) 1585{ 1586 struct hast_resource *res = arg; 1587 struct g_gate_ctl_io *ggio; 1588 struct hio *hio; 1589 unsigned int ii, ncomp, ncomps; 1590 1591 ncomps = HAST_NCOMPONENTS; 1592 1593 for (;;) { 1594 pjdlog_debug(2, "ggate_send: Taking request."); 1595 QUEUE_TAKE2(hio, done); 1596 pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); 1597 ggio = &hio->hio_ggio; 1598 for (ii = 0; ii < ncomps; ii++) { 1599 if (hio->hio_errors[ii] == 0) { 1600 /* 1601 * One successful request is enough to declare 1602 * success. 1603 */ 1604 ggio->gctl_error = 0; 1605 break; 1606 } 1607 } 1608 if (ii == ncomps) { 1609 /* 1610 * None of the requests were successful. 1611 * Use first error. 1612 */ 1613 ggio->gctl_error = hio->hio_errors[0]; 1614 } 1615 if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { 1616 mtx_lock(&res->hr_amp_lock); 1617 activemap_write_complete(res->hr_amp, 1618 ggio->gctl_offset, ggio->gctl_length); 1619 mtx_unlock(&res->hr_amp_lock); 1620 } 1621 if (ggio->gctl_cmd == BIO_WRITE) { 1622 /* 1623 * Unlock range we locked. 1624 */ 1625 mtx_lock(&range_lock); 1626 rangelock_del(range_regular, ggio->gctl_offset, 1627 ggio->gctl_length); 1628 if (range_sync_wait) 1629 cv_signal(&range_sync_cond); 1630 mtx_unlock(&range_lock); 1631 /* 1632 * Bump local count if this is first write after 1633 * connection failure with remote node. 
1634 */ 1635 ncomp = 1; 1636 rw_rlock(&hio_remote_lock[ncomp]); 1637 if (!ISCONNECTED(res, ncomp)) { 1638 mtx_lock(&metadata_lock); 1639 if (res->hr_primary_localcnt == 1640 res->hr_secondary_remotecnt) { 1641 res->hr_primary_localcnt++; 1642 pjdlog_debug(1, 1643 "Increasing localcnt to %ju.", 1644 (uintmax_t)res->hr_primary_localcnt); 1645 (void)metadata_write(res); 1646 } 1647 mtx_unlock(&metadata_lock); 1648 } 1649 rw_unlock(&hio_remote_lock[ncomp]); 1650 } 1651 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) < 0) 1652 primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); 1653 pjdlog_debug(2, 1654 "ggate_send: (%p) Moving request to the free queue.", hio); 1655 QUEUE_INSERT2(hio, free); 1656 } 1657 /* NOTREACHED */ 1658 return (NULL); 1659} 1660 1661/* 1662 * Thread synchronize local and remote components. 1663 */ 1664static void * 1665sync_thread(void *arg __unused) 1666{ 1667 struct hast_resource *res = arg; 1668 struct hio *hio; 1669 struct g_gate_ctl_io *ggio; 1670 unsigned int ii, ncomp, ncomps; 1671 off_t offset, length, synced; 1672 bool dorewind; 1673 int syncext; 1674 1675 ncomps = HAST_NCOMPONENTS; 1676 dorewind = true; 1677 synced = 0; 1678 offset = -1; 1679 1680 for (;;) { 1681 mtx_lock(&sync_lock); 1682 if (offset >= 0 && !sync_inprogress) { 1683 pjdlog_info("Synchronization interrupted. " 1684 "%jd bytes synchronized so far.", 1685 (intmax_t)synced); 1686 event_send(res, EVENT_SYNCINTR); 1687 } 1688 while (!sync_inprogress) { 1689 dorewind = true; 1690 synced = 0; 1691 cv_wait(&sync_cond, &sync_lock); 1692 } 1693 mtx_unlock(&sync_lock); 1694 /* 1695 * Obtain offset at which we should synchronize. 1696 * Rewind synchronization if needed. 1697 */ 1698 mtx_lock(&res->hr_amp_lock); 1699 if (dorewind) 1700 activemap_sync_rewind(res->hr_amp); 1701 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); 1702 if (syncext != -1) { 1703 /* 1704 * We synchronized entire syncext extent, we can mark 1705 * it as clean now. 
1706 */ 1707 if (activemap_extent_complete(res->hr_amp, syncext)) 1708 (void)hast_activemap_flush(res); 1709 } 1710 mtx_unlock(&res->hr_amp_lock); 1711 if (dorewind) { 1712 dorewind = false; 1713 if (offset < 0) 1714 pjdlog_info("Nodes are in sync."); 1715 else { 1716 pjdlog_info("Synchronization started. %ju bytes to go.", 1717 (uintmax_t)(res->hr_extentsize * 1718 activemap_ndirty(res->hr_amp))); 1719 event_send(res, EVENT_SYNCSTART); 1720 } 1721 } 1722 if (offset < 0) { 1723 sync_stop(); 1724 pjdlog_debug(1, "Nothing to synchronize."); 1725 /* 1726 * Synchronization complete, make both localcnt and 1727 * remotecnt equal. 1728 */ 1729 ncomp = 1; 1730 rw_rlock(&hio_remote_lock[ncomp]); 1731 if (ISCONNECTED(res, ncomp)) { 1732 if (synced > 0) { 1733 pjdlog_info("Synchronization complete. " 1734 "%jd bytes synchronized.", 1735 (intmax_t)synced); 1736 event_send(res, EVENT_SYNCDONE); 1737 } 1738 mtx_lock(&metadata_lock); 1739 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; 1740 res->hr_primary_localcnt = 1741 res->hr_secondary_localcnt; 1742 res->hr_primary_remotecnt = 1743 res->hr_secondary_remotecnt; 1744 pjdlog_debug(1, 1745 "Setting localcnt to %ju and remotecnt to %ju.", 1746 (uintmax_t)res->hr_primary_localcnt, 1747 (uintmax_t)res->hr_secondary_localcnt); 1748 (void)metadata_write(res); 1749 mtx_unlock(&metadata_lock); 1750 } 1751 rw_unlock(&hio_remote_lock[ncomp]); 1752 continue; 1753 } 1754 pjdlog_debug(2, "sync: Taking free request."); 1755 QUEUE_TAKE2(hio, free); 1756 pjdlog_debug(2, "sync: (%p) Got free request.", hio); 1757 /* 1758 * Lock the range we are going to synchronize. We don't want 1759 * race where someone writes between our read and write. 
1760 */ 1761 for (;;) { 1762 mtx_lock(&range_lock); 1763 if (rangelock_islocked(range_regular, offset, length)) { 1764 pjdlog_debug(2, 1765 "sync: Range offset=%jd length=%jd locked.", 1766 (intmax_t)offset, (intmax_t)length); 1767 range_sync_wait = true; 1768 cv_wait(&range_sync_cond, &range_lock); 1769 range_sync_wait = false; 1770 mtx_unlock(&range_lock); 1771 continue; 1772 } 1773 if (rangelock_add(range_sync, offset, length) < 0) { 1774 mtx_unlock(&range_lock); 1775 pjdlog_debug(2, 1776 "sync: Range offset=%jd length=%jd is already locked, waiting.", 1777 (intmax_t)offset, (intmax_t)length); 1778 sleep(1); 1779 continue; 1780 } 1781 mtx_unlock(&range_lock); 1782 break; 1783 } 1784 /* 1785 * First read the data from synchronization source. 1786 */ 1787 SYNCREQ(hio); 1788 ggio = &hio->hio_ggio; 1789 ggio->gctl_cmd = BIO_READ; 1790 ggio->gctl_offset = offset; 1791 ggio->gctl_length = length; 1792 ggio->gctl_error = 0; 1793 for (ii = 0; ii < ncomps; ii++) 1794 hio->hio_errors[ii] = EINVAL; 1795 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1796 hio); 1797 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1798 hio); 1799 mtx_lock(&metadata_lock); 1800 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1801 /* 1802 * This range is up-to-date on local component, 1803 * so handle request locally. 1804 */ 1805 /* Local component is 0 for now. */ 1806 ncomp = 0; 1807 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1808 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1809 /* 1810 * This range is out-of-date on local component, 1811 * so send request to the remote node. 1812 */ 1813 /* Remote component is 1 for now. */ 1814 ncomp = 1; 1815 } 1816 mtx_unlock(&metadata_lock); 1817 refcount_init(&hio->hio_countdown, 1); 1818 QUEUE_INSERT1(hio, send, ncomp); 1819 1820 /* 1821 * Let's wait for READ to finish. 
1822 */ 1823 mtx_lock(&sync_lock); 1824 while (!ISSYNCREQDONE(hio)) 1825 cv_wait(&sync_cond, &sync_lock); 1826 mtx_unlock(&sync_lock); 1827 1828 if (hio->hio_errors[ncomp] != 0) { 1829 pjdlog_error("Unable to read synchronization data: %s.", 1830 strerror(hio->hio_errors[ncomp])); 1831 goto free_queue; 1832 } 1833 1834 /* 1835 * We read the data from synchronization source, now write it 1836 * to synchronization target. 1837 */ 1838 SYNCREQ(hio); 1839 ggio->gctl_cmd = BIO_WRITE; 1840 for (ii = 0; ii < ncomps; ii++) 1841 hio->hio_errors[ii] = EINVAL; 1842 reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", 1843 hio); 1844 pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", 1845 hio); 1846 mtx_lock(&metadata_lock); 1847 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { 1848 /* 1849 * This range is up-to-date on local component, 1850 * so we update remote component. 1851 */ 1852 /* Remote component is 1 for now. */ 1853 ncomp = 1; 1854 } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { 1855 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); 1856 /* 1857 * This range is out-of-date on local component, 1858 * so we update it. 1859 */ 1860 /* Local component is 0 for now. */ 1861 ncomp = 0; 1862 } 1863 mtx_unlock(&metadata_lock); 1864 1865 pjdlog_debug(2, "sync: (%p) Moving request to the send queues.", 1866 hio); 1867 refcount_init(&hio->hio_countdown, 1); 1868 QUEUE_INSERT1(hio, send, ncomp); 1869 1870 /* 1871 * Let's wait for WRITE to finish. 
1872 */ 1873 mtx_lock(&sync_lock); 1874 while (!ISSYNCREQDONE(hio)) 1875 cv_wait(&sync_cond, &sync_lock); 1876 mtx_unlock(&sync_lock); 1877 1878 if (hio->hio_errors[ncomp] != 0) { 1879 pjdlog_error("Unable to write synchronization data: %s.", 1880 strerror(hio->hio_errors[ncomp])); 1881 goto free_queue; 1882 } 1883 1884 synced += length; 1885free_queue: 1886 mtx_lock(&range_lock); 1887 rangelock_del(range_sync, offset, length); 1888 if (range_regular_wait) 1889 cv_signal(&range_regular_cond); 1890 mtx_unlock(&range_lock); 1891 pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", 1892 hio); 1893 QUEUE_INSERT2(hio, free); 1894 } 1895 /* NOTREACHED */ 1896 return (NULL); 1897} 1898 1899void 1900primary_config_reload(struct hast_resource *res, struct nv *nv) 1901{ 1902 unsigned int ii, ncomps; 1903 int modified, vint; 1904 const char *vstr; 1905 1906 pjdlog_info("Reloading configuration..."); 1907 1908 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); 1909 PJDLOG_ASSERT(gres == res); 1910 nv_assert(nv, "remoteaddr"); 1911 nv_assert(nv, "replication"); 1912 nv_assert(nv, "timeout"); 1913 nv_assert(nv, "exec"); 1914 1915 ncomps = HAST_NCOMPONENTS; 1916 1917#define MODIFIED_REMOTEADDR 0x1 1918#define MODIFIED_REPLICATION 0x2 1919#define MODIFIED_TIMEOUT 0x4 1920#define MODIFIED_EXEC 0x8 1921 modified = 0; 1922 1923 vstr = nv_get_string(nv, "remoteaddr"); 1924 if (strcmp(gres->hr_remoteaddr, vstr) != 0) { 1925 /* 1926 * Don't copy res->hr_remoteaddr to gres just yet. 1927 * We want remote_close() to log disconnect from the old 1928 * addresses, not from the new ones. 
1929 */ 1930 modified |= MODIFIED_REMOTEADDR; 1931 } 1932 vint = nv_get_int32(nv, "replication"); 1933 if (gres->hr_replication != vint) { 1934 gres->hr_replication = vint; 1935 modified |= MODIFIED_REPLICATION; 1936 } 1937 vint = nv_get_int32(nv, "timeout"); 1938 if (gres->hr_timeout != vint) { 1939 gres->hr_timeout = vint; 1940 modified |= MODIFIED_TIMEOUT; 1941 } 1942 vstr = nv_get_string(nv, "exec"); 1943 if (strcmp(gres->hr_exec, vstr) != 0) { 1944 strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); 1945 modified |= MODIFIED_EXEC; 1946 } 1947 1948 /* 1949 * If only timeout was modified we only need to change it without 1950 * reconnecting. 1951 */ 1952 if (modified == MODIFIED_TIMEOUT) { 1953 for (ii = 0; ii < ncomps; ii++) { 1954 if (!ISREMOTE(ii)) 1955 continue; 1956 rw_rlock(&hio_remote_lock[ii]); 1957 if (!ISCONNECTED(gres, ii)) { 1958 rw_unlock(&hio_remote_lock[ii]); 1959 continue; 1960 } 1961 rw_unlock(&hio_remote_lock[ii]); 1962 if (proto_timeout(gres->hr_remotein, 1963 gres->hr_timeout) < 0) { 1964 pjdlog_errno(LOG_WARNING, 1965 "Unable to set connection timeout"); 1966 } 1967 if (proto_timeout(gres->hr_remoteout, 1968 gres->hr_timeout) < 0) { 1969 pjdlog_errno(LOG_WARNING, 1970 "Unable to set connection timeout"); 1971 } 1972 } 1973 } else if ((modified & 1974 (MODIFIED_REMOTEADDR | MODIFIED_REPLICATION)) != 0) { 1975 for (ii = 0; ii < ncomps; ii++) { 1976 if (!ISREMOTE(ii)) 1977 continue; 1978 remote_close(gres, ii); 1979 } 1980 if (modified & MODIFIED_REMOTEADDR) { 1981 vstr = nv_get_string(nv, "remoteaddr"); 1982 strlcpy(gres->hr_remoteaddr, vstr, 1983 sizeof(gres->hr_remoteaddr)); 1984 } 1985 } 1986#undef MODIFIED_REMOTEADDR 1987#undef MODIFIED_REPLICATION 1988#undef MODIFIED_TIMEOUT 1989#undef MODIFIED_EXEC 1990 1991 pjdlog_info("Configuration reloaded successfully."); 1992} 1993 1994static void 1995guard_one(struct hast_resource *res, unsigned int ncomp) 1996{ 1997 struct proto_conn *in, *out; 1998 1999 if (!ISREMOTE(ncomp)) 2000 return; 2001 
2002 rw_rlock(&hio_remote_lock[ncomp]); 2003 2004 if (!real_remote(res)) { 2005 rw_unlock(&hio_remote_lock[ncomp]); 2006 return; 2007 } 2008 2009 if (ISCONNECTED(res, ncomp)) { 2010 PJDLOG_ASSERT(res->hr_remotein != NULL); 2011 PJDLOG_ASSERT(res->hr_remoteout != NULL); 2012 rw_unlock(&hio_remote_lock[ncomp]); 2013 pjdlog_debug(2, "remote_guard: Connection to %s is ok.", 2014 res->hr_remoteaddr); 2015 return; 2016 } 2017 2018 PJDLOG_ASSERT(res->hr_remotein == NULL); 2019 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2020 /* 2021 * Upgrade the lock. It doesn't have to be atomic as no other thread 2022 * can change connection status from disconnected to connected. 2023 */ 2024 rw_unlock(&hio_remote_lock[ncomp]); 2025 pjdlog_debug(2, "remote_guard: Reconnecting to %s.", 2026 res->hr_remoteaddr); 2027 in = out = NULL; 2028 if (init_remote(res, &in, &out)) { 2029 rw_wlock(&hio_remote_lock[ncomp]); 2030 PJDLOG_ASSERT(res->hr_remotein == NULL); 2031 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2032 PJDLOG_ASSERT(in != NULL && out != NULL); 2033 res->hr_remotein = in; 2034 res->hr_remoteout = out; 2035 rw_unlock(&hio_remote_lock[ncomp]); 2036 pjdlog_info("Successfully reconnected to %s.", 2037 res->hr_remoteaddr); 2038 sync_start(); 2039 } else { 2040 /* Both connections should be NULL. */ 2041 PJDLOG_ASSERT(res->hr_remotein == NULL); 2042 PJDLOG_ASSERT(res->hr_remoteout == NULL); 2043 PJDLOG_ASSERT(in == NULL && out == NULL); 2044 pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", 2045 res->hr_remoteaddr); 2046 } 2047} 2048 2049/* 2050 * Thread guards remote connections and reconnects when needed, handles 2051 * signals, etc. 
2052 */ 2053static void * 2054guard_thread(void *arg) 2055{ 2056 struct hast_resource *res = arg; 2057 unsigned int ii, ncomps; 2058 struct timespec timeout; 2059 time_t lastcheck, now; 2060 sigset_t mask; 2061 int signo; 2062 2063 ncomps = HAST_NCOMPONENTS; 2064 lastcheck = time(NULL); 2065 2066 PJDLOG_VERIFY(sigemptyset(&mask) == 0); 2067 PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 2068 PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 2069 2070 timeout.tv_sec = RETRY_SLEEP; 2071 timeout.tv_nsec = 0; 2072 signo = -1; 2073 2074 for (;;) { 2075 switch (signo) { 2076 case SIGINT: 2077 case SIGTERM: 2078 sigexit_received = true; 2079 primary_exitx(EX_OK, 2080 "Termination signal received, exiting."); 2081 break; 2082 default: 2083 break; 2084 } 2085 2086 pjdlog_debug(2, "remote_guard: Checking connections."); 2087 now = time(NULL); 2088 if (lastcheck + RETRY_SLEEP <= now) { 2089 for (ii = 0; ii < ncomps; ii++) 2090 guard_one(res, ii); 2091 lastcheck = now; 2092 } 2093 signo = sigtimedwait(&mask, NULL, &timeout); 2094 } 2095 /* NOTREACHED */ 2096 return (NULL); 2097} 2098