This commit is contained in:
Igor Pavlov
2023-12-22 17:17:05 +00:00
committed by Kornel
parent ec44a8a070
commit a36c48cece
954 changed files with 42199 additions and 25482 deletions

101
C/MtDec.c
View File

@@ -1,5 +1,5 @@
/* MtDec.c -- Multi-thread Decoder
2021-12-21 : Igor Pavlov : Public domain */
2023-04-02 : Igor Pavlov : Public domain */
#include "Precomp.h"
@@ -14,7 +14,7 @@
#include "MtDec.h"
#ifndef _7ZIP_ST
#ifndef Z7_ST
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
@@ -24,7 +24,7 @@
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
void MtProgress_Init(CMtProgress *p, ICompressProgressPtr progress)
{
p->progress = progress;
p->res = SZ_OK;
@@ -81,36 +81,28 @@ void MtProgress_SetError(CMtProgress *p, SRes res)
#define RINOK_THREAD(x) RINOK_WRes(x)
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
struct CMtDecBufLink_
{
if (Event_IsCreated(p))
return Event_Reset(p);
return AutoResetEvent_CreateNotSignaled(p);
}
struct __CMtDecBufLink
{
struct __CMtDecBufLink *next;
struct CMtDecBufLink_ *next;
void *pad[3];
};
typedef struct __CMtDecBufLink CMtDecBufLink;
typedef struct CMtDecBufLink_ CMtDecBufLink;
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
static THREAD_FUNC_DECL ThreadFunc(void *pp);
static THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp);
static WRes MtDecThread_CreateEvents(CMtDecThread *t)
{
WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
WRes wres = AutoResetEvent_OptCreate_And_Reset(&t->canWrite);
if (wres == 0)
{
wres = ArEvent_OptCreate_And_Reset(&t->canRead);
wres = AutoResetEvent_OptCreate_And_Reset(&t->canRead);
if (wres == 0)
return SZ_OK;
}
@@ -126,7 +118,7 @@ static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
{
if (Thread_WasCreated(&t->thread))
return SZ_OK;
wres = Thread_Create(&t->thread, ThreadFunc, t);
wres = Thread_Create(&t->thread, MtDec_ThreadFunc, t);
if (wres == 0)
return SZ_OK;
}
@@ -167,7 +159,7 @@ static void MtDecThread_CloseThread(CMtDecThread *t)
static void MtDec_CloseThreads(CMtDec *p)
{
unsigned i;
for (i = 0; i < MTDEC__THREADS_MAX; i++)
for (i = 0; i < MTDEC_THREADS_MAX; i++)
MtDecThread_CloseThread(&p->threads[i]);
}
@@ -179,25 +171,6 @@ static void MtDecThread_Destruct(CMtDecThread *t)
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
size_t size = *processedSize;
*processedSize = 0;
while (size != 0)
{
size_t cur = size;
SRes res = ISeqInStream_Read(stream, data, &cur);
*processedSize += cur;
data += cur;
size -= cur;
RINOK(res);
if (cur == 0)
return SZ_OK;
}
return SZ_OK;
}
static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
SRes res;
@@ -253,7 +226,7 @@ Byte *MtDec_GetCrossBuff(CMtDec *p)
/*
ThreadFunc2() returns:
MtDec_ThreadFunc2() returns:
0 - in all normal cases (even for stream error or memory allocation error)
(!= 0) - WRes error return by system threading function
*/
@@ -261,11 +234,11 @@ Byte *MtDec_GetCrossBuff(CMtDec *p)
// #define MTDEC_ProgessStep (1 << 22)
#define MTDEC_ProgessStep (1 << 0)
static WRes ThreadFunc2(CMtDecThread *t)
static WRes MtDec_ThreadFunc2(CMtDecThread *t)
{
CMtDec *p = t->mtDec;
PRF_STR_INT("ThreadFunc2", t->index);
PRF_STR_INT("MtDec_ThreadFunc2", t->index)
// SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
@@ -295,13 +268,13 @@ static WRes ThreadFunc2(CMtDecThread *t)
// CMtDecCallbackInfo parse;
CMtDecThread *nextThread;
PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);
PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index)
RINOK_THREAD(Event_Wait(&t->canRead));
RINOK_THREAD(Event_Wait(&t->canRead))
if (p->exitThread)
return 0;
PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
PRF_STR_INT("after Event_Wait(&t->canRead)", t->index)
// if (t->index == 3) return 19; // for test
@@ -373,7 +346,7 @@ static WRes ThreadFunc2(CMtDecThread *t)
{
size = p->inBufSize;
res = FullRead(p->inStream, data, &size);
res = SeqInStream_ReadMax(p->inStream, data, &size);
// size = 10; // test
@@ -615,7 +588,7 @@ static WRes ThreadFunc2(CMtDecThread *t)
// if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
// if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
// - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
// - otherwise we stop decoding and exit from ThreadFunc2()
// - otherwise we stop decoding and exit from MtDec_ThreadFunc2()
// Don't change (finish) variable in the further code
@@ -688,7 +661,7 @@ static WRes ThreadFunc2(CMtDecThread *t)
// ---------- WRITE ----------
RINOK_THREAD(Event_Wait(&t->canWrite));
RINOK_THREAD(Event_Wait(&t->canWrite))
{
BoolInt isErrorMode = False;
@@ -801,14 +774,14 @@ static WRes ThreadFunc2(CMtDecThread *t)
if (!finish)
{
RINOK_THREAD(Event_Set(&nextThread->canWrite));
RINOK_THREAD(Event_Set(&nextThread->canWrite))
}
else
{
if (needContinue)
{
// we restore decoding with new iteration
RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
RINOK_THREAD(Event_Set(&p->threads[0].canWrite))
}
else
{
@@ -817,7 +790,7 @@ static WRes ThreadFunc2(CMtDecThread *t)
return SZ_OK;
p->exitThread = True;
}
RINOK_THREAD(Event_Set(&p->threads[0].canRead));
RINOK_THREAD(Event_Set(&p->threads[0].canRead))
}
}
}
@@ -836,7 +809,7 @@ static WRes ThreadFunc2(CMtDecThread *t)
#endif
static THREAD_FUNC_DECL ThreadFunc1(void *pp)
static THREAD_FUNC_DECL MtDec_ThreadFunc1(void *pp)
{
WRes res;
@@ -845,7 +818,7 @@ static THREAD_FUNC_DECL ThreadFunc1(void *pp)
// fprintf(stdout, "\n%d = %p\n", t->index, &t);
res = ThreadFunc2(t);
res = MtDec_ThreadFunc2(t);
p = t->mtDec;
if (res == 0)
return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;
@@ -862,14 +835,14 @@ static THREAD_FUNC_DECL ThreadFunc1(void *pp)
return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
}
static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp)
static Z7_NO_INLINE THREAD_FUNC_DECL MtDec_ThreadFunc(void *pp)
{
#ifdef USE_ALLOCA
CMtDecThread *t = (CMtDecThread *)pp;
// fprintf(stderr, "\n%d = %p - before", t->index, &t);
t->allocaPtr = alloca(t->index * 128);
#endif
return ThreadFunc1(pp);
return MtDec_ThreadFunc1(pp);
}
@@ -883,7 +856,7 @@ int MtDec_PrepareRead(CMtDec *p)
{
unsigned i;
for (i = 0; i < MTDEC__THREADS_MAX; i++)
for (i = 0; i < MTDEC_THREADS_MAX; i++)
if (i > p->numStartedThreads
|| p->numFilledThreads <=
(i >= p->filledThreadStart ?
@@ -987,7 +960,7 @@ void MtDec_Construct(CMtDec *p)
p->allocatedBufsSize = 0;
for (i = 0; i < MTDEC__THREADS_MAX; i++)
for (i = 0; i < MTDEC_THREADS_MAX; i++)
{
CMtDecThread *t = &p->threads[i];
t->mtDec = p;
@@ -995,7 +968,7 @@ void MtDec_Construct(CMtDec *p)
t->inBuf = NULL;
Event_Construct(&t->canRead);
Event_Construct(&t->canWrite);
Thread_Construct(&t->thread);
Thread_CONSTRUCT(&t->thread)
}
// Event_Construct(&p->finishedEvent);
@@ -1010,7 +983,7 @@ static void MtDec_Free(CMtDec *p)
p->exitThread = True;
for (i = 0; i < MTDEC__THREADS_MAX; i++)
for (i = 0; i < MTDEC_THREADS_MAX; i++)
MtDecThread_Destruct(&p->threads[i]);
// Event_Close(&p->finishedEvent);
@@ -1061,15 +1034,15 @@ SRes MtDec_Code(CMtDec *p)
{
unsigned numThreads = p->numThreadsMax;
if (numThreads > MTDEC__THREADS_MAX)
numThreads = MTDEC__THREADS_MAX;
if (numThreads > MTDEC_THREADS_MAX)
numThreads = MTDEC_THREADS_MAX;
p->numStartedThreads_Limit = numThreads;
p->numStartedThreads = 0;
}
if (p->inBufSize != p->allocatedBufsSize)
{
for (i = 0; i < MTDEC__THREADS_MAX; i++)
for (i = 0; i < MTDEC_THREADS_MAX; i++)
{
CMtDecThread *t = &p->threads[i];
if (t->inBuf)
@@ -1086,7 +1059,7 @@ SRes MtDec_Code(CMtDec *p)
MtProgress_Init(&p->mtProgress, p->progress);
// RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
// RINOK_THREAD(AutoResetEvent_OptCreate_And_Reset(&p->finishedEvent))
p->exitThread = False;
p->exitThreadWRes = 0;
@@ -1098,7 +1071,7 @@ SRes MtDec_Code(CMtDec *p)
wres = MtDecThread_CreateEvents(nextThread);
if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
if (wres == 0) { wres = Event_Set(&nextThread->canRead);
if (wres == 0) { THREAD_FUNC_RET_TYPE res = ThreadFunc(nextThread);
if (wres == 0) { THREAD_FUNC_RET_TYPE res = MtDec_ThreadFunc(nextThread);
wres = (WRes)(UINT_PTR)res;
if (wres != 0)
{
@@ -1137,3 +1110,5 @@ SRes MtDec_Code(CMtDec *p)
}
#endif
#undef PRF