From e7b424d267a34803db8d92a3515528be2ed45abd Mon Sep 17 00:00:00 2001 From: Lasse Collin Date: Fri, 14 Dec 2012 20:13:32 +0200 Subject: Make the progress indicator smooth in threaded mode. This adds lzma_get_progress() to liblzma and takes advantage of it in xz. lzma_get_progress() collects progress information from the thread-specific structures so that fairly accurate progress information is available to applications. Adding a new function seemed to be a better way than making the information directly available in lzma_stream (like total_in and total_out are) because collecting the information requires locking mutexes. It's waste of time to do it more often than the up to date information is actually needed by an application. --- src/liblzma/api/lzma/base.h | 22 +++++++++- src/liblzma/common/common.c | 16 +++++++ src/liblzma/common/common.h | 6 +++ src/liblzma/common/stream_encoder_mt.c | 77 +++++++++++++++++++++++++++++++--- src/liblzma/liblzma.map | 1 + src/xz/message.c | 20 +++++---- 6 files changed, 129 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/liblzma/api/lzma/base.h b/src/liblzma/api/lzma/base.h index 66915348..ae194f0a 100644 --- a/src/liblzma/api/lzma/base.h +++ b/src/liblzma/api/lzma/base.h @@ -456,7 +456,8 @@ typedef struct lzma_internal_s lzma_internal; * * Application may modify the values of total_in and total_out as it wants. * They are updated by liblzma to match the amount of data read and - * written, but aren't used for anything else. + * written but aren't used for anything else except as a possible return + * values from lzma_get_progress(). */ typedef struct { const uint8_t *next_in; /**< Pointer to the next input byte. */ @@ -556,6 +557,25 @@ extern LZMA_API(lzma_ret) lzma_code(lzma_stream *strm, lzma_action action) extern LZMA_API(void) lzma_end(lzma_stream *strm) lzma_nothrow; +/** + * \brief Get progress information + * + * In single-threaded mode, applications can get progress information from + * strm->total_in and strm->total_out. In multi-threaded mode this is less + * useful because a significant amount of both input and output data gets + * buffered internally by liblzma. This makes total_in and total_out give + * misleading information and also makes the progress indicator updates + * non-smooth. + * + * This function gives realistic progress information also in multi-threaded + * mode by taking into account the progress made by each thread. In + * single-threaded mode *progress_in and *progress_out are set to + * strm->total_in and strm->total_out, respectively. + */ +extern LZMA_API(void) lzma_get_progress(lzma_stream *strm, + uint64_t *progress_in, uint64_t *progress_out) lzma_nothrow; + + /** * \brief Get the memory usage of decoder filter chain * diff --git a/src/liblzma/common/common.c b/src/liblzma/common/common.c index 5d4d2408..578d257a 100644 --- a/src/liblzma/common/common.c +++ b/src/liblzma/common/common.c @@ -328,6 +328,22 @@ lzma_end(lzma_stream *strm) } +extern LZMA_API(void) +lzma_get_progress(lzma_stream *strm, + uint64_t *progress_in, uint64_t *progress_out) +{ + if (strm->internal->next.get_progress != NULL) { + strm->internal->next.get_progress(strm->internal->next.coder, + progress_in, progress_out); + } else { + *progress_in = strm->total_in; + *progress_out = strm->total_out; + } + + return; +} + + extern LZMA_API(lzma_check) lzma_get_check(const lzma_stream *strm) { diff --git a/src/liblzma/common/common.h b/src/liblzma/common/common.h index 86c5f02c..b71254d0 100644 --- a/src/liblzma/common/common.h +++ b/src/liblzma/common/common.h @@ -155,6 +155,11 @@ struct lzma_next_coder_s { /// lzma_next_coder.coder. lzma_end_function end; + /// Pointer to a function to get progress information. If this is NULL, + /// lzma_stream.total_in and .total_out are used instead. + void (*get_progress)(lzma_coder *coder, + uint64_t *progress_in, uint64_t *progress_out); + /// Pointer to function to return the type of the integrity check. /// Most coders won't support this. lzma_check (*get_check)(const lzma_coder *coder); @@ -180,6 +185,7 @@ struct lzma_next_coder_s { .id = LZMA_VLI_UNKNOWN, \ .code = NULL, \ .end = NULL, \ + .get_progress = NULL, \ .get_check = NULL, \ .memconfig = NULL, \ .update = NULL, \ diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c index 4c7e1bc2..3199cf80 100644 --- a/src/liblzma/common/stream_encoder_mt.c +++ b/src/liblzma/common/stream_encoder_mt.c @@ -71,6 +71,12 @@ struct worker_thread_s { /// allocator before calling lzma_end(). const lzma_allocator *allocator; + /// Amount of uncompressed data that has already been compressed. + uint64_t progress_in; + + /// Amount of compressed data that is ready. + uint64_t progress_out; + /// Block encoder lzma_next_coder block_encoder; @@ -157,6 +163,16 @@ struct lzma_coder_s { /// the new input from the application. worker_thread *thr; + + /// Amount of uncompressed data in Blocks that have already + /// been finished. + uint64_t progress_in; + + /// Amount of compressed data in Stream Header + Blocks that + /// have already been finished. + uint64_t progress_out; + + pthread_mutex_t mutex; mythread_cond cond; }; @@ -183,6 +199,9 @@ worker_error(worker_thread *thr, lzma_ret ret) static worker_state worker_encode(worker_thread *thr, worker_state state) { + assert(thr->progress_in == 0); + assert(thr->progress_out == 0); + // Set the Block options. thr->block_options = (lzma_block){ .version = 0, @@ -221,17 +240,22 @@ worker_encode(worker_thread *thr, worker_state state) do { mythread_sync(thr->mutex) { + // Store in_pos and out_pos into *thr so that + // an application may read them via + // lzma_get_progress() to get progress information. + // + // NOTE: These aren't updated when the encoding + // finishes. Instead, the final values are taken + // later from thr->outbuf. + thr->progress_in = in_pos; + thr->progress_out = thr->outbuf->size; + while (in_size == thr->in_size && thr->state == THR_RUN) pthread_cond_wait(&thr->cond, &thr->mutex); state = thr->state; in_size = thr->in_size; - - // TODO? Store in_pos and out_pos into *thr here - // so that the application may read them via - // some currently non-existing function to get - // progress information. } // Return if we were asked to stop or exit. @@ -329,6 +353,13 @@ worker_start(void *thr_ptr) // no errors occurred. thr->outbuf->finished = state == THR_FINISH; + // Update the main progress info. + thr->coder->progress_in + += thr->outbuf->uncompressed_size; + thr->coder->progress_out += thr->outbuf->size; + thr->progress_in = 0; + thr->progress_out = 0; + // Return this thread to the stack of free threads. thr->next = thr->coder->threads_free; thr->coder->threads_free = thr; @@ -417,6 +448,8 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator) thr->state = THR_IDLE; thr->allocator = allocator; thr->coder = coder; + thr->progress_in = 0; + thr->progress_out = 0; thr->block_encoder = LZMA_NEXT_CODER_INIT; if (mythread_create(&thr->thread_id, &worker_start, thr)) @@ -695,6 +728,13 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator, &coder->index_encoder, allocator, coder->index)); coder->sequence = SEQ_INDEX; + + // Update the progress info to take the Index and + // Stream Footer into account. Those are very fast to encode + // so in terms of progress information they can be thought + // to be ready to be copied out. + coder->progress_out += lzma_index_size(coder->index) + + LZMA_STREAM_HEADER_SIZE; } // Fall through @@ -810,6 +850,28 @@ get_options(const lzma_mt *options, lzma_options_easy *opt_easy, } +static void +get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out) +{ + // Lock coder->mutex to prevent finishing threads from moving their + // progress info from the worker_thread structure to lzma_coder. + mythread_sync(coder->mutex) { + *progress_in = coder->progress_in; + *progress_out = coder->progress_out; + + for (size_t i = 0; i < coder->threads_initialized; ++i) { + mythread_sync(coder->threads[i].mutex) { + *progress_in += coder->threads[i].progress_in; + *progress_out += coder->threads[i] + .progress_out; + } + } + } + + return; +} + + static lzma_ret stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, const lzma_mt *options) @@ -865,6 +927,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, next->code = &stream_encode_mt; next->end = &stream_encoder_mt_end; + next->get_progress = &get_progress; // next->update = &stream_encoder_mt_update; next->coder->filters[0].id = LZMA_VLI_UNKNOWN; @@ -941,6 +1004,10 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, next->coder->header_pos = 0; + // Progress info + next->coder->progress_in = 0; + next->coder->progress_out = LZMA_STREAM_HEADER_SIZE; + return LZMA_OK; } diff --git a/src/liblzma/liblzma.map b/src/liblzma/liblzma.map index 6dd42880..93a40f43 100644 --- a/src/liblzma/liblzma.map +++ b/src/liblzma/liblzma.map @@ -97,6 +97,7 @@ global: XZ_5.1.2alpha { global: + lzma_get_progress; lzma_stream_encoder_mt; lzma_stream_encoder_mt_memusage; diff --git a/src/xz/message.c b/src/xz/message.c index bc16aedb..abdf0568 100644 --- a/src/xz/message.c +++ b/src/xz/message.c @@ -526,20 +526,26 @@ progress_elapsed(void) } -/// Get information about position in the stream. This is currently simple, -/// but it will become more complicated once we have multithreading support. +/// Get how much uncompressed and compressed data has been processed. static void progress_pos(uint64_t *in_pos, uint64_t *compressed_pos, uint64_t *uncompressed_pos) { - *in_pos = progress_strm->total_in; + uint64_t out_pos; + lzma_get_progress(progress_strm, in_pos, &out_pos); + + // It cannot have processed more input than it has been given. + assert(*in_pos <= progress_strm->total_in); + + // It cannot have produced more output than it claims to have ready. + assert(out_pos >= progress_strm->total_out); if (opt_mode == MODE_COMPRESS) { - *compressed_pos = progress_strm->total_out; - *uncompressed_pos = progress_strm->total_in; + *compressed_pos = out_pos; + *uncompressed_pos = *in_pos; } else { - *compressed_pos = progress_strm->total_in; - *uncompressed_pos = progress_strm->total_out; + *compressed_pos = *in_pos; + *uncompressed_pos = out_pos; } return; -- cgit v1.2.3