aboutsummaryrefslogtreecommitdiff
path: root/src/lzma/process.c
diff options
context:
space:
mode:
authorLasse Collin <lasse.collin@tukaani.org>2007-12-09 00:42:33 +0200
committerLasse Collin <lasse.collin@tukaani.org>2007-12-09 00:42:33 +0200
commit5d018dc03549c1ee4958364712fb0c94e1bf2741 (patch)
tree1b211911fb33fddb3f04b77f99e81df23623ffc4 /src/lzma/process.c
downloadxz-5d018dc03549c1ee4958364712fb0c94e1bf2741.tar.xz
Imported to git.
Diffstat (limited to 'src/lzma/process.c')
-rw-r--r--src/lzma/process.c458
1 files changed, 458 insertions, 0 deletions
diff --git a/src/lzma/process.c b/src/lzma/process.c
new file mode 100644
index 00000000..10a76b74
--- /dev/null
+++ b/src/lzma/process.c
@@ -0,0 +1,458 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+/// \file process.c
+/// \brief Compresses or uncompresses a file
+//
+// Copyright (C) 2007 Lasse Collin
+//
+// This program is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Lesser General Public
+// License as published by the Free Software Foundation; either
+// version 2.1 of the License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Lesser General Public License for more details.
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include "private.h"
+
+
+typedef struct {
+ lzma_stream strm;
+ void *options;
+
+ file_pair *pair;
+
+ /// We don't need this for *anything* but seems that at least with
+ /// glibc pthread_create() doesn't allow NULL.
+ pthread_t thread;
+
+ bool in_use;
+
+} thread_data;
+
+
+/// Number of available threads
+static size_t free_threads;
+
+/// Thread-specific data
+static thread_data *threads;
+
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
+
+/// Attributes of new coder threads. They are created in detached state.
+/// Coder threads signal to the service thread themselves when they are done.
+static pthread_attr_t thread_attr;
+
+
+//////////
+// Init //
+//////////
+
+extern void
+process_init(void)
+{
+ threads = malloc(sizeof(thread_data) * opt_threads);
+ if (threads == NULL) {
+ out_of_memory();
+ my_exit(ERROR);
+ }
+
+ for (size_t i = 0; i < opt_threads; ++i)
+ threads[i] = (thread_data){
+ .strm = LZMA_STREAM_INIT_VAR,
+ .options = NULL,
+ .pair = NULL,
+ .in_use = false,
+ };
+
+ if (pthread_attr_init(&thread_attr)
+ || pthread_attr_setdetachstate(
+ &thread_attr, PTHREAD_CREATE_DETACHED)) {
+ out_of_memory();
+ my_exit(ERROR);
+ }
+
+ free_threads = opt_threads;
+
+ return;
+}
+
+
+//////////////////////////
+// Thread-specific data //
+//////////////////////////
+
+static thread_data *
+get_thread_data(void)
+{
+ pthread_mutex_lock(&mutex);
+
+ while (free_threads == 0) {
+ pthread_cond_wait(&cond, &mutex);
+
+ if (user_abort) {
+ pthread_cond_signal(&cond);
+ pthread_mutex_unlock(&mutex);
+ return NULL;
+ }
+ }
+
+ thread_data *t = threads;
+ while (t->in_use)
+ ++t;
+
+ t->in_use = true;
+ --free_threads;
+
+ pthread_mutex_unlock(&mutex);
+
+ return t;
+}
+
+
+static void
+release_thread_data(thread_data *t)
+{
+ pthread_mutex_lock(&mutex);
+
+ t->in_use = false;
+ ++free_threads;
+
+ pthread_cond_signal(&cond);
+ pthread_mutex_unlock(&mutex);
+
+ return;
+}
+
+
+static int
+create_thread(void *(*func)(thread_data *t), thread_data *t)
+{
+ if (opt_threads == 1) {
+ func(t);
+ } else {
+ const int err = pthread_create(&t->thread, &thread_attr,
+ (void *(*)(void *))(func), t);
+ if (err) {
+ errmsg(V_ERROR, _("Cannot create a thread: %s"),
+ strerror(err));
+ user_abort = 1;
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+
+/////////////////////////
+// One thread per file //
+/////////////////////////
+
+static int
+single_init(thread_data *t)
+{
+ lzma_ret ret;
+
+ if (opt_mode == MODE_COMPRESS) {
+ const lzma_vli uncompressed_size
+ = t->pair->src_fd != STDIN_FILENO
+ ? (lzma_vli)(t->pair->src_st.st_size)
+ : LZMA_VLI_VALUE_UNKNOWN;
+
+ // TODO Support Multi-Block Streams to store Extra.
+ if (opt_header == HEADER_ALONE) {
+ lzma_options_alone alone;
+ alone.uncompressed_size = uncompressed_size;
+ memcpy(&alone.lzma, opt_filters[0].options,
+ sizeof(alone.lzma));
+ ret = lzma_alone_encoder(&t->strm, &alone);
+ } else {
+ lzma_options_stream stream = {
+ .check = opt_check,
+ .has_crc32 = true,
+ .uncompressed_size = uncompressed_size,
+ .alignment = 0,
+ };
+ memcpy(stream.filters, opt_filters,
+ sizeof(stream.filters));
+ ret = lzma_stream_encoder_single(&t->strm, &stream);
+ }
+ } else {
+ // TODO Restrict file format if requested on the command line.
+ ret = lzma_auto_decoder(&t->strm, NULL, NULL);
+ }
+
+ if (ret != LZMA_OK) {
+ if (ret == LZMA_MEM_ERROR)
+ out_of_memory();
+ else
+ internal_error();
+
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static lzma_ret
+single_skip_padding(thread_data *t, uint8_t *in_buf)
+{
+ // Handle decoding of concatenated Streams. There can be arbitrary
+ // number of nul-byte padding between the Streams, which must be
+ // ignored.
+ //
+ // NOTE: Concatenating LZMA_Alone files works only if at least
+ // one of lc, lp, and pb is non-zero. Using the concatenation
+ // on LZMA_Alone files is strongly discouraged.
+ while (true) {
+ while (t->strm.avail_in > 0) {
+ if (*t->strm.next_in != '\0')
+ return LZMA_OK;
+
+ ++t->strm.next_in;
+ --t->strm.avail_in;
+ }
+
+ if (t->pair->src_eof)
+ return LZMA_STREAM_END;
+
+ t->strm.next_in = in_buf;
+ t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
+ if (t->strm.avail_in == SIZE_MAX)
+ return LZMA_DATA_ERROR;
+ }
+}
+
+
+static void *
+single(thread_data *t)
+{
+ if (single_init(t)) {
+ io_close(t->pair, false);
+ release_thread_data(t);
+ return NULL;
+ }
+
+ uint8_t in_buf[BUFSIZ];
+ uint8_t out_buf[BUFSIZ];
+ lzma_action action = LZMA_RUN;
+ lzma_ret ret;
+ bool success = false;
+
+ t->strm.avail_in = 0;
+
+ while (!user_abort) {
+ if (t->strm.avail_in == 0 && !t->pair->src_eof) {
+ t->strm.next_in = in_buf;
+ t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
+
+ if (t->strm.avail_in == SIZE_MAX)
+ break;
+ else if (t->pair->src_eof
+ && opt_mode == MODE_COMPRESS)
+ action = LZMA_FINISH;
+ }
+
+ t->strm.next_out = out_buf;
+ t->strm.avail_out = BUFSIZ;
+
+ ret = lzma_code(&t->strm, action);
+
+ if (opt_mode != MODE_TEST)
+ if (io_write(t->pair, out_buf,
+ BUFSIZ - t->strm.avail_out))
+ break;
+
+ if (ret != LZMA_OK) {
+ if (ret == LZMA_STREAM_END) {
+ if (opt_mode == MODE_COMPRESS) {
+ success = true;
+ break;
+ }
+
+ // Support decoding concatenated .lzma files.
+ ret = single_skip_padding(t, in_buf);
+
+ if (ret == LZMA_STREAM_END) {
+ assert(t->pair->src_eof);
+ success = true;
+ break;
+ }
+
+ if (ret == LZMA_OK && !single_init(t))
+ continue;
+
+ break;
+
+ } else {
+ errmsg(V_ERROR, "%s: %s", t->pair->src_name,
+ str_strm_error(ret));
+ break;
+ }
+ }
+ }
+
+ io_close(t->pair, success);
+ release_thread_data(t);
+
+ return NULL;
+}
+
+
+///////////////////////////////
+// Multiple threads per file //
+///////////////////////////////
+
+// TODO
+
+// I'm not sure what would the best way to implement this. Here's one
+// possible way:
+// - Reader thread would read the input data and control the coders threads.
+// - Every coder thread is associated with input and output buffer pools.
+// The input buffer pool is filled by reader thread, and the output buffer
+// pool is emptied by the writer thread.
+// - Writer thread writes the output data of the oldest living coder thread.
+//
+// The per-file thread started by the application's main thread is used as
+// the reader thread. In the beginning, it starts the writer thread and the
+// first coder thread. The coder thread would be left waiting for input from
+// the reader thread, and the writer thread would be waiting for input from
+// the coder thread.
+//
+// The reader thread reads the input data into a ring buffer, whose size
+// depends on the value returned by lzma_chunk_size(). If the ring buffer
+// gets full, the buffer is marked "to be finished", which indicates to
+// the coder thread that no more input is coming. Then a new coder thread
+// would be started.
+//
+// TODO
+
+/*
+typedef struct {
+ /// Buffers
+ uint8_t (*buffers)[BUFSIZ];
+
+ /// Number of buffers
+ size_t buffer_count;
+
+ /// buffers[read_pos] is the buffer currently being read. Once finish
+ /// is true and read_pos == write_pos, end of input has been reached.
+ size_t read_pos;
+
+ /// buffers[write_pos] is the buffer into which data is currently
+ /// being written.
+ size_t write_pos;
+
+ /// This variable matters only when read_pos == write_pos && finish.
+ /// In that case, this variable will contain the size of the
+ /// buffers[read_pos].
+ size_t last_size;
+
+ /// True once no more data is being written to the buffer. When this
+ /// is set, the last_size variable must have been set too.
+ bool finish;
+
+ /// Mutex to protect access to the variables in this structure
+ pthread_mutex_t mutex;
+
+ /// Condition to indicate when another thread can continue
+ pthread_cond_t cond;
+} mem_pool;
+
+
+static foo
+multi_reader(thread_data *t)
+{
+ bool done = false;
+
+ do {
+ const size_t size = io_read(t->pair,
+ m->buffers + m->write_pos, BUFSIZ);
+ if (size == SIZE_MAX) {
+ // TODO
+ } else if (t->pair->src_eof) {
+ m->last_size = size;
+ }
+
+ pthread_mutex_lock(&m->mutex);
+
+ if (++m->write_pos == m->buffer_count)
+ m->write_pos = 0;
+
+ if (m->write_pos == m->read_pos || t->pair->src_eof)
+ m->finish = true;
+
+ pthread_cond_signal(&m->cond);
+ pthread_mutex_unlock(&m->mutex);
+
+ } while (!m->finish);
+
+ return done ? 0 : -1;
+}
+
+
+static foo
+multi_code()
+{
+ lzma_action = LZMA_RUN;
+
+ while (true) {
+ pthread_mutex_lock(&m->mutex);
+
+ while (m->read_pos == m->write_pos && !m->finish)
+ pthread_cond_wait(&m->cond, &m->mutex);
+
+ pthread_mutex_unlock(&m->mutex);
+
+ if (m->finish) {
+ t->strm.avail_in = m->last_size;
+ if (opt_mode == MODE_COMPRESS)
+ action = LZMA_FINISH;
+ } else {
+ t->strm.avail_in = BUFSIZ;
+ }
+
+ t->strm.next_in = m->buffers + m->read_pos;
+
+ const lzma_ret ret = lzma_code(&t->strm, action);
+
+ }
+}
+
+*/
+
+
+///////////////////////
+// Starting new file //
+///////////////////////
+
+extern void
+process_file(const char *filename)
+{
+ thread_data *t = get_thread_data();
+ if (t == NULL)
+ return; // User abort
+
+ // If this fails, it shows appropriate error messages too.
+ t->pair = io_open(filename);
+ if (t->pair == NULL) {
+ release_thread_data(t);
+ return;
+ }
+
+ // TODO Currently only one-thread-per-file mode is implemented.
+
+ if (create_thread(&single, t)) {
+ io_close(t->pair, false);
+ release_thread_data(t);
+ }
+
+ return;
+}