Switch to Zstandard's New advanced API.

- long distance matching is enabled
- the compression should improve, the speed also
- decompression code is single threaded only
This commit is contained in:
Tino Reichardt
2018-10-26 00:09:25 +02:00
parent cc10eb2d9b
commit 739ec50c75
15 changed files with 207 additions and 235 deletions

View File

@@ -1,4 +1,10 @@
// (C) 2016 Tino Reichardt
// (C) 2016 - 2018 Tino Reichardt
#define DEBUG 0
#if DEBUG
#include <stdio.h>
#endif
#include "StdAfx.h"
#include "ZstdEncoder.h"
@@ -9,31 +15,27 @@ namespace NCompress {
namespace NZSTD {
CEncoder::CEncoder():
_ctx(NULL),
_srcBuf(NULL),
_dstBuf(NULL),
_srcBufSize(ZSTD_CStreamInSize()),
_dstBufSize(ZSTD_CStreamOutSize()),
_processedIn(0),
_processedOut(0),
_inputSize(0),
_ctx(NULL),
_numThreads(NWindows::NSystem::GetNumberOfProcessors())
{
_props.clear();
_hMutex = CreateMutex(NULL, FALSE, NULL);
}
CEncoder::~CEncoder()
{
if (_ctx)
ZSTDCB_freeCCtx(_ctx);
}
HRESULT CEncoder::ErrorOut(size_t code)
{
const char *strError = ZSTDCB_getErrorString(code);
wchar_t wstrError[200+5]; /* no malloc here, /TR */
mbstowcs(wstrError, strError, 200);
MessageBoxW(0, wstrError, L"7-Zip Zstandard", MB_ICONERROR | MB_OK);
MyFree(wstrError);
return S_FALSE;
if (_ctx) {
ZSTD_freeCCtx(_ctx);
MyFree(_srcBuf);
MyFree(_dstBuf);
CloseHandle(_hMutex);
}
}
STDMETHODIMP CEncoder::SetCoderProperties(const PROPID * propIDs, const PROPVARIANT * coderProps, UInt32 numProps)
@@ -54,7 +56,7 @@ STDMETHODIMP CEncoder::SetCoderProperties(const PROPID * propIDs, const PROPVARI
/* level 1..22 */
_props._level = static_cast < Byte > (prop.ulVal);
Byte mylevel = static_cast < Byte > (ZSTDCB_LEVEL_MAX);
Byte mylevel = static_cast < Byte > (ZSTD_LEVEL_MAX);
if (_props._level > mylevel)
_props._level = mylevel;
@@ -84,52 +86,97 @@ STDMETHODIMP CEncoder::Code(ISequentialInStream *inStream,
ISequentialOutStream *outStream, const UInt64 * /*inSize*/ ,
const UInt64 * /*outSize */, ICompressProgressInfo *progress)
{
ZSTDCB_RdWr_t rdwr;
size_t result;
HRESULT res = S_OK;
ZSTD_EndDirective ZSTD_todo = ZSTD_e_continue;
ZSTD_outBuffer outBuff;
ZSTD_inBuffer inBuff;
size_t err, srcSize;
struct ZstdStream Rd;
Rd.inStream = inStream;
Rd.outStream = outStream;
Rd.processedIn = &_processedIn;
Rd.processedOut = &_processedOut;
if (!_ctx) {
_ctx = ZSTD_createCCtx();
if (!_ctx)
return E_OUTOFMEMORY;
struct ZstdStream Wr;
if (_processedIn == 0)
Wr.progress = progress;
else
Wr.progress = 0;
Wr.inStream = inStream;
Wr.outStream = outStream;
Wr.processedIn = &_processedIn;
Wr.processedOut = &_processedOut;
_srcBuf = MyAlloc(_srcBufSize);
if (!_srcBuf)
return E_OUTOFMEMORY;
/* 1) setup read/write functions */
rdwr.fn_read = ::ZstdRead;
rdwr.fn_write = ::ZstdWrite;
rdwr.arg_read = (void *)&Rd;
rdwr.arg_write = (void *)&Wr;
_dstBuf = MyAlloc(_dstBufSize);
if (!_dstBuf)
return E_OUTOFMEMORY;
/* 2) create compression context, if needed */
if (!_ctx)
_ctx = ZSTDCB_createCCtx(_numThreads, _props._level, _inputSize);
if (!_ctx)
return S_FALSE;
err = ZSTD_CCtx_setParameter(_ctx, ZSTD_p_compressionLevel, _props._level);
if (ZSTD_isError(err)) return E_FAIL;
/* 3) compress */
result = ZSTDCB_compressCCtx(_ctx, &rdwr);
if (ZSTDCB_isError(result)) {
if (result == (size_t)-ZSTDCB_error_canceled)
return E_ABORT;
return ErrorOut(result);
err = ZSTD_CCtx_setParameter(_ctx, ZSTD_p_nbWorkers, _numThreads);
if (ZSTD_isError(err)) return E_FAIL;
err = ZSTD_CCtx_setParameter(_ctx, ZSTD_p_contentSizeFlag, 1);
if (ZSTD_isError(err)) return E_FAIL;
err = ZSTD_CCtx_setParameter(_ctx, ZSTD_p_enableLongDistanceMatching, 1);
if (ZSTD_isError(err)) return E_FAIL;
}
return res;
for (;;) {
/* read input */
srcSize = _srcBufSize;
RINOK(ReadStream(inStream, _srcBuf, &srcSize));
/* eof */
if (srcSize == 0)
ZSTD_todo = ZSTD_e_end;
/* compress data */
WaitForSingleObject(_hMutex, INFINITE);
_processedIn += srcSize;
ReleaseMutex(_hMutex);
for (;;) {
outBuff = { _dstBuf, _dstBufSize, 0 };
if (ZSTD_todo == ZSTD_e_continue)
inBuff = { _srcBuf, srcSize, 0 };
else
inBuff = { NULL, srcSize, 0 };
err = ZSTD_compress_generic(_ctx, &outBuff, &inBuff, ZSTD_todo);
if (ZSTD_isError(err)) return E_FAIL;
#if DEBUG
printf("err=%u ", (unsigned)err);
printf("srcSize=%u ", (unsigned)srcSize);
printf("todo=%u\n", ZSTD_todo);
printf("inBuff.size=%u ", (unsigned)inBuff.size);
printf("inBuff.pos=%u\n", (unsigned)inBuff.pos);
printf("outBuff.size=%u ", (unsigned)outBuff.size);
printf("outBuff.pos=%u\n\n", (unsigned)outBuff.pos);
fflush(stdout);
#endif
/* write output */
if (outBuff.pos) {
RINOK(WriteStream(outStream, _dstBuf, outBuff.pos));
WaitForSingleObject(_hMutex, INFINITE);
_processedOut += outBuff.pos;
RINOK(progress->SetRatioInfo(&_processedIn, &_processedOut));
ReleaseMutex(_hMutex);
}
/* done */
if (ZSTD_todo == ZSTD_e_end && err == 0)
return S_OK;
/* need more input */
if (inBuff.pos == inBuff.size)
break;
}
}
}
STDMETHODIMP CEncoder::SetNumberOfThreads(UInt32 numThreads)
{
const UInt32 kNumThreadsMax = ZSTDCB_THREAD_MAX;
const UInt32 kNumThreadsMax = ZSTD_THREAD_MAX;
if (numThreads < 1) numThreads = 1;
if (numThreads > kNumThreadsMax) numThreads = kNumThreadsMax;
_numThreads = numThreads;