diff options
Diffstat (limited to 'src/liblzma/common/stream_encoder_mt.c')
-rw-r--r-- | src/liblzma/common/stream_encoder_mt.c | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c index 01e40339..6b897ab9 100644 --- a/src/liblzma/common/stream_encoder_mt.c +++ b/src/liblzma/common/stream_encoder_mt.c @@ -133,6 +133,9 @@ struct lzma_stream_coder_s { /// Output buffer queue for compressed data lzma_outq outq; + /// How much memory to allocate for each lzma_outbuf.buf + size_t outbuf_alloc_size; + /// Maximum wait time if cannot use all the input and cannot /// fill the output buffer. This is in milliseconds. @@ -196,7 +199,7 @@ worker_error(worker_thread *thr, lzma_ret ret) static worker_state -worker_encode(worker_thread *thr, worker_state state) +worker_encode(worker_thread *thr, size_t *out_pos, worker_state state) { assert(thr->progress_in == 0); assert(thr->progress_out == 0); @@ -205,7 +208,7 @@ worker_encode(worker_thread *thr, worker_state state) thr->block_options = (lzma_block){ .version = 0, .check = thr->coder->stream_flags.check, - .compressed_size = thr->coder->outq.buf_size_max, + .compressed_size = thr->outbuf->allocated, .uncompressed_size = thr->coder->block_size, // TODO: To allow changing the filter chain, the filters @@ -234,12 +237,12 @@ worker_encode(worker_thread *thr, worker_state state) size_t in_pos = 0; size_t in_size = 0; - thr->outbuf->size = thr->block_options.header_size; - const size_t out_size = thr->coder->outq.buf_size_max; + *out_pos = thr->block_options.header_size; + const size_t out_size = thr->outbuf->allocated; do { mythread_sync(thr->mutex) { - // Store in_pos and out_pos into *thr so that + // Store in_pos and *out_pos into *thr so that // an application may read them via // lzma_get_progress() to get progress information. // @@ -247,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state) // finishes. Instead, the final values are taken // later from thr->outbuf. thr->progress_in = in_pos; - thr->progress_out = thr->outbuf->size; + thr->progress_out = *out_pos; while (in_size == thr->in_size && thr->state == THR_RUN) @@ -277,8 +280,8 @@ worker_encode(worker_thread *thr, worker_state state) ret = thr->block_encoder.code( thr->block_encoder.coder, thr->allocator, thr->in, &in_pos, in_limit, thr->outbuf->buf, - &thr->outbuf->size, out_size, action); - } while (ret == LZMA_OK && thr->outbuf->size < out_size); + out_pos, out_size, action); + } while (ret == LZMA_OK && *out_pos < out_size); switch (ret) { case LZMA_STREAM_END: @@ -313,10 +316,10 @@ worker_encode(worker_thread *thr, worker_state state) return state; // Do the encoding. This takes care of the Block Header too. - thr->outbuf->size = 0; + *out_pos = 0; ret = lzma_block_uncomp_encode(&thr->block_options, thr->in, in_size, thr->outbuf->buf, - &thr->outbuf->size, out_size); + out_pos, out_size); // It shouldn't fail. if (ret != LZMA_OK) { @@ -367,11 +370,13 @@ worker_start(void *thr_ptr) } } + size_t out_pos = 0; + assert(state != THR_IDLE); assert(state != THR_STOP); if (state <= THR_FINISH) - state = worker_encode(thr, state); + state = worker_encode(thr, &out_pos, state); if (state == THR_EXIT) break; @@ -387,14 +392,17 @@ worker_start(void *thr_ptr) } mythread_sync(thr->coder->mutex) { - // Mark the output buffer as finished if - // no errors occurred. - thr->outbuf->finished = state == THR_FINISH; + // If no errors occurred, make the encoded data + // available to be copied out. + if (state == THR_FINISH) { + thr->outbuf->pos = out_pos; + thr->outbuf->finished = true; + } // Update the main progress info. thr->coder->progress_in += thr->outbuf->uncompressed_size; - thr->coder->progress_out += thr->outbuf->size; + thr->coder->progress_out += out_pos; thr->progress_in = 0; thr->progress_out = 0; @@ -519,6 +527,11 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) if (!lzma_outq_has_buf(&coder->outq)) return LZMA_OK; + // That's also true if we cannot allocate memory for the output + // buffer in the output queue. + return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator, + coder->outbuf_alloc_size)); + // If there is a free structure on the stack, use it. mythread_sync(coder->mutex) { if (coder->threads_free != NULL) { @@ -541,7 +554,7 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) mythread_sync(coder->thr->mutex) { coder->thr->state = THR_RUN; coder->thr->in_size = 0; - coder->thr->outbuf = lzma_outq_get_buf(&coder->outq); + coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL); mythread_cond_signal(&coder->thr->cond); } @@ -704,7 +717,7 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, } // Try to read compressed data to out[]. - ret = lzma_outq_read(&coder->outq, + ret = lzma_outq_read(&coder->outq, allocator, out, out_pos, out_size, &unpadded_size, &uncompressed_size); @@ -951,7 +964,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, &block_size, &outbuf_size_max)); #if SIZE_MAX < UINT64_MAX - if (block_size > SIZE_MAX) + if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX) return LZMA_MEM_ERROR; #endif @@ -1012,6 +1025,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // Basic initializations coder->sequence = SEQ_STREAM_HEADER; coder->block_size = (size_t)(block_size); + coder->outbuf_alloc_size = (size_t)(outbuf_size_max); coder->thread_error = LZMA_OK; coder->thr = NULL; @@ -1041,7 +1055,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // Output queue return_if_error(lzma_outq_init(&coder->outq, allocator, - outbuf_size_max, options->threads)); + options->threads)); // Timeout coder->timeout = options->timeout; |