aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma/common/stream_encoder_mt.c
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2021-01-09 21:14:36 +0200
committerLasse Collin <lasse.collin@tukaani.org>2021-01-09 22:18:23 +0200
commitf7fa309e1f7178d04c7bedc03b73077639371e97 (patch)
tree127ac1ffc7ecb8265e98fd80c99c096c2beb5c5e /src/liblzma/common/stream_encoder_mt.c
parentUpdate THANKS. (diff)
downloadxz-f7fa309e1f7178d04c7bedc03b73077639371e97.tar.xz
liblzma: Make lzma_outq usable for threaded decompression too.
Before this commit all output queue buffers were allocated as a single big allocation. Now each buffer is allocated separately when needed. Used buffers are cached to avoid reallocation overhead but the cache will keep only one buffer size at a time. This should make things work OK in the decompression where most of the time the buffer sizes will be the same but with some less common files the buffer sizes may vary. While this should work fine, it's still a bit preliminary and may even get reverted if it turns out to be useless for decompression.
Diffstat (limited to '')
-rw-r--r--src/liblzma/common/stream_encoder_mt.c52
1 files changed, 33 insertions, 19 deletions
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 01e40339..6b897ab9 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -133,6 +133,9 @@ struct lzma_stream_coder_s {
/// Output buffer queue for compressed data
lzma_outq outq;
+ /// How much memory to allocate for each lzma_outbuf.buf
+ size_t outbuf_alloc_size;
+
/// Maximum wait time if cannot use all the input and cannot
/// fill the output buffer. This is in milliseconds.
@@ -196,7 +199,7 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
-worker_encode(worker_thread *thr, worker_state state)
+worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
assert(thr->progress_in == 0);
assert(thr->progress_out == 0);
@@ -205,7 +208,7 @@ worker_encode(worker_thread *thr, worker_state state)
thr->block_options = (lzma_block){
.version = 0,
.check = thr->coder->stream_flags.check,
- .compressed_size = thr->coder->outq.buf_size_max,
+ .compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size,
// TODO: To allow changing the filter chain, the filters
@@ -234,12 +237,12 @@ worker_encode(worker_thread *thr, worker_state state)
size_t in_pos = 0;
size_t in_size = 0;
- thr->outbuf->size = thr->block_options.header_size;
- const size_t out_size = thr->coder->outq.buf_size_max;
+ *out_pos = thr->block_options.header_size;
+ const size_t out_size = thr->outbuf->allocated;
do {
mythread_sync(thr->mutex) {
- // Store in_pos and out_pos into *thr so that
+ // Store in_pos and *out_pos into *thr so that
// an application may read them via
// lzma_get_progress() to get progress information.
//
@@ -247,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state)
// finishes. Instead, the final values are taken
// later from thr->outbuf.
thr->progress_in = in_pos;
- thr->progress_out = thr->outbuf->size;
+ thr->progress_out = *out_pos;
while (in_size == thr->in_size
&& thr->state == THR_RUN)
@@ -277,8 +280,8 @@ worker_encode(worker_thread *thr, worker_state state)
ret = thr->block_encoder.code(
thr->block_encoder.coder, thr->allocator,
thr->in, &in_pos, in_limit, thr->outbuf->buf,
- &thr->outbuf->size, out_size, action);
- } while (ret == LZMA_OK && thr->outbuf->size < out_size);
+ out_pos, out_size, action);
+ } while (ret == LZMA_OK && *out_pos < out_size);
switch (ret) {
case LZMA_STREAM_END:
@@ -313,10 +316,10 @@ worker_encode(worker_thread *thr, worker_state state)
return state;
// Do the encoding. This takes care of the Block Header too.
- thr->outbuf->size = 0;
+ *out_pos = 0;
ret = lzma_block_uncomp_encode(&thr->block_options,
thr->in, in_size, thr->outbuf->buf,
- &thr->outbuf->size, out_size);
+ out_pos, out_size);
// It shouldn't fail.
if (ret != LZMA_OK) {
@@ -367,11 +370,13 @@ worker_start(void *thr_ptr)
}
}
+ size_t out_pos = 0;
+
assert(state != THR_IDLE);
assert(state != THR_STOP);
if (state <= THR_FINISH)
- state = worker_encode(thr, state);
+ state = worker_encode(thr, &out_pos, state);
if (state == THR_EXIT)
break;
@@ -387,14 +392,17 @@ worker_start(void *thr_ptr)
}
mythread_sync(thr->coder->mutex) {
- // Mark the output buffer as finished if
- // no errors occurred.
- thr->outbuf->finished = state == THR_FINISH;
+ // If no errors occurred, make the encoded data
+ // available to be copied out.
+ if (state == THR_FINISH) {
+ thr->outbuf->pos = out_pos;
+ thr->outbuf->finished = true;
+ }
// Update the main progress info.
thr->coder->progress_in
+= thr->outbuf->uncompressed_size;
- thr->coder->progress_out += thr->outbuf->size;
+ thr->coder->progress_out += out_pos;
thr->progress_in = 0;
thr->progress_out = 0;
@@ -519,6 +527,11 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
if (!lzma_outq_has_buf(&coder->outq))
return LZMA_OK;
+ // That's also true if we cannot allocate memory for the output
+ // buffer in the output queue.
+ return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
+ coder->outbuf_alloc_size));
+
// If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) {
@@ -541,7 +554,7 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
mythread_sync(coder->thr->mutex) {
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
- coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
+ coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
mythread_cond_signal(&coder->thr->cond);
}
@@ -704,7 +717,7 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
// Try to read compressed data to out[].
- ret = lzma_outq_read(&coder->outq,
+ ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
&unpadded_size,
&uncompressed_size);
@@ -951,7 +964,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
&block_size, &outbuf_size_max));
#if SIZE_MAX < UINT64_MAX
- if (block_size > SIZE_MAX)
+ if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
return LZMA_MEM_ERROR;
#endif
@@ -1012,6 +1025,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Basic initializations
coder->sequence = SEQ_STREAM_HEADER;
coder->block_size = (size_t)(block_size);
+ coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
coder->thread_error = LZMA_OK;
coder->thr = NULL;
@@ -1041,7 +1055,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Output queue
return_if_error(lzma_outq_init(&coder->outq, allocator,
- outbuf_size_max, options->threads));
+ options->threads));
// Timeout
coder->timeout = options->timeout;