///////////////////////////////////////////////////////////////////////////////
//
/// \file process.c
/// \brief Compresses or uncompresses a file
//
// Copyright (C) 2007 Lasse Collin
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
///////////////////////////////////////////////////////////////////////////////
#include "private.h"
/// Per-slot state for one coder. process_init() allocates opt_threads of
/// these; get_thread_data() hands a slot out and release_thread_data()
/// returns it.
typedef struct {
/// liblzma stream state; (re)initialized by single_init() and driven
/// with lzma_code() in single().
lzma_stream strm;
/// Initialized to NULL by process_init(). Not referenced by any other
/// code in this file (NOTE(review): presumably reserved for coder
/// options -- confirm before relying on it).
void *options;
/// File pair currently being processed by this slot. Assigned from
/// io_open() in process_file() and closed with io_close().
file_pair *pair;
/// Receives the thread ID from pthread_create(). Nothing in this file
/// reads it afterwards; it exists only because, at least with glibc,
/// pthread_create() doesn't accept NULL for this argument.
pthread_t thread;
/// True while the slot is handed out. Modified only while holding the
/// file-scope mutex.
bool in_use;
} thread_data;
/// Number of thread_data slots that are not currently handed out. Set to
/// opt_threads by process_init(), decremented by get_thread_data() and
/// incremented by release_thread_data(), both while holding `mutex`.
static size_t free_threads;
/// Array of opt_threads per-thread slots, allocated by process_init().
static thread_data *threads;
/// Protects free_threads and the in_use flag of every slot in `threads`.
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/// Signaled whenever a slot is released. A waiter in get_thread_data() that
/// gives up because of user_abort signals it again before returning.
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/// Attributes of new coder threads. They are created in detached state, so
/// they are never joined; a coder announces that it is done by calling
/// release_thread_data(), which signals `cond`.
static pthread_attr_t thread_attr;
//////////
// Init //
//////////
/// \brief      Allocate and initialize the global coder-thread state
///
/// Allocates one thread_data slot per requested thread (opt_threads), marks
/// every slot as unused, prepares the attribute object used for creating
/// detached coder threads, and finally sets the number of free slots.
///
/// If the allocation or the attribute setup fails, the failure is reported
/// with out_of_memory() and my_exit(ERROR) is called.
extern void
process_init(void)
{
	// calloc() takes the element count and element size separately, so
	// the allocator can detect a count * size product that doesn't fit
	// in size_t instead of silently returning a too-small array.
	threads = calloc(opt_threads, sizeof(*threads));
	if (threads == NULL) {
		out_of_memory();
		my_exit(ERROR);
	}

	// All-bits-zero isn't guaranteed to be a valid lzma_stream or NULL
	// pointer representation, so initialize every slot explicitly.
	for (size_t i = 0; i < opt_threads; ++i)
		threads[i] = (thread_data){
			.strm = LZMA_STREAM_INIT_VAR,
			.options = NULL,
			.pair = NULL,
			.in_use = false,
		};

	if (pthread_attr_init(&thread_attr)
			|| pthread_attr_setdetachstate(
				&thread_attr, PTHREAD_CREATE_DETACHED)) {
		out_of_memory();
		my_exit(ERROR);
	}

	// Every slot starts out available.
	free_threads = opt_threads;
}
//////////////////////////
// Thread-specific data //
//////////////////////////
/// \brief      Reserve a free thread_data slot, waiting for one if needed
///
/// Blocks on the condition variable while no slot is free. If user_abort
/// is seen after waking up from such a wait, the condition is signaled
/// again (so that another waiter gets woken too) and the call gives up.
///
/// \return     Pointer to a slot that is now marked in_use, or NULL if
///             the wait was given up because of user_abort.
static thread_data *
get_thread_data(void)
{
	pthread_mutex_lock(&mutex);

	for (;;) {
		if (free_threads != 0)
			break;

		pthread_cond_wait(&cond, &mutex);

		if (!user_abort)
			continue;

		pthread_cond_signal(&cond);
		pthread_mutex_unlock(&mutex);
		return NULL;
	}

	// free_threads is non-zero here, so this scan relies on at least
	// one slot having in_use == false.
	size_t slot = 0;
	while (threads[slot].in_use)
		++slot;

	threads[slot].in_use = true;
	--free_threads;

	pthread_mutex_unlock(&mutex);

	return &threads[slot];
}
/// \brief      Return a slot obtained from get_thread_data() to the pool
///
/// Marks the slot as unused, bumps the free-slot counter, and signals the
/// condition variable so that a caller blocked in get_thread_data() can
/// proceed. Everything happens while holding the mutex.
///
/// \param      t       Slot to release
static void
release_thread_data(thread_data *t)
{
	pthread_mutex_lock(&mutex);

	++free_threads;
	t->in_use = false;
	pthread_cond_signal(&cond);

	pthread_mutex_unlock(&mutex);
}
static int
create_thread(void *(*func)(thread_data *t), thread_data *t)
{
if (opt_threads == 1) {
func(t);
} else {
const int err = pthread_create(&t->thread, &thread_attr,
(void *(*)(void *))(func), t);
if (err) {
errmsg(V_ERROR, _("Cannot create a thread: %s"),
strerror(err));
user_abort = 1;
return -1;
}
}
return 0;
}
/////////////////////////
// One thread per file //
/////////////////////////
/// \brief      (Re)initialize the slot's lzma_stream for the current mode
///
/// When compressing, an LZMA_Alone encoder is used if opt_header is
/// HEADER_ALONE and a Stream encoder otherwise. In every other mode the
/// auto-detecting decoder is used.
///
/// \param      t       Slot whose strm is initialized
///
/// \return     0 on success. On failure out_of_memory() (for
///             LZMA_MEM_ERROR) or internal_error() (anything else) is
///             called and -1 is returned.
static int
single_init(thread_data *t)
{
	lzma_ret status;

	if (opt_mode != MODE_COMPRESS) {
		// TODO Restrict file format if requested on the command line.
		status = lzma_auto_decoder(&t->strm);
	} else if (opt_header == HEADER_ALONE) {
		status = lzma_alone_encoder(&t->strm,
				opt_filters[0].options);
	} else {
		status = lzma_stream_encoder(&t->strm,
				opt_filters, opt_check);
	}

	switch (status) {
	case LZMA_OK:
		return 0;

	case LZMA_MEM_ERROR:
		out_of_memory();
		break;

	default:
		internal_error();
		break;
	}

	return -1;
}
/// \brief      Skip nul-byte padding between concatenated Streams
///
/// Consumes '\0' bytes from the stream's input, refilling in_buf with
/// io_read() whenever it runs empty, until a non-nul byte or the end of
/// the input is found.
///
/// NOTE: Concatenating LZMA_Alone files works only if at least one of
/// lc, lp, and pb is non-zero. Using the concatenation on LZMA_Alone
/// files is strongly discouraged.
///
/// \param      t       Slot whose strm.next_in/avail_in are advanced
/// \param      in_buf  Input buffer of BUFSIZ bytes used for refills
///
/// \return     LZMA_OK if a non-nul byte is waiting at strm.next_in,
///             LZMA_STREAM_END if the source ended with only padding
///             left, or LZMA_DATA_ERROR if io_read() returned SIZE_MAX.
static lzma_ret
single_skip_padding(thread_data *t, uint8_t *in_buf)
{
	for (;;) {
		// Eat nul bytes from what is already buffered.
		for (; t->strm.avail_in > 0;
				++t->strm.next_in, --t->strm.avail_in) {
			if (*t->strm.next_in != '\0')
				return LZMA_OK;
		}

		// Buffer is empty; stop if there is nothing more to read.
		if (t->pair->src_eof)
			return LZMA_STREAM_END;

		t->strm.next_in = in_buf;
		t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);

		if (t->strm.avail_in == SIZE_MAX)
			return LZMA_DATA_ERROR;
	}
}
/// \brief      Code one file from start to end using a single coder
///
/// This is the function process_file() passes to create_thread(). It
/// initializes the coder, then loops: refill the input buffer when it is
/// empty, run lzma_code(), and write the produced output (unless in
/// MODE_TEST). When decoding, a finished Stream may be followed by padding
/// and another Stream, in which case the coder is reinitialized and the
/// loop goes on. The loop also stops when user_abort becomes set.
///
/// The file pair is always closed and the slot always released before
/// returning.
///
/// \param      t       Slot with t->pair already opened
///
/// \return     Always NULL
static void *
single(thread_data *t)
{
	if (single_init(t) != 0) {
		io_close(t->pair, false);
		release_thread_data(t);
		return NULL;
	}

	uint8_t input[BUFSIZ];
	uint8_t output[BUFSIZ];
	lzma_action action = LZMA_RUN;
	bool finished_ok = false;

	t->strm.avail_in = 0;

	while (!user_abort) {
		// Refill the input buffer once the coder has consumed it all.
		if (t->strm.avail_in == 0 && !t->pair->src_eof) {
			t->strm.next_in = input;
			t->strm.avail_in = io_read(t->pair, input, BUFSIZ);

			// SIZE_MAX is io_read()'s failure indicator.
			if (t->strm.avail_in == SIZE_MAX)
				break;

			// When compressing, end of input means the encoder
			// has to be told to finish the Stream.
			if (t->pair->src_eof && opt_mode == MODE_COMPRESS)
				action = LZMA_FINISH;
		}

		t->strm.next_out = output;
		t->strm.avail_out = BUFSIZ;

		const lzma_ret code_ret = lzma_code(&t->strm, action);

		// Output is written even if lzma_code() reported an error or
		// the end of the Stream; the return value is examined after.
		if (opt_mode != MODE_TEST
				&& io_write(t->pair, output,
					BUFSIZ - t->strm.avail_out))
			break;

		if (code_ret == LZMA_OK)
			continue;

		if (code_ret != LZMA_STREAM_END) {
			errmsg(V_ERROR, "%s: %s", t->pair->src_name,
					str_strm_error(code_ret));
			break;
		}

		if (opt_mode == MODE_COMPRESS) {
			assert(t->pair->src_eof);
			finished_ok = true;
			break;
		}

		// Support decoding concatenated .lzma files.
		const lzma_ret pad_ret = single_skip_padding(t, input);

		if (pad_ret == LZMA_STREAM_END) {
			assert(t->pair->src_eof);
			finished_ok = true;
			break;
		}

		// Another Stream follows only if non-padding data was found
		// and the decoder could be reinitialized for it.
		if (pad_ret != LZMA_OK || single_init(t))
			break;
	}

	io_close(t->pair, finished_ok);
	release_thread_data(t);

	return NULL;
}
///////////////////////////////
// Multiple threads per file //
///////////////////////////////
// TODO
// I'm not sure what the best way to implement this would be. Here's one
// possible way:
// - Reader thread would read the input data and control the coder threads.
// - Every coder thread is associated with input and output buffer pools.
// The input buffer pool is filled by reader thread, and the output buffer
// pool is emptied by the writer thread.
// - Writer thread writes the output data of the oldest living coder thread.
//
// The per-file thread started by the application's main thread is used as
// the reader thread. In the beginning, it starts the writer thread and the
// first coder thread. The coder thread would be left waiting for input from
// the reader thread, and the writer thread would be waiting for input from
// the coder thread.
//
// The reader thread reads the input data into a ring buffer, whose size
// depends on the value returned by lzma_chunk_size(). If the ring buffer
// gets full, the buffer is marked "to be finished", which indicates to
// the coder thread that no more input is coming. Then a new coder thread
// would be started.
//
// TODO
/*
typedef struct {
/// Buffers
uint8_t (*buffers)[BUFSIZ];
/// Number of buffers
size_t buffer_count;
/// buffers[read_pos] is the buffer currently being read. Once finish
/// is true and read_pos == write_pos, end of input has been reached.
size_t read_pos;
/// buffers[write_pos] is the buffer into which data is currently
/// being written.
size_t write_pos;
/// This variable matters only when read_pos == write_pos && finish.
/// In that case, this variable will contain the size of the
/// buffers[read_pos].
size_t last_size;
/// True once no more data is being written to the buffer. When this
/// is set, the last_size variable must have been set too.
bool finish;
/// Mutex to protect access to the variables in this structure
pthread_mutex_t mutex;
/// Condition to indicate when another thread can continue
pthread_cond_t cond;
} mem_pool;
static foo
multi_reader(thread_data *t)
{
bool done = false;
do {
const size_t size = io_read(t->pair,
m->buffers + m->write_pos, BUFSIZ);
if (size == SIZE_MAX) {
// TODO
} else if (t->pair->src_eof) {
m->last_size = size;
}
pthread_mutex_lock(&m->mutex);
if (++m->write_pos == m->buffer_count)
m->write_pos = 0;
if (m->write_pos == m->read_pos || t->pair->src_eof)
m->finish = true;
pthread_cond_signal(&m->cond);
pthread_mutex_unlock(&m->mutex);
} while (!m->finish);
return done ? 0 : -1;
}
static foo
multi_code()
{
lzma_action = LZMA_RUN;
while (true) {
pthread_mutex_lock(&m->mutex);
while (m->read_pos == m->write_pos && !m->finish)
pthread_cond_wait(&m->cond, &m->mutex);
pthread_mutex_unlock(&m->mutex);
if (m->finish) {
t->strm.avail_in = m->last_size;
if (opt_mode == MODE_COMPRESS)
action = LZMA_FINISH;
} else {
t->strm.avail_in = BUFSIZ;
}
t->strm.next_in = m->buffers + m->read_pos;
const lzma_ret ret = lzma_code(&t->strm, action);
}
}
*/
///////////////////////
// Starting new file //
///////////////////////
/// \brief      Start processing one file
///
/// Reserves a thread_data slot (waiting for one if necessary), opens the
/// file pair, and hands the slot to single() via create_thread(). If the
/// coder could not be started, whatever was acquired here is cleaned up.
///
/// \param      filename    Name of the file, passed as is to io_open()
extern void
process_file(const char *filename)
{
	thread_data *slot = get_thread_data();
	if (slot == NULL)
		return; // User abort

	// If this fails, it shows appropriate error messages too.
	slot->pair = io_open(filename);

	if (slot->pair != NULL) {
		// TODO Currently only one-thread-per-file mode is implemented.
		if (create_thread(&single, slot) == 0)
			return; // single() closes the pair and frees the slot.

		io_close(slot->pair, false);
	}

	release_thread_data(slot);
}