/builddir/build/BUILD/libassa-3.4.2/assa/Reactor.cpp

Go to the documentation of this file.
00001 // -*- c++ -*-
00002 //------------------------------------------------------------------------------
00003 //                          Reactor.cpp
00004 //------------------------------------------------------------------------------
00005 //  Copyright (C) 1997-2002,2005,2006  Vladislav Grinchenko
00006 //
00007 //  This library is free software; you can redistribute it and/or
00008 //  modify it under the terms of the GNU Library General Public
00009 //  License as published by the Free Software Foundation; either
00010 //  version 2 of the License, or (at your option) any later version.
00011 //----------------------------------------------------------------------------- 
00012 //  Created: 05/25/1999
00013 //----------------------------------------------------------------------------- 
00014 #include <iostream>
00015 #include <sstream>
00016 #include <string>
00017 
00018 #include "assa/Reactor.h"
00019 #include "assa/Logger.h"
00020 
00021 using namespace ASSA;
00022 
00023 Reactor::
00024 Reactor () : 
00025     m_fd_setsize  (1024), 
00026     m_maxfd_plus1 (0), 
00027     m_active      (true)
00028 {
00029     trace_with_mask("Reactor::Reactor",REACTTRACE);
00030 
00034 #if defined(WIN32)
00035     m_fd_setsize = FD_SETSIZE;
00036 
00037 #else  // POSIX
00038     struct rlimit rlim;
00039     rlim.rlim_max = 0;
00040 
00041     if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00042         m_fd_setsize = rlim.rlim_cur;
00043     }
00044 #endif
00045 
00048 #if defined (WIN32)             
00049     WSADATA data;
00050     WSAStartup (MAKEWORD (2, 2), &data);
00051 #endif
00052 }
00053 
00054 Reactor::
00055 ~Reactor()
00056 {   
00057     trace_with_mask("Reactor::~Reactor",REACTTRACE);
00058 
00059     m_readSet.clear   ();
00060     m_writeSet.clear  ();
00061     m_exceptSet.clear ();
00062     deactivate ();
00063 }
00064 
00065 TimerId
00066 Reactor::
00067 registerTimerHandler (EventHandler*      eh_, 
00068                       const TimeVal&     timeout_,
00069                       const std::string& name_)
00070 {
00071     trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00072     Assure_return (eh_);
00073 
00074     TimeVal now (TimeVal::gettimeofday());
00075     TimeVal t (now + timeout_);
00076 
00077     DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",  
00078         timeout_.sec(),timeout_.msec()));
00079     DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00080     DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00081 
00082     TimerId tid =  m_tqueue.insert (eh_, t, timeout_, name_);
00083 
00084     DL((REACT,"---Modified Timer Queue----\n"));
00085     m_tqueue.dump();
00086     DL((REACT,"---------------------------\n"));
00087 
00088     return (tid);
00089 }
00090 
00091 bool 
00092 Reactor::
00093 registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_)
00094 {
00095     trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00096 
00097     std::ostringstream msg;
00098     Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00099 
00100     if (isReadEvent (et_)) 
00101     {
00102         if (!m_waitSet.m_rset.setFd (fd_)) 
00103         {
00104             DL((ASSAERR,"readset: fd %d out of range\n", fd_));
00105             return (false);
00106         }
00107         m_readSet[fd_] = eh_;
00108         msg << "READ_EVENT";
00109     }
00110 
00111     if (isWriteEvent (et_)) 
00112     {
00113         if (!m_waitSet.m_wset.setFd (fd_)) 
00114         {
00115             DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
00116             return (false);
00117         }
00118         m_writeSet[fd_] = eh_;
00119         msg << " WRITE_EVENT";
00120     }
00121 
00122     if (isExceptEvent (et_)) 
00123     {
00124         if (!m_waitSet.m_eset.setFd (fd_)) 
00125         {
00126             DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
00127             return (false);
00128         }
00129         m_exceptSet[fd_] = eh_;
00130         msg << " EXCEPT_EVENT";
00131     }
00132     msg << std::ends;
00133 
00134     DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 
00135         eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
00136 
00137 #if !defined (WIN32)
00138     if (m_maxfd_plus1 < fd_+1) {
00139         m_maxfd_plus1 = fd_+1;
00140         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00141     }
00142 #endif
00143 
00144     DL((REACT,"Modified waitSet:\n"));
00145     m_waitSet.dump ();
00146 
00147     return (true);
00148 }
00149 
00150 bool 
00151 Reactor::
00152 removeTimerHandler (TimerId tid_)
00153 {
00154     trace_with_mask("Reactor::removeTimer",REACTTRACE);
00155     bool ret;
00156 
00157     if ((ret = m_tqueue.remove (tid_))) {
00158         DL((REACT,"---Modified Timer Queue----\n"));
00159         m_tqueue.dump();
00160         DL((REACT,"---------------------------\n"));
00161     }
00162     else {
00163         EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00164     }
00165     return (ret);
00166 }
00167 
00171 bool 
00172 Reactor::
00173 removeHandler (EventHandler* eh_, EventType event_)
00174 {
00175     trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00176 
00177     bool ret = false;
00178     handler_t fd;
00179     handler_t rfdmax;
00180     handler_t wfdmax;
00181     handler_t efdmax;
00182     Fd2Eh_Map_Iter iter;
00183 
00184     rfdmax = wfdmax = efdmax = 0;
00185 
00186     if (eh_ == NULL) {
00187         return false;
00188     }
00189 
00190     if (isTimeoutEvent (event_)) {
00191         ret = m_tqueue.remove (eh_);
00192         ret = true;
00193     }
00194 
00195     if (isReadEvent (event_)) {
00196         iter = m_readSet.begin ();
00197         while (iter != m_readSet.end ()) {
00198             if ((*iter).second == eh_) {
00199                 fd = (*iter).first;
00200                 m_readSet.erase (iter);
00201                 m_waitSet.m_rset.clear (fd);
00202                 ret = true;
00203                 break;
00204             }
00205             rfdmax = fd;
00206             iter++;
00207         }
00208     } 
00209     
00210     if (isWriteEvent (event_)) {
00211         iter = m_writeSet.begin ();
00212         while (iter != m_writeSet.end ()) {
00213             if ((*iter).second == eh_) {
00214                 fd = (*iter).first;
00215                 m_writeSet.erase (iter);
00216                 m_waitSet.m_wset.clear (fd);
00217                 ret = true;
00218                 break;
00219             }
00220             wfdmax = fd;
00221             iter++;
00222         }
00223     }
00224 
00225     if (isExceptEvent (event_)) {
00226         iter = m_exceptSet.begin ();
00227         while (iter != m_exceptSet.end ()) {
00228             if ((*iter).second == eh_) {
00229                 fd = (*iter).first;
00230                 m_exceptSet.erase (iter);
00231                 m_waitSet.m_eset.clear (fd);
00232                 ret = true;
00233                 break;
00234             }
00235             efdmax = fd;
00236             iter++;
00237         }
00238     }
00239 
00240     if (ret == true) {
00241         DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
00242         eh_->handle_close (fd);
00243     }
00244 
00245     adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax);
00246 
00247     DL((REACT,"Modifies waitSet:\n"));
00248     m_waitSet.dump ();
00249 
00250     return (ret);
00251 }
00252 
00253 bool
00254 Reactor::
00255 removeIOHandler (handler_t fd_)
00256 {
00257     trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00258 
00259     bool ret = false;
00260     EventHandler*  ehp = NULL;
00261     Fd2Eh_Map_Iter iter;
00262 
00263     handler_t      rfdmax;
00264     handler_t      wfdmax;
00265     handler_t      efdmax;
00266 
00267     rfdmax = wfdmax = efdmax = 0;
00268 
00269     Assure_return (ASSA::is_valid_handler (fd_));
00270 
00271     DL((REACT,"Removing handler for fd=%d\n",fd_));
00272 
00277     if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 
00278     {
00279         ehp = (*iter).second;
00280         m_readSet.erase (iter);
00281         m_waitSet.m_rset.clear (fd_);
00282         m_readySet.m_rset.clear (fd_);
00283         if (m_readSet.size () > 0) {
00284             iter = m_readSet.end ();
00285             iter--;
00286             rfdmax = (*iter).first;
00287         }
00288         ret = true;
00289     }
00290 
00291     if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 
00292     {
00293         ehp = (*iter).second;
00294         m_writeSet.erase (iter);
00295         m_waitSet.m_wset.clear (fd_);
00296         m_readySet.m_wset.clear (fd_);
00297         if (m_writeSet.size () > 0) {
00298             iter = m_writeSet.end ();
00299             iter--;
00300             wfdmax = (*iter).first;
00301         }
00302         ret = true;
00303     }
00304 
00305     if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 
00306     {
00307         ehp = (*iter).second;
00308         m_exceptSet.erase (iter);
00309         m_waitSet.m_eset.clear (fd_);
00310         m_readySet.m_eset.clear (fd_);
00311         if (m_exceptSet.size () > 0) {
00312             iter = m_exceptSet.end ();
00313             iter--;
00314             efdmax = (*iter).first;
00315         }
00316         ret = true;
00317     }
00318 
00319     if (ret == true && ehp != NULL) {
00320         DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
00321         ehp->handle_close (fd_);
00322     }
00323 
00324     adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax);
00325 
00326     DL((REACT,"Modifies waitSet:\n"));
00327     m_waitSet.dump ();
00328 
00329     return (ret);
00330 }
00331 
00332 bool
00333 Reactor::
00334 checkFDs (void)
00335 {
00336     trace_with_mask("Reactor::checkFDs",REACTTRACE);
00337     
00338     bool num_removed = false;
00339     FdSet mask;
00340     timeval poll = { 0, 0 };
00341 
00342     for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
00343         if ( m_readSet[fd] != NULL ) {
00344             mask.setFd (fd);
00345             if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00346                 removeIOHandler (fd);
00347                 num_removed = true;
00348                 DL((REACT,"Detected BAD FD: %d\n", fd ));
00349             }
00350             mask.clear (fd);
00351         }
00352     }
00353     return (num_removed);
00354 }
00355 
00356 bool
00357 Reactor::
00358 handleError (void)
00359 {
00360     trace_with_mask("Reactor::handleError",REACTTRACE);
00361 
00364     if ( !m_active ) {
00365         DL((REACT,"Received cmd to stop Reactor\n"));
00366         return (false);
00367     }
00368 
00369     /*---
00370       TODO: If select(2) returns before time expires, with
00371       a descriptor ready or with EINTR, timeval is not
00372       going to be updated with number of seconds remaining.
00373       This is true for all systems except Linux, which will
00374       do so. Therefore, to restart correctly in case of
00375       EINTR, we ought to take time measurement before and
00376       after select, and try to select() for remaining time.
00377     
00378       For now, we restart with the initial timing value.
00379       ---*/
00380     /*---
00381       BSD kernel never restarts select(2). SVR4 will restart if
00382       the SA_RESTART flag is specified when the signal handler
00383       for the signal delivered is installed. This means taht for
00384       portability, we must handle signal interrupts.
00385       ---*/
00386 
00387     if ( errno == EINTR ) {
00388         EL((REACT,"EINTR: interrupted select(2)\n"));
00389         /*
00390           If I was sitting in select(2) and received SIGTERM,
00391           the signal handler would have set m_active to 'false',
00392           and this function would have returned 'false' as above.
00393           For any other non-critical signals (USR1,...),
00394           we retry select.
00395         */
00396         return (true);
00397     }
00398     /*
00399       EBADF - bad file number. One of the file descriptors does
00400       not reference an open file to open(), close(), ioctl().
00401       This can happen if user closed fd and forgot to remove
00402       handler from Reactor.
00403     */
00404     if ( errno == EBADF ) {
00405         DL((REACT,"EBADF: bad file descriptor\n"));
00406         return (checkFDs ());
00407     }
00408     /*
00409       Any other error from select
00410     */
00411 #if defined (WIN32) 
00412     DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
00413 #else
00414     EL((ASSAERR,"select(3) error\n"));
00415 #endif
00416     return (false);
00417 }
00418 
00419 int
00420 Reactor::
00421 isAnyReady (void)
00422 {
00423     trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00424 
00425     int n = m_readySet.m_rset.numSet () +
00426         m_readySet.m_wset.numSet () +
00427         m_readySet.m_eset.numSet ();
00428 
00429     if ( n > 0 ) {
00430         DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00431         m_readySet.dump ();
00432     }
00433     return (n);
00434 }
00435 
00436 void 
00437 Reactor::
00438 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
00439 {
00440     trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00441 
00442     TimeVal now;
00443     TimeVal tv;
00444 
00445     if (m_tqueue.isEmpty () ) {
00446         howlong_ = maxwait_;
00447         goto done;
00448     }
00449     now = TimeVal::gettimeofday ();
00450     tv = m_tqueue.top ();
00451     
00452     if (tv < now) {
00453         /*--- 
00454           It took too long to get here (fraction of a millisecond), 
00455           and top timer had already expired. In this case,
00456           perform non-blocking select in order to drain the timer queue.
00457           ---*/
00458         *howlong_ = 0;
00459     }
00460     else {  
00461         DL((REACT,"--------- Timer Queue ----------\n"));
00462         m_tqueue.dump();
00463         DL((REACT,"--------------------------------\n"));
00464 
00465         if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00466             *howlong_ = tv - now;
00467         }
00468         else {
00469             *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00470         }
00471     }
00472 
00473  done:
00474     if (howlong_ != NULL) {
00475         DL((REACT,"delay (%f)\n", double (*howlong_) ));
00476     }
00477     else {
00478         DL((REACT,"delay (forever)\n"));
00479     }
00480 }
00481 
00485 void
00486 Reactor::
00487 waitForEvents (void)
00488 {
00489     while ( m_active ) {
00490         waitForEvents ((TimeVal*) NULL);
00491     }
00492 }
00493 
00510 void
00511 Reactor::
00512 waitForEvents (TimeVal* tv_)
00513 {
00514     trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00515 
00516     TimerCountdown traceTime (tv_);
00517     DL((REACT,"======================================\n"));
00518 
00519     /*--- Expire all stale Timers ---*/
00520     m_tqueue.expire (TimeVal::gettimeofday ());
00521 
00522     /* Test to see if Reactor has been deactivated as a result
00523      * of processing done by any TimerHandlers.
00524      */
00525     if (!m_active) {
00526         return;
00527     }
00528 
00529     int      nReady;
00530     TimeVal  delay;
00531     TimeVal* dlp = &delay;
00532 
00533     /*---
00534       In case if not all data have been processed by the EventHandler,
00535       and EventHandler stated so in its callback's return value
00536       to dispatcher (), it will be called again. This way 
00537       underlying file/socket stream can efficiently utilize its
00538       buffering mechaninsm.
00539       ---*/
00540     if ((nReady = isAnyReady ())) {
00541         DL((REACT,"isAnyReady returned: %d\n",nReady));
00542         dispatch (nReady);
00543         return;
00544     }
00545 
00546     DL((REACT,"=== m_waitSet ===\n"));
00547     m_waitSet.dump ();
00548 
00549     do {
00550         m_readySet.reset ();
00551         DL ((REACT,"m_readySet after reset():\n"));
00552         m_readySet.dump ();
00553 
00554         m_readySet = m_waitSet;
00555         DL ((REACT,"m_readySet after assign:\n"));
00556         m_readySet.dump ();
00557 
00558         calculateTimeout (dlp, tv_);
00559 
00560         nReady = ::select (m_maxfd_plus1, 
00561                            &m_readySet.m_rset,
00562                            &m_readySet.m_wset, 
00563                            &m_readySet.m_eset, 
00564                            dlp);
00565         DL((REACT,"::select() returned: %d\n",nReady));
00566 
00567         m_readySet.sync ();
00568         DL ((REACT,"m_readySet after select:\n"));
00569         m_readySet.dump ();
00570 
00571     } 
00572     while (nReady < 0 && handleError ());
00573 
00574     dispatch (nReady);
00575 }
00576 
00583 void 
00584 Reactor::
00585 dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_)
00586 {
00587     trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00588 
00589     int ret = 0;
00590     handler_t fd;
00591     EventHandler* ehp = NULL;
00592     std::string eh_id;
00593 
00594     Fd2Eh_Map_Iter iter = fdSet_.begin ();
00595 
00596     while (iter != fdSet_.end ()) 
00597     {
00598         fd  = (*iter).first;
00599         ehp = (*iter).second;
00600 
00601         if (mask_.isSet (fd) && ehp != NULL) 
00602         {
00603             eh_id = ehp->get_id ();
00604             DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
00605                 eh_id.c_str (), fd));
00606 
00607             ret = (ehp->*callback_) (fd); /* Fire up a callback */
00608 
00609             if (ret == -1) {
00610                 removeIOHandler (fd);
00611             }
00612             else if (ret > 0) {
00613                 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
00614                     ret, fd, eh_id.c_str ()));
00615                 //return;   <-- would starve other connections
00616             }
00617             else {
00618                 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 
00619                     eh_id.c_str (), fd));
00620                 mask_.clear (fd);
00621             }
00628             iter = fdSet_.begin ();
00629         }
00630         else {
00631             iter++;
00632         }
00633     }
00634 }
00635 
00641 bool
00642 Reactor::
00643 dispatch (int ready_)
00644 {
00645     trace_with_mask("Reactor::dispatch", REACTTRACE);
00646 
00647     m_tqueue.expire (TimeVal::gettimeofday ());
00648 
00649     if ( ready_ < 0 ) 
00650     {
00651 #if !defined (WIN32)
00652         EL((ASSAERR,"::select(3) error\n"));
00653 #endif
00654         return (false);
00655     }
00656     if ( ready_ == 0 ) {
00657         return (true);
00658     }
00659 
00660     DL((REACT,"Dispatching %d FDs.\n",ready_));
00661     DL((REACT,"m_readySet:\n"));
00662     m_readySet.dump ();
00663 
00664     /*--- Writes first ---*/
00665     dispatchHandler (m_readySet.m_wset, 
00666                      m_writeSet, 
00667                      &EventHandler::handle_write);
00668 
00669     /*--- Exceptions next ---*/
00670     dispatchHandler (m_readySet.m_eset, 
00671                      m_exceptSet, 
00672                      &EventHandler::handle_except);
00673 
00674     /*--- Finally, the Reads ---*/
00675     dispatchHandler (m_readySet.m_rset, 
00676                      m_readSet, 
00677                      &EventHandler::handle_read);
00678 
00679     return (true);
00680 }
00681 
00682 void 
00683 Reactor::
00684 stopReactor (void) 
00685 { 
00686     trace_with_mask("Reactor::stopReactor", REACTTRACE);
00687 
00688     m_active = false; 
00689 
00690     Fd2Eh_Map_Iter iter;
00691     EventHandler* ehp;
00692 
00693     while (m_readSet.size () > 0) {
00694         iter = m_readSet.begin ();
00695         ehp = (*iter).second;
00696         removeHandler (ehp);
00697     }
00698 
00699     while (m_writeSet.size () > 0) {
00700         iter = m_writeSet.begin ();
00701         ehp = (*iter).second;
00702         removeHandler (ehp);
00703     }
00704 
00705     while (m_exceptSet.size () > 0) {
00706         iter = m_exceptSet.begin ();
00707         ehp = (*iter).second;
00708         removeHandler (ehp);
00709     }
00710 }
00711 
00716 void
00717 Reactor::
00718 adjust_maxfdp1 (handler_t fd_, 
00719                 handler_t rmax_, 
00720                 handler_t wmax_, 
00721                 handler_t emax_)
00722 {
00723 #if !defined (WIN32)  /* POSIX */
00724 
00725     trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
00726 
00727     if (m_maxfd_plus1 == fd_ + 1) {
00728         m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_));
00729 
00730         DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00731     }
00732 #endif
00733 }

Generated on Sat Sep 15 05:41:16 2007 for libassa by  doxygen 1.5.1