00001
00002
00003 #include "pch.h"
00004 #include "network.h"
00005 #include "wait.h"
00006
00007 #define CRYPTOPP_TRACE_NETWORK 0
00008
00009 NAMESPACE_BEGIN(CryptoPP)
00010
00011 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00012 {
00013 if (messageCount == 0)
00014 return 0;
00015
00016 messageCount = 0;
00017
00018 lword byteCount;
00019 do {
00020 byteCount = LWORD_MAX;
00021 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00022 } while(byteCount == LWORD_MAX);
00023
00024 if (!m_messageEndSent && SourceExhausted())
00025 {
00026 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00027 m_messageEndSent = true;
00028 messageCount = 1;
00029 }
00030 return 0;
00031 }
00032
00033 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00034 {
00035 TimedFlush(blocking ? INFINITE_TIME : 0);
00036 return hardFlush && !!GetCurrentBufferSize();
00037 }
00038
00039
00040
00041 #ifdef HIGHRES_TIMER_AVAILABLE
00042
00043 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00044 : NonblockingSource(attachment), m_buf(1024*16)
00045 , m_waitingForResult(false), m_outputBlocked(false)
00046 , m_dataBegin(0), m_dataEnd(0)
00047 {
00048 }
00049
00050 void NetworkSource::GetWaitObjects(WaitObjectContainer &container)
00051 {
00052 if (!m_outputBlocked)
00053 {
00054 if (m_dataBegin == m_dataEnd)
00055 AccessReceiver().GetWaitObjects(container);
00056 else
00057 container.SetNoWait();
00058 }
00059 AttachedTransformation()->GetWaitObjects(container);
00060 }
00061
00062 size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00063 {
00064 NetworkReceiver &receiver = AccessReceiver();
00065
00066 lword maxSize = byteCount;
00067 byteCount = 0;
00068 bool forever = maxTime == INFINITE_TIME;
00069 Timer timer(Timer::MILLISECONDS, forever);
00070 BufferedTransformation *t = AttachedTransformation();
00071
00072 if (m_outputBlocked)
00073 goto DoOutput;
00074
00075 while (true)
00076 {
00077 if (m_dataBegin == m_dataEnd)
00078 {
00079 if (receiver.EofReceived())
00080 break;
00081
00082 if (m_waitingForResult)
00083 {
00084 if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00085 break;
00086
00087 unsigned int recvResult = receiver.GetReceiveResult();
00088 #if CRYPTOPP_TRACE_NETWORK
00089 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00090 #endif
00091 m_dataEnd += recvResult;
00092 m_waitingForResult = false;
00093
00094 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00095 goto ReceiveNoWait;
00096 }
00097 else
00098 {
00099 m_dataEnd = m_dataBegin = 0;
00100
00101 if (receiver.MustWaitToReceive())
00102 {
00103 if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00104 break;
00105
00106 receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00107 m_waitingForResult = true;
00108 }
00109 else
00110 {
00111 ReceiveNoWait:
00112 m_waitingForResult = true;
00113
00114
00115 #if CRYPTOPP_TRACE_NETWORK
00116 OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00117 #endif
00118 while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00119 {
00120 unsigned int recvResult = receiver.GetReceiveResult();
00121 #if CRYPTOPP_TRACE_NETWORK
00122 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00123 #endif
00124 m_dataEnd += recvResult;
00125 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00126 {
00127 m_waitingForResult = false;
00128 break;
00129 }
00130 }
00131 }
00132 }
00133 }
00134 else
00135 {
00136 m_putSize = (size_t)STDMIN((lword)m_dataEnd-m_dataBegin, maxSize-byteCount);
00137 if (checkDelimiter)
00138 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00139
00140 DoOutput:
00141 size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00142 if (result)
00143 {
00144 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00145 goto DoOutput;
00146 else
00147 {
00148 m_outputBlocked = true;
00149 return result;
00150 }
00151 }
00152 m_outputBlocked = false;
00153
00154 byteCount += m_putSize;
00155 m_dataBegin += m_putSize;
00156 if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00157 break;
00158 if (byteCount == maxSize)
00159 break;
00160
00161
00162
00163 if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00164 break;
00165 }
00166 }
00167
00168 return 0;
00169 }
00170
00171
00172
00173 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00174 : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00175 , m_needSendResult(false), m_wasBlocked(false)
00176 , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0)
00177 , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00178 , m_currentSpeed(0), m_maxObservedSpeed(0)
00179 {
00180 }
00181
00182 float NetworkSink::ComputeCurrentSpeed()
00183 {
00184 if (m_speedTimer.ElapsedTime() > 1000)
00185 {
00186 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00187 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00188 m_byteCountSinceLastTimerReset = 0;
00189 m_speedTimer.StartTimer();
00190
00191 }
00192 return m_currentSpeed;
00193 }
00194
00195 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
00196 {
00197 if (m_skipBytes)
00198 {
00199 assert(length >= m_skipBytes);
00200 inString += m_skipBytes;
00201 length -= m_skipBytes;
00202 }
00203 m_buffer.LazyPut(inString, length);
00204
00205 if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00206 TimedFlush(0, 0);
00207
00208 size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
00209 if (blocking)
00210 TimedFlush(INFINITE_TIME, targetSize);
00211
00212 if (m_buffer.CurrentSize() > targetSize)
00213 {
00214 assert(!blocking);
00215 size_t blockedBytes = (size_t)STDMIN(m_buffer.CurrentSize() - targetSize, (lword)length);
00216 m_buffer.UndoLazyPut(blockedBytes);
00217 m_buffer.FinalizeLazyPut();
00218 m_wasBlocked = true;
00219 m_skipBytes += length - blockedBytes;
00220 return UnsignedMin(1, blockedBytes);
00221 }
00222
00223 m_buffer.FinalizeLazyPut();
00224 m_wasBlocked = false;
00225 m_skipBytes = 0;
00226
00227 if (messageEnd)
00228 AccessSender().SendEof();
00229 return 0;
00230 }
00231
00232 lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize)
00233 {
00234 NetworkSender &sender = AccessSender();
00235
00236 bool forever = maxTime == INFINITE_TIME;
00237 Timer timer(Timer::MILLISECONDS, forever);
00238 unsigned int totalFlushSize = 0;
00239
00240 while (true)
00241 {
00242 if (m_buffer.CurrentSize() <= targetSize)
00243 break;
00244
00245 if (m_needSendResult)
00246 {
00247 if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00248 break;
00249
00250 unsigned int sendResult = sender.GetSendResult();
00251 #if CRYPTOPP_TRACE_NETWORK
00252 OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00253 #endif
00254 m_buffer.Skip(sendResult);
00255 totalFlushSize += sendResult;
00256 m_needSendResult = false;
00257
00258 if (!m_buffer.AnyRetrievable())
00259 break;
00260 }
00261
00262 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00263 if (sender.MustWaitToSend() && !sender.Wait(timeOut))
00264 break;
00265
00266 size_t contiguousSize = 0;
00267 const byte *block = m_buffer.Spy(contiguousSize);
00268
00269 #if CRYPTOPP_TRACE_NETWORK
00270 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00271 #endif
00272 sender.Send(block, contiguousSize);
00273 m_needSendResult = true;
00274
00275 if (maxTime > 0 && timeOut == 0)
00276 break;
00277 }
00278
00279 m_byteCountSinceLastTimerReset += totalFlushSize;
00280 ComputeCurrentSpeed();
00281
00282 return totalFlushSize;
00283 }
00284
00285 #endif // #ifdef HIGHRES_TIMER_AVAILABLE
00286
00287 NAMESPACE_END