aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/liblzma/common/outqueue.c268
-rw-r--r--src/liblzma/common/outqueue.h138
-rw-r--r--src/liblzma/common/stream_encoder_mt.c52
3 files changed, 301 insertions, 157 deletions
diff --git a/src/liblzma/common/outqueue.c b/src/liblzma/common/outqueue.c
index 2dc8a38d..6331a50c 100644
--- a/src/liblzma/common/outqueue.c
+++ b/src/liblzma/common/outqueue.c
@@ -13,84 +13,100 @@
#include "outqueue.h"
-/// This is to ease integer overflow checking: We may allocate up to
-/// 2 * LZMA_THREADS_MAX buffers and we need some extra memory for other
-/// data structures (that's the second /2).
-#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2)
+/// Get the maximum number of buffers that may be allocated based
+/// on the number of threads. For now this is twice the number of threads.
+/// It's a compromise between RAM usage and keeping the worker threads busy
+/// when buffers finish out of order.
+#define GET_BUFS_LIMIT(threads) (2 * (threads))
-static lzma_ret
-get_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count,
- uint64_t buf_size_max, uint32_t threads)
+extern uint64_t
+lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
{
- if (threads > LZMA_THREADS_MAX || buf_size_max > BUF_SIZE_MAX)
- return LZMA_OPTIONS_ERROR;
-
- // The number of buffers is twice the number of threads.
- // This wastes RAM but keeps the threads busy when buffers
- // finish out of order.
+ // This is to ease integer overflow checking: We may allocate up to
+ // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
+ // memory for other data structures too (that's the /2).
//
- // NOTE: If this is changed, update BUF_SIZE_MAX too.
- *bufs_count = threads * 2;
- *bufs_alloc_size = *bufs_count * buf_size_max;
+ // lzma_outq_prealloc_buf() will still accept bigger buffers than this.
+ const uint64_t limit
+ = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;
- return LZMA_OK;
+ if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
+ return UINT64_MAX;
+
+ return GET_BUFS_LIMIT(threads) * (sizeof(lzma_outbuf) + buf_size_max);
}
-extern uint64_t
-lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
+static void
+move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
{
- uint64_t bufs_alloc_size;
- uint32_t bufs_count;
+ assert(outq->head != NULL);
+ assert(outq->tail != NULL);
+ assert(outq->bufs_in_use > 0);
+
+ --outq->bufs_in_use;
+
+ lzma_outbuf *buf = outq->head;
+ outq->head = buf->next;
+ if (outq->head == NULL)
+ outq->tail = NULL;
+
+ if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
+ lzma_outq_clear_cache(outq, allocator);
+
+ buf->next = outq->cache;
+ outq->cache = buf;
+
+ return;
+}
- if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads)
- != LZMA_OK)
- return UINT64_MAX;
- return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf)
- + bufs_alloc_size;
+static void
+free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
+{
+ assert(outq->cache != NULL);
+
+ lzma_outbuf *buf = outq->cache;
+ outq->cache = buf->next;
+
+ --outq->bufs_allocated;
+ outq->memusage -= sizeof(*buf) + buf->allocated;
+
+ lzma_free(buf, allocator);
+ return;
+}
+
+
+extern void
+lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
+{
+ while (outq->cache != NULL)
+ free_one_cached_buffer(outq, allocator);
+
+ return;
}
extern lzma_ret
lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
- uint64_t buf_size_max, uint32_t threads)
+ uint32_t threads)
{
- uint64_t bufs_alloc_size;
- uint32_t bufs_count;
-
- // Set bufs_count and bufs_alloc_size.
- return_if_error(get_options(&bufs_alloc_size, &bufs_count,
- buf_size_max, threads));
-
- // Allocate memory if needed.
- if (outq->buf_size_max != buf_size_max
- || outq->bufs_allocated != bufs_count) {
- lzma_outq_end(outq, allocator);
-
-#if SIZE_MAX < UINT64_MAX
- if (bufs_alloc_size > SIZE_MAX)
- return LZMA_MEM_ERROR;
-#endif
-
- outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf),
- allocator);
- outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size),
- allocator);
-
- if (outq->bufs == NULL || outq->bufs_mem == NULL) {
- lzma_outq_end(outq, allocator);
- return LZMA_MEM_ERROR;
- }
- }
+ if (threads > LZMA_THREADS_MAX)
+ return LZMA_OPTIONS_ERROR;
+
+ const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);
+
+ // Clear head/tail.
+ while (outq->head != NULL)
+ move_head_to_cache(outq, allocator);
+
+ // If new buf_limit is lower than the old one, we may need to free
+ // a few cached buffers.
+ while (bufs_limit < outq->bufs_allocated)
+ free_one_cached_buffer(outq, allocator);
- // Initialize the rest of the main structure. Initialization of
- // outq->bufs[] is done when they are actually needed.
- outq->buf_size_max = (size_t)(buf_size_max);
- outq->bufs_allocated = bufs_count;
- outq->bufs_pos = 0;
- outq->bufs_used = 0;
+ outq->bufs_limit = bufs_limit;
outq->read_pos = 0;
return LZMA_OK;
@@ -100,33 +116,76 @@ lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
extern void
lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
{
- lzma_free(outq->bufs, allocator);
- outq->bufs = NULL;
-
- lzma_free(outq->bufs_mem, allocator);
- outq->bufs_mem = NULL;
+ while (outq->head != NULL)
+ move_head_to_cache(outq, allocator);
+ lzma_outq_clear_cache(outq, allocator);
return;
}
-extern lzma_outbuf *
-lzma_outq_get_buf(lzma_outq *outq)
+extern lzma_ret
+lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
+ size_t size)
{
// Caller must have checked it with lzma_outq_has_buf().
- assert(outq->bufs_used < outq->bufs_allocated);
+ assert(outq->bufs_in_use < outq->bufs_limit);
+
+ // If there already is appropriately-sized buffer in the cache,
+ // we need to do nothing.
+ if (outq->cache != NULL && outq->cache->allocated == size)
+ return LZMA_OK;
+
+ if (size > SIZE_MAX - sizeof(lzma_outbuf))
+ return LZMA_MEM_ERROR;
+
+ // The cache may have buffers but their size is wrong.
+ lzma_outq_clear_cache(outq, allocator);
+
+ outq->cache = lzma_alloc(sizeof(lzma_outbuf) + size, allocator);
+ if (outq->cache == NULL)
+ return LZMA_MEM_ERROR;
+
+ outq->cache->next = NULL;
+ outq->cache->allocated = size;
- // Initialize the new buffer.
- lzma_outbuf *buf = &outq->bufs[outq->bufs_pos];
- buf->buf = outq->bufs_mem + outq->bufs_pos * outq->buf_size_max;
- buf->size = 0;
+ ++outq->bufs_allocated;
+ outq->memusage += sizeof(lzma_outbuf) + size;
+
+ return LZMA_OK;
+}
+
+
+extern lzma_outbuf *
+lzma_outq_get_buf(lzma_outq *outq, void *worker)
+{
+ // Caller must have used lzma_outq_prealloc_buf() to ensure these.
+ assert(outq->bufs_in_use < outq->bufs_limit);
+ assert(outq->bufs_in_use < outq->bufs_allocated);
+ assert(outq->cache != NULL);
+
+ lzma_outbuf *buf = outq->cache;
+ outq->cache = buf->next;
+ buf->next = NULL;
+
+ if (outq->tail != NULL) {
+ assert(outq->head != NULL);
+ outq->tail->next = buf;
+ } else {
+ assert(outq->head == NULL);
+ outq->head = buf;
+ }
+
+ outq->tail = buf;
+
+ buf->worker = worker;
buf->finished = false;
+ buf->pos = 0;
- // Update the queue state.
- if (++outq->bufs_pos == outq->bufs_allocated)
- outq->bufs_pos = 0;
+ buf->unpadded_size = 0;
+ buf->uncompressed_size = 0;
- ++outq->bufs_used;
+ ++outq->bufs_in_use;
return buf;
}
@@ -135,50 +194,65 @@ lzma_outq_get_buf(lzma_outq *outq)
extern bool
lzma_outq_is_readable(const lzma_outq *outq)
{
- uint32_t i = outq->bufs_pos - outq->bufs_used;
- if (outq->bufs_pos < outq->bufs_used)
- i += outq->bufs_allocated;
+ if (outq->head == NULL)
+ return false;
- return outq->bufs[i].finished;
+ return outq->read_pos < outq->head->pos || outq->head->finished;
}
extern lzma_ret
-lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out,
- size_t *restrict out_pos, size_t out_size,
+lzma_outq_read(lzma_outq *restrict outq,
+ const lzma_allocator *restrict allocator,
+ uint8_t *restrict out, size_t *restrict out_pos,
+ size_t out_size,
lzma_vli *restrict unpadded_size,
lzma_vli *restrict uncompressed_size)
{
// There must be at least one buffer from which to read.
- if (outq->bufs_used == 0)
+ if (outq->bufs_in_use == 0)
return LZMA_OK;
// Get the buffer.
- uint32_t i = outq->bufs_pos - outq->bufs_used;
- if (outq->bufs_pos < outq->bufs_used)
- i += outq->bufs_allocated;
-
- lzma_outbuf *buf = &outq->bufs[i];
-
- // If it isn't finished yet, we cannot read from it.
- if (!buf->finished)
- return LZMA_OK;
+ lzma_outbuf *buf = outq->head;
// Copy from the buffer to output.
- lzma_bufcpy(buf->buf, &outq->read_pos, buf->size,
+ //
+ // FIXME? In threaded decoder it may be bad to do this copy while
+ // the mutex is being held.
+ lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
out, out_pos, out_size);
// Return if we didn't get all the data from the buffer.
- if (outq->read_pos < buf->size)
+ if (!buf->finished || outq->read_pos < buf->pos)
return LZMA_OK;
// The buffer was finished. Tell the caller its size information.
- *unpadded_size = buf->unpadded_size;
- *uncompressed_size = buf->uncompressed_size;
+ if (unpadded_size != NULL)
+ *unpadded_size = buf->unpadded_size;
+
+ if (uncompressed_size != NULL)
+ *uncompressed_size = buf->uncompressed_size;
// Free this buffer for further use.
- --outq->bufs_used;
+ move_head_to_cache(outq, allocator);
outq->read_pos = 0;
return LZMA_STREAM_END;
}
+
+
+extern void
+lzma_outq_enable_partial_output(lzma_outq *outq,
+ void (*enable_partial_output)(void *worker))
+{
+ if (outq->head != NULL && !outq->head->finished
+ && outq->head->worker != NULL) {
+ enable_partial_output(outq->head->worker);
+
+ // Set it to NULL since calling it twice is pointless.
+ outq->head->worker = NULL;
+ }
+
+ return;
+}
diff --git a/src/liblzma/common/outqueue.h b/src/liblzma/common/outqueue.h
index 079634de..355e0ced 100644
--- a/src/liblzma/common/outqueue.h
+++ b/src/liblzma/common/outqueue.h
@@ -14,16 +14,27 @@
/// Output buffer for a single thread
-typedef struct {
- /// Pointer to the output buffer of lzma_outq.buf_size_max bytes
- uint8_t *buf;
-
- /// Amount of data written to buf
- size_t size;
-
- /// Additional size information
- lzma_vli unpadded_size;
- lzma_vli uncompressed_size;
+typedef struct lzma_outbuf_s lzma_outbuf;
+struct lzma_outbuf_s {
+ /// Pointer to the next buffer. This is used for the cached buffers.
+ /// The worker thread must not modify this.
+ lzma_outbuf *next;
+
+ /// This initialized by lzma_outq_get_buf() and
+ /// is used by lzma_outq_enable_partial_output().
+ /// The worker thread must not modify this.
+ void *worker;
+
+ /// Amount of memory allocated for buf[].
+ /// The worker thread must not modify this.
+ size_t allocated;
+
+ /// Writing position in the worker thread or, in other words, the
+ /// amount of finished data written to buf[] which can be copied out
+ ///
+ /// \note This is read by another thread and thus access
+ /// to this variable needs a mutex.
+ size_t pos;
/// True when no more data will be written into this buffer.
///
@@ -31,32 +42,44 @@ typedef struct {
/// to this variable needs a mutex.
bool finished;
-} lzma_outbuf;
+ /// Additional size information. lzma_outq_read() may read these
+ /// when "finished" is true.
+ lzma_vli unpadded_size;
+ lzma_vli uncompressed_size;
+ /// Buffer of "allocated" bytes
+ uint8_t buf[];
+};
-typedef struct {
- /// Array of buffers that are used cyclically.
- lzma_outbuf *bufs;
- /// Memory allocated for all the buffers
- uint8_t *bufs_mem;
+typedef struct {
+ /// Linked list of buffers in use. The next output byte will be
+ /// read from the head and buffers for the next thread will be
+ /// appended to the tail. tail->next is always NULL.
+ lzma_outbuf *head;
+ lzma_outbuf *tail;
- /// Amount of buffer space available in each buffer
- size_t buf_size_max;
+ /// Number of bytes read from head->buf[] in lzma_outq_read()
+ size_t read_pos;
- /// Number of buffers allocated
- uint32_t bufs_allocated;
+ /// Linked list of allocated buffers that aren't currently used.
+ /// This way buffers of similar size can be reused and don't
+ /// need to be reallocated every time. For simplicity, all
+ /// cached buffers in the list have the same allocated size.
+ lzma_outbuf *cache;
- /// Position in the bufs array. The next buffer to be taken
- /// into use is bufs[bufs_pos].
- uint32_t bufs_pos;
+ /// Total amount of memory allocated for buffers
+ uint64_t memusage;
- /// Number of buffers in use
- uint32_t bufs_used;
+ /// Number of buffers in use in the head...tail list. If and only if
+ /// this is zero, the pointers head and tail above are NULL.
+ uint32_t bufs_in_use;
- /// Position in the buffer in lzma_outq_read()
- size_t read_pos;
+ /// Number of buffers allocated (in use + cached)
+ uint32_t bufs_allocated;
+ /// Maximum allowed number of allocated buffers
+ uint32_t bufs_limit;
} lzma_outq;
@@ -76,32 +99,50 @@ extern uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads);
/// function knows that there are no previous
/// allocations to free.
/// \param allocator Pointer to allocator or NULL
-/// \param buf_size_max Maximum amount of data that a single buffer
-/// in the queue may need to store.
/// \param threads Number of buffers that may be in use
/// concurrently. Note that more than this number
-/// of buffers will actually get allocated to
+/// of buffers may actually get allocated to
/// improve performance when buffers finish
-/// out of order.
+/// out of order. The actual maximum number of
+/// allocated buffers is derived from the number
+/// of threads.
///
/// \return - LZMA_OK
/// - LZMA_MEM_ERROR
///
-extern lzma_ret lzma_outq_init(
- lzma_outq *outq, const lzma_allocator *allocator,
- uint64_t buf_size_max, uint32_t threads);
+extern lzma_ret lzma_outq_init(lzma_outq *outq,
+ const lzma_allocator *allocator, uint32_t threads);
/// \brief Free the memory associated with the output queue
extern void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator);
+/// \brief Free all cached buffers that consume memory but aren't in use
+extern void lzma_outq_clear_cache(
+ lzma_outq *outq, const lzma_allocator *allocator);
+
+
+/// \brief Preallocate a new buffer into cache
+///
+/// Splitting the buffer allocation into a separate function makes it
+/// possible to ensure that way lzma_outq_get_buf() cannot fail.
+/// If the preallocated buffer isn't actually used (for example, some
+/// other error occurs), the caller has to do nothing as the buffer will
+/// be used later or cleared from the cache when not needed.
+///
+/// \return LZMA_OK on success, LZMA_MEM_ERROR if allocation fails
+///
+extern lzma_ret lzma_outq_prealloc_buf(
+ lzma_outq *outq, const lzma_allocator *allocator, size_t size);
+
+
/// \brief Get a new buffer
///
-/// lzma_outq_has_buf() must be used to check that there is a buffer
+/// lzma_outq_prealloc_buf() must be used to ensure that there is a buffer
/// available before calling lzma_outq_get_buf().
///
-extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq);
+extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq, void *worker);
/// \brief Test if there is data ready to be read
@@ -126,17 +167,32 @@ extern bool lzma_outq_is_readable(const lzma_outq *outq);
/// \return - LZMA: All OK. Either no data was available or the buffer
/// being read didn't become empty yet.
/// - LZMA_STREAM_END: The buffer being read was finished.
-/// *unpadded_size and *uncompressed_size were set.
+/// *unpadded_size and *uncompressed_size were set if they
+/// were not NULL.
///
-/// \note This reads lzma_outbuf.finished variables and thus call
-/// to this function needs to be protected with a mutex.
+/// \note This reads lzma_outbuf.finished and .pos variables and thus
+/// calls to this function need to be protected with a mutex.
///
extern lzma_ret lzma_outq_read(lzma_outq *restrict outq,
+ const lzma_allocator *restrict allocator,
uint8_t *restrict out, size_t *restrict out_pos,
size_t out_size, lzma_vli *restrict unpadded_size,
lzma_vli *restrict uncompressed_size);
+/// \brief Enable partial output from a worker thread
+///
+/// If the buffer at the head of the output queue isn't finished,
+/// this will call enable_partial_output on the worker associated with
+/// that output buffer.
+///
+/// \note This reads a lzma_outbuf.finished variable and thus
+/// calls to this function need to be protected with a mutex.
+///
+extern void lzma_outq_enable_partial_output(lzma_outq *outq,
+ void (*enable_partial_output)(void *worker));
+
+
/// \brief Test if there is at least one buffer free
///
/// This must be used before getting a new buffer with lzma_outq_get_buf().
@@ -144,7 +200,7 @@ extern lzma_ret lzma_outq_read(lzma_outq *restrict outq,
static inline bool
lzma_outq_has_buf(const lzma_outq *outq)
{
- return outq->bufs_used < outq->bufs_allocated;
+ return outq->bufs_in_use < outq->bufs_limit;
}
@@ -152,5 +208,5 @@ lzma_outq_has_buf(const lzma_outq *outq)
static inline bool
lzma_outq_is_empty(const lzma_outq *outq)
{
- return outq->bufs_used == 0;
+ return outq->bufs_in_use == 0;
}
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 01e40339..6b897ab9 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -133,6 +133,9 @@ struct lzma_stream_coder_s {
/// Output buffer queue for compressed data
lzma_outq outq;
+ /// How much memory to allocate for each lzma_outbuf.buf
+ size_t outbuf_alloc_size;
+
/// Maximum wait time if cannot use all the input and cannot
/// fill the output buffer. This is in milliseconds.
@@ -196,7 +199,7 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
-worker_encode(worker_thread *thr, worker_state state)
+worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
assert(thr->progress_in == 0);
assert(thr->progress_out == 0);
@@ -205,7 +208,7 @@ worker_encode(worker_thread *thr, worker_state state)
thr->block_options = (lzma_block){
.version = 0,
.check = thr->coder->stream_flags.check,
- .compressed_size = thr->coder->outq.buf_size_max,
+ .compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size,
// TODO: To allow changing the filter chain, the filters
@@ -234,12 +237,12 @@ worker_encode(worker_thread *thr, worker_state state)
size_t in_pos = 0;
size_t in_size = 0;
- thr->outbuf->size = thr->block_options.header_size;
- const size_t out_size = thr->coder->outq.buf_size_max;
+ *out_pos = thr->block_options.header_size;
+ const size_t out_size = thr->outbuf->allocated;
do {
mythread_sync(thr->mutex) {
- // Store in_pos and out_pos into *thr so that
+ // Store in_pos and *out_pos into *thr so that
// an application may read them via
// lzma_get_progress() to get progress information.
//
@@ -247,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state)
// finishes. Instead, the final values are taken
// later from thr->outbuf.
thr->progress_in = in_pos;
- thr->progress_out = thr->outbuf->size;
+ thr->progress_out = *out_pos;
while (in_size == thr->in_size
&& thr->state == THR_RUN)
@@ -277,8 +280,8 @@ worker_encode(worker_thread *thr, worker_state state)
ret = thr->block_encoder.code(
thr->block_encoder.coder, thr->allocator,
thr->in, &in_pos, in_limit, thr->outbuf->buf,
- &thr->outbuf->size, out_size, action);
- } while (ret == LZMA_OK && thr->outbuf->size < out_size);
+ out_pos, out_size, action);
+ } while (ret == LZMA_OK && *out_pos < out_size);
switch (ret) {
case LZMA_STREAM_END:
@@ -313,10 +316,10 @@ worker_encode(worker_thread *thr, worker_state state)
return state;
// Do the encoding. This takes care of the Block Header too.
- thr->outbuf->size = 0;
+ *out_pos = 0;
ret = lzma_block_uncomp_encode(&thr->block_options,
thr->in, in_size, thr->outbuf->buf,
- &thr->outbuf->size, out_size);
+ out_pos, out_size);
// It shouldn't fail.
if (ret != LZMA_OK) {
@@ -367,11 +370,13 @@ worker_start(void *thr_ptr)
}
}
+ size_t out_pos = 0;
+
assert(state != THR_IDLE);
assert(state != THR_STOP);
if (state <= THR_FINISH)
- state = worker_encode(thr, state);
+ state = worker_encode(thr, &out_pos, state);
if (state == THR_EXIT)
break;
@@ -387,14 +392,17 @@ worker_start(void *thr_ptr)
}
mythread_sync(thr->coder->mutex) {
- // Mark the output buffer as finished if
- // no errors occurred.
- thr->outbuf->finished = state == THR_FINISH;
+ // If no errors occurred, make the encoded data
+ // available to be copied out.
+ if (state == THR_FINISH) {
+ thr->outbuf->pos = out_pos;
+ thr->outbuf->finished = true;
+ }
// Update the main progress info.
thr->coder->progress_in
+= thr->outbuf->uncompressed_size;
- thr->coder->progress_out += thr->outbuf->size;
+ thr->coder->progress_out += out_pos;
thr->progress_in = 0;
thr->progress_out = 0;
@@ -519,6 +527,11 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
if (!lzma_outq_has_buf(&coder->outq))
return LZMA_OK;
+ // That's also true if we cannot allocate memory for the output
+ // buffer in the output queue.
+ return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
+ coder->outbuf_alloc_size));
+
// If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) {
@@ -541,7 +554,7 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
mythread_sync(coder->thr->mutex) {
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
- coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
+ coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
mythread_cond_signal(&coder->thr->cond);
}
@@ -704,7 +717,7 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
}
// Try to read compressed data to out[].
- ret = lzma_outq_read(&coder->outq,
+ ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
&unpadded_size,
&uncompressed_size);
@@ -951,7 +964,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
&block_size, &outbuf_size_max));
#if SIZE_MAX < UINT64_MAX
- if (block_size > SIZE_MAX)
+ if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
return LZMA_MEM_ERROR;
#endif
@@ -1012,6 +1025,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Basic initializations
coder->sequence = SEQ_STREAM_HEADER;
coder->block_size = (size_t)(block_size);
+ coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
coder->thread_error = LZMA_OK;
coder->thr = NULL;
@@ -1041,7 +1055,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Output queue
return_if_error(lzma_outq_init(&coder->outq, allocator,
- outbuf_size_max, options->threads));
+ options->threads));
// Timeout
coder->timeout = options->timeout;