1156230Smux/*-
2156230Smux * Copyright (c) 2003-2006, Maxime Henrion <mux@FreeBSD.org>
3156230Smux * All rights reserved.
4156230Smux *
5156230Smux * Redistribution and use in source and binary forms, with or without
6156230Smux * modification, are permitted provided that the following conditions
7156230Smux * are met:
8156230Smux * 1. Redistributions of source code must retain the above copyright
9156230Smux *    notice, this list of conditions and the following disclaimer.
10156230Smux * 2. Redistributions in binary form must reproduce the above copyright
11156230Smux *    notice, this list of conditions and the following disclaimer in the
12156230Smux *    documentation and/or other materials provided with the distribution.
13156230Smux *
14156230Smux * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15156230Smux * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16156230Smux * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17156230Smux * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18156230Smux * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19156230Smux * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20156230Smux * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21156230Smux * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22156230Smux * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23156230Smux * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24156230Smux * SUCH DAMAGE.
25156230Smux *
26156230Smux * $FreeBSD$
27156230Smux */
28156230Smux
29156230Smux#include <sys/param.h>
30156230Smux#include <sys/socket.h>
31156230Smux#include <sys/uio.h>
32156230Smux
33156230Smux#include <netinet/in.h>
34156230Smux
35156230Smux#include <assert.h>
36156230Smux#include <errno.h>
37156230Smux#include <pthread.h>
38156230Smux#include <stdarg.h>
39156230Smux#include <stdio.h>
40156230Smux#include <stdlib.h>
41156230Smux#include <string.h>
42156230Smux#include <unistd.h>
43156230Smux
44156230Smux#include "misc.h"
45156230Smux#include "mux.h"
46156230Smux
/*
 * Packet types.  The value travels in the "type" byte that starts
 * every struct mux_header.
 */
#define	MUX_STARTUPREQ		0
#define	MUX_STARTUPREP		1
#define	MUX_CONNECT		2
#define	MUX_ACCEPT		3
#define	MUX_RESET		4
#define	MUX_DATA		5
#define	MUX_WINDOW		6
#define	MUX_CLOSE		7

/*
 * Header sizes, in bytes.  Each size counts the leading type byte plus
 * the packed fields of the matching struct mux_header variant; the
 * startup size is used for both MUX_STARTUPREQ and MUX_STARTUPREP.
 */
#define	MUX_STARTUPHDRSZ	3
#define	MUX_CONNECTHDRSZ	8
#define	MUX_ACCEPTHDRSZ		8
#define	MUX_RESETHDRSZ		2
#define	MUX_DATAHDRSZ		4
#define	MUX_WINDOWHDRSZ		6
#define	MUX_CLOSEHDRSZ		2

#define	MUX_PROTOVER		0		/* Protocol version. */
71156230Smux
/*
 * On-the-wire packet header.  The structure is packed so that it can be
 * written to and read from the socket directly; only the first
 * MUX_*HDRSZ bytes that apply to the packet type are transferred.
 * Multi-byte fields are converted with htons()/htonl() by the code that
 * fills them in.
 */
struct mux_header {
	uint8_t type;			/* One of the MUX_* packet types. */
	union {
		/* MUX_STARTUPREQ / MUX_STARTUPREP. */
		struct {
			uint16_t version;
		} __packed mh_startup;
		/* MUX_CONNECT. */
		struct {
			uint8_t id;
			uint16_t mss;
			uint32_t window;
		} __packed mh_connect;
		/* MUX_ACCEPT. */
		struct {
			uint8_t id;
			uint16_t mss;
			uint32_t window;
		} __packed mh_accept;
		/* MUX_RESET. */
		struct {
			uint8_t id;
		} __packed mh_reset;
		/* MUX_DATA; "len" bytes of payload follow the header. */
		struct {
			uint8_t id;
			uint16_t len;
		} __packed mh_data;
		/* MUX_WINDOW. */
		struct {
			uint8_t id;
			uint32_t window;
		} __packed mh_window;
		/* MUX_CLOSE. */
		struct {
			uint8_t id;
		} __packed mh_close;
	} mh_u;
} __packed;
104156230Smux
/* Shorthands to reach the per-type members of the mux_header union. */
#define	mh_startup		mh_u.mh_startup
#define	mh_connect		mh_u.mh_connect
#define	mh_accept		mh_u.mh_accept
#define	mh_reset		mh_u.mh_reset
#define	mh_data			mh_u.mh_data
#define	mh_window		mh_u.mh_window
#define	mh_close		mh_u.mh_close

/* Number of slots in the per-multiplexer channel table. */
#define	MUX_MAXCHAN		2

/* Channel states. */
#define	CS_UNUSED		0
#define	CS_LISTENING		1
#define	CS_CONNECTING		2
#define	CS_ESTABLISHED		3
#define	CS_RDCLOSED		4
#define	CS_WRCLOSED		5
#define	CS_CLOSED		6

/*
 * Channel flags.  Each bit records a packet that the sender thread
 * still has to emit for the channel; sender_scan() picks them up in
 * a fixed priority order and clears the bit it selected.
 */
#define	CF_CONNECT		0x01
#define	CF_ACCEPT		0x02
#define	CF_RESET		0x04
#define	CF_WINDOW		0x08
#define	CF_DATA			0x10
#define	CF_CLOSE		0x20

#define	CHAN_SBSIZE		(16 * 1024)	/* Send buffer size. */
#define	CHAN_RBSIZE		(16 * 1024)	/* Receive buffer size. */
#define	CHAN_MAXSEGSIZE		1024		/* Maximum segment size. */
135156230Smux
/*
 * Circular buffer.
 *
 * The sender reads directly from data + out and wraps "out" modulo
 * size + 1, which suggests the backing store holds size + 1 bytes --
 * NOTE(review): confirm against buf_new(), which is not visible here.
 */
struct buf {
	uint8_t *data;		/* Backing storage. */
	size_t size;		/* Capacity in bytes. */
	size_t in;		/* Presumably the write position -- confirm. */
	size_t out;		/* Read position. */
};
143156230Smux
/*
 * A single multiplexed channel.  All fields other than "mux" are
 * accessed with the channel lock held once the channel is published
 * in the multiplexer's channel table.
 */
struct chan {
	int		flags;		/* Pending CF_* work for the sender. */
	int		state;		/* One of the CS_* states. */
	pthread_mutex_t	lock;
	struct mux	*mux;		/* Owning multiplexer. */

	/* Receiver state variables. */
	struct buf	*recvbuf;
	pthread_cond_t	rdready;	/* Waited on by readers/accept/wait. */
	uint32_t	recvseq;	/* Bytes handed out by chan_read(). */
	uint16_t	recvmss;	/* MSS advertised to the peer. */

