00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00038 #include "blocxx/BLOCXX_config.h"
00039
00040 #if !defined(BLOCXX_WIN32)
00041
00042 #include "blocxx/SocketBaseImpl.hpp"
00043 #include "blocxx/SocketUtils.hpp"
00044 #include "blocxx/Format.hpp"
00045 #include "blocxx/Assertion.hpp"
00046 #include "blocxx/IOException.hpp"
00047 #include "blocxx/Mutex.hpp"
00048 #include "blocxx/MutexLock.hpp"
00049 #include "blocxx/GlobalMutex.hpp"
00050 #include "blocxx/PosixUnnamedPipe.hpp"
00051 #include "blocxx/Socket.hpp"
00052 #include "blocxx/Thread.hpp"
00053 #include "blocxx/DateTime.hpp"
00054 #include "blocxx/TimeoutTimer.hpp"
00055 #include "blocxx/AutoDescriptor.hpp"
00056 #include "blocxx/Logger.hpp"
00057 #include "blocxx/Select.hpp"
00058
00059
00060 extern "C"
00061 {
00062 #include <sys/types.h>
00063 #include <sys/time.h>
00064 #include <sys/socket.h>
00065 #include <sys/stat.h>
00066 #include <netdb.h>
00067 #include <arpa/inet.h>
00068 #include <unistd.h>
00069 #include <fcntl.h>
00070 #include <netinet/in.h>
00071 }
00072
00073 #include <fstream>
00074 #include <cerrno>
00075 #include <cstdio>
00076
00077 namespace BLOCXX_NAMESPACE
00078 {
00079
00080 using std::istream;
00081 using std::ostream;
00082 using std::iostream;
00083 using std::ifstream;
00084 using std::ofstream;
00085 using std::fstream;
00086 using std::ios;
00087
00088 namespace
00089 {
00090 static GlobalMutex g_guard = BLOCXX_GLOBAL_MUTEX_INIT();
00091 }
00092
00093 String SocketBaseImpl::m_traceFileOut;
00094 String SocketBaseImpl::m_traceFileIn;
00095
00097 SocketBaseImpl::SocketBaseImpl()
00098 : SelectableIFC()
00099 , IOIFC()
00100 , m_isConnected(false)
00101 , m_sockfd(-1)
00102 , m_localAddress()
00103 , m_peerAddress()
00104 , m_recvTimeoutExprd(false)
00105 , m_streamBuf(this)
00106 , m_in(&m_streamBuf)
00107 , m_out(&m_streamBuf)
00108 , m_inout(&m_streamBuf)
00109 , m_recvTimeout(Timeout::infinite)
00110 , m_sendTimeout(Timeout::infinite)
00111 , m_connectTimeout(Timeout::infinite)
00112 {
00113 m_out.exceptions(std::ios::badbit);
00114 m_inout.exceptions(std::ios::badbit);
00115 }
00117 SocketBaseImpl::SocketBaseImpl(SocketHandle_t fd,
00118 SocketAddress::AddressType addrType)
00119 : SelectableIFC()
00120 , IOIFC()
00121 , m_isConnected(true)
00122 , m_sockfd(fd)
00123 , m_localAddress(SocketAddress::getAnyLocalHost())
00124 , m_peerAddress(SocketAddress::allocEmptyAddress(addrType))
00125 , m_recvTimeoutExprd(false)
00126 , m_streamBuf(this)
00127 , m_in(&m_streamBuf)
00128 , m_out(&m_streamBuf)
00129 , m_inout(&m_streamBuf)
00130 , m_recvTimeout(Timeout::infinite)
00131 , m_sendTimeout(Timeout::infinite)
00132 , m_connectTimeout(Timeout::infinite)
00133 {
00134 m_out.exceptions(std::ios::badbit);
00135 m_inout.exceptions(std::ios::badbit);
00136 if (addrType == SocketAddress::INET)
00137 {
00138 fillInetAddrParms();
00139 }
00140 else if (addrType == SocketAddress::UDS)
00141 {
00142 fillUnixAddrParms();
00143 }
00144 else
00145 {
00146 BLOCXX_ASSERT(0);
00147 }
00148 }
00150 SocketBaseImpl::SocketBaseImpl(const SocketAddress& addr)
00151 : SelectableIFC()
00152 , IOIFC()
00153 , m_isConnected(false)
00154 , m_sockfd(-1)
00155 , m_localAddress(SocketAddress::getAnyLocalHost())
00156 , m_peerAddress(addr)
00157 , m_recvTimeoutExprd(false)
00158 , m_streamBuf(this)
00159 , m_in(&m_streamBuf)
00160 , m_out(&m_streamBuf)
00161 , m_inout(&m_streamBuf)
00162 , m_recvTimeout(Timeout::infinite)
00163 , m_sendTimeout(Timeout::infinite)
00164 , m_connectTimeout(Timeout::infinite)
00165 {
00166 m_out.exceptions(std::ios::badbit);
00167 m_inout.exceptions(std::ios::badbit);
00168 connect(m_peerAddress);
00169 }
00171 SocketBaseImpl::~SocketBaseImpl()
00172 {
00173 try
00174 {
00175 disconnect();
00176 }
00177 catch (...)
00178 {
00179
00180 }
00181 }
00183 Select_t
00184 SocketBaseImpl::getSelectObj() const
00185 {
00186 return m_sockfd;
00187 }
00189 void
00190 SocketBaseImpl::connect(const SocketAddress& addr)
00191 {
00192 if (m_isConnected)
00193 {
00194 disconnect();
00195 }
00196 m_streamBuf.reset();
00197 m_in.clear();
00198 m_out.clear();
00199 m_inout.clear();
00200 BLOCXX_ASSERT(m_sockfd == -1);
00201 BLOCXX_ASSERT(addr.getType() == SocketAddress::INET || addr.getType() == SocketAddress::UDS);
00202
00203 int domain_type = PF_UNIX;
00204 if( addr.getType() == SocketAddress::INET )
00205 {
00206 domain_type = PF_INET;
00207 #ifdef BLOCXX_HAVE_IPV6
00208
00209 if( reinterpret_cast<const sockaddr*>(addr.getInetAddress())->sa_family == AF_INET6)
00210 {
00211 domain_type = PF_INET6;
00212 }
00213 #endif
00214 }
00215
00216 AutoDescriptor sockfd(::socket(domain_type, SOCK_STREAM, 0));
00217 if (sockfd.get() == -1)
00218 {
00219 BLOCXX_THROW_ERRNO_MSG(SocketException,
00220 "Failed to create a socket");
00221 }
00222
00223
00224 if (::fcntl(sockfd.get(), F_SETFD, FD_CLOEXEC) == -1)
00225 {
00226 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() failed to set close-on-exec flag on socket");
00227 }
00228 int n;
00229 int flags = ::fcntl(sockfd.get(), F_GETFL, 0);
00230 ::fcntl(sockfd.get(), F_SETFL, flags | O_NONBLOCK);
00231 #if defined(BLOCXX_NCR)
00232 if ((n = ::connect(sockfd.get(), const_cast<SocketAddress_t *>(addr.getNativeForm()), addr.getNativeFormSize())) < 0)
00233 #else
00234 if ((n = ::connect(sockfd.get(), addr.getNativeForm(), addr.getNativeFormSize())) < 0)
00235 #endif
00236 {
00237 if (errno != EINPROGRESS)
00238 {
00239 BLOCXX_THROW_ERRNO_MSG(SocketException,
00240 Format("Failed to connect to: %1", addr.toString()).c_str());
00241 }
00242 }
00243 if (n == -1)
00244 {
00245
00246
00247 PosixUnnamedPipeRef lUPipe;
00248 int pipefd = -1;
00249 if (Socket::getShutDownMechanism())
00250 {
00251 UnnamedPipeRef foo = Socket::getShutDownMechanism();
00252 lUPipe = foo.cast_to<PosixUnnamedPipe>();
00253 BLOCXX_ASSERT(lUPipe);
00254 pipefd = lUPipe->getInputHandle();
00255 }
00256 Select::SelectObjectArray selra;
00257 Select::SelectObject sockSo(sockfd.get());
00258 sockSo.waitForRead = true;
00259 sockSo.waitForWrite = true;
00260 selra.push_back(sockSo);
00261 if (pipefd != -1)
00262 {
00263 Select::SelectObject pipeSo(pipefd);
00264 pipeSo.waitForRead = true;
00265 selra.push_back(pipeSo);
00266 }
00267
00268 TimeoutTimer timer(m_connectTimeout);
00269 timer.start();
00270 do
00271 {
00272 Thread::testCancel();
00273 n = Select::selectRW(selra, timer.asRelativeTimeout(0.1));
00274 timer.loop();
00275 } while (n == Select::SELECT_TIMEOUT && !timer.expired());
00276
00277 if (timer.expired())
00278 {
00279 BLOCXX_THROW(SocketException, "SocketBaseImpl::connect() select timedout");
00280 }
00281 else if (n == Select::SELECT_ERROR)
00282 {
00283 if (errno == EINTR)
00284 {
00285 Thread::testCancel();
00286 }
00287 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::connect() select failed");
00288 }
00289
00290 if (selra.size() == 2 && selra[1].readAvailable)
00291 {
00292 BLOCXX_THROW(SocketException, "Sockets have been shutdown");
00293 }
00294 else if (selra[0].readAvailable || selra[0].writeAvailable)
00295 {
00296 int error = 0;
00297 socklen_t len = sizeof(error);
00298 #if defined(BLOCXX_NCR)
00299 if (::getsockopt(sockfd.get(), SOL_SOCKET, SO_ERROR, (char*)&error, &len) < 0)
00300 #else
00301 if (::getsockopt(sockfd.get(), SOL_SOCKET, SO_ERROR, &error, &len) < 0)
00302 #endif
00303 {
00304 BLOCXX_THROW_ERRNO_MSG(SocketException,
00305 "SocketBaseImpl::connect() getsockopt() failed");
00306 }
00307 if (error != 0)
00308 {
00309 errno = error;
00310 BLOCXX_THROW_ERRNO_MSG(SocketException,
00311 "SocketBaseImpl::connect() failed");
00312 }
00313 }
00314 else
00315 {
00316 BLOCXX_THROW(SocketException, "SocketBaseImpl::connect(). Logic error, sockfd not in FD set.");
00317 }
00318 }
00319 ::fcntl(sockfd.get(), F_SETFL, flags);
00320 m_sockfd = sockfd.release();
00321 m_isConnected = true;
00322 m_peerAddress = addr;
00323 if (addr.getType() == SocketAddress::INET)
00324 {
00325 fillInetAddrParms();
00326 }
00327 else if (addr.getType() == SocketAddress::UDS)
00328 {
00329 fillUnixAddrParms();
00330 }
00331 else
00332 {
00333 BLOCXX_ASSERT(0);
00334 }
00335
00336 if (!m_traceFileOut.empty())
00337 {
00338 MutexLock ml(g_guard);
00339
00340 String combofilename = m_traceFileOut + "Combo";
00341 ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00342 if (!comboTraceFile)
00343 {
00344 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00345 }
00346 DateTime curDateTime;
00347 curDateTime.setToCurrent();
00348 comboTraceFile << Format("\n--->fd: %1 opened to \"%2\" at %3.%4 <---\n", getfd(),
00349 addr.toString(),
00350 curDateTime.toString("%X"), curDateTime.getMicrosecond());
00351 }
00352 }
00354 void
00355 SocketBaseImpl::disconnect()
00356 {
00357 if (m_in)
00358 {
00359 m_in.clear(ios::eofbit);
00360 }
00361 if (m_out)
00362 {
00363 m_out.clear(ios::eofbit);
00364 }
00365 if (m_inout)
00366 {
00367 m_inout.clear(ios::eofbit);
00368 }
00369 if (m_sockfd != -1 && m_isConnected)
00370 {
00371 if (::close(m_sockfd) == -1)
00372 {
00373 int lerrno = errno;
00374 Logger lgr("blocxx");
00375 BLOCXX_LOG_ERROR(lgr, Format("Closing socket handle %1 failed: %2", m_sockfd, lerrno));
00376 }
00377 m_isConnected = false;
00378
00379 if (!m_traceFileOut.empty())
00380 {
00381 MutexLock ml(g_guard);
00382
00383 String combofilename = m_traceFileOut + "Combo";
00384 ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00385 if (!comboTraceFile)
00386 {
00387 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00388 }
00389 DateTime curDateTime;
00390 curDateTime.setToCurrent();
00391 comboTraceFile << "\n--->fd: " << getfd() << " closed at " << curDateTime.toString("%X") <<
00392 '.' << curDateTime.getMicrosecond() << "<---\n";
00393 }
00394
00395 m_sockfd = -1;
00396 }
00397 }
00399
00400 void
00401 SocketBaseImpl::fillInetAddrParms()
00402 {
00403
00404 socklen_t len;
00405 struct sockaddr *p_addr;
00406 InetSocketAddress_t ss;
00407 memset(&ss, 0, sizeof(ss));
00408 len = sizeof(ss);
00409 p_addr = reinterpret_cast<struct sockaddr*>(&ss);
00410 if (getsockname(m_sockfd, p_addr, &len) != -1)
00411 {
00412 m_localAddress.assignFromNativeForm(&ss, len);
00413 }
00414 memset(&ss, 0, sizeof(ss));
00415 len = sizeof(ss);
00416 if (getpeername(m_sockfd, p_addr, &len) != -1)
00417 {
00418 m_peerAddress.assignFromNativeForm(&ss, len);
00419 }
00420 }
00422 void
00423 SocketBaseImpl::fillUnixAddrParms()
00424 {
00425 socklen_t len;
00426 UnixSocketAddress_t addr;
00427 memset(&addr, 0, sizeof(addr));
00428 len = sizeof(addr);
00429 if (getsockname(m_sockfd, reinterpret_cast<struct sockaddr*>(&addr), &len) == -1)
00430 {
00431 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::fillUnixAddrParms: getsockname");
00432 }
00433 m_localAddress.assignFromNativeForm(&addr, len);
00434 m_peerAddress.assignFromNativeForm(&addr, len);
00435 }
00437 int
00438 SocketBaseImpl::write(const void* dataOut, int dataOutLen, ErrorAction errorAsException)
00439 {
00440 int rc = 0;
00441 bool isError = false;
00442 if (m_isConnected)
00443 {
00444 isError = waitForOutput(m_sendTimeout);
00445 if (isError)
00446 {
00447 rc = -1;
00448 }
00449 else
00450 {
00451 rc = writeAux(dataOut, dataOutLen);
00452 if (!m_traceFileOut.empty() && rc > 0)
00453 {
00454 MutexLock ml(g_guard);
00455 ofstream traceFile(m_traceFileOut.c_str(), std::ios::app);
00456 if (!traceFile)
00457 {
00458 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", m_traceFileOut));
00459 }
00460 if (!traceFile.write(static_cast<const char*>(dataOut), rc))
00461 {
00462 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00463 }
00464
00465 String combofilename = m_traceFileOut + "Combo";
00466 ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00467 if (!comboTraceFile)
00468 {
00469 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00470 }
00471 DateTime curDateTime;
00472 curDateTime.setToCurrent();
00473 comboTraceFile << "\n--->fd: " << getfd() << " Out " << rc << " bytes at " << curDateTime.toString("%X") <<
00474 '.' << curDateTime.getMicrosecond() << "<---\n";
00475 if (!comboTraceFile.write(static_cast<const char*>(dataOut), rc))
00476 {
00477 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00478 }
00479 }
00480 }
00481 }
00482 else
00483 {
00484 rc = -1;
00485 }
00486 if (rc < 0 && errorAsException == E_THROW_ON_ERROR)
00487 {
00488 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::write");
00489 }
00490 return rc;
00491 }
00493 int
00494 SocketBaseImpl::read(void* dataIn, int dataInLen, ErrorAction errorAsException)
00495 {
00496 int rc = 0;
00497 bool isError = false;
00498 if (m_isConnected)
00499 {
00500 isError = waitForInput(m_recvTimeout);
00501 if (isError)
00502 {
00503 rc = -1;
00504 }
00505 else
00506 {
00507 rc = readAux(dataIn, dataInLen);
00508 if (!m_traceFileIn.empty() && rc > 0)
00509 {
00510 MutexLock ml(g_guard);
00511 ofstream traceFile(m_traceFileIn.c_str(), std::ios::app);
00512 if (!traceFile)
00513 {
00514 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening tracefile \"%1\"", m_traceFileIn));
00515 }
00516 if (!traceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00517 {
00518 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00519 }
00520
00521 String combofilename = m_traceFileOut + "Combo";
00522 ofstream comboTraceFile(combofilename.c_str(), std::ios::app);
00523 if (!comboTraceFile)
00524 {
00525 BLOCXX_THROW_ERRNO_MSG(IOException, Format("Failed opening socket dump file \"%1\"", combofilename));
00526 }
00527 DateTime curDateTime;
00528 curDateTime.setToCurrent();
00529 comboTraceFile << "\n--->fd: " << getfd() << " In " << rc << " bytes at " << curDateTime.toString("%X") <<
00530 '.' << curDateTime.getMicrosecond() << "<---\n";
00531 if (!comboTraceFile.write(reinterpret_cast<const char*>(dataIn), rc))
00532 {
00533 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed writing to socket dump");
00534 }
00535 }
00536 }
00537 }
00538 else
00539 {
00540 rc = -1;
00541 }
00542 if (rc < 0)
00543 {
00544 if (errorAsException == E_THROW_ON_ERROR)
00545 {
00546 BLOCXX_THROW_ERRNO_MSG(SocketException, "SocketBaseImpl::read");
00547 }
00548 }
00549 return rc;
00550 }
00552 bool
00553 SocketBaseImpl::waitForInput(const Timeout& timeout)
00554 {
00555 int rval = SocketUtils::waitForIO(m_sockfd, timeout, SocketFlags::E_WAIT_FOR_INPUT);
00556 if (rval == ETIMEDOUT)
00557 {
00558 m_recvTimeoutExprd = true;
00559 }
00560 else
00561 {
00562 m_recvTimeoutExprd = false;
00563 }
00564 return (rval != 0);
00565 }
00567 bool
00568 SocketBaseImpl::waitForOutput(const Timeout& timeout)
00569 {
00570 return SocketUtils::waitForIO(m_sockfd, timeout, SocketFlags::E_WAIT_FOR_OUTPUT) != 0;
00571 }
00573 istream&
00574 SocketBaseImpl::getInputStream()
00575 {
00576 return m_in;
00577 }
00579 ostream&
00580 SocketBaseImpl::getOutputStream()
00581 {
00582 return m_out;
00583 }
00585 iostream&
00586 SocketBaseImpl::getIOStream()
00587 {
00588 return m_inout;
00589 }
00591
00592 void
00593 SocketBaseImpl::setDumpFiles(const String& in, const String& out)
00594 {
00595 m_traceFileOut = out;
00596 m_traceFileIn = in;
00597 }
00598
00599 }
00600
00601 #endif // #if !defined(BLOCXX_WIN32)
00602