This commit is contained in:
Igor Pavlov
2021-07-22 23:00:14 +01:00
committed by Kornel
parent 4a960640a3
commit 585698650f
619 changed files with 34904 additions and 10859 deletions

View File

@@ -1,16 +1,21 @@
/* MtDec.c -- Multi-thread Decoder
2019-02-02 : Igor Pavlov : Public domain */
2021-02-27 : Igor Pavlov : Public domain */
#include "Precomp.h"
// #define SHOW_DEBUG_INFO
// #include <stdio.h>
#include <string.h>
#ifdef SHOW_DEBUG_INFO
#include <stdio.h>
#endif
#include "MtDec.h"
#ifndef _7ZIP_ST
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
@@ -19,10 +24,6 @@
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
#include "MtDec.h"
#ifndef _7ZIP_ST
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
p->progress = progress;
@@ -77,7 +78,7 @@ void MtProgress_SetError(CMtProgress *p, SRes res)
}
#define RINOK_THREAD(x) RINOK(x)
#define RINOK_THREAD(x) RINOK_WRes(x)
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
@@ -156,8 +157,7 @@ static void MtDecThread_CloseThread(CMtDecThread *t)
{
Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
Event_Set(&t->canRead);
Thread_Wait(&t->thread);
Thread_Close(&t->thread);
Thread_Wait_Close(&t->thread);
}
Event_Close(&t->canRead);
@@ -289,12 +289,13 @@ static WRes ThreadFunc2(CMtDecThread *t)
Byte *afterEndData = NULL;
size_t afterEndData_Size = 0;
BoolInt afterEndData_IsCross = False;
BoolInt canCreateNewThread = False;
// CMtDecCallbackInfo parse;
CMtDecThread *nextThread;
PRF_STR_INT("Event_Wait(&t->canRead)", t->index);
PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);
RINOK_THREAD(Event_Wait(&t->canRead));
if (p->exitThread)
@@ -418,10 +419,12 @@ static WRes ThreadFunc2(CMtDecThread *t)
parse.srcFinished = finish;
parse.canCreateNewThread = True;
// PRF(printf("\nParse size = %d\n", (unsigned)size))
PRF(printf("\nParse size = %d\n", (unsigned)size));
p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
needWrite = True;
canCreateNewThread = parse.canCreateNewThread;
@@ -478,16 +481,12 @@ static WRes ThreadFunc2(CMtDecThread *t)
if (parse.state == MTDEC_PARSE_END)
{
p->crossStart = 0;
p->crossEnd = 0;
if (crossSize != 0)
memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
afterEndData_Size = size - parse.srcSize;
afterEndData = parseData + parse.srcSize;
afterEndData_Size = size - parse.srcSize;
if (crossSize != 0)
afterEndData_IsCross = True;
// we reduce data size to required bytes (parsed only)
inDataSize -= (size - parse.srcSize);
inDataSize -= afterEndData_Size;
if (!prev)
inDataSize_Start = parse.srcSize;
break;
@@ -752,13 +751,15 @@ static WRes ThreadFunc2(CMtDecThread *t)
{
// p->inProcessed += inCodePos;
PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
res = p->mtCallback->Write(p->mtCallbackObject, t->index,
res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
afterEndData, afterEndData_Size,
afterEndData, afterEndData_Size, afterEndData_IsCross,
&needContinue,
&canRecode);
// res= E_INVALIDARG; // for test
// res = SZ_ERROR_FAIL; // for test
PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
@@ -847,7 +848,7 @@ static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
res = ThreadFunc2(t);
p = t->mtDec;
if (res == 0)
return p->exitThreadWRes;
return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
{
// it's unexpected situation for some threading function error
if (p->exitThreadWRes == 0)
@@ -858,15 +859,14 @@ static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
Event_Set(&p->threads[0].canWrite);
MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
}
return res;
return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
}
static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
CMtDecThread *t = (CMtDecThread *)pp;
// fprintf(stderr, "\n%d = %p - before", t->index, &t);
#ifdef USE_ALLOCA
CMtDecThread *t = (CMtDecThread *)pp;
// fprintf(stderr, "\n%d = %p - before", t->index, &t);
t->allocaPtr = alloca(t->index * 128);
#endif
return ThreadFunc1(pp);
@@ -1092,13 +1092,14 @@ SRes MtDec_Code(CMtDec *p)
{
WRes wres;
WRes sres;
SRes sres;
CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
// wres = MtDecThread_CreateAndStart(nextThread);
wres = MtDecThread_CreateEvents(nextThread);
if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
if (wres == 0) { wres = Event_Set(&nextThread->canRead);
if (wres == 0) { wres = ThreadFunc(nextThread);
if (wres == 0) { THREAD_FUNC_RET_TYPE res = ThreadFunc(nextThread);
wres = (WRes)(UINT_PTR)res;
if (wres != 0)
{
p->needContinue = False;
@@ -1130,8 +1131,8 @@ SRes MtDec_Code(CMtDec *p)
return SZ_OK;
// if (sres != SZ_OK)
return sres;
// return E_FAIL;
return sres;
// return SZ_ERROR_FAIL;
}
}