	/* Sender state variables. */
	struct buf	*sendbuf;
	pthread_cond_t	wrready;	/* Waited on by writers/connect. */
	uint32_t	sendseq;	/* Bytes sent so far. */
	uint32_t	sendwin;	/* Send limit; sendwin - sendseq is free. */
	uint16_t	sendmss;	/* Largest data payload per packet. */
};
163156230Smux
/* State of one multiplexer running on top of a single socket. */
struct mux {
	int		closed;		/* Set once by mux_shutdown(). */
	int		status;		/* Exit status returned by mux_close(). */
	int		socket;		/* Underlying socket descriptor. */
	pthread_mutex_t	lock;		/* Protects this structure. */
	pthread_cond_t	done;
	struct chan	*channels[MUX_MAXCHAN];	/* Indexed by channel ID. */
	int		nchans;		/* Number of allocated channels. */

	/* Sender thread data. */
	pthread_t	sender;
	pthread_cond_t	sender_newwork;	/* Signaled by sender_wakeup(). */
	pthread_cond_t	sender_started;	/* Signaled once the sender runs. */
	int		sender_waiting;	/* Sender sleeps on sender_newwork. */
	int		sender_ready;	/* Sender reached its work loop. */
	int		sender_lastid;	/* Last channel served (round-robin). */

	/* Receiver thread data. */
	pthread_t	receiver;
};
184156230Smux
/* Socket I/O helpers. */
static int		 sock_writev(int, struct iovec *, int);
static int		 sock_write(int, void *, size_t);
static ssize_t		 sock_read(int, void *, size_t);
static int		 sock_readwait(int, void *, size_t);

/* Multiplexer helpers. */
static int		 mux_init(struct mux *);
static void		 mux_lock(struct mux *);
static void		 mux_unlock(struct mux *);

/* Internal channel management. */
static struct chan	*chan_new(struct mux *);
static struct chan	*chan_get(struct mux *, int);
static struct chan	*chan_connect(struct mux *, int);
static void		 chan_lock(struct chan *);
static void		 chan_unlock(struct chan *);
static int		 chan_insert(struct mux *, struct chan *);
static void		 chan_free(struct chan *);

/* Circular buffer primitives. */
static struct buf	*buf_new(size_t);
static size_t		 buf_count(struct buf *);
static size_t		 buf_avail(struct buf *);
static void		 buf_get(struct buf *, void *, size_t);
static void		 buf_put(struct buf *, const void *, size_t);
static void		 buf_free(struct buf *);

/* Sender thread. */
static void		 sender_wakeup(struct mux *);
static void		*sender_loop(void *);
static int		 sender_waitforwork(struct mux *, int *);
static int		 sender_scan(struct mux *, int *);
static void		 sender_cleanup(void *);

/* Receiver thread. */
static void		*receiver_loop(void *);
216156230Smux
/*
 * Write a whole I/O vector to the socket, restarting after short
 * writes and after EINTR.
 *
 * The iovec array is adjusted in place when a write stops part-way
 * through an element, so callers must not reuse its contents.
 *
 * Returns 0 once every element has been written, or -1 (errno left as
 * set by writev()) on any error other than EINTR.
 */
static int
sock_writev(int s, struct iovec *iov, int iovcnt)
{
	ssize_t nbytes;

	for (;;) {
		nbytes = writev(s, iov, iovcnt);
		if (nbytes == -1) {
			if (errno != EINTR)
				return (-1);
			continue;
		}
		/* Skip every element that has been written in full. */
		while (iovcnt > 0 && (size_t)nbytes >= iov->iov_len) {
			nbytes -= iov->iov_len;
			iov++;
			iovcnt--;
		}
		/*
		 * Only stop once the vector is exhausted.  A short write
		 * ending exactly on an element boundary leaves nbytes at 0
		 * while elements are still pending, so testing nbytes alone
		 * would silently drop the rest of the data.
		 */
		if (iovcnt <= 0)
			return (0);
		/* Resume in the middle of the partially written element. */
		iov->iov_len -= nbytes;
		iov->iov_base = (char *)iov->iov_base + nbytes;
	}
}
239156230Smux
/*
 * Write exactly "size" bytes from "buf" to the socket by wrapping the
 * buffer in a one-element I/O vector.  Returns whatever sock_writev()
 * returns: 0 on success, -1 on error.
 */
static int
sock_write(int s, void *buf, size_t size)
{
	struct iovec single;

	single.iov_len = size;
	single.iov_base = buf;
	return (sock_writev(s, &single, 1));
}
251156230Smux
/*
 * read() wrapper that transparently restarts when interrupted by a
 * signal.  Returns the read() result: bytes read, 0 at end of file, or
 * -1 with errno set for any error other than EINTR.
 */
static ssize_t
sock_read(int s, void *buf, size_t size)
{
	ssize_t got;

	do {
		got = read(s, buf, size);
	} while (got == -1 && errno == EINTR);
	return (got);
}
263156230Smux
/*
 * Read exactly "size" bytes into "buf", looping over short reads.
 *
 * Returns 0 on success.  Returns -1 if a read fails, or if the peer
 * closes the connection before all bytes arrived, in which case errno
 * is set to ECONNRESET.
 */
