Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members

network.cpp

// network.cpp - written and placed in the public domain by Wei Dai

#include "pch.h"
#include "network.h"
#include "wait.h"

// Set to a nonzero value to compile in the OutputDebugString tracing of
// receive/send sizes in NetworkSource::GeneralPump2 and NetworkSink::TimedFlush.
#define CRYPTOPP_TRACE_NETWORK 0

NAMESPACE_BEGIN(CryptoPP)

// Pumps data via Pump2 and, once SourceExhausted() reports true, delivers one
// message-end to the attached transformation.
//   messageCount - in:  0 means "do nothing" (returns 0 immediately).
//                  out: 1 if the message-end was delivered by this call, else 0.
//   blocking     - forwarded unchanged to Pump2.
// Returns 0 when finished. RETURN_IF_NONZERO is defined elsewhere; by its name it
// returns the wrapped call's value early when that value is nonzero (confirm in
// its definition). On such an early return messageCount has already been zeroed
// and m_messageEndSent is left unset, so the message-end is attempted again on a
// later call.
unsigned int NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
{
    if (messageCount == 0)
        return 0;

    // request as many bytes as Pump2 is willing to deliver
    unsigned long byteCount = ULONG_MAX;
    messageCount = 0;
    RETURN_IF_NONZERO(Pump2(byteCount, blocking));
    if (!m_messageEndSent && SourceExhausted())
    {
        // zero-length Put2 whose third argument is the auto signal propagation
        // value and whose last argument is true (blocking, going by the Put2
        // parameter order used by NetworkSink::Put2 in this file)
        RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
        m_messageEndSent = true;
        messageCount = 1;
    }
    return 0;
}

// Flushes via TimedFlush, with a time limit of INFINITE_TIME when blocking is
// true and 0 otherwise. The return value of TimedFlush is ignored.
// Returns true only if hardFlush was requested and GetCurrentBufferSize() is
// still nonzero after the flush attempt.
bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
{
    TimedFlush(blocking ? INFINITE_TIME : 0);
    return hardFlush && !!GetCurrentBufferSize();
}

// *************************************************************

#ifdef HIGHRES_TIMER_AVAILABLE

// Constructs the source with the given attachment. m_buf is constructed with
// 1024*16 (its type is declared elsewhere; m_buf.size() is used below as the
// receive buffer capacity). No receive is pending, output is not blocked, and
// the buffered-data window [m_dataBegin, m_dataEnd) starts out empty.
NetworkSource::NetworkSource(BufferedTransformation *attachment)
    : NonblockingSource(attachment), m_buf(1024*16)
    , m_waitingForResult(false), m_outputBlocked(false)
    , m_dataBegin(0), m_dataEnd(0)
{
}

// Adds to container whatever this source would have to wait on.
// When output is not blocked: with an empty buffer the receiver's wait objects
// are added; with buffered data pending, container.SetNoWait() is called instead.
// The attached transformation's wait objects are always added.
void NetworkSource::GetWaitObjects(WaitObjectContainer &container)
{
    if (!m_outputBlocked)
    {
        if (m_dataBegin == m_dataEnd)
            AccessReceiver().GetWaitObjects(container);
        else
            container.SetNoWait();
    }
    AttachedTransformation()->GetWaitObjects(container);
}

