aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2022-11-24 16:25:10 +0200
committerLasse Collin <lasse.collin@tukaani.org>2022-11-24 16:25:10 +0200
commit93439cfafe1768b3b18d67d2356ef7e7559bba59 (patch)
tree7b02d2ea781fa77021c587f7d95c173a88c8d9cd /src
parentUpdate THANKS. (diff)
downloadxz-93439cfafe1768b3b18d67d2356ef7e7559bba59.tar.xz
liblzma: Add lzma_filters_update() support to the multi-threaded encoder.
A tiny downside of this is that now a 1-4 tiny allocations are made for every Block because each worker thread needs its own copy of the filter chain.
Diffstat (limited to 'src')
-rw-r--r--src/liblzma/api/lzma/filter.h36
-rw-r--r--src/liblzma/common/stream_encoder_mt.c96
2 files changed, 109 insertions, 23 deletions
diff --git a/src/liblzma/api/lzma/filter.h b/src/liblzma/api/lzma/filter.h
index 41c5895e..5d0a63c5 100644
--- a/src/liblzma/api/lzma/filter.h
+++ b/src/liblzma/api/lzma/filter.h
@@ -226,21 +226,27 @@ extern LZMA_API(lzma_ret) lzma_raw_decoder(
/**
* \brief Update the filter chain in the encoder
*
- * This function is for advanced users only. This function has two slightly
- * different purposes:
- *
- * - After LZMA_FULL_FLUSH when using Stream encoder: Set a new filter
- * chain, which will be used starting from the next Block.
- *
- * - After LZMA_SYNC_FLUSH using Raw, Block, or Stream encoder: Change
- * the filter-specific options in the middle of encoding. The actual
- * filters in the chain (Filter IDs) cannot be changed. In the future,
- * it might become possible to change the filter options without
- * using LZMA_SYNC_FLUSH.
- *
- * While rarely useful, this function may be called also when no data has
- * been compressed yet. In that case, this function will behave as if
- * LZMA_FULL_FLUSH (Stream encoder) or LZMA_SYNC_FLUSH (Raw or Block
+ * This function may be called after lzma_code() has returned LZMA_STREAM_END
+ * when LZMA_FULL_BARRIER, LZMA_FULL_FLUSH, or LZMA_SYNC_FLUSH was used:
+ *
+ * - After LZMA_FULL_BARRIER or LZMA_FULL_FLUSH: Single-threaded .xz Stream
+ * encoder (lzma_stream_encoder()) and (since liblzma 5.4.0) multi-threaded
+ * Stream encoder (lzma_stream_encoder_mt()) allow setting a new filter
+ * chain to be used for the next Block(s).
+ *
+ * - After LZMA_SYNC_FLUSH: Raw encoder (lzma_raw_encoder()),
+ * Block encocder (lzma_block_encoder()), and single-threaded .xz Stream
+ * encoder (lzma_stream_encoder()) allow changing certain filter-specific
+ * options in the middle of encoding. The actual filters in the chain
+ * (Filter IDs) must not be changed! Currently only the lc, lp, and pb
+ * options of LZMA2 (not LZMA1) can be changed this way.
+ *
+ * - In the future some filters might allow changing some of their options
+ * without any barrier or flushing but currently such filters don't exist.
+ *
+ * This function may also be called when no data has been compressed yet
+ * although this is rarely useful. In that case, this function will behave
+ * as if LZMA_FULL_FLUSH (Stream encoders) or LZMA_SYNC_FLUSH (Raw or Block
* encoder) had been used right before calling this function.
*
* \return - LZMA_OK
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 2c6d4386..f4497c10 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -85,6 +85,11 @@ struct worker_thread_s {
/// Compression options for this Block
lzma_block block_options;
+ /// Filter chain for this thread. By copying the filters array
+ /// to each thread it is possible to change the filter chain
+ /// between Blocks using lzma_filters_update().
+ lzma_filter filters[LZMA_FILTERS_MAX + 1];
+
/// Next structure in the stack of free worker threads.
worker_thread *next;
@@ -109,9 +114,22 @@ struct lzma_stream_coder_s {
/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
size_t block_size;
- /// The filter chain currently in use
+ /// The filter chain to use for the next Block.
+ /// This can be updated using lzma_filters_update()
+ /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
lzma_filter filters[LZMA_FILTERS_MAX + 1];
+ /// A copy of filters[] will be put here when attempting to get
+ /// a new worker thread. This will be copied to a worker thread
+ /// when a thread becomes free and then this cache is marked as
+ /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
+ /// the filter options from filters[] would get uselessly copied
+ /// multiple times (allocated and freed) when waiting for a new free
+ /// worker thread.
+ ///
+ /// This is freed if filters[] is updated via lzma_filters_update().
+ lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];
+
/// Index to hold sizes of the Blocks
lzma_index *index;
@@ -210,10 +228,7 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
.check = thr->coder->stream_flags.check,
.compressed_size = thr->outbuf->allocated,
.uncompressed_size = thr->coder->block_size,
-
- // TODO: To allow changing the filter chain, the filters
- // array must be copied to each worker_thread.
- .filters = thr->coder->filters,
+ .filters = thr->filters,
};
// Calculate maximum size of the Block Header. This amount is
@@ -415,6 +430,8 @@ worker_start(void *thr_ptr)
}
// Exiting, free the resources.
+ lzma_filters_free(thr->filters, thr->allocator);
+
mythread_mutex_destroy(&thr->mutex);
mythread_cond_destroy(&thr->cond);
@@ -498,6 +515,7 @@ initialize_new_thread(lzma_stream_coder *coder,
thr->progress_in = 0;
thr->progress_out = 0;
thr->block_encoder = LZMA_NEXT_CODER_INIT;
+ thr->filters[0].id = LZMA_VLI_UNKNOWN;
if (mythread_create(&thr->thread_id, &worker_start, thr))
goto error_thread;
@@ -532,6 +550,13 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
coder->outbuf_alloc_size));
+ // Make a thread-specific copy of the filter chain. Put it in
+ // the cache array first so that if we cannot get a new thread yet,
+ // the allocation is ready when we try again.
+ if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
+ return_if_error(lzma_filters_copy(
+ coder->filters, coder->filters_cache, allocator));
+
// If there is a free structure on the stack, use it.
mythread_sync(coder->mutex) {
if (coder->threads_free != NULL) {
@@ -555,6 +580,15 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);
+
+ // Free the old thread-specific filter options and replace
+ // them with the already-allocated new options from
+ // coder->filters_cache[]. Then mark the cache as empty.
+ lzma_filters_free(coder->thr->filters, allocator);
+ memcpy(coder->thr->filters, coder->filters_cache,
+ sizeof(coder->filters_cache));
+ coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
+
mythread_cond_signal(&coder->thr->cond);
}
@@ -867,6 +901,7 @@ stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
lzma_outq_end(&coder->outq, allocator);
lzma_filters_free(coder->filters, allocator);
+ lzma_filters_free(coder->filters_cache, allocator);
lzma_next_end(&coder->index_encoder, allocator);
lzma_index_end(coder->index, allocator);
@@ -879,6 +914,45 @@ stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
}
+static lzma_ret
+stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
+ const lzma_filter *filters,
+ const lzma_filter *reversed_filters
+ lzma_attribute((__unused__)))
+{
+ lzma_stream_coder *coder = coder_ptr;
+
+ // Applications shouldn't attempt to change the options when
+ // we are already encoding the Index or Stream Footer.
+ if (coder->sequence > SEQ_BLOCK)
+ return LZMA_PROG_ERROR;
+
+ // For now the threaded encoder doesn't support changing
+ // the options in the middle of a Block.
+ if (coder->thr != NULL)
+ return LZMA_PROG_ERROR;
+
+ // Check if the filter chain seems mostly valid. See the comment
+ // in stream_encoder_mt_init().
+ if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
+ return LZMA_OPTIONS_ERROR;
+
+ // Make a copy to a temporary buffer first. This way the encoder
+ // state stays unchanged if an error occurs in lzma_filters_copy().
+ lzma_filter temp[LZMA_FILTERS_MAX + 1];
+ return_if_error(lzma_filters_copy(filters, temp, allocator));
+
+ // Free the options of the old chain as well as the cache.
+ lzma_filters_free(coder->filters, allocator);
+ lzma_filters_free(coder->filters_cache, allocator);
+
+ // Copy the new filter chain in place.
+ memcpy(coder->filters, temp, sizeof(temp));
+
+ return LZMA_OK;
+}
+
+
/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
@@ -977,7 +1051,9 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Validate the filter chain so that we can give an error in this
// function instead of delaying it to the first call to lzma_code().
// The memory usage calculation verifies the filter chain as
- // a side effect so we take advantage of that.
+ // a side effect so we take advantage of that. It's not a perfect
+ // check though as raw encoder allows LZMA1 too but such problems
+ // will be caught eventually with Block Header encoder.
if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
return LZMA_OPTIONS_ERROR;
@@ -1017,9 +1093,10 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->code = &stream_encode_mt;
next->end = &stream_encoder_mt_end;
next->get_progress = &get_progress;
-// next->update = &stream_encoder_mt_update;
+ next->update = &stream_encoder_mt_update;
coder->filters[0].id = LZMA_VLI_UNKNOWN;
+ coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
coder->index_encoder = LZMA_NEXT_CODER_INIT;
coder->index = NULL;
memzero(&coder->outq, sizeof(coder->outq));
@@ -1066,8 +1143,11 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// Timeout
coder->timeout = options->timeout;
- // Free the old filter chain and copy the new one.
+ // Free the old filter chain and the cache.
lzma_filters_free(coder->filters, allocator);
+ lzma_filters_free(coder->filters_cache, allocator);
+
+ // Copy the new filter chain.
return_if_error(lzma_filters_copy(
filters, coder->filters, allocator));