aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma/common/stream_decoder_mt.c
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2022-04-02 21:49:59 +0300
committerLasse Collin <lasse.collin@tukaani.org>2022-04-02 21:49:59 +0300
commite671bc8828b9c0c5406c3a22c541301d0eb54518 (patch)
tree28b0fe159991cb45fdf98c9bb70cc86c2875f3a3 /src/liblzma/common/stream_decoder_mt.c
parentUpdate THANKS. (diff)
downloadxz-e671bc8828b9c0c5406c3a22c541301d0eb54518.tar.xz
liblzma: Threaded decoder: Support zpipe.c-style decoding loop.
This makes it possible to call lzma_code() in a loop that only reads new input when lzma_code() didn't fill the output buffer completely. That isn't the calling style suggested by the liblzma example program 02_decompress.c so perhaps the usefulness of this feature is limited. Also, it is possible to write such a loop so that it works with the single-threaded decoder but not with the threaded decoder even after this commit, or so that it works only if lzma_mt.timeout = 0. The zlib tutorial <https://zlib.net/zlib_how.html> is a well-known example of a loop where more input is read only when output isn't full. Porting this as is to liblzma would work with the single-threaded decoder (if LZMA_CONCATENATED isn't used) but it wouldn't work with threaded decoder even after this commit because the loop assumes that no more output is possible when it cannot read more input ("if (strm.avail_in == 0) break;"). This cannot be fixed at liblzma side; the loop has to be modified at least a little. I'm adding this in any case because the actual code is simple and short and should have no harmful side-effects in other situations.
Diffstat (limited to 'src/liblzma/common/stream_decoder_mt.c')
-rw-r--r--src/liblzma/common/stream_decoder_mt.c77
1 files changed, 67 insertions, 10 deletions
diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c
index 1fd7dd85..47433de8 100644
--- a/src/liblzma/common/stream_decoder_mt.c
+++ b/src/liblzma/common/stream_decoder_mt.c
@@ -300,12 +300,25 @@ struct lzma_stream_coder {
/// Stream Padding is a multiple of four bytes.
bool concatenated;
+
/// When decoding concatenated Streams, this is true as long as we
/// are decoding the first Stream. This is needed to avoid misleading
/// LZMA_FORMAT_ERROR in case the later Streams don't have valid magic
/// bytes.
bool first_stream;
+ /// This is used to track if the previous call to stream_decode_mt()
+ /// had output space (*out_pos < out_size) and managed to fill the
+ /// output buffer (*out_pos == out_size). This may be set to true
+ /// in read_output_and_wait(). This is read and then reset to false
+ /// at the beginning of stream_decode_mt().
+ ///
+ /// This is needed to support applications that call lzma_code() in
+ /// such a way that more input is provided only when lzma_code()
+ /// didn't fill the output buffer completely. Basically, this makes
+ /// it easier to convert such applications from single-threaded
+ /// decoder to multi-threaded decoder.
+ bool out_was_filled;
/// Write position in buffer[] and position in Stream Padding
size_t pos;
@@ -656,6 +669,7 @@ read_output_and_wait(struct lzma_stream_coder *coder,
do {
// Get as much output from the queue as is possible
// without blocking.
+ const size_t out_start = *out_pos;
do {
ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size,
@@ -683,6 +697,14 @@ read_output_and_wait(struct lzma_stream_coder *coder,
if (ret != LZMA_OK)
break;
+ // If the output buffer is now full but it wasn't full
+ // when this function was called, set out_was_filled.
+ // This way the next call to stream_decode_mt() knows
+ // that some output was produced and no output space
+ // remained in the previous call to stream_decode_mt().
+ if (*out_pos == out_size && *out_pos != out_start)
+ coder->out_was_filled = true;
+
// Check if any thread has indicated an error.
if (coder->thread_error != LZMA_OK) {
if (coder->pending_error == LZMA_OK)
@@ -949,11 +971,39 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
{
struct lzma_stream_coder *coder = coder_ptr;
- const size_t in_start = *in_pos;
-
mythread_condtime wait_abs;
bool has_blocked = false;
+ // Determine if in SEQ_BLOCK_HEADER and SEQ_BLOCK_THR_RUN we should
+ // tell read_output_and_wait() to wait until it can fill the output
+ // buffer (or a timeout occurs). Two conditions must be met:
+ //
+ // (1) If the caller provided no new input. The reason for this
+ // can be, for example, the end of the file or that there is
+ // a pause in the input stream and more input is available
+ // a little later. In this situation we should wait for output
+ // because otherwise we would end up in a busy-waiting loop where
+ // we make no progress and the application just calls us again
+ // without providing any new input. This would then result in
+ // LZMA_BUF_ERROR even though more output would be available
+ // once the worker threads decode more data.
+ //
+ // (2) Even if (1) is true, we will not wait if the previous call to
+ // this function managed to produce some output and the output
+ // buffer became full. This is for compatibility with applications
+ // that call lzma_code() in such a way that new input is provided
+ // only when the output buffer didn't become full. Without this
+ // trick such applications would have bad performance (bad
+ // parallelization due to decoder not getting input fast enough).
+ //
+ // NOTE: Such loops might require that timeout is disabled (0)
+ // if they assume that output-not-full implies that all input has
+ // been consumed. If and only if timeout is enabled, we may return
+ // when output isn't full *and* not all input has been consumed.
+ const bool waiting_allowed = *in_pos == in_size
+ && !coder->out_was_filled;
+ coder->out_was_filled = false;
+
while (true)
switch (coder->sequence) {
case SEQ_STREAM_HEADER: {
@@ -1030,11 +1080,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// without a delay.
//
// On the other hand, if lzma_code() was called with
- // an empty input buffer (in_start == in_size), treat
- // it specially: try to fill the output buffer even
- // if it requires waiting for the worker threads to
- // provide output (timeout, if specified, can still
- // cause us to return).
+ // an empty input buffer(*), treat it specially: try
+ // to fill the output buffer even if it requires
+ // waiting for the worker threads to provide output
+ // (timeout, if specified, can still cause us to
+ // return).
//
// - This way the application will be able to get all
// data that can be decoded from the input provided
@@ -1049,11 +1099,15 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// anything and will return LZMA_OK immediately
// (coder->timeout is completely ignored).
//
+ // (*) See the comment at the beginning of this
+ // function how waiting_allowed is determined
+ // and why there is an exception to the rule
+ // of "called with an empty input buffer".
assert(*in_pos == in_size);
return_if_error(read_output_and_wait(coder, allocator,
out, out_pos, out_size,
- NULL, in_start == in_size,
+ NULL, waiting_allowed,
&wait_abs, &has_blocked));
if (coder->pending_error != LZMA_OK) {
@@ -1403,10 +1457,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// Read output from the output queue. Just like in
// SEQ_BLOCK_HEADER, we wait to fill the output buffer
- // only if lzma_code() was called without providing any input.
+ // only if waiting_allowed was set to true in the beginning
+ // of this function (see the comment there).
return_if_error(read_output_and_wait(coder, allocator,
out, out_pos, out_size,
- NULL, in_start == in_size,
+ NULL, waiting_allowed,
&wait_abs, &has_blocked));
if (coder->pending_error != LZMA_OK) {
@@ -1823,7 +1878,9 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
coder->tell_any_check = (options->flags & LZMA_TELL_ANY_CHECK) != 0;
coder->ignore_check = (options->flags & LZMA_IGNORE_CHECK) != 0;
coder->concatenated = (options->flags & LZMA_CONCATENATED) != 0;
+
coder->first_stream = true;
+ coder->out_was_filled = false;
coder->pos = 0;
coder->threads_max = options->threads;