// Moves data from the receiver into m_buf and from m_buf into the attached
// transformation until a stop condition is hit.
//   byteCount      - in:  maximum number of bytes to hand to the attachment.
//                    out: number of bytes handed over during this call.
//   blockingOutput - blocking flag for PutModifiable2 (also forced on when
//                    maxTime == INFINITE_TIME).
//   maxTime        - time limit measured with a Timer::MILLISECONDS timer;
//                    INFINITE_TIME means no limit, 0 is special-cased (see the
//                    comment near the end of the loop).
//   checkDelimiter - when true, output stops just before the first buffered byte
//   delimiter        equal to delimiter.
// Returns 0 normally. If PutModifiable2 returns nonzero and the attachment's
// Wait() then returns false, that nonzero value is returned and m_outputBlocked
// is set; the next call jumps straight to DoOutput and retries the same
// m_buf+m_dataBegin / m_putSize chunk (both are members, so they survive the
// return).
// State kept across calls: m_dataBegin/m_dataEnd (unsent window in m_buf),
// m_waitingForResult (a Receive was issued whose result has not been collected),
// m_outputBlocked and m_putSize.
unsigned int NetworkSource::GeneralPump2(unsigned long &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
{
    NetworkReceiver &receiver = AccessReceiver();

    unsigned long maxSize = byteCount;
    byteCount = 0;
    bool forever = maxTime == INFINITE_TIME;
    // second constructor argument is `forever`; its exact meaning is defined by
    // Timer (declared elsewhere)
    Timer timer(Timer::MILLISECONDS, forever);
    BufferedTransformation *t = AttachedTransformation();

    // resume an output attempt that a previous call left unfinished
    if (m_outputBlocked)
        goto DoOutput;

    while (true)
    {
        if (m_dataBegin == m_dataEnd)
        {
            // buffer is empty: need more input, unless the receiver hit EOF
            if (receiver.EofReceived())
                break;

            if (m_waitingForResult)
            {
                // a Receive is outstanding; wait for it (within the remaining
                // time) if the receiver says waiting is required
                if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
                    break;

                unsigned int recvResult = receiver.GetReceiveResult();
#if CRYPTOPP_TRACE_NETWORK
                OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
#endif
                m_dataEnd += recvResult;
                m_waitingForResult = false;

                // more can be received without waiting and there is still room
                // in m_buf: keep filling before producing output
                if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
                    goto ReceiveNoWait;
            }
            else
            {
                // everything buffered was consumed: restart at the front of m_buf
                m_dataEnd = m_dataBegin = 0;

                if (receiver.MustWaitToReceive())
                {
                    if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
                        break;

                    // Receive's return value is ignored here; the result is
                    // collected on the next loop iteration via m_waitingForResult
                    receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
                    m_waitingForResult = true;
                }
                else
                {
ReceiveNoWait:
                    // stays true if the loop below ends because Receive returned
                    // false, i.e. a Receive was issued but no result collected
                    m_waitingForResult = true;
                    // call Receive repeatedly as long as data is immediately available,
                    // because some receivers tend to return data in small pieces
#if CRYPTOPP_TRACE_NETWORK
                    OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
#endif
                    while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
                    {
                        unsigned int recvResult = receiver.GetReceiveResult();
#if CRYPTOPP_TRACE_NETWORK
                        OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
#endif
                        m_dataEnd += recvResult;
                        // stop accumulating at EOF or once more than half of
                        // m_buf is filled
                        if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
                        {
                            m_waitingForResult = false;
                            break;
                        }
                    }
                }
            }
        }
        else
        {
            // buffered data available: output at most what the caller still allows
            m_putSize = STDMIN((unsigned long)m_dataEnd-m_dataBegin, maxSize-byteCount);
            if (checkDelimiter)
                // shrink the chunk so it ends just before the first delimiter byte
                m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);

DoOutput:
            unsigned int result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
            if (result)
            {
                // attachment did not accept the chunk: retry if its Wait()
                // succeeds within the remaining time, otherwise report the
                // blockage to the caller
                if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
                    goto DoOutput;
                else
                {
                    m_outputBlocked = true;
                    return result;
                }
            }
            m_outputBlocked = false;

            byteCount += m_putSize;
            m_dataBegin += m_putSize;
            // stop with the delimiter left unconsumed at m_dataBegin
            if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
                break;
            if (byteCount == maxSize)
                break;
            // once time limit is reached, return even if there is more data waiting
            // but make 0 a special case so caller can request a large amount of data to be
            // pumped as long as it is immediately available
            if (maxTime > 0 && timer.ElapsedTime() > maxTime)
                break;
        }
    }

    return 0;
}

// *************************************************************

// Constructs the sink.
//   maxBufferSize  - stored in m_maxBufferSize; Put2 uses it as the buffered-size
//                    target when no message end is signalled.
//   autoFlushBound - stored in m_autoFlushBound; Put2 triggers a non-waiting
//                    flush once the buffer holds more than this.
// m_buffer is constructed with the smaller of 16U*1024U+256 and maxBufferSize
// (the meaning of that argument is defined by m_buffer's type, declared
// elsewhere). The speed timer counts in Timer::MILLISECONDS.
NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
    : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
    , m_needSendResult(false), m_wasBlocked(false)
    , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
    , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
    , m_currentSpeed(0), m_maxObservedSpeed(0)
{
}

// Returns m_currentSpeed, refreshing it first when more than 1000 ms have passed
// on m_speedTimer: the new value is bytes-since-reset * 1000 / elapsed-ms (i.e.
// bytes per second). m_maxObservedSpeed becomes the larger of the new speed and
// 0.98 times its previous value; the byte counter and timer are then restarted.
// Note that ElapsedTime() is read twice (once for the test, once for the
// division).
float NetworkSink::ComputeCurrentSpeed()
{
    if (m_speedTimer.ElapsedTime() > 1000)
    {
        m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
        m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
        m_byteCountSinceLastTimerReset = 0;
        m_speedTimer.StartTimer();
//      OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
    }
    return m_currentSpeed;
}

