This commit is contained in:
Igor Pavlov
2011-04-11 00:00:00 +00:00
committed by Kornel Lesiński
parent de4f8c22fe
commit 35596517f2
322 changed files with 9989 additions and 7759 deletions

View File

@@ -2,149 +2,125 @@
#include "StdAfx.h"
#include "StreamBinder.h"
#include "../../Common/Defs.h"
#include "../../Common/MyCom.h"
using namespace NWindows;
using namespace NSynchronization;
#include "StreamBinder.h"
class CSequentialInStreamForBinder:
class CBinderInStream:
public ISequentialInStream,
public CMyUnknownImp
{
CStreamBinder *_binder;
public:
MY_UNKNOWN_IMP
STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize);
private:
CStreamBinder *m_StreamBinder;
public:
~CSequentialInStreamForBinder() { m_StreamBinder->CloseRead(); }
void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; }
~CBinderInStream() { _binder->CloseRead(); }
CBinderInStream(CStreamBinder *binder): _binder(binder) {}
};
STDMETHODIMP CSequentialInStreamForBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
{ return m_StreamBinder->Read(data, size, processedSize); }
STDMETHODIMP CBinderInStream::Read(void *data, UInt32 size, UInt32 *processedSize)
{ return _binder->Read(data, size, processedSize); }
class CSequentialOutStreamForBinder:
class CBinderOutStream:
public ISequentialOutStream,
public CMyUnknownImp
{
CStreamBinder *_binder;
public:
MY_UNKNOWN_IMP
STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize);
private:
CStreamBinder *m_StreamBinder;
public:
~CSequentialOutStreamForBinder() { m_StreamBinder->CloseWrite(); }
void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; }
~CBinderOutStream() { _binder->CloseWrite(); }
CBinderOutStream(CStreamBinder *binder): _binder(binder) {}
};
STDMETHODIMP CSequentialOutStreamForBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
{ return m_StreamBinder->Write(data, size, processedSize); }
STDMETHODIMP CBinderOutStream::Write(const void *data, UInt32 size, UInt32 *processedSize)
{ return _binder->Write(data, size, processedSize); }
//////////////////////////
// CStreamBinder
// (_thereAreBytesToReadEvent && _bufferSize == 0) means that stream is finished.
HRes CStreamBinder::CreateEvents()
WRes CStreamBinder::CreateEvents()
{
RINOK(_allBytesAreWritenEvent.Create(true));
RINOK(_thereAreBytesToReadEvent.Create());
return _readStreamIsClosedEvent.Create();
RINOK(_canWrite_Event.Create(true));
RINOK(_canRead_Event.Create());
return _readingWasClosed_Event.Create();
}
void CStreamBinder::ReInit()
{
_thereAreBytesToReadEvent.Reset();
_readStreamIsClosedEvent.Reset();
_waitWrite = true;
_canRead_Event.Reset();
_readingWasClosed_Event.Reset();
ProcessedSize = 0;
}
void CStreamBinder::CreateStreams(ISequentialInStream **inStream,
ISequentialOutStream **outStream)
void CStreamBinder::CreateStreams(ISequentialInStream **inStream, ISequentialOutStream **outStream)
{
CSequentialInStreamForBinder *inStreamSpec = new
CSequentialInStreamForBinder;
_waitWrite = true;
_bufSize = 0;
_buf = NULL;
ProcessedSize = 0;
CBinderInStream *inStreamSpec = new CBinderInStream(this);
CMyComPtr<ISequentialInStream> inStreamLoc(inStreamSpec);
inStreamSpec->SetBinder(this);
*inStream = inStreamLoc.Detach();
CSequentialOutStreamForBinder *outStreamSpec = new
CSequentialOutStreamForBinder;
CBinderOutStream *outStreamSpec = new CBinderOutStream(this);
CMyComPtr<ISequentialOutStream> outStreamLoc(outStreamSpec);
outStreamSpec->SetBinder(this);
*outStream = outStreamLoc.Detach();
_buffer = NULL;
_bufferSize= 0;
ProcessedSize = 0;
}
// (_canRead_Event && _bufSize == 0) means that stream is finished.
HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
{
UInt32 sizeToRead = size;
if (size > 0)
if (processedSize)
*processedSize = 0;
if (size != 0)
{
RINOK(_thereAreBytesToReadEvent.Lock());
sizeToRead = MyMin(_bufferSize, size);
if (_bufferSize > 0)
if (_waitWrite)
{
memcpy(data, _buffer, sizeToRead);
_buffer = ((const Byte *)_buffer) + sizeToRead;
_bufferSize -= sizeToRead;
if (_bufferSize == 0)
RINOK(_canRead_Event.Lock());
_waitWrite = false;
}
if (size > _bufSize)
size = _bufSize;
if (size != 0)
{
memcpy(data, _buf, size);
_buf = ((const Byte *)_buf) + size;
ProcessedSize += size;
if (processedSize)
*processedSize = size;
_bufSize -= size;
if (_bufSize == 0)
{
_thereAreBytesToReadEvent.Reset();
_allBytesAreWritenEvent.Set();
_waitWrite = true;
_canRead_Event.Reset();
_canWrite_Event.Set();
}
}
}
if (processedSize != NULL)
*processedSize = sizeToRead;
ProcessedSize += sizeToRead;
return S_OK;
}
void CStreamBinder::CloseRead()
{
_readStreamIsClosedEvent.Set();
}
HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
{
if (size > 0)
if (processedSize)
*processedSize = 0;
if (size != 0)
{
_buffer = data;
_bufferSize = size;
_allBytesAreWritenEvent.Reset();
_thereAreBytesToReadEvent.Set();
_buf = data;
_bufSize = size;
_canWrite_Event.Reset();
_canRead_Event.Set();
HANDLE events[2];
events[0] = _allBytesAreWritenEvent;
events[1] = _readStreamIsClosedEvent;
HANDLE events[2] = { _canWrite_Event, _readingWasClosed_Event };
DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
if (waitResult != WAIT_OBJECT_0 + 0)
{
// ReadingWasClosed = true;
return S_FALSE;
}
// if(!_allBytesAreWritenEvent.Lock())
// return E_FAIL;
if (processedSize)
*processedSize = size;
}
if (processedSize != NULL)
*processedSize = size;
return S_OK;
}
void CStreamBinder::CloseWrite()
{
// _bufferSize must be = 0
_thereAreBytesToReadEvent.Set();
}