00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00039 #include "blocxx/BLOCXX_config.h"
00040
00041 #if !defined(BLOCXX_WIN32)
00042
00043 #include "blocxx/PosixUnnamedPipe.hpp"
00044 #include "blocxx/AutoPtr.hpp"
00045 #include "blocxx/IOException.hpp"
00046 #include "blocxx/Format.hpp"
00047 #include "blocxx/SocketUtils.hpp"
00048 #include "blocxx/Assertion.hpp"
00049 #include "blocxx/DescriptorUtils.hpp"
00050 #include "blocxx/SignalScope.hpp"
00051 #include "blocxx/Logger.hpp"
00052 #include "blocxx/GlobalString.hpp"
00053
00054
00055 #include "blocxx/Thread.hpp"
00056 #ifdef BLOCXX_HAVE_UNISTD_H
00057 #include <unistd.h>
00058 #endif
00059 #include <sys/socket.h>
00060 #include <sys/types.h>
00061
00062 #include <fcntl.h>
00063 #include <errno.h>
00064 #include <cstring>
00065
00066 #if defined(BLOCXX_DARWIN)
00067
00068 #include "blocxx/ThreadOnce.hpp"
00069 #include "blocxx/PosixRegEx.hpp"
00070 #include <sys/utsname.h>
00071 #endif
00072
00073
00074 namespace BLOCXX_NAMESPACE
00075 {
00076
00077 namespace
00078 {
00079 int upclose(int fd)
00080 {
00081 int rc;
00082 do
00083 {
00084 rc = ::close(fd);
00085 } while (rc < 0 && errno == EINTR);
00086 if (rc == -1)
00087 {
00088 int lerrno = errno;
00089 Logger lgr("blocxx");
00090 BLOCXX_LOG_ERROR(lgr, Format("Closing pipe handle %1 failed: %2", fd, lerrno));
00091 }
00092 return rc;
00093 }
00094
00095 ::ssize_t upread(int fd, void * buf, std::size_t count)
00096 {
00097 ::ssize_t rv;
00098 do
00099 {
00100 Thread::testCancel();
00101 rv = ::read(fd, buf, count);
00102 } while (rv < 0 && errno == EINTR);
00103 return rv;
00104 }
00105
00106 ::ssize_t upwrite(int fd, void const * buf, std::size_t count)
00107 {
00108 ::ssize_t rv;
00109
00110 SignalScope ss(SIGPIPE, SIG_IGN);
00111 do
00112 {
00113 Thread::testCancel();
00114 rv = ::write(fd, buf, count);
00115 } while (rv < 0 && errno == EINTR);
00116 return rv;
00117 }
00118
00119 int upaccept(int s, struct sockaddr * addr, socklen_t * addrlen)
00120 {
00121 int rv;
00122 do
00123 {
00124 rv = ::accept(s, addr, addrlen);
00125 } while (rv < 0 && errno == EINTR);
00126 return rv;
00127 }
00128 enum EDirection
00129 {
00130 E_WRITE_PIPE, E_READ_PIPE
00131 };
00132
00133
00134
00135 void setKernelBufferSize(Descriptor sockfd, int bufsz, EDirection edir)
00136 {
00137 if (sockfd == BLOCXX_INVALID_HANDLE)
00138 {
00139 return;
00140 }
00141
00142 int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
00143
00144 int getbufsz;
00145 socklen_t getbufsz_len = sizeof(getbufsz);
00146
00147 #ifdef BLOCXX_NCR
00148 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (char*)&getbufsz, &getbufsz_len);
00149 #else
00150 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
00151 #endif
00152 if (errc == 0 && getbufsz < bufsz)
00153 {
00154 #ifdef BLOCXX_NCR
00155 ::setsockopt(sockfd, SOL_SOCKET, optname, (char*)&bufsz, sizeof(bufsz));
00156 #else
00157 ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz, sizeof(bufsz));
00158 #endif
00159 }
00160 }
00161
00162 void setDefaultKernelBufsz(Descriptor sockfd_read, Descriptor sockfd_write)
00163 {
00164 int const BUFSZ = 64 * 1024;
00165 setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE);
00166 setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE);
00167 }
00168
00169 GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.PosixUnnamedPipe");
00170
00171 #if defined(BLOCXX_DARWIN)
00172
00173
00174 bool needDescriptorPassingWorkaround = true;
00175
00176
00177 OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
00178
00179
00180 void detectDescriptorPassingBug()
00181 {
00182
00183 needDescriptorPassingWorkaround = true;
00184 return;
00185 #if 0
00186
00187 struct utsname unamerv;
00188 if (::uname(&unamerv) == -1)
00189 {
00190 needDescriptorPassingWorkaround = true;
00191 return;
00192 }
00193 String release(unamerv.release);
00194 PosixRegEx re("([^.]*)\\..*");
00195 StringArray releaseCapture = re.capture(release);
00196 if (releaseCapture.size() < 2)
00197 {
00198 needDescriptorPassingWorkaround = true;
00199 return;
00200 }
00201 String majorRelease = releaseCapture[1];
00202 try
00203 {
00204 needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9);
00205 }
00206 catch (StringConversionException& e)
00207 {
00208 needDescriptorPassingWorkaround = true;
00209 return;
00210 }
00211 #endif
00212 }
00213 #endif
00214
00215 }
00216
00217 #ifdef BLOCXX_NETWARE
00218 namespace
00219 {
00220 class AcceptThread
00221 {
00222 public:
00223 AcceptThread(int serversock)
00224 : m_serversock(serversock)
00225 , m_serverconn(-1)
00226 {
00227 }
00228
00229 void acceptConnection();
00230 int getConnectFD() { return m_serverconn; }
00231 private:
00232 int m_serversock;
00233 int m_serverconn;
00234 };
00235
00236 void
00237 AcceptThread::acceptConnection()
00238 {
00239 struct sockaddr_in sin;
00240 size_t val;
00241 int tmp = 1;
00242
00243 tmp = 1;
00244 ::setsockopt(m_serversock, IPPROTO_TCP, 1,
00245 (char*) &tmp, sizeof(int));
00246
00247 val = sizeof(struct sockaddr_in);
00248 if ((m_serverconn = upaccept(m_serversock, (struct sockaddr*)&sin, &val))
00249 == -1)
00250 {
00251 return;
00252 }
00253 tmp = 1;
00254 ::setsockopt(m_serverconn, IPPROTO_TCP, 1,
00255 (char *) &tmp, sizeof(int));
00256 tmp = 0;
00257 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
00258 (char*) &tmp, sizeof(int));
00259 }
00260
00261 void*
00262 runConnClass(void* arg)
00263 {
00264 AcceptThread* acceptThread = (AcceptThread*)(arg);
00265 acceptThread->acceptConnection();
00266 ::pthread_exit(NULL);
00267 return 0;
00268 }
00269
00270 int
00271 _pipe(int *fds)
00272 {
00273 int svrfd, lerrno, connectfd;
00274 size_t val;
00275 struct sockaddr_in sin;
00276
00277 svrfd = socket( AF_INET, SOCK_STREAM, 0 );
00278 sin.sin_family = AF_INET;
00279 sin.sin_addr.s_addr = htonl( 0x7f000001 );
00280 sin.sin_port = 0;
00281 memset(sin.sin_zero, 0, 8 );
00282 if (bind(svrfd, (struct sockaddr * )&sin, sizeof( struct sockaddr_in ) ) == -1)
00283 {
00284 int lerrno = errno;
00285 upclose(svrfd);
00286 fprintf(stderr, "CreateSocket(): Failed to bind on socket" );
00287 return -1;
00288 }
00289 if (listen(svrfd, 1) == -1)
00290 {
00291 int lerrno = errno;
00292 upclose(svrfd);
00293 return -1;
00294 }
00295 val = sizeof(struct sockaddr_in);
00296 if (getsockname(svrfd, ( struct sockaddr * )&sin, &val ) == -1)
00297 {
00298 int lerrno = errno;
00299 fprintf(stderr, "CreateSocket(): Failed to obtain socket name" );
00300 upclose(svrfd);
00301 return -1;
00302 }
00303
00304 AcceptThread* pat = new AcceptThread(svrfd);
00305 pthread_t athread;
00306
00307
00308 pthread_create(&athread, NULL, runConnClass, pat);
00309
00310 int clientfd = socket(AF_INET, SOCK_STREAM, 0);
00311 if (clientfd == -1)
00312 {
00313 delete pat;
00314 return -1;
00315 }
00316
00317
00318 struct sockaddr_in csin;
00319 csin.sin_family = AF_INET;
00320 csin.sin_addr.s_addr = htonl(0x7f000001);
00321 csin.sin_port = sin.sin_port;
00322 if (::connect(clientfd, (struct sockaddr*)&csin, sizeof(csin)) == -1)
00323 {
00324 delete pat;
00325 return -1;
00326 }
00327
00328 #define TCP_NODELAY 1
00329 int tmp = 1;
00330
00331
00332
00333 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (char*)&tmp, sizeof(int));
00334 tmp = 0;
00335 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (char*)&tmp, sizeof(int));
00336
00337 void* threadResult;
00338
00339 ::pthread_join(athread, &threadResult);
00340
00341 upclose(svrfd);
00342 fds[0] = pat->getConnectFD();
00343 fds[1] = clientfd;
00344 delete pat;
00345 return 0;
00346 }
00347 }
00348 #endif // BLOCXX_NETWARE
00349
00351 PosixUnnamedPipe::PosixUnnamedPipe(EOpen doOpen)
00352 {
00353 m_fds[0] = m_fds[1] = BLOCXX_INVALID_HANDLE;
00354 if (doOpen)
00355 {
00356 open();
00357 }
00358 setTimeouts(Timeout::relative(60 * 10));
00359 setBlocking(E_BLOCKING);
00360 }
00361
00363 PosixUnnamedPipe::PosixUnnamedPipe(AutoDescriptor inputfd, AutoDescriptor outputfd)
00364 {
00365 m_fds[0] = inputfd.get();
00366 m_fds[1] = outputfd.get();
00367 setTimeouts(Timeout::relative(60 * 10));
00368 setBlocking(E_BLOCKING);
00369 setDefaultKernelBufsz(m_fds[0], m_fds[1]);
00370 inputfd.release();
00371 outputfd.release();
00372 }
00373
00375 PosixUnnamedPipe::~PosixUnnamedPipe()
00376 {
00377 close();
00378 }
00380 namespace
00381 {
00382 typedef UnnamedPipe::EBlockingMode EBlockingMode;
00383
00384 void set_desc_blocking(
00385 int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
00386 {
00387 BLOCXX_ASSERT(d != BLOCXX_INVALID_HANDLE);
00388 int fdflags = fcntl(d, F_GETFL, 0);
00389 if (fdflags == -1)
00390 {
00391 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
00392 }
00393 if (blocking_mode == UnnamedPipe::E_BLOCKING)
00394 {
00395 fdflags &= ~O_NONBLOCK;
00396 }
00397 else
00398 {
00399 fdflags |= O_NONBLOCK;
00400 }
00401 if (fcntl(d, F_SETFL, fdflags) == -1)
00402 {
00403 BLOCXX_THROW_ERRNO_MSG(IOException, "Failed to set pipe blocking mode");
00404 }
00405 bmflag = blocking_mode;
00406 }
00407 }
00409 void
00410 PosixUnnamedPipe::setBlocking(EBlockingMode blocking_mode)
00411 {
00412 BLOCXX_ASSERT(m_fds[0] != BLOCXX_INVALID_HANDLE || m_fds[1] != BLOCXX_INVALID_HANDLE);
00413
00414 for (size_t i = 0; i < 2; ++i)
00415 {
00416 if (m_fds[i] != -1)
00417 {
00418 set_desc_blocking(m_fds[i], m_blocking[i], blocking_mode);
00419 }
00420 }
00421 }
00423 void
00424 PosixUnnamedPipe::setWriteBlocking(EBlockingMode blocking_mode)
00425 {
00426 set_desc_blocking(m_fds[1], m_blocking[1], blocking_mode);
00427 }
00429 void
00430 PosixUnnamedPipe::setReadBlocking(EBlockingMode blocking_mode)
00431 {
00432 set_desc_blocking(m_fds[0], m_blocking[0], blocking_mode);
00433 }
00435 void
00436 PosixUnnamedPipe::open()
00437 {
00438 if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00439 {
00440 close();
00441 }
00442 #if defined(BLOCXX_NETWARE)
00443 if (_pipe(m_fds) == BLOCXX_INVALID_HANDLE)
00444 {
00445 m_fds[0] = m_fds[1] = BLOCXX_INVALID_HANDLE;
00446 BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
00447 }
00448
00449 #else
00450 if (::socketpair(AF_UNIX, SOCK_STREAM, 0, m_fds) == -1)
00451 {
00452 m_fds[0] = m_fds[1] = -1;
00453 BLOCXX_THROW_ERRNO_MSG(UnnamedPipeException, "PosixUnamedPipe::open(): soketpair()");
00454 }
00455 ::shutdown(m_fds[0], SHUT_WR);
00456 ::shutdown(m_fds[1], SHUT_RD);
00457 setDefaultKernelBufsz(m_fds[0], m_fds[1]);
00458 #endif
00459 }
00461 int
00462 PosixUnnamedPipe::close()
00463 {
00464 int rc = -1;
00465
00466
00467 if (m_fds[0] == m_fds[1])
00468 {
00469 m_fds[1] = BLOCXX_INVALID_HANDLE;
00470 }
00471
00472 if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00473 {
00474
00475 rc = upclose(m_fds[0]);
00476 m_fds[0] = BLOCXX_INVALID_HANDLE;
00477 }
00478
00479 if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00480 {
00481 rc = upclose(m_fds[1]);
00482 m_fds[1] = BLOCXX_INVALID_HANDLE;
00483 }
00484
00485 return rc;
00486 }
00488 bool
00489 PosixUnnamedPipe::isOpen() const
00490 {
00491 return (m_fds[0] != BLOCXX_INVALID_HANDLE) || (m_fds[1] != BLOCXX_INVALID_HANDLE);
00492 }
00493
00495 int
00496 PosixUnnamedPipe::closeInputHandle()
00497 {
00498 int rc = -1;
00499 if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00500 {
00501 if (m_fds[0] != m_fds[1])
00502 {
00503 rc = upclose(m_fds[0]);
00504 }
00505 m_fds[0] = BLOCXX_INVALID_HANDLE;
00506 }
00507 return rc;
00508 }
00510 int
00511 PosixUnnamedPipe::closeOutputHandle()
00512 {
00513 int rc = -1;
00514 if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00515 {
00516 if (m_fds[0] != m_fds[1])
00517 {
00518 rc = upclose(m_fds[1]);
00519 }
00520 m_fds[1] = BLOCXX_INVALID_HANDLE;
00521 }
00522 return rc;
00523 }
00525 int
00526 PosixUnnamedPipe::write(const void* data, int dataLen, ErrorAction errorAsException)
00527 {
00528 int rc = -1;
00529 if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00530 {
00531 if (m_blocking[1] == E_BLOCKING)
00532 {
00533 rc = SocketUtils::waitForIO(m_fds[1], getWriteTimeout(), SocketFlags::E_WAIT_FOR_OUTPUT);
00534 if (rc != 0)
00535 {
00536 if (rc == ETIMEDOUT)
00537 {
00538 errno = ETIMEDOUT;
00539 }
00540 if (errorAsException == E_THROW_ON_ERROR)
00541 {
00542 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00543 }
00544 else
00545 {
00546 return -1;
00547 }
00548 }
00549 }
00550 rc = upwrite(m_fds[1], data, dataLen);
00551 }
00552 if (errorAsException == E_THROW_ON_ERROR && rc == -1)
00553 {
00554 if (m_fds[1] == BLOCXX_INVALID_HANDLE)
00555 {
00556 BLOCXX_THROW(IOException, "pipe write failed because pipe is closed");
00557 }
00558 else
00559 {
00560 BLOCXX_THROW_ERRNO_MSG(IOException, "pipe write failed");
00561 }
00562 }
00563 return rc;
00564 }
00566 int
00567 PosixUnnamedPipe::read(void* buffer, int bufferLen, ErrorAction errorAsException)
00568 {
00569 int rc = -1;
00570 if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00571 {
00572 if (m_blocking[0] == E_BLOCKING)
00573 {
00574 rc = SocketUtils::waitForIO(m_fds[0], getReadTimeout(), SocketFlags::E_WAIT_FOR_INPUT);
00575 if (rc != 0)
00576 {
00577 if (rc == ETIMEDOUT)
00578 {
00579 errno = ETIMEDOUT;
00580 }
00581 if (errorAsException == E_THROW_ON_ERROR)
00582 {
00583 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00584 }
00585 else
00586 {
00587 return -1;
00588 }
00589 }
00590 }
00591 rc = upread(m_fds[0], buffer, bufferLen);
00592 }
00593
00594 if (rc == 0)
00595 {
00596 closeInputHandle();
00597 }
00598
00599 if (errorAsException == E_THROW_ON_ERROR && rc == -1)
00600 {
00601 if (m_fds[0] == BLOCXX_INVALID_HANDLE)
00602 {
00603 BLOCXX_THROW(IOException, "pipe read failed because pipe is closed");
00604 }
00605 else
00606 {
00607 BLOCXX_THROW_ERRNO_MSG(IOException, "pipe read failed");
00608 }
00609 }
00610 return rc;
00611 }
00613 Select_t
00614 PosixUnnamedPipe::getReadSelectObj() const
00615 {
00616 return m_fds[0];
00617 }
00618
00620 Select_t
00621 PosixUnnamedPipe::getWriteSelectObj() const
00622 {
00623 return m_fds[1];
00624 }
00625
00627 void
00628 PosixUnnamedPipe::passDescriptor(Descriptor descriptor, const UnnamedPipeRef& ackPipe, const ProcessRef& targetProcess)
00629 {
00630 int rc = -1;
00631 if (m_fds[1] != BLOCXX_INVALID_HANDLE)
00632 {
00633 if (m_blocking[1] == E_BLOCKING)
00634 {
00635 rc = SocketUtils::waitForIO(m_fds[1], getWriteTimeout(), SocketFlags::E_WAIT_FOR_OUTPUT);
00636
00637 if (rc != 0)
00638 {
00639 if (rc == ETIMEDOUT)
00640 {
00641 errno = ETIMEDOUT;
00642 }
00643 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00644 }
00645 }
00646
00647 rc = blocxx::passDescriptor(m_fds[1], descriptor);
00648 if (rc == -1)
00649 {
00650 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: passDescriptor()");
00651 }
00652
00653 #if defined(BLOCXX_DARWIN)
00654 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
00655 if (rc != -1 && needDescriptorPassingWorkaround)
00656 {
00657
00658 rc = SocketUtils::waitForIO(ackPipe->getInputDescriptor(), Timeout::infinite, SocketFlags::E_WAIT_FOR_INPUT);
00659 if (rc != -1)
00660 {
00661 char ack = 'Z';
00662 rc = ackPipe->read(&ack, sizeof(ack), E_RETURN_ON_ERROR);
00663 if (rc == -1)
00664 {
00665 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: ackPipe->read()");
00666 }
00667 if (ack != 'A')
00668 {
00669 BLOCXX_THROW(IOException, Format("sendDescriptor() failed: ackPipe->read() didn't get 'A', got %1", static_cast<int>(ack)).c_str());
00670 }
00671 }
00672 else
00673 {
00674 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed: waitForIO()");
00675 }
00676 }
00677 #endif
00678 }
00679 if (rc == -1)
00680 {
00681 if (m_fds[1] == BLOCXX_INVALID_HANDLE)
00682 {
00683 BLOCXX_THROW(IOException, "sendDescriptor() failed because pipe is closed");
00684 }
00685 else
00686 {
00687 BLOCXX_THROW_ERRNO_MSG(IOException, "sendDescriptor() failed");
00688 }
00689 }
00690 }
00691
00693 AutoDescriptor
00694 PosixUnnamedPipe::receiveDescriptor(const UnnamedPipeRef& ackPipe)
00695 {
00696 int rc = -1;
00697 AutoDescriptor descriptor;
00698 if (m_fds[0] != BLOCXX_INVALID_HANDLE)
00699 {
00700 if (m_blocking[0] == E_BLOCKING)
00701 {
00702 rc = SocketUtils::waitForIO(m_fds[0], getReadTimeout(), SocketFlags::E_WAIT_FOR_INPUT);
00703
00704 if (rc != 0)
00705 {
00706 if (rc == ETIMEDOUT)
00707 {
00708 errno = ETIMEDOUT;
00709 }
00710 BLOCXX_THROW_ERRNO_MSG(IOException, "SocketUtils::waitForIO failed.");
00711 }
00712 }
00713 descriptor = blocxx::receiveDescriptor(m_fds[0]);
00714
00715 #if defined(BLOCXX_DARWIN)
00716 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
00717 if (needDescriptorPassingWorkaround)
00718 {
00719
00720 rc = SocketUtils::waitForIO(ackPipe->getOutputDescriptor(), Timeout::infinite, SocketFlags::E_WAIT_FOR_OUTPUT);
00721 if (rc != -1)
00722 {
00723 char ack = 'A';
00724 ackPipe->write(&ack, sizeof(ack), E_THROW_ON_ERROR);
00725 }
00726 }
00727 #endif
00728 }
00729 else
00730 {
00731 BLOCXX_THROW(IOException, "receiveDescriptor() failed because pipe is closed");
00732 }
00733 return descriptor;
00734 }
00735
00737 Descriptor
00738 PosixUnnamedPipe::getInputDescriptor() const
00739 {
00740 return m_fds[0];
00741 }
00742
00744 Descriptor
00745 PosixUnnamedPipe::getOutputDescriptor() const
00746 {
00747 return m_fds[1];
00748 }
00749
00751 EBlockingMode
00752 PosixUnnamedPipe::getReadBlocking() const
00753 {
00754 return m_blocking[0];
00755 }
00756
00758 EBlockingMode
00759 PosixUnnamedPipe::getWriteBlocking() const
00760 {
00761 return m_blocking[1];
00762 }
00763
00764 }
00765
00766 #endif