static int
sock_readwait(int s, void *buf, size_t size)
{
	char *dst = buf;
	size_t done = 0;
	ssize_t got;

	while (done < size) {
		got = sock_read(s, dst + done, size - done);
		if (got < 0)
			return (-1);
		if (got == 0) {
			/* End of file in the middle of the request. */
			errno = ECONNRESET;
			return (-1);
		}
		done += got;
	}
	return (0);
}
286156230Smux
287156230Smuxstatic void
288156230Smuxmux_lock(struct mux *m)
289156230Smux{
290156230Smux	int error;
291156230Smux
292156230Smux	error = pthread_mutex_lock(&m->lock);
293156230Smux	assert(!error);
294156230Smux}
295156230Smux
296156230Smuxstatic void
297156230Smuxmux_unlock(struct mux *m)
298156230Smux{
299156230Smux	int error;
300156230Smux
301156230Smux	error = pthread_mutex_unlock(&m->lock);
302156230Smux	assert(!error);
303156230Smux}
304156230Smux
305156230Smux/* Create a TCP multiplexer on the given socket. */
306156230Smuxstruct mux *
307156230Smuxmux_open(int sock, struct chan **chan)
308156230Smux{
309156230Smux	struct mux *m;
310156230Smux	struct chan *chan0;
311156230Smux	int error;
312156230Smux
313156230Smux	m = xmalloc(sizeof(struct mux));
314156230Smux	memset(m->channels, 0, sizeof(m->channels));
315156230Smux	m->nchans = 0;
316156230Smux	m->closed = 0;
317156230Smux	m->status = -1;
318156230Smux	m->socket = sock;
319156230Smux
320156230Smux	m->sender_waiting = 0;
321156230Smux	m->sender_lastid = 0;
322156230Smux	m->sender_ready = 0;
323156230Smux	pthread_mutex_init(&m->lock, NULL);
324156230Smux	pthread_cond_init(&m->done, NULL);
325156230Smux	pthread_cond_init(&m->sender_newwork, NULL);
326156230Smux	pthread_cond_init(&m->sender_started, NULL);
327156230Smux
328156230Smux	error = mux_init(m);
329156230Smux	if (error)
330156230Smux		goto bad;
331156230Smux	chan0 = chan_connect(m, 0);
332156230Smux	if (chan0 == NULL)
333156230Smux		goto bad;
334156230Smux	*chan = chan0;
335156230Smux	return (m);
336156230Smuxbad:
337156230Smux	mux_shutdown(m, NULL, STATUS_FAILURE);
338156230Smux	(void)mux_close(m);
339156230Smux	return (NULL);
340156230Smux}
341156230Smux
342156230Smuxint
343156230Smuxmux_close(struct mux *m)
344156230Smux{
345156230Smux	struct chan *chan;
346156230Smux	int i, status;
347156230Smux
348156230Smux	assert(m->closed);
349156230Smux	for (i = 0; i < m->nchans; i++) {
350156230Smux		chan = m->channels[i];
351156230Smux		if (chan != NULL)
352156230Smux			chan_free(chan);
353156230Smux	}
354156230Smux	pthread_cond_destroy(&m->sender_started);
355156230Smux	pthread_cond_destroy(&m->sender_newwork);
356156230Smux	pthread_cond_destroy(&m->done);
357156230Smux	pthread_mutex_destroy(&m->lock);
358156230Smux	status = m->status;
359156230Smux	free(m);
360156230Smux	return (status);
361156230Smux}
362156230Smux
363156230Smux/* Close a channel. */
364156230Smuxint
365156230Smuxchan_close(struct chan *chan)
366156230Smux{
367156230Smux
368156230Smux	chan_lock(chan);
369156230Smux	if (chan->state == CS_ESTABLISHED) {
370156230Smux		chan->state = CS_WRCLOSED;
371156230Smux		chan->flags |= CF_CLOSE;
372156230Smux	} else if (chan->state == CS_RDCLOSED) {
373156230Smux		chan->state = CS_CLOSED;
374156230Smux		chan->flags |= CF_CLOSE;
375156230Smux	} else if (chan->state == CS_WRCLOSED || chan->state == CS_CLOSED) {
376156230Smux		chan_unlock(chan);
377156230Smux		return (0);
378156230Smux	} else {
379156230Smux		chan_unlock(chan);
380156230Smux		return (-1);
381156230Smux	}
382156230Smux	chan_unlock(chan);
383156230Smux	sender_wakeup(chan->mux);
384156230Smux	return (0);
385156230Smux}
386156230Smux
387156230Smuxvoid
388156230Smuxchan_wait(struct chan *chan)
389156230Smux{
390156230Smux
391156230Smux	chan_lock(chan);
392156230Smux	while (chan->state != CS_CLOSED)
393156230Smux		pthread_cond_wait(&chan->rdready, &chan->lock);
394156230Smux	chan_unlock(chan);
395156230Smux}
396156230Smux
397156230Smux/* Returns the ID of an available channel in the listening state. */
398156230Smuxint
399156230Smuxchan_listen(struct mux *m)
400156230Smux{
401156230Smux	struct chan *chan;
402156230Smux	int i;
403156230Smux
404156230Smux	mux_lock(m);
405156230Smux	for (i = 0; i < m->nchans; i++) {
406156230Smux		chan = m->channels[i];
407156230Smux		chan_lock(chan);
408156230Smux		if (chan->state == CS_UNUSED) {
409156230Smux			mux_unlock(m);
410156230Smux			chan->state = CS_LISTENING;
411156230Smux			chan_unlock(chan);
412156230Smux			return (i);
413156230Smux		}
414156230Smux		chan_unlock(chan);
415156230Smux	}
416156230Smux	mux_unlock(m);
417156230Smux	chan = chan_new(m);
418156230Smux	chan->state = CS_LISTENING;
419156230Smux	i = chan_insert(m, chan);
420156230Smux	if (i == -1)
421156230Smux		chan_free(chan);
422156230Smux	return (i);
423156230Smux}
424156230Smux
425156230Smuxstruct chan *
426156230Smuxchan_accept(struct mux *m, int id)
427156230Smux{
428156230Smux	struct chan *chan;
429156230Smux
430156230Smux	chan = chan_get(m, id);
431156230Smux	while (chan->state == CS_LISTENING)
432156230Smux		pthread_cond_wait(&chan->rdready, &chan->lock);
433156230Smux	if (chan->state != CS_ESTABLISHED) {
434156230Smux		errno = ECONNRESET;
435156230Smux		chan_unlock(chan);
436156230Smux		return (NULL);
437156230Smux	}
438156230Smux	chan_unlock(chan);
439156230Smux	return (chan);
440156230Smux}
441156230Smux
442156230Smux/* Read bytes from a channel. */
443156230Smuxssize_t
444156230Smuxchan_read(struct chan *chan, void *buf, size_t size)
445156230Smux{
446156230Smux	char *cp;
447156230Smux	size_t count, n;
448156230Smux
449156230Smux	cp = buf;
450156230Smux	chan_lock(chan);
451156230Smux	for (;;) {
452156230Smux		if (chan->state == CS_RDCLOSED || chan->state == CS_CLOSED) {
453156230Smux			chan_unlock(chan);
454156230Smux			return (0);
455156230Smux		}
456156230Smux		if (chan->state != CS_ESTABLISHED &&
457156230Smux		    chan->state != CS_WRCLOSED) {
458156230Smux			chan_unlock(chan);
459156230Smux			errno = EBADF;
460156230Smux			return (-1);
461156230Smux		}
462156230Smux		count = buf_count(chan->recvbuf);
463156230Smux		if (count > 0)
464156230Smux			break;
465156230Smux		pthread_cond_wait(&chan->rdready, &chan->lock);
466156230Smux	}
467156230Smux	n = min(count, size);
468156230Smux	buf_get(chan->recvbuf, cp, n);
469156230Smux	chan->recvseq += n;
470156230Smux	chan->flags |= CF_WINDOW;
471156230Smux	chan_unlock(chan);
472156230Smux	/* We need to wake up the sender so that it sends a window update. */
473156230Smux	sender_wakeup(chan->mux);
474156230Smux	return (n);
475156230Smux}
476156230Smux
477156230Smux/* Write bytes to a channel. */
478156230Smuxssize_t
479156230Smuxchan_write(struct chan *chan, const void *buf, size_t size)
480156230Smux{
481156230Smux	const char *cp;
482156230Smux	size_t avail, n, pos;
483156230Smux
484156230Smux	pos = 0;
485156230Smux	cp = buf;
486156230Smux	chan_lock(chan);
487156230Smux	while (pos < size) {
488156230Smux		for (;;) {
489156230Smux			if (chan->state != CS_ESTABLISHED &&
490156230Smux			    chan->state != CS_RDCLOSED) {
491156230Smux				chan_unlock(chan);
492156230Smux				errno = EPIPE;
493156230Smux				return (-1);
494156230Smux			}
495156230Smux			avail = buf_avail(chan->sendbuf);
496156230Smux			if (avail > 0)
497156230Smux				break;
498156230Smux			pthread_cond_wait(&chan->wrready, &chan->lock);
499156230Smux		}
500156230Smux		n = min(avail, size - pos);
501156230Smux		buf_put(chan->sendbuf, cp + pos, n);
502156230Smux		pos += n;
503156230Smux	}
504156230Smux	chan_unlock(chan);
505156230Smux	sender_wakeup(chan->mux);
506156230Smux	return (size);
507156230Smux}
508156230Smux
509156230Smux/*
510156230Smux * Internal channel API.
511156230Smux */
512156230Smux
513156230Smuxstatic struct chan *
514156230Smuxchan_connect(struct mux *m, int id)
515156230Smux{
516156230Smux	struct chan *chan;
517156230Smux
518156230Smux	chan = chan_get(m, id);
519156230Smux	if (chan->state != CS_UNUSED) {
520156230Smux		chan_unlock(chan);
521156230Smux		return (NULL);
522156230Smux	}
523156230Smux	chan->state = CS_CONNECTING;
524156230Smux	chan->flags |= CF_CONNECT;
525156230Smux	chan_unlock(chan);
526156230Smux	sender_wakeup(m);
527156230Smux	chan_lock(chan);
528156230Smux	while (chan->state == CS_CONNECTING)
529156230Smux		pthread_cond_wait(&chan->wrready, &chan->lock);
530156230Smux	if (chan->state != CS_ESTABLISHED) {
531156230Smux		chan_unlock(chan);
532156230Smux		return (NULL);
533156230Smux	}
534156230Smux	chan_unlock(chan);
535156230Smux	return (chan);
536156230Smux}
537156230Smux
538156230Smux/*
539156230Smux * Get a channel from its ID, creating it if necessary.
540156230Smux * The channel is returned locked.
541156230Smux */
542156230Smuxstatic struct chan *
543156230Smuxchan_get(struct mux *m, int id)
544156230Smux{
545156230Smux	struct chan *chan;
546156230Smux
547156230Smux	assert(id < MUX_MAXCHAN);
548156230Smux	mux_lock(m);
549156230Smux	chan = m->channels[id];
550156230Smux	if (chan == NULL) {
551156230Smux		chan = chan_new(m);
552156230Smux		m->channels[id] = chan;
553156230Smux		m->nchans++;
554156230Smux	}
555156230Smux	chan_lock(chan);
556156230Smux	mux_unlock(m);
557156230Smux	return (chan);
558156230Smux}
559156230Smux
560156230Smux/* Lock a channel. */
561156230Smuxstatic void
562156230Smuxchan_lock(struct chan *chan)
563156230Smux{
564156230Smux	int error;
565156230Smux
566156230Smux	error = pthread_mutex_lock(&chan->lock);
567156230Smux	assert(!error);
568156230Smux}
569156230Smux
570156230Smux/* Unlock a channel.  */
571156230Smuxstatic void
572156230Smuxchan_unlock(struct chan *chan)
573156230Smux{
574156230Smux	int error;
575156230Smux
576156230Smux	error = pthread_mutex_unlock(&chan->lock);
577156230Smux	assert(!error);
578156230Smux}
579156230Smux
580156230Smux/*
581156230Smux * Create a new channel.
582156230Smux */
583156230Smuxstatic struct chan *
584156230Smuxchan_new(struct mux *m)
585156230Smux{
586156230Smux	struct chan *chan;
587156230Smux
588156230Smux	chan = xmalloc(sizeof(struct chan));
589156230Smux	chan->state = CS_UNUSED;
590156230Smux	chan->flags = 0;
591156230Smux	chan->mux = m;
592156230Smux	chan->sendbuf = buf_new(CHAN_SBSIZE);
593156230Smux	chan->sendseq = 0;
594156230Smux	chan->sendwin = 0;
595156230Smux	chan->sendmss = 0;
596156230Smux	chan->recvbuf = buf_new(CHAN_RBSIZE);
597156230Smux	chan->recvseq = 0;
598156230Smux	chan->recvmss = CHAN_MAXSEGSIZE;
599156230Smux	pthread_mutex_init(&chan->lock, NULL);
600156230Smux	pthread_cond_init(&chan->rdready, NULL);
601156230Smux	pthread_cond_init(&chan->wrready, NULL);
602156230Smux	return (chan);
603156230Smux}
604156230Smux
605156230Smux/* Free any resources associated with a channel. */
606156230Smuxstatic void
607156230Smuxchan_free(struct chan *chan)
608156230Smux{
609156230Smux
610156230Smux	pthread_cond_destroy(&chan->rdready);
611156230Smux	pthread_cond_destroy(&chan->wrready);
612156230Smux	pthread_mutex_destroy(&chan->lock);
613156230Smux	buf_free(chan->recvbuf);
614156230Smux	buf_free(chan->sendbuf);
615156230Smux	free(chan);
616156230Smux}
617156230Smux
618156230Smux/* Insert the new channel in the channel list. */
619156230Smuxstatic int
620156230Smuxchan_insert(struct mux *m, struct chan *chan)
621156230Smux{
622156230Smux	int i;
623156230Smux
624156230Smux	mux_lock(m);
625156230Smux	for (i = 0; i < MUX_MAXCHAN; i++) {
626156230Smux		if (m->channels[i] == NULL) {
627156230Smux			m->channels[i] = chan;
628156230Smux			m->nchans++;
629156230Smux			mux_unlock(m);
630156230Smux			return (i);
631156230Smux		}
632156230Smux	}
633156230Smux	errno = ENOBUFS;
634156230Smux	return (-1);
635156230Smux}
636156230Smux
637156230Smux/*
638156230Smux * Initialize the multiplexer protocol.
639156230Smux *
640156230Smux * This means negotiating protocol version and starting
641156230Smux * the receiver and sender threads.
642156230Smux */
643156230Smuxstatic int
644156230Smuxmux_init(struct mux *m)
645156230Smux{
646156230Smux	struct mux_header mh;
647156230Smux	int error;
648156230Smux
649156230Smux	mh.type = MUX_STARTUPREQ;
650156230Smux	mh.mh_startup.version = htons(MUX_PROTOVER);
651156230Smux	error = sock_write(m->socket, &mh, MUX_STARTUPHDRSZ);
652156230Smux	if (error)
653156230Smux		return (-1);
654156230Smux	error = sock_readwait(m->socket, &mh, MUX_STARTUPHDRSZ);
655156230Smux	if (error)
656156230Smux		return (-1);
657156230Smux	if (mh.type != MUX_STARTUPREP ||
658156230Smux	    ntohs(mh.mh_startup.version) != MUX_PROTOVER)
659156230Smux		return (-1);
660156230Smux	mux_lock(m);
661156230Smux	error = pthread_create(&m->sender, NULL, sender_loop, m);
662156230Smux	if (error) {
663156230Smux		mux_unlock(m);
664156230Smux		return (-1);
665156230Smux	}
666156230Smux	/*
667156230Smux	 * Make sure the sender thread has run and is waiting for new work
668156230Smux	 * before going on.  Otherwise, it might lose the race and a
669156230Smux	 * request, which will cause a deadlock.
670156230Smux	 */
671156230Smux	while (!m->sender_ready)
672156230Smux		pthread_cond_wait(&m->sender_started, &m->lock);
673156230Smux
674156230Smux	mux_unlock(m);
675156230Smux	error = pthread_create(&m->receiver, NULL, receiver_loop, m);
676156230Smux	if (error)
677156230Smux		return (-1);
678156230Smux	return (0);
679156230Smux}
680156230Smux
681156230Smux/*
682156230Smux * Close all the channels, terminate the sender and receiver thread.
683228992Suqs * This is an important function because it is used every time we need
684156230Smux * to wake up all the worker threads to abort the program.
685156230Smux *
686156230Smux * This function accepts an error message that will be printed if the
687156230Smux * multiplexer wasn't already closed.  This is useful because it ensures
688156230Smux * that only the first error message will be printed, and that it will
689156230Smux * be printed before doing the actual shutdown work.  If this is a
690156230Smux * normal shutdown, NULL can be passed instead.
691156230Smux *
692156230Smux * The "status" parameter of the first mux_shutdown() call is retained
693156230Smux * and then returned by mux_close(),  so that the main thread can know
694156230Smux * what type of error happened in the end, if any.
695156230Smux */
696156230Smuxvoid
697156230Smuxmux_shutdown(struct mux *m, const char *errmsg, int status)
698156230Smux{
699156230Smux	pthread_t self, sender, receiver;
700156230Smux	struct chan *chan;
701156230Smux	const char *name;
702156230Smux	void *val;
703156230Smux	int i, ret;
704156230Smux
705156230Smux	mux_lock(m);
706156230Smux	if (m->closed) {
707156230Smux		mux_unlock(m);
708156230Smux		return;
709156230Smux	}
710156230Smux	m->closed = 1;
711156230Smux	m->status = status;
712156230Smux	self = pthread_self();
713156230Smux	sender = m->sender;
714156230Smux	receiver = m->receiver;
715156230Smux	if (errmsg != NULL) {
716156230Smux		if (pthread_equal(self, receiver))
717156230Smux			name = "Receiver";
718156230Smux		else if (pthread_equal(self, sender))
719156230Smux			name = "Sender";
720156230Smux		else
721156230Smux			name = NULL;
722156230Smux		if (name == NULL)
723156230Smux			lprintf(-1, "%s\n", errmsg);
724156230Smux		else
725156230Smux			lprintf(-1, "%s: %s\n", name, errmsg);
726156230Smux	}
727156230Smux
728156230Smux	for (i = 0; i < MUX_MAXCHAN; i++) {
729156230Smux		if (m->channels[i] != NULL) {
730156230Smux			chan = m->channels[i];
731156230Smux			chan_lock(chan);
732156230Smux			if (chan->state != CS_UNUSED) {
733156230Smux				chan->state = CS_CLOSED;
734156230Smux				chan->flags = 0;
735156230Smux				pthread_cond_broadcast(&chan->rdready);
736156230Smux				pthread_cond_broadcast(&chan->wrready);
737156230Smux			}
738156230Smux			chan_unlock(chan);
739156230Smux		}
740156230Smux	}
741156230Smux	mux_unlock(m);
742156230Smux
743156230Smux	if (!pthread_equal(self, receiver)) {
744156230Smux		ret = pthread_cancel(receiver);
745156230Smux		assert(!ret);
746156230Smux		pthread_join(receiver, &val);
747156230Smux		assert(val == PTHREAD_CANCELED);
748156230Smux	}
749156230Smux	if (!pthread_equal(self, sender)) {
750156230Smux		ret = pthread_cancel(sender);
751156230Smux		assert(!ret);
752156230Smux		pthread_join(sender, &val);
753156230Smux		assert(val == PTHREAD_CANCELED);
754156230Smux	}
755156230Smux}
756156230Smux
757156230Smuxstatic void
758156230Smuxsender_wakeup(struct mux *m)
759156230Smux{
760156230Smux	int waiting;
761156230Smux
762156230Smux	mux_lock(m);
763156230Smux	waiting = m->sender_waiting;
764156230Smux	mux_unlock(m);
765156230Smux	/*
766156230Smux	 * We don't care about the race here: if the sender was
767156230Smux	 * waiting and is not anymore, we'll just send a useless
768156230Smux	 * signal; if he wasn't waiting then he won't go to sleep
769156230Smux	 * before having sent what we want him to.
770156230Smux	 */
771156230Smux	if (waiting)
772156230Smux		pthread_cond_signal(&m->sender_newwork);
773156230Smux}
774156230Smux
775156230Smuxstatic void *
776156230Smuxsender_loop(void *arg)
777156230Smux{
778156230Smux	struct iovec iov[3];
779156230Smux	struct mux_header mh;
780156230Smux	struct mux *m;
781156230Smux	struct chan *chan;
782156230Smux	struct buf *buf;
783156230Smux	uint32_t winsize;
784156230Smux	uint16_t hdrsize, size, len;
785173715Sjb	int error, id, iovcnt, what = 0;
786156230Smux
787156230Smux	m = (struct mux *)arg;
788186781Slulf	what = 0;
789156230Smuxagain:
790156230Smux	id = sender_waitforwork(m, &what);
791156230Smux	chan = chan_get(m, id);
792156230Smux	hdrsize = size = 0;
793156230Smux	switch (what) {
794156230Smux	case CF_CONNECT:
795156230Smux		mh.type = MUX_CONNECT;
796156230Smux		mh.mh_connect.id = id;
797156230Smux		mh.mh_connect.mss = htons(chan->recvmss);
798156230Smux		mh.mh_connect.window = htonl(chan->recvseq +
799156230Smux		    chan->recvbuf->size);
800156230Smux		hdrsize = MUX_CONNECTHDRSZ;
801156230Smux		break;
802156230Smux	case CF_ACCEPT:
803156230Smux		mh.type = MUX_ACCEPT;
804156230Smux		mh.mh_accept.id = id;
805156230Smux		mh.mh_accept.mss = htons(chan->recvmss);
806156230Smux		mh.mh_accept.window = htonl(chan->recvseq +
807156230Smux		    chan->recvbuf->size);
808156230Smux		hdrsize = MUX_ACCEPTHDRSZ;
809156230Smux		break;
810156230Smux	case CF_RESET:
811156230Smux		mh.type = MUX_RESET;
812156230Smux		mh.mh_reset.id = id;
813156230Smux		hdrsize = MUX_RESETHDRSZ;
814156230Smux		break;
815156230Smux	case CF_WINDOW:
816156230Smux		mh.type = MUX_WINDOW;
817156230Smux		mh.mh_window.id = id;
818156230Smux		mh.mh_window.window = htonl(chan->recvseq +
819156230Smux		    chan->recvbuf->size);
820156230Smux		hdrsize = MUX_WINDOWHDRSZ;
821156230Smux		break;
822156230Smux	case CF_DATA:
823156230Smux		mh.type = MUX_DATA;
824156230Smux		mh.mh_data.id = id;
825156230Smux		size = min(buf_count(chan->sendbuf), chan->sendmss);
826156230Smux		winsize = chan->sendwin - chan->sendseq;
827156230Smux		if (winsize < size)
828156230Smux			size = winsize;
829156230Smux		mh.mh_data.len = htons(size);
830156230Smux		hdrsize = MUX_DATAHDRSZ;
831156230Smux		break;
832156230Smux	case CF_CLOSE:
833156230Smux		mh.type = MUX_CLOSE;
834156230Smux		mh.mh_close.id = id;
835156230Smux		hdrsize = MUX_CLOSEHDRSZ;
836156230Smux		break;
837156230Smux	}
838156230Smux	if (size > 0) {
839156230Smux		assert(mh.type == MUX_DATA);
840156230Smux		/*
841156230Smux		 * Older FreeBSD versions (and maybe other OSes) have the
842156230Smux		 * iov_base field defined as char *.  Cast to char * to
843156230Smux		 * silence a warning in this case.
844156230Smux		 */
845156230Smux		iov[0].iov_base = (char *)&mh;
846156230Smux		iov[0].iov_len = hdrsize;
847156230Smux		iovcnt = 1;
848156230Smux		/* We access the buffer directly to avoid some copying. */
849156230Smux		buf = chan->sendbuf;
850156230Smux		len = min(size, buf->size + 1 - buf->out);
851156230Smux		iov[iovcnt].iov_base = buf->data + buf->out;
852156230Smux		iov[iovcnt].iov_len = len;
853156230Smux		iovcnt++;
854156230Smux		if (size > len) {
855156230Smux			/* Wrapping around. */
856156230Smux			iov[iovcnt].iov_base = buf->data;
857156230Smux			iov[iovcnt].iov_len = size - len;
858156230Smux			iovcnt++;
859156230Smux		}
860156230Smux		/*
861156230Smux		 * Since we're the only thread sending bytes from the
862156230Smux		 * buffer and modifying buf->out, it's safe to unlock
863156230Smux		 * here during I/O.  It avoids keeping the channel lock
864156230Smux		 * too long, since write() might block.
865156230Smux		 */
866156230Smux		chan_unlock(chan);
867156230Smux		error = sock_writev(m->socket, iov, iovcnt);
868156230Smux		if (error)
869156230Smux			goto bad;
870156230Smux		chan_lock(chan);
871156230Smux		chan->sendseq += size;
872156230Smux		buf->out += size;
873156230Smux		if (buf->out > buf->size)
874156230Smux			buf->out -= buf->size + 1;
875156230Smux		pthread_cond_signal(&chan->wrready);
876156230Smux		chan_unlock(chan);
877156230Smux	} else {
878156230Smux		chan_unlock(chan);
879156230Smux		error = sock_write(m->socket, &mh, hdrsize);
880156230Smux		if (error)
881156230Smux			goto bad;
882156230Smux	}
883156230Smux	goto again;
884156230Smuxbad:
885156230Smux	if (error == EPIPE)
886156230Smux		mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE);
887156230Smux	else
888156230Smux		mux_shutdown(m, strerror(errno), STATUS_FAILURE);
889156230Smux	return (NULL);
890156230Smux}
891156230Smux
/*
 * Cleanup handler installed by sender_waitforwork(): releases the
 * multiplexer lock passed as "arg".
 */
