aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2021-01-09 21:14:36 +0200
committerLasse Collin <lasse.collin@tukaani.org>2021-01-09 22:18:23 +0200
commitf7fa309e1f7178d04c7bedc03b73077639371e97 (patch)
tree127ac1ffc7ecb8265e98fd80c99c096c2beb5c5e /src/liblzma
parentUpdate THANKS. (diff)
downloadxz-f7fa309e1f7178d04c7bedc03b73077639371e97.tar.xz
liblzma: Make lzma_outq usable for threaded decompression too.
Before this commit all output queue buffers were allocated as a single big allocation. Now each buffer is allocated separately when needed. Used buffers are cached to avoid reallocation overhead but the cache will keep only one buffer size at a time. This should make things work OK in the decompression where most of the time the buffer sizes will be the same but with some less common files the buffer sizes may vary. While this should work fine, it's still a bit preliminary and may even get reverted if it turns out to be useless for decompression.
Diffstat (limited to 'src/liblzma')
-rw-r--r--src/liblzma/common/outqueue.c268
-rw-r--r--src/liblzma/common/outqueue.h138
-rw-r--r--src/liblzma/common/stream_encoder_mt.c52
3 files changed, 301 insertions, 157 deletions
diff --git a/src/liblzma/common/outqueue.c b/src/liblzma/common/outqueue.c
index 2dc8a38d..6331a50c 100644
--- a/src/liblzma/common/outqueue.c
+++ b/src/liblzma/common/outqueue.c
@@ -13,84 +13,100 @@
#include "outqueue.h"
-/// This is to ease integer overflow checking: We may allocate up to
-/// 2 * LZMA_THREADS_MAX buffers and we need some extra memory for other
-/// data structures (that's the second /2).
-#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2)
+/// Get the maximum number of buffers that may be allocated based
+/// on the number of threads. For now this is twice the number of threads.
+/// It's a compromise between RAM usage and keeping the worker threads busy
+/// when buffers finish out of order.
+#define GET_BUFS_LIMIT(threads) (2 * (threads))
-static lzma_ret
-get_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count,
- uint64_t buf_size_max, uint32_t threads)
+extern uint64_t
+lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
{
- if (threads > LZMA_THREADS_MAX || buf_size_max > BUF_SIZE_MAX)
- return LZMA_OPTIONS_ERROR;
-
- // The number of buffers is twice the number of threads.
- // This wastes RAM but keeps the threads busy when buffers
- // finish out of order.
+ // This is to ease integer overflow checking: We may allocate up to
+ // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
+ // memory for other data structures too (that's the /2).
//
- // NOTE: If this is changed, update BUF_SIZE_MAX too.
- *bufs_count = threads * 2;
- *bufs_alloc_size = *bufs_count * buf_size_max;
+ // lzma_outq_prealloc_buf() will still accept bigger buffers than this.
+ const uint64_t limit
+ = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;
- return LZMA_OK;
+ if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
+ return UINT64_MAX;
+
+ return GET_BUFS_LIMIT(threads) * (sizeof(lzma_outbuf) + buf_size_max);
}
-extern uint64_t
-lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
+static void
+move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
{
- uint64_t bufs_alloc_size;
- uint32_t bufs_count;
+ assert(outq->head != NULL);
+ assert(outq->tail != NULL);
+ assert(outq->bufs_in_use > 0);
+
+ --outq->bufs_in_use;
+
+ lzma_outbuf *buf = outq->head;
+ outq->head = buf->next;
+ if (outq->head == NULL)
+ outq->tail = NULL;
+
+ if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
+ lzma_outq_clear_cache(outq, allocator);
+
+ buf->next = outq->cache;
+ outq->cache = buf;
+
+ return;
+}
- if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads)
- != LZMA_OK)
- return UINT64_MAX;
- return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf)
- + bufs_alloc_size;
+static void
+free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
+{
+ assert(outq->cache != NULL);
+
+ lzma_outbuf *buf = outq->cache;
+ outq->cache = buf->next;
+
+ --outq->bufs_allocated;
+ outq->memusage -= sizeof(*buf) + buf->allocated;
+
+ lzma_free(buf, allocator);
+ return;
+}
+
+
+extern void
+lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
+{
+ while (outq->cache != NULL)
+ free_one_cached_buffer(outq, allocator);
+
+ return;
}
extern lzma_ret
lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
- uint64_t buf_size_max, uint32_t threads)
+ uint32_t threads)
{
- uint64_t bufs_alloc_size;
- uint32_t bufs_count;
-
- // Set bufs_count and bufs_alloc_size.
- return_if_error(get_options(&bufs_alloc_size, &bufs_count,
- buf_size_max, threads));
-
- // Allocate memory if needed.
- if (outq->buf_size_max != buf_size_max
- || outq->bufs_allocated != bufs_count) {
- lzma_outq_end(outq, allocator);
-
-#if SIZE_MAX < UINT64_MAX
- if (bufs_alloc_size > SIZE_MAX)
- return LZMA_MEM_ERROR;
-#endif
-
- outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf),
- allocator);
- outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size),
- allocator);
-
- if (outq->bufs == NULL || outq->bufs_mem == NULL) {
- lzma_outq_end(outq, allocator);
- return LZMA_MEM_ERROR;
- }
- }
+ if (threads > LZMA_THREADS_MAX)
+ return LZMA_OPTIONS_ERROR;
+
+ const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);
+
+ // Clear head/tail.
+ while (outq->head != NULL)
+ move_head_to_cache(outq, allocator);
+
+ // If new buf_limit is lower than the old one, we may need to free
+ // a few cached buffers.
+ while (bufs_limit < outq->bufs_allocated)
+ free_one_cached_buffer(outq, allocator);
- // Initialize the rest of the main structure. Initialization of
- // outq->bufs[] is done when they are actually needed.
- outq->buf_size_max = (size_t)(buf_size_max);
- outq->bufs_allocated = bufs_count;
- outq->bufs_pos = 0;
- outq->bufs_used = 0;
+ outq->bufs_limit = bufs_limit;
outq->read_pos = 0;
return LZMA_OK;
@@ -100,33 +116,76 @@ lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
extern void
lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
{
- lzma_free(outq->bufs, allocator);
- outq->bufs = NULL;
-
- lzma_free(outq->bufs_mem, allocator);
- outq->bufs_mem = NULL;
+ while (outq->head != NULL)
+ move_head_to_cache(outq, allocator);
+ lzma_outq_clear_cache(outq, allocator);
return;
}
-extern lzma_outbuf *
-lzma_outq_get_buf(lzma_outq *outq)
+extern lzma_ret
+lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
+ size_t size)
{
// Caller must have checked it with lzma_outq_has_buf().
- assert(outq->bufs_used < outq->bufs_allocated);
+ assert(outq->bufs_in_use < outq->bufs_limit);
+
+ // If there already is appropriately-sized buffer in the cache,
+ // we need to do nothing.
+ if (outq->cache != NULL && outq->cache->allocated == size)
+ return LZMA_OK;
+
+ if (size > SIZE_MAX - sizeof(lzma_outbuf))
+ return LZMA_MEM_ERROR;
+
+ // The cache may have buffers but their size is wrong.
+ lzma_outq_clear_cache(outq, allocator);
+
+ outq->cache = lzma_alloc(sizeof(lzma_outbuf) + size, allocator);
+ if (outq->cache == NULL)
+ return LZMA_MEM_ERROR;
+
+ outq->cache->next = NULL;
+ outq->cache->allocated = size;
- // Initialize the new buffer.
- lzma_outbuf *buf = &outq->bufs[outq->bufs_pos];
- buf->buf = outq->bufs_mem + outq->bufs_pos * outq->buf_size_max;
- buf->size = 0;
+ ++outq->bufs_allocated;
+ outq->memusage += sizeof(lzma_outbuf) + size;
+
+ return LZMA_OK;
+}
+
+
+extern lzma_outbuf *
+lzma_outq_get_buf(lzma_outq *outq, void *worker)
+{
+ // Caller must have used lzma_outq_prealloc_buf() to ensure these.
+ assert(outq->bufs_in_use < outq->bufs_limit);
+ assert(outq->bufs_in_use < outq->bufs_allocated);
+ assert(outq->cache != NULL);
+
+ lzma_outbuf *buf = outq->cache;
+ outq->cache = buf->next;
+ buf->next = NULL;
+
+ if (outq->tail != NULL) {
+ assert(outq->head != NULL);
+ outq->tail->next = buf;
+ } else {
+ assert(outq->head == NULL);
+ outq->head = buf;
+ }
+
+ outq->tail = buf;
+
+ buf->worker = worker;
buf->finished = false;
+ buf->pos = 0;
- // Update the queue state.
- if (++outq->bufs_pos == outq->bufs_allocated)
- outq->bufs_pos = 0;
+ buf->unpadded_size = 0;
+ buf->uncompressed_size = 0;
- ++outq->bufs_used;
+ ++outq->bufs_in_use;
return buf;
}
@@ -135,50 +194,65 @@ lzma_outq_get_buf(lzma_outq *outq)
extern bool
lzma_outq_is_readable(const lzma_outq *outq)
{
- uint32_t i = outq->bufs_pos - outq->bufs_used;
- if (outq->bufs_pos < outq->bufs_used)
- i += outq->bufs_allocated;
+ if (outq->head == NULL)
+ return false;
- return outq->bufs[i].finished;
+ return outq->read_pos < outq->head->pos || outq->head->finished;
}
extern lzma_ret
-lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out,
- size_t *restrict out_pos, size_t out_size,
+lzma_outq_read(lzma_outq *restrict outq,
+ const lzma_allocator *restrict allocator,
+ uint8_t *restrict out, size_t *restrict out_pos,
+ size_t out_size,
lzma_vli *restrict unpadded_size,
lzma_vli *restrict uncompressed_size)
{
// There must be at least one buffer from which to read.
- if (outq->bufs_used == 0)
+ if (outq->bufs_in_use == 0)
return LZMA_OK;
// Get the buffer.
- uint32_t i = outq->bufs_pos - outq->bufs_used;
- if (outq->bufs_pos < outq->bufs_used)
- i += outq->bufs_allocated;
-
- lzma_outbuf *buf = &outq->bufs[i];
-
- // If it isn't finished yet, we cannot read from it.
- if (!buf->finished)
- return LZMA_OK;
+ lzma_outbuf *buf = outq->head;
// Copy from the buffer to output.
- lzma_bufcpy(buf->buf, &outq->read_pos, buf->size,
+ //
+ // FIXME? In threaded decoder it may be bad to do this copy while
+ // the mutex is being held.
+ lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
out, out_pos, out_size);
// Return if we didn't get all the data from the buffer.
- if (outq->read_pos < buf->size)
+ if (!buf->finished || outq->read_pos < buf->pos)
return LZMA_OK;
// The buffer was finished. Tell the caller its size information.
- *unpadded_size = buf->unpadded_size;
- *uncompressed_size = buf->uncompressed_size;
+ if (unpadded_size != NULL)
+ *unpadded_size = buf->unpadded_size;
+
+ if (uncompressed_size != NULL)
+ *uncompressed_size = buf->uncompressed_size;
// Free this buffer for further use.
- --outq->bufs_used;
+ move_head_to_cache(outq, allocator);
outq->read_pos = 0;
return LZMA_STREAM_END;
}
+
+
+extern void
+lzma_outq_enable_partial_output(lzma_outq *outq,
+ void (*enable_partial_output)(void *worker))
+{
+ if (outq->head != NULL && !outq->head->finished
+ && outq->head->worker != NULL) {
+ enable_partial_output(outq->head->worker);
+
+ // Set it to NULL since calling it twice is pointless.
+ outq->head->worker = NULL;
+ }
+
+ return;
+}
diff --git a/src/liblzma/common/outqueue.h b/src/liblzma/common/outqueue.h
index 079634de..355e0ced 100644
--- a/src/liblzma/common/outqueue.h
+++ b/src/liblzma/common/outqueue.h
@@ -14,16 +14,27 @@
/// Output buffer for a single thread
-typedef struct {
- /// Pointer to the output buffer of lzma_outq.buf_size_max bytes
- uint8_t *buf;
-
- /// Amount of data written to buf
- size_t size;
-
- /// Additional size information
- lzma_vli unpadded_size;
- lzma_vli uncompressed_size;
+typedef struct lzma_outbuf_s lzma_outbuf;
+struct lzma_outbuf_s {
+ /// Pointer to the next buffer. This is used for the cached buffers.
+ /// The worker thread must not modify this.
+ lzma_outbuf *next;
+
+ /// This initialized by lzma_outq_get_buf() and
+ /// is used by lzma_outq_enable_partial_output().
+ /// The worker thread must not modify this.
+ void *worker;
+
+ /// Amount of memory allocated for buf[].
+ /// The worker thread must not modify this.
+ size_t allocated;
+
+ /// Writing position in the worker thread or, in other words, the
+ /// amount of finished data written to buf[] which can be copied out
+ ///
+ /// \note This is read by another thread and thus access
+ /// to this variable needs a mutex.
+ size_t pos;
/// True when no more data will be written into this buffer.
///
@@ -31,32 +42,44 @@ typedef struct {
/// to this variable needs a mutex.
bool finished;
-} lzma_outbuf;
+ /// Additional size information. lzma_outq_read() may read these
+ /// when "finished" is true.
+ lzma_vli unpadded_size;
+ lzma_vli uncompressed_size;
+ /// Buffer of "allocated" bytes
+ uint8_t buf[];
+};
-typedef struct {
- /// Array of buffers that are used cyclically.
- lzma_outbuf *bufs;
- /// Memory allocated for all the buffers
- uint8_t *bufs_mem;
+typedef struct {
+ /// Linked list of buffers in use. The next output byte will be
+ /// read from the head and buffers for the next thread will be
+ /// appended to the tail. tail->next is always NULL.
+ lzma_outbuf *head;
+ lzma_outbuf *tail;
- /// Amount of buffer space available in each buffer
- size_t buf_size_max;
+ /// Number of bytes read from head->buf[] in lzma_outq_read()
+ size_t read_pos;
- /// Number of buffers allocated
- uint32_t bufs_allocated;
+ /// Linked list of allocated buffers that aren't currently used.
+ /// This way buffers of similar size can be reused and don't
+ /// need to be reallocated every time. For simplicity, all
+ /// cached buffers in the list have the same allocated size.
+ lzma_outbuf *cache;
- /// Position in the bufs array. The next buffer to be taken
- /// into use is bufs[bufs_pos].
- uint32_t bufs_pos;
+ /// Total amount of memory allocated for buffers
+ uint64_t memusage;
- /// Number of buffers in use
- uint32_t bufs_used;
+ /// Number of buffers in use in the head...tail list. If and only if
+ /// this is zero, the pointers head and tail above are NULL.
+ uint32_t bufs_in_use;
- /// Position in the buffer in lzma_outq_read()
- size_t read_pos;
+ /// Number of buffers allocated (in use + cached)
+ uint32_t bufs_allocated;
+ /// Maximum allowed number of allocated buffers
+ uint32_t bufs_limit;
} lzma_outq;
@@ -76,32 +99,50 @@ extern uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads);
/// function knows that there are no previous
/// allocations to free.
/// \param allocator Pointer to allocator or NULL
-/// \param buf_size_max Maximum amount of data that a single buffer
-/// in the queue may need to store.
/// \param threads Number of buffers that may be in use
/// concurrently. Note that more than this number
-/// of buffers will actually get allocated to
+/// of buffers may actually get allocated to
/// improve performance when buffers finish
-/// out of order.
+/// out of order. The actual maximum number of
+/// allocated buffers is derived from the number
+/// of threads.
///
/// \return - LZMA_OK
/// - LZMA_MEM_ERROR
///
-extern lzma_ret lzma_outq_init(
- lzma_outq *outq, const lzma_allocator *allocator,
- uint64_t buf_size_max, uint32_t threads);
+extern lzma_ret lzma_outq_init(lzma_outq *outq,
+ const lzma_allocator *allocator, uint32_t threads);
/// \brief Free the memory associated with the output queue
extern void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator);
+/// \brief Free all cached buffers that consume memory but aren't in use
+extern void lzma_outq_clear_cache(
+ lzma_outq *outq, const lzma_allocator *allocator);
+
+
+/// \brief Preallocate a new buffer into cache
+///
+/// Splitting the buffer allocation into a separate function makes it
+/// possible to ensure that way lzma_outq_get_buf() cannot fail.
+/// If the preallocated buffer isn't actually used (for example, some
+/// other error occurs), the caller has to do nothing as the buffer will
+/// be used later or cleared from the cache when not needed.
+///
+/// \return LZMA_OK on success, LZMA_MEM_ERROR if allocation fails
+///
+extern lzma_ret lzma_outq_prealloc_buf(
+ lzma_outq *outq, const lzma_allocator *allocator, size_t size);
+
+
/// \brief Get a new buffer
///
-/// lzma_outq_has_buf() must be used to check that there is a buffer
+/// lzma_outq_prealloc_buf() must be used to ensure that there is a buffer
/// available before calling lzma_outq_get_buf().
///
-extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq);
+extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq, void *worker);
/// \brief Test if there is data ready to be read
@@ -126,17 +167,32 @@ extern bool lzma_outq_is_readable(const lzma_outq *outq);
/// \return - LZMA: All OK. Either no data was available or the buffer
/// being read didn't become empty yet.
/// - LZMA_STREAM_END: The buffer being read was finished.
-/// *unpadded_size and *uncompressed_size were set.
+/// *unpadded_size and *uncompressed_size were set if they
+/// were not NULL.
///
-/// \note This reads lzma_outbuf.finished variables and thus call
-/// to this function needs to be protected with a mutex.
+/// \note This reads lzma_outbuf.finished and .pos variables and thus
+/// calls to this function need to be protected with a mutex.
///
extern lzma_ret lzma_outq_read(lzma_outq *restrict outq,
+ const lzma_allocator *restrict allocator,
uint8_t *restrict out, size_t *restrict out_pos,
size_t out_size, lzma_vli *restrict unpadded_size,
lzma_vli *restrict uncompressed_size);
+/// \brief Enable partial output from a worker thread
+///
+/// If the buffer at the head of the output queue isn't finished,
+/// this will call enable_partial_output on the worker associated with
+/// that output buffer.
+///
+/// \note This reads a lzma_outbuf.finished variable and thus
+/// calls to this function need to be protected with a mutex.
+///
+extern void lzma_outq_enable_partial_output(lzma_outq *outq,
+ void (*enable_partial_output)(void *worker));
+
+
/// \brief Test if there is at least one buffer free
///
/// This must be used before getting a new buffer with lzma_outq_get_buf().
@@ -144,7 +200,7 @@ extern lzma_ret lzma_outq_read(lzma_outq *restrict outq,
static inline bool
lzma_outq_has_buf(const lzma_outq *outq)
{
- return outq->bufs_used < outq->bufs_allocated;
+ return outq->bufs_in_use < outq->bufs_limit;
}
@@ -152,5 +208,5 @@ lzma_outq_has_buf(const lzma_outq *outq)
static inline bool
lzma_outq_is_empty(const lzma_outq *outq)
{
- return outq->bufs_used == 0;
+ return outq->bufs_in_use == 0;
}
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 01e40339..6b897ab9 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -133,6 +133,9 @@ struct lzma_stream_coder_s {
/// Output buffer queue for compressed data
lzma_outq outq;
+ /// How much memory to allocate for each lzma_outbuf.buf
+ size_t outbuf_alloc_size;
+
/// Maximum wait time if cannot use all the input and cannot
/// fill the output buffer. This is in milliseconds.
@@ -196,7 +199,7 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
-worker_encode(worker_thread *thr, worker_state state)
+worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
assert(thr->progress_in == 0);
assert(thr->progress_out == 0);
@@ -205,7 +208,7 @@ worker_encode(worker_thread *thr, worker_state state)
thr->block_options = (lzma_block){
.version = 0,
.check = thr->coder->stream_flags.check,
- .compressed_size = thr->coder->outq.buf_size_max,
+ .compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size,
// TODO: To allow changing the filter chain, the filters
@@ -234,12 +237,12 @@ worker_encode(worker_thread *thr, worker_state state)
size_t in_pos = 0;
size_t in_size = 0;
- thr->outbuf->size = thr->block_options.header_size;
- const size_t out_size = thr->coder->outq.buf_size_max;
+ *out_pos = thr->block_options.header_size;
+ const size_t out_size = thr->outbuf->allocated;
do {
mythread_sync(thr->mutex) {
- // Store in_pos and out_pos into *thr so that
+ // Store in_pos and *out_pos into *thr so that
// an application may read them via
// lzma_get_progress() to get progress information.
//
@@ -247,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state)
// finishes. Instead, the final values are taken
// later from thr->outbuf.
thr->progress_in = in_pos;
- thr->progress_out = thr->outbuf->size;
+ thr->progress_out = *out_pos;
while (in_size == thr->in_size
&& thr->state == THR_RUN)
@@ -277,8 +280,8 @@ worker_encode(worker_thread *thr, worker_state state)
ret = thr->block_encoder.code(
thr->block_encoder.coder, thr->allocator,
thr->in, &in_pos, in_limit, thr->outbuf->buf,
- &thr->outbuf->size, out_size, action);
- } while (ret == LZMA_OK && thr->outbuf->size < out_size);
+ out_pos, out_size, action);
+ } while (ret == LZMA_OK && *out_pos < out_size);
switch (ret) {
case LZMA_STREAM_END:
@@ -313,10 +316,10 @@ worker_encode(worker_thread *thr, worker_state state)
return state;
// Do the encoding. This takes care of the Block Header too.
- thr->outbuf->size = 0;
+ *out_pos = 0;
ret = lzma_block_uncomp_encode(&thr->block_options,
thr->in, in_size, thr->outbuf->buf,
- &thr->outbuf->size, out_size);
+ out_pos, out_size);
// It shouldn't fail.
if (ret != LZMA_OK) {
@@ -367,11 +370,13 @@ worker_start(void *thr_ptr)
}
}
+ size_t out_pos = 0;
+
assert(state != THR_IDLE);
assert(state != THR_STOP);
if (state <= THR_FINISH)
- state = worker_encode(thr, state);
+ state = worker_encode(thr, &out_pos, state);
if (state == THR_EXIT)
break;
@@ -387,14 +392,17 @@ worker_start(void *thr_ptr)
}
mythread_sync(thr->coder->mutex) {
- // Mark the output buffer as finished if
- // no errors occurred.
- thr->outbuf->finished = state == THR_FINISH;
+ // If no errors occurred, make the encoded data
+ // available to be copied out.
+ if (state == THR_FINISH) {
+ thr->outbuf->pos = out_pos;
+ thr->outbuf->finished = true;
+ }
// Update the main progress info.
thr->coder->progress_in
+= thr->outbuf->uncompressed_size;
- thr->coder->progress_out += thr->outbuf->size;
+ thr->coder->progress_out += out_pos;
thr->progress_in = 0;
thr->progress_out = 0;
@@ -519,6 +527,11 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
if (!lzma_outq_has_buf(&coder->outq))
return LZMA_OK;
+ // That's also true if we cannot allocate memory for the output
+ // buffer in the output queue.
+ return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
+ coder->outbuf_alloc_size));
+
// If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) {
@@ -541,7 +554,7 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
mythread_sync(coder->thr->mutex) {
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
- coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
+ coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
mythread_cond_signal(&coder->thr->cond);
}
@@ -704,7 +717,7 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
// Try to read compressed data to out[].
- ret = lzma_outq_read(&coder->outq,
+ ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
&unpadded_size,
&uncompressed_size);
@@ -951,7 +964,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
&block_size, &outbuf_size_max));
#if SIZE_MAX < UINT64_MAX
- if (block_size > SIZE_MAX)
+ if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
return LZMA_MEM_ERROR;
#endif
@@ -1012,6 +1025,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Basic initializations
coder->sequence = SEQ_STREAM_HEADER;
coder->block_size = (size_t)(block_size);
+ coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
coder->thread_error = LZMA_OK;
coder->thr = NULL;
@@ -1041,7 +1055,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Output queue
return_if_error(lzma_outq_init(&coder->outq, allocator,
- outbuf_size_max, options->threads));
+ options->threads));
// Timeout
coder->timeout = options->timeout;