1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <apr_pools.h>
17#include <apr_strings.h>
18
19#include "serf.h"
20#include "serf_bucket_util.h"
21
22
23typedef struct {
24    enum {
25        STATE_FETCH,
26        STATE_CHUNK,
27        STATE_EOF
28    } state;
29
30    apr_status_t last_status;
31
32    serf_bucket_t *chunk;
33    serf_bucket_t *stream;
34
35    char chunk_hdr[20];
36} chunk_context_t;
37
38
39serf_bucket_t *serf_bucket_chunk_create(
40    serf_bucket_t *stream, serf_bucket_alloc_t *allocator)
41{
42    chunk_context_t *ctx;
43
44    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
45    ctx->state = STATE_FETCH;
46    ctx->chunk = serf_bucket_aggregate_create(allocator);
47    ctx->stream = stream;
48
49    return serf_bucket_create(&serf_bucket_type_chunk, allocator, ctx);
50}
51
52#define CRLF "\r\n"
53
54static apr_status_t create_chunk(serf_bucket_t *bucket)
55{
56    chunk_context_t *ctx = bucket->data;
57    serf_bucket_t *simple_bkt;
58    apr_size_t chunk_len;
59    apr_size_t stream_len;
60    struct iovec vecs[66]; /* 64 + chunk trailer + EOF trailer = 66 */
61    int vecs_read;
62    int i;
63
64    if (ctx->state != STATE_FETCH) {
65        return APR_SUCCESS;
66    }
67
68    ctx->last_status =
69        serf_bucket_read_iovec(ctx->stream, SERF_READ_ALL_AVAIL,
70                               64, vecs, &vecs_read);
71
72    if (SERF_BUCKET_READ_ERROR(ctx->last_status)) {
73        /* Uh-oh. */
74        return ctx->last_status;
75    }
76
77    /* Count the length of the data we read. */
78    stream_len = 0;
79    for (i = 0; i < vecs_read; i++) {
80        stream_len += vecs[i].iov_len;
81    }
82
83    /* assert: stream_len in hex < sizeof(ctx->chunk_hdr) */
84
85    /* Inserting a 0 byte chunk indicates a terminator, which already happens
86     * during the EOF handler below.  Adding another one here will cause the
87     * EOF chunk to be interpreted by the server as a new request.  So,
88     * we'll only do this if we have something to write.
89     */
90    if (stream_len) {
91        /* Build the chunk header. */
92        chunk_len = apr_snprintf(ctx->chunk_hdr, sizeof(ctx->chunk_hdr),
93                                 "%" APR_UINT64_T_HEX_FMT CRLF,
94                                 (apr_uint64_t)stream_len);
95
96        /* Create a copy of the chunk header so we can have multiple chunks
97         * in the pipeline at the same time.
98         */
99        simple_bkt = serf_bucket_simple_copy_create(ctx->chunk_hdr, chunk_len,
100                                                    bucket->allocator);
101        serf_bucket_aggregate_append(ctx->chunk, simple_bkt);
102
103        /* Insert the chunk footer. */
104        vecs[vecs_read].iov_base = CRLF;
105        vecs[vecs_read++].iov_len = sizeof(CRLF) - 1;
106    }
107
108    /* We've reached the end of the line for the stream. */
109    if (APR_STATUS_IS_EOF(ctx->last_status)) {
110        /* Insert the chunk footer. */
111        vecs[vecs_read].iov_base = "0" CRLF CRLF;
112        vecs[vecs_read++].iov_len = sizeof("0" CRLF CRLF) - 1;
113
114        ctx->state = STATE_EOF;
115    }
116    else {
117        /* Okay, we can return data.  */
118        ctx->state = STATE_CHUNK;
119    }
120
121    serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_read);
122
123    return APR_SUCCESS;
124}
125
126static apr_status_t serf_chunk_read(serf_bucket_t *bucket,
127                                    apr_size_t requested,
128                                    const char **data, apr_size_t *len)
129{
130    chunk_context_t *ctx = bucket->data;
131    apr_status_t status;
132
133    /* Before proceeding, we need to fetch some data from the stream. */
134    if (ctx->state == STATE_FETCH) {
135        status = create_chunk(bucket);
136        if (status) {
137            return status;
138        }
139    }
140
141    status = serf_bucket_read(ctx->chunk, requested, data, len);
142
143    /* Mask EOF from aggregate bucket. */
144    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
145        status = ctx->last_status;
146        ctx->state = STATE_FETCH;
147    }
148
149    return status;
150}
151
152static apr_status_t serf_chunk_readline(serf_bucket_t *bucket,
153                                         int acceptable, int *found,
154                                         const char **data, apr_size_t *len)
155{
156    chunk_context_t *ctx = bucket->data;
157    apr_status_t status;
158
159    status = serf_bucket_readline(ctx->chunk, acceptable, found, data, len);
160
161    /* Mask EOF from aggregate bucket. */
162    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
163        status = APR_EAGAIN;
164        ctx->state = STATE_FETCH;
165    }
166
167    return status;
168}
169
170static apr_status_t serf_chunk_read_iovec(serf_bucket_t *bucket,
171                                          apr_size_t requested,
172                                          int vecs_size,
173                                          struct iovec *vecs,
174                                          int *vecs_used)
175{
176    chunk_context_t *ctx = bucket->data;
177    apr_status_t status;
178
179    /* Before proceeding, we need to fetch some data from the stream. */
180    if (ctx->state == STATE_FETCH) {
181        status = create_chunk(bucket);
182        if (status) {
183            return status;
184        }
185    }
186
187    status = serf_bucket_read_iovec(ctx->chunk, requested, vecs_size, vecs,
188                                    vecs_used);
189
190    /* Mask EOF from aggregate bucket. */
191    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
192        status = ctx->last_status;
193        ctx->state = STATE_FETCH;
194    }
195
196    return status;
197}
198
199static apr_status_t serf_chunk_peek(serf_bucket_t *bucket,
200                                     const char **data,
201                                     apr_size_t *len)
202{
203    chunk_context_t *ctx = bucket->data;
204    apr_status_t status;
205
206    status = serf_bucket_peek(ctx->chunk, data, len);
207
208    /* Mask EOF from aggregate bucket. */
209    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
210        status = APR_EAGAIN;
211    }
212
213    return status;
214}
215
216static void serf_chunk_destroy(serf_bucket_t *bucket)
217{
218    chunk_context_t *ctx = bucket->data;
219
220    serf_bucket_destroy(ctx->stream);
221    serf_bucket_destroy(ctx->chunk);
222
223    serf_default_destroy_and_data(bucket);
224}
225
226const serf_bucket_type_t serf_bucket_type_chunk = {
227    "CHUNK",
228    serf_chunk_read,
229    serf_chunk_readline,
230    serf_chunk_read_iovec,
231    serf_default_read_for_sendfile,
232    serf_default_read_bucket,
233    serf_chunk_peek,
234    serf_chunk_destroy,
235};
236