aboutsummaryrefslogtreecommitdiff
path: root/src/liblzma/common/stream_encoder_mt.c
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2013-09-17 11:52:28 +0300
committerLasse Collin <lasse.collin@tukaani.org>2013-09-17 11:52:28 +0300
commit6b44b4a775fe29ecc7bcb7996e086e3bc09e5fd0 (patch)
tree09c42c60abcdf0acde7d83c89d695d3572e17ab7 /src/liblzma/common/stream_encoder_mt.c
parentBuild: Remove a comment about Automake 1.10 from configure.ac. (diff)
downloadxz-6b44b4a775fe29ecc7bcb7996e086e3bc09e5fd0.tar.xz
Add native threading support on Windows.
Now liblzma only uses "mythread" functions and types which are defined in mythread.h matching the desired threading method. Before Windows Vista, there is no direct equivalent to pthread condition variables. Since this package doesn't use pthread_cond_broadcast(), pre-Vista threading can still be kept quite simple. The pre-Vista code doesn't use anything that wasn't already available in Windows 95, so the binaries should run even on Windows 95 if someone happens to care.
Diffstat (limited to 'src/liblzma/common/stream_encoder_mt.c')
-rw-r--r--src/liblzma/common/stream_encoder_mt.c83
1 files changed, 36 insertions, 47 deletions
diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c
index f9bd6a10..23167d00 100644
--- a/src/liblzma/common/stream_encoder_mt.c
+++ b/src/liblzma/common/stream_encoder_mt.c
@@ -87,12 +87,12 @@ struct worker_thread_s {
/// Next structure in the stack of free worker threads.
worker_thread *next;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
+ mythread_mutex mutex;
+ mythread_cond cond;
/// The ID of this thread is used to join the thread
/// when it's not needed anymore.
- pthread_t thread_id;
+ mythread thread_id;
};
@@ -133,12 +133,9 @@ struct lzma_coder_s {
lzma_outq outq;
- /// True if wait_max is used.
- bool has_timeout;
-
/// Maximum wait time if cannot use all the input and cannot
- /// fill the output buffer.
- struct timespec wait_max;
+ /// fill the output buffer. This is in milliseconds.
+ uint32_t timeout;
/// Error code from a worker thread
@@ -174,7 +171,7 @@ struct lzma_coder_s {
uint64_t progress_out;
- pthread_mutex_t mutex;
+ mythread_mutex mutex;
mythread_cond cond;
};
@@ -253,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state)
while (in_size == thr->in_size
&& thr->state == THR_RUN)
- pthread_cond_wait(&thr->cond, &thr->mutex);
+ mythread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
@@ -305,7 +302,7 @@ worker_encode(worker_thread *thr, worker_state state)
// First wait that we have gotten all the input.
mythread_sync(thr->mutex) {
while (thr->state == THR_RUN)
- pthread_cond_wait(&thr->cond, &thr->mutex);
+ mythread_cond_wait(&thr->cond, &thr->mutex);
state = thr->state;
in_size = thr->in_size;
@@ -344,7 +341,7 @@ worker_encode(worker_thread *thr, worker_state state)
}
-static void *
+static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
worker_thread *thr = thr_ptr;
@@ -358,14 +355,14 @@ worker_start(void *thr_ptr)
// requested to stop, just set the state.
if (thr->state == THR_STOP) {
thr->state = THR_IDLE;
- pthread_cond_signal(&thr->cond);
+ mythread_cond_signal(&thr->cond);
}
state = thr->state;
if (state != THR_IDLE)
break;
- pthread_cond_wait(&thr->cond, &thr->mutex);
+ mythread_cond_wait(&thr->cond, &thr->mutex);
}
}
@@ -384,7 +381,7 @@ worker_start(void *thr_ptr)
mythread_sync(thr->mutex) {
if (thr->state != THR_EXIT) {
thr->state = THR_IDLE;
- pthread_cond_signal(&thr->cond);
+ mythread_cond_signal(&thr->cond);
}
}
@@ -409,12 +406,12 @@ worker_start(void *thr_ptr)
}
// Exiting, free the resources.
- pthread_mutex_destroy(&thr->mutex);
- pthread_cond_destroy(&thr->cond);
+ mythread_mutex_destroy(&thr->mutex);
+ mythread_cond_destroy(&thr->cond);
lzma_next_end(&thr->block_encoder, thr->allocator);
lzma_free(thr->in, thr->allocator);
- return NULL;
+ return MYTHREAD_RET_VALUE;
}
@@ -426,7 +423,7 @@ threads_stop(lzma_coder *coder, bool wait_for_threads)
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
coder->threads[i].state = THR_STOP;
- pthread_cond_signal(&coder->threads[i].cond);
+ mythread_cond_signal(&coder->threads[i].cond);
}
}
@@ -437,7 +434,7 @@ threads_stop(lzma_coder *coder, bool wait_for_threads)
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
while (coder->threads[i].state != THR_IDLE)
- pthread_cond_wait(&coder->threads[i].cond,
+ mythread_cond_wait(&coder->threads[i].cond,
&coder->threads[i].mutex);
}
}
@@ -454,12 +451,12 @@ threads_end(lzma_coder *coder, const lzma_allocator *allocator)
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
mythread_sync(coder->threads[i].mutex) {
coder->threads[i].state = THR_EXIT;
- pthread_cond_signal(&coder->threads[i].cond);
+ mythread_cond_signal(&coder->threads[i].cond);
}
}
for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
- int ret = pthread_join(coder->threads[i].thread_id, NULL);
+ int ret = mythread_join(coder->threads[i].thread_id);
assert(ret == 0);
(void)ret;
}
@@ -479,10 +476,10 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
if (thr->in == NULL)
return LZMA_MEM_ERROR;
- if (pthread_mutex_init(&thr->mutex, NULL))
+ if (mythread_mutex_init(&thr->mutex))
goto error_mutex;
- if (pthread_cond_init(&thr->cond, NULL))
+ if (mythread_cond_init(&thr->cond))
goto error_cond;
thr->state = THR_IDLE;
@@ -501,10 +498,10 @@ initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
return LZMA_OK;
error_thread:
- pthread_cond_destroy(&thr->cond);
+ mythread_cond_destroy(&thr->cond);
error_cond:
- pthread_mutex_destroy(&thr->mutex);
+ mythread_mutex_destroy(&thr->mutex);
error_mutex:
lzma_free(thr->in, allocator);
@@ -543,7 +540,7 @@ get_thread(lzma_coder *coder, const lzma_allocator *allocator)
coder->thr->state = THR_RUN;
coder->thr->in_size = 0;
coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
- pthread_cond_signal(&coder->thr->cond);
+ mythread_cond_signal(&coder->thr->cond);
}
return LZMA_OK;
@@ -594,7 +591,7 @@ stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
if (finish)
coder->thr->state = THR_FINISH;
- pthread_cond_signal(&coder->thr->cond);
+ mythread_cond_signal(&coder->thr->cond);
}
}
@@ -619,21 +616,20 @@ stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
-wait_for_work(lzma_coder *coder, struct timespec *wait_abs,
+wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
bool *has_blocked, bool has_input)
{
- if (coder->has_timeout && !*has_blocked) {
+ if (coder->timeout != 0 && !*has_blocked) {
// Every time when stream_encode_mt() is called via
- // lzma_code(), *has_block starts as false. We set it
+ // lzma_code(), *has_blocked starts as false. We set it
// to true here and calculate the absolute time when
// we must return if there's nothing to do.
//
// The idea of *has_blocked is to avoid unneeded calls
- // to mythread_cond_abstime(), which may do a syscall
+ // to mythread_condtime_set(), which may do a syscall
// depending on the operating system.
*has_blocked = true;
- *wait_abs = coder->wait_max;
- mythread_cond_abstime(&coder->cond, wait_abs);
+ mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
}
bool timed_out = false;
@@ -651,7 +647,7 @@ wait_for_work(lzma_coder *coder, struct timespec *wait_abs,
&& !lzma_outq_is_readable(&coder->outq)
&& coder->thread_error == LZMA_OK
&& !timed_out) {
- if (coder->has_timeout)
+ if (coder->timeout != 0)
timed_out = mythread_cond_timedwait(
&coder->cond, &coder->mutex,
wait_abs) != 0;
@@ -692,7 +688,7 @@ stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
// These are for wait_for_work().
bool has_blocked = false;
- struct timespec wait_abs;
+ mythread_condtime wait_abs;
while (true) {
mythread_sync(coder->mutex) {
@@ -828,7 +824,7 @@ stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
lzma_index_end(coder->index, allocator);
mythread_cond_destroy(&coder->cond);
- pthread_mutex_destroy(&coder->mutex);
+ mythread_mutex_destroy(&coder->mutex);
lzma_free(coder, allocator);
return;
@@ -949,14 +945,14 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
// the error handling has to be done here because
// stream_encoder_mt_end() doesn't know if they have
// already been initialized or not.
- if (pthread_mutex_init(&next->coder->mutex, NULL)) {
+ if (mythread_mutex_init(&next->coder->mutex)) {
lzma_free(next->coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
}
if (mythread_cond_init(&next->coder->cond)) {
- pthread_mutex_destroy(&next->coder->mutex);
+ mythread_mutex_destroy(&next->coder->mutex);
lzma_free(next->coder, allocator);
next->coder = NULL;
return LZMA_MEM_ERROR;
@@ -1011,14 +1007,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
outbuf_size_max, options->threads));
// Timeout
- if (options->timeout > 0) {
- next->coder->wait_max.tv_sec = options->timeout / 1000;
- next->coder->wait_max.tv_nsec
- = (options->timeout % 1000) * 1000000L;
- next->coder->has_timeout = true;
- } else {
- next->coder->has_timeout = false;
- }
+ next->coder->timeout = options->timeout;
// Free the old filter chain and copy the new one.
for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)