1/*- 2 * Copyright (c) 2006, Maxime Henrion <mux@FreeBSD.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24 * SUCH DAMAGE. 25 * 26 * $FreeBSD$ 27 */ 28 29#include <assert.h> 30#include <pthread.h> 31#include <stdlib.h> 32#include <string.h> 33 34#include "fixups.h" 35#include "misc.h" 36#include "queue.h" 37 38/* 39 * A synchronized queue to implement fixups. The updater thread adds 40 * fixup requests to the queue with fixups_put() when a checksum 41 * mismatch error occurred. It then calls fixups_close() when he's 42 * done requesting fixups. The detailer thread gets the fixups with 43 * fixups_get() and then send the requests to the server. 44 * 45 * The queue is synchronized with a mutex and a condition variable. 46 */ 47 48struct fixups { 49 pthread_mutex_t lock; 50 pthread_cond_t cond; 51 STAILQ_HEAD(, fixup) fixupq; 52 struct fixup *cur; 53 size_t size; 54 int closed; 55}; 56 57static void fixups_lock(struct fixups *); 58static void fixups_unlock(struct fixups *); 59 60static struct fixup *fixup_new(struct coll *, const char *); 61static void fixup_free(struct fixup *); 62 63static void 64fixups_lock(struct fixups *f) 65{ 66 int error; 67 68 error = pthread_mutex_lock(&f->lock); 69 assert(!error); 70} 71 72static void 73fixups_unlock(struct fixups *f) 74{ 75 int error; 76 77 error = pthread_mutex_unlock(&f->lock); 78 assert(!error); 79} 80 81static struct fixup * 82fixup_new(struct coll *coll, const char *name) 83{ 84 struct fixup *fixup; 85 86 fixup = xmalloc(sizeof(struct fixup)); 87 fixup->f_name = xstrdup(name); 88 fixup->f_coll = coll; 89 return (fixup); 90} 91 92static void 93fixup_free(struct fixup *fixup) 94{ 95 96 free(fixup->f_name); 97 free(fixup); 98} 99 100/* Create a new fixup queue. */ 101struct fixups * 102fixups_new(void) 103{ 104 struct fixups *f; 105 106 f = xmalloc(sizeof(struct fixups)); 107 f->size = 0; 108 f->closed = 0; 109 f->cur = NULL; 110 STAILQ_INIT(&f->fixupq); 111 pthread_mutex_init(&f->lock, NULL); 112 pthread_cond_init(&f->cond, NULL); 113 return (f); 114} 115 116/* Add a fixup request to the queue. */ 117void 118fixups_put(struct fixups *f, struct coll *coll, const char *name) 119{ 120 struct fixup *fixup; 121 int dosignal; 122 123 dosignal = 0; 124 fixup = fixup_new(coll, name); 125 fixups_lock(f); 126 assert(!f->closed); 127 STAILQ_INSERT_TAIL(&f->fixupq, fixup, f_link); 128 if (f->size++ == 0) 129 dosignal = 1; 130 fixups_unlock(f); 131 if (dosignal) 132 pthread_cond_signal(&f->cond); 133} 134 135/* Get a fixup request from the queue. */ 136struct fixup * 137fixups_get(struct fixups *f) 138{ 139 struct fixup *fixup, *tofree; 140 141 fixups_lock(f); 142 while (f->size == 0 && !f->closed) 143 pthread_cond_wait(&f->cond, &f->lock); 144 if (f->closed && f->size == 0) { 145 fixups_unlock(f); 146 return (NULL); 147 } 148 assert(f->size > 0); 149 fixup = STAILQ_FIRST(&f->fixupq); 150 tofree = f->cur; 151 f->cur = fixup; 152 STAILQ_REMOVE_HEAD(&f->fixupq, f_link); 153 f->size--; 154 fixups_unlock(f); 155 if (tofree != NULL) 156 fixup_free(tofree); 157 return (fixup); 158} 159 160/* Close the writing end of the queue. */ 161void 162fixups_close(struct fixups *f) 163{ 164 int dosignal; 165 166 dosignal = 0; 167 fixups_lock(f); 168 if (f->size == 0 && !f->closed) 169 dosignal = 1; 170 f->closed = 1; 171 fixups_unlock(f); 172 if (dosignal) 173 pthread_cond_signal(&f->cond); 174} 175 176/* Free a fixups queue. */ 177void 178fixups_free(struct fixups *f) 179{ 180 struct fixup *fixup, *fixup2; 181 182 assert(f->closed); 183 /* 184 * Free any fixup that has been left on the queue. 185 * This can happen if we have been aborted prematurely. 186 */ 187 fixup = STAILQ_FIRST(&f->fixupq); 188 while (fixup != NULL) { 189 fixup2 = STAILQ_NEXT(fixup, f_link); 190 fixup_free(fixup); 191 fixup = fixup2; 192 } 193 if (f->cur != NULL) 194 fixup_free(f->cur); 195 pthread_cond_destroy(&f->cond); 196 pthread_mutex_destroy(&f->lock); 197 free(f); 198} 199