diff options
Diffstat (limited to 'src/liblzma/common/outqueue.c')
-rw-r--r-- | src/liblzma/common/outqueue.c | 268 |
1 files changed, 171 insertions, 97 deletions
diff --git a/src/liblzma/common/outqueue.c b/src/liblzma/common/outqueue.c index 2dc8a38d..6331a50c 100644 --- a/src/liblzma/common/outqueue.c +++ b/src/liblzma/common/outqueue.c @@ -13,84 +13,100 @@ #include "outqueue.h" -/// This is to ease integer overflow checking: We may allocate up to -/// 2 * LZMA_THREADS_MAX buffers and we need some extra memory for other -/// data structures (that's the second /2). -#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2) +/// Get the maximum number of buffers that may be allocated based +/// on the number of threads. For now this is twice the number of threads. +/// It's a compromise between RAM usage and keeping the worker threads busy +/// when buffers finish out of order. +#define GET_BUFS_LIMIT(threads) (2 * (threads)) -static lzma_ret -get_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count, - uint64_t buf_size_max, uint32_t threads) +extern uint64_t +lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) { - if (threads > LZMA_THREADS_MAX || buf_size_max > BUF_SIZE_MAX) - return LZMA_OPTIONS_ERROR; - - // The number of buffers is twice the number of threads. - // This wastes RAM but keeps the threads busy when buffers - // finish out of order. + // This is to ease integer overflow checking: We may allocate up to + // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra + // memory for other data structures too (that's the /2). // - // NOTE: If this is changed, update BUF_SIZE_MAX too. - *bufs_count = threads * 2; - *bufs_alloc_size = *bufs_count * buf_size_max; + // lzma_outq_prealloc_buf() will still accept bigger buffers than this. + const uint64_t limit + = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2; - return LZMA_OK; + if (threads > LZMA_THREADS_MAX || buf_size_max > limit) + return UINT64_MAX; + + return GET_BUFS_LIMIT(threads) * (sizeof(lzma_outbuf) + buf_size_max); } -extern uint64_t -lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) +static void +move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator) { - uint64_t bufs_alloc_size; - uint32_t bufs_count; + assert(outq->head != NULL); + assert(outq->tail != NULL); + assert(outq->bufs_in_use > 0); + + --outq->bufs_in_use; + + lzma_outbuf *buf = outq->head; + outq->head = buf->next; + if (outq->head == NULL) + outq->tail = NULL; + + if (outq->cache != NULL && outq->cache->allocated != buf->allocated) + lzma_outq_clear_cache(outq, allocator); + + buf->next = outq->cache; + outq->cache = buf; + + return; +} - if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads) - != LZMA_OK) - return UINT64_MAX; - return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf) - + bufs_alloc_size; +static void +free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator) +{ + assert(outq->cache != NULL); + + lzma_outbuf *buf = outq->cache; + outq->cache = buf->next; + + --outq->bufs_allocated; + outq->memusage -= sizeof(*buf) + buf->allocated; + + lzma_free(buf, allocator); + return; +} + + +extern void +lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator) +{ + while (outq->cache != NULL) + free_one_cached_buffer(outq, allocator); + + return; } extern lzma_ret lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, - uint64_t buf_size_max, uint32_t threads) + uint32_t threads) { - uint64_t bufs_alloc_size; - uint32_t bufs_count; - - // Set bufs_count and bufs_alloc_size. - return_if_error(get_options(&bufs_alloc_size, &bufs_count, - buf_size_max, threads)); - - // Allocate memory if needed. - if (outq->buf_size_max != buf_size_max - || outq->bufs_allocated != bufs_count) { - lzma_outq_end(outq, allocator); - -#if SIZE_MAX < UINT64_MAX - if (bufs_alloc_size > SIZE_MAX) - return LZMA_MEM_ERROR; -#endif - - outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf), - allocator); - outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size), - allocator); - - if (outq->bufs == NULL || outq->bufs_mem == NULL) { - lzma_outq_end(outq, allocator); - return LZMA_MEM_ERROR; - } - } + if (threads > LZMA_THREADS_MAX) + return LZMA_OPTIONS_ERROR; + + const uint32_t bufs_limit = GET_BUFS_LIMIT(threads); + + // Clear head/tail. + while (outq->head != NULL) + move_head_to_cache(outq, allocator); + + // If new buf_limit is lower than the old one, we may need to free + // a few cached buffers. + while (bufs_limit < outq->bufs_allocated) + free_one_cached_buffer(outq, allocator); - // Initialize the rest of the main structure. Initialization of - // outq->bufs[] is done when they are actually needed. - outq->buf_size_max = (size_t)(buf_size_max); - outq->bufs_allocated = bufs_count; - outq->bufs_pos = 0; - outq->bufs_used = 0; + outq->bufs_limit = bufs_limit; outq->read_pos = 0; return LZMA_OK; @@ -100,33 +116,76 @@ lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, extern void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) { - lzma_free(outq->bufs, allocator); - outq->bufs = NULL; - - lzma_free(outq->bufs_mem, allocator); - outq->bufs_mem = NULL; + while (outq->head != NULL) + move_head_to_cache(outq, allocator); + lzma_outq_clear_cache(outq, allocator); return; } -extern lzma_outbuf * -lzma_outq_get_buf(lzma_outq *outq) +extern lzma_ret +lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator, + size_t size) { // Caller must have checked it with lzma_outq_has_buf(). - assert(outq->bufs_used < outq->bufs_allocated); + assert(outq->bufs_in_use < outq->bufs_limit); + + // If there already is appropriately-sized buffer in the cache, + // we need to do nothing. + if (outq->cache != NULL && outq->cache->allocated == size) + return LZMA_OK; + + if (size > SIZE_MAX - sizeof(lzma_outbuf)) + return LZMA_MEM_ERROR; + + // The cache may have buffers but their size is wrong. + lzma_outq_clear_cache(outq, allocator); + + outq->cache = lzma_alloc(sizeof(lzma_outbuf) + size, allocator); + if (outq->cache == NULL) + return LZMA_MEM_ERROR; + + outq->cache->next = NULL; + outq->cache->allocated = size; - // Initialize the new buffer. - lzma_outbuf *buf = &outq->bufs[outq->bufs_pos]; - buf->buf = outq->bufs_mem + outq->bufs_pos * outq->buf_size_max; - buf->size = 0; + ++outq->bufs_allocated; + outq->memusage += sizeof(lzma_outbuf) + size; + + return LZMA_OK; +} + + +extern lzma_outbuf * +lzma_outq_get_buf(lzma_outq *outq, void *worker) +{ + // Caller must have used lzma_outq_prealloc_buf() to ensure these. + assert(outq->bufs_in_use < outq->bufs_limit); + assert(outq->bufs_in_use < outq->bufs_allocated); + assert(outq->cache != NULL); + + lzma_outbuf *buf = outq->cache; + outq->cache = buf->next; + buf->next = NULL; + + if (outq->tail != NULL) { + assert(outq->head != NULL); + outq->tail->next = buf; + } else { + assert(outq->head == NULL); + outq->head = buf; + } + + outq->tail = buf; + + buf->worker = worker; buf->finished = false; + buf->pos = 0; - // Update the queue state. - if (++outq->bufs_pos == outq->bufs_allocated) - outq->bufs_pos = 0; + buf->unpadded_size = 0; + buf->uncompressed_size = 0; - ++outq->bufs_used; + ++outq->bufs_in_use; return buf; } @@ -135,50 +194,65 @@ lzma_outq_get_buf(lzma_outq *outq) extern bool lzma_outq_is_readable(const lzma_outq *outq) { - uint32_t i = outq->bufs_pos - outq->bufs_used; - if (outq->bufs_pos < outq->bufs_used) - i += outq->bufs_allocated; + if (outq->head == NULL) + return false; - return outq->bufs[i].finished; + return outq->read_pos < outq->head->pos || outq->head->finished; } extern lzma_ret -lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out, - size_t *restrict out_pos, size_t out_size, +lzma_outq_read(lzma_outq *restrict outq, + const lzma_allocator *restrict allocator, + uint8_t *restrict out, size_t *restrict out_pos, + size_t out_size, lzma_vli *restrict unpadded_size, lzma_vli *restrict uncompressed_size) { // There must be at least one buffer from which to read. - if (outq->bufs_used == 0) + if (outq->bufs_in_use == 0) return LZMA_OK; // Get the buffer. - uint32_t i = outq->bufs_pos - outq->bufs_used; - if (outq->bufs_pos < outq->bufs_used) - i += outq->bufs_allocated; - - lzma_outbuf *buf = &outq->bufs[i]; - - // If it isn't finished yet, we cannot read from it. - if (!buf->finished) - return LZMA_OK; + lzma_outbuf *buf = outq->head; // Copy from the buffer to output. - lzma_bufcpy(buf->buf, &outq->read_pos, buf->size, + // + // FIXME? In threaded decoder it may be bad to do this copy while + // the mutex is being held. + lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos, out, out_pos, out_size); // Return if we didn't get all the data from the buffer. - if (outq->read_pos < buf->size) + if (!buf->finished || outq->read_pos < buf->pos) return LZMA_OK; // The buffer was finished. Tell the caller its size information. - *unpadded_size = buf->unpadded_size; - *uncompressed_size = buf->uncompressed_size; + if (unpadded_size != NULL) + *unpadded_size = buf->unpadded_size; + + if (uncompressed_size != NULL) + *uncompressed_size = buf->uncompressed_size; // Free this buffer for further use. - --outq->bufs_used; + move_head_to_cache(outq, allocator); outq->read_pos = 0; return LZMA_STREAM_END; } + + +extern void +lzma_outq_enable_partial_output(lzma_outq *outq, + void (*enable_partial_output)(void *worker)) +{ + if (outq->head != NULL && !outq->head->finished + && outq->head->worker != NULL) { + enable_partial_output(outq->head->worker); + + // Set it to NULL since calling it twice is pointless. + outq->head->worker = NULL; + } + + return; +} |