00001
00002
00003
#include "pch.h"
00004
#include "network.h"
00005
#include "wait.h"
00006
00007
#define CRYPTOPP_TRACE_NETWORK 0
00008
00009 NAMESPACE_BEGIN(CryptoPP)
00010
00011 unsigned
int NonblockingSource::PumpMessages2(
unsigned int &messageCount,
bool blocking)
00012 {
00013
if (messageCount == 0)
00014
return 0;
00015
00016
unsigned long byteCount = ULONG_MAX;
00017 messageCount = 0;
00018 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00019
if (!m_messageEndSent && SourceExhausted())
00020 {
00021 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(),
true));
00022 m_messageEndSent =
true;
00023 messageCount = 1;
00024 }
00025
return 0;
00026 }
00027
00028
bool NonblockingSink::IsolatedFlush(
bool hardFlush,
bool blocking)
00029 {
00030
TimedFlush(blocking ? INFINITE_TIME : 0);
00031
return hardFlush && !!GetCurrentBufferSize();
00032 }
00033
00034
00035
00036
#ifdef HIGHRES_TIMER_AVAILABLE
00037
00038 NetworkSource::NetworkSource(
BufferedTransformation *attachment)
00039 :
NonblockingSource(attachment), m_buf(1024*16)
00040 , m_waitingForResult(false), m_outputBlocked(false)
00041 , m_dataBegin(0), m_dataEnd(0)
00042 {
00043 }
00044
00045 void NetworkSource::GetWaitObjects(
WaitObjectContainer &container)
00046 {
00047
if (!m_outputBlocked)
00048 {
00049
if (m_dataBegin == m_dataEnd)
00050 AccessReceiver().
GetWaitObjects(container);
00051
else
00052 container.
SetNoWait();
00053 }
00054
AttachedTransformation()->
GetWaitObjects(container);
00055 }
00056
00057 unsigned int NetworkSource::GeneralPump2(
unsigned long &byteCount,
bool blockingOutput,
unsigned long maxTime,
bool checkDelimiter, byte delimiter)
00058 {
00059
NetworkReceiver &receiver = AccessReceiver();
00060
00061
unsigned long maxSize = byteCount;
00062 byteCount = 0;
00063
bool forever = maxTime ==
INFINITE_TIME;
00064
Timer timer(Timer::MILLISECONDS, forever);
00065
BufferedTransformation *t =
AttachedTransformation();
00066
00067
if (m_outputBlocked)
00068
goto DoOutput;
00069
00070
while (
true)
00071 {
00072
if (m_dataBegin == m_dataEnd)
00073 {
00074
if (receiver.
EofReceived())
00075
break;
00076
00077
if (m_waitingForResult)
00078 {
00079
if (receiver.
MustWaitForResult() && !receiver.
Wait(SaturatingSubtract(maxTime, timer.
ElapsedTime())))
00080
break;
00081
00082
unsigned int recvResult = receiver.
GetReceiveResult();
00083
#if CRYPTOPP_TRACE_NETWORK
00084
OutputDebugString((IntToString((
unsigned int)
this) +
": Received " + IntToString(recvResult) +
" bytes\n").c_str());
00085
#endif
00086
m_dataEnd += recvResult;
00087 m_waitingForResult =
false;
00088
00089
if (!receiver.
MustWaitToReceive() && !receiver.
EofReceived() && m_dataEnd != m_buf.
size())
00090
goto ReceiveNoWait;
00091 }
00092
else
00093 {
00094 m_dataEnd = m_dataBegin = 0;
00095
00096
if (receiver.
MustWaitToReceive())
00097 {
00098
if (!receiver.
Wait(SaturatingSubtract(maxTime, timer.
ElapsedTime())))
00099
break;
00100
00101 receiver.
Receive(m_buf+m_dataEnd, m_buf.
size()-m_dataEnd);
00102 m_waitingForResult =
true;
00103 }
00104
else
00105 {
00106 ReceiveNoWait:
00107 m_waitingForResult =
true;
00108
00109
00110
#if CRYPTOPP_TRACE_NETWORK
00111
OutputDebugString((IntToString((
unsigned int)
this) +
": Receiving " + IntToString(m_buf.
size()-m_dataEnd) +
" bytes\n").c_str());
00112
#endif
00113
while (receiver.
Receive(m_buf+m_dataEnd, m_buf.
size()-m_dataEnd))
00114 {
00115
unsigned int recvResult = receiver.
GetReceiveResult();
00116
#if CRYPTOPP_TRACE_NETWORK
00117
OutputDebugString((IntToString((
unsigned int)
this) +
": Received " + IntToString(recvResult) +
" bytes\n").c_str());
00118
#endif
00119
m_dataEnd += recvResult;
00120
if (receiver.
EofReceived() || m_dataEnd > m_buf.
size() /2)
00121 {
00122 m_waitingForResult =
false;
00123
break;
00124 }
00125 }
00126 }
00127 }
00128 }
00129
else
00130 {
00131 m_putSize = STDMIN((
unsigned long)m_dataEnd-m_dataBegin, maxSize-byteCount);
00132
if (checkDelimiter)
00133 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00134
00135 DoOutput:
00136
unsigned int result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00137
if (result)
00138 {
00139
if (t->Wait(SaturatingSubtract(maxTime, timer.
ElapsedTime())))
00140
goto DoOutput;
00141
else
00142 {
00143 m_outputBlocked =
true;
00144
return result;
00145 }
00146 }
00147 m_outputBlocked =
false;
00148
00149 byteCount += m_putSize;
00150 m_dataBegin += m_putSize;
00151
if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00152
break;
00153
if (byteCount == maxSize)
00154
break;
00155
00156
00157
00158
if (maxTime > 0 && timer.
ElapsedTime() > maxTime)
00159
break;
00160 }
00161 }
00162
00163
return 0;
00164 }
00165
00166
00167
00168 NetworkSink::NetworkSink(
unsigned int maxBufferSize,
unsigned int autoFlushBound)
00169 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00170 , m_needSendResult(false), m_wasBlocked(false)
00171 , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
00172 , m_speedTimer(
Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00173 , m_currentSpeed(0), m_maxObservedSpeed(0)
00174 {
00175 }
00176
00177 float NetworkSink::ComputeCurrentSpeed()
00178 {
00179
if (m_speedTimer.
ElapsedTime() > 1000)
00180 {
00181 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.
ElapsedTime();
00182 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00183 m_byteCountSinceLastTimerReset = 0;
00184 m_speedTimer.
StartTimer();
00185
00186 }
00187
return m_currentSpeed;
00188 }
00189
00190 unsigned int NetworkSink::Put2(
const byte *inString,
unsigned int length,
int messageEnd,
bool blocking)
00191 {
00192
if (m_skipBytes)
00193 {
00194 assert(length >= m_skipBytes);
00195 inString += m_skipBytes;
00196 length -= m_skipBytes;
00197 }
00198 m_buffer.
LazyPut(inString, length);
00199
00200
if (!blocking || m_buffer.
CurrentSize() > m_autoFlushBound)
00201
TimedFlush(0, 0);
00202
00203
unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize;
00204
if (blocking)
00205
TimedFlush(
INFINITE_TIME, targetSize);
00206
00207
if (m_buffer.
CurrentSize() > targetSize)
00208 {
00209 assert(!blocking);
00210
unsigned int blockedBytes = STDMIN(m_buffer.
CurrentSize() - targetSize, (
unsigned long)length);
00211 m_buffer.
UndoLazyPut(blockedBytes);
00212 m_buffer.
FinalizeLazyPut();
00213 m_wasBlocked =
true;
00214 m_skipBytes += length - blockedBytes;
00215
return STDMAX(blockedBytes, 1U);
00216 }
00217
00218 m_buffer.
FinalizeLazyPut();
00219 m_wasBlocked =
false;
00220 m_skipBytes = 0;
00221
00222
if (messageEnd)
00223 AccessSender().
SendEof();
00224
return 0;
00225 }
00226
00227 unsigned int NetworkSink::TimedFlush(
unsigned long maxTime,
unsigned int targetSize)
00228 {
00229
NetworkSender &sender = AccessSender();
00230
00231
bool forever = maxTime ==
INFINITE_TIME;
00232
Timer timer(Timer::MILLISECONDS, forever);
00233
unsigned int totalFlushSize = 0;
00234
00235
while (
true)
00236 {
00237
if (m_buffer.
CurrentSize() <= targetSize)
00238
break;
00239
00240
if (m_needSendResult)
00241 {
00242
if (sender.
MustWaitForResult() && !sender.
Wait(SaturatingSubtract(maxTime, timer.
ElapsedTime())))
00243
break;
00244
00245
unsigned int sendResult = sender.
GetSendResult();
00246
#if CRYPTOPP_TRACE_NETWORK
00247
OutputDebugString((IntToString((
unsigned int)
this) +
": Sent " + IntToString(sendResult) +
" bytes\n").c_str());
00248
#endif
00249
m_buffer.
Skip(sendResult);
00250 totalFlushSize += sendResult;
00251 m_needSendResult =
false;
00252
00253
if (!m_buffer.
AnyRetrievable())
00254
break;
00255 }
00256
00257
unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.
ElapsedTime()) : 0;
00258
if (sender.
MustWaitToSend() && !sender.
Wait(timeOut))
00259
break;
00260
00261
unsigned int contiguousSize = 0;
00262
const byte *block = m_buffer.
Spy(contiguousSize);
00263
00264
#if CRYPTOPP_TRACE_NETWORK
00265
OutputDebugString((IntToString((
unsigned int)
this) +
": Sending " + IntToString(contiguousSize) +
" bytes\n").c_str());
00266
#endif
00267
sender.
Send(block, contiguousSize);
00268 m_needSendResult =
true;
00269
00270
if (maxTime > 0 && timeOut == 0)
00271
break;
00272 }
00273
00274 m_byteCountSinceLastTimerReset += totalFlushSize;
00275
ComputeCurrentSpeed();
00276
00277
return totalFlushSize;
00278 }
00279
00280
#endif // #ifdef HIGHRES_TIMER_AVAILABLE
00281
00282 NAMESPACE_END