Fix decompression code ZstdDecoder.cpp

- remove (nearly) all threading stuff
- check if decompression works with different data sets
This commit is contained in:
Tino Reichardt
2018-10-28 19:07:05 +01:00
parent 739ec50c75
commit 54a9234630
3 changed files with 95 additions and 48 deletions

View File

@@ -1,5 +1,11 @@
// (C) 2016 - 2018 Tino Reichardt // (C) 2016 - 2018 Tino Reichardt
#define DEBUG 0
#if DEBUG
#include <stdio.h>
#endif
#include "StdAfx.h" #include "StdAfx.h"
#include "ZstdDecoder.h" #include "ZstdDecoder.h"
@@ -13,12 +19,9 @@ CDecoder::CDecoder():
_srcBufSize(ZSTD_DStreamInSize()), _srcBufSize(ZSTD_DStreamInSize()),
_dstBufSize(ZSTD_DStreamOutSize()), _dstBufSize(ZSTD_DStreamOutSize()),
_processedIn(0), _processedIn(0),
_processedOut(0), _processedOut(0)
_numThreads(NWindows::NSystem::GetNumberOfProcessors())
{ {
_props.clear(); _props.clear();
_hMutex = CreateMutex(NULL, FALSE, NULL);
} }
CDecoder::~CDecoder() CDecoder::~CDecoder()
@@ -27,7 +30,6 @@ CDecoder::~CDecoder()
ZSTD_freeDCtx(_ctx); ZSTD_freeDCtx(_ctx);
MyFree(_srcBuf); MyFree(_srcBuf);
MyFree(_dstBuf); MyFree(_dstBuf);
CloseHandle(_hMutex);
} }
} }
@@ -45,15 +47,6 @@ STDMETHODIMP CDecoder::SetDecoderProperties2(const Byte * prop, UInt32 size)
} }
} }
STDMETHODIMP CDecoder::SetNumberOfThreads(UInt32 numThreads)
{
const UInt32 kNumThreadsMax = ZSTD_THREAD_MAX;
if (numThreads < 1) numThreads = 1;
if (numThreads > kNumThreadsMax) numThreads = kNumThreadsMax;
_numThreads = numThreads;
return S_OK;
}
HRESULT CDecoder::SetOutStreamSizeResume(const UInt64 * /*outSize*/) HRESULT CDecoder::SetOutStreamSizeResume(const UInt64 * /*outSize*/)
{ {
_processedOut = 0; _processedOut = 0;
@@ -70,6 +63,10 @@ STDMETHODIMP CDecoder::SetOutStreamSize(const UInt64 * outSize)
HRESULT CDecoder::CodeSpec(ISequentialInStream * inStream, HRESULT CDecoder::CodeSpec(ISequentialInStream * inStream,
ISequentialOutStream * outStream, ICompressProgressInfo * progress) ISequentialOutStream * outStream, ICompressProgressInfo * progress)
{ {
size_t srcBufLen, result;
ZSTD_inBuffer zIn;
ZSTD_outBuffer zOut;
/* 1) create context */ /* 1) create context */
if (!_ctx) { if (!_ctx) {
_ctx = ZSTD_createDCtx(); _ctx = ZSTD_createDCtx();
@@ -87,37 +84,78 @@ HRESULT CDecoder::CodeSpec(ISequentialInStream * inStream,
ZSTD_resetDStream(_ctx); ZSTD_resetDStream(_ctx);
} }
// _processedOut += zOut.pos;
// RINOK(ReadStream(inStream, _srcBuf, &size));
// RINOK(WriteStream(outStream, _dstBuf, zOut.pos));
// RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut));
zIn.src = _srcBuf;
zIn.size = _srcBufSize;
zIn.pos = 0;
zOut.dst = _dstBuf;
srcBufLen = _srcBufSize;
/* read first input block */
RINOK(ReadStream(inStream, _srcBuf, &srcBufLen));
_processedIn += srcBufLen;
/* Main decompression Loop */
for (;;) { for (;;) {
size_t size = _srcBufSize;
RINOK(ReadStream(inStream, _srcBuf, &size));
for (;;) { for (;;) {
ZSTD_inBuffer inBuff = { _srcBuf, size, 0 }; /* decompress loop */
ZSTD_outBuffer outBuff= { _dstBuf, _dstBufSize, 0 }; zOut.size = _dstBufSize;
size_t const readSizeHint = ZSTD_decompressStream(_ctx, &outBuff, &inBuff); zOut.pos = 0;
if (ZSTD_isError(readSizeHint)) result = ZSTD_decompressStream(_ctx, &zOut, &zIn);
return E_FAIL; if (ZSTD_isError(result)) {
return E_FAIL;
/* write decompressed data */
RINOK(WriteStream(outStream, _dstBuf, outBuff.pos));
WaitForSingleObject(_hMutex, INFINITE);
_processedOut += outBuff.pos;
RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut));
ReleaseMutex(_hMutex);
if (inBuff.pos > 0) {
memmove(_srcBuf, (char*)_srcBuf + inBuff.pos, inBuff.size - inBuff.pos);
size = _srcBufSize - inBuff.pos;
RINOK(ReadStream(inStream, (char*)_srcBuf + inBuff.pos, &size));
} }
if (inBuff.size != inBuff.pos) #if DEBUG
return E_FAIL; printf("res =%u\n", (unsigned)result);
printf("zIn.size =%u\n", (unsigned)zIn.size);
printf("zIn.pos =%u\n", (unsigned)zIn.pos);
printf("zOut.size =%u\n", (unsigned)zOut.size);
printf("zOut.pos =%u\n", (unsigned)zOut.pos);
fflush(stdout);
#endif
/* eof */ /* write decompressed result */
if (readSizeHint == 0) if (zOut.pos) {
return S_OK; RINOK(WriteStream(outStream, _dstBuf, zOut.pos));
} _processedOut += zOut.pos;
RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut));
}
/* one more round */
if ((zIn.pos == zIn.size) && (result == 1) && zOut.pos)
continue;
/* finished with buffer */
if (zIn.pos == zIn.size)
break;
/* end of frame */
if (result == 0) {
result = ZSTD_resetDStream(_ctx);
if (ZSTD_isError(result))
return E_FAIL;
}
} /* for() decompress */
/* read next input */
srcBufLen = _srcBufSize;
RINOK(ReadStream(inStream, _srcBuf, &srcBufLen));
_processedIn += srcBufLen;
/* finished */
if (srcBufLen == 0)
return S_OK;
zIn.size = srcBufLen;
zIn.pos = 0;
} }
} }
@@ -135,6 +173,11 @@ STDMETHODIMP CDecoder::SetInStream(ISequentialInStream * inStream)
return S_OK; return S_OK;
} }
STDMETHODIMP CDecoder::SetNumberOfThreads(UInt32 /* numThreads */)
{
return S_OK;
}
STDMETHODIMP CDecoder::ReleaseInStream() STDMETHODIMP CDecoder::ReleaseInStream()
{ {
_inStream.Release(); _inStream.Release();

View File

@@ -2,7 +2,6 @@
#define ZSTD_STATIC_LINKING_ONLY #define ZSTD_STATIC_LINKING_ONLY
#include "../../../C/Alloc.h" #include "../../../C/Alloc.h"
#include "../../../C/Threads.h"
#include "../../../C/zstd/zstd.h" #include "../../../C/zstd/zstd.h"
#include "../../Windows/System.h" #include "../../Windows/System.h"
@@ -26,7 +25,7 @@
#define ZSTD_LEVEL_MIN 1 #define ZSTD_LEVEL_MIN 1
#define ZSTD_LEVEL_MAX 22 #define ZSTD_LEVEL_MAX 22
#define ZSTD_THREAD_MAX 128 #define ZSTD_THREAD_MAX 256
namespace NCompress { namespace NCompress {
namespace NZSTD { namespace NZSTD {
@@ -64,8 +63,6 @@ class CDecoder:public ICompressCoder,
UInt64 _processedIn; UInt64 _processedIn;
UInt64 _processedOut; UInt64 _processedOut;
UInt32 _numThreads;
HANDLE _hMutex;
HRESULT CodeSpec(ISequentialInStream *inStream, ISequentialOutStream *outStream, ICompressProgressInfo *progress); HRESULT CodeSpec(ISequentialInStream *inStream, ISequentialOutStream *outStream, ICompressProgressInfo *progress);
HRESULT SetOutStreamSizeResume(const UInt64 *outSize); HRESULT SetOutStreamSizeResume(const UInt64 *outSize);

View File

@@ -133,12 +133,19 @@ STDMETHODIMP CEncoder::Code(ISequentialInStream *inStream,
ReleaseMutex(_hMutex); ReleaseMutex(_hMutex);
for (;;) { for (;;) {
outBuff = { _dstBuf, _dstBufSize, 0 }; outBuff.dst = _dstBuf;
outBuff.size = _dstBufSize;
outBuff.pos = 0;
if (ZSTD_todo == ZSTD_e_continue) if (ZSTD_todo == ZSTD_e_continue) {
inBuff = { _srcBuf, srcSize, 0 }; inBuff.src = _srcBuf;
else inBuff.size = srcSize;
inBuff = { NULL, srcSize, 0 }; inBuff.pos = 0;
} else {
inBuff.src = 0;
inBuff.size = srcSize;
inBuff.pos = 0;
}
err = ZSTD_compress_generic(_ctx, &outBuff, &inBuff, ZSTD_todo); err = ZSTD_compress_generic(_ctx, &outBuff, &inBuff, ZSTD_todo);
if (ZSTD_isError(err)) return E_FAIL; if (ZSTD_isError(err)) return E_FAIL;