1243730Srwatson/*- 2243730Srwatson * Copyright (c) 2012 The FreeBSD Foundation 3243730Srwatson * All rights reserved. 4243730Srwatson * 5243730Srwatson * This software was developed by Pawel Jakub Dawidek under sponsorship from 6243730Srwatson * the FreeBSD Foundation. 7243730Srwatson * 8243730Srwatson * Redistribution and use in source and binary forms, with or without 9243730Srwatson * modification, are permitted provided that the following conditions 10243730Srwatson * are met: 11243730Srwatson * 1. Redistributions of source code must retain the above copyright 12243730Srwatson * notice, this list of conditions and the following disclaimer. 13243730Srwatson * 2. Redistributions in binary form must reproduce the above copyright 14243730Srwatson * notice, this list of conditions and the following disclaimer in the 15243730Srwatson * documentation and/or other materials provided with the distribution. 16243730Srwatson * 17243730Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND 18243730Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 19243730Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 20243730Srwatson * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE 21243730Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 22243730Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 23243730Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 24243730Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 25243730Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 26243730Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 27243730Srwatson * SUCH DAMAGE. 28243730Srwatson * 29243734Srwatson * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $ 30243730Srwatson */ 31243730Srwatson 32243734Srwatson#include <config/config.h> 33243730Srwatson 34243730Srwatson#include <sys/param.h> 35243730Srwatson#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) 36243730Srwatson#include <sys/endian.h> 37243730Srwatson#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 38243730Srwatson#ifdef HAVE_MACHINE_ENDIAN_H 39243730Srwatson#include <machine/endian.h> 40243730Srwatson#else /* !HAVE_MACHINE_ENDIAN_H */ 41243730Srwatson#ifdef HAVE_ENDIAN_H 42243730Srwatson#include <endian.h> 43243730Srwatson#else /* !HAVE_ENDIAN_H */ 44243730Srwatson#error "No supported endian.h" 45243730Srwatson#endif /* !HAVE_ENDIAN_H */ 46243730Srwatson#endif /* !HAVE_MACHINE_ENDIAN_H */ 47243730Srwatson#include <compat/endian.h> 48243730Srwatson#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ 49243730Srwatson#include <sys/queue.h> 50243730Srwatson#include <sys/stat.h> 51243730Srwatson#include <sys/wait.h> 52243730Srwatson 53243730Srwatson#include <stdio.h> 54243730Srwatson#include <stdlib.h> 55243730Srwatson#include <unistd.h> 56243730Srwatson 57243730Srwatson#include <ctype.h> 58243730Srwatson#include <dirent.h> 59243730Srwatson#include <err.h> 60243730Srwatson#include <errno.h> 61243730Srwatson#include <fcntl.h> 62243730Srwatson#ifdef HAVE_LIBUTIL_H 63243730Srwatson#include <libutil.h> 64243730Srwatson#endif 65243730Srwatson#include <signal.h> 66243730Srwatson#include <string.h> 67243730Srwatson#include <strings.h> 68243730Srwatson 69243730Srwatson#include <openssl/hmac.h> 70243730Srwatson 71243730Srwatson#ifndef HAVE_SIGTIMEDWAIT 72243730Srwatson#include "sigtimedwait.h" 73243730Srwatson#endif 74243730Srwatson 75243730Srwatson#include "auditdistd.h" 76243734Srwatson#include "pjdlog.h" 77243730Srwatson#include "proto.h" 78243730Srwatson#include "sandbox.h" 79243730Srwatson#include "subr.h" 80243730Srwatson#include "synch.h" 81243730Srwatson#include "trail.h" 82243730Srwatson 83243730Srwatsonstatic struct adist_config *adcfg; 84243730Srwatsonstatic struct adist_host *adhost; 85243730Srwatson 86243730Srwatsonstatic pthread_rwlock_t adist_remote_lock; 87243730Srwatsonstatic pthread_mutex_t adist_remote_mtx; 88243730Srwatsonstatic pthread_cond_t adist_remote_cond; 89243730Srwatsonstatic struct trail *adist_trail; 90243730Srwatson 91243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_free_list; 92243730Srwatsonstatic pthread_mutex_t adist_free_list_lock; 93243730Srwatsonstatic pthread_cond_t adist_free_list_cond; 94243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_send_list; 95243730Srwatsonstatic pthread_mutex_t adist_send_list_lock; 96243730Srwatsonstatic pthread_cond_t adist_send_list_cond; 97243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_recv_list; 98243730Srwatsonstatic pthread_mutex_t adist_recv_list_lock; 99243730Srwatsonstatic pthread_cond_t adist_recv_list_cond; 100243730Srwatson 101243730Srwatsonstatic void 102243730Srwatsoninit_environment(void) 103243730Srwatson{ 104243730Srwatson struct adreq *adreq; 105243730Srwatson unsigned int ii; 106243730Srwatson 107243730Srwatson rw_init(&adist_remote_lock); 108243730Srwatson mtx_init(&adist_remote_mtx); 109243730Srwatson cv_init(&adist_remote_cond); 110243730Srwatson TAILQ_INIT(&adist_free_list); 111243730Srwatson mtx_init(&adist_free_list_lock); 112243730Srwatson cv_init(&adist_free_list_cond); 113243730Srwatson TAILQ_INIT(&adist_send_list); 114243730Srwatson mtx_init(&adist_send_list_lock); 115243730Srwatson cv_init(&adist_send_list_cond); 116243730Srwatson TAILQ_INIT(&adist_recv_list); 117243730Srwatson mtx_init(&adist_recv_list_lock); 118243730Srwatson cv_init(&adist_recv_list_cond); 119243730Srwatson 120243730Srwatson for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { 121243730Srwatson adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); 122243730Srwatson if (adreq == NULL) { 123243730Srwatson pjdlog_exitx(EX_TEMPFAIL, 124243730Srwatson "Unable to allocate %zu bytes of memory for adreq object.", 125243730Srwatson sizeof(*adreq) + ADIST_BUF_SIZE); 126243730Srwatson } 127243730Srwatson adreq->adr_byteorder = ADIST_BYTEORDER; 128243730Srwatson adreq->adr_cmd = ADIST_CMD_UNDEFINED; 129243730Srwatson adreq->adr_seq = 0; 130243730Srwatson adreq->adr_datasize = 0; 131243730Srwatson TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); 132243730Srwatson } 133243730Srwatson} 134243730Srwatson 135243730Srwatsonstatic int 136243730Srwatsonsender_connect(void) 137243730Srwatson{ 138243730Srwatson unsigned char rnd[32], hash[32], resp[32]; 139243730Srwatson struct proto_conn *conn; 140243730Srwatson char welcome[8]; 141243730Srwatson int16_t val; 142243730Srwatson 143243730Srwatson val = 1; 144243730Srwatson if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { 145243730Srwatson pjdlog_exit(EX_TEMPFAIL, 146243730Srwatson "Unable to send connection request to parent"); 147243730Srwatson } 148243730Srwatson if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { 149243730Srwatson pjdlog_exit(EX_TEMPFAIL, 150243730Srwatson "Unable to receive reply to connection request from parent"); 151243730Srwatson } 152243730Srwatson if (val != 0) { 153243730Srwatson errno = val; 154243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 155243730Srwatson adhost->adh_remoteaddr); 156243730Srwatson return (-1); 157243730Srwatson } 158243730Srwatson if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { 159243730Srwatson pjdlog_exit(EX_TEMPFAIL, 160243730Srwatson "Unable to receive connection from parent"); 161243730Srwatson } 162243730Srwatson if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { 163243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to connect to %s", 164243730Srwatson adhost->adh_remoteaddr); 165243730Srwatson proto_close(conn); 166243730Srwatson return (-1); 167243730Srwatson } 168243730Srwatson pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); 169243730Srwatson /* Error in setting timeout is not critical, but why should it fail? */ 170243730Srwatson if (proto_timeout(conn, adcfg->adc_timeout) < 0) 171243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); 172243730Srwatson else 173243730Srwatson pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); 174243730Srwatson 175243730Srwatson /* Exchange welcome message, which includes version number. */ 176243730Srwatson (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); 177243730Srwatson if (proto_send(conn, welcome, sizeof(welcome)) < 0) { 178243730Srwatson pjdlog_errno(LOG_WARNING, 179243730Srwatson "Unable to send welcome message to %s", 180243730Srwatson adhost->adh_remoteaddr); 181243730Srwatson proto_close(conn); 182243730Srwatson return (-1); 183243730Srwatson } 184243730Srwatson pjdlog_debug(1, "Welcome message sent (%s).", welcome); 185243730Srwatson bzero(welcome, sizeof(welcome)); 186243730Srwatson if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { 187243730Srwatson pjdlog_errno(LOG_WARNING, 188243730Srwatson "Unable to receive welcome message from %s", 189243730Srwatson adhost->adh_remoteaddr); 190243730Srwatson proto_close(conn); 191243730Srwatson return (-1); 192243730Srwatson } 193243730Srwatson if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || 194243730Srwatson !isdigit(welcome[6]) || welcome[7] != '\0') { 195243730Srwatson pjdlog_warning("Invalid welcome message from %s.", 196243730Srwatson adhost->adh_remoteaddr); 197243730Srwatson proto_close(conn); 198243730Srwatson return (-1); 199243730Srwatson } 200243730Srwatson pjdlog_debug(1, "Welcome message received (%s).", welcome); 201243730Srwatson /* 202243730Srwatson * Receiver can only reply with version number lower or equal to 203243730Srwatson * the one we sent. 204243730Srwatson */ 205243730Srwatson adhost->adh_version = atoi(welcome + 5); 206243730Srwatson if (adhost->adh_version > ADIST_VERSION) { 207243730Srwatson pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", 208243730Srwatson adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); 209243730Srwatson proto_close(conn); 210243730Srwatson return (-1); 211243730Srwatson } 212243730Srwatson 213243730Srwatson pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, 214243730Srwatson adhost->adh_remoteaddr); 215243730Srwatson 216243730Srwatson if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { 217243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send name to %s", 218243730Srwatson adhost->adh_remoteaddr); 219243730Srwatson proto_close(conn); 220243730Srwatson return (-1); 221243730Srwatson } 222243730Srwatson pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); 223243730Srwatson 224243730Srwatson if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { 225243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", 226243730Srwatson adhost->adh_remoteaddr); 227243730Srwatson proto_close(conn); 228243730Srwatson return (-1); 229243730Srwatson } 230243730Srwatson pjdlog_debug(1, "Challenge received."); 231243730Srwatson 232243730Srwatson if (HMAC(EVP_sha256(), adhost->adh_password, 233243730Srwatson (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 234243730Srwatson NULL) == NULL) { 235243730Srwatson pjdlog_warning("Unable to generate response."); 236243730Srwatson proto_close(conn); 237243730Srwatson return (-1); 238243730Srwatson } 239243730Srwatson pjdlog_debug(1, "Response generated."); 240243730Srwatson 241243730Srwatson if (proto_send(conn, hash, sizeof(hash)) == -1) { 242243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send response to %s", 243243730Srwatson adhost->adh_remoteaddr); 244243730Srwatson proto_close(conn); 245243730Srwatson return (-1); 246243730Srwatson } 247243730Srwatson pjdlog_debug(1, "Response sent."); 248243730Srwatson 249243730Srwatson if (adist_random(rnd, sizeof(rnd)) == -1) { 250243730Srwatson pjdlog_warning("Unable to generate challenge."); 251243730Srwatson proto_close(conn); 252243730Srwatson return (-1); 253243730Srwatson } 254243730Srwatson pjdlog_debug(1, "Challenge generated."); 255243730Srwatson 256243730Srwatson if (proto_send(conn, rnd, sizeof(rnd)) == -1) { 257243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", 258243730Srwatson adhost->adh_remoteaddr); 259243730Srwatson proto_close(conn); 260243730Srwatson return (-1); 261243730Srwatson } 262243730Srwatson pjdlog_debug(1, "Challenge sent."); 263243730Srwatson 264243730Srwatson if (proto_recv(conn, resp, sizeof(resp)) == -1) { 265243730Srwatson pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", 266243730Srwatson adhost->adh_remoteaddr); 267243730Srwatson proto_close(conn); 268243730Srwatson return (-1); 269243730Srwatson } 270243730Srwatson pjdlog_debug(1, "Response received."); 271243730Srwatson 272243730Srwatson if (HMAC(EVP_sha256(), adhost->adh_password, 273243730Srwatson (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, 274243730Srwatson NULL) == NULL) { 275243730Srwatson pjdlog_warning("Unable to generate hash."); 276243730Srwatson proto_close(conn); 277243730Srwatson return (-1); 278243730Srwatson } 279243730Srwatson pjdlog_debug(1, "Hash generated."); 280243730Srwatson 281243730Srwatson if (memcmp(resp, hash, sizeof(hash)) != 0) { 282243730Srwatson pjdlog_warning("Invalid response from %s (wrong password?).", 283243730Srwatson adhost->adh_remoteaddr); 284243730Srwatson proto_close(conn); 285243730Srwatson return (-1); 286243730Srwatson } 287243730Srwatson pjdlog_info("Receiver authenticated."); 288243730Srwatson 289243730Srwatson if (proto_recv(conn, &adhost->adh_trail_offset, 290243730Srwatson sizeof(adhost->adh_trail_offset)) == -1) { 291243730Srwatson pjdlog_errno(LOG_WARNING, 292243730Srwatson "Unable to receive size of the most recent trail file from %s", 293243730Srwatson adhost->adh_remoteaddr); 294243730Srwatson proto_close(conn); 295243730Srwatson return (-1); 296243730Srwatson } 297243730Srwatson adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); 298243730Srwatson if (proto_recv(conn, &adhost->adh_trail_name, 299243730Srwatson sizeof(adhost->adh_trail_name)) == -1) { 300243730Srwatson pjdlog_errno(LOG_WARNING, 301243730Srwatson "Unable to receive name of the most recent trail file from %s", 302243730Srwatson adhost->adh_remoteaddr); 303243730Srwatson proto_close(conn); 304243730Srwatson return (-1); 305243730Srwatson } 306243730Srwatson pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", 307243730Srwatson adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); 308243730Srwatson 309243730Srwatson rw_wlock(&adist_remote_lock); 310243730Srwatson mtx_lock(&adist_remote_mtx); 311243730Srwatson PJDLOG_ASSERT(adhost->adh_remote == NULL); 312243730Srwatson PJDLOG_ASSERT(conn != NULL); 313243730Srwatson adhost->adh_remote = conn; 314243730Srwatson mtx_unlock(&adist_remote_mtx); 315243730Srwatson rw_unlock(&adist_remote_lock); 316243730Srwatson cv_signal(&adist_remote_cond); 317243730Srwatson 318243730Srwatson return (0); 319243730Srwatson} 320243730Srwatson 321243730Srwatsonstatic void 322243730Srwatsonsender_disconnect(void) 323243730Srwatson{ 324243730Srwatson 325243730Srwatson rw_wlock(&adist_remote_lock); 326243730Srwatson /* 327243730Srwatson * Check for a race between dropping rlock and acquiring wlock - 328243730Srwatson * another thread can close connection in-between. 329243730Srwatson */ 330243730Srwatson if (adhost->adh_remote == NULL) { 331243730Srwatson rw_unlock(&adist_remote_lock); 332243730Srwatson return; 333243730Srwatson } 334243730Srwatson pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); 335243730Srwatson proto_close(adhost->adh_remote); 336243730Srwatson mtx_lock(&adist_remote_mtx); 337243730Srwatson adhost->adh_remote = NULL; 338243730Srwatson adhost->adh_reset = true; 339243730Srwatson adhost->adh_trail_name[0] = '\0'; 340243730Srwatson adhost->adh_trail_offset = 0; 341243730Srwatson mtx_unlock(&adist_remote_mtx); 342243730Srwatson rw_unlock(&adist_remote_lock); 343243730Srwatson 344243730Srwatson pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); 345243730Srwatson 346243730Srwatson /* Move all in-flight requests back onto free list. */ 347243730Srwatson mtx_lock(&adist_free_list_lock); 348243730Srwatson mtx_lock(&adist_send_list_lock); 349243730Srwatson TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); 350243730Srwatson mtx_unlock(&adist_send_list_lock); 351243730Srwatson mtx_lock(&adist_recv_list_lock); 352243730Srwatson TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); 353243730Srwatson mtx_unlock(&adist_recv_list_lock); 354243730Srwatson mtx_unlock(&adist_free_list_lock); 355243730Srwatson} 356243730Srwatson 357243730Srwatsonstatic void 358243730Srwatsonadreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, 359243730Srwatson size_t size) 360243730Srwatson{ 361243730Srwatson static uint64_t seq = 1; 362243730Srwatson 363243730Srwatson PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); 364243730Srwatson 365243730Srwatson switch (cmd) { 366243730Srwatson case ADIST_CMD_OPEN: 367243730Srwatson case ADIST_CMD_CLOSE: 368243730Srwatson PJDLOG_ASSERT(data != NULL && size == 0); 369243730Srwatson size = strlen(data) + 1; 370243730Srwatson break; 371243730Srwatson case ADIST_CMD_APPEND: 372243730Srwatson PJDLOG_ASSERT(data != NULL && size > 0); 373243730Srwatson break; 374243730Srwatson case ADIST_CMD_KEEPALIVE: 375243730Srwatson case ADIST_CMD_ERROR: 376243730Srwatson PJDLOG_ASSERT(data == NULL && size == 0); 377243730Srwatson break; 378243730Srwatson default: 379243730Srwatson PJDLOG_ABORT("Invalid command (%hhu).", cmd); 380243730Srwatson } 381243730Srwatson 382243730Srwatson adreq->adr_cmd = cmd; 383243730Srwatson adreq->adr_seq = seq++; 384243730Srwatson adreq->adr_datasize = size; 385243730Srwatson /* Don't copy if data is already in out buffer. */ 386243730Srwatson if (data != NULL && data != adreq->adr_data) 387243730Srwatson bcopy(data, adreq->adr_data, size); 388243730Srwatson} 389243730Srwatson 390243730Srwatsonstatic bool 391243730Srwatsonread_thread_wait(void) 392243730Srwatson{ 393243730Srwatson bool newfile = false; 394243730Srwatson 395243730Srwatson mtx_lock(&adist_remote_mtx); 396243730Srwatson if (adhost->adh_reset) { 397247442Spjdreset: 398243730Srwatson adhost->adh_reset = false; 399243730Srwatson if (trail_filefd(adist_trail) != -1) 400243730Srwatson trail_close(adist_trail); 401243730Srwatson trail_reset(adist_trail); 402243730Srwatson while (adhost->adh_remote == NULL) 403243730Srwatson cv_wait(&adist_remote_cond, &adist_remote_mtx); 404243730Srwatson trail_start(adist_trail, adhost->adh_trail_name, 405243730Srwatson adhost->adh_trail_offset); 406243730Srwatson newfile = true; 407243730Srwatson } 408243730Srwatson mtx_unlock(&adist_remote_mtx); 409243730Srwatson while (trail_filefd(adist_trail) == -1) { 410243730Srwatson newfile = true; 411243730Srwatson wait_for_dir(); 412247442Spjd /* 413247442Spjd * We may have been disconnected and reconnected in the 414247442Spjd * meantime, check if reset is set. 415247442Spjd */ 416247442Spjd mtx_lock(&adist_remote_mtx); 417247442Spjd if (adhost->adh_reset) 418247442Spjd goto reset; 419247442Spjd mtx_unlock(&adist_remote_mtx); 420243730Srwatson if (trail_filefd(adist_trail) == -1) 421243730Srwatson trail_next(adist_trail); 422243730Srwatson } 423243730Srwatson if (newfile) { 424243730Srwatson pjdlog_debug(1, "Trail file \"%s/%s\" opened.", 425243730Srwatson adhost->adh_directory, 426243730Srwatson trail_filename(adist_trail)); 427243730Srwatson (void)wait_for_file_init(trail_filefd(adist_trail)); 428243730Srwatson } 429243730Srwatson return (newfile); 430243730Srwatson} 431243730Srwatson 432243730Srwatsonstatic void * 433243730Srwatsonread_thread(void *arg __unused) 434243730Srwatson{ 435243730Srwatson struct adreq *adreq; 436243730Srwatson ssize_t done; 437243730Srwatson bool newfile; 438243730Srwatson 439243730Srwatson pjdlog_debug(1, "%s started.", __func__); 440243730Srwatson 441243730Srwatson for (;;) { 442243730Srwatson newfile = read_thread_wait(); 443243730Srwatson QUEUE_TAKE(adreq, &adist_free_list, 0); 444243730Srwatson if (newfile) { 445243730Srwatson adreq_fill(adreq, ADIST_CMD_OPEN, 446243730Srwatson trail_filename(adist_trail), 0); 447243730Srwatson newfile = false; 448243730Srwatson goto move; 449243730Srwatson } 450243730Srwatson 451243730Srwatson done = read(trail_filefd(adist_trail), adreq->adr_data, 452243730Srwatson ADIST_BUF_SIZE); 453243730Srwatson if (done == -1) { 454243730Srwatson off_t offset; 455243730Srwatson int error; 456243730Srwatson 457243730Srwatson error = errno; 458243730Srwatson offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); 459243730Srwatson errno = error; 460243730Srwatson pjdlog_errno(LOG_ERR, 461243730Srwatson "Error while reading \"%s/%s\" at offset %jd", 462243730Srwatson adhost->adh_directory, trail_filename(adist_trail), 463243730Srwatson offset); 464243730Srwatson trail_close(adist_trail); 465243730Srwatson adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); 466243730Srwatson goto move; 467243730Srwatson } else if (done == 0) { 468243730Srwatson /* End of file. */ 469243730Srwatson pjdlog_debug(3, "End of \"%s/%s\".", 470243730Srwatson adhost->adh_directory, trail_filename(adist_trail)); 471243730Srwatson if (!trail_switch(adist_trail)) { 472243730Srwatson /* More audit records can arrive. */ 473243730Srwatson mtx_lock(&adist_free_list_lock); 474243730Srwatson TAILQ_INSERT_TAIL(&adist_free_list, adreq, 475243730Srwatson adr_next); 476243730Srwatson mtx_unlock(&adist_free_list_lock); 477243730Srwatson wait_for_file(); 478243730Srwatson continue; 479243730Srwatson } 480243730Srwatson adreq_fill(adreq, ADIST_CMD_CLOSE, 481243730Srwatson trail_filename(adist_trail), 0); 482243730Srwatson trail_close(adist_trail); 483243730Srwatson goto move; 484243730Srwatson } 485243730Srwatson 486243730Srwatson adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); 487243730Srwatsonmove: 488243730Srwatson pjdlog_debug(3, 489243730Srwatson "read thread: Moving request %p to the send queue (%hhu).", 490243730Srwatson adreq, adreq->adr_cmd); 491243730Srwatson QUEUE_INSERT(adreq, &adist_send_list); 492243730Srwatson } 493243730Srwatson /* NOTREACHED */ 494243730Srwatson return (NULL); 495243730Srwatson} 496243730Srwatson 497243730Srwatsonstatic void 498243730Srwatsonkeepalive_send(void) 499243730Srwatson{ 500243730Srwatson struct adreq *adreq; 501243730Srwatson 502243730Srwatson rw_rlock(&adist_remote_lock); 503243730Srwatson if (adhost->adh_remote == NULL) { 504243730Srwatson rw_unlock(&adist_remote_lock); 505243730Srwatson return; 506243730Srwatson } 507243730Srwatson rw_unlock(&adist_remote_lock); 508243730Srwatson 509243730Srwatson mtx_lock(&adist_free_list_lock); 510243730Srwatson adreq = TAILQ_FIRST(&adist_free_list); 511243730Srwatson if (adreq != NULL) 512243730Srwatson TAILQ_REMOVE(&adist_free_list, adreq, adr_next); 513243730Srwatson mtx_unlock(&adist_free_list_lock); 514243730Srwatson if (adreq == NULL) 515243730Srwatson return; 516243730Srwatson 517243730Srwatson adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); 518243730Srwatson 519243730Srwatson QUEUE_INSERT(adreq, &adist_send_list); 520243730Srwatson 521243730Srwatson pjdlog_debug(3, "keepalive_send: Request sent."); 522243730Srwatson} 523243730Srwatson 524243730Srwatson/* 525243730Srwatson * Thread sends request to secondary node. 526243730Srwatson */ 527243730Srwatsonstatic void * 528243730Srwatsonsend_thread(void *arg __unused) 529243730Srwatson{ 530243730Srwatson time_t lastcheck, now; 531243730Srwatson struct adreq *adreq; 532243730Srwatson 533243730Srwatson pjdlog_debug(1, "%s started.", __func__); 534243730Srwatson 535243730Srwatson lastcheck = time(NULL); 536243730Srwatson 537243730Srwatson for (;;) { 538243730Srwatson pjdlog_debug(3, "send thread: Taking request."); 539243730Srwatson for (;;) { 540243730Srwatson QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); 541243730Srwatson if (adreq != NULL) 542243730Srwatson break; 543243730Srwatson now = time(NULL); 544243730Srwatson if (lastcheck + ADIST_KEEPALIVE <= now) { 545243730Srwatson keepalive_send(); 546243730Srwatson lastcheck = now; 547243730Srwatson } 548243730Srwatson } 549243730Srwatson PJDLOG_ASSERT(adreq != NULL); 550243730Srwatson pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, 551243730Srwatson adreq->adr_cmd); 552243730Srwatson /* 553243730Srwatson * Protect connection from disappearing. 554243730Srwatson */ 555243730Srwatson rw_rlock(&adist_remote_lock); 556243730Srwatson /* 557243730Srwatson * Move the request to the recv queue first to avoid race 558243730Srwatson * where the recv thread receives the reply before we move 559243730Srwatson * the request to the recv queue. 560243730Srwatson */ 561243730Srwatson QUEUE_INSERT(adreq, &adist_recv_list); 562243730Srwatson if (adhost->adh_remote == NULL || 563243730Srwatson proto_send(adhost->adh_remote, &adreq->adr_packet, 564243730Srwatson ADPKT_SIZE(adreq)) == -1) { 565243730Srwatson rw_unlock(&adist_remote_lock); 566243730Srwatson pjdlog_debug(1, 567243730Srwatson "send thread: (%p) Unable to send request.", adreq); 568243730Srwatson if (adhost->adh_remote != NULL) 569243730Srwatson sender_disconnect(); 570243730Srwatson continue; 571243730Srwatson } else { 572243730Srwatson pjdlog_debug(3, "Request %p sent successfully.", adreq); 573243730Srwatson adreq_log(LOG_DEBUG, 2, -1, adreq, 574243730Srwatson "send: (%p) Request sent: ", adreq); 575243730Srwatson rw_unlock(&adist_remote_lock); 576243730Srwatson } 577243730Srwatson } 578243730Srwatson /* NOTREACHED */ 579243730Srwatson return (NULL); 580243730Srwatson} 581243730Srwatson 582243730Srwatsonstatic void 583243730Srwatsonadrep_decode_header(struct adrep *adrep) 584243730Srwatson{ 585243730Srwatson 586243730Srwatson /* Byte-swap only is the receiver is using different byte order. */ 587243730Srwatson if (adrep->adrp_byteorder != ADIST_BYTEORDER) { 588243730Srwatson adrep->adrp_byteorder = ADIST_BYTEORDER; 589243730Srwatson adrep->adrp_seq = bswap64(adrep->adrp_seq); 590243730Srwatson adrep->adrp_error = bswap16(adrep->adrp_error); 591243730Srwatson } 592243730Srwatson} 593243730Srwatson 594243730Srwatson/* 595243730Srwatson * Thread receives answer from secondary node and passes it to ggate_send 596243730Srwatson * thread. 597243730Srwatson */ 598243730Srwatsonstatic void * 599243730Srwatsonrecv_thread(void *arg __unused) 600243730Srwatson{ 601243730Srwatson struct adrep adrep; 602243730Srwatson struct adreq *adreq; 603243730Srwatson 604243730Srwatson pjdlog_debug(1, "%s started.", __func__); 605243730Srwatson 606243730Srwatson for (;;) { 607243730Srwatson /* Wait until there is anything to receive. */ 608243730Srwatson QUEUE_WAIT(&adist_recv_list); 609243730Srwatson pjdlog_debug(3, "recv thread: Got something."); 610243730Srwatson rw_rlock(&adist_remote_lock); 611243730Srwatson if (adhost->adh_remote == NULL) { 612243730Srwatson /* 613243730Srwatson * Connection is dead. 614243730Srwatson * XXX: We shouldn't be here. 615243730Srwatson */ 616243730Srwatson rw_unlock(&adist_remote_lock); 617243730Srwatson continue; 618243730Srwatson } 619243730Srwatson if (proto_recv(adhost->adh_remote, &adrep, 620243730Srwatson sizeof(adrep)) == -1) { 621243730Srwatson rw_unlock(&adist_remote_lock); 622243730Srwatson pjdlog_errno(LOG_ERR, "Unable to receive reply"); 623243730Srwatson sender_disconnect(); 624243730Srwatson continue; 625243730Srwatson } 626243730Srwatson rw_unlock(&adist_remote_lock); 627243730Srwatson adrep_decode_header(&adrep); 628243730Srwatson /* 629243730Srwatson * Find the request that was just confirmed. 630243730Srwatson */ 631243730Srwatson mtx_lock(&adist_recv_list_lock); 632243730Srwatson TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { 633243730Srwatson if (adreq->adr_seq == adrep.adrp_seq) { 634243730Srwatson TAILQ_REMOVE(&adist_recv_list, adreq, 635243730Srwatson adr_next); 636243730Srwatson break; 637243730Srwatson } 638243730Srwatson } 639243730Srwatson if (adreq == NULL) { 640243730Srwatson /* 641243730Srwatson * If we disconnected in the meantime, just continue. 642243730Srwatson * On disconnect sender_disconnect() clears the queue, 643243730Srwatson * we can use that. 644243730Srwatson */ 645243730Srwatson if (TAILQ_EMPTY(&adist_recv_list)) { 646243730Srwatson rw_unlock(&adist_remote_lock); 647243730Srwatson continue; 648243730Srwatson } 649243730Srwatson mtx_unlock(&adist_recv_list_lock); 650243730Srwatson pjdlog_error("Found no request matching received 'seq' field (%ju).", 651243730Srwatson (uintmax_t)adrep.adrp_seq); 652243730Srwatson sender_disconnect(); 653243730Srwatson continue; 654243730Srwatson } 655243730Srwatson mtx_unlock(&adist_recv_list_lock); 656243730Srwatson adreq_log(LOG_DEBUG, 2, -1, adreq, 657243730Srwatson "recv thread: (%p) Request confirmed: ", adreq); 658243730Srwatson pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, 659243730Srwatson adreq->adr_cmd); 660243730Srwatson if (adrep.adrp_error != 0) { 661243730Srwatson pjdlog_error("Receiver returned error (%s), disconnecting.", 662243730Srwatson adist_errstr((int)adrep.adrp_error)); 663243730Srwatson sender_disconnect(); 664243730Srwatson continue; 665243730Srwatson } 666243730Srwatson if (adreq->adr_cmd == ADIST_CMD_CLOSE) 667243730Srwatson trail_unlink(adist_trail, adreq->adr_data); 668243730Srwatson pjdlog_debug(3, "Request received successfully."); 669243730Srwatson QUEUE_INSERT(adreq, &adist_free_list); 670243730Srwatson } 671243730Srwatson /* NOTREACHED */ 672243730Srwatson return (NULL); 673243730Srwatson} 674243730Srwatson 675243730Srwatsonstatic void 676243730Srwatsonguard_check_connection(void) 677243730Srwatson{ 678243730Srwatson 679243730Srwatson PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); 680243730Srwatson 681243730Srwatson rw_rlock(&adist_remote_lock); 682243730Srwatson if (adhost->adh_remote != NULL) { 683243730Srwatson rw_unlock(&adist_remote_lock); 684243730Srwatson pjdlog_debug(3, "remote_guard: Connection to %s is ok.", 685243730Srwatson adhost->adh_remoteaddr); 686243730Srwatson return; 687243730Srwatson } 688243730Srwatson 689243730Srwatson /* 690243730Srwatson * Upgrade the lock. It doesn't have to be atomic as no other thread 691243730Srwatson * can change connection status from disconnected to connected. 692243730Srwatson */ 693243730Srwatson rw_unlock(&adist_remote_lock); 694243730Srwatson pjdlog_debug(1, "remote_guard: Reconnecting to %s.", 695243730Srwatson adhost->adh_remoteaddr); 696243730Srwatson if (sender_connect() == 0) { 697243730Srwatson pjdlog_info("Successfully reconnected to %s.", 698243730Srwatson adhost->adh_remoteaddr); 699243730Srwatson } else { 700243730Srwatson pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", 701243730Srwatson adhost->adh_remoteaddr); 702243730Srwatson } 703243730Srwatson} 704243730Srwatson 705243730Srwatson/* 706243730Srwatson * Thread guards remote connections and reconnects when needed, handles 707243730Srwatson * signals, etc. 708243730Srwatson */ 709243730Srwatsonstatic void * 710243730Srwatsonguard_thread(void *arg __unused) 711243730Srwatson{ 712243730Srwatson struct timespec timeout; 713243730Srwatson time_t lastcheck, now; 714243730Srwatson sigset_t mask; 715243730Srwatson int signo; 716243730Srwatson 717243730Srwatson lastcheck = time(NULL); 718243730Srwatson 719243730Srwatson PJDLOG_VERIFY(sigemptyset(&mask) == 0); 720243730Srwatson PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); 721243730Srwatson PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); 722243730Srwatson 723243730Srwatson timeout.tv_sec = ADIST_KEEPALIVE; 724243730Srwatson timeout.tv_nsec = 0; 725243730Srwatson signo = -1; 726243730Srwatson 727243730Srwatson for (;;) { 728243730Srwatson switch (signo) { 729243730Srwatson case SIGINT: 730243730Srwatson case SIGTERM: 731243730Srwatson sigexit_received = true; 732243730Srwatson pjdlog_exitx(EX_OK, 733243730Srwatson "Termination signal received, exiting."); 734243730Srwatson break; 735243730Srwatson default: 736243730Srwatson break; 737243730Srwatson } 738243730Srwatson 739243730Srwatson pjdlog_debug(3, "remote_guard: Checking connections."); 740243730Srwatson now = time(NULL); 741243730Srwatson if (lastcheck + ADIST_KEEPALIVE <= now) { 742243730Srwatson guard_check_connection(); 743243730Srwatson lastcheck = now; 744243730Srwatson } 745243730Srwatson signo = sigtimedwait(&mask, NULL, &timeout); 746243730Srwatson } 747243730Srwatson /* NOTREACHED */ 748243730Srwatson return (NULL); 749243730Srwatson} 750243730Srwatson 751243730Srwatsonvoid 752243730Srwatsonadist_sender(struct adist_config *config, struct adist_host *adh) 753243730Srwatson{ 754243730Srwatson pthread_t td; 755243730Srwatson pid_t pid; 756243730Srwatson int error, mode, debuglevel; 757243730Srwatson 758243730Srwatson /* 759243730Srwatson * Create communication channel for sending connection requests from 760243730Srwatson * child to parent. 761243730Srwatson */ 762243730Srwatson if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { 763243730Srwatson pjdlog_errno(LOG_ERR, 764243730Srwatson "Unable to create connection sockets between child and parent"); 765243730Srwatson return; 766243730Srwatson } 767243730Srwatson 768243730Srwatson pid = fork(); 769243730Srwatson if (pid == -1) { 770243730Srwatson pjdlog_errno(LOG_ERR, "Unable to fork"); 771243730Srwatson proto_close(adh->adh_conn); 772243730Srwatson adh->adh_conn = NULL; 773243730Srwatson return; 774243730Srwatson } 775243730Srwatson 776243730Srwatson if (pid > 0) { 777243730Srwatson /* This is parent. */ 778243730Srwatson adh->adh_worker_pid = pid; 779243730Srwatson /* Declare that we are receiver. */ 780243730Srwatson proto_recv(adh->adh_conn, NULL, 0); 781243730Srwatson return; 782243730Srwatson } 783243730Srwatson 784243730Srwatson adcfg = config; 785243730Srwatson adhost = adh; 786243730Srwatson 787243730Srwatson mode = pjdlog_mode_get(); 788243730Srwatson debuglevel = pjdlog_debug_get(); 789243730Srwatson 790243730Srwatson /* Declare that we are sender. */ 791243730Srwatson proto_send(adhost->adh_conn, NULL, 0); 792243730Srwatson 793243730Srwatson descriptors_cleanup(adhost); 794243730Srwatson 795243730Srwatson#ifdef TODO 796243730Srwatson descriptors_assert(adhost, mode); 797243730Srwatson#endif 798243730Srwatson 799243730Srwatson pjdlog_init(mode); 800243730Srwatson pjdlog_debug_set(debuglevel); 801243730Srwatson pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, 802243730Srwatson role2str(adhost->adh_role)); 803243730Srwatson#ifdef HAVE_SETPROCTITLE 804243730Srwatson setproctitle("[%s] (%s) ", adhost->adh_name, 805243730Srwatson role2str(adhost->adh_role)); 806243730Srwatson#endif 807243730Srwatson 808243730Srwatson /* 809243730Srwatson * The sender process should be able to remove entries from its 810243730Srwatson * trail directory, but it should not be able to write to the 811243730Srwatson * trail files, only read from them. 812243730Srwatson */ 813243730Srwatson adist_trail = trail_new(adhost->adh_directory, false); 814243730Srwatson if (adist_trail == NULL) 815243730Srwatson exit(EX_OSFILE); 816243730Srwatson 817243730Srwatson if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", 818243730Srwatson role2str(adhost->adh_role), adhost->adh_name) != 0) { 819243730Srwatson exit(EX_CONFIG); 820243730Srwatson } 821243730Srwatson pjdlog_info("Privileges successfully dropped."); 822243730Srwatson 823243730Srwatson /* 824243730Srwatson * We can ignore wait_for_dir_init() failures. It will fall back to 825243730Srwatson * using sleep(3). 826243730Srwatson */ 827243730Srwatson (void)wait_for_dir_init(trail_dirfd(adist_trail)); 828243730Srwatson 829243730Srwatson init_environment(); 830243730Srwatson if (sender_connect() == 0) { 831243730Srwatson pjdlog_info("Successfully connected to %s.", 832243730Srwatson adhost->adh_remoteaddr); 833243730Srwatson } 834243730Srwatson adhost->adh_reset = true; 835243730Srwatson 836243730Srwatson /* 837243730Srwatson * Create the guard thread first, so we can handle signals from the 838243730Srwatson * very begining. 839243730Srwatson */ 840243730Srwatson error = pthread_create(&td, NULL, guard_thread, NULL); 841243730Srwatson PJDLOG_ASSERT(error == 0); 842243730Srwatson error = pthread_create(&td, NULL, send_thread, NULL); 843243730Srwatson PJDLOG_ASSERT(error == 0); 844243730Srwatson error = pthread_create(&td, NULL, recv_thread, NULL); 845243730Srwatson PJDLOG_ASSERT(error == 0); 846243730Srwatson (void)read_thread(NULL); 847243730Srwatson} 848