1156230Smux/*- 2156230Smux * Copyright (c) 2006, Maxime Henrion <mux@FreeBSD.org> 3156230Smux * All rights reserved. 4156230Smux * 5156230Smux * Redistribution and use in source and binary forms, with or without 6156230Smux * modification, are permitted provided that the following conditions 7156230Smux * are met: 8156230Smux * 1. Redistributions of source code must retain the above copyright 9156230Smux * notice, this list of conditions and the following disclaimer. 10156230Smux * 2. Redistributions in binary form must reproduce the above copyright 11156230Smux * notice, this list of conditions and the following disclaimer in the 12156230Smux * documentation and/or other materials provided with the distribution. 13156230Smux * 14156230Smux * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 15156230Smux * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 16156230Smux * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 17156230Smux * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 18156230Smux * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 19156230Smux * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 20156230Smux * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 21156230Smux * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 22156230Smux * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 23156230Smux * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 24156230Smux * SUCH DAMAGE. 25156230Smux * 26156230Smux * $FreeBSD$ 27156230Smux */ 28156230Smux 29156230Smux#include <assert.h> 30156230Smux#include <pthread.h> 31156230Smux#include <stdlib.h> 32156230Smux#include <string.h> 33156230Smux 34156230Smux#include "fixups.h" 35156230Smux#include "misc.h" 36156230Smux#include "queue.h" 37156230Smux 38156230Smux/* 39156230Smux * A synchronized queue to implement fixups. The updater thread adds 40156230Smux * fixup requests to the queue with fixups_put() when a checksum 41228992Suqs * mismatch error occurred. It then calls fixups_close() when he's 42156230Smux * done requesting fixups. The detailer thread gets the fixups with 43156230Smux * fixups_get() and then send the requests to the server. 44156230Smux * 45156230Smux * The queue is synchronized with a mutex and a condition variable. 46156230Smux */ 47156230Smux 48156230Smuxstruct fixups { 49156230Smux pthread_mutex_t lock; 50156230Smux pthread_cond_t cond; 51156230Smux STAILQ_HEAD(, fixup) fixupq; 52156230Smux struct fixup *cur; 53156230Smux size_t size; 54156230Smux int closed; 55156230Smux}; 56156230Smux 57156230Smuxstatic void fixups_lock(struct fixups *); 58156230Smuxstatic void fixups_unlock(struct fixups *); 59156230Smux 60156230Smuxstatic struct fixup *fixup_new(struct coll *, const char *); 61156230Smuxstatic void fixup_free(struct fixup *); 62156230Smux 63156230Smuxstatic void 64156230Smuxfixups_lock(struct fixups *f) 65156230Smux{ 66156230Smux int error; 67156230Smux 68156230Smux error = pthread_mutex_lock(&f->lock); 69156230Smux assert(!error); 70156230Smux} 71156230Smux 72156230Smuxstatic void 73156230Smuxfixups_unlock(struct fixups *f) 74156230Smux{ 75156230Smux int error; 76156230Smux 77156230Smux error = pthread_mutex_unlock(&f->lock); 78156230Smux assert(!error); 79156230Smux} 80156230Smux 81156230Smuxstatic struct fixup * 82156230Smuxfixup_new(struct coll *coll, const char *name) 83156230Smux{ 84156230Smux struct fixup *fixup; 85156230Smux 86156230Smux fixup = xmalloc(sizeof(struct fixup)); 87156230Smux fixup->f_name = xstrdup(name); 88156230Smux fixup->f_coll = coll; 89156230Smux return (fixup); 90156230Smux} 91156230Smux 92156230Smuxstatic void 93156230Smuxfixup_free(struct fixup *fixup) 94156230Smux{ 95156230Smux 96156230Smux free(fixup->f_name); 97156230Smux free(fixup); 98156230Smux} 99156230Smux 100156230Smux/* Create a new fixup queue. */ 101156230Smuxstruct fixups * 102156230Smuxfixups_new(void) 103156230Smux{ 104156230Smux struct fixups *f; 105156230Smux 106156230Smux f = xmalloc(sizeof(struct fixups)); 107156230Smux f->size = 0; 108156230Smux f->closed = 0; 109156230Smux f->cur = NULL; 110156230Smux STAILQ_INIT(&f->fixupq); 111156230Smux pthread_mutex_init(&f->lock, NULL); 112156230Smux pthread_cond_init(&f->cond, NULL); 113156230Smux return (f); 114156230Smux} 115156230Smux 116156230Smux/* Add a fixup request to the queue. */ 117156230Smuxvoid 118156230Smuxfixups_put(struct fixups *f, struct coll *coll, const char *name) 119156230Smux{ 120156230Smux struct fixup *fixup; 121156230Smux int dosignal; 122156230Smux 123156230Smux dosignal = 0; 124156230Smux fixup = fixup_new(coll, name); 125156230Smux fixups_lock(f); 126156230Smux assert(!f->closed); 127156230Smux STAILQ_INSERT_TAIL(&f->fixupq, fixup, f_link); 128156230Smux if (f->size++ == 0) 129156230Smux dosignal = 1; 130156230Smux fixups_unlock(f); 131156230Smux if (dosignal) 132156230Smux pthread_cond_signal(&f->cond); 133156230Smux} 134156230Smux 135156230Smux/* Get a fixup request from the queue. */ 136156230Smuxstruct fixup * 137156230Smuxfixups_get(struct fixups *f) 138156230Smux{ 139156230Smux struct fixup *fixup, *tofree; 140156230Smux 141156230Smux fixups_lock(f); 142156230Smux while (f->size == 0 && !f->closed) 143156230Smux pthread_cond_wait(&f->cond, &f->lock); 144225980Sadrian if (f->closed && f->size == 0) { 145156230Smux fixups_unlock(f); 146156230Smux return (NULL); 147156230Smux } 148156230Smux assert(f->size > 0); 149156230Smux fixup = STAILQ_FIRST(&f->fixupq); 150156230Smux tofree = f->cur; 151156230Smux f->cur = fixup; 152156230Smux STAILQ_REMOVE_HEAD(&f->fixupq, f_link); 153156230Smux f->size--; 154156230Smux fixups_unlock(f); 155156230Smux if (tofree != NULL) 156156230Smux fixup_free(tofree); 157156230Smux return (fixup); 158156230Smux} 159156230Smux 160156230Smux/* Close the writing end of the queue. */ 161156230Smuxvoid 162156230Smuxfixups_close(struct fixups *f) 163156230Smux{ 164156230Smux int dosignal; 165156230Smux 166156230Smux dosignal = 0; 167156230Smux fixups_lock(f); 168156230Smux if (f->size == 0 && !f->closed) 169156230Smux dosignal = 1; 170156230Smux f->closed = 1; 171156230Smux fixups_unlock(f); 172156230Smux if (dosignal) 173156230Smux pthread_cond_signal(&f->cond); 174156230Smux} 175156230Smux 176156230Smux/* Free a fixups queue. */ 177156230Smuxvoid 178156230Smuxfixups_free(struct fixups *f) 179156230Smux{ 180156230Smux struct fixup *fixup, *fixup2; 181156230Smux 182156230Smux assert(f->closed); 183156230Smux /* 184156230Smux * Free any fixup that has been left on the queue. 185156230Smux * This can happen if we have been aborted prematurely. 186156230Smux */ 187156230Smux fixup = STAILQ_FIRST(&f->fixupq); 188156230Smux while (fixup != NULL) { 189156230Smux fixup2 = STAILQ_NEXT(fixup, f_link); 190156230Smux fixup_free(fixup); 191156230Smux fixup = fixup2; 192156230Smux } 193156230Smux if (f->cur != NULL) 194156230Smux fixup_free(f->cur); 195156230Smux pthread_cond_destroy(&f->cond); 196156230Smux pthread_mutex_destroy(&f->lock); 197156230Smux free(f); 198156230Smux} 199