// Queues inString in m_buffer and flushes towards the sender.
//   inString/length - data to send. If a previous call returned nonzero,
//                     m_skipBytes leading bytes are skipped (they were already
//                     queued by that earlier call); the assert requires
//                     length >= m_skipBytes.
//   messageEnd      - nonzero: the buffer must drain completely (target size 0)
//                     and SendEof() is called on the sender once it has.
//   blocking        - true: flush with INFINITE_TIME until the buffer is at or
//                     below the target size.
// Returns 0 when the buffer ended at or below the target size. Otherwise (only
// expected when !blocking, per the assert) the excess bytes of this call are
// removed from the buffer again, m_skipBytes is advanced by the part that stays
// queued, and max(blockedBytes, 1) is returned.
unsigned int NetworkSink::Put2(const byte *inString, unsigned int length, int messageEnd, bool blocking)
{
    if (m_skipBytes)
    {
        assert(length >= m_skipBytes);
        inString += m_skipBytes;
        length -= m_skipBytes;
    }
    // provisional put: can be partly undone below via UndoLazyPut
    m_buffer.LazyPut(inString, length);

    // opportunistic flush with a zero time limit and zero target size
    if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
        TimedFlush(0, 0);

    unsigned int targetSize = messageEnd ? 0 : m_maxBufferSize;
    if (blocking)
        TimedFlush(INFINITE_TIME, targetSize);

    if (m_buffer.CurrentSize() > targetSize)
    {
        assert(!blocking);
        // give back as much of this call's data as exceeds the target size
        unsigned int blockedBytes = STDMIN(m_buffer.CurrentSize() - targetSize, (unsigned long)length);
        m_buffer.UndoLazyPut(blockedBytes);
        m_buffer.FinalizeLazyPut();
        m_wasBlocked = true;
        // remember how much of the caller's data stays queued so it is skipped
        // on the next call
        m_skipBytes += length - blockedBytes;
        // never return 0 here, since 0 means "not blocked"
        return STDMAX(blockedBytes, 1U);
    }

    m_buffer.FinalizeLazyPut();
    m_wasBlocked = false;
    m_skipBytes = 0;

    if (messageEnd)
        AccessSender().SendEof();
    return 0;
}

// Sends buffered data through the sender until m_buffer holds at most targetSize
// bytes, a Wait() on the sender returns false, the buffer has nothing
// retrievable after a send result is collected, or the time limit is reached.
//   maxTime    - limit measured with a Timer::MILLISECONDS timer; INFINITE_TIME
//                means no limit; 0 means never wait (timeOut is forced to 0).
//   targetSize - buffered size at or below which flushing stops. Callers in this
//                file also invoke TimedFlush with a single argument, so a default
//                is presumably supplied by the declaration (not visible here).
// Returns the number of bytes the sender reported as sent during this call; the
// same amount is added to m_byteCountSinceLastTimerReset before
// ComputeCurrentSpeed() is called.
// m_needSendResult persists across calls: a Send issued by one call may have its
// result collected by a later one.
unsigned int NetworkSink::TimedFlush(unsigned long maxTime, unsigned int targetSize)
{
    NetworkSender &sender = AccessSender();

    bool forever = maxTime == INFINITE_TIME;
    Timer timer(Timer::MILLISECONDS, forever);
    unsigned int totalFlushSize = 0;

    while (true)
    {
        if (m_buffer.CurrentSize() <= targetSize)
            break;

        if (m_needSendResult)
        {
            // a Send is outstanding: wait for its result if the sender requires it
            if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
                break;

            unsigned int sendResult = sender.GetSendResult();
#if CRYPTOPP_TRACE_NETWORK
            OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
#endif
            // drop the bytes that were actually sent from the buffer
            m_buffer.Skip(sendResult);
            totalFlushSize += sendResult;
            m_needSendResult = false;

            if (!m_buffer.AnyRetrievable())
                break;
        }

        unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
        if (sender.MustWaitToSend() && !sender.Wait(timeOut))
            break;

        // Spy returns a pointer to a block of buffered data and sets
        // contiguousSize to its length; the data is only removed from m_buffer
        // later by Skip(), once the send result is known
        unsigned int contiguousSize = 0;
        const byte *block = m_buffer.Spy(contiguousSize);

#if CRYPTOPP_TRACE_NETWORK
        OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
#endif
        sender.Send(block, contiguousSize);
        m_needSendResult = true;

        if (maxTime > 0 && timeOut == 0)
            break;  // once time limit is reached, return even if there is more data waiting
    }

    m_byteCountSinceLastTimerReset += totalFlushSize;
    ComputeCurrentSpeed();

    return totalFlushSize;
}

#endif  // #ifdef HIGHRES_TIMER_AVAILABLE

NAMESPACE_END

Generated on Wed Jul 21 19:15:30 2004 for Crypto++ by doxygen 1.3.7-20040704