Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members

network.cpp

00001 // network.cpp - written and placed in the public domain by Wei Dai
00002 
00003 #include "pch.h"
00004 #include "network.h"
00005 #include "wait.h"
00006 
00007 #define CRYPTOPP_TRACE_NETWORK 0
00008 
00009 NAMESPACE_BEGIN(CryptoPP)
00010 
00011 size_t NonblockingSource::PumpMessages2(unsigned int &messageCount, bool blocking)
00012 {
00013         if (messageCount == 0)
00014                 return 0;
00015 
00016         messageCount = 0;
00017 
00018         lword byteCount;
00019         do {
00020                 byteCount = LWORD_MAX;
00021                 RETURN_IF_NONZERO(Pump2(byteCount, blocking));
00022         } while(byteCount == LWORD_MAX);
00023 
00024         if (!m_messageEndSent && SourceExhausted())
00025         {
00026                 RETURN_IF_NONZERO(AttachedTransformation()->Put2(NULL, 0, GetAutoSignalPropagation(), true));
00027                 m_messageEndSent = true;
00028                 messageCount = 1;
00029         }
00030         return 0;
00031 }
00032 
00033 bool NonblockingSink::IsolatedFlush(bool hardFlush, bool blocking)
00034 {
00035         TimedFlush(blocking ? INFINITE_TIME : 0);
00036         return hardFlush && !!GetCurrentBufferSize();
00037 }
00038 
00039 // *************************************************************
00040 
00041 #ifdef HIGHRES_TIMER_AVAILABLE
00042 
00043 NetworkSource::NetworkSource(BufferedTransformation *attachment)
00044         : NonblockingSource(attachment), m_buf(1024*16)
00045         , m_waitingForResult(false), m_outputBlocked(false)
00046         , m_dataBegin(0), m_dataEnd(0)
00047 {
00048 }
00049 
00050 void NetworkSource::GetWaitObjects(WaitObjectContainer &container)
00051 {
00052         if (!m_outputBlocked)
00053         {
00054                 if (m_dataBegin == m_dataEnd)
00055                         AccessReceiver().GetWaitObjects(container); 
00056                 else
00057                         container.SetNoWait();
00058         }
00059         AttachedTransformation()->GetWaitObjects(container);
00060 }
00061 
00062 size_t NetworkSource::GeneralPump2(lword &byteCount, bool blockingOutput, unsigned long maxTime, bool checkDelimiter, byte delimiter)
00063 {
00064         NetworkReceiver &receiver = AccessReceiver();
00065 
00066         lword maxSize = byteCount;
00067         byteCount = 0;
00068         bool forever = maxTime == INFINITE_TIME;
00069         Timer timer(Timer::MILLISECONDS, forever);
00070         BufferedTransformation *t = AttachedTransformation();
00071 
00072         if (m_outputBlocked)
00073                 goto DoOutput;
00074 
00075         while (true)
00076         {
00077                 if (m_dataBegin == m_dataEnd)
00078                 {
00079                         if (receiver.EofReceived())
00080                                 break;
00081 
00082                         if (m_waitingForResult)
00083                         {
00084                                 if (receiver.MustWaitForResult() && !receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00085                                         break;
00086 
00087                                 unsigned int recvResult = receiver.GetReceiveResult();
00088 #if CRYPTOPP_TRACE_NETWORK
00089                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00090 #endif
00091                                 m_dataEnd += recvResult;
00092                                 m_waitingForResult = false;
00093 
00094                                 if (!receiver.MustWaitToReceive() && !receiver.EofReceived() && m_dataEnd != m_buf.size())
00095                                         goto ReceiveNoWait;
00096                         }
00097                         else
00098                         {
00099                                 m_dataEnd = m_dataBegin = 0;
00100 
00101                                 if (receiver.MustWaitToReceive())
00102                                 {
00103                                         if (!receiver.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00104                                                 break;
00105 
00106                                         receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd);
00107                                         m_waitingForResult = true;
00108                                 }
00109                                 else
00110                                 {
00111 ReceiveNoWait:
00112                                         m_waitingForResult = true;
00113                                         // call Receive repeatedly as long as data is immediately available,
00114                                         // because some receivers tend to return data in small pieces
00115 #if CRYPTOPP_TRACE_NETWORK
00116                                         OutputDebugString((IntToString((unsigned int)this) + ": Receiving " + IntToString(m_buf.size()-m_dataEnd) + " bytes\n").c_str());
00117 #endif
00118                                         while (receiver.Receive(m_buf+m_dataEnd, m_buf.size()-m_dataEnd))
00119                                         {
00120                                                 unsigned int recvResult = receiver.GetReceiveResult();
00121 #if CRYPTOPP_TRACE_NETWORK
00122                                                 OutputDebugString((IntToString((unsigned int)this) + ": Received " + IntToString(recvResult) + " bytes\n").c_str());
00123 #endif
00124                                                 m_dataEnd += recvResult;
00125                                                 if (receiver.EofReceived() || m_dataEnd > m_buf.size() /2)
00126                                                 {
00127                                                         m_waitingForResult = false;
00128                                                         break;
00129                                                 }
00130                                         }
00131                                 }
00132                         }
00133                 }
00134                 else
00135                 {
00136                         m_putSize = (size_t)STDMIN((lword)m_dataEnd-m_dataBegin, maxSize-byteCount);
00137                         if (checkDelimiter)
00138                                 m_putSize = std::find(m_buf+m_dataBegin, m_buf+m_dataBegin+m_putSize, delimiter) - (m_buf+m_dataBegin);
00139 
00140 DoOutput:
00141                         size_t result = t->PutModifiable2(m_buf+m_dataBegin, m_putSize, 0, forever || blockingOutput);
00142                         if (result)
00143                         {
00144                                 if (t->Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00145                                         goto DoOutput;
00146                                 else
00147                                 {
00148                                         m_outputBlocked = true;
00149                                         return result;
00150                                 }
00151                         }
00152                         m_outputBlocked = false;
00153 
00154                         byteCount += m_putSize;
00155                         m_dataBegin += m_putSize;
00156                         if (checkDelimiter && m_dataBegin < m_dataEnd && m_buf[m_dataBegin] == delimiter)
00157                                 break;
00158                         if (byteCount == maxSize)
00159                                 break;
00160                         // once time limit is reached, return even if there is more data waiting
00161                         // but make 0 a special case so caller can request a large amount of data to be
00162                         // pumped as long as it is immediately available
00163                         if (maxTime > 0 && timer.ElapsedTime() > maxTime)
00164                                 break;
00165                 }
00166         }
00167 
00168         return 0;
00169 }
00170 
00171 // *************************************************************
00172 
00173 NetworkSink::NetworkSink(unsigned int maxBufferSize, unsigned int autoFlushBound)
00174         : m_maxBufferSize(maxBufferSize), m_autoFlushBound(autoFlushBound)
00175         , m_needSendResult(false), m_wasBlocked(false)
00176         , m_buffer(STDMIN(16U*1024U+256, maxBufferSize)), m_skipBytes(0) 
00177         , m_speedTimer(Timer::MILLISECONDS), m_byteCountSinceLastTimerReset(0)
00178         , m_currentSpeed(0), m_maxObservedSpeed(0)
00179 {
00180 }
00181 
00182 float NetworkSink::ComputeCurrentSpeed()
00183 {
00184         if (m_speedTimer.ElapsedTime() > 1000)
00185         {
00186                 m_currentSpeed = m_byteCountSinceLastTimerReset * 1000 / m_speedTimer.ElapsedTime();
00187                 m_maxObservedSpeed = STDMAX(m_currentSpeed, m_maxObservedSpeed * 0.98f);
00188                 m_byteCountSinceLastTimerReset = 0;
00189                 m_speedTimer.StartTimer();
00190 //              OutputDebugString(("max speed: " + IntToString((int)m_maxObservedSpeed) + " current speed: " + IntToString((int)m_currentSpeed) + "\n").c_str());
00191         }
00192         return m_currentSpeed;
00193 }
00194 
00195 size_t NetworkSink::Put2(const byte *inString, size_t length, int messageEnd, bool blocking)
00196 {
00197         if (m_skipBytes)
00198         {
00199                 assert(length >= m_skipBytes);
00200                 inString += m_skipBytes;
00201                 length -= m_skipBytes;
00202         }
00203         m_buffer.LazyPut(inString, length);
00204 
00205         if (!blocking || m_buffer.CurrentSize() > m_autoFlushBound)
00206                 TimedFlush(0, 0);
00207 
00208         size_t targetSize = messageEnd ? 0 : m_maxBufferSize;
00209         if (blocking)
00210                 TimedFlush(INFINITE_TIME, targetSize);
00211 
00212         if (m_buffer.CurrentSize() > targetSize)
00213         {
00214                 assert(!blocking);
00215                 size_t blockedBytes = (size_t)STDMIN(m_buffer.CurrentSize() - targetSize, (lword)length);
00216                 m_buffer.UndoLazyPut(blockedBytes);
00217                 m_buffer.FinalizeLazyPut();
00218                 m_wasBlocked = true;
00219                 m_skipBytes += length - blockedBytes;
00220                 return UnsignedMin(1, blockedBytes);
00221         }
00222 
00223         m_buffer.FinalizeLazyPut();
00224         m_wasBlocked = false;
00225         m_skipBytes = 0;
00226 
00227         if (messageEnd)
00228                 AccessSender().SendEof();
00229         return 0;
00230 }
00231 
00232 lword NetworkSink::TimedFlush(unsigned long maxTime, size_t targetSize)
00233 {
00234         NetworkSender &sender = AccessSender();
00235 
00236         bool forever = maxTime == INFINITE_TIME;
00237         Timer timer(Timer::MILLISECONDS, forever);
00238         unsigned int totalFlushSize = 0;
00239 
00240         while (true)
00241         {
00242                 if (m_buffer.CurrentSize() <= targetSize)
00243                         break;
00244                 
00245                 if (m_needSendResult)
00246                 {
00247                         if (sender.MustWaitForResult() && !sender.Wait(SaturatingSubtract(maxTime, timer.ElapsedTime())))
00248                                 break;
00249 
00250                         unsigned int sendResult = sender.GetSendResult();
00251 #if CRYPTOPP_TRACE_NETWORK
00252                         OutputDebugString((IntToString((unsigned int)this) + ": Sent " + IntToString(sendResult) + " bytes\n").c_str());
00253 #endif
00254                         m_buffer.Skip(sendResult);
00255                         totalFlushSize += sendResult;
00256                         m_needSendResult = false;
00257 
00258                         if (!m_buffer.AnyRetrievable())
00259                                 break;
00260                 }
00261 
00262                 unsigned long timeOut = maxTime ? SaturatingSubtract(maxTime, timer.ElapsedTime()) : 0;
00263                 if (sender.MustWaitToSend() && !sender.Wait(timeOut))
00264                         break;
00265 
00266                 size_t contiguousSize = 0;
00267                 const byte *block = m_buffer.Spy(contiguousSize);
00268 
00269 #if CRYPTOPP_TRACE_NETWORK
00270                 OutputDebugString((IntToString((unsigned int)this) + ": Sending " + IntToString(contiguousSize) + " bytes\n").c_str());
00271 #endif
00272                 sender.Send(block, contiguousSize);
00273                 m_needSendResult = true;
00274 
00275                 if (maxTime > 0 && timeOut == 0)
00276                         break;  // once time limit is reached, return even if there is more data waiting
00277         }
00278 
00279         m_byteCountSinceLastTimerReset += totalFlushSize;
00280         ComputeCurrentSpeed();
00281         
00282         return totalFlushSize;
00283 }
00284 
00285 #endif  // #ifdef HIGHRES_TIMER_AVAILABLE
00286 
00287 NAMESPACE_END

Generated on Tue Aug 16 08:38:42 2005 for Crypto++ by  doxygen 1.3.9.1