1156230Smux/*-
2156230Smux * Copyright (c) 2006, Maxime Henrion <mux@FreeBSD.org>
3156230Smux * All rights reserved.
4156230Smux *
5156230Smux * Redistribution and use in source and binary forms, with or without
6156230Smux * modification, are permitted provided that the following conditions
7156230Smux * are met:
8156230Smux * 1. Redistributions of source code must retain the above copyright
9156230Smux *    notice, this list of conditions and the following disclaimer.
10156230Smux * 2. Redistributions in binary form must reproduce the above copyright
11156230Smux *    notice, this list of conditions and the following disclaimer in the
12156230Smux *    documentation and/or other materials provided with the distribution.
13156230Smux *
14156230Smux * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15156230Smux * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16156230Smux * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17156230Smux * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18156230Smux * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19156230Smux * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20156230Smux * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21156230Smux * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22156230Smux * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23156230Smux * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24156230Smux * SUCH DAMAGE.
25156230Smux *
26156230Smux * $FreeBSD$
27156230Smux */
28156230Smux
29156230Smux#include <assert.h>
30156230Smux#include <pthread.h>
31156230Smux#include <stdlib.h>
32156230Smux#include <string.h>
33156230Smux
34156230Smux#include "fixups.h"
35156230Smux#include "misc.h"
36156230Smux#include "queue.h"
37156230Smux
38156230Smux/*
39156230Smux * A synchronized queue to implement fixups.  The updater thread adds
40156230Smux * fixup requests to the queue with fixups_put() when a checksum
 * mismatch error occurred.  It then calls fixups_close() when it's
 * done requesting fixups.  The detailer thread gets the fixups with
 * fixups_get() and then sends the requests to the server.
44156230Smux *
45156230Smux * The queue is synchronized with a mutex and a condition variable.
46156230Smux */
47156230Smux
48156230Smuxstruct fixups {
49156230Smux	pthread_mutex_t lock;
50156230Smux	pthread_cond_t cond;
51156230Smux	STAILQ_HEAD(, fixup) fixupq;
52156230Smux	struct fixup *cur;
53156230Smux	size_t size;
54156230Smux	int closed;
55156230Smux};
56156230Smux
/* Thin wrappers around the queue mutex. */
static void		 fixups_lock(struct fixups *f);
static void		 fixups_unlock(struct fixups *f);

