1303095Ssobomax/*
2303095Ssobomax * Copyright (c) 2004-2016 Maxim Sobolev <sobomax@FreeBSD.org>
3303095Ssobomax * All rights reserved.
4303095Ssobomax *
5303095Ssobomax * Redistribution and use in source and binary forms, with or without
6303095Ssobomax * modification, are permitted provided that the following conditions
7303095Ssobomax * are met:
8303095Ssobomax * 1. Redistributions of source code must retain the above copyright
9303095Ssobomax *    notice, this list of conditions and the following disclaimer.
10303095Ssobomax * 2. Redistributions in binary form must reproduce the above copyright
11303095Ssobomax *    notice, this list of conditions and the following disclaimer in the
12303095Ssobomax *    documentation and/or other materials provided with the distribution.
13303095Ssobomax *
14303095Ssobomax * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15303095Ssobomax * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16303095Ssobomax * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17303095Ssobomax * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18303095Ssobomax * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19303095Ssobomax * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20303095Ssobomax * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21303095Ssobomax * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22303095Ssobomax * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23303095Ssobomax * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24303095Ssobomax * SUCH DAMAGE.
25303095Ssobomax */
26303095Ssobomax
27303095Ssobomax#include <sys/cdefs.h>
28303095Ssobomax__FBSDID("$FreeBSD$");
29303095Ssobomax
30303095Ssobomax#include <sys/types.h>
31303095Ssobomax#include <err.h>
32303095Ssobomax#include <inttypes.h>
33303095Ssobomax#include <md5.h>
34303095Ssobomax#include <pthread.h>
35303095Ssobomax#include <stdlib.h>
36303095Ssobomax#include <strings.h>
37303095Ssobomax
38303095Ssobomax#if defined(MKUZ_DEBUG)
39303095Ssobomax# include <stdio.h>
40303095Ssobomax#endif
41303095Ssobomax
42303095Ssobomax#include "mkuz_conveyor.h"
43303095Ssobomax#include "mkuz_cfg.h"
44303095Ssobomax#include "mkuzip.h"
45303095Ssobomax#include "mkuz_format.h"
46303095Ssobomax#include "mkuz_blk.h"
47303095Ssobomax#include "mkuz_fqueue.h"
48303095Ssobomax#include "mkuz_blk_chain.h"
49303095Ssobomax
50303095Ssobomaxstatic void compute_digest(struct mkuz_blk *);
51303095Ssobomax
52303095Ssobomaxstruct cw_args {
53303095Ssobomax    struct mkuz_conveyor *cvp;
54303095Ssobomax    struct mkuz_cfg *cfp;
55303095Ssobomax};
56303095Ssobomax
57303095Ssobomaxstatic void *
58303095Ssobomaxcworker(void *p)
59303095Ssobomax{
60303095Ssobomax    struct cw_args *cwp;
61303095Ssobomax    struct mkuz_cfg *cfp;
62303095Ssobomax    struct mkuz_blk *oblk, *iblk;
63303095Ssobomax    struct mkuz_conveyor *cvp;
64303095Ssobomax    void *c_ctx;
65303095Ssobomax
66303095Ssobomax    cwp = (struct cw_args *)p;
67303095Ssobomax    cfp = cwp->cfp;
68303095Ssobomax    cvp = cwp->cvp;
69303095Ssobomax    free(cwp);
70303095Ssobomax    c_ctx = cfp->handler->f_init(cfp->blksz);
71303095Ssobomax    for (;;) {
72303095Ssobomax        iblk = mkuz_fqueue_deq(cvp->wrk_queue);
73303095Ssobomax        if (iblk == MKUZ_BLK_EOF) {
74303095Ssobomax            /* Let other threads to see the EOF block */
75303095Ssobomax            mkuz_fqueue_enq(cvp->wrk_queue, iblk);
76303095Ssobomax            break;
77303095Ssobomax        }
78303095Ssobomax        if (cfp->no_zcomp == 0 &&
79303095Ssobomax          mkuz_memvcmp(iblk->data, '\0', iblk->info.len) != 0) {
80303095Ssobomax            /* All zeroes block */
81303095Ssobomax            oblk = mkuz_blk_ctor(0);
82303095Ssobomax        } else {
83303095Ssobomax            oblk = cfp->handler->f_compress(c_ctx, iblk);
84303095Ssobomax            if (cfp->en_dedup != 0) {
85303095Ssobomax                compute_digest(oblk);
86303095Ssobomax            }
87303095Ssobomax        }
88303095Ssobomax        oblk->info.blkno = iblk->info.blkno;
89303095Ssobomax        mkuz_fqueue_enq(cvp->results, oblk);
90303095Ssobomax        free(iblk);
91303095Ssobomax    }
92303095Ssobomax    return (NULL);
93303095Ssobomax}
94303095Ssobomax
95303095Ssobomaxstatic void
96303095Ssobomaxcompute_digest(struct mkuz_blk *bp)
97303095Ssobomax{
98303095Ssobomax    MD5_CTX mcontext;
99303095Ssobomax
100303095Ssobomax    MD5Init(&mcontext);
101303095Ssobomax    MD5Update(&mcontext, bp->data, bp->info.len);
102303095Ssobomax    MD5Final(bp->info.digest, &mcontext);
103303095Ssobomax}
104303095Ssobomax
105303095Ssobomaxstruct mkuz_conveyor *
106303095Ssobomaxmkuz_conveyor_ctor(struct mkuz_cfg *cfp)
107303095Ssobomax{
108303095Ssobomax    struct mkuz_conveyor *cp;
109303095Ssobomax    struct cw_args *cwp;
110303095Ssobomax    int i, r;
111303095Ssobomax
112303095Ssobomax    cp = mkuz_safe_zmalloc(sizeof(struct mkuz_conveyor) +
113303095Ssobomax      (sizeof(pthread_t) * cfp->nworkers));
114303095Ssobomax
115303095Ssobomax    cp->wrk_queue = mkuz_fqueue_ctor(1);
116303095Ssobomax    cp->results = mkuz_fqueue_ctor(1);
117303095Ssobomax
118303095Ssobomax    for (i = 0; i < cfp->nworkers; i++) {
119303095Ssobomax        cwp = mkuz_safe_zmalloc(sizeof(struct cw_args));
120303095Ssobomax        cwp->cfp = cfp;
121303095Ssobomax        cwp->cvp = cp;
122303095Ssobomax        r = pthread_create(&cp->wthreads[i], NULL, cworker, (void *)cwp);
123303095Ssobomax        if (r != 0) {
124303095Ssobomax            errx(1, "mkuz_conveyor_ctor: pthread_create() failed");
125303095Ssobomax            /* Not reached */
126303095Ssobomax        }
127303095Ssobomax    }
128303095Ssobomax    return (cp);
129303095Ssobomax}
130