1/*-
2 * See the file LICENSE for redistribution information.
3 *
4 * Copyright (c) 2001,2008 Oracle.  All rights reserved.
5 *
6 * $Id: rep_base.c,v 12.21 2008/01/08 20:58:24 bostic Exp $
7 */
8
9#include <sys/types.h>
10#include <errno.h>
11#include <signal.h>
12#include <stdlib.h>
13#include <string.h>
14
15#include <db.h>
16
17#include "rep_base.h"
18
/*
 * Process globals (we could put these in the machtab I suppose).
 */
int master_eid;		/* EID of the current master; DB_EID_INVALID until known. */
char *myaddr;		/* Heap copy of the -m "host:port" argument. */
unsigned short myport;	/* Port number parsed from the -m argument. */

/* Replication event handler; main() registers it via set_event_notify(). */
static void event_callback __P((DB_ENV *, u_int32_t, void *));
27
28int
29main(argc, argv)
30	int argc;
31	char *argv[];
32{
33	extern char *optarg;
34	DB_ENV *dbenv;
35	DBT local;
36	enum { MASTER, CLIENT, UNKNOWN } whoami;
37	all_args aa;
38	connect_args ca;
39	machtab_t *machtab;
40	thread_t all_thr, conn_thr;
41	void *astatus, *cstatus;
42#ifdef _WIN32
43	WSADATA wsaData;
44#else
45	struct sigaction sigact;
46#endif
47	repsite_t site, *sitep, self, *selfp;
48	int maxsites, nsites, ret, priority, totalsites;
49	char *c, ch;
50	const char *home, *progname;
51	APP_DATA my_app_data;
52
53	master_eid = DB_EID_INVALID;
54
55	my_app_data.elected = 0;
56	my_app_data.shared_data.is_master = 0; /* assume start out as client */
57	dbenv = NULL;
58	whoami = UNKNOWN;
59	machtab = NULL;
60	selfp = sitep = NULL;
61	maxsites = nsites = ret = totalsites = 0;
62	priority = 100;
63	home = "TESTDIR";
64	progname = "ex_rep_base";
65
66	if ((ret = create_env(progname, &dbenv)) != 0)
67		goto err;
68	dbenv->app_private = &my_app_data;
69	(void)dbenv->set_event_notify(dbenv, event_callback);
70
71	while ((ch = getopt(argc, argv, "Ch:Mm:n:o:p:v")) != EOF)
72		switch (ch) {
73		case 'M':
74			whoami = MASTER;
75			master_eid = SELF_EID;
76			break;
77		case 'C':
78			whoami = CLIENT;
79			break;
80		case 'h':
81			home = optarg;
82			break;
83		case 'm':
84			if ((myaddr = strdup(optarg)) == NULL) {
85				fprintf(stderr,
86				    "System error %s\n", strerror(errno));
87				goto err;
88			}
89			self.host = optarg;
90			self.host = strtok(self.host, ":");
91			if ((c = strtok(NULL, ":")) == NULL) {
92				fprintf(stderr, "Bad host specification.\n");
93				goto err;
94			}
95			myport = self.port = (unsigned short)atoi(c);
96			selfp = &self;
97			break;
98		case 'n':
99			totalsites = atoi(optarg);
100			break;
101		case 'o':
102			site.host = optarg;
103			site.host = strtok(site.host, ":");
104			if ((c = strtok(NULL, ":")) == NULL) {
105				fprintf(stderr, "Bad host specification.\n");
106				goto err;
107			}
108			site.port = atoi(c);
109			if (sitep == NULL || nsites >= maxsites) {
110				maxsites = maxsites == 0 ? 10 : 2 * maxsites;
111				if ((sitep = realloc(sitep,
112				    maxsites * sizeof(repsite_t))) == NULL) {
113					fprintf(stderr, "System error %s\n",
114					    strerror(errno));
115					goto err;
116				}
117			}
118			sitep[nsites++] = site;
119			break;
120		case 'p':
121			priority = atoi(optarg);
122			break;
123		case 'v':
124			if ((ret = dbenv->set_verbose(dbenv,
125			    DB_VERB_REPLICATION, 1)) != 0)
126				goto err;
127			break;
128		case '?':
129		default:
130			usage(progname);
131		}
132
133	/* Error check command line. */
134	if (whoami == UNKNOWN) {
135		fprintf(stderr, "Must specify -M or -C.\n");
136		goto err;
137	}
138
139	if (selfp == NULL)
140		usage(progname);
141
142	if (home == NULL)
143		usage(progname);
144
145	dbenv->rep_set_priority(dbenv, priority);
146
147#ifdef _WIN32
148	/* Initialize the Windows sockets DLL. */
149	if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) {
150		fprintf(stderr,
151		    "Unable to initialize Windows sockets: %d\n", ret);
152		goto err;
153	}
154#else
155	/*
156	 * Turn off SIGPIPE so that we don't kill processes when they
157	 * happen to lose a connection at the wrong time.
158	 */
159	memset(&sigact, 0, sizeof(sigact));
160	sigact.sa_handler = SIG_IGN;
161	if ((ret = sigaction(SIGPIPE, &sigact, NULL)) != 0) {
162		fprintf(stderr,
163		    "Unable to turn off SIGPIPE: %s\n", strerror(ret));
164		goto err;
165	}
166#endif
167
168	/*
169	 * We are hardcoding priorities here that all clients have the
170	 * same priority except for a designated master who gets a higher
171	 * priority.
172	 */
173	if ((ret =
174	    machtab_init(&machtab, totalsites)) != 0)
175		goto err;
176	my_app_data.comm_infrastructure = machtab;
177
178	if ((ret = env_init(dbenv, home)) != 0)
179		goto err;
180
181	/*
182	 * Now sets up comm infrastructure.  There are two phases.  First,
183	 * we open our port for listening for incoming connections.  Then
184	 * we attempt to connect to every host we know about.
185	 */
186
187	(void)dbenv->rep_set_transport(dbenv, SELF_EID, quote_send);
188
189	ca.dbenv = dbenv;
190	ca.home = home;
191	ca.progname = progname;
192	ca.machtab = machtab;
193	ca.port = selfp->port;
194	if ((ret = thread_create(&conn_thr, NULL, connect_thread, &ca)) != 0) {
195		dbenv->errx(dbenv, "can't create connect thread");
196		goto err;
197	}
198
199	aa.dbenv = dbenv;
200	aa.progname = progname;
201	aa.home = home;
202	aa.machtab = machtab;
203	aa.sites = sitep;
204	aa.nsites = nsites;
205	if ((ret = thread_create(&all_thr, NULL, connect_all, &aa)) != 0) {
206		dbenv->errx(dbenv, "can't create connect-all thread");
207		goto err;
208	}
209
210	/*
211	 * We have now got the entire communication infrastructure set up.
212	 * It's time to declare ourselves to be a client or master.
213	 */
214	if (whoami == MASTER) {
215		if ((ret = dbenv->rep_start(dbenv, NULL, DB_REP_MASTER)) != 0) {
216			dbenv->err(dbenv, ret, "dbenv->rep_start failed");
217			goto err;
218		}
219	} else {
220		memset(&local, 0, sizeof(local));
221		local.data = myaddr;
222		local.size = (u_int32_t)strlen(myaddr) + 1;
223		if ((ret =
224		    dbenv->rep_start(dbenv, &local, DB_REP_CLIENT)) != 0) {
225			dbenv->err(dbenv, ret, "dbenv->rep_start failed");
226			goto err;
227		}
228		/* Sleep to give ourselves time to find a master. */
229		sleep(5);
230	}
231	if ((ret = doloop(dbenv, &my_app_data.shared_data)) != 0) {
232		dbenv->err(dbenv, ret, "Main loop failed");
233		goto err;
234	}
235
236	/* Wait on the connection threads. */
237	if (thread_join(all_thr, &astatus) || thread_join(conn_thr, &cstatus)) {
238		ret = -1;
239		goto err;
240	}
241	if ((uintptr_t)astatus != EXIT_SUCCESS ||
242	    (uintptr_t)cstatus != EXIT_SUCCESS) {
243		ret = -1;
244		goto err;
245	}
246
247	/*
248	 * We have used the DB_TXN_NOSYNC environment flag for improved
249	 * performance without the usual sacrifice of transactional durability,
250	 * as discussed in the "Transactional guarantees" page of the Reference
251	 * Guide: if one replication site crashes, we can expect the data to
252	 * exist at another site.  However, in case we shut down all sites
253	 * gracefully, we push out the end of the log here so that the most
254	 * recent transactions don't mysteriously disappear.
255	 */
256	if ((ret = dbenv->log_flush(dbenv, NULL)) != 0)
257		dbenv->err(dbenv, ret, "log_flush");
258
259err:	if (machtab != NULL)
260		free(machtab);
261	if (dbenv != NULL)
262		(void)dbenv->close(dbenv, 0);
263#ifdef _WIN32
264	/* Shut down the Windows sockets DLL. */
265	(void)WSACleanup();
266#endif
267	return (ret);
268}
269
270static void
271event_callback(dbenv, which, info)
272	DB_ENV *dbenv;
273	u_int32_t which;
274	void *info;
275{
276	APP_DATA *app = dbenv->app_private;
277	SHARED_DATA *shared = &app->shared_data;
278
279	switch (which) {
280	case DB_EVENT_REP_CLIENT:
281		shared->is_master = 0;
282		break;
283
284	case DB_EVENT_REP_ELECTED:
285		app->elected = 1;
286		master_eid = SELF_EID;
287		break;
288
289	case DB_EVENT_REP_MASTER:
290		shared->is_master = 1;
291		break;
292
293	case DB_EVENT_REP_NEWMASTER:
294		master_eid = *(int*)info;
295		break;
296
297	case DB_EVENT_REP_STARTUPDONE:
298		/* I don't care about this, for now. */
299		break;
300
301	default:
302		dbenv->errx(dbenv, "ignoring event %d", which);
303	}
304}
305