00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00039 #include "blocxx/BLOCXX_config.h"
00040 #include "blocxx/Select.hpp"
00041 #include "blocxx/AutoPtr.hpp"
00042 #include "blocxx/Assertion.hpp"
00043 #include "blocxx/Thread.hpp"
00044 #include "blocxx/TimeoutTimer.hpp"
00045 #include "blocxx/AutoDescriptor.hpp"
00046
00047 #if defined(BLOCXX_WIN32)
00048 #include <cassert>
00049 #endif
00050
00051 extern "C"
00052 {
00053
00054 #ifndef BLOCXX_WIN32
00055 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00056 #include <sys/epoll.h>
00057 #endif
00058 #if defined (BLOCXX_HAVE_SYS_POLL_H)
00059 #include <sys/poll.h>
00060 #endif
00061 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00062 #include <sys/select.h>
00063 #endif
00064 #endif
00065
00066 #ifdef BLOCXX_HAVE_SYS_TIME_H
00067 #include <sys/time.h>
00068 #endif
00069
00070 #include <sys/types.h>
00071
00072 #ifdef BLOCXX_HAVE_UNISTD_H
00073 #include <unistd.h>
00074 #endif
00075
00076 #include <errno.h>
00077 }
00078
00079 namespace BLOCXX_NAMESPACE
00080 {
00081
00082 namespace Select
00083 {
00084
00085 namespace
00086 {
00087 const float LOOP_TIMEOUT = 10.0;
00088 }
00090
00091 int
00092 selectRW(SelectObjectArray& selarray, UInt32 ms)
00093 {
00094 return selectRW(selarray, Timeout::relative(static_cast<float>(ms) * 1000));
00095 }
00096
00097 #if defined(BLOCXX_WIN32)
00098
00099 int
00100 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
00101 {
00102 int rc;
00103 size_t hcount = static_cast<DWORD>(selarray.size());
00104 AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
00105
00106 size_t handleidx = 0;
00107 for (size_t i = 0; i < selarray.size(); i++, handleidx++)
00108 {
00109 if(selarray[i].s.isSocket && selarray[i].s.networkevents)
00110 {
00111 ::WSAEventSelect(selarray[i].s.sockfd,
00112 selarray[i].s.event, selarray[i].s.networkevents);
00113 }
00114
00115 hdls[handleidx] = selarray[i].s.event;
00116 }
00117
00118 TimeoutTimer timer(timeout);
00119 timer.start();
00120 DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timer.asDWORDMs());
00121
00122 assert(cc != WAIT_ABANDONED);
00123
00124 switch (cc)
00125 {
00126 case WAIT_FAILED:
00127 rc = Select::SELECT_ERROR;
00128 break;
00129 case WAIT_TIMEOUT:
00130 rc = Select::SELECT_TIMEOUT;
00131 break;
00132 default:
00133 rc = cc - WAIT_OBJECT_0;
00134
00135
00136
00137 if(selarray[rc].s.isSocket)
00138 {
00139 if(selarray[rc].s.networkevents
00140 && selarray[rc].s.doreset == false)
00141 {
00142 ::WSAEventSelect(selarray[rc].s.sockfd,
00143 selarray[rc].s.event, selarray[rc].s.networkevents);
00144 }
00145 else
00146 {
00147
00148 ::WSAEventSelect(selarray[rc].s.sockfd,
00149 selarray[rc].s.event, 0);
00150 u_long ioctlarg = 0;
00151 ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
00152 }
00153 }
00154 break;
00155 }
00156
00157 if( rc < 0 )
00158 return rc;
00159
00160 int availableCount = 0;
00161 for (size_t i = 0; i < selarray.size(); i++)
00162 {
00163 if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
00164 {
00165 if( selarray[i].waitForRead )
00166 selarray[i].readAvailable = true;
00167 if( selarray[i].waitForWrite )
00168 selarray[i].writeAvailable = true;
00169 ++availableCount;
00170 }
00171 else
00172 {
00173 selarray[i].readAvailable = false;
00174 selarray[i].writeAvailable = false;
00175 }
00176 }
00177 return availableCount;
00178 }
00179
00180
00181 #else
00182
00184
00185 int
00186 selectRWEpoll(SelectObjectArray& selarray, const Timeout& timeout)
00187 {
00188 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
00189 int ecc = 0;
00190 AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
00191 AutoDescriptor epfd(epoll_create(selarray.size()));
00192 if(epfd.get() == -1)
00193 {
00194 if (errno == ENOSYS)
00195 {
00196 return SELECT_NOT_IMPLEMENTED;
00197 }
00198
00199 return Select::SELECT_ERROR;
00200 }
00201
00202 UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
00203 UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
00204 for (size_t i = 0; i < selarray.size(); i++)
00205 {
00206 BLOCXX_ASSERT(selarray[i].s >= 0);
00207 selarray[i].readAvailable = false;
00208 selarray[i].writeAvailable = false;
00209 selarray[i].wasError = false;
00210 events[i].data = epoll_data_t();
00211 events[i].data.u32 = i;
00212 events[i].events = 0;
00213 if(selarray[i].waitForRead)
00214 {
00215 events[i].events |= read_events;
00216 }
00217 if(selarray[i].waitForWrite)
00218 {
00219 events[i].events |= write_events;
00220 }
00221
00222 if(epoll_ctl(epfd.get(), EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
00223 {
00224 return errno == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
00225 }
00226 }
00227
00228
00229
00230 TimeoutTimer timer(timeout);
00231 timer.start();
00232 int savedErrno;
00233 do
00234 {
00235 Thread::testCancel();
00236 const float maxWaitSec = LOOP_TIMEOUT;
00237 ecc = epoll_wait(epfd.get(), events.get(), selarray.size(), timer.asIntMs(maxWaitSec));
00238 savedErrno = errno;
00239 if (ecc < 0 && errno == EINTR)
00240 {
00241 ecc = 0;
00242 errno = 0;
00243 Thread::testCancel();
00244 }
00245 timer.loop();
00246 } while ((ecc == 0) && !timer.expired());
00247
00248 if (ecc < 0)
00249 {
00250 errno = savedErrno;
00251 return Select::SELECT_ERROR;
00252 }
00253 if (ecc == 0)
00254 {
00255 return Select::SELECT_TIMEOUT;
00256 }
00257
00258 for(int i = 0; i < ecc; i++)
00259 {
00260 SelectObject & so = selarray[events[i].data.u32];
00261 so.readAvailable = so.waitForRead && (events[i].events & read_events);
00262 so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
00263 }
00264
00265 return ecc;
00266 #else
00267 return SELECT_NOT_IMPLEMENTED;
00268 #endif
00269 }
00270
00272
00273 int
00274 selectRWPoll(SelectObjectArray& selarray, const Timeout& timeout)
00275 {
00276 #if defined (BLOCXX_HAVE_SYS_POLL_H)
00277 int rc = 0;
00278
00279 AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
00280
00281
00282 TimeoutTimer timer(timeout);
00283 timer.start();
00284
00285 int savedErrno;
00286 do
00287 {
00288 for (size_t i = 0; i < selarray.size(); i++)
00289 {
00290 BLOCXX_ASSERT(selarray[i].s >= 0);
00291 selarray[i].readAvailable = false;
00292 selarray[i].writeAvailable = false;
00293 selarray[i].wasError = false;
00294 pfds[i].revents = 0;
00295 pfds[i].fd = selarray[i].s;
00296 pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
00297 if(selarray[i].waitForWrite)
00298 pfds[i].events |= POLLOUT;
00299 }
00300
00301 Thread::testCancel();
00302 const float maxWaitSec = LOOP_TIMEOUT;
00303 rc = ::poll(pfds.get(), selarray.size(), timer.asIntMs(maxWaitSec));
00304 savedErrno = errno;
00305 if (rc < 0 && errno == EINTR)
00306 {
00307 rc = 0;
00308 errno = 0;
00309 Thread::testCancel();
00310 #ifdef BLOCXX_NETWARE
00311
00312
00313
00314
00315 pthread_yield();
00316 #endif
00317 }
00318
00319 timer.loop();
00320 } while ((rc == 0) && !timer.expired());
00321
00322 if (rc < 0)
00323 {
00324 errno = savedErrno;
00325 return Select::SELECT_ERROR;
00326 }
00327 if (rc == 0)
00328 {
00329 return Select::SELECT_TIMEOUT;
00330 }
00331 for (size_t i = 0; i < selarray.size(); i++)
00332 {
00333 if (pfds[i].revents & (POLLERR | POLLNVAL))
00334 {
00335 selarray[i].wasError = true;
00336 }
00337
00338 if(selarray[i].waitForRead)
00339 {
00340 selarray[i].readAvailable = (pfds[i].revents &
00341 (POLLIN | POLLPRI | POLLHUP));
00342 }
00343
00344 if(selarray[i].waitForWrite)
00345 {
00346 selarray[i].writeAvailable = (pfds[i].revents &
00347 (POLLOUT | POLLHUP));
00348 }
00349 }
00350
00351 return rc;
00352 #else
00353 return SELECT_NOT_IMPLEMENTED;
00354 #endif
00355 }
00357
00358 int
00359 selectRWSelect(SelectObjectArray& selarray, const Timeout& timeout)
00360 {
00361 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
00362 int rc = 0;
00363 fd_set ifds;
00364 fd_set ofds;
00365
00366
00367 TimeoutTimer timer(timeout);
00368 timer.start();
00369
00370 int savedErrno;
00371 do
00372 {
00373 int maxfd = 0;
00374 FD_ZERO(&ifds);
00375 FD_ZERO(&ofds);
00376 for (size_t i = 0; i < selarray.size(); ++i)
00377 {
00378 int fd = selarray[i].s;
00379 BLOCXX_ASSERT(fd >= 0);
00380 if (maxfd < fd)
00381 {
00382 maxfd = fd;
00383 }
00384 if (fd < 0 || fd >= FD_SETSIZE)
00385 {
00386 errno = EINVAL;
00387 return Select::SELECT_ERROR;
00388 }
00389 if (selarray[i].waitForRead)
00390 {
00391 FD_SET(fd, &ifds);
00392 }
00393 if (selarray[i].waitForWrite)
00394 {
00395 FD_SET(fd, &ofds);
00396 }
00397 }
00398
00399 Thread::testCancel();
00400 struct timeval tv;
00401 const float maxWaitSec = LOOP_TIMEOUT;
00402 rc = ::select(maxfd+1, &ifds, &ofds, NULL, timer.asTimeval(tv, maxWaitSec));
00403 savedErrno = errno;
00404 if (rc < 0 && errno == EINTR)
00405 {
00406 rc = 0;
00407 errno = 0;
00408 Thread::testCancel();
00409 #ifdef BLOCXX_NETWARE
00410
00411
00412
00413
00414 pthread_yield();
00415 #endif
00416 }
00417
00418 timer.loop();
00419 } while ((rc == 0) && !timer.expired());
00420
00421 if (rc < 0)
00422 {
00423 errno = savedErrno;
00424 return Select::SELECT_ERROR;
00425 }
00426 if (rc == 0)
00427 {
00428 return Select::SELECT_TIMEOUT;
00429 }
00430 int availableCount = 0;
00431 int cval;
00432 for (size_t i = 0; i < selarray.size(); i++)
00433 {
00434 selarray[i].wasError = false;
00435 cval = 0;
00436 if (FD_ISSET(selarray[i].s, &ifds))
00437 {
00438 selarray[i].readAvailable = true;
00439 cval = 1;
00440 }
00441 else
00442 {
00443 selarray[i].readAvailable = false;
00444 }
00445
00446 if (FD_ISSET(selarray[i].s, &ofds))
00447 {
00448 selarray[i].writeAvailable = true;
00449 cval = 1;
00450 }
00451 else
00452 {
00453 selarray[i].writeAvailable = false;
00454 }
00455
00456 availableCount += cval;
00457
00458 }
00459
00460 return availableCount;
00461 #else
00462 return SELECT_NOT_IMPLEMENTED;
00463 #endif
00464 }
00465
00466 int
00467 selectRW(SelectObjectArray& selarray, const Timeout& timeout)
00468 {
00469 int rv = selectRWEpoll(selarray, timeout);
00470 if (rv != SELECT_NOT_IMPLEMENTED)
00471 {
00472 return rv;
00473 }
00474
00475 rv = selectRWPoll(selarray, timeout);
00476 if (rv != SELECT_NOT_IMPLEMENTED)
00477 {
00478 return rv;
00479 }
00480
00481 rv = selectRWSelect(selarray, timeout);
00482 BLOCXX_ASSERT(rv != SELECT_NOT_IMPLEMENTED);
00483 return rv;
00484 }
00485
00487 #endif // #else BLOCXX_WIN32
00488
00489 int
00490 select(const SelectTypeArray& selarray, UInt32 ms)
00491 {
00492 return select(selarray, Timeout::relative(static_cast<float>(ms) * 1000.0));
00493 }
00494
00496 int
00497 select(const SelectTypeArray& selarray, const Timeout& timeout)
00498 {
00499 SelectObjectArray soa;
00500 soa.reserve(selarray.size());
00501 for (size_t i = 0; i < selarray.size(); ++i)
00502 {
00503 SelectObject curObj(selarray[i]);
00504 curObj.waitForRead = true;
00505 soa.push_back(curObj);
00506 }
00507 int rv = selectRW(soa, timeout);
00508 if (rv < 0)
00509 {
00510 return rv;
00511 }
00512
00513
00514 for (size_t i = 0; i < soa.size(); ++i)
00515 {
00516 if (soa[i].readAvailable)
00517 {
00518 return i;
00519 }
00520 }
00521 errno = 0;
00522 return SELECT_ERROR;
00523 }
00524
00525 }
00526
00527 }
00528