aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma/common/stream_encoder_mt.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/liblzma/common/stream_encoder_mt.c83
1 files changed, 36 insertions, 47 deletions
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index f9bd6a10..23167d00 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -87,12 +87,12 @@ struct worker_thread_s {
/// Next structure in the stack of free worker threads.
worker_thread *next;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
+ mythread_mutex mutex;
+ mythread_cond cond;
/// The ID of this thread is used to join the thread
/// when it's not needed anymore.
- pthread_t thread_id;
+ mythread thread_id;
};
@@ -133,12 +133,9 @@ struct lzma_coder_s {
lzma_outq outq;
- /// True if wait_max is used.
- bool has_timeout;
-
/// Maximum wait time if cannot use all the input and cannot
- /// fill the output buffer.
- struct timespec wait_max;
+ /// fill the output buffer. This is in milliseconds.
+ uint32_t timeout;
/// Error code from a worker thread
@@ -174,7 +171,7 @@ struct lzma_coder_s {
uint64_t progress_out;
- pthread_mutex_t mutex;
+ mythread_mutex mutex;
mythread_cond cond;
};
@@ -253,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state)
while (in_size == thr->in_size
&& thr->state == THR_RUN)
- pthread_cond_wait(&thr->cond, &thr->mutex);
+ mythread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
@@ -305,7 +302,7 @@ worker_encode(worker_thread *thr, worker_state state)
// First wait that we have gotten all the input.
mythread_sync(thr->mutex) {
while (thr->state == THR_RUN)
- pthread_cond_wait(&thr->cond, &thr->mutex);
+ mythread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
@@ -344,7 +341,7 @@ worker_encode(worker_thread *thr, worker_state state)
}
-static void *
+static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
worker_thread *thr = thr_ptr;
@@ -358,14 +355,14 @@ worker_start(void *thr_ptr)
// requested to stop, just set the state.
if (thr->state == THR_STOP) {
thr->state = THR_IDLE;
- pthread_cond_signal(&thr->cond);
+ mythread_cond_signal(&thr->cond);
}
state = thr->state;
if (state != THR_IDLE)
break;
- pthread_cond_wait(&thr->cond, &thr->mutex);
+ mythread_cond_wait(&thr->cond, &thr->mutex);
}
}
@@ -384,7 +381,7 @@ worker_start(void *thr_ptr)
mythread_sync(thr->mutex) {
if (thr->state != THR_EXIT) {
thr->state = THR_IDLE;
- pthread_cond_signal(&thr->cond);
+ mythread_cond_signal(&thr->cond);
}
}
@@ -409,12 +406,12 @@ worker_start(void *thr_ptr)
}
// Exiting, free the resources.
- pthread_mutex_destroy(&thr->mutex);
- pthread_cond_destroy(&thr->cond);
+ mythread_mutex_destroy(&thr->mutex);
+ mythread_cond_destroy(&thr->cond);
lzma_next_end(&thr->block_encoder, thr->allocator);
lzma_free(thr->in, thr->allocator);
- return NULL;
+ return MYTHREAD_RET_VALUE;
}
@@ -426,7 +423,7 @@ threads_stop(lzma_coder *coder, bool wait_for_threads)
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
coder->threads[i].state = THR_STOP;
- pthread_cond_signal(&coder->threads[i].cond);
+ mythread_cond_signal(&coder->threads[i].cond);
}
}
@@ -437,7 +434,7 @@ threads_stop(lzma_coder *coder, bool wait_for_threads)
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
while (coder->threads[i].state != THR_IDLE)
- pthread_cond_wait(&coder->threads[i].cond,
+ mythread_cond_wait(&coder->threads[i].cond,
&coder->threads[i].mutex);
}
}
@@ -454,12 +451,12 @@ threads_end(lzma_coder *coder, const lzma_allocator *allocator)
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
coder->threads[i].state = THR_EXIT;
- pthread_cond_signal(&coder->threads[i].cond);
+ mythread_cond_signal(&coder->threads[i].cond);
}
}
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
- int ret = pthread_join(coder->threads[i].thread_id, NULL);
+ int ret = mythread_join(coder->threads[i].thread_id);
assert(ret == 0);
(void)ret;
}
@@ -479,10 +476,10 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
if (thr->in == NULL)
return LZMA_MEM_ERROR;
- if (pthread_mutex_init(&thr->mutex, NULL))
+ if (mythread_mutex_init(&thr->mutex))
goto error_mutex;
- if (pthread_cond_init(&thr->cond, NULL))
+ if (mythread_cond_init(&thr->cond))
goto error_cond;
thr->state = THR_IDLE;
@@ -501,10 +498,10 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
return LZMA_OK;
error_thread:
- pthread_cond_destroy(&thr->cond);
+ mythread_cond_destroy(&thr->cond);
error_cond:
- pthread_mutex_destroy(&thr->mutex);
+ mythread_mutex_destroy(&thr->mutex);
error_mutex:
lzma_free(thr->in, allocator);
@@ -543,7 +540,7 @@ get_thread(lzma_coder *coder, const lzma_allocator *allocator)
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
- pthread_cond_signal(&coder->thr->cond);
+ mythread_cond_signal(&coder->thr->cond);
}
return LZMA_OK;
@@ -594,7 +591,7 @@ stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
if (finish)
coder->thr->state = THR_FINISH;
- pthread_cond_signal(&coder->thr->cond);
+ mythread_cond_signal(&coder->thr->cond);
}
}
@@ -619,21 +616,20 @@ stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
-wait_for_work(lzma_coder *coder, struct timespec *wait_abs,
+wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
bool *has_blocked, bool has_input)
{
- if (coder->has_timeout && !*has_blocked) {
+ if (coder->timeout != 0 && !*has_blocked) {
// Every time when stream_encode_mt() is called via
- // lzma_code(), *has_block starts as false. We set it
+ // lzma_code(), *has_blocked starts as false. We set it
// to true here and calculate the absolute time when
// we must return if there's nothing to do.
//
// The idea of *has_blocked is to avoid unneeded calls
- // to mythread_cond_abstime(), which may do a syscall
+ // to mythread_condtime_set(), which may do a syscall
// depending on the operating system.
*has_blocked = true;
- *wait_abs = coder->wait_max;
- mythread_cond_abstime(&coder->cond, wait_abs);
+ mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
}
bool timed_out = false;
@@ -651,7 +647,7 @@ wait_for_work(lzma_coder *coder, struct timespec *wait_abs,
&& !lzma_outq_is_readable(&coder->outq)
&& coder->thread_error == LZMA_OK
&& !timed_out) {
- if (coder->has_timeout)
+ if (coder->timeout != 0)
timed_out = mythread_cond_timedwait(
&coder->cond, &coder->mutex,
wait_abs) != 0;
@@ -692,7 +688,7 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
// These are for wait_for_work().
bool has_blocked = false;
- struct timespec wait_abs;
+ mythread_condtime wait_abs;
while (true) {
mythread_sync(coder->mutex) {
@@ -828,7 +824,7 @@ stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
lzma_index_end(coder->index, allocator);
mythread_cond_destroy(&coder->cond);
- pthread_mutex_destroy(&coder->mutex);
+ mythread_mutex_destroy(&coder->mutex);
lzma_free(coder, allocator);
return;
@@ -949,14 +945,14 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// the error handling has to be done here because
// stream_encoder_mt_end() doesn't know if they have
// already been initialized or not.
- if (pthread_mutex_init(&next->coder->mutex, NULL)) {
+ if (mythread_mutex_init(&next->coder->mutex)) {
lzma_free(next->coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
}
if (mythread_cond_init(&next->coder->cond)) {
- pthread_mutex_destroy(&next->coder->mutex);
+ mythread_mutex_destroy(&next->coder->mutex);
lzma_free(next->coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
@@ -1011,14 +1007,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
outbuf_size_max, options->threads));
// Timeout
- if (options->timeout > 0) {
- next->coder->wait_max.tv_sec = options->timeout / 1000;
- next->coder->wait_max.tv_nsec
- = (options->timeout % 1000) * 1000000L;
- next->coder->has_timeout = true;
- } else {
- next->coder->has_timeout = false;
- }
+ next->coder->timeout = options->timeout;
// Free the old filter chain and copy the new one.
for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)