From 93439cfafe1768b3b18d67d2356ef7e7559bba59 Mon Sep 17 00:00:00 2001 From: Lasse Collin Date: Thu, 24 Nov 2022 16:25:10 +0200 Subject: liblzma: Add lzma_filters_update() support to the multi-threaded encoder. A tiny downside of this is that now a 1-4 tiny allocations are made for every Block because each worker thread needs its own copy of the filter chain. --- src/liblzma/api/lzma/filter.h | 36 +++++++------ src/liblzma/common/stream_encoder_mt.c | 96 +++++++++++++++++++++++++++++++--- 2 files changed, 109 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/liblzma/api/lzma/filter.h b/src/liblzma/api/lzma/filter.h index 41c5895e..5d0a63c5 100644 --- a/src/liblzma/api/lzma/filter.h +++ b/src/liblzma/api/lzma/filter.h @@ -226,21 +226,27 @@ extern LZMA_API(lzma_ret) lzma_raw_decoder( /** * \brief Update the filter chain in the encoder * - * This function is for advanced users only. This function has two slightly - * different purposes: - * - * - After LZMA_FULL_FLUSH when using Stream encoder: Set a new filter - * chain, which will be used starting from the next Block. - * - * - After LZMA_SYNC_FLUSH using Raw, Block, or Stream encoder: Change - * the filter-specific options in the middle of encoding. The actual - * filters in the chain (Filter IDs) cannot be changed. In the future, - * it might become possible to change the filter options without - * using LZMA_SYNC_FLUSH. - * - * While rarely useful, this function may be called also when no data has - * been compressed yet. In that case, this function will behave as if - * LZMA_FULL_FLUSH (Stream encoder) or LZMA_SYNC_FLUSH (Raw or Block + * This function may be called after lzma_code() has returned LZMA_STREAM_END + * when LZMA_FULL_BARRIER, LZMA_FULL_FLUSH, or LZMA_SYNC_FLUSH was used: + * + * - After LZMA_FULL_BARRIER or LZMA_FULL_FLUSH: Single-threaded .xz Stream + * encoder (lzma_stream_encoder()) and (since liblzma 5.4.0) multi-threaded + * Stream encoder (lzma_stream_encoder_mt()) allow setting a new filter + * chain to be used for the next Block(s). + * + * - After LZMA_SYNC_FLUSH: Raw encoder (lzma_raw_encoder()), + * Block encocder (lzma_block_encoder()), and single-threaded .xz Stream + * encoder (lzma_stream_encoder()) allow changing certain filter-specific + * options in the middle of encoding. The actual filters in the chain + * (Filter IDs) must not be changed! Currently only the lc, lp, and pb + * options of LZMA2 (not LZMA1) can be changed this way. + * + * - In the future some filters might allow changing some of their options + * without any barrier or flushing but currently such filters don't exist. + * + * This function may also be called when no data has been compressed yet + * although this is rarely useful. In that case, this function will behave + * as if LZMA_FULL_FLUSH (Stream encoders) or LZMA_SYNC_FLUSH (Raw or Block * encoder) had been used right before calling this function. * * \return - LZMA_OK diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c index 2c6d4386..f4497c10 100644 --- a/src/liblzma/common/stream_encoder_mt.c +++ b/src/liblzma/common/stream_encoder_mt.c @@ -85,6 +85,11 @@ struct worker_thread_s { /// Compression options for this Block lzma_block block_options; + /// Filter chain for this thread. By copying the filters array + /// to each thread it is possible to change the filter chain + /// between Blocks using lzma_filters_update(). + lzma_filter filters[LZMA_FILTERS_MAX + 1]; + /// Next structure in the stack of free worker threads. worker_thread *next; @@ -109,9 +114,22 @@ struct lzma_stream_coder_s { /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier. size_t block_size; - /// The filter chain currently in use + /// The filter chain to use for the next Block. + /// This can be updated using lzma_filters_update() + /// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH. lzma_filter filters[LZMA_FILTERS_MAX + 1]; + /// A copy of filters[] will be put here when attempting to get + /// a new worker thread. This will be copied to a worker thread + /// when a thread becomes free and then this cache is marked as + /// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache + /// the filter options from filters[] would get uselessly copied + /// multiple times (allocated and freed) when waiting for a new free + /// worker thread. + /// + /// This is freed if filters[] is updated via lzma_filters_update(). + lzma_filter filters_cache[LZMA_FILTERS_MAX + 1]; + /// Index to hold sizes of the Blocks lzma_index *index; @@ -210,10 +228,7 @@ worker_encode(worker_thread *thr, size_t *out_pos, worker_state state) .check = thr->coder->stream_flags.check, .compressed_size = thr->outbuf->allocated, .uncompressed_size = thr->coder->block_size, - - // TODO: To allow changing the filter chain, the filters - // array must be copied to each worker_thread. - .filters = thr->coder->filters, + .filters = thr->filters, }; // Calculate maximum size of the Block Header. This amount is @@ -415,6 +430,8 @@ worker_start(void *thr_ptr) } // Exiting, free the resources. + lzma_filters_free(thr->filters, thr->allocator); + mythread_mutex_destroy(&thr->mutex); mythread_cond_destroy(&thr->cond); @@ -498,6 +515,7 @@ initialize_new_thread(lzma_stream_coder *coder, thr->progress_in = 0; thr->progress_out = 0; thr->block_encoder = LZMA_NEXT_CODER_INIT; + thr->filters[0].id = LZMA_VLI_UNKNOWN; if (mythread_create(&thr->thread_id, &worker_start, thr)) goto error_thread; @@ -532,6 +550,13 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator, coder->outbuf_alloc_size)); + // Make a thread-specific copy of the filter chain. Put it in + // the cache array first so that if we cannot get a new thread yet, + // the allocation is ready when we try again. + if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN) + return_if_error(lzma_filters_copy( + coder->filters, coder->filters_cache, allocator)); + // If there is a free structure on the stack, use it. mythread_sync(coder->mutex) { if (coder->threads_free != NULL) { @@ -555,6 +580,15 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) coder->thr->state = THR_RUN; coder->thr->in_size = 0; coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL); + + // Free the old thread-specific filter options and replace + // them with the already-allocated new options from + // coder->filters_cache[]. Then mark the cache as empty. + lzma_filters_free(coder->thr->filters, allocator); + memcpy(coder->thr->filters, coder->filters_cache, + sizeof(coder->filters_cache)); + coder->filters_cache[0].id = LZMA_VLI_UNKNOWN; + mythread_cond_signal(&coder->thr->cond); } @@ -867,6 +901,7 @@ stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator) lzma_outq_end(&coder->outq, allocator); lzma_filters_free(coder->filters, allocator); + lzma_filters_free(coder->filters_cache, allocator); lzma_next_end(&coder->index_encoder, allocator); lzma_index_end(coder->index, allocator); @@ -879,6 +914,45 @@ stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator) } +static lzma_ret +stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator, + const lzma_filter *filters, + const lzma_filter *reversed_filters + lzma_attribute((__unused__))) +{ + lzma_stream_coder *coder = coder_ptr; + + // Applications shouldn't attempt to change the options when + // we are already encoding the Index or Stream Footer. + if (coder->sequence > SEQ_BLOCK) + return LZMA_PROG_ERROR; + + // For now the threaded encoder doesn't support changing + // the options in the middle of a Block. + if (coder->thr != NULL) + return LZMA_PROG_ERROR; + + // Check if the filter chain seems mostly valid. See the comment + // in stream_encoder_mt_init(). + if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) + return LZMA_OPTIONS_ERROR; + + // Make a copy to a temporary buffer first. This way the encoder + // state stays unchanged if an error occurs in lzma_filters_copy(). + lzma_filter temp[LZMA_FILTERS_MAX + 1]; + return_if_error(lzma_filters_copy(filters, temp, allocator)); + + // Free the options of the old chain as well as the cache. + lzma_filters_free(coder->filters, allocator); + lzma_filters_free(coder->filters_cache, allocator); + + // Copy the new filter chain in place. + memcpy(coder->filters, temp, sizeof(temp)); + + return LZMA_OK; +} + + /// Options handling for lzma_stream_encoder_mt_init() and /// lzma_stream_encoder_mt_memusage() static lzma_ret @@ -977,7 +1051,9 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // Validate the filter chain so that we can give an error in this // function instead of delaying it to the first call to lzma_code(). // The memory usage calculation verifies the filter chain as - // a side effect so we take advantage of that. + // a side effect so we take advantage of that. It's not a perfect + // check though as raw encoder allows LZMA1 too but such problems + // will be caught eventually with Block Header encoder. if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) return LZMA_OPTIONS_ERROR; @@ -1017,9 +1093,10 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, next->code = &stream_encode_mt; next->end = &stream_encoder_mt_end; next->get_progress = &get_progress; -// next->update = &stream_encoder_mt_update; + next->update = &stream_encoder_mt_update; coder->filters[0].id = LZMA_VLI_UNKNOWN; + coder->filters_cache[0].id = LZMA_VLI_UNKNOWN; coder->index_encoder = LZMA_NEXT_CODER_INIT; coder->index = NULL; memzero(&coder->outq, sizeof(coder->outq)); @@ -1066,8 +1143,11 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // Timeout coder->timeout = options->timeout; - // Free the old filter chain and copy the new one. + // Free the old filter chain and the cache. lzma_filters_free(coder->filters, allocator); + lzma_filters_free(coder->filters_cache, allocator); + + // Copy the new filter chain. return_if_error(lzma_filters_copy( filters, coder->filters, allocator)); -- cgit v1.2.3