static void
sender_cleanup(void *arg)
{

	mux_unlock((struct mux *)arg);
}
900156230Smux
901156230Smuxstatic int
902156230Smuxsender_waitforwork(struct mux *m, int *what)
903156230Smux{
904156230Smux	int id;
905156230Smux
906156230Smux	mux_lock(m);
907156230Smux	pthread_cleanup_push(sender_cleanup, m);
908156230Smux	if (!m->sender_ready) {
909156230Smux		pthread_cond_signal(&m->sender_started);
910156230Smux		m->sender_ready = 1;
911156230Smux	}
912156230Smux	while ((id = sender_scan(m, what)) == -1) {
913156230Smux		m->sender_waiting = 1;
914156230Smux		pthread_cond_wait(&m->sender_newwork, &m->lock);
915156230Smux	}
916156230Smux	m->sender_waiting = 0;
917156230Smux	pthread_cleanup_pop(1);
918156230Smux	return (id);
919156230Smux}
920156230Smux
921156230Smux/*
922156230Smux * Scan for work to do for the sender.  Has to be called with
923156230Smux * the multiplexer lock held.
924156230Smux */
925156230Smuxstatic int
926156230Smuxsender_scan(struct mux *m, int *what)
927156230Smux{
928156230Smux	struct chan *chan;
929156230Smux	int id;
930156230Smux
931156230Smux	if (m->nchans <= 0)
932156230Smux		return (-1);
933156230Smux	id = m->sender_lastid;
934156230Smux	do {
935156230Smux		id++;
936156230Smux		if (id >= m->nchans)
937156230Smux			id = 0;
938156230Smux		chan = m->channels[id];
939156230Smux		chan_lock(chan);
940156230Smux		if (chan->state != CS_UNUSED) {
941156230Smux			if (chan->sendseq != chan->sendwin &&
942156230Smux			    buf_count(chan->sendbuf) > 0)
943156230Smux				chan->flags |= CF_DATA;
944156230Smux			if (chan->flags) {
945156230Smux				/* By order of importance. */
946156230Smux				if (chan->flags & CF_CONNECT)
947156230Smux					*what = CF_CONNECT;
948156230Smux				else if (chan->flags & CF_ACCEPT)
949156230Smux					*what = CF_ACCEPT;
950156230Smux				else if (chan->flags & CF_RESET)
951156230Smux					*what = CF_RESET;
952156230Smux				else if (chan->flags & CF_WINDOW)
953156230Smux					*what = CF_WINDOW;
954156230Smux				else if (chan->flags & CF_DATA)
955156230Smux					*what = CF_DATA;
956156230Smux				else if (chan->flags & CF_CLOSE)
957156230Smux					*what = CF_CLOSE;
958156230Smux				chan->flags &= ~*what;
959156230Smux				chan_unlock(chan);
960156230Smux				m->sender_lastid = id;
961156230Smux				return (id);
962156230Smux			}
963156230Smux		}
964156230Smux		chan_unlock(chan);
965156230Smux	} while (id != m->sender_lastid);
966156230Smux	return (-1);
967156230Smux}
968156230Smux
/*
 * Read the rest of a packet header depending on its type.
 *
 * 's' is the socket, 'mh' is the header object (an lvalue) and 'hsize'
 * is the full header size for the packet type.  The first
 * sizeof(mh.type) bytes of 'mh' are skipped and the remaining
 * 'hsize' - sizeof(mh.type) bytes are read with sock_readwait(), whose
 * result is the value of the macro.
 *
 * 'mh' and 'hsize' are parenthesized in the expansion so that any
 * lvalue expression (e.g. '*ptr') can be passed for 'mh'.
 */
