// SPDX-License-Identifier: 0BSD
///////////////////////////////////////////////////////////////////////////////
//
/// \file stream_encoder_mt.c
/// \brief Multithreaded .xz Stream encoder
//
// Author: Lasse Collin
//
///////////////////////////////////////////////////////////////////////////////
#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"
/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
typedef enum {
/// Waiting for work.
THR_IDLE,
/// Encoding is in progress.
THR_RUN,
/// Encoding is in progress but no more input data will
/// be read.
THR_FINISH,
/// The main thread wants the thread to stop whatever it was doing
/// but not exit.
THR_STOP,
/// The main thread wants the thread to exit. We could use
/// cancellation but since there's stopped anyway, this is lazier.
THR_EXIT,
} worker_state;
typedef struct lzma_stream_coder_s lzma_stream_coder;
typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
worker_state state;
/// Input buffer of coder->block_size bytes. The main thread will
/// put new input into this and update in_size accordingly. Once
/// no more input is coming, state will be set to THR_FINISH.
uint8_t *in;
/// Amount of data available in the input buffer. This is modified
/// only by the main thread.
size_t in_size;
/// Output buffer for this thread. This is set by the main
/// thread every time a new Block is started with this thread
/// structure.
lzma_outbuf *outbuf;
/// Pointer to the main structure is needed when putting this
/// thread back to the stack of free threads.
lzma_stream_coder *coder;
/// The allocator is set by the main thread. Since a copy of the
/// pointer is kept here, the application must not change the
/// allocator before calling lzma_end().
const lzma_allocator *allocator;
/// Amount of uncompressed data that has already been compressed.
uint64_t progress_in;
/// Amount of compressed data that is ready.
uint64_t progress_out;
/// Block encoder
lzma_next_coder block_encoder;
/// Compression options for this Block
lzma_block block_options;
/// Filter chain for this thread. By copying the filters array
/// to each thread it is possible to change the filter chain
/// between Blocks using lzma_filters_update().
lzma_filter filters[LZMA_FILTERS_MAX + 1];
/// Next structure in the stack of free worker threads.
worker_thread *next;
mythread_mutex mutex;
mythread_cond cond;
/// The ID of this thread is used to join the thread
/// when it's not needed anymore.
mythread thread_id;
};
struct lzma_stream_coder_s {
enum {
SEQ_STREAM_HEADER,
SEQ_BLOCK,
SEQ_INDEX,
SEQ_STREAM_FOOTER,
} sequence;
/// Start a new Block every block_size bytes of input unless
/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
size_t block_size;
/// The filter chain to use for the next Block.
/// This can be updated using lzma_filters_update()
/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
lzma_filter filters[LZMA_FILTERS_MAX + 1];
/// A copy of filters[] will be put here when attempting to get
/// a new worker thread. This will be copied to a worker thread
/// when a thread becomes free and then this cache is marked as
/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
/// the filter options from filters[] would get uselessly copied
/// multiple times (allocated and freed) when waiting for a new free
/// worker thread.
///
/// This is freed if filters[] is updated via lzma_filters_update().
lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];
/// Index to hold sizes of the Blocks
lzma_index *index;
/// Index encoder
lzma_next_coder index_encoder;
/// Stream Flags for encoding the Stream Header and Stream Footer.
lzma_stream_flags stream_flags;
/// Buffer to hold Stream Header and Stream Footer.
uint8_t header[LZMA_STREAM_HEADER_SIZE];
/// Read position in header[]
size_t header_pos;
/// Output buffer queue for compressed data
lzma_outq outq;
/// How much memory to allocate for each lzma_outbuf.buf
size_t outbuf_alloc_size;
/// Maximum wait time if cannot use all the input and cannot
/// fill the output buffer. This is in milliseconds.
uint32_t timeout;
/// Error code from a worker thread
lzma_ret thread_error;
/// Array of allocated thread-specific structures
worker_thread *threads;
/// Number of structures in "threads" above. This is also the
/// number of threads that will be created at maximum.
uint32_t threads_max;
/// Number of thread structures that have been initialized, and
/// thus the number of worker threads actually created so far.
uint32_t threads_initialized;
/// Stack of free threads. When a thread finishes, it puts itself
/// back into this stack. This starts as empty because threads
/// are created only when actually needed.
worker_thread *threads_free;
/// The most recent worker thread to which the main thread writes
/// the new input from the application.
worker_thread *thr;
/// Amount of uncompressed data in Blocks that have already
/// been finished.
uint64_t progress_in;
/// Amount of compressed data in Stream Header + Blocks that
/// have already been finished.
uint64_t progress_out;
mythread_mutex mutex;
mythread_cond cond;
};
/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
assert(ret != LZMA_OK);
assert(ret != LZMA_STREAM_END);
mythread_sync(thr->coder->mutex) {
if (thr->coder->thread_error == LZMA_OK)
thr->coder->thread_error = ret;
mythread_cond_signal(&thr->coder->cond);
}
return;
}
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
assert(thr->progress_in == 0);
assert(thr->progress_out == 0);
// Set the Block options.
thr->block_options = (lzma_block){
.version = 0,
.check = thr->coder->stream_flags.check,
.compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size,
.filters = thr->filters,
};
// Calculate maximum size of the Block Header. This amount is
// reserved in the beginning of the buffer so that Block Header
// along with Compressed Size and Uncompressed Size can be
// written there.
lzma_ret ret = lzma_block_header_size(&thr->block_options);
if (ret != LZMA_OK) {
worker_error(thr, ret);
return THR_STOP;
}
// Initialize the Block encoder.
ret = lzma_block_encoder_init(&thr->block_encoder,
thr->allocator, &thr->block_options);
if (ret != LZMA_OK) {
worker_error(thr, ret);
return THR_STOP;
}
size_t in_pos = 0;
size_t in_size = 0;
*out_pos = thr->block_options.header_size;
const size_t out_size = thr->outbuf->allocated;
do {
mythread_sync(thr->mutex) {
// Store in_pos and *out_pos into *thr so that
// an application may read them via
// lzma_get_progress() to get progress information.
//
// NOTE: These aren't updated when the encoding
// finishes. Instead, the final values are taken
// later from thr->outbuf.
thr->progress_in = in_pos;
thr->progress_out = *out_pos;
while (in_size == thr->in_size
&& thr->state == THR_RUN)
mythread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
}
// Return if we were asked to stop or exit.
if (state >= THR_STOP)
return state;
lzma_action action = state == THR_FINISH
? LZMA_FINISH : LZMA_RUN;
// Limit the amount of input given to the Block encoder
// at once. This way this thread can react fairly quickly
// if the main thread wants us to stop or exit.
static const size_t in_chunk_max = 16384;
size_t in_limit = in_size;
if (in_size - in_pos > in_chunk_max) {
in_limit = in_pos + in_chunk_max;
action = LZMA_RUN;
}
ret = thr->block_encoder.code(
thr->block_encoder.coder, thr->allocator,
thr->in, &in_pos, in_limit, thr->outbuf->buf,
out_pos, out_size, action);
} while (ret == LZMA_OK && *out_pos < out_size);
switch (ret) {
case LZMA_STREAM_END:
assert(state == THR_FINISH);
// Encode the Block Header. By doing it after
// the compression, we can store the Compressed Size
// and Uncompressed Size fields.
ret = lzma_block_header_encode(&thr->block_options,
thr->outbuf->buf);
if (ret != LZMA_OK) {
worker_error(thr, ret);
return THR_STOP;
}
break;
case LZMA_OK:
// The data was incompressible. Encode it using uncompressed
// LZMA2 chunks.
//
// First wait that we have gotten all the input.
mythread_sync(thr->mutex) {
while (thr->state == THR_RUN)
mythread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
}
if (state >= THR_STOP)
return state;
// Do the encoding. This takes care of the Block Header too.
*out_pos = 0;
ret = lzma_block_uncomp_encode(&thr->block_options,
thr->in, in_size, thr->outbuf->buf,
out_pos, out_size);
// It shouldn't fail.
if (ret != LZMA_OK) {
worker_error(thr, LZMA_PROG_ERROR);
return THR_STOP;
}
break;
default:
worker_error(thr, ret);
return THR_STOP;
}
// Set the size information that will be read by the main thread
// to write the Index field.
thr->outbuf->unpadded_size
= lzma_block_unpadded_size(&thr->block_options);
assert(thr->outbuf->unpadded_size != 0);
thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
return THR_FINISH;
}
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
worker_thread *thr = thr_ptr;
worker_state state = THR_IDLE; // Init to silence a warning
while (true) {
// Wait for work.
mythread_sync(thr->mutex) {
while (true) {
// The thread is already idle so if we are
// requested to stop, just set the state.
if (thr->state == THR_STOP) {
thr->state = THR_IDLE;
mythread_cond_signal(&thr->cond);
}
state = thr->state;
if (state != THR_IDLE)
break;
mythread_cond_wait(&thr->cond, &thr->mutex);
}
}
size_t out_pos = 0;
assert(state != THR_IDLE);
assert(state != THR_STOP);
if (state <= THR_FINISH)
state = worker_encode(thr, &out_pos, state);
if (state == THR_EXIT)
break;
// Mark the thread as idle unless the main thread has
// told us to exit. Signal is needed for the case
// where the main thread is waiting for the threads to stop.
mythread_sync(thr->mutex) {
if (thr->state != THR_EXIT) {
thr->state = THR_IDLE;
mythread_cond_signal(&thr->cond);
}
}
mythread_sync(thr->coder->mutex) {
// If no errors occurred, make the encoded data
// available to be copied out.
if (state == THR_FINISH) {
thr->outbuf->pos = out_pos;
thr->outbuf->finished = true;
}
// Update the main progress info.
thr->coder->progress_in
+= thr->outbuf->uncompressed_size;
thr->coder->progress_out += out_pos;
thr->progress_in = 0;
thr->progress_out = 0;
// Return this thread to the stack of free threads.
thr->next = thr->coder->threads_free;
thr->coder->threads_free = thr;
mythread_cond_signal(&thr->coder->cond);
}
}
// Exiting, free the resources.
lzma_filters_free(thr->filters, thr->allocator);
mythread_mutex_destroy(&thr->mutex);
mythread_cond_destroy(&thr->cond);
lzma_next_end(&thr->block_encoder, thr->allocator);
lzma_free(thr->in, thr->allocator);
return MYTHREAD_RET_VALUE;
}
/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
// Tell the threads to stop.
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
coder->threads[i].state = THR_STOP;
mythread_cond_signal(&coder->threads[i].cond);
}
}
if (!wait_for_threads)
return;
// Wait for the threads to settle in the idle state.
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
while (coder->threads[i].state != THR_IDLE)
mythread_cond_wait(&coder->threads[i].cond,
&coder->threads[i].mutex);
}
}
return;
}
/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
coder->threads[i].state = THR_EXIT;
mythread_cond_signal(&coder->threads[i].cond);
}
}
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
int ret = mythread_join(coder->threads[i].thread_id);
assert(ret == 0);
(void)ret;
}
lzma_free(coder->threads, allocator);
return;
}
/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_stream_coder *coder,
const lzma_allocator *allocator)
{
worker_thread *thr = &coder->threads[coder->threads_initialized];
thr->in = lzma_alloc(coder->block_size, allocator);
if (thr->in == NULL)
return LZMA_MEM_ERROR;
if (mythread_mutex_init(&thr->mutex))
goto error_mutex;
if (mythread_cond_init(&thr->cond))
goto error_cond;
thr->state = THR_IDLE;
thr->allocator = allocator;
thr->coder = coder;
thr->progress_in = 0;
thr->progress_out = 0;
thr->block_encoder = LZMA_NEXT_CODER_INIT;
thr->filters[0].id = LZMA_VLI_UNKNOWN;
if (mythread_create(&thr->thread_id, &worker_start, thr))
goto error_thread;
++coder->threads_initialized;
coder->thr = thr;
return LZMA_OK;
error_thread:
mythread_cond_destroy(&thr->cond);
error_cond:
mythread_mutex_destroy(&thr->mutex);
error_mutex:
lzma_free(thr->in, allocator);
return LZMA_MEM_ERROR;
}
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
// If there are no free output subqueues, there is no
// point to try getting a thread.
if (!lzma_outq_has_buf(&coder->outq))
return LZMA_OK;
// That's also true if we cannot allocate memory for the output
// buffer in the output queue.
return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
coder->outbuf_alloc_size));
// Make a thread-specific copy of the filter chain. Put it in
// the cache array first so that if we cannot get a new thread yet,
// the allocation is ready when we try again.
if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
return_if_error(lzma_filters_copy(
coder->filters, coder->filters_cache, allocator));
// If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) {
coder->thr = coder->threads_free;
coder->threads_free = coder->threads_free->next;
}
}
if (coder->thr == NULL) {
// If there are no uninitialized structures left, return.
if (coder->threads_initialized == coder->threads_max)
return LZMA_OK;
// Initialize a new thread.
return_if_error(initialize_new_thread(coder, allocator));
}
// Reset the parts of the thread state that have to be done
// in the main thread.
mythread_sync(coder->thr->mutex) {
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
// Free the old thread-specific filter options and replace
// them with the already-allocated new options from
// coder->filters_cache[]. Then mark the cache as empty.
lzma_filters_free(coder->thr->filters, allocator);
memcpy(coder->thr->filters, coder->filters_cache,
sizeof(coder->filters_cache));
coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
mythread_cond_signal(&coder->thr->cond);
}
return LZMA_OK;
}
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
const uint8_t *restrict in, size_t *restrict in_pos,
size_t in_size, lzma_action action)
{
while (*in_pos < in_size
|| (coder->thr != NULL && action != LZMA_RUN)) {
if (coder->thr == NULL) {
// Get a new thread.
const lzma_ret ret = get_thread(coder, allocator);
if (coder->thr == NULL)
return ret;
}
// Copy the input data to thread's buffer.
size_t thr_in_size = coder->thr->in_size;
lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
&thr_in_size, coder->block_size);
// Tell the Block encoder to finish if
// - it has got block_size bytes of input; or
// - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
// or LZMA_FULL_BARRIER was used.
//
// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
const bool finish = thr_in_size == coder->block_size
|| (*in_pos == in_size && action != LZMA_RUN);
bool block_error = false;
mythread_sync(coder->thr->mutex) {
if (coder->thr->state == THR_IDLE) {
// Something has gone wrong with the Block
// encoder. It has set coder->thread_error
// which we will read a few lines later.
block_error = true;
} else {
// Tell the Block encoder its new amount
// of input and update the state if needed.
coder->thr->in_size = thr_in_size;
if (finish)
coder->thr->state = THR_FINISH;
mythread_cond_signal(&coder->thr->cond);
}
}
if (block_error) {
lzma_ret ret = LZMA_OK; // Init to silence a warning.
mythread_sync(coder->mutex) {
ret = coder->thread_error;
}
return ret;
}
if (finish)
coder->thr = NULL;
}
return LZMA_OK;
}
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
bool *has_blocked, bool has_input)
{
if (coder->timeout != 0 && !*has_blocked) {
// Every time when stream_encode_mt() is called via
// lzma_code(), *has_blocked starts as false. We set it
// to true here and calculate the absolute time when
// we must return if there's nothing to do.
//
// This way if we block multiple times for short moments
// less than "timeout" milliseconds, we will return once
// "timeout" amount of time has passed since the *first*
// blocking occurred. If the absolute time was calculated
// again every time we block, "timeout" would effectively
// be meaningless if we never consecutively block longer
// than "timeout" ms.
*has_blocked = true;
mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
}
bool timed_out = false;
mythread_sync(coder->mutex) {
// There are four things that we wait. If one of them
// becomes possible, we return.
// - If there is input left, we need to get a free
// worker thread and an output buffer for it.
// - Data ready to be read from the output queue.
// - A worker thread indicates an error.
// - Time out occurs.
while ((!has_input || coder->threads_free == NULL
|| !lzma_outq_has_buf(&coder->outq))
&& !lzma_outq_is_readable(&coder->outq)
&& coder->thread_error == LZMA_OK
&& !timed_out) {
if (coder->timeout != 0)
timed_out = mythread_cond_timedwait(
&coder->cond, &coder->mutex,
wait_abs) != 0;
else
mythread_cond_wait(&coder->cond,
&coder->mutex);
}
}
return timed_out;
}
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
const uint8_t *restrict in, size_t *restrict in_pos,
size_t in_size, uint8_t *restrict out,
size_t *restrict out_pos, size_t out_size, lzma_action action)
{
lzma_stream_coder *coder = coder_ptr;
switch (coder->sequence) {
case SEQ_STREAM_HEADER:
lzma_bufcpy(coder->header, &coder->header_pos,
sizeof(coder->header),
out, out_pos, out_size);
if (coder->header_pos < sizeof(coder->header))
return LZMA_OK;
coder->header_pos = 0;
coder->sequence = SEQ_BLOCK;
// Fall through
case SEQ_BLOCK: {
// Initialized to silence warnings.
lzma_vli unpadded_size = 0;
lzma_vli uncompressed_size = 0;
lzma_ret ret = LZMA_OK;
// These are for wait_for_work().
bool has_blocked = false;
mythread_condtime wait_abs = { 0 };
while (true) {
mythread_sync(coder->mutex) {
// Check for Block encoder errors.
ret = coder->thread_error;
if (ret != LZMA_OK) {
assert(ret != LZMA_STREAM_END);
break; // Break out of mythread_sync.
}
// Try to read compressed data to out[].
ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
&unpadded_size,
&uncompressed_size);
}
if (ret == LZMA_STREAM_END) {
// End of Block. Add it to the Index.
ret = lzma_index_append(coder->index,
allocator, unpadded_size,
uncompressed_size);
if (ret != LZMA_OK) {
threads_stop(coder, false);
return ret;
}
// If we didn't fill the output buffer yet,
// try to read more data. Maybe the next
// outbuf has been finished already too.
if (*out_pos < out_size)
continue;
}
if (ret != LZMA_OK) {
// coder->thread_error was set.
threads_stop(coder, false);
return ret;
}
// Try to give uncompressed data to a worker thread.
ret = stream_encode_in(coder, allocator,
in, in_pos, in_size, action);
if (ret != LZMA_OK) {
threads_stop(coder, false);
return ret;
}
// See if we should wait or return.
//
// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
if (*in_pos == in_size) {
// LZMA_RUN: More data is probably coming
// so return to let the caller fill the
// input buffer.
if (action == LZMA_RUN)
return LZMA_OK;
// LZMA_FULL_BARRIER: The same as with
// LZMA_RUN but tell the caller that the
// barrier was completed.
if (action == LZMA_FULL_BARRIER)
return LZMA_STREAM_END;
// Finishing or flushing isn't completed until
// all input data has been encoded and copied
// to the output buffer.
if (lzma_outq_is_empty(&coder->outq)) {
// LZMA_FINISH: Continue to encode
// the Index field.
if (action == LZMA_FINISH)
break;
// LZMA_FULL_FLUSH: Return to tell
// the caller that flushing was
// completed.
if (action == LZMA_FULL_FLUSH)
return LZMA_STREAM_END;
}
}
// Return if there is no output space left.
// This check must be done after testing the input
// buffer, because we might want to use a different
// return code.
if (*out_pos == out_size)
return LZMA_OK;
// Neither in nor out has been used completely.
// Wait until there's something we can do.
if (wait_for_work(coder, &wait_abs, &has_blocked,
*in_pos < in_size))
return LZMA_TIMED_OUT;
}
// All Blocks have been encoded and the threads have stopped.
// Prepare to encode the Index field.
return_if_error(lzma_index_encoder_init(
&coder->index_encoder, allocator,
coder->index));
coder->sequence = SEQ_INDEX;
// Update the progress info to take the Index and
// Stream Footer into account. Those are very fast to encode
// so in terms of progress information they can be thought
// to be ready to be copied out.
coder->progress_out += lzma_index_size(coder->index)
+ LZMA_STREAM_HEADER_SIZE;
}
// Fall through
case SEQ_INDEX: {
// Call the Index encoder. It doesn't take any input, so
// those pointers can be NULL.
const lzma_ret ret = coder->index_encoder.code(
coder->index_encoder.coder, allocator,
NULL, NULL, 0,
out, out_pos, out_size, LZMA_RUN);
if (ret != LZMA_STREAM_END)
return ret;
// Encode the Stream Footer into coder->buffer.
coder->stream_flags.backward_size
= lzma_index_size(coder->index);
if (lzma_stream_footer_encode(&coder->stream_flags,
coder->header) != LZMA_OK)
return LZMA_PROG_ERROR;
coder->sequence = SEQ_STREAM_FOOTER;
}
// Fall through
case SEQ_STREAM_FOOTER:
lzma_bufcpy(coder->header, &coder->header_pos,
sizeof(coder->header),
out, out_pos, out_size);
return coder->header_pos < sizeof(coder->header)
? LZMA_OK : LZMA_STREAM_END;
}
assert(0);
return LZMA_PROG_ERROR;
}
static void
stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
{
lzma_stream_coder *coder = coder_ptr;
// Threads must be killed before the output queue can be freed.
threads_end(coder, allocator);
lzma_outq_end(&coder->outq, allocator);
lzma_filters_free(coder->filters, allocator);
lzma_filters_free(coder->filters_cache, allocator);
lzma_next_end(&coder->index_encoder, allocator);
lzma_index_end(coder->index, allocator);
mythread_cond_destroy(&coder->cond);
mythread_mutex_destroy(&coder->mutex);
lzma_free(coder, allocator);
return;
}
static lzma_ret
stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
const lzma_filter *filters,
const lzma_filter *reversed_filters
lzma_attribute((__unused__)))
{
lzma_stream_coder *coder = coder_ptr;
// Applications shouldn't attempt to change the options when
// we are already encoding the Index or Stream Footer.
if (coder->sequence > SEQ_BLOCK)
return LZMA_PROG_ERROR;
// For now the threaded encoder doesn't support changing
// the options in the middle of a Block.
if (coder->thr != NULL)
return LZMA_PROG_ERROR;
// Check if the filter chain seems mostly valid. See the comment
// in stream_encoder_mt_init().
if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
return LZMA_OPTIONS_ERROR;
// Make a copy to a temporary buffer first. This way the encoder
// state stays unchanged if an error occurs in lzma_filters_copy().
lzma_filter temp[LZMA_FILTERS_MAX + 1];
return_if_error(lzma_filters_copy(filters, temp, allocator));
// Free the options of the old chain as well as the cache.
lzma_filters_free(coder->filters, allocator);
lzma_filters_free(coder->filters_cache, allocator);
// Copy the new filter chain in place.
memcpy(coder->filters, temp, sizeof(temp));
return LZMA_OK;
}
/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
const lzma_filter **filters, uint64_t *block_size,
uint64_t *outbuf_size_max)
{
// Validate some of the options.
if (options == NULL)
return LZMA_PROG_ERROR;
if (options->flags != 0 || options->threads == 0
|| options->threads > LZMA_THREADS_MAX)
return LZMA_OPTIONS_ERROR;
if (options->filters != NULL) {
// Filter chain was given, use it as is.
*filters = options->filters;
} else {
// Use a preset.
if (lzma_easy_preset(opt_easy, options->preset))
return LZMA_OPTIONS_ERROR;
*filters = opt_easy->filters;
}
// If the Block size is not set, determine it from the filter chain.
if (options->block_size > 0)
*block_size = options->block_size;
else
*block_size = lzma_mt_block_size(*filters);
// UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
// should be optimized out by any reasonable compiler.
// The second condition should be there in the unlikely event that
// the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
return LZMA_OPTIONS_ERROR;
// Calculate the maximum amount output that a single output buffer
// may need to hold. This is the same as the maximum total size of
// a Block.
*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
if (*outbuf_size_max == 0)
return LZMA_MEM_ERROR;
return LZMA_OK;
}
static void
get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
{
lzma_stream_coder *coder = coder_ptr;
// Lock coder->mutex to prevent finishing threads from moving their
// progress info from the worker_thread structure to lzma_stream_coder.
mythread_sync(coder->mutex) {
*progress_in = coder->progress_in;
*progress_out = coder->progress_out;
for (size_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
*progress_in += coder->threads[i].progress_in;
*progress_out += coder->threads[i]
.progress_out;
}
}
}
return;
}
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
const lzma_mt *options)
{
lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
// Get the filter chain.
lzma_options_easy easy;
const lzma_filter *filters;
uint64_t block_size;
uint64_t outbuf_size_max;
return_if_error(get_options(options, &easy, &filters,
&block_size, &outbuf_size_max));
#if SIZE_MAX < UINT64_MAX
if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
return LZMA_MEM_ERROR;
#endif
// Validate the filter chain so that we can give an error in this
// function instead of delaying it to the first call to lzma_code().
// The memory usage calculation verifies the filter chain as
// a side effect so we take advantage of that. It's not a perfect
// check though as raw encoder allows LZMA1 too but such problems
// will be caught eventually with Block Header encoder.
if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
return LZMA_OPTIONS_ERROR;
// Validate the Check ID.
if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
return LZMA_PROG_ERROR;
if (!lzma_check_is_supported(options->check))
return LZMA_UNSUPPORTED_CHECK;
// Allocate and initialize the base structure if needed.
lzma_stream_coder *coder = next->coder;
if (coder == NULL) {
coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
if (coder == NULL)
return LZMA_MEM_ERROR;
next->coder = coder;
// For the mutex and condition variable initializations
// the error handling has to be done here because
// stream_encoder_mt_end() doesn't know if they have
// already been initialized or not.
if (mythread_mutex_init(&coder->mutex)) {
lzma_free(coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
}
if (mythread_cond_init(&coder->cond)) {
mythread_mutex_destroy(&coder->mutex);
lzma_free(coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
}
next->code = &stream_encode_mt;
next->end = &stream_encoder_mt_end;
next->get_progress = &get_progress;
next->update = &stream_encoder_mt_update;
coder->filters[0].id = LZMA_VLI_UNKNOWN;
coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
coder->index_encoder = LZMA_NEXT_CODER_INIT;
coder->index = NULL;
memzero(&coder->outq, sizeof(coder->outq));
coder->threads = NULL;
coder->threads_max = 0;
coder->threads_initialized = 0;
}
// Basic initializations
coder->sequence = SEQ_STREAM_HEADER;
coder->block_size = (size_t)(block_size);
coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
coder->thread_error = LZMA_OK;
coder->thr = NULL;
// Allocate the thread-specific base structures.
assert(options->threads > 0);
if (coder->threads_max != options->threads) {
threads_end(coder, allocator);
coder->threads = NULL;
coder->threads_max = 0;
coder->threads_initialized = 0;
coder->threads_free = NULL;
coder->threads = lzma_alloc(
options->threads * sizeof(worker_thread),
allocator);
if (coder->threads == NULL)
return LZMA_MEM_ERROR;
coder->threads_max = options->threads;
} else {
// Reuse the old structures and threads. Tell the running
// threads to stop and wait until they have stopped.
threads_stop(coder, true);
}
// Output queue
return_if_error(lzma_outq_init(&coder->outq, allocator,
options->threads));
// Timeout
coder->timeout = options->timeout;
// Free the old filter chain and the cache.
lzma_filters_free(coder->filters, allocator);
lzma_filters_free(coder->filters_cache, allocator);
// Copy the new filter chain.
return_if_error(lzma_filters_copy(
filters, coder->filters, allocator));
// Index
lzma_index_end(coder->index, allocator);
coder->index = lzma_index_init(allocator);
if (coder->index == NULL)
return LZMA_MEM_ERROR;
// Stream Header
coder->stream_flags.version = 0;
coder->stream_flags.check = options->check;
return_if_error(lzma_stream_header_encode(
&coder->stream_flags, coder->header));
coder->header_pos = 0;
// Progress info
coder->progress_in = 0;
coder->progress_out = LZMA_STREAM_HEADER_SIZE;
return LZMA_OK;
}
#ifdef HAVE_SYMBOL_VERSIONS_LINUX
// These are for compatibility with binaries linked against liblzma that
// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
// but it has been added here anyway since someone might misread the
// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
lzma_ret, lzma_stream_encoder_mt_512a)(
lzma_stream *strm, const lzma_mt *options)
lzma_nothrow lzma_attr_warn_unused_result
__attribute__((__alias__("lzma_stream_encoder_mt_52")));
LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
lzma_ret, lzma_stream_encoder_mt_522)(
lzma_stream *strm, const lzma_mt *options)
lzma_nothrow lzma_attr_warn_unused_result
__attribute__((__alias__("lzma_stream_encoder_mt_52")));
LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
lzma_ret, lzma_stream_encoder_mt_52)(
lzma_stream *strm, const lzma_mt *options)
lzma_nothrow lzma_attr_warn_unused_result;
#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
#endif
extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
lzma_next_strm_init(stream_encoder_mt_init, strm, options);
strm->internal->supported_actions[LZMA_RUN] = true;
// strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
strm->internal->supported_actions[LZMA_FINISH] = true;
return LZMA_OK;
}
#ifdef HAVE_SYMBOL_VERSIONS_LINUX
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
uint64_t, lzma_stream_encoder_mt_memusage_512a)(
const lzma_mt *options) lzma_nothrow lzma_attr_pure
__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
uint64_t, lzma_stream_encoder_mt_memusage_522)(
const lzma_mt *options) lzma_nothrow lzma_attr_pure
__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
uint64_t, lzma_stream_encoder_mt_memusage_52)(
const lzma_mt *options) lzma_nothrow lzma_attr_pure;
#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
#endif
// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
lzma_options_easy easy;
const lzma_filter *filters;
uint64_t block_size;
uint64_t outbuf_size_max;
if (get_options(options, &easy, &filters, &block_size,
&outbuf_size_max) != LZMA_OK)
return UINT64_MAX;
// Memory usage of the input buffers
const uint64_t inbuf_memusage = options->threads * block_size;
// Memory usage of the filter encoders
uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
if (filters_memusage == UINT64_MAX)
return UINT64_MAX;
filters_memusage *= options->threads;
// Memory usage of the output queue
const uint64_t outq_memusage = lzma_outq_memusage(
outbuf_size_max, options->threads);
if (outq_memusage == UINT64_MAX)
return UINT64_MAX;
// Sum them with overflow checking.
uint64_t total_memusage = LZMA_MEMUSAGE_BASE
+ sizeof(lzma_stream_coder)
+ options->threads * sizeof(worker_thread);
if (UINT64_MAX - total_memusage < inbuf_memusage)
return UINT64_MAX;
total_memusage += inbuf_memusage;
if (UINT64_MAX - total_memusage < filters_memusage)
return UINT64_MAX;
total_memusage += filters_memusage;
if (UINT64_MAX - total_memusage < outq_memusage)
return UINT64_MAX;
return total_memusage + outq_memusage;
}