From 54a9234630521702f3b36524ab013ddfcf88b856 Mon Sep 17 00:00:00 2001 From: Tino Reichardt Date: Sun, 28 Oct 2018 19:07:05 +0100 Subject: [PATCH] Fix decompression code ZstdDecoder.cpp - remove (nearly) all threading stuff - check if decompression works with different data sets --- CPP/7zip/Compress/ZstdDecoder.cpp | 121 ++++++++++++++++++++---------- CPP/7zip/Compress/ZstdDecoder.h | 5 +- CPP/7zip/Compress/ZstdEncoder.cpp | 17 +++-- 3 files changed, 95 insertions(+), 48 deletions(-) diff --git a/CPP/7zip/Compress/ZstdDecoder.cpp b/CPP/7zip/Compress/ZstdDecoder.cpp index 7ba80334..8bad5aef 100644 --- a/CPP/7zip/Compress/ZstdDecoder.cpp +++ b/CPP/7zip/Compress/ZstdDecoder.cpp @@ -1,5 +1,11 @@ // (C) 2016 - 2018 Tino Reichardt +#define DEBUG 0 + +#if DEBUG +#include +#endif + #include "StdAfx.h" #include "ZstdDecoder.h" @@ -13,12 +19,9 @@ CDecoder::CDecoder(): _srcBufSize(ZSTD_DStreamInSize()), _dstBufSize(ZSTD_DStreamOutSize()), _processedIn(0), - _processedOut(0), - _numThreads(NWindows::NSystem::GetNumberOfProcessors()) - + _processedOut(0) { _props.clear(); - _hMutex = CreateMutex(NULL, FALSE, NULL); } CDecoder::~CDecoder() @@ -27,7 +30,6 @@ CDecoder::~CDecoder() ZSTD_freeDCtx(_ctx); MyFree(_srcBuf); MyFree(_dstBuf); - CloseHandle(_hMutex); } } @@ -45,15 +47,6 @@ STDMETHODIMP CDecoder::SetDecoderProperties2(const Byte * prop, UInt32 size) } } -STDMETHODIMP CDecoder::SetNumberOfThreads(UInt32 numThreads) -{ - const UInt32 kNumThreadsMax = ZSTD_THREAD_MAX; - if (numThreads < 1) numThreads = 1; - if (numThreads > kNumThreadsMax) numThreads = kNumThreadsMax; - _numThreads = numThreads; - return S_OK; -} - HRESULT CDecoder::SetOutStreamSizeResume(const UInt64 * /*outSize*/) { _processedOut = 0; @@ -70,6 +63,10 @@ STDMETHODIMP CDecoder::SetOutStreamSize(const UInt64 * outSize) HRESULT CDecoder::CodeSpec(ISequentialInStream * inStream, ISequentialOutStream * outStream, ICompressProgressInfo * progress) { + size_t srcBufLen, result; + ZSTD_inBuffer zIn; + ZSTD_outBuffer zOut; + /* 1) create context */ if (!_ctx) { _ctx = ZSTD_createDCtx(); @@ -87,37 +84,78 @@ HRESULT CDecoder::CodeSpec(ISequentialInStream * inStream, ZSTD_resetDStream(_ctx); } +// _processedOut += zOut.pos; +// RINOK(ReadStream(inStream, _srcBuf, &size)); +// RINOK(WriteStream(outStream, _dstBuf, zOut.pos)); +// RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut)); + + zIn.src = _srcBuf; + zIn.size = _srcBufSize; + zIn.pos = 0; + + zOut.dst = _dstBuf; + srcBufLen = _srcBufSize; + + /* read first input block */ + RINOK(ReadStream(inStream, _srcBuf, &srcBufLen)); + _processedIn += srcBufLen; + + /* Main decompression Loop */ for (;;) { - size_t size = _srcBufSize; - RINOK(ReadStream(inStream, _srcBuf, &size)); for (;;) { - ZSTD_inBuffer inBuff = { _srcBuf, size, 0 }; - ZSTD_outBuffer outBuff= { _dstBuf, _dstBufSize, 0 }; - size_t const readSizeHint = ZSTD_decompressStream(_ctx, &outBuff, &inBuff); + /* decompress loop */ + zOut.size = _dstBufSize; + zOut.pos = 0; - if (ZSTD_isError(readSizeHint)) - return E_FAIL; - - /* write decompressed data */ - RINOK(WriteStream(outStream, _dstBuf, outBuff.pos)); - WaitForSingleObject(_hMutex, INFINITE); - _processedOut += outBuff.pos; - RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut)); - ReleaseMutex(_hMutex); - - if (inBuff.pos > 0) { - memmove(_srcBuf, (char*)_srcBuf + inBuff.pos, inBuff.size - inBuff.pos); - size = _srcBufSize - inBuff.pos; - RINOK(ReadStream(inStream, (char*)_srcBuf + inBuff.pos, &size)); + result = ZSTD_decompressStream(_ctx, &zOut, &zIn); + if (ZSTD_isError(result)) { + return E_FAIL; } - if (inBuff.size != inBuff.pos) - return E_FAIL; +#if DEBUG + printf("res =%u\n", (unsigned)result); + printf("zIn.size =%u\n", (unsigned)zIn.size); + printf("zIn.pos =%u\n", (unsigned)zIn.pos); + printf("zOut.size =%u\n", (unsigned)zOut.size); + printf("zOut.pos =%u\n", (unsigned)zOut.pos); + fflush(stdout); +#endif - /* eof */ - if (readSizeHint == 0) - return S_OK; - } + /* write decompressed result */ + if (zOut.pos) { + RINOK(WriteStream(outStream, _dstBuf, zOut.pos)); + _processedOut += zOut.pos; + RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut)); + } + + /* one more round */ + if ((zIn.pos == zIn.size) && (result == 1) && zOut.pos) + continue; + + /* finished with buffer */ + if (zIn.pos == zIn.size) + break; + + /* end of frame */ + if (result == 0) { + result = ZSTD_resetDStream(_ctx); + if (ZSTD_isError(result)) + return E_FAIL; + } + } /* for() decompress */ + + + /* read next input */ + srcBufLen = _srcBufSize; + RINOK(ReadStream(inStream, _srcBuf, &srcBufLen)); + _processedIn += srcBufLen; + + /* finished */ + if (srcBufLen == 0) + return S_OK; + + zIn.size = srcBufLen; + zIn.pos = 0; } } @@ -135,6 +173,11 @@ STDMETHODIMP CDecoder::SetInStream(ISequentialInStream * inStream) return S_OK; } +STDMETHODIMP CDecoder::SetNumberOfThreads(UInt32 /* numThreads */) +{ + return S_OK; +} + STDMETHODIMP CDecoder::ReleaseInStream() { _inStream.Release(); diff --git a/CPP/7zip/Compress/ZstdDecoder.h b/CPP/7zip/Compress/ZstdDecoder.h index 877a14db..351e3701 100644 --- a/CPP/7zip/Compress/ZstdDecoder.h +++ b/CPP/7zip/Compress/ZstdDecoder.h @@ -2,7 +2,6 @@ #define ZSTD_STATIC_LINKING_ONLY #include "../../../C/Alloc.h" -#include "../../../C/Threads.h" #include "../../../C/zstd/zstd.h" #include "../../Windows/System.h" @@ -26,7 +25,7 @@ #define ZSTD_LEVEL_MIN 1 #define ZSTD_LEVEL_MAX 22 -#define ZSTD_THREAD_MAX 128 +#define ZSTD_THREAD_MAX 256 namespace NCompress { namespace NZSTD { @@ -64,8 +63,6 @@ class CDecoder:public ICompressCoder, UInt64 _processedIn; UInt64 _processedOut; - UInt32 _numThreads; - HANDLE _hMutex; HRESULT CodeSpec(ISequentialInStream *inStream, ISequentialOutStream *outStream, ICompressProgressInfo *progress); HRESULT SetOutStreamSizeResume(const UInt64 *outSize); diff --git a/CPP/7zip/Compress/ZstdEncoder.cpp b/CPP/7zip/Compress/ZstdEncoder.cpp index 4958ff13..02cf36da 100644 --- a/CPP/7zip/Compress/ZstdEncoder.cpp +++ b/CPP/7zip/Compress/ZstdEncoder.cpp @@ -133,12 +133,19 @@ STDMETHODIMP CEncoder::Code(ISequentialInStream *inStream, ReleaseMutex(_hMutex); for (;;) { - outBuff = { _dstBuf, _dstBufSize, 0 }; + outBuff.dst = _dstBuf; + outBuff.size = _dstBufSize; + outBuff.pos = 0; - if (ZSTD_todo == ZSTD_e_continue) - inBuff = { _srcBuf, srcSize, 0 }; - else - inBuff = { NULL, srcSize, 0 }; + if (ZSTD_todo == ZSTD_e_continue) { + inBuff.src = _srcBuf; + inBuff.size = srcSize; + inBuff.pos = 0; + } else { + inBuff.src = 0; + inBuff.size = srcSize; + inBuff.pos = 0; + } err = ZSTD_compress_generic(_ctx, &outBuff, &inBuff, ZSTD_todo); if (ZSTD_isError(err)) return E_FAIL;