#define	SOCK_READREST(s, mh, hsize)	\
    sock_readwait(s, (char *)&(mh) + sizeof((mh).type),	\
	(hsize) - sizeof((mh).type))
972156230Smux
973156230Smuxvoid *
974156230Smuxreceiver_loop(void *arg)
975156230Smux{
976156230Smux	struct mux_header mh;
977156230Smux	struct mux *m;
978156230Smux	struct chan *chan;
979156230Smux	struct buf *buf;
980156230Smux	uint16_t size, len;
981156230Smux	int error;
982156230Smux
983156230Smux	m = (struct mux *)arg;
984156230Smux	while ((error = sock_readwait(m->socket, &mh.type,
985156230Smux	    sizeof(mh.type))) == 0) {
986156230Smux		switch (mh.type) {
987156230Smux		case MUX_CONNECT:
988156230Smux			error = SOCK_READREST(m->socket, mh, MUX_CONNECTHDRSZ);
989156230Smux			if (error)
990156230Smux				goto bad;
991156230Smux			chan = chan_get(m, mh.mh_connect.id);
992156230Smux			if (chan->state == CS_LISTENING) {
993156230Smux				chan->state = CS_ESTABLISHED;
994156230Smux				chan->sendmss = ntohs(mh.mh_connect.mss);
995156230Smux				chan->sendwin = ntohl(mh.mh_connect.window);
996156230Smux				chan->flags |= CF_ACCEPT;
997156230Smux				pthread_cond_signal(&chan->rdready);
998156230Smux			} else
999156230Smux				chan->flags |= CF_RESET;
1000156230Smux			chan_unlock(chan);
1001156230Smux			sender_wakeup(m);
1002156230Smux			break;
1003156230Smux		case MUX_ACCEPT:
1004156230Smux			error = SOCK_READREST(m->socket, mh, MUX_ACCEPTHDRSZ);
1005156230Smux			if (error)
1006156230Smux				goto bad;
1007156230Smux			chan = chan_get(m, mh.mh_accept.id);
1008156230Smux			if (chan->state == CS_CONNECTING) {
1009156230Smux				chan->sendmss = ntohs(mh.mh_accept.mss);
1010156230Smux				chan->sendwin = ntohl(mh.mh_accept.window);
1011156230Smux				chan->state = CS_ESTABLISHED;
1012156230Smux				pthread_cond_signal(&chan->wrready);
1013156230Smux				chan_unlock(chan);
1014156230Smux			} else {
1015156230Smux				chan->flags |= CF_RESET;
1016156230Smux				chan_unlock(chan);
1017156230Smux				sender_wakeup(m);
1018156230Smux			}
1019156230Smux			break;
1020156230Smux		case MUX_RESET:
1021156230Smux			error = SOCK_READREST(m->socket, mh, MUX_RESETHDRSZ);
1022156230Smux			if (error)
1023156230Smux				goto bad;
1024156230Smux			goto badproto;
1025156230Smux		case MUX_WINDOW:
1026156230Smux			error = SOCK_READREST(m->socket, mh, MUX_WINDOWHDRSZ);
1027156230Smux			if (error)
1028156230Smux				goto bad;
1029156230Smux			chan = chan_get(m, mh.mh_window.id);
1030156230Smux			if (chan->state == CS_ESTABLISHED ||
1031156230Smux			    chan->state == CS_RDCLOSED) {
1032156230Smux				chan->sendwin = ntohl(mh.mh_window.window);
1033156230Smux				chan_unlock(chan);
1034156230Smux				sender_wakeup(m);
1035156230Smux			} else {
1036156230Smux				chan_unlock(chan);
1037156230Smux			}
1038156230Smux			break;
1039156230Smux		case MUX_DATA:
1040156230Smux			error = SOCK_READREST(m->socket, mh, MUX_DATAHDRSZ);
1041156230Smux			if (error)
1042156230Smux				goto bad;
1043156230Smux			chan = chan_get(m, mh.mh_data.id);
1044156230Smux			len = ntohs(mh.mh_data.len);
1045156230Smux			buf = chan->recvbuf;
1046156230Smux			if ((chan->state != CS_ESTABLISHED &&
1047156230Smux			     chan->state != CS_WRCLOSED) ||
1048156230Smux			    (len > buf_avail(buf) ||
1049156230Smux			     len > chan->recvmss)) {
1050156230Smux				chan_unlock(chan);
1051156230Smux				goto badproto;
1052156230Smux				return (NULL);
1053156230Smux			}
1054156230Smux			/*
1055156230Smux			 * Similarly to the sender code, it's safe to
1056156230Smux			 * unlock the channel here.
1057156230Smux			 */
1058156230Smux			chan_unlock(chan);
1059156230Smux			size = min(buf->size + 1 - buf->in, len);
1060156230Smux			error = sock_readwait(m->socket,
1061156230Smux			    buf->data + buf->in, size);
1062156230Smux			if (error)
1063156230Smux				goto bad;
1064156230Smux			if (len > size) {
1065156230Smux				/* Wrapping around. */
1066156230Smux				error = sock_readwait(m->socket,
1067156230Smux				    buf->data, len - size);
1068156230Smux				if (error)
1069156230Smux					goto bad;
1070156230Smux			}
1071156230Smux			chan_lock(chan);
1072156230Smux			buf->in += len;
1073156230Smux			if (buf->in > buf->size)
1074156230Smux				buf->in -= buf->size + 1;
1075156230Smux			pthread_cond_signal(&chan->rdready);
1076156230Smux			chan_unlock(chan);
1077156230Smux			break;
1078156230Smux		case MUX_CLOSE:
1079156230Smux			error = SOCK_READREST(m->socket, mh, MUX_CLOSEHDRSZ);
1080156230Smux			if (error)
1081156230Smux				goto bad;
1082156230Smux			chan = chan_get(m, mh.mh_close.id);
1083156230Smux			if (chan->state == CS_ESTABLISHED)
1084156230Smux				chan->state = CS_RDCLOSED;
1085156230Smux			else if (chan->state == CS_WRCLOSED)
1086156230Smux				chan->state = CS_CLOSED;
1087156230Smux			else
1088156230Smux				goto badproto;
1089156230Smux			pthread_cond_signal(&chan->rdready);
1090156230Smux			chan_unlock(chan);
1091156230Smux			break;
1092156230Smux		default:
1093156230Smux			goto badproto;
1094156230Smux		}
1095156230Smux	}
1096156230Smuxbad:
1097156230Smux	if (errno == ECONNRESET || errno == ECONNABORTED)
1098156230Smux		mux_shutdown(m, strerror(errno), STATUS_TRANSIENTFAILURE);
1099156230Smux	else
1100156230Smux		mux_shutdown(m, strerror(errno), STATUS_FAILURE);
1101156230Smux	return (NULL);
1102156230Smuxbadproto:
1103156230Smux	mux_shutdown(m, "Protocol error", STATUS_FAILURE);
1104156230Smux	return (NULL);
1105156230Smux}
1106156230Smux
1107156230Smux/*
1108156230Smux * Circular buffers API.
1109156230Smux */
1110156230Smux
1111156230Smuxstatic struct buf *
1112156230Smuxbuf_new(size_t size)
1113156230Smux{
1114156230Smux	struct buf *buf;
1115156230Smux
1116156230Smux	buf = xmalloc(sizeof(struct buf));
1117156230Smux	buf->data = xmalloc(size + 1);
1118156230Smux	buf->size = size;
1119156230Smux	buf->in = 0;
1120156230Smux	buf->out = 0;
1121156230Smux	return (buf);
1122156230Smux}
1123156230Smux
1124156230Smuxstatic void
1125156230Smuxbuf_free(struct buf *buf)
1126156230Smux{
1127156230Smux
1128156230Smux	free(buf->data);
1129156230Smux	free(buf);
1130156230Smux}
1131156230Smux
1132156230Smux/* Number of bytes stored in the buffer. */
1133156230Smuxstatic size_t
1134156230Smuxbuf_count(struct buf *buf)
1135156230Smux{
1136156230Smux	size_t count;
1137156230Smux
1138156230Smux	if (buf->in >= buf->out)
1139156230Smux		count = buf->in - buf->out;
1140156230Smux	else
1141156230Smux		count = buf->size + 1 + buf->in - buf->out;
1142156230Smux	return (count);
1143156230Smux}
1144156230Smux
1145156230Smux/* Number of bytes available in the buffer. */
1146156230Smuxstatic size_t
1147156230Smuxbuf_avail(struct buf *buf)
1148156230Smux{
1149156230Smux	size_t avail;
1150156230Smux
1151156230Smux	if (buf->out > buf->in)
1152156230Smux		avail = buf->out - buf->in - 1;
1153156230Smux	else
1154156230Smux		avail = buf->size + buf->out - buf->in;
1155156230Smux	return (avail);
1156156230Smux}
1157156230Smux
1158156230Smuxstatic void
1159156230Smuxbuf_put(struct buf *buf, const void *data, size_t size)
1160156230Smux{
1161156230Smux	const char *cp;
1162156230Smux	size_t len;
1163156230Smux
1164156230Smux	assert(size > 0);
1165156230Smux	assert(buf_avail(buf) >= size);
1166156230Smux	cp = data;
1167156230Smux	len = buf->size + 1 - buf->in;
1168156230Smux	if (len < size) {
1169156230Smux		/* Wrapping around. */
1170156230Smux		memcpy(buf->data + buf->in, cp, len);
1171156230Smux		memcpy(buf->data, cp + len, size - len);
1172156230Smux	} else {
1173156230Smux		/* Not wrapping around. */
1174156230Smux		memcpy(buf->data + buf->in, cp, size);
1175156230Smux	}
1176156230Smux	buf->in += size;
1177156230Smux	if (buf->in > buf->size)
1178156230Smux		buf->in -= buf->size + 1;
1179156230Smux}
1180156230Smux
1181156230Smuxstatic void
1182156230Smuxbuf_get(struct buf *buf, void *data, size_t size)
1183156230Smux{
1184156230Smux	char *cp;
1185156230Smux	size_t len;
1186156230Smux
1187156230Smux	assert(size > 0);
1188156230Smux	assert(buf_count(buf) >= size);
1189156230Smux	cp = data;
1190156230Smux	len = buf->size + 1 - buf->out;
1191156230Smux	if (len < size) {
1192156230Smux		/* Wrapping around. */
1193156230Smux		memcpy(cp, buf->data + buf->out, len);
1194156230Smux		memcpy(cp + len, buf->data, size - len);
1195156230Smux	} else {
1196156230Smux		/* Not wrapping around. */
1197156230Smux		memcpy(cp, buf->data + buf->out, size);
1198156230Smux	}
1199156230Smux	buf->out += size;
1200156230Smux	if (buf->out > buf->size)
1201156230Smux		buf->out -= buf->size + 1;
1202156230Smux}
1203