merge multi threading to master branch

- updated zstd to latest devel release
- lz4, lz5 and zstd is included now
- all three support threading
This commit is contained in:
Tino Reichardt
2016-10-16 23:38:46 +02:00
parent f3f39b74b0
commit 58069903d0
108 changed files with 21091 additions and 609 deletions

View File

@@ -1,33 +1,76 @@
// ZstdDecoder.cpp
// (C) 2016 Tino Reichardt
#include "StdAfx.h"
#include "ZstdDecoder.h"
int ZstdRead(void *arg, ZSTDMT_Buffer * in)
{
struct ZstdStream *x = (struct ZstdStream*)arg;
size_t size = in->size;
HRESULT res = ReadStream(x->inStream, in->buf, &size);
if (res != S_OK)
return -1;
in->size = size;
*x->processedIn += size;
return 0;
}
int ZstdWrite(void *arg, ZSTDMT_Buffer * out)
{
struct ZstdStream *x = (struct ZstdStream*)arg;
UInt32 todo = (UInt32)out->size;
UInt32 done = 0;
while (todo != 0)
{
UInt32 block;
HRESULT res = x->outStream->Write((char*)out->buf + done, todo, &block);
done += block;
if (res == k_My_HRESULT_WritingWasCut)
break;
if (res != S_OK)
return -1;
if (block == 0)
return E_FAIL;
todo -= block;
}
*x->processedOut += done;
if (x->progress)
x->progress->SetRatioInfo(x->processedIn, x->processedOut);
return 0;
}
namespace NCompress {
namespace NZSTD {
CDecoder::CDecoder():
_dstream(NULL),
_buffIn(NULL),
_buffOut(NULL),
_buffInSizeAllocated(0),
_buffOutSizeAllocated(0),
_buffInSize(ZSTD_DStreamInSize()),
_buffOutSize(ZSTD_DStreamOutSize()*4),
_processedIn(0),
_processedOut(0)
_processedOut(0),
_inputSize(0),
_numThreads(NWindows::NSystem::GetNumberOfProcessors())
{
_props.clear();
}
CDecoder::~CDecoder()
{
if (_dstream)
ZSTD_freeDStream(_dstream);
}
MyFree(_buffIn);
MyFree(_buffOut);
HRESULT CDecoder::ErrorOut(size_t code)
{
const char *strError = ZSTDMT_getErrorString(code);
wchar_t wstrError[200+5]; /* no malloc here, /TR */
mbstowcs(wstrError, strError, 200);
MessageBoxW(0, wstrError, L"7-Zip ZStandard", MB_ICONERROR | MB_OK);
MyFree(wstrError);
return S_FALSE;
}
STDMETHODIMP CDecoder::SetDecoderProperties2(const Byte * prop, UInt32 size)
@@ -37,97 +80,23 @@ STDMETHODIMP CDecoder::SetDecoderProperties2(const Byte * prop, UInt32 size)
if (size != sizeof(DProps))
return E_FAIL;
#ifdef ZSTD_LEGACY_SUPPORT
/* version 0.x and 1.x are okay */
if (pProps->_ver_major > 1)
return E_FAIL;
/* 0.5, 0.6, 0.7, 0.8 are supported! */
if (pProps->_ver_major == 0) {
switch (pProps->_ver_minor) {
case 5:
break;
case 6:
break;
case 7:
break;
case 8:
break;
default:
return E_FAIL;
}}
#else
/* only exact version is okay */
if (pProps->_ver_major != ZSTD_VERSION_MAJOR)
return E_FAIL;
if (pProps->_ver_minor != ZSTD_VERSION_MINOR)
return E_FAIL;
#endif
memcpy(&_props, pProps, sizeof (DProps));
return S_OK;
}
HRESULT CDecoder::ErrorOut(size_t code)
STDMETHODIMP CDecoder::SetNumberOfThreads(UInt32 numThreads)
{
const char *strError = ZSTD_getErrorName(code);
size_t strErrorLen = strlen(strError) + 1;
wchar_t *wstrError = (wchar_t *)MyAlloc(sizeof(wchar_t) * strErrorLen);
if (!wstrError)
return E_FAIL;
mbstowcs(wstrError, strError, strErrorLen - 1);
MessageBoxW(0, wstrError, L"7-Zip ZStandard", MB_ICONERROR | MB_OK);
MyFree(wstrError);
return E_FAIL;
}
HRESULT CDecoder::CreateDecompressor()
{
size_t result;
if (!_dstream) {
_dstream = ZSTD_createDStream();
if (!_dstream)
return E_FAIL;
}
result = ZSTD_initDStream(_dstream);
if (ZSTD_isError(result))
return ErrorOut(result);
/* allocate buffers */
if (_buffInSizeAllocated != _buffInSize)
{
MyFree(_buffIn);
_buffIn = MyAlloc(_buffInSize);
if (!_buffIn)
return E_OUTOFMEMORY;
_buffInSizeAllocated = _buffInSize;
}
if (_buffOutSizeAllocated != _buffOutSize)
{
MyFree(_buffOut);
_buffOut = MyAlloc(_buffOutSize);
if (!_buffOut)
return E_OUTOFMEMORY;
_buffOutSizeAllocated = _buffOutSize;
}
const UInt32 kNumThreadsMax = ZSTDMT_THREAD_MAX;
if (numThreads < 1) numThreads = 1;
if (numThreads > kNumThreadsMax) numThreads = kNumThreadsMax;
_numThreads = numThreads;
return S_OK;
}
HRESULT CDecoder::SetOutStreamSizeResume(const UInt64 * /*outSize*/)
{
_processedOut = 0;
RINOK(CreateDecompressor());
return S_OK;
}
@@ -135,73 +104,48 @@ STDMETHODIMP CDecoder::SetOutStreamSize(const UInt64 * outSize)
{
_processedIn = 0;
RINOK(SetOutStreamSizeResume(outSize));
return S_OK;
}
STDMETHODIMP CDecoder::SetInBufSize(UInt32, UInt32 size)
HRESULT CDecoder::CodeSpec(ISequentialInStream * inStream,
ISequentialOutStream * outStream, ICompressProgressInfo * progress)
{
_buffInSize = size;
return S_OK;
}
STDMETHODIMP CDecoder::SetOutBufSize(UInt32, UInt32 size)
{
_buffOutSize = size;
return S_OK;
}
HRESULT CDecoder::CodeSpec(ISequentialInStream * inStream, ISequentialOutStream * outStream, ICompressProgressInfo * progress)
{
RINOK(CreateDecompressor());
ZSTDMT_RdWr_t rdwr;
size_t result;
UInt32 const toRead = static_cast < const UInt32 > (_buffInSize);
for(;;) {
UInt32 read;
HRESULT res = S_OK;
/* read input */
RINOK(inStream->Read(_buffIn, toRead, &read));
size_t InSize = static_cast < size_t > (read);
_processedIn += InSize;
struct ZstdStream Rd;
Rd.inStream = inStream;
Rd.processedIn = &_processedIn;
if (InSize == 0)
return S_OK;
struct ZstdStream Wr;
Wr.progress = progress;
Wr.outStream = outStream;
Wr.processedIn = &_processedIn;
Wr.processedOut = &_processedOut;
/* decompress input */
ZSTD_inBuffer input = { _buffIn, InSize, 0 };
for (;;) {
ZSTD_outBuffer output = { _buffOut, _buffOutSize, 0 };
result = ZSTD_decompressStream(_dstream, &output , &input);
#if 0
printf("%s in=%d out=%d result=%d in.pos=%d in.size=%d\n", __FUNCTION__,
InSize, output.pos, result, input.pos, input.size);
fflush(stdout);
#endif
if (ZSTD_isError(result))
return ErrorOut(result);
/* 1) setup read/write functions */
rdwr.fn_read = ::ZstdRead;
rdwr.fn_write = ::ZstdWrite;
rdwr.arg_read = (void *)&Rd;
rdwr.arg_write = (void *)&Wr;
/* write decompressed stream and update progress */
RINOK(WriteStream(outStream, _buffOut, output.pos));
_processedOut += output.pos;
RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut));
/* 2) create decompression context */
ZSTDMT_DCtx *ctx = ZSTDMT_createDCtx(_numThreads, _inputSize);
if (!ctx)
return S_FALSE;
/* one more round */
if ((input.pos == input.size) && (result == 1) && output.pos)
continue;
/* 3) decompress */
result = ZSTDMT_decompressDCtx(ctx, &rdwr);
//printf("decompress = %d / %d\n", result, ZSTDMT_error_read_fail);
if (result == (size_t)-ZSTDMT_error_read_fail)
res = E_ABORT;
else if (ZSTDMT_isError(result))
return ErrorOut(result);
/* finished */
if (input.pos == input.size)
break;
/* end of frame */
if (result == 0) {
result = ZSTD_initDStream(_dstream);
if (ZSTD_isError(result))
return ErrorOut(result);
}
}
}
/* 4) free resources */
ZSTDMT_freeDCtx(ctx);
return res;
}
STDMETHODIMP CDecoder::Code(ISequentialInStream * inStream, ISequentialOutStream * outStream,
@@ -221,69 +165,14 @@ STDMETHODIMP CDecoder::SetInStream(ISequentialInStream * inStream)
STDMETHODIMP CDecoder::ReleaseInStream()
{
_inStream.Release();
return S_OK;
}
STDMETHODIMP CDecoder::Read(void *data, UInt32 /*size*/, UInt32 *processedSize)
{
if (processedSize)
*processedSize = 0;
size_t result;
if (!_dstream)
if (CreateDecompressor() != S_OK)
return E_FAIL;
UInt32 read, toRead = static_cast < UInt32 > (_buffInSize);
Byte *dataout = static_cast < Byte* > (data);
for(;;) {
/* read input */
RINOK(_inStream->Read(_buffIn, toRead, &read));
size_t InSize = static_cast < size_t > (read);
_processedIn += InSize;
if (InSize == 0) {
return S_OK;
}
/* decompress input */
ZSTD_inBuffer input = { _buffIn, InSize, 0 };
for (;;) {
ZSTD_outBuffer output = { dataout, _buffOutSize, 0 };
result = ZSTD_decompressStream(_dstream, &output , &input);
if (ZSTD_isError(result))
return ErrorOut(result);
if (processedSize)
*processedSize += static_cast < UInt32 > (output.pos);
dataout += output.pos;
/* one more round */
if ((input.pos == input.size) && (result == 1) && output.pos)
continue;
/* finished */
if (input.pos == input.size)
break;
/* end of frame */
if (result == 0) {
result = ZSTD_initDStream(_dstream);
if (ZSTD_isError(result))
return ErrorOut(result);
}
}
}
}
#endif
HRESULT CDecoder::CodeResume(ISequentialOutStream * outStream, const UInt64 * outSize, ICompressProgressInfo * progress)
{
RINOK(SetOutStreamSizeResume(outSize));
return CodeSpec(_inStream, outStream, progress);
}
#endif
}}