aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma/common/outqueue.c
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2021-01-09 21:14:36 +0200
committerLasse Collin <lasse.collin@tukaani.org>2021-01-09 22:18:23 +0200
commitf7fa309e1f7178d04c7bedc03b73077639371e97 (patch)
tree127ac1ffc7ecb8265e98fd80c99c096c2beb5c5e /src/liblzma/common/outqueue.c
parentUpdate THANKS. (diff)
downloadxz-f7fa309e1f7178d04c7bedc03b73077639371e97.tar.xz
liblzma: Make lzma_outq usable for threaded decompression too.
Before this commit all output queue buffers were allocated as a single big allocation. Now each buffer is allocated separately when needed. Used buffers are cached to avoid reallocation overhead but the cache will keep only one buffer size at a time. This should make things work OK in the decompression where most of the time the buffer sizes will be the same but with some less common files the buffer sizes may vary. While this should work fine, it's still a bit preliminary and may even get reverted if it turns out to be useless for decompression.
Diffstat (limited to '')
-rw-r--r--src/liblzma/common/outqueue.c268
1 files changed, 171 insertions, 97 deletions
diff --git a/src/liblzma/common/outqueue.c b/src/liblzma/common/outqueue.c
index 2dc8a38d..6331a50c 100644
--- a/src/liblzma/common/outqueue.c
+++ b/src/liblzma/common/outqueue.c
@@ -13,84 +13,100 @@
#include "outqueue.h"
-/// This is to ease integer overflow checking: We may allocate up to
-/// 2 * LZMA_THREADS_MAX buffers and we need some extra memory for other
-/// data structures (that's the second /2).
-#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2)
+/// Get the maximum number of buffers that may be allocated based
+/// on the number of threads. For now this is twice the number of threads.
+/// It's a compromise between RAM usage and keeping the worker threads busy
+/// when buffers finish out of order.
+#define GET_BUFS_LIMIT(threads) (2 * (threads))
-static lzma_ret
-get_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count,
- uint64_t buf_size_max, uint32_t threads)
+extern uint64_t
+lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
{
- if (threads > LZMA_THREADS_MAX || buf_size_max > BUF_SIZE_MAX)
- return LZMA_OPTIONS_ERROR;
-
- // The number of buffers is twice the number of threads.
- // This wastes RAM but keeps the threads busy when buffers
- // finish out of order.
+ // This is to ease integer overflow checking: We may allocate up to
+ // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
+ // memory for other data structures too (that's the /2).
//
- // NOTE: If this is changed, update BUF_SIZE_MAX too.
- *bufs_count = threads * 2;
- *bufs_alloc_size = *bufs_count * buf_size_max;
+ // lzma_outq_prealloc_buf() will still accept bigger buffers than this.
+ const uint64_t limit
+ = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;
- return LZMA_OK;
+ if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
+ return UINT64_MAX;
+
+ return GET_BUFS_LIMIT(threads) * (sizeof(lzma_outbuf) + buf_size_max);
}
-extern uint64_t
-lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
+static void
+move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
{
- uint64_t bufs_alloc_size;
- uint32_t bufs_count;
+ assert(outq->head != NULL);
+ assert(outq->tail != NULL);
+ assert(outq->bufs_in_use > 0);
+
+ --outq->bufs_in_use;
+
+ lzma_outbuf *buf = outq->head;
+ outq->head = buf->next;
+ if (outq->head == NULL)
+ outq->tail = NULL;
+
+ if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
+ lzma_outq_clear_cache(outq, allocator);
+
+ buf->next = outq->cache;
+ outq->cache = buf;
+
+ return;
+}
- if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads)
- != LZMA_OK)
- return UINT64_MAX;
- return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf)
- + bufs_alloc_size;
+static void
+free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
+{
+ assert(outq->cache != NULL);
+
+ lzma_outbuf *buf = outq->cache;
+ outq->cache = buf->next;
+
+ --outq->bufs_allocated;
+ outq->memusage -= sizeof(*buf) + buf->allocated;
+
+ lzma_free(buf, allocator);
+ return;
+}
+
+
+extern void
+lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
+{
+ while (outq->cache != NULL)
+ free_one_cached_buffer(outq, allocator);
+
+ return;
}
extern lzma_ret
lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
- uint64_t buf_size_max, uint32_t threads)
+ uint32_t threads)
{
- uint64_t bufs_alloc_size;
- uint32_t bufs_count;
-
- // Set bufs_count and bufs_alloc_size.
- return_if_error(get_options(&bufs_alloc_size, &bufs_count,
- buf_size_max, threads));
-
- // Allocate memory if needed.
- if (outq->buf_size_max != buf_size_max
- || outq->bufs_allocated != bufs_count) {
- lzma_outq_end(outq, allocator);
-
-#if SIZE_MAX < UINT64_MAX
- if (bufs_alloc_size > SIZE_MAX)
- return LZMA_MEM_ERROR;
-#endif
-
- outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf),
- allocator);
- outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size),
- allocator);
-
- if (outq->bufs == NULL || outq->bufs_mem == NULL) {
- lzma_outq_end(outq, allocator);
- return LZMA_MEM_ERROR;
- }
- }
+ if (threads > LZMA_THREADS_MAX)
+ return LZMA_OPTIONS_ERROR;
+
+ const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);
+
+ // Clear head/tail.
+ while (outq->head != NULL)
+ move_head_to_cache(outq, allocator);
+
+ // If new buf_limit is lower than the old one, we may need to free
+ // a few cached buffers.
+ while (bufs_limit < outq->bufs_allocated)
+ free_one_cached_buffer(outq, allocator);
- // Initialize the rest of the main structure. Initialization of
- // outq->bufs[] is done when they are actually needed.
- outq->buf_size_max = (size_t)(buf_size_max);
- outq->bufs_allocated = bufs_count;
- outq->bufs_pos = 0;
- outq->bufs_used = 0;
+ outq->bufs_limit = bufs_limit;
outq->read_pos = 0;
return LZMA_OK;
@@ -100,33 +116,76 @@ lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
extern void
lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
{
- lzma_free(outq->bufs, allocator);
- outq->bufs = NULL;
-
- lzma_free(outq->bufs_mem, allocator);
- outq->bufs_mem = NULL;
+ while (outq->head != NULL)
+ move_head_to_cache(outq, allocator);
+ lzma_outq_clear_cache(outq, allocator);
return;
}
-extern lzma_outbuf *
-lzma_outq_get_buf(lzma_outq *outq)
+extern lzma_ret
+lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
+ size_t size)
{
// Caller must have checked it with lzma_outq_has_buf().
- assert(outq->bufs_used < outq->bufs_allocated);
+ assert(outq->bufs_in_use < outq->bufs_limit);
+
+ // If there already is appropriately-sized buffer in the cache,
+ // we need to do nothing.
+ if (outq->cache != NULL && outq->cache->allocated == size)
+ return LZMA_OK;
+
+ if (size > SIZE_MAX - sizeof(lzma_outbuf))
+ return LZMA_MEM_ERROR;
+
+ // The cache may have buffers but their size is wrong.
+ lzma_outq_clear_cache(outq, allocator);
+
+ outq->cache = lzma_alloc(sizeof(lzma_outbuf) + size, allocator);
+ if (outq->cache == NULL)
+ return LZMA_MEM_ERROR;
+
+ outq->cache->next = NULL;
+ outq->cache->allocated = size;
- // Initialize the new buffer.
- lzma_outbuf *buf = &outq->bufs[outq->bufs_pos];
- buf->buf = outq->bufs_mem + outq->bufs_pos * outq->buf_size_max;
- buf->size = 0;
+ ++outq->bufs_allocated;
+ outq->memusage += sizeof(lzma_outbuf) + size;
+
+ return LZMA_OK;
+}
+
+
+extern lzma_outbuf *
+lzma_outq_get_buf(lzma_outq *outq, void *worker)
+{
+ // Caller must have used lzma_outq_prealloc_buf() to ensure these.
+ assert(outq->bufs_in_use < outq->bufs_limit);
+ assert(outq->bufs_in_use < outq->bufs_allocated);
+ assert(outq->cache != NULL);
+
+ lzma_outbuf *buf = outq->cache;
+ outq->cache = buf->next;
+ buf->next = NULL;
+
+ if (outq->tail != NULL) {
+ assert(outq->head != NULL);
+ outq->tail->next = buf;
+ } else {
+ assert(outq->head == NULL);
+ outq->head = buf;
+ }
+
+ outq->tail = buf;
+
+ buf->worker = worker;
buf->finished = false;
+ buf->pos = 0;
- // Update the queue state.
- if (++outq->bufs_pos == outq->bufs_allocated)
- outq->bufs_pos = 0;
+ buf->unpadded_size = 0;
+ buf->uncompressed_size = 0;
- ++outq->bufs_used;
+ ++outq->bufs_in_use;
return buf;
}
@@ -135,50 +194,65 @@ lzma_outq_get_buf(lzma_outq *outq)
extern bool
lzma_outq_is_readable(const lzma_outq *outq)
{
- uint32_t i = outq->bufs_pos - outq->bufs_used;
- if (outq->bufs_pos < outq->bufs_used)
- i += outq->bufs_allocated;
+ if (outq->head == NULL)
+ return false;
- return outq->bufs[i].finished;
+ return outq->read_pos < outq->head->pos || outq->head->finished;
}
extern lzma_ret
-lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out,
- size_t *restrict out_pos, size_t out_size,
+lzma_outq_read(lzma_outq *restrict outq,
+ const lzma_allocator *restrict allocator,
+ uint8_t *restrict out, size_t *restrict out_pos,
+ size_t out_size,
lzma_vli *restrict unpadded_size,
lzma_vli *restrict uncompressed_size)
{
// There must be at least one buffer from which to read.
- if (outq->bufs_used == 0)
+ if (outq->bufs_in_use == 0)
return LZMA_OK;
// Get the buffer.
- uint32_t i = outq->bufs_pos - outq->bufs_used;
- if (outq->bufs_pos < outq->bufs_used)
- i += outq->bufs_allocated;
-
- lzma_outbuf *buf = &outq->bufs[i];
-
- // If it isn't finished yet, we cannot read from it.
- if (!buf->finished)
- return LZMA_OK;
+ lzma_outbuf *buf = outq->head;
// Copy from the buffer to output.
- lzma_bufcpy(buf->buf, &outq->read_pos, buf->size,
+ //
+ // FIXME? In threaded decoder it may be bad to do this copy while
+ // the mutex is being held.
+ lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
out, out_pos, out_size);
// Return if we didn't get all the data from the buffer.
- if (outq->read_pos < buf->size)
+ if (!buf->finished || outq->read_pos < buf->pos)
return LZMA_OK;
// The buffer was finished. Tell the caller its size information.
- *unpadded_size = buf->unpadded_size;
- *uncompressed_size = buf->uncompressed_size;
+ if (unpadded_size != NULL)
+ *unpadded_size = buf->unpadded_size;
+
+ if (uncompressed_size != NULL)
+ *uncompressed_size = buf->uncompressed_size;
// Free this buffer for further use.
- --outq->bufs_used;
+ move_head_to_cache(outq, allocator);
outq->read_pos = 0;
return LZMA_STREAM_END;
}
+
+
+extern void
+lzma_outq_enable_partial_output(lzma_outq *outq,
+ void (*enable_partial_output)(void *worker))
+{
+ if (outq->head != NULL && !outq->head->finished
+ && outq->head->worker != NULL) {
+ enable_partial_output(outq->head->worker);
+
+ // Set it to NULL since calling it twice is pointless.
+ outq->head->worker = NULL;
+ }
+
+ return;
+}