async_reception.cpp

00001 /*   
00002  *   (c) Copyright 2008 Philipp Skadorov (philipp_s@users.sourceforge.net)
00003  *
00004  *   This file is part of FREESECS.
00005  *
00006  *   FREESECS is free software: you can redistribute it and/or modify
00007  *   it under the terms of the GNU General Public License as published by
00008  *   the Free Software Foundation, either version 3 of the License, or
00009  *   (at your option) any later version.
00010  *
00011  *   FREESECS is distributed in the hope that it will be useful,
00012  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *   GNU General Public License for more details.
00015  *
00016  *   You should have received a copy of the GNU General Public License
00017  *   along with FREESECS, see COPYING.
00018  *   If not, see <http://www.gnu.org/licenses/>.
00019  */
00020 #include <sys/socket.h>
00021 #include <sys/un.h>
00022 #include <errno.h>
00023 #include <string.h>
00024 #include <algorithm>
00025 #include "hsms_signums.h"
00026 #include "async_reception.h"
00027 #include "tsqueue.h"
00028 
00029 #include "log.h"
00030 
00031 #define FSM_NAME "ASYNC READ"
00032 
00033 #define ASYNC_RECEPTION_LOG_LEVEL 1000
00034 
00035 #define max(a,b) ((a)>(b)?(a):(b))
00036 
00037 using namespace freesecs;
00038 
00039 int                         async_reception_t::_subscr_count    = 0;
00040 int                         async_reception_t::_update_fd[2]    = {0,0};
00041 freesecs::async_reception_t::thisv_t  async_reception_t::_thisv;
00042 pthread_mutex_t             async_reception_t::_thisv_mx        = PTHREAD_MUTEX_INITIALIZER;
00043 pthread_t                   async_reception_t::_sigthread_id    = 0;
00044 
00045 freesecs::async_reception_t::async_reception_t(int fd, const char *name)
00046 :_fd(fd)
00047 ,_buf(NULL)
00048 ,_bytes_to_read(0)
00049 ,_bytes_read(0)
00050 ,_rx_error(0)
00051 ,_state(IDLE)
00052 {
00053     snprintf(_name, MAX_FSM_NAME, "ASYNC READ (%s)", name);
00054     if(0 == _subscr_count)
00055     {
00056         lock_t l(_thisv_mx);
00057         //open local socket here
00058         if(socketpair(PF_UNIX, SOCK_DGRAM, 0, _update_fd))
00059         {
00060             throw;
00061         }
00062     }
00063 
00064     if(1)
00065     {
00066         lock_t l(_thisv_mx);
00067         _thisv.push_back(this);
00068         _subscr_count++;
00069 
00070 
00071         if(0 == _sigthread_id)
00072         {
00073             if(pthread_create(&_sigthread_id, NULL, signalthread, (void*)this))
00074             {
00075                 throw;
00076             }
00077         }
00078 
00079         set_need_update_fdset();
00080     }
00081 }
00082 
00083 freesecs::async_reception_t::~async_reception_t()
00084 {
00085     if(0 == (_subscr_count--))
00086     {
00087         set_need_exit();
00088         pthread_join(_sigthread_id, NULL);
00089         
00090         _thisv.clear();
00091         shutdown(_update_fd[0], SHUT_RDWR);
00092         shutdown(_update_fd[1], SHUT_RDWR);
00093         close(_update_fd[0]);
00094         close(_update_fd[1]);
00095     }
00096     else
00097     {
00098         lock_t l(_thisv_mx);
00099         
00100         thisv_t::iterator it = std::find(_thisv.begin(), _thisv.end(), this);
00101         if(it != _thisv.end())
00102         {
00103             _thisv.erase(it);
00104         }
00105         set_need_update_fdset();
00106     }
00107 }
00108 
00109 void 
00110 freesecs::async_reception_t::start_recv(void *buf, size_t count)
00111 {
00112     event_data_t data;
00113 
00114     data.e0.buf     = buf;
00115     data.e0.count   = count;
00116 
00117     process(0, data);
00118 }
00119 
00120 void 
00121 freesecs::async_reception_t::stop_recv()
00122 {
00123     event_data_t data;
00124     process(2, data);
00125 }
00126 
00127 freesecs::async_reception_t::state_t
00128 freesecs::async_reception_t::get_state()
00129 {
00130     return _state;
00131 }
00132 
00133 size_t 
00134 freesecs::async_reception_t::get_bytes_read()
00135 {
00136     return _bytes_read;
00137 }
00138 
00139 void 
00140 freesecs::async_reception_t::process(int e, event_data_t data)
00141 {
00142     state_t state_old = _state;
00143     TRACE_FSM_BEGIN(ASYNC_RECEPTION_LOG_LEVEL, _name, _state, e);
00144 
00145     switch(_state)
00146     {
00147         case IDLE:
00148             if(0 == e)
00149             {
00150                 _state = WTG_FOR_DATA;
00151             }
00152             else
00153             {
00154                 goto sm_wrong_ev;
00155             }
00156         break;
00157         case WTG_FOR_DATA:
00158             if(1 == e)
00159             {
00160                 _rx_error           =   data.e1.rx_error;
00161                 _bytes_read         +=  data.e1.bytes_read;
00162                 if(x1())
00163                 {
00164                     z1(data);
00165                     _state = IDLE;
00166                 }
00167                 else if(x2())
00168                 {
00169                     _state = RECEIVED;
00170                 }
00171                 else
00172                 {
00173                     z2(data);
00174                 }
00175             }
00176             else if(2 == e)
00177             {
00178                 z3(data);
00179                 _state = IDLE;
00180             }
00181             else
00182             {
00183                 goto sm_wrong_ev;
00184             }
00185         break;
00186         case RECEIVED:
00187             if(0 == e)
00188             {
00189                 _state = WTG_FOR_DATA;
00190             }
00191             else
00192             {
00193                 goto sm_wrong_ev;
00194             }
00195         break;
00196         default:
00197             goto sm_wrong_ev;
00198         break;
00199     }
00200 
00201     if(state_old == _state)
00202     {
00203         goto sm_end;
00204     }
00205 
00206     TRACE_FSM_TRANS(ASYNC_RECEPTION_LOG_LEVEL, _name, state_old, _state);
00207     
00208     switch(_state)
00209     {
00210         case IDLE:
00211         break;
00212         case WTG_FOR_DATA:
00213             z1_0(data);
00214         break;
00215         case RECEIVED:
00216             z2_0(data);
00217         break;
00218         default:
00219         break;
00220     }
00221 
00222 sm_end:
00223     TRACE_FSM_END(ASYNC_RECEPTION_LOG_LEVEL, _name, _state, e);
00224     return;
00225 
00226 sm_wrong_ev:
00227     TRACE_FSM_ERROR(ASYNC_RECEPTION_LOG_LEVEL, _name, "event %d not handled in current state: %d", e, _state);
00228     TRACE_FSM_END(ASYNC_RECEPTION_LOG_LEVEL, _name, _state, e);
00229     return;
00230     
00231 }
00232 
00233 void 
00234 freesecs::async_reception_t::z1_0(event_data_t data)
00235 {
00236     _bytes_read = 0;
00237 
00238     _buf = data.e0.buf;
00239     _bytes_to_read = data.e0.count;
00240 }
00241 
00242 #define UPDATE_VAL  0xff
00243 #define EXIT_VAL    0xfe
00244 
00245 void 
00246 freesecs::async_reception_t::set_need_update_fdset()
00247 {
00248     char    data = UPDATE_VAL;
00249     send(_update_fd[0], &data, 1, 0);
00250 }
00251 
00252 void 
00253 freesecs::async_reception_t::set_need_exit()
00254 {
00255     char    data = EXIT_VAL;
00256     send(_update_fd[0], &data, 1, 0);
00257 }
00258 
00259 bool 
00260 freesecs::async_reception_t::get_need_update_fdset()
00261 {
00262     unsigned char    data;
00263     bool    need_update = false;
00264 
00265     if(1 == recv(_update_fd[1], &data, 1, MSG_DONTWAIT))
00266     {
00267         if(EXIT_VAL == data)
00268         {
00269             TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL, "async rcv: got exit command. Exiting...");
00270             pthread_exit(NULL);
00271         }
00272         need_update = true;
00273     }
00274 
00275     return need_update;
00276 }
00277 
00278 
00279 void 
00280 freesecs::async_reception_t::update_fdset(int *fdn, fd_set *fds, thisv_t *thisv, bool clear_cache)
00281 {
00282     typedef std::list<int> fdl_t;
00283     static fdl_t fdl;
00284 
00285     if(true == clear_cache)
00286     {
00287         lock_t l(_thisv_mx);
00288 
00289         fdl.clear();
00290         
00291         for(thisv_t::iterator it = _thisv.begin(); it != _thisv.end(); ++it)
00292         {
00293             fdl.push_back((*it)->_fd);
00294         }
00295     
00296         fdl.push_back(_update_fd[1]);
00297         fdl.sort();
00298     
00299         thisv->assign(_thisv.begin(), _thisv.end());
00300     }
00301 
00302     FD_ZERO (fds);
00303     *fdn = fdl.back();
00304 
00305     for(fdl_t::iterator it = fdl.begin(); it != fdl.end(); ++it)
00306     {
00307         FD_SET(*it, fds);
00308     }
00309 }
00310 
00311 void 
00312 freesecs::async_reception_t::z2_0(event_data_t)
00313 {
00314     data_recvd_signal(_bytes_read);
00315 }
00316 void 
00317 freesecs::async_reception_t::z1(event_data_t)
00318 {
00319     recv_error_signal(_rx_error);
00320 }
00321 void 
00322 freesecs::async_reception_t::z2(event_data_t data)
00323 {
00324     data_partially_recvd_signal(_bytes_read);
00325 }
00326 void 
00327 freesecs::async_reception_t::z3(event_data_t)
00328 {
00329 }
00330 bool 
00331 freesecs::async_reception_t::x1()
00332 {
00333     return 0 != _rx_error;
00334 }
00335 bool 
00336 freesecs::async_reception_t::x2()
00337 {
00338     return _bytes_read >= _bytes_to_read;
00339 }
00340 
00341 void *
00342 freesecs::async_reception_t::signalthread(void *arg)
00343 {
00344     int                 r, nfds = 0;
00345     fd_set              rd;
00346     bool                force_update = true;
00347     thisv_t             local_thisv;
00348 
00349 
00350     for(;;)
00351     {
00352         update_fdset(&nfds, &rd, &local_thisv, force_update);
00353 
00354         r = select(nfds + 1, &rd, NULL, NULL, NULL);
00355 
00356         if(0 == r)
00357         {
00358             continue;
00359         }
00360         else if(EBADF == r)
00361         {
00362             TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL, 
00363                         "async rcv thread: bad file "
00364                         "descriptor. Cleanup...");
00365             //TODO: remove bad fd from the list
00366         }
00367         else if(0 > r)
00368         {
00369             //TRACE_DEBUG("async rcv thread: pselect failed. Exiting...");
00370             //return NULL;
00371             TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00372                         "async rcv thread: pselect "
00373                         "failed: %s. Renew fdset and restart..", strerror(errno));
00374             force_update = true;
00375             continue;
00376         }
00377         else
00378         {
00379             if(FD_ISSET(_update_fd[1], &rd))
00380             {
00381                 force_update = true;
00382                 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00383                             "async rcv thread: update signal.");
00384                 if(true == get_need_update_fdset() && 1 == r)
00385                 {
00386                     //only update sock has been written to,
00387                     //so just update fdset and continue polling
00388                     continue;
00389                 }
00390             }
00391             else
00392             {
00393                 force_update = false;
00394             }
00395 
00396             //use a copy of thisv for the case
00397             //thisv is updated when new async recv
00398             //objects are created from async_reception_t::check_fd_process
00399             //thisv copy is updated each update_fdset call
00400             for_each(local_thisv.begin(), local_thisv.end(), 
00401                     std::bind2nd(std::mem_fun(&async_reception_t::check_fd_process),&rd));
00402         }
00403     }
00404 
00405 }
00406 
00407 bool 
00408 freesecs::async_reception_t::check_fd_process(const fd_set* fds)
00409 {
00410     int             r = 0;
00411     event_data_t    event_data;
00412 
00413     bool can_read = 0 < FD_ISSET(_fd, fds);
00414 
00415     if(can_read)
00416     {
00417         TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00418                     "%s: shall read %d bytes, read %d bytes", _name, _bytes_to_read, _bytes_read);
00419         if(_bytes_to_read)
00420         {
00421             if(_bytes_to_read > _bytes_read)
00422             {
00423                 r = read(_fd, _buf, _bytes_to_read - _bytes_read);
00424 
00425                 if(r < 0)
00426                 {
00427                     TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00428                         "%s: read error: %s", _name, strerror(errno));
00429                     event_data.e1.rx_error = errno;
00430                     event_data.e1.bytes_read = 0;
00431                 }
00432                 if(r == 0)//file empty/socket closed
00433                 {
00434                     TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00435                         "%s: read 0 bytes, errno: %s", _name, strerror(errno));
00436                     event_data.e1.rx_error = ECANCELED;
00437                     event_data.e1.bytes_read = 0;
00438                 }
00439                 else
00440                 {
00441                     event_data.e1.rx_error = 0;
00442                     event_data.e1.bytes_read = r;
00443                 }
00444             }
00445         }
00446         else
00454         {
00455             event_data.e1.rx_error = 0;
00456             event_data.e1.bytes_read = 0;
00457         }
00458         TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00459                     "%s: read %d bytes", _name, r);
00460         process(1, event_data);
00461     }    
00462     else
00463     {
00464         //TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL, "%s: nothing to read", _name);
00465     }
00466     return can_read;
00467 }

Generated on Fri Oct 3 15:30:01 2008 for FREESECS hsms by  doxygen 1.5.1