diff options
Diffstat (limited to 'src/liblzma/common')
-rw-r--r-- | src/liblzma/common/stream_encoder_mt.c | 83 |
1 files changed, 36 insertions, 47 deletions
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c index f9bd6a10..23167d00 100644 --- a/src/liblzma/common/stream_encoder_mt.c +++ b/src/liblzma/common/stream_encoder_mt.c @@ -87,12 +87,12 @@ struct worker_thread_s { /// Next structure in the stack of free worker threads. worker_thread *next; - pthread_mutex_t mutex; - pthread_cond_t cond; + mythread_mutex mutex; + mythread_cond cond; /// The ID of this thread is used to join the thread /// when it's not needed anymore. - pthread_t thread_id; + mythread thread_id; }; @@ -133,12 +133,9 @@ struct lzma_coder_s { lzma_outq outq; - /// True if wait_max is used. - bool has_timeout; - /// Maximum wait time if cannot use all the input and cannot - /// fill the output buffer. - struct timespec wait_max; + /// fill the output buffer. This is in milliseconds. + uint32_t timeout; /// Error code from a worker thread @@ -174,7 +171,7 @@ struct lzma_coder_s { uint64_t progress_out; - pthread_mutex_t mutex; + mythread_mutex mutex; mythread_cond cond; }; @@ -253,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state) while (in_size == thr->in_size && thr->state == THR_RUN) - pthread_cond_wait(&thr->cond, &thr->mutex); + mythread_cond_wait(&thr->cond, &thr->mutex); state = thr->state; in_size = thr->in_size; @@ -305,7 +302,7 @@ worker_encode(worker_thread *thr, worker_state state) // First wait that we have gotten all the input. mythread_sync(thr->mutex) { while (thr->state == THR_RUN) - pthread_cond_wait(&thr->cond, &thr->mutex); + mythread_cond_wait(&thr->cond, &thr->mutex); state = thr->state; in_size = thr->in_size; @@ -344,7 +341,7 @@ worker_encode(worker_thread *thr, worker_state state) } -static void * +static MYTHREAD_RET_TYPE worker_start(void *thr_ptr) { worker_thread *thr = thr_ptr; @@ -358,14 +355,14 @@ worker_start(void *thr_ptr) // requested to stop, just set the state. if (thr->state == THR_STOP) { thr->state = THR_IDLE; - pthread_cond_signal(&thr->cond); + mythread_cond_signal(&thr->cond); } state = thr->state; if (state != THR_IDLE) break; - pthread_cond_wait(&thr->cond, &thr->mutex); + mythread_cond_wait(&thr->cond, &thr->mutex); } } @@ -384,7 +381,7 @@ worker_start(void *thr_ptr) mythread_sync(thr->mutex) { if (thr->state != THR_EXIT) { thr->state = THR_IDLE; - pthread_cond_signal(&thr->cond); + mythread_cond_signal(&thr->cond); } } @@ -409,12 +406,12 @@ worker_start(void *thr_ptr) } // Exiting, free the resources. - pthread_mutex_destroy(&thr->mutex); - pthread_cond_destroy(&thr->cond); + mythread_mutex_destroy(&thr->mutex); + mythread_cond_destroy(&thr->cond); lzma_next_end(&thr->block_encoder, thr->allocator); lzma_free(thr->in, thr->allocator); - return NULL; + return MYTHREAD_RET_VALUE; } @@ -426,7 +423,7 @@ threads_stop(lzma_coder *coder, bool wait_for_threads) for (uint32_t i = 0; i < coder->threads_initialized; ++i) { mythread_sync(coder->threads[i].mutex) { coder->threads[i].state = THR_STOP; - pthread_cond_signal(&coder->threads[i].cond); + mythread_cond_signal(&coder->threads[i].cond); } } @@ -437,7 +434,7 @@ threads_stop(lzma_coder *coder, bool wait_for_threads) for (uint32_t i = 0; i < coder->threads_initialized; ++i) { mythread_sync(coder->threads[i].mutex) { while (coder->threads[i].state != THR_IDLE) - pthread_cond_wait(&coder->threads[i].cond, + mythread_cond_wait(&coder->threads[i].cond, &coder->threads[i].mutex); } } @@ -454,12 +451,12 @@ threads_end(lzma_coder *coder, const lzma_allocator *allocator) for (uint32_t i = 0; i < coder->threads_initialized; ++i) { mythread_sync(coder->threads[i].mutex) { coder->threads[i].state = THR_EXIT; - pthread_cond_signal(&coder->threads[i].cond); + mythread_cond_signal(&coder->threads[i].cond); } } for (uint32_t i = 0; i < coder->threads_initialized; ++i) { - int ret = pthread_join(coder->threads[i].thread_id, NULL); + int ret = mythread_join(coder->threads[i].thread_id); assert(ret == 0); (void)ret; } @@ -479,10 +476,10 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator) if (thr->in == NULL) return LZMA_MEM_ERROR; - if (pthread_mutex_init(&thr->mutex, NULL)) + if (mythread_mutex_init(&thr->mutex)) goto error_mutex; - if (pthread_cond_init(&thr->cond, NULL)) + if (mythread_cond_init(&thr->cond)) goto error_cond; thr->state = THR_IDLE; @@ -501,10 +498,10 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator) return LZMA_OK; error_thread: - pthread_cond_destroy(&thr->cond); + mythread_cond_destroy(&thr->cond); error_cond: - pthread_mutex_destroy(&thr->mutex); + mythread_mutex_destroy(&thr->mutex); error_mutex: lzma_free(thr->in, allocator); @@ -543,7 +540,7 @@ get_thread(lzma_coder *coder, const lzma_allocator *allocator) coder->thr->state = THR_RUN; coder->thr->in_size = 0; coder->thr->outbuf = lzma_outq_get_buf(&coder->outq); - pthread_cond_signal(&coder->thr->cond); + mythread_cond_signal(&coder->thr->cond); } return LZMA_OK; @@ -594,7 +591,7 @@ stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator, if (finish) coder->thr->state = THR_FINISH; - pthread_cond_signal(&coder->thr->cond); + mythread_cond_signal(&coder->thr->cond); } } @@ -619,21 +616,20 @@ stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator, /// Wait until more input can be consumed, more output can be read, or /// an optional timeout is reached. static bool -wait_for_work(lzma_coder *coder, struct timespec *wait_abs, +wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs, bool *has_blocked, bool has_input) { - if (coder->has_timeout && !*has_blocked) { + if (coder->timeout != 0 && !*has_blocked) { // Every time when stream_encode_mt() is called via - // lzma_code(), *has_block starts as false. We set it + // lzma_code(), *has_blocked starts as false. We set it // to true here and calculate the absolute time when // we must return if there's nothing to do. // // The idea of *has_blocked is to avoid unneeded calls - // to mythread_cond_abstime(), which may do a syscall + // to mythread_condtime_set(), which may do a syscall // depending on the operating system. *has_blocked = true; - *wait_abs = coder->wait_max; - mythread_cond_abstime(&coder->cond, wait_abs); + mythread_condtime_set(wait_abs, &coder->cond, coder->timeout); } bool timed_out = false; @@ -651,7 +647,7 @@ wait_for_work(lzma_coder *coder, struct timespec *wait_abs, && !lzma_outq_is_readable(&coder->outq) && coder->thread_error == LZMA_OK && !timed_out) { - if (coder->has_timeout) + if (coder->timeout != 0) timed_out = mythread_cond_timedwait( &coder->cond, &coder->mutex, wait_abs) != 0; @@ -692,7 +688,7 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator, // These are for wait_for_work(). bool has_blocked = false; - struct timespec wait_abs; + mythread_condtime wait_abs; while (true) { mythread_sync(coder->mutex) { @@ -828,7 +824,7 @@ stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator) lzma_index_end(coder->index, allocator); mythread_cond_destroy(&coder->cond); - pthread_mutex_destroy(&coder->mutex); + mythread_mutex_destroy(&coder->mutex); lzma_free(coder, allocator); return; @@ -949,14 +945,14 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // the error handling has to be done here because // stream_encoder_mt_end() doesn't know if they have // already been initialized or not. - if (pthread_mutex_init(&next->coder->mutex, NULL)) { + if (mythread_mutex_init(&next->coder->mutex)) { lzma_free(next->coder, allocator); next->coder = NULL; return LZMA_MEM_ERROR; } if (mythread_cond_init(&next->coder->cond)) { - pthread_mutex_destroy(&next->coder->mutex); + mythread_mutex_destroy(&next->coder->mutex); lzma_free(next->coder, allocator); next->coder = NULL; return LZMA_MEM_ERROR; @@ -1011,14 +1007,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, outbuf_size_max, options->threads)); // Timeout - if (options->timeout > 0) { - next->coder->wait_max.tv_sec = options->timeout / 1000; - next->coder->wait_max.tv_nsec - = (options->timeout % 1000) * 1000000L; - next->coder->has_timeout = true; - } else { - next->coder->has_timeout = false; - } + next->coder->timeout = options->timeout; // Free the old filter chain and copy the new one. for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i) |