aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/liblzma/common/stream_decoder_mt.c77
1 files changed, 67 insertions, 10 deletions
diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c
index 1fd7dd85..47433de8 100644
--- a/src/liblzma/common/stream_decoder_mt.c
+++ b/src/liblzma/common/stream_decoder_mt.c
@@ -300,12 +300,25 @@ struct lzma_stream_coder {
/// Stream Padding is a multiple of four bytes.
bool concatenated;
+
/// When decoding concatenated Streams, this is true as long as we
/// are decoding the first Stream. This is needed to avoid misleading
/// LZMA_FORMAT_ERROR in case the later Streams don't have valid magic
/// bytes.
bool first_stream;
+ /// This is used to track if the previous call to stream_decode_mt()
+ /// had output space (*out_pos < out_size) and managed to fill the
+ /// output buffer (*out_pos == out_size). This may be set to true
+ /// in read_output_and_wait(). This is read and then reset to false
+ /// at the beginning of stream_decode_mt().
+ ///
+ /// This is needed to support applications that call lzma_code() in
+ /// such a way that more input is provided only when lzma_code()
+ /// didn't fill the output buffer completely. Basically, this makes
+ /// it easier to convert such applications from single-threaded
+ /// decoder to multi-threaded decoder.
+ bool out_was_filled;
/// Write position in buffer[] and position in Stream Padding
size_t pos;
@@ -656,6 +669,7 @@ read_output_and_wait(struct lzma_stream_coder *coder,
do {
// Get as much output from the queue as is possible
// without blocking.
+ const size_t out_start = *out_pos;
do {
ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
@@ -683,6 +697,14 @@ read_output_and_wait(struct lzma_stream_coder *coder,
if (ret != LZMA_OK)
break;
+ // If the output buffer is now full but it wasn't full
+ // when this function was called, set out_was_filled.
+ // This way the next call to stream_decode_mt() knows
+ // that some output was produced and no output space
+ // remained in the previous call to stream_decode_mt().
+ if (*out_pos == out_size && *out_pos != out_start)
+ coder->out_was_filled = true;
+
// Check if any thread has indicated an error.
if (coder->thread_error != LZMA_OK) {
if (coder->pending_error == LZMA_OK)
@@ -949,11 +971,39 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
{
struct lzma_stream_coder *coder = coder_ptr;
- const size_t in_start = *in_pos;
-
mythread_condtime wait_abs;
bool has_blocked = false;
+ // Determine if in SEQ_BLOCK_HEADER and SEQ_BLOCK_THR_RUN we should
+ // tell read_output_and_wait() to wait until it can fill the output
+ // buffer (or a timeout occurs). Two conditions must be met:
+ //
+ // (1) If the caller provided no new input. The reason for this
+ // can be, for example, the end of the file or that there is
+ // a pause in the input stream and more input is available
+ // a little later. In this situation we should wait for output
+ // because otherwise we would end up in a busy-waiting loop where
+ // we make no progress and the application just calls us again
+ // without providing any new input. This would then result in
+ // LZMA_BUF_ERROR even though more output would be available
+ // once the worker threads decode more data.
+ //
+ // (2) Even if (1) is true, we will not wait if the previous call to
+ // this function managed to produce some output and the output
+ // buffer became full. This is for compatibility with applications
+ // that call lzma_code() in such a way that new input is provided
+ // only when the output buffer didn't become full. Without this
+ // trick such applications would have bad performance (bad
+ // parallelization due to decoder not getting input fast enough).
+ //
+ // NOTE: Such loops might require that timeout is disabled (0)
+ // if they assume that output-not-full implies that all input has
+ // been consumed. If and only if timeout is enabled, we may return
+ // when output isn't full *and* not all input has been consumed.
+ const bool waiting_allowed = *in_pos == in_size
+ && !coder->out_was_filled;
+ coder->out_was_filled = false;
+
while (true)
switch (coder->sequence) {
case SEQ_STREAM_HEADER: {
@@ -1030,11 +1080,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// without a delay.
//
// On the other hand, if lzma_code() was called with
- // an empty input buffer (in_start == in_size), treat
- // it specially: try to fill the output buffer even
- // if it requires waiting for the worker threads to
- // provide output (timeout, if specified, can still
- // cause us to return).
+ // an empty input buffer(*), treat it specially: try
+ // to fill the output buffer even if it requires
+ // waiting for the worker threads to provide output
+ // (timeout, if specified, can still cause us to
+ // return).
//
// - This way the application will be able to get all
// data that can be decoded from the input provided
@@ -1049,11 +1099,15 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// anything and will return LZMA_OK immediately
// (coder->timeout is completely ignored).
//
+ // (*) See the comment at the beginning of this
+ // function how waiting_allowed is determined
+ // and why there is an exception to the rule
+ // of "called with an empty input buffer".
assert(*in_pos == in_size);
return_if_error(read_output_and_wait(coder, allocator,
out, out_pos, out_size,
- NULL, in_start == in_size,
+ NULL, waiting_allowed,
&wait_abs, &has_blocked));
if (coder->pending_error != LZMA_OK) {
@@ -1403,10 +1457,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// Read output from the output queue. Just like in
// SEQ_BLOCK_HEADER, we wait to fill the output buffer
- // only if lzma_code() was called without providing any input.
+ // only if waiting_allowed was set to true in the beginning
+ // of this function (see the comment there).
return_if_error(read_output_and_wait(coder, allocator,
out, out_pos, out_size,
- NULL, in_start == in_size,
+ NULL, waiting_allowed,
&wait_abs, &has_blocked));
if (coder->pending_error != LZMA_OK) {
@@ -1823,7 +1878,9 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
coder->tell_any_check = (options->flags & LZMA_TELL_ANY_CHECK) != 0;
coder->ignore_check = (options->flags & LZMA_IGNORE_CHECK) != 0;
coder->concatenated = (options->flags & LZMA_CONCATENATED) != 0;
+
coder->first_stream = true;
+ coder->out_was_filled = false;
coder->pos = 0;
coder->threads_max = options->threads;