00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
#include <string.h>
#include <algorithm>
#include <stdexcept>
#include <string>
#include "hsms_signums.h"
#include "async_reception.h"
#include "tsqueue.h"

#include "log.h"
00030
00031 #define FSM_NAME "ASYNC READ"
00032
00033 #define ASYNC_RECEPTION_LOG_LEVEL 1000
00034
00035 #define max(a,b) ((a)>(b)?(a):(b))
00036
00037 using namespace freesecs;
00038
00039 int async_reception_t::_subscr_count = 0;
00040 int async_reception_t::_update_fd[2] = {0,0};
00041 freesecs::async_reception_t::thisv_t async_reception_t::_thisv;
00042 pthread_mutex_t async_reception_t::_thisv_mx = PTHREAD_MUTEX_INITIALIZER;
00043 pthread_t async_reception_t::_sigthread_id = 0;
00044
00045 freesecs::async_reception_t::async_reception_t(int fd, const char *name)
00046 :_fd(fd)
00047 ,_buf(NULL)
00048 ,_bytes_to_read(0)
00049 ,_bytes_read(0)
00050 ,_rx_error(0)
00051 ,_state(IDLE)
00052 {
00053 snprintf(_name, MAX_FSM_NAME, "ASYNC READ (%s)", name);
00054 if(0 == _subscr_count)
00055 {
00056 lock_t l(_thisv_mx);
00057
00058 if(socketpair(PF_UNIX, SOCK_DGRAM, 0, _update_fd))
00059 {
00060 throw;
00061 }
00062 }
00063
00064 if(1)
00065 {
00066 lock_t l(_thisv_mx);
00067 _thisv.push_back(this);
00068 _subscr_count++;
00069
00070
00071 if(0 == _sigthread_id)
00072 {
00073 if(pthread_create(&_sigthread_id, NULL, signalthread, (void*)this))
00074 {
00075 throw;
00076 }
00077 }
00078
00079 set_need_update_fdset();
00080 }
00081 }
00082
00083 freesecs::async_reception_t::~async_reception_t()
00084 {
00085 if(0 == (_subscr_count--))
00086 {
00087 set_need_exit();
00088 pthread_join(_sigthread_id, NULL);
00089
00090 _thisv.clear();
00091 shutdown(_update_fd[0], SHUT_RDWR);
00092 shutdown(_update_fd[1], SHUT_RDWR);
00093 close(_update_fd[0]);
00094 close(_update_fd[1]);
00095 }
00096 else
00097 {
00098 lock_t l(_thisv_mx);
00099
00100 thisv_t::iterator it = std::find(_thisv.begin(), _thisv.end(), this);
00101 if(it != _thisv.end())
00102 {
00103 _thisv.erase(it);
00104 }
00105 set_need_update_fdset();
00106 }
00107 }
00108
00109 void
00110 freesecs::async_reception_t::start_recv(void *buf, size_t count)
00111 {
00112 event_data_t data;
00113
00114 data.e0.buf = buf;
00115 data.e0.count = count;
00116
00117 process(0, data);
00118 }
00119
00120 void
00121 freesecs::async_reception_t::stop_recv()
00122 {
00123 event_data_t data;
00124 process(2, data);
00125 }
00126
00127 freesecs::async_reception_t::state_t
00128 freesecs::async_reception_t::get_state()
00129 {
00130 return _state;
00131 }
00132
00133 size_t
00134 freesecs::async_reception_t::get_bytes_read()
00135 {
00136 return _bytes_read;
00137 }
00138
00139 void
00140 freesecs::async_reception_t::process(int e, event_data_t data)
00141 {
00142 state_t state_old = _state;
00143 TRACE_FSM_BEGIN(ASYNC_RECEPTION_LOG_LEVEL, _name, _state, e);
00144
00145 switch(_state)
00146 {
00147 case IDLE:
00148 if(0 == e)
00149 {
00150 _state = WTG_FOR_DATA;
00151 }
00152 else
00153 {
00154 goto sm_wrong_ev;
00155 }
00156 break;
00157 case WTG_FOR_DATA:
00158 if(1 == e)
00159 {
00160 _rx_error = data.e1.rx_error;
00161 _bytes_read += data.e1.bytes_read;
00162 if(x1())
00163 {
00164 z1(data);
00165 _state = IDLE;
00166 }
00167 else if(x2())
00168 {
00169 _state = RECEIVED;
00170 }
00171 else
00172 {
00173 z2(data);
00174 }
00175 }
00176 else if(2 == e)
00177 {
00178 z3(data);
00179 _state = IDLE;
00180 }
00181 else
00182 {
00183 goto sm_wrong_ev;
00184 }
00185 break;
00186 case RECEIVED:
00187 if(0 == e)
00188 {
00189 _state = WTG_FOR_DATA;
00190 }
00191 else
00192 {
00193 goto sm_wrong_ev;
00194 }
00195 break;
00196 default:
00197 goto sm_wrong_ev;
00198 break;
00199 }
00200
00201 if(state_old == _state)
00202 {
00203 goto sm_end;
00204 }
00205
00206 TRACE_FSM_TRANS(ASYNC_RECEPTION_LOG_LEVEL, _name, state_old, _state);
00207
00208 switch(_state)
00209 {
00210 case IDLE:
00211 break;
00212 case WTG_FOR_DATA:
00213 z1_0(data);
00214 break;
00215 case RECEIVED:
00216 z2_0(data);
00217 break;
00218 default:
00219 break;
00220 }
00221
00222 sm_end:
00223 TRACE_FSM_END(ASYNC_RECEPTION_LOG_LEVEL, _name, _state, e);
00224 return;
00225
00226 sm_wrong_ev:
00227 TRACE_FSM_ERROR(ASYNC_RECEPTION_LOG_LEVEL, _name, "event %d not handled in current state: %d", e, _state);
00228 TRACE_FSM_END(ASYNC_RECEPTION_LOG_LEVEL, _name, _state, e);
00229 return;
00230
00231 }
00232
00233 void
00234 freesecs::async_reception_t::z1_0(event_data_t data)
00235 {
00236 _bytes_read = 0;
00237
00238 _buf = data.e0.buf;
00239 _bytes_to_read = data.e0.count;
00240 }
00241
00242 #define UPDATE_VAL 0xff
00243 #define EXIT_VAL 0xfe
00244
00245 void
00246 freesecs::async_reception_t::set_need_update_fdset()
00247 {
00248 char data = UPDATE_VAL;
00249 send(_update_fd[0], &data, 1, 0);
00250 }
00251
00252 void
00253 freesecs::async_reception_t::set_need_exit()
00254 {
00255 char data = EXIT_VAL;
00256 send(_update_fd[0], &data, 1, 0);
00257 }
00258
00259 bool
00260 freesecs::async_reception_t::get_need_update_fdset()
00261 {
00262 unsigned char data;
00263 bool need_update = false;
00264
00265 if(1 == recv(_update_fd[1], &data, 1, MSG_DONTWAIT))
00266 {
00267 if(EXIT_VAL == data)
00268 {
00269 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL, "async rcv: got exit command. Exiting...");
00270 pthread_exit(NULL);
00271 }
00272 need_update = true;
00273 }
00274
00275 return need_update;
00276 }
00277
00278
00279 void
00280 freesecs::async_reception_t::update_fdset(int *fdn, fd_set *fds, thisv_t *thisv, bool clear_cache)
00281 {
00282 typedef std::list<int> fdl_t;
00283 static fdl_t fdl;
00284
00285 if(true == clear_cache)
00286 {
00287 lock_t l(_thisv_mx);
00288
00289 fdl.clear();
00290
00291 for(thisv_t::iterator it = _thisv.begin(); it != _thisv.end(); ++it)
00292 {
00293 fdl.push_back((*it)->_fd);
00294 }
00295
00296 fdl.push_back(_update_fd[1]);
00297 fdl.sort();
00298
00299 thisv->assign(_thisv.begin(), _thisv.end());
00300 }
00301
00302 FD_ZERO (fds);
00303 *fdn = fdl.back();
00304
00305 for(fdl_t::iterator it = fdl.begin(); it != fdl.end(); ++it)
00306 {
00307 FD_SET(*it, fds);
00308 }
00309 }
00310
00311 void
00312 freesecs::async_reception_t::z2_0(event_data_t)
00313 {
00314 data_recvd_signal(_bytes_read);
00315 }
00316 void
00317 freesecs::async_reception_t::z1(event_data_t)
00318 {
00319 recv_error_signal(_rx_error);
00320 }
00321 void
00322 freesecs::async_reception_t::z2(event_data_t data)
00323 {
00324 data_partially_recvd_signal(_bytes_read);
00325 }
00326 void
00327 freesecs::async_reception_t::z3(event_data_t)
00328 {
00329 }
00330 bool
00331 freesecs::async_reception_t::x1()
00332 {
00333 return 0 != _rx_error;
00334 }
00335 bool
00336 freesecs::async_reception_t::x2()
00337 {
00338 return _bytes_read >= _bytes_to_read;
00339 }
00340
00341 void *
00342 freesecs::async_reception_t::signalthread(void *arg)
00343 {
00344 int r, nfds = 0;
00345 fd_set rd;
00346 bool force_update = true;
00347 thisv_t local_thisv;
00348
00349
00350 for(;;)
00351 {
00352 update_fdset(&nfds, &rd, &local_thisv, force_update);
00353
00354 r = select(nfds + 1, &rd, NULL, NULL, NULL);
00355
00356 if(0 == r)
00357 {
00358 continue;
00359 }
00360 else if(EBADF == r)
00361 {
00362 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00363 "async rcv thread: bad file "
00364 "descriptor. Cleanup...");
00365
00366 }
00367 else if(0 > r)
00368 {
00369
00370
00371 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00372 "async rcv thread: pselect "
00373 "failed: %s. Renew fdset and restart..", strerror(errno));
00374 force_update = true;
00375 continue;
00376 }
00377 else
00378 {
00379 if(FD_ISSET(_update_fd[1], &rd))
00380 {
00381 force_update = true;
00382 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00383 "async rcv thread: update signal.");
00384 if(true == get_need_update_fdset() && 1 == r)
00385 {
00386
00387
00388 continue;
00389 }
00390 }
00391 else
00392 {
00393 force_update = false;
00394 }
00395
00396
00397
00398
00399
00400 for_each(local_thisv.begin(), local_thisv.end(),
00401 std::bind2nd(std::mem_fun(&async_reception_t::check_fd_process),&rd));
00402 }
00403 }
00404
00405 }
00406
00407 bool
00408 freesecs::async_reception_t::check_fd_process(const fd_set* fds)
00409 {
00410 int r = 0;
00411 event_data_t event_data;
00412
00413 bool can_read = 0 < FD_ISSET(_fd, fds);
00414
00415 if(can_read)
00416 {
00417 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00418 "%s: shall read %d bytes, read %d bytes", _name, _bytes_to_read, _bytes_read);
00419 if(_bytes_to_read)
00420 {
00421 if(_bytes_to_read > _bytes_read)
00422 {
00423 r = read(_fd, _buf, _bytes_to_read - _bytes_read);
00424
00425 if(r < 0)
00426 {
00427 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00428 "%s: read error: %s", _name, strerror(errno));
00429 event_data.e1.rx_error = errno;
00430 event_data.e1.bytes_read = 0;
00431 }
00432 if(r == 0)
00433 {
00434 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00435 "%s: read 0 bytes, errno: %s", _name, strerror(errno));
00436 event_data.e1.rx_error = ECANCELED;
00437 event_data.e1.bytes_read = 0;
00438 }
00439 else
00440 {
00441 event_data.e1.rx_error = 0;
00442 event_data.e1.bytes_read = r;
00443 }
00444 }
00445 }
00446 else
00454 {
00455 event_data.e1.rx_error = 0;
00456 event_data.e1.bytes_read = 0;
00457 }
00458 TRACE_DEBUG(ASYNC_RECEPTION_LOG_LEVEL,
00459 "%s: read %d bytes", _name, r);
00460 process(1, event_data);
00461 }
00462 else
00463 {
00464
00465 }
00466 return can_read;
00467 }