aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma/common/stream_encoder_mt.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/liblzma/common/stream_encoder_mt.c')
-rw-r--r--src/liblzma/common/stream_encoder_mt.c77
1 files changed, 72 insertions, 5 deletions
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index 4c7e1bc2..3199cf80 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -71,6 +71,12 @@ struct worker_thread_s {
/// allocator before calling lzma_end().
const lzma_allocator *allocator;
+ /// Amount of uncompressed data that has already been compressed.
+ uint64_t progress_in;
+
+ /// Amount of compressed data that is ready.
+ uint64_t progress_out;
+
/// Block encoder
lzma_next_coder block_encoder;
@@ -157,6 +163,16 @@ struct lzma_coder_s {
/// the new input from the application.
worker_thread *thr;
+
+ /// Amount of uncompressed data in Blocks that have already
+ /// been finished.
+ uint64_t progress_in;
+
+ /// Amount of compressed data in Stream Header + Blocks that
+ /// have already been finished.
+ uint64_t progress_out;
+
+
pthread_mutex_t mutex;
mythread_cond cond;
};
@@ -183,6 +199,9 @@ worker_error(worker_thread *thr, lzma_ret ret)
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
+ assert(thr->progress_in == 0);
+ assert(thr->progress_out == 0);
+
// Set the Block options.
thr->block_options = (lzma_block){
.version = 0,
@@ -221,17 +240,22 @@ worker_encode(worker_thread *thr, worker_state state)
do {
mythread_sync(thr->mutex) {
+ // Store in_pos and out_pos into *thr so that
+ // an application may read them via
+ // lzma_get_progress() to get progress information.
+ //
+ // NOTE: These aren't updated when the encoding
+ // finishes. Instead, the final values are taken
+ // later from thr->outbuf.
+ thr->progress_in = in_pos;
+ thr->progress_out = thr->outbuf->size;
+
while (in_size == thr->in_size
&& thr->state == THR_RUN)
pthread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
-
- // TODO? Store in_pos and out_pos into *thr here
- // so that the application may read them via
- // some currently non-existing function to get
- // progress information.
}
// Return if we were asked to stop or exit.
@@ -329,6 +353,13 @@ worker_start(void *thr_ptr)
// no errors occurred.
thr->outbuf->finished = state == THR_FINISH;
+ // Update the main progress info.
+ thr->coder->progress_in
+ += thr->outbuf->uncompressed_size;
+ thr->coder->progress_out += thr->outbuf->size;
+ thr->progress_in = 0;
+ thr->progress_out = 0;
+
// Return this thread to the stack of free threads.
thr->next = thr->coder->threads_free;
thr->coder->threads_free = thr;
@@ -417,6 +448,8 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
thr->state = THR_IDLE;
thr->allocator = allocator;
thr->coder = coder;
+ thr->progress_in = 0;
+ thr->progress_out = 0;
thr->block_encoder = LZMA_NEXT_CODER_INIT;
if (mythread_create(&thr->thread_id, &worker_start, thr))
@@ -695,6 +728,13 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
&coder->index_encoder, allocator,
coder->index));
coder->sequence = SEQ_INDEX;
+
+ // Update the progress info to take the Index and
+ // Stream Footer into account. Those are very fast to encode
+ // so in terms of progress information they can be thought
+ // to be ready to be copied out.
+ coder->progress_out += lzma_index_size(coder->index)
+ + LZMA_STREAM_HEADER_SIZE;
}
// Fall through
@@ -810,6 +850,28 @@ get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
}
+static void
+get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
+{
+ // Lock coder->mutex to prevent finishing threads from moving their
+ // progress info from the worker_thread structure to lzma_coder.
+ mythread_sync(coder->mutex) {
+ *progress_in = coder->progress_in;
+ *progress_out = coder->progress_out;
+
+ for (size_t i = 0; i < coder->threads_initialized; ++i) {
+ mythread_sync(coder->threads[i].mutex) {
+ *progress_in += coder->threads[i].progress_in;
+ *progress_out += coder->threads[i]
+ .progress_out;
+ }
+ }
+ }
+
+ return;
+}
+
+
static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
const lzma_mt *options)
@@ -865,6 +927,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->code = &stream_encode_mt;
next->end = &stream_encoder_mt_end;
+ next->get_progress = &get_progress;
// next->update = &stream_encoder_mt_update;
next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
@@ -941,6 +1004,10 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
next->coder->header_pos = 0;
+ // Progress info
+ next->coder->progress_in = 0;
+ next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
+
return LZMA_OK;
}