/* Allocation and release of a single queue entry. */
static struct fixup	*fixup_new(struct coll *coll, const char *name);
static void		 fixup_free(struct fixup *fixup);
62156230Smux
63156230Smuxstatic void
64156230Smuxfixups_lock(struct fixups *f)
65156230Smux{
66156230Smux	int error;
67156230Smux
68156230Smux	error = pthread_mutex_lock(&f->lock);
69156230Smux	assert(!error);
70156230Smux}
71156230Smux
72156230Smuxstatic void
73156230Smuxfixups_unlock(struct fixups *f)
74156230Smux{
75156230Smux	int error;
76156230Smux
77156230Smux	error = pthread_mutex_unlock(&f->lock);
78156230Smux	assert(!error);
79156230Smux}
80156230Smux
81156230Smuxstatic struct fixup *
82156230Smuxfixup_new(struct coll *coll, const char *name)
83156230Smux{
84156230Smux	struct fixup *fixup;
85156230Smux
86156230Smux	fixup = xmalloc(sizeof(struct fixup));
87156230Smux	fixup->f_name = xstrdup(name);
88156230Smux	fixup->f_coll = coll;
89156230Smux	return (fixup);
90156230Smux}
91156230Smux
92156230Smuxstatic void
93156230Smuxfixup_free(struct fixup *fixup)
94156230Smux{
95156230Smux
96156230Smux	free(fixup->f_name);
97156230Smux	free(fixup);
98156230Smux}
99156230Smux
100156230Smux/* Create a new fixup queue. */
101156230Smuxstruct fixups *
102156230Smuxfixups_new(void)
103156230Smux{
104156230Smux	struct fixups *f;
105156230Smux
106156230Smux	f = xmalloc(sizeof(struct fixups));
107156230Smux	f->size = 0;
108156230Smux	f->closed = 0;
109156230Smux	f->cur = NULL;
110156230Smux	STAILQ_INIT(&f->fixupq);
111156230Smux	pthread_mutex_init(&f->lock, NULL);
112156230Smux	pthread_cond_init(&f->cond, NULL);
113156230Smux	return (f);
114156230Smux}
115156230Smux
116156230Smux/* Add a fixup request to the queue. */
117156230Smuxvoid
118156230Smuxfixups_put(struct fixups *f, struct coll *coll, const char *name)
119156230Smux{
120156230Smux	struct fixup *fixup;
121156230Smux	int dosignal;
122156230Smux
123156230Smux	dosignal = 0;
124156230Smux	fixup = fixup_new(coll, name);
125156230Smux	fixups_lock(f);
126156230Smux	assert(!f->closed);
127156230Smux	STAILQ_INSERT_TAIL(&f->fixupq, fixup, f_link);
128156230Smux	if (f->size++ == 0)
129156230Smux		dosignal = 1;
130156230Smux	fixups_unlock(f);
131156230Smux	if (dosignal)
132156230Smux		pthread_cond_signal(&f->cond);
133156230Smux}
134156230Smux
135156230Smux/* Get a fixup request from the queue. */
136156230Smuxstruct fixup *
137156230Smuxfixups_get(struct fixups *f)
138156230Smux{
139156230Smux	struct fixup *fixup, *tofree;
140156230Smux
141156230Smux	fixups_lock(f);
142156230Smux	while (f->size == 0 && !f->closed)
143156230Smux		pthread_cond_wait(&f->cond, &f->lock);
144225980Sadrian	if (f->closed && f->size == 0) {
145156230Smux		fixups_unlock(f);
146156230Smux		return (NULL);
147156230Smux	}
148156230Smux	assert(f->size > 0);
149156230Smux	fixup = STAILQ_FIRST(&f->fixupq);
150156230Smux	tofree = f->cur;
151156230Smux	f->cur = fixup;
152156230Smux	STAILQ_REMOVE_HEAD(&f->fixupq, f_link);
153156230Smux	f->size--;
154156230Smux	fixups_unlock(f);
155156230Smux	if (tofree != NULL)
156156230Smux		fixup_free(tofree);
157156230Smux	return (fixup);
158156230Smux}
159156230Smux
160156230Smux/* Close the writing end of the queue. */
161156230Smuxvoid
162156230Smuxfixups_close(struct fixups *f)
163156230Smux{
164156230Smux	int dosignal;
165156230Smux
166156230Smux	dosignal = 0;
167156230Smux	fixups_lock(f);
168156230Smux	if (f->size == 0 && !f->closed)
169156230Smux		dosignal = 1;
170156230Smux	f->closed = 1;
171156230Smux	fixups_unlock(f);
172156230Smux	if (dosignal)
173156230Smux		pthread_cond_signal(&f->cond);
174156230Smux}
175156230Smux
176156230Smux/* Free a fixups queue. */
177156230Smuxvoid
178156230Smuxfixups_free(struct fixups *f)
179156230Smux{
180156230Smux	struct fixup *fixup, *fixup2;
181156230Smux
182156230Smux	assert(f->closed);
183156230Smux	/*
184156230Smux	 * Free any fixup that has been left on the queue.
185156230Smux	 * This can happen if we have been aborted prematurely.
186156230Smux	 */
187156230Smux	fixup = STAILQ_FIRST(&f->fixupq);
188156230Smux	while (fixup != NULL) {
189156230Smux		fixup2 = STAILQ_NEXT(fixup, f_link);
190156230Smux		fixup_free(fixup);
191156230Smux		fixup = fixup2;
192156230Smux	}
193156230Smux	if (f->cur != NULL)
194156230Smux		fixup_free(f->cur);
195156230Smux	pthread_cond_destroy(&f->cond);
196156230Smux	pthread_mutex_destroy(&f->lock);
197156230Smux	free(f);
198156230Smux}
199