1243730Srwatson/*-
2243730Srwatson * Copyright (c) 2012 The FreeBSD Foundation
3243730Srwatson * All rights reserved.
4243730Srwatson *
5243730Srwatson * This software was developed by Pawel Jakub Dawidek under sponsorship from
6243730Srwatson * the FreeBSD Foundation.
7243730Srwatson *
8243730Srwatson * Redistribution and use in source and binary forms, with or without
9243730Srwatson * modification, are permitted provided that the following conditions
10243730Srwatson * are met:
11243730Srwatson * 1. Redistributions of source code must retain the above copyright
12243730Srwatson *    notice, this list of conditions and the following disclaimer.
13243730Srwatson * 2. Redistributions in binary form must reproduce the above copyright
14243730Srwatson *    notice, this list of conditions and the following disclaimer in the
15243730Srwatson *    documentation and/or other materials provided with the distribution.
16243730Srwatson *
17243730Srwatson * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
18243730Srwatson * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19243730Srwatson * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20243730Srwatson * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
21243730Srwatson * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22243730Srwatson * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23243730Srwatson * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24243730Srwatson * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25243730Srwatson * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26243730Srwatson * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27243730Srwatson * SUCH DAMAGE.
28243730Srwatson *
29243734Srwatson * $P4: //depot/projects/trustedbsd/openbsm/bin/auditdistd/sender.c#3 $
30243730Srwatson */
31243730Srwatson
32243734Srwatson#include <config/config.h>
33243730Srwatson
34243730Srwatson#include <sys/param.h>
35243730Srwatson#if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP)
36243730Srwatson#include <sys/endian.h>
37243730Srwatson#else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
38243730Srwatson#ifdef HAVE_MACHINE_ENDIAN_H
39243730Srwatson#include <machine/endian.h>
40243730Srwatson#else /* !HAVE_MACHINE_ENDIAN_H */
41243730Srwatson#ifdef HAVE_ENDIAN_H
42243730Srwatson#include <endian.h>
43243730Srwatson#else /* !HAVE_ENDIAN_H */
44243730Srwatson#error "No supported endian.h"
45243730Srwatson#endif /* !HAVE_ENDIAN_H */
46243730Srwatson#endif /* !HAVE_MACHINE_ENDIAN_H */
47243730Srwatson#include <compat/endian.h>
48243730Srwatson#endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */
49243730Srwatson#include <sys/queue.h>
50243730Srwatson#include <sys/stat.h>
51243730Srwatson#include <sys/wait.h>
52243730Srwatson
53243730Srwatson#include <stdio.h>
54243730Srwatson#include <stdlib.h>
55243730Srwatson#include <unistd.h>
56243730Srwatson
57243730Srwatson#include <ctype.h>
58243730Srwatson#include <dirent.h>
59243730Srwatson#include <err.h>
60243730Srwatson#include <errno.h>
61243730Srwatson#include <fcntl.h>
62243730Srwatson#ifdef HAVE_LIBUTIL_H
63243730Srwatson#include <libutil.h>
64243730Srwatson#endif
65243730Srwatson#include <signal.h>
66243730Srwatson#include <string.h>
67243730Srwatson#include <strings.h>
68243730Srwatson
69243730Srwatson#include <openssl/hmac.h>
70243730Srwatson
71243730Srwatson#ifndef HAVE_SIGTIMEDWAIT
72243730Srwatson#include "sigtimedwait.h"
73243730Srwatson#endif
74243730Srwatson
75243730Srwatson#include "auditdistd.h"
76243734Srwatson#include "pjdlog.h"
77243730Srwatson#include "proto.h"
78243730Srwatson#include "sandbox.h"
79243730Srwatson#include "subr.h"
80243730Srwatson#include "synch.h"
81243730Srwatson#include "trail.h"
82243730Srwatson
83243730Srwatsonstatic struct adist_config *adcfg;
84243730Srwatsonstatic struct adist_host *adhost;
85243730Srwatson
86243730Srwatsonstatic pthread_rwlock_t adist_remote_lock;
87243730Srwatsonstatic pthread_mutex_t adist_remote_mtx;
88243730Srwatsonstatic pthread_cond_t adist_remote_cond;
89243730Srwatsonstatic struct trail *adist_trail;
90243730Srwatson
91243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_free_list;
92243730Srwatsonstatic pthread_mutex_t adist_free_list_lock;
93243730Srwatsonstatic pthread_cond_t adist_free_list_cond;
94243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_send_list;
95243730Srwatsonstatic pthread_mutex_t adist_send_list_lock;
96243730Srwatsonstatic pthread_cond_t adist_send_list_cond;
97243730Srwatsonstatic TAILQ_HEAD(, adreq) adist_recv_list;
98243730Srwatsonstatic pthread_mutex_t adist_recv_list_lock;
99243730Srwatsonstatic pthread_cond_t adist_recv_list_cond;
100243730Srwatson
101243730Srwatsonstatic void
102243730Srwatsoninit_environment(void)
103243730Srwatson{
104243730Srwatson	struct adreq *adreq;
105243730Srwatson	unsigned int ii;
106243730Srwatson
107243730Srwatson	rw_init(&adist_remote_lock);
108243730Srwatson	mtx_init(&adist_remote_mtx);
109243730Srwatson	cv_init(&adist_remote_cond);
110243730Srwatson	TAILQ_INIT(&adist_free_list);
111243730Srwatson	mtx_init(&adist_free_list_lock);
112243730Srwatson	cv_init(&adist_free_list_cond);
113243730Srwatson	TAILQ_INIT(&adist_send_list);
114243730Srwatson	mtx_init(&adist_send_list_lock);
115243730Srwatson	cv_init(&adist_send_list_cond);
116243730Srwatson	TAILQ_INIT(&adist_recv_list);
117243730Srwatson	mtx_init(&adist_recv_list_lock);
118243730Srwatson	cv_init(&adist_recv_list_cond);
119243730Srwatson
120243730Srwatson	for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) {
121243730Srwatson		adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE);
122243730Srwatson		if (adreq == NULL) {
123243730Srwatson			pjdlog_exitx(EX_TEMPFAIL,
124243730Srwatson			    "Unable to allocate %zu bytes of memory for adreq object.",
125243730Srwatson			    sizeof(*adreq) + ADIST_BUF_SIZE);
126243730Srwatson		}
127243730Srwatson		adreq->adr_byteorder = ADIST_BYTEORDER;
128243730Srwatson		adreq->adr_cmd = ADIST_CMD_UNDEFINED;
129243730Srwatson		adreq->adr_seq = 0;
130243730Srwatson		adreq->adr_datasize = 0;
131243730Srwatson		TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next);
132243730Srwatson	}
133243730Srwatson}
134243730Srwatson
135243730Srwatsonstatic int
136243730Srwatsonsender_connect(void)
137243730Srwatson{
138243730Srwatson	unsigned char rnd[32], hash[32], resp[32];
139243730Srwatson	struct proto_conn *conn;
140243730Srwatson	char welcome[8];
141243730Srwatson	int16_t val;
142243730Srwatson
143243730Srwatson	val = 1;
144243730Srwatson	if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) {
145243730Srwatson		pjdlog_exit(EX_TEMPFAIL,
146243730Srwatson		    "Unable to send connection request to parent");
147243730Srwatson	}
148243730Srwatson	if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) {
149243730Srwatson		pjdlog_exit(EX_TEMPFAIL,
150243730Srwatson		    "Unable to receive reply to connection request from parent");
151243730Srwatson	}
152243730Srwatson	if (val != 0) {
153243730Srwatson		errno = val;
154243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
155243730Srwatson		    adhost->adh_remoteaddr);
156243730Srwatson		return (-1);
157243730Srwatson	}
158243730Srwatson	if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) {
159243730Srwatson		pjdlog_exit(EX_TEMPFAIL,
160243730Srwatson		    "Unable to receive connection from parent");
161243730Srwatson	}
162243730Srwatson	if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) {
163243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to connect to %s",
164243730Srwatson		    adhost->adh_remoteaddr);
165243730Srwatson		proto_close(conn);
166243730Srwatson		return (-1);
167243730Srwatson	}
168243730Srwatson	pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr);
169243730Srwatson	/* Error in setting timeout is not critical, but why should it fail? */
170243730Srwatson	if (proto_timeout(conn, adcfg->adc_timeout) < 0)
171243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to set connection timeout");
172243730Srwatson	else
173243730Srwatson		pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout);
174243730Srwatson
175243730Srwatson	/* Exchange welcome message, which includes version number. */
176243730Srwatson	(void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION);
177243730Srwatson	if (proto_send(conn, welcome, sizeof(welcome)) < 0) {
178243730Srwatson		pjdlog_errno(LOG_WARNING,
179243730Srwatson		    "Unable to send welcome message to %s",
180243730Srwatson		    adhost->adh_remoteaddr);
181243730Srwatson		proto_close(conn);
182243730Srwatson		return (-1);
183243730Srwatson	}
184243730Srwatson	pjdlog_debug(1, "Welcome message sent (%s).", welcome);
185243730Srwatson	bzero(welcome, sizeof(welcome));
186243730Srwatson	if (proto_recv(conn, welcome, sizeof(welcome)) < 0) {
187243730Srwatson		pjdlog_errno(LOG_WARNING,
188243730Srwatson		    "Unable to receive welcome message from %s",
189243730Srwatson		    adhost->adh_remoteaddr);
190243730Srwatson		proto_close(conn);
191243730Srwatson		return (-1);
192243730Srwatson	}
193243730Srwatson	if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) ||
194243730Srwatson	    !isdigit(welcome[6]) || welcome[7] != '\0') {
195243730Srwatson		pjdlog_warning("Invalid welcome message from %s.",
196243730Srwatson		    adhost->adh_remoteaddr);
197243730Srwatson		proto_close(conn);
198243730Srwatson		return (-1);
199243730Srwatson	}
200243730Srwatson	pjdlog_debug(1, "Welcome message received (%s).", welcome);
201243730Srwatson	/*
202243730Srwatson	 * Receiver can only reply with version number lower or equal to
203243730Srwatson	 * the one we sent.
204243730Srwatson	 */
205243730Srwatson	adhost->adh_version = atoi(welcome + 5);
206243730Srwatson	if (adhost->adh_version > ADIST_VERSION) {
207243730Srwatson		pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).",
208243730Srwatson		    adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION);
209243730Srwatson		proto_close(conn);
210243730Srwatson		return (-1);
211243730Srwatson	}
212243730Srwatson
213243730Srwatson	pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version,
214243730Srwatson	    adhost->adh_remoteaddr);
215243730Srwatson
216243730Srwatson	if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) {
217243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to send name to %s",
218243730Srwatson		    adhost->adh_remoteaddr);
219243730Srwatson		proto_close(conn);
220243730Srwatson		return (-1);
221243730Srwatson	}
222243730Srwatson	pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name);
223243730Srwatson
224243730Srwatson	if (proto_recv(conn, rnd, sizeof(rnd)) == -1) {
225243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s",
226243730Srwatson		    adhost->adh_remoteaddr);
227243730Srwatson		proto_close(conn);
228243730Srwatson		return (-1);
229243730Srwatson	}
230243730Srwatson	pjdlog_debug(1, "Challenge received.");
231243730Srwatson
232243730Srwatson	if (HMAC(EVP_sha256(), adhost->adh_password,
233243730Srwatson	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
234243730Srwatson	    NULL) == NULL) {
235243730Srwatson		pjdlog_warning("Unable to generate response.");
236243730Srwatson		proto_close(conn);
237243730Srwatson		return (-1);
238243730Srwatson	}
239243730Srwatson	pjdlog_debug(1, "Response generated.");
240243730Srwatson
241243730Srwatson	if (proto_send(conn, hash, sizeof(hash)) == -1) {
242243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to send response to %s",
243243730Srwatson		    adhost->adh_remoteaddr);
244243730Srwatson		proto_close(conn);
245243730Srwatson		return (-1);
246243730Srwatson	}
247243730Srwatson	pjdlog_debug(1, "Response sent.");
248243730Srwatson
249243730Srwatson	if (adist_random(rnd, sizeof(rnd)) == -1) {
250243730Srwatson		pjdlog_warning("Unable to generate challenge.");
251243730Srwatson		proto_close(conn);
252243730Srwatson		return (-1);
253243730Srwatson	}
254243730Srwatson	pjdlog_debug(1, "Challenge generated.");
255243730Srwatson
256243730Srwatson	if (proto_send(conn, rnd, sizeof(rnd)) == -1) {
257243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s",
258243730Srwatson		    adhost->adh_remoteaddr);
259243730Srwatson		proto_close(conn);
260243730Srwatson		return (-1);
261243730Srwatson	}
262243730Srwatson	pjdlog_debug(1, "Challenge sent.");
263243730Srwatson
264243730Srwatson	if (proto_recv(conn, resp, sizeof(resp)) == -1) {
265243730Srwatson		pjdlog_errno(LOG_WARNING, "Unable to receive response from %s",
266243730Srwatson		    adhost->adh_remoteaddr);
267243730Srwatson		proto_close(conn);
268243730Srwatson		return (-1);
269243730Srwatson	}
270243730Srwatson	pjdlog_debug(1, "Response received.");
271243730Srwatson
272243730Srwatson	if (HMAC(EVP_sha256(), adhost->adh_password,
273243730Srwatson	    (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash,
274243730Srwatson	    NULL) == NULL) {
275243730Srwatson		pjdlog_warning("Unable to generate hash.");
276243730Srwatson		proto_close(conn);
277243730Srwatson		return (-1);
278243730Srwatson	}
279243730Srwatson	pjdlog_debug(1, "Hash generated.");
280243730Srwatson
281243730Srwatson	if (memcmp(resp, hash, sizeof(hash)) != 0) {
282243730Srwatson		pjdlog_warning("Invalid response from %s (wrong password?).",
283243730Srwatson		    adhost->adh_remoteaddr);
284243730Srwatson		proto_close(conn);
285243730Srwatson		return (-1);
286243730Srwatson	}
287243730Srwatson	pjdlog_info("Receiver authenticated.");
288243730Srwatson
289243730Srwatson	if (proto_recv(conn, &adhost->adh_trail_offset,
290243730Srwatson	    sizeof(adhost->adh_trail_offset)) == -1) {
291243730Srwatson		pjdlog_errno(LOG_WARNING,
292243730Srwatson		    "Unable to receive size of the most recent trail file from %s",
293243730Srwatson		    adhost->adh_remoteaddr);
294243730Srwatson		proto_close(conn);
295243730Srwatson		return (-1);
296243730Srwatson	}
297243730Srwatson	adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset);
298243730Srwatson	if (proto_recv(conn, &adhost->adh_trail_name,
299243730Srwatson	    sizeof(adhost->adh_trail_name)) == -1) {
300243730Srwatson		pjdlog_errno(LOG_WARNING,
301243730Srwatson		    "Unable to receive name of the most recent trail file from %s",
302243730Srwatson		    adhost->adh_remoteaddr);
303243730Srwatson		proto_close(conn);
304243730Srwatson		return (-1);
305243730Srwatson	}
306243730Srwatson	pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.",
307243730Srwatson	    adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset);
308243730Srwatson
309243730Srwatson	rw_wlock(&adist_remote_lock);
310243730Srwatson	mtx_lock(&adist_remote_mtx);
311243730Srwatson	PJDLOG_ASSERT(adhost->adh_remote == NULL);
312243730Srwatson	PJDLOG_ASSERT(conn != NULL);
313243730Srwatson	adhost->adh_remote = conn;
314243730Srwatson	mtx_unlock(&adist_remote_mtx);
315243730Srwatson	rw_unlock(&adist_remote_lock);
316243730Srwatson	cv_signal(&adist_remote_cond);
317243730Srwatson
318243730Srwatson	return (0);
319243730Srwatson}
320243730Srwatson
321243730Srwatsonstatic void
322243730Srwatsonsender_disconnect(void)
323243730Srwatson{
324243730Srwatson
325243730Srwatson	rw_wlock(&adist_remote_lock);
326243730Srwatson	/*
327243730Srwatson	 * Check for a race between dropping rlock and acquiring wlock -
328243730Srwatson	 * another thread can close connection in-between.
329243730Srwatson	 */
330243730Srwatson	if (adhost->adh_remote == NULL) {
331243730Srwatson		rw_unlock(&adist_remote_lock);
332243730Srwatson		return;
333243730Srwatson	}
334243730Srwatson	pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr);
335243730Srwatson	proto_close(adhost->adh_remote);
336243730Srwatson	mtx_lock(&adist_remote_mtx);
337243730Srwatson	adhost->adh_remote = NULL;
338243730Srwatson	adhost->adh_reset = true;
339243730Srwatson	adhost->adh_trail_name[0] = '\0';
340243730Srwatson	adhost->adh_trail_offset = 0;
341243730Srwatson	mtx_unlock(&adist_remote_mtx);
342243730Srwatson	rw_unlock(&adist_remote_lock);
343243730Srwatson
344243730Srwatson	pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr);
345243730Srwatson
346243730Srwatson	/* Move all in-flight requests back onto free list. */
347243730Srwatson	mtx_lock(&adist_free_list_lock);
348243730Srwatson	mtx_lock(&adist_send_list_lock);
349243730Srwatson	TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next);
350243730Srwatson	mtx_unlock(&adist_send_list_lock);
351243730Srwatson	mtx_lock(&adist_recv_list_lock);
352243730Srwatson	TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next);
353243730Srwatson	mtx_unlock(&adist_recv_list_lock);
354243730Srwatson	mtx_unlock(&adist_free_list_lock);
355243730Srwatson}
356243730Srwatson
357243730Srwatsonstatic void
358243730Srwatsonadreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data,
359243730Srwatson    size_t size)
360243730Srwatson{
361243730Srwatson	static uint64_t seq = 1;
362243730Srwatson
363243730Srwatson	PJDLOG_ASSERT(size <= ADIST_BUF_SIZE);
364243730Srwatson
365243730Srwatson	switch (cmd) {
366243730Srwatson	case ADIST_CMD_OPEN:
367243730Srwatson	case ADIST_CMD_CLOSE:
368243730Srwatson		PJDLOG_ASSERT(data != NULL && size == 0);
369243730Srwatson		size = strlen(data) + 1;
370243730Srwatson		break;
371243730Srwatson	case ADIST_CMD_APPEND:
372243730Srwatson		PJDLOG_ASSERT(data != NULL && size > 0);
373243730Srwatson		break;
374243730Srwatson	case ADIST_CMD_KEEPALIVE:
375243730Srwatson	case ADIST_CMD_ERROR:
376243730Srwatson		PJDLOG_ASSERT(data == NULL && size == 0);
377243730Srwatson		break;
378243730Srwatson	default:
379243730Srwatson		PJDLOG_ABORT("Invalid command (%hhu).", cmd);
380243730Srwatson	}
381243730Srwatson
382243730Srwatson	adreq->adr_cmd = cmd;
383243730Srwatson	adreq->adr_seq = seq++;
384243730Srwatson	adreq->adr_datasize = size;
385243730Srwatson	/* Don't copy if data is already in out buffer. */
386243730Srwatson	if (data != NULL && data != adreq->adr_data)
387243730Srwatson		bcopy(data, adreq->adr_data, size);
388243730Srwatson}
389243730Srwatson
390243730Srwatsonstatic bool
391243730Srwatsonread_thread_wait(void)
392243730Srwatson{
393243730Srwatson	bool newfile = false;
394243730Srwatson
395243730Srwatson	mtx_lock(&adist_remote_mtx);
396243730Srwatson	if (adhost->adh_reset) {
397247442Spjdreset:
398243730Srwatson		adhost->adh_reset = false;
399243730Srwatson		if (trail_filefd(adist_trail) != -1)
400243730Srwatson			trail_close(adist_trail);
401243730Srwatson		trail_reset(adist_trail);
402243730Srwatson		while (adhost->adh_remote == NULL)
403243730Srwatson			cv_wait(&adist_remote_cond, &adist_remote_mtx);
404243730Srwatson		trail_start(adist_trail, adhost->adh_trail_name,
405243730Srwatson		    adhost->adh_trail_offset);
406243730Srwatson		newfile = true;
407243730Srwatson	}
408243730Srwatson	mtx_unlock(&adist_remote_mtx);
409243730Srwatson	while (trail_filefd(adist_trail) == -1) {
410243730Srwatson		newfile = true;
411243730Srwatson		wait_for_dir();
412247442Spjd		/*
413247442Spjd		 * We may have been disconnected and reconnected in the
414247442Spjd		 * meantime, check if reset is set.
415247442Spjd		 */
416247442Spjd		mtx_lock(&adist_remote_mtx);
417247442Spjd		if (adhost->adh_reset)
418247442Spjd			goto reset;
419247442Spjd		mtx_unlock(&adist_remote_mtx);
420243730Srwatson		if (trail_filefd(adist_trail) == -1)
421243730Srwatson			trail_next(adist_trail);
422243730Srwatson	}
423243730Srwatson	if (newfile) {
424243730Srwatson		pjdlog_debug(1, "Trail file \"%s/%s\" opened.",
425243730Srwatson		    adhost->adh_directory,
426243730Srwatson		    trail_filename(adist_trail));
427243730Srwatson		(void)wait_for_file_init(trail_filefd(adist_trail));
428243730Srwatson	}
429243730Srwatson	return (newfile);
430243730Srwatson}
431243730Srwatson
432243730Srwatsonstatic void *
433243730Srwatsonread_thread(void *arg __unused)
434243730Srwatson{
435243730Srwatson	struct adreq *adreq;
436243730Srwatson	ssize_t done;
437243730Srwatson	bool newfile;
438243730Srwatson
439243730Srwatson	pjdlog_debug(1, "%s started.", __func__);
440243730Srwatson
441243730Srwatson	for (;;) {
442243730Srwatson		newfile = read_thread_wait();
443243730Srwatson		QUEUE_TAKE(adreq, &adist_free_list, 0);
444243730Srwatson		if (newfile) {
445243730Srwatson			adreq_fill(adreq, ADIST_CMD_OPEN,
446243730Srwatson			    trail_filename(adist_trail), 0);
447243730Srwatson			newfile = false;
448243730Srwatson			goto move;
449243730Srwatson		}
450243730Srwatson
451243730Srwatson		done = read(trail_filefd(adist_trail), adreq->adr_data,
452243730Srwatson		    ADIST_BUF_SIZE);
453243730Srwatson		if (done == -1) {
454243730Srwatson			off_t offset;
455243730Srwatson			int error;
456243730Srwatson
457243730Srwatson			error = errno;
458243730Srwatson			offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR);
459243730Srwatson			errno = error;
460243730Srwatson			pjdlog_errno(LOG_ERR,
461243730Srwatson			    "Error while reading \"%s/%s\" at offset %jd",
462243730Srwatson			    adhost->adh_directory, trail_filename(adist_trail),
463243730Srwatson			    offset);
464243730Srwatson			trail_close(adist_trail);
465243730Srwatson			adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0);
466243730Srwatson			goto move;
467243730Srwatson		} else if (done == 0) {
468243730Srwatson			/* End of file. */
469243730Srwatson			pjdlog_debug(3, "End of \"%s/%s\".",
470243730Srwatson			    adhost->adh_directory, trail_filename(adist_trail));
471243730Srwatson			if (!trail_switch(adist_trail)) {
472243730Srwatson				/* More audit records can arrive. */
473243730Srwatson				mtx_lock(&adist_free_list_lock);
474243730Srwatson				TAILQ_INSERT_TAIL(&adist_free_list, adreq,
475243730Srwatson				    adr_next);
476243730Srwatson				mtx_unlock(&adist_free_list_lock);
477243730Srwatson				wait_for_file();
478243730Srwatson				continue;
479243730Srwatson			}
480243730Srwatson			adreq_fill(adreq, ADIST_CMD_CLOSE,
481243730Srwatson			    trail_filename(adist_trail), 0);
482243730Srwatson			trail_close(adist_trail);
483243730Srwatson			goto move;
484243730Srwatson		}
485243730Srwatson
486243730Srwatson		adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done);
487243730Srwatsonmove:
488243730Srwatson		pjdlog_debug(3,
489243730Srwatson		    "read thread: Moving request %p to the send queue (%hhu).",
490243730Srwatson		    adreq, adreq->adr_cmd);
491243730Srwatson		QUEUE_INSERT(adreq, &adist_send_list);
492243730Srwatson	}
493243730Srwatson	/* NOTREACHED */
494243730Srwatson	return (NULL);
495243730Srwatson}
496243730Srwatson
497243730Srwatsonstatic void
498243730Srwatsonkeepalive_send(void)
499243730Srwatson{
500243730Srwatson	struct adreq *adreq;
501243730Srwatson
502243730Srwatson	rw_rlock(&adist_remote_lock);
503243730Srwatson	if (adhost->adh_remote == NULL) {
504243730Srwatson		rw_unlock(&adist_remote_lock);
505243730Srwatson		return;
506243730Srwatson	}
507243730Srwatson	rw_unlock(&adist_remote_lock);
508243730Srwatson
509243730Srwatson	mtx_lock(&adist_free_list_lock);
510243730Srwatson	adreq = TAILQ_FIRST(&adist_free_list);
511243730Srwatson	if (adreq != NULL)
512243730Srwatson		TAILQ_REMOVE(&adist_free_list, adreq, adr_next);
513243730Srwatson	mtx_unlock(&adist_free_list_lock);
514243730Srwatson	if (adreq == NULL)
515243730Srwatson		return;
516243730Srwatson
517243730Srwatson	adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0);
518243730Srwatson
519243730Srwatson	QUEUE_INSERT(adreq, &adist_send_list);
520243730Srwatson
521243730Srwatson	pjdlog_debug(3, "keepalive_send: Request sent.");
522243730Srwatson}
523243730Srwatson
524243730Srwatson/*
525243730Srwatson * Thread sends request to secondary node.
526243730Srwatson */
527243730Srwatsonstatic void *
528243730Srwatsonsend_thread(void *arg __unused)
529243730Srwatson{
530243730Srwatson	time_t lastcheck, now;
531243730Srwatson	struct adreq *adreq;
532243730Srwatson
533243730Srwatson	pjdlog_debug(1, "%s started.", __func__);
534243730Srwatson
535243730Srwatson	lastcheck = time(NULL);
536243730Srwatson
537243730Srwatson	for (;;) {
538243730Srwatson		pjdlog_debug(3, "send thread: Taking request.");
539243730Srwatson		for (;;) {
540243730Srwatson			QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE);
541243730Srwatson			if (adreq != NULL)
542243730Srwatson				break;
543243730Srwatson			now = time(NULL);
544243730Srwatson			if (lastcheck + ADIST_KEEPALIVE <= now) {
545243730Srwatson				keepalive_send();
546243730Srwatson				lastcheck = now;
547243730Srwatson			}
548243730Srwatson		}
549243730Srwatson		PJDLOG_ASSERT(adreq != NULL);
550243730Srwatson		pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq,
551243730Srwatson		    adreq->adr_cmd);
552243730Srwatson		/*
553243730Srwatson		 * Protect connection from disappearing.
554243730Srwatson		 */
555243730Srwatson		rw_rlock(&adist_remote_lock);
556243730Srwatson		/*
557243730Srwatson		 * Move the request to the recv queue first to avoid race
558243730Srwatson		 * where the recv thread receives the reply before we move
559243730Srwatson		 * the request to the recv queue.
560243730Srwatson		 */
561243730Srwatson		QUEUE_INSERT(adreq, &adist_recv_list);
562243730Srwatson		if (adhost->adh_remote == NULL ||
563243730Srwatson		    proto_send(adhost->adh_remote, &adreq->adr_packet,
564243730Srwatson		    ADPKT_SIZE(adreq)) == -1) {
565243730Srwatson			rw_unlock(&adist_remote_lock);
566243730Srwatson			pjdlog_debug(1,
567243730Srwatson			    "send thread: (%p) Unable to send request.", adreq);
568243730Srwatson			if (adhost->adh_remote != NULL)
569243730Srwatson				sender_disconnect();
570243730Srwatson			continue;
571243730Srwatson		} else {
572243730Srwatson			pjdlog_debug(3, "Request %p sent successfully.", adreq);
573243730Srwatson			adreq_log(LOG_DEBUG, 2, -1, adreq,
574243730Srwatson			    "send: (%p) Request sent: ", adreq);
575243730Srwatson			rw_unlock(&adist_remote_lock);
576243730Srwatson		}
577243730Srwatson	}
578243730Srwatson	/* NOTREACHED */
579243730Srwatson	return (NULL);
580243730Srwatson}
581243730Srwatson
582243730Srwatsonstatic void
583243730Srwatsonadrep_decode_header(struct adrep *adrep)
584243730Srwatson{
585243730Srwatson
586243730Srwatson	/* Byte-swap only is the receiver is using different byte order. */
587243730Srwatson	if (adrep->adrp_byteorder != ADIST_BYTEORDER) {
588243730Srwatson		adrep->adrp_byteorder = ADIST_BYTEORDER;
589243730Srwatson		adrep->adrp_seq = bswap64(adrep->adrp_seq);
590243730Srwatson		adrep->adrp_error = bswap16(adrep->adrp_error);
591243730Srwatson	}
592243730Srwatson}
593243730Srwatson
594243730Srwatson/*
595243730Srwatson * Thread receives answer from secondary node and passes it to ggate_send
596243730Srwatson * thread.
597243730Srwatson */
598243730Srwatsonstatic void *
599243730Srwatsonrecv_thread(void *arg __unused)
600243730Srwatson{
601243730Srwatson	struct adrep adrep;
602243730Srwatson	struct adreq *adreq;
603243730Srwatson
604243730Srwatson	pjdlog_debug(1, "%s started.", __func__);
605243730Srwatson
606243730Srwatson	for (;;) {
607243730Srwatson		/* Wait until there is anything to receive. */
608243730Srwatson		QUEUE_WAIT(&adist_recv_list);
609243730Srwatson		pjdlog_debug(3, "recv thread: Got something.");
610243730Srwatson		rw_rlock(&adist_remote_lock);
611243730Srwatson		if (adhost->adh_remote == NULL) {
612243730Srwatson			/*
613243730Srwatson			 * Connection is dead.
614243730Srwatson			 * XXX: We shouldn't be here.
615243730Srwatson			 */
616243730Srwatson			rw_unlock(&adist_remote_lock);
617243730Srwatson			continue;
618243730Srwatson		}
619243730Srwatson		if (proto_recv(adhost->adh_remote, &adrep,
620243730Srwatson		    sizeof(adrep)) == -1) {
621243730Srwatson			rw_unlock(&adist_remote_lock);
622243730Srwatson			pjdlog_errno(LOG_ERR, "Unable to receive reply");
623243730Srwatson			sender_disconnect();
624243730Srwatson			continue;
625243730Srwatson		}
626243730Srwatson		rw_unlock(&adist_remote_lock);
627243730Srwatson		adrep_decode_header(&adrep);
628243730Srwatson		/*
629243730Srwatson		 * Find the request that was just confirmed.
630243730Srwatson		 */
631243730Srwatson		mtx_lock(&adist_recv_list_lock);
632243730Srwatson		TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) {
633243730Srwatson			if (adreq->adr_seq == adrep.adrp_seq) {
634243730Srwatson				TAILQ_REMOVE(&adist_recv_list, adreq,
635243730Srwatson				    adr_next);
636243730Srwatson				break;
637243730Srwatson			}
638243730Srwatson		}
639243730Srwatson		if (adreq == NULL) {
640243730Srwatson			/*
641243730Srwatson			 * If we disconnected in the meantime, just continue.
642243730Srwatson			 * On disconnect sender_disconnect() clears the queue,
643243730Srwatson			 * we can use that.
644243730Srwatson			 */
645243730Srwatson			if (TAILQ_EMPTY(&adist_recv_list)) {
646243730Srwatson				rw_unlock(&adist_remote_lock);
647243730Srwatson				continue;
648243730Srwatson			}
649243730Srwatson			mtx_unlock(&adist_recv_list_lock);
650243730Srwatson			pjdlog_error("Found no request matching received 'seq' field (%ju).",
651243730Srwatson			    (uintmax_t)adrep.adrp_seq);
652243730Srwatson			sender_disconnect();
653243730Srwatson			continue;
654243730Srwatson		}
655243730Srwatson		mtx_unlock(&adist_recv_list_lock);
656243730Srwatson		adreq_log(LOG_DEBUG, 2, -1, adreq,
657243730Srwatson		    "recv thread: (%p) Request confirmed: ", adreq);
658243730Srwatson		pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq,
659243730Srwatson		    adreq->adr_cmd);
660243730Srwatson		if (adrep.adrp_error != 0) {
661243730Srwatson			pjdlog_error("Receiver returned error (%s), disconnecting.",
662243730Srwatson			    adist_errstr((int)adrep.adrp_error));
663243730Srwatson			sender_disconnect();
664243730Srwatson			continue;
665243730Srwatson		}
666243730Srwatson		if (adreq->adr_cmd == ADIST_CMD_CLOSE)
667243730Srwatson			trail_unlink(adist_trail, adreq->adr_data);
668243730Srwatson		pjdlog_debug(3, "Request received successfully.");
669243730Srwatson		QUEUE_INSERT(adreq, &adist_free_list);
670243730Srwatson	}
671243730Srwatson	/* NOTREACHED */
672243730Srwatson	return (NULL);
673243730Srwatson}
674243730Srwatson
675243730Srwatsonstatic void
676243730Srwatsonguard_check_connection(void)
677243730Srwatson{
678243730Srwatson
679243730Srwatson	PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER);
680243730Srwatson
681243730Srwatson	rw_rlock(&adist_remote_lock);
682243730Srwatson	if (adhost->adh_remote != NULL) {
683243730Srwatson		rw_unlock(&adist_remote_lock);
684243730Srwatson		pjdlog_debug(3, "remote_guard: Connection to %s is ok.",
685243730Srwatson		    adhost->adh_remoteaddr);
686243730Srwatson		return;
687243730Srwatson	}
688243730Srwatson
689243730Srwatson	/*
690243730Srwatson	 * Upgrade the lock. It doesn't have to be atomic as no other thread
691243730Srwatson	 * can change connection status from disconnected to connected.
692243730Srwatson	 */
693243730Srwatson	rw_unlock(&adist_remote_lock);
694243730Srwatson	pjdlog_debug(1, "remote_guard: Reconnecting to %s.",
695243730Srwatson	    adhost->adh_remoteaddr);
696243730Srwatson	if (sender_connect() == 0) {
697243730Srwatson		pjdlog_info("Successfully reconnected to %s.",
698243730Srwatson		    adhost->adh_remoteaddr);
699243730Srwatson	} else {
700243730Srwatson		pjdlog_debug(1, "remote_guard: Reconnect to %s failed.",
701243730Srwatson		    adhost->adh_remoteaddr);
702243730Srwatson	}
703243730Srwatson}
704243730Srwatson
705243730Srwatson/*
706243730Srwatson * Thread guards remote connections and reconnects when needed, handles
707243730Srwatson * signals, etc.
708243730Srwatson */
709243730Srwatsonstatic void *
710243730Srwatsonguard_thread(void *arg __unused)
711243730Srwatson{
712243730Srwatson	struct timespec timeout;
713243730Srwatson	time_t lastcheck, now;
714243730Srwatson	sigset_t mask;
715243730Srwatson	int signo;
716243730Srwatson
717243730Srwatson	lastcheck = time(NULL);
718243730Srwatson
719243730Srwatson	PJDLOG_VERIFY(sigemptyset(&mask) == 0);
720243730Srwatson	PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0);
721243730Srwatson	PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0);
722243730Srwatson
723243730Srwatson	timeout.tv_sec = ADIST_KEEPALIVE;
724243730Srwatson	timeout.tv_nsec = 0;
725243730Srwatson	signo = -1;
726243730Srwatson
727243730Srwatson	for (;;) {
728243730Srwatson		switch (signo) {
729243730Srwatson		case SIGINT:
730243730Srwatson		case SIGTERM:
731243730Srwatson			sigexit_received = true;
732243730Srwatson			pjdlog_exitx(EX_OK,
733243730Srwatson			    "Termination signal received, exiting.");
734243730Srwatson			break;
735243730Srwatson		default:
736243730Srwatson			break;
737243730Srwatson		}
738243730Srwatson
739243730Srwatson		pjdlog_debug(3, "remote_guard: Checking connections.");
740243730Srwatson		now = time(NULL);
741243730Srwatson		if (lastcheck + ADIST_KEEPALIVE <= now) {
742243730Srwatson			guard_check_connection();
743243730Srwatson			lastcheck = now;
744243730Srwatson		}
745243730Srwatson		signo = sigtimedwait(&mask, NULL, &timeout);
746243730Srwatson	}
747243730Srwatson	/* NOTREACHED */
748243730Srwatson	return (NULL);
749243730Srwatson}
750243730Srwatson
751243730Srwatsonvoid
752243730Srwatsonadist_sender(struct adist_config *config, struct adist_host *adh)
753243730Srwatson{
754243730Srwatson	pthread_t td;
755243730Srwatson	pid_t pid;
756243730Srwatson	int error, mode, debuglevel;
757243730Srwatson
758243730Srwatson	/*
759243730Srwatson	 * Create communication channel for sending connection requests from
760243730Srwatson	 * child to parent.
761243730Srwatson	 */
762243730Srwatson	if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) {
763243730Srwatson		pjdlog_errno(LOG_ERR,
764243730Srwatson		    "Unable to create connection sockets between child and parent");
765243730Srwatson		return;
766243730Srwatson	}
767243730Srwatson
768243730Srwatson	pid = fork();
769243730Srwatson	if (pid == -1) {
770243730Srwatson		pjdlog_errno(LOG_ERR, "Unable to fork");
771243730Srwatson		proto_close(adh->adh_conn);
772243730Srwatson		adh->adh_conn = NULL;
773243730Srwatson		return;
774243730Srwatson	}
775243730Srwatson
776243730Srwatson	if (pid > 0) {
777243730Srwatson		/* This is parent. */
778243730Srwatson		adh->adh_worker_pid = pid;
779243730Srwatson		/* Declare that we are receiver. */
780243730Srwatson		proto_recv(adh->adh_conn, NULL, 0);
781243730Srwatson		return;
782243730Srwatson	}
783243730Srwatson
784243730Srwatson	adcfg = config;
785243730Srwatson	adhost = adh;
786243730Srwatson
787243730Srwatson	mode = pjdlog_mode_get();
788243730Srwatson	debuglevel = pjdlog_debug_get();
789243730Srwatson
790243730Srwatson	/* Declare that we are sender. */
791243730Srwatson	proto_send(adhost->adh_conn, NULL, 0);
792243730Srwatson
793243730Srwatson	descriptors_cleanup(adhost);
794243730Srwatson
795243730Srwatson#ifdef TODO
796243730Srwatson	descriptors_assert(adhost, mode);
797243730Srwatson#endif
798243730Srwatson
799243730Srwatson	pjdlog_init(mode);
800243730Srwatson	pjdlog_debug_set(debuglevel);
801243730Srwatson	pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name,
802243730Srwatson	    role2str(adhost->adh_role));
803243730Srwatson#ifdef HAVE_SETPROCTITLE
804243730Srwatson	setproctitle("[%s] (%s) ", adhost->adh_name,
805243730Srwatson	    role2str(adhost->adh_role));
806243730Srwatson#endif
807243730Srwatson
808243730Srwatson	/*
809243730Srwatson	 * The sender process should be able to remove entries from its
810243730Srwatson	 * trail directory, but it should not be able to write to the
811243730Srwatson	 * trail files, only read from them.
812243730Srwatson	 */
813243730Srwatson	adist_trail = trail_new(adhost->adh_directory, false);
814243730Srwatson	if (adist_trail == NULL)
815243730Srwatson		exit(EX_OSFILE);
816243730Srwatson
817243730Srwatson	if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)",
818243730Srwatson	    role2str(adhost->adh_role), adhost->adh_name) != 0) {
819243730Srwatson		exit(EX_CONFIG);
820243730Srwatson	}
821243730Srwatson	pjdlog_info("Privileges successfully dropped.");
822243730Srwatson
823243730Srwatson	/*
824243730Srwatson	 * We can ignore wait_for_dir_init() failures. It will fall back to
825243730Srwatson	 * using sleep(3).
826243730Srwatson	 */
827243730Srwatson	(void)wait_for_dir_init(trail_dirfd(adist_trail));
828243730Srwatson
829243730Srwatson	init_environment();
830243730Srwatson	if (sender_connect() == 0) {
831243730Srwatson		pjdlog_info("Successfully connected to %s.",
832243730Srwatson		    adhost->adh_remoteaddr);
833243730Srwatson	}
834243730Srwatson	adhost->adh_reset = true;
835243730Srwatson
836243730Srwatson	/*
837243730Srwatson	 * Create the guard thread first, so we can handle signals from the
838243730Srwatson	 * very begining.
839243730Srwatson	 */
840243730Srwatson	error = pthread_create(&td, NULL, guard_thread, NULL);
841243730Srwatson	PJDLOG_ASSERT(error == 0);
842243730Srwatson	error = pthread_create(&td, NULL, send_thread, NULL);
843243730Srwatson	PJDLOG_ASSERT(error == 0);
844243730Srwatson	error = pthread_create(&td, NULL, recv_thread, NULL);
845243730Srwatson	PJDLOG_ASSERT(error == 0);
846243730Srwatson	(void)read_thread(NULL);
847243730Srwatson}
848