aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma/common/stream_encoder_mt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/liblzma/common/stream_encoder_mt.c')
-rw-r--r--src/liblzma/common/stream_encoder_mt.c52
1 files changed, 33 insertions, 19 deletions
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 01e40339..6b897ab9 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -133,6 +133,9 @@ struct lzma_stream_coder_s {
/// Output buffer queue for compressed data
lzma_outq outq;
+ /// How much memory to allocate for each lzma_outbuf.buf
+ size_t outbuf_alloc_size;
+
/// Maximum wait time if cannot use all the input and cannot
/// fill the output buffer. This is in milliseconds.
@@ -196,7 +199,7 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
-worker_encode(worker_thread *thr, worker_state state)
+worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
assert(thr->progress_in == 0);
assert(thr->progress_out == 0);
@@ -205,7 +208,7 @@ worker_encode(worker_thread *thr, worker_state state)
thr->block_options = (lzma_block){
.version = 0,
.check = thr->coder->stream_flags.check,
- .compressed_size = thr->coder->outq.buf_size_max,
+ .compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size,
// TODO: To allow changing the filter chain, the filters
@@ -234,12 +237,12 @@ worker_encode(worker_thread *thr, worker_state state)
size_t in_pos = 0;
size_t in_size = 0;
- thr->outbuf->size = thr->block_options.header_size;
- const size_t out_size = thr->coder->outq.buf_size_max;
+ *out_pos = thr->block_options.header_size;
+ const size_t out_size = thr->outbuf->allocated;
do {
mythread_sync(thr->mutex) {
- // Store in_pos and out_pos into *thr so that
+ // Store in_pos and *out_pos into *thr so that
// an application may read them via
// lzma_get_progress() to get progress information.
//
@@ -247,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state)
// finishes. Instead, the final values are taken
// later from thr->outbuf.
thr->progress_in = in_pos;
- thr->progress_out = thr->outbuf->size;
+ thr->progress_out = *out_pos;
while (in_size == thr->in_size
&& thr->state == THR_RUN)
@@ -277,8 +280,8 @@ worker_encode(worker_thread *thr, worker_state state)
ret = thr->block_encoder.code(
thr->block_encoder.coder, thr->allocator,
thr->in, &in_pos, in_limit, thr->outbuf->buf,
- &thr->outbuf->size, out_size, action);
- } while (ret == LZMA_OK && thr->outbuf->size < out_size);
+ out_pos, out_size, action);
+ } while (ret == LZMA_OK && *out_pos < out_size);
switch (ret) {
case LZMA_STREAM_END:
@@ -313,10 +316,10 @@ worker_encode(worker_thread *thr, worker_state state)
return state;
// Do the encoding. This takes care of the Block Header too.
- thr->outbuf->size = 0;
+ *out_pos = 0;
ret = lzma_block_uncomp_encode(&thr->block_options,
thr->in, in_size, thr->outbuf->buf,
- &thr->outbuf->size, out_size);
+ out_pos, out_size);
// It shouldn't fail.
if (ret != LZMA_OK) {
@@ -367,11 +370,13 @@ worker_start(void *thr_ptr)
}
}
+ size_t out_pos = 0;
+
assert(state != THR_IDLE);
assert(state != THR_STOP);
if (state <= THR_FINISH)
- state = worker_encode(thr, state);
+ state = worker_encode(thr, &out_pos, state);
if (state == THR_EXIT)
break;
@@ -387,14 +392,17 @@ worker_start(void *thr_ptr)
}
mythread_sync(thr->coder->mutex) {
- // Mark the output buffer as finished if
- // no errors occurred.
- thr->outbuf->finished = state == THR_FINISH;
+ // If no errors occurred, make the encoded data
+ // available to be copied out.
+ if (state == THR_FINISH) {
+ thr->outbuf->pos = out_pos;
+ thr->outbuf->finished = true;
+ }
// Update the main progress info.
thr->coder->progress_in
+= thr->outbuf->uncompressed_size;
- thr->coder->progress_out += thr->outbuf->size;
+ thr->coder->progress_out += out_pos;
thr->progress_in = 0;
thr->progress_out = 0;
@@ -519,6 +527,11 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
if (!lzma_outq_has_buf(&coder->outq))
return LZMA_OK;
+ // That's also true if we cannot allocate memory for the output
+ // buffer in the output queue.
+ return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
+ coder->outbuf_alloc_size));
+
// If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) {
@@ -541,7 +554,7 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
mythread_sync(coder->thr->mutex) {
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
- coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
+ coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
mythread_cond_signal(&coder->thr->cond);
}
@@ -704,7 +717,7 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
// Try to read compressed data to out[].
- ret = lzma_outq_read(&coder->outq,
+ ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
&unpadded_size,
&uncompressed_size);
@@ -951,7 +964,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
&block_size, &outbuf_size_max));
#if SIZE_MAX < UINT64_MAX
- if (block_size > SIZE_MAX)
+ if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
return LZMA_MEM_ERROR;
#endif
@@ -1012,6 +1025,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Basic initializations
coder->sequence = SEQ_STREAM_HEADER;
coder->block_size = (size_t)(block_size);
+ coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
coder->thread_error = LZMA_OK;
coder->thr = NULL;
@@ -1041,7 +1055,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Output queue
return_if_error(lzma_outq_init(&coder->outq, allocator,
- outbuf_size_max, options->threads));
+ options->threads));
// Timeout
coder->timeout = options->timeout;