00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _EVENT_PUMP_H
00021 #define _EVENT_PUMP_H
#include <pthread.h>
#include <semaphore.h>

#include <stdexcept>

#include "tsqueue.h"
#include "log.h"
00025
00026 #define EVENT_PUMP_LOG_LEVEL 50
00027
00028 namespace freesecs
00029 {
00035 template <class T>
00036 class threaded_event_pump_t
00037 {
00038 public:
00039 threaded_event_pump_t():_thread_id(0), _stop(0)
00040 {
00041 if(0 != sem_init(&_sem, 0, 0))
00042 {
00043 throw;
00044 }
00045 if(0 != pthread_create(&_thread_id, NULL, &threaded_event_pump_t::thread_func, (void*)this))
00046 {
00047 throw;
00048 }
00049 }
00050 virtual ~threaded_event_pump_t()
00051 {
00052 _stop = true;
00053 sem_post(&_sem);
00054 pthread_join(_thread_id, NULL);
00055 sem_destroy(&_sem);
00056 }
00057 threaded_event_pump_t<T>& operator << (const T& t)
00058 {
00059 _q.push(t);
00060 sem_post(&_sem);
00061 return *this;
00062 }
00063
00064 protected:
00070 virtual int process(const T&) = 0;
00071
00072 private:
00073 static void* thread_func(void *pdata)
00074 {
00075 threaded_event_pump_t<T> *pump = (threaded_event_pump_t<T>*)pdata;
00076 TRACE_DEBUG(EVENT_PUMP_LOG_LEVEL,
00077 "event pump: start loop, "
00078 "this=0x%x, thread_id=%d",
00079 (int)pump, (int)pump->_thread_id);
00080
00081 while(1)
00082 {
00083 sem_wait(&pump->_sem);
00084 if(pump->_stop)
00085 {
00086 TRACE_DEBUG(EVENT_PUMP_LOG_LEVEL, "event pump: exit from thread");
00087 pthread_exit(NULL);
00088 }
00089 if(true == pump->_q.empty())
00090 {
00091 TRACE_DEBUG(EVENT_PUMP_LOG_LEVEL, "event pump: q is empty");
00092 continue;
00093 }
00094
00095 pump->process(pump->_q.front());
00096 pump->_q.pop();
00097 }
00098 return NULL;
00099 }
00100 private:
00101 pthread_t _thread_id;
00102 thread_safe_queue<T> _q;
00103 sem_t _sem;
00104 bool _stop;
00105 };
00106
00107
00113 template <class T>
00114 class simple_event_pump_t
00115 {
00116 public:
00117 simple_event_pump_t()
00118 {
00119 pthread_mutexattr_t attr;
00120 pthread_mutexattr_init(&attr);
00121 pthread_mutex_init(&_mutex, NULL);
00122 }
00123
00124 virtual ~simple_event_pump_t()
00125 {
00126 pthread_mutex_destroy(&_mutex);
00127 }
00128
00129 simple_event_pump_t<T>& operator << (const T& t)
00130 {
00131 lock_t l(_mutex);
00132 process(t);
00133 return *this;
00134 }
00135
00136 int process_event(const T& t)
00137 {
00138 lock_t l(_mutex);
00139 return process(t);
00140 }
00141
00142 protected:
00148 virtual int process(const T&) = 0;
00149
00150 private:
00151 pthread_mutex_t _mutex;
00152 };
00153
00154
00155 #if 0
00156
00161 #define event_pump_t simple_event_pump_t
00162 #endif//if 0
00163
00164 }
00165 #endif //_EVENT_PUMP_H