aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2012-12-14 20:13:32 +0200
committerLasse Collin <lasse.collin@tukaani.org>2012-12-14 20:13:32 +0200
commite7b424d267a34803db8d92a3515528be2ed45abd (patch)
tree80da65be9f9c6b5488de6c1c24127738fad4bd3a
parentliblzma: Fix mythread_sync for nested locking. (diff)
downloadxz-e7b424d267a34803db8d92a3515528be2ed45abd.tar.xz
Make the progress indicator smooth in threaded mode.
This adds lzma_get_progress() to liblzma and takes advantage of it in xz. lzma_get_progress() collects progress information from the thread-specific structures so that fairly accurate progress information is available to applications. Adding a new function seemed to be a better way than making the information directly available in lzma_stream (like total_in and total_out are) because collecting the information requires locking mutexes. It's waste of time to do it more often than the up to date information is actually needed by an application.
-rw-r--r--src/liblzma/api/lzma/base.h22
-rw-r--r--src/liblzma/common/common.c16
-rw-r--r--src/liblzma/common/common.h6
-rw-r--r--src/liblzma/common/stream_encoder_mt.c77
-rw-r--r--src/liblzma/liblzma.map1
-rw-r--r--src/xz/message.c20
6 files changed, 129 insertions, 13 deletions
diff --git a/src/liblzma/api/lzma/base.h b/src/liblzma/api/lzma/base.h
index 66915348..ae194f0a 100644
--- a/src/liblzma/api/lzma/base.h
+++ b/src/liblzma/api/lzma/base.h
@@ -456,7 +456,8 @@ typedef struct lzma_internal_s lzma_internal;
*
* Application may modify the values of total_in and total_out as it wants.
* They are updated by liblzma to match the amount of data read and
- * written, but aren't used for anything else.
+ * written but aren't used for anything else except as a possible return
+ * values from lzma_get_progress().
*/
typedef struct {
const uint8_t *next_in; /**< Pointer to the next input byte. */
@@ -557,6 +558,25 @@ extern LZMA_API(void) lzma_end(lzma_stream *strm) lzma_nothrow;
/**
+ * \brief Get progress information
+ *
+ * In single-threaded mode, applications can get progress information from
+ * strm->total_in and strm->total_out. In multi-threaded mode this is less
+ * useful because a significant amount of both input and output data gets
+ * buffered internally by liblzma. This makes total_in and total_out give
+ * misleading information and also makes the progress indicator updates
+ * non-smooth.
+ *
+ * This function gives realistic progress information also in multi-threaded
+ * mode by taking into account the progress made by each thread. In
+ * single-threaded mode *progress_in and *progress_out are set to
+ * strm->total_in and strm->total_out, respectively.
+ */
+extern LZMA_API(void) lzma_get_progress(lzma_stream *strm,
+ uint64_t *progress_in, uint64_t *progress_out) lzma_nothrow;
+
+
+/**
* \brief Get the memory usage of decoder filter chain
*
* This function is currently supported only when *strm has been initialized
diff --git a/src/liblzma/common/common.c b/src/liblzma/common/common.c
index 5d4d2408..578d257a 100644
--- a/src/liblzma/common/common.c
+++ b/src/liblzma/common/common.c
@@ -328,6 +328,22 @@ lzma_end(lzma_stream *strm)
}
+extern LZMA_API(void)
+lzma_get_progress(lzma_stream *strm,
+ uint64_t *progress_in, uint64_t *progress_out)
+{
+ if (strm->internal->next.get_progress != NULL) {
+ strm->internal->next.get_progress(strm->internal->next.coder,
+ progress_in, progress_out);
+ } else {
+ *progress_in = strm->total_in;
+ *progress_out = strm->total_out;
+ }
+
+ return;
+}
+
+
extern LZMA_API(lzma_check)
lzma_get_check(const lzma_stream *strm)
{
diff --git a/src/liblzma/common/common.h b/src/liblzma/common/common.h
index 86c5f02c..b71254d0 100644
--- a/src/liblzma/common/common.h
+++ b/src/liblzma/common/common.h
@@ -155,6 +155,11 @@ struct lzma_next_coder_s {
/// lzma_next_coder.coder.
lzma_end_function end;
+ /// Pointer to a function to get progress information. If this is NULL,
+ /// lzma_stream.total_in and .total_out are used instead.
+ void (*get_progress)(lzma_coder *coder,
+ uint64_t *progress_in, uint64_t *progress_out);
+
/// Pointer to function to return the type of the integrity check.
/// Most coders won't support this.
lzma_check (*get_check)(const lzma_coder *coder);
@@ -180,6 +185,7 @@ struct lzma_next_coder_s {
.id = LZMA_VLI_UNKNOWN, \
.code = NULL, \
.end = NULL, \
+ .get_progress = NULL, \
.get_check = NULL, \
.memconfig = NULL, \
.update = NULL, \
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 4c7e1bc2..3199cf80 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -71,6 +71,12 @@ struct worker_thread_s {
/// allocator before calling lzma_end().
const lzma_allocator *allocator;
+ /// Amount of uncompressed data that has already been compressed.
+ uint64_t progress_in;
+
+ /// Amount of compressed data that is ready.
+ uint64_t progress_out;
+
/// Block encoder
lzma_next_coder block_encoder;
@@ -157,6 +163,16 @@ struct lzma_coder_s {
/// the new input from the application.
worker_thread *thr;
+
+ /// Amount of uncompressed data in Blocks that have already
+ /// been finished.
+ uint64_t progress_in;
+
+ /// Amount of compressed data in Stream Header + Blocks that
+ /// have already been finished.
+ uint64_t progress_out;
+
+
pthread_mutex_t mutex;
mythread_cond cond;
};
@@ -183,6 +199,9 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
+ assert(thr->progress_in == 0);
+ assert(thr->progress_out == 0);
+
// Set the Block options.
thr->block_options = (lzma_block){
.version = 0,
@@ -221,17 +240,22 @@ worker_encode(worker_thread *thr, worker_state state)
do {
mythread_sync(thr->mutex) {
+ // Store in_pos and out_pos into *thr so that
+ // an application may read them via
+ // lzma_get_progress() to get progress information.
+ //
+ // NOTE: These aren't updated when the encoding
+ // finishes. Instead, the final values are taken
+ // later from thr->outbuf.
+ thr->progress_in = in_pos;
+ thr->progress_out = thr->outbuf->size;
+
while (in_size == thr->in_size
&& thr->state == THR_RUN)
pthread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
-
- // TODO? Store in_pos and out_pos into *thr here
- // so that the application may read them via
- // some currently non-existing function to get
- // progress information.
}
// Return if we were asked to stop or exit.
@@ -329,6 +353,13 @@ worker_start(void *thr_ptr)
// no errors occurred.
thr->outbuf->finished = state == THR_FINISH;
+ // Update the main progress info.
+ thr->coder->progress_in
+ += thr->outbuf->uncompressed_size;
+ thr->coder->progress_out += thr->outbuf->size;
+ thr->progress_in = 0;
+ thr->progress_out = 0;
+
// Return this thread to the stack of free threads.
thr->next = thr->coder->threads_free;
thr->coder->threads_free = thr;
@@ -417,6 +448,8 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
thr->state = THR_IDLE;
thr->allocator = allocator;
thr->coder = coder;
+ thr->progress_in = 0;
+ thr->progress_out = 0;
thr->block_encoder = LZMA_NEXT_CODER_INIT;
if (mythread_create(&thr->thread_id, &worker_start, thr))
@@ -695,6 +728,13 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
&coder->index_encoder, allocator,
coder->index));
coder->sequence = SEQ_INDEX;
+
+ // Update the progress info to take the Index and
+ // Stream Footer into account. Those are very fast to encode
+ // so in terms of progress information they can be thought
+ // to be ready to be copied out.
+ coder->progress_out += lzma_index_size(coder->index)
+ + LZMA_STREAM_HEADER_SIZE;
}
// Fall through
@@ -810,6 +850,28 @@ get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
}
+static void
+get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
+{
+ // Lock coder->mutex to prevent finishing threads from moving their
+ // progress info from the worker_thread structure to lzma_coder.
+ mythread_sync(coder->mutex) {
+ *progress_in = coder->progress_in;
+ *progress_out = coder->progress_out;
+
+ for (size_t i = 0; i < coder->threads_initialized; ++i) {
+ mythread_sync(coder->threads[i].mutex) {
+ *progress_in += coder->threads[i].progress_in;
+ *progress_out += coder->threads[i]
+ .progress_out;
+ }
+ }
+ }
+
+ return;
+}
+
+
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
const lzma_mt *options)
@@ -865,6 +927,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->code = &stream_encode_mt;
next->end = &stream_encoder_mt_end;
+ next->get_progress = &get_progress;
// next->update = &stream_encoder_mt_update;
next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
@@ -941,6 +1004,10 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->coder->header_pos = 0;
+ // Progress info
+ next->coder->progress_in = 0;
+ next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
+
return LZMA_OK;
}
diff --git a/src/liblzma/liblzma.map b/src/liblzma/liblzma.map
index 6dd42880..93a40f43 100644
--- a/src/liblzma/liblzma.map
+++ b/src/liblzma/liblzma.map
@@ -97,6 +97,7 @@ global:
XZ_5.1.2alpha {
global:
+ lzma_get_progress;
lzma_stream_encoder_mt;
lzma_stream_encoder_mt_memusage;
diff --git a/src/xz/message.c b/src/xz/message.c
index bc16aedb..abdf0568 100644
--- a/src/xz/message.c
+++ b/src/xz/message.c
@@ -526,20 +526,26 @@ progress_elapsed(void)
}
-/// Get information about position in the stream. This is currently simple,
-/// but it will become more complicated once we have multithreading support.
+/// Get how much uncompressed and compressed data has been processed.
static void
progress_pos(uint64_t *in_pos,
uint64_t *compressed_pos, uint64_t *uncompressed_pos)
{
- *in_pos = progress_strm->total_in;
+ uint64_t out_pos;
+ lzma_get_progress(progress_strm, in_pos, &out_pos);
+
+ // It cannot have processed more input than it has been given.
+ assert(*in_pos <= progress_strm->total_in);
+
+ // It cannot have produced more output than it claims to have ready.
+ assert(out_pos >= progress_strm->total_out);
if (opt_mode == MODE_COMPRESS) {
- *compressed_pos = progress_strm->total_out;
- *uncompressed_pos = progress_strm->total_in;
+ *compressed_pos = out_pos;
+ *uncompressed_pos = *in_pos;
} else {
- *compressed_pos = progress_strm->total_in;
- *uncompressed_pos = progress_strm->total_out;
+ *compressed_pos = *in_pos;
+ *uncompressed_pos = out_pos;
}
return;