00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #include <stdio.h>
00021 #include <stdlib.h>
00022 #include <unistd.h>
00023 #include <string.h>
00024 #include <pthread.h>
00025 #include <semaphore.h>
00026 #include "hsms_msgs.hpp"
00027 #include "hsmsd_cli.h"
00028
00029 static void active_hsms_msg_handler(hsmsd_msg_t* msg);
00030 static void passive_hsms_msg_handler(hsmsd_msg_t* msg);
00031 static void cnx_state_handler(hsmsd_cnx_state_t state);
00032 static void cnx_state_handler(hsmsd_cnx_state_t state);
00033 static void prepare_dict(void);
00034 static void start_threads(void*);
00035 static void* thread_func0(void*);
00036 static void* thread_func1(void*);
00037 static void* thread_func2(void*);
00038 static void* thread_func3(void*);
00039 static void* thread_funcx(int num, void* data);
00040
00041 static pthread_cond_t cnx_ready;
00042 static pthread_mutex_t cnx_mx = PTHREAD_MUTEX_INITIALIZER;
00043
00044 static struct
00045 {
00046 sem_t sent_sem;
00047 sem_t recv_sem;
00048 sem_t err_sem;
00049 unsigned long us_timeout;
00050 unsigned int transaction_id;
00051 hsmsd_msg_t* primary;
00052 hsmsd_msg_t* secondary;
00053 }msg_dict[4] =
00054 {
00055 {.us_timeout = 1, .transaction_id = 0 << 24,
00056 .primary = (hsmsd_msg_t*)&s1f13_msg, .secondary = (hsmsd_msg_t*)&s1f14_msg},
00057 {.us_timeout = 1, .transaction_id = 1 << 24,
00058 .primary = (hsmsd_msg_t*)&s3f17_msg, .secondary = (hsmsd_msg_t*)&s3f18_msg},
00059 {.us_timeout = 1, .transaction_id = 2 << 24,
00060 .primary = (hsmsd_msg_t*)&s16f15_msg, .secondary = (hsmsd_msg_t*)&s16f16_msg},
00061 {.us_timeout = 1, .transaction_id = 3 << 24,
00062 .primary = (hsmsd_msg_t*)&s14f9_msg, .secondary = (hsmsd_msg_t*)&s14f10_msg}
00063 };
00064
00065
00066 static hsmsd_handle_t active_cnx, passive_cnx;
00067
00068 int main(int argc, char **argv)
00069 {
00070 int r, sent[4], recv[4], err[4];
00071 int is_active = 0;
00072
00073 hsmsd_handle_t *pcnx;
00074 hsmsd_cnx_state_t state;
00075 hsmsd_msg_handler_t msg_handler;
00076 const char *cnx_name;
00077 int close_timeout, open_timeout;
00078 int counter = 0;
00079
00080
00081 if(argc < 2)
00082 {
00083 printf("usage: hsmsd_test4 -[a,p]\n");
00084 exit(-1);
00085 }
00086 if(0 == strcmp(argv[1], "-a"))
00087 {
00088 is_active = 1;
00089 }
00090 else if(0 == strcmp(argv[1], "-p"))
00091 {
00092 is_active = 0;
00093 }
00094 else
00095 {
00096 printf("usage: hsmsd_test4 -[a,p]\n");
00097 exit(-1);
00098 }
00099
00100
00101 pthread_cond_init(&cnx_ready, NULL);
00102
00103 if(is_active)
00104 {
00105 pcnx = &active_cnx;
00106 msg_handler = active_hsms_msg_handler;
00107 cnx_name = "hsms_test4_active";
00108 close_timeout = 3;
00109 open_timeout = 3;
00110 }
00111 else
00112 {
00113 pcnx = &passive_cnx;
00114 msg_handler = passive_hsms_msg_handler;
00115 cnx_name = "hsms_test4_passive";
00116 close_timeout = 10;
00117 open_timeout = 10;
00118 }
00119
00120 r = hsmsd_alloc_handle(pcnx, cnx_name);
00121 printf("alloc cnx returned %d\n", r);
00122 if(r) exit(-1);
00123
00124 r = hsmsd_subscribe_for_msgs(*pcnx,msg_handler);
00125 fprintf(stdout, "hsmsd_subscribe_for_msgs returned %d\n", r);
00126 if(r) exit(-3);
00127
00128 r = hsmsd_subscribe_for_cnx_state(*pcnx, cnx_state_handler);
00129 fprintf(stdout, "hsmsd_subscribe_for_cnx_state returned %d\n", r);
00130 if(r) exit(-5);
00131
00132 prepare_dict();
00133 fprintf(stdout, "dict prepared\n");
00134
00135 start_threads((void*)pcnx);
00136 fprintf(stdout, "threads started\n");
00137
00138 r = hsmsd_cnx_start(*pcnx);
00139 fprintf(stdout, "hsmsd_cnx_start returned %d\n", r);
00140 if(r) exit(-8);
00141
00142 pthread_mutex_lock(&cnx_mx);
00143 pthread_cond_wait(&cnx_ready, &cnx_mx);
00144 pthread_mutex_unlock(&cnx_mx);
00145
00146 while(1)
00147 {
00148 counter++;
00149 sleep(1);
00150
00151 sem_getvalue(&msg_dict[0].sent_sem, &sent[0]);
00152 sem_getvalue(&msg_dict[1].sent_sem, &sent[1]);
00153 sem_getvalue(&msg_dict[2].sent_sem, &sent[2]);
00154 sem_getvalue(&msg_dict[3].sent_sem, &sent[3]);
00155
00156 sem_getvalue(&msg_dict[0].recv_sem, &recv[0]);
00157 sem_getvalue(&msg_dict[1].recv_sem, &recv[1]);
00158 sem_getvalue(&msg_dict[2].recv_sem, &recv[2]);
00159 sem_getvalue(&msg_dict[3].recv_sem, &recv[3]);
00160
00161 sem_getvalue(&msg_dict[0].err_sem, &err[0]);
00162 sem_getvalue(&msg_dict[1].err_sem, &err[1]);
00163 sem_getvalue(&msg_dict[2].err_sem, &err[2]);
00164 sem_getvalue(&msg_dict[3].err_sem, &err[3]);
00165
00166 fprintf(stdout, "sent:\t%d\t|\t%d\t|\t%d\t|\t%d\n",
00167 sent[0], sent[1], sent[2], sent[3]);
00168 fprintf(stdout, "recv:\t%d\t|\t%d\t|\t%d\t|\t%d\n",
00169 recv[0], recv[1], recv[2], recv[3]);
00170 fprintf(stdout, "err:\t%d\t|\t%d\t|\t%d\t|\t%d\n",
00171 err[0], err[1], err[2], err[3]);
00172 fflush(stdout);
00173
00174 if(close_timeout > 0)
00175 {
00176 printf("main: get_cnx_state -->\n");
00177 hsmsd_cnx_get_state(*pcnx, &state);
00178 printf("main: get_cnx_state: %d <--\n", state);
00179 printf("main: counter: %d\n", counter);
00180 if(CNX_SELECTED == state && counter >= close_timeout)
00181 {
00182 counter = 0;
00183 printf("stopping %s...\n", cnx_name);
00184 fflush(stdout);
00185 hsmsd_cnx_stop(*pcnx);
00186 }
00187 else if(CNX_NOT_CONNECTED == state && counter >= open_timeout)
00188 {
00189 counter = 0;
00190 printf("starting %s...\n", cnx_name);
00191 fflush(stdout);
00192 hsmsd_cnx_start(*pcnx);
00193
00194 fprintf(stdout, "main thread: waiting for cnx ready sem...\n");
00195 fflush(stdout);
00196 pthread_mutex_lock(&cnx_mx);
00197 pthread_cond_wait(&cnx_ready, &cnx_mx);
00198 pthread_mutex_unlock(&cnx_mx);
00199 fprintf(stdout, "main thread: cnx ready sem UP!\n");
00200 fflush(stdout);
00201 }
00202 }
00203 }
00204
00205 return 0;
00206 }
00207
00208 void active_hsms_msg_handler(hsmsd_msg_t* msg)
00209 {
00210 int pool_num = msg->sysbytes >> 24;
00211 if(4 > pool_num)
00212 {
00213 sem_post(&msg_dict[pool_num].recv_sem);
00214 }
00215 else
00216 {
00217 fprintf(stdout, ">>>!!!!!unknown id: %d!!!!!<<<\n",pool_num);
00218 }
00219 free(msg);
00220
00221 }
00222
00223 void passive_hsms_msg_handler(hsmsd_msg_t* msg)
00224 {
00225 int r;
00226 switch(msg->function)
00227 {
00228 case 9:
00229 r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s14f10_msg);
00230 break;
00231 case 13:
00232 r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s1f14_msg);
00233 break;
00234 case 15:
00235 r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s16f16_msg);
00236 break;
00237 case 17:
00238 r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s3f18_msg);
00239 break;
00240 default:
00241 break;
00242 }
00243 free(msg);
00244 }
00245
00246 static char str_state[][32] = {"NOT CONNECTED", "NOT SELECTED", "SELECTED"};
00247 static hsmsd_cnx_state_t cached_state;
00248 void cnx_state_handler(hsmsd_cnx_state_t state)
00249 {
00250 cached_state = state;
00251 if(CNX_SELECTED == state)
00252 {
00253 pthread_mutex_lock(&cnx_mx);
00254 pthread_cond_broadcast(&cnx_ready);
00255 pthread_mutex_unlock(&cnx_mx);
00256 }
00257 fprintf(stdout, "cnx_state_handler: %s\n", str_state[state]);
00258 fflush(stdout);
00259 }
00260
00261
00262 void prepare_dict(void)
00263 {
00264 sem_init(&msg_dict[0].sent_sem, 0, 0);
00265 sem_init(&msg_dict[1].sent_sem, 0, 0);
00266 sem_init(&msg_dict[2].sent_sem, 0, 0);
00267 sem_init(&msg_dict[3].sent_sem, 0, 0);
00268
00269 sem_init(&msg_dict[0].recv_sem, 0, 0);
00270 sem_init(&msg_dict[1].recv_sem, 0, 0);
00271 sem_init(&msg_dict[2].recv_sem, 0, 0);
00272 sem_init(&msg_dict[3].recv_sem, 0, 0);
00273
00274 sem_init(&(msg_dict[0].err_sem), 0, 0);
00275 sem_init(&(msg_dict[1].err_sem), 0, 0);
00276 sem_init(&(msg_dict[2].err_sem), 0, 0);
00277 sem_init(&(msg_dict[3].err_sem), 0, 0);
00278 }
00279
00280 void start_threads(void* pdata)
00281 {
00282 pthread_t id0, id1, id2, id3;
00283 pthread_create(&id0, NULL, thread_func0, pdata);
00284 pthread_create(&id1, NULL, thread_func1, pdata);
00285 pthread_create(&id2, NULL, thread_func2, pdata);
00286 pthread_create(&id3, NULL, thread_func3, pdata);
00287 }
00288
00289 void* thread_funcx(int num, void* data)
00290 {
00291 int send_failure = 0;
00292 hsmsd_cnx_state_t state;
00293
00294 hsmsd_handle_t h = *((hsmsd_handle_t*)data);
00295
00296 fprintf(stdout, "thread function %d\n", num);
00297
00298 for(;;)
00299 {
00300 hsmsd_cnx_get_state(h, &state);
00301 if(CNX_SELECTED != state)
00302 {
00303 fprintf(stdout, "thread function %d: waiting for cnx ready sem...\n", num);
00304 pthread_mutex_lock(&cnx_mx);
00305 pthread_cond_wait(&cnx_ready, &cnx_mx);
00306 pthread_mutex_unlock(&cnx_mx);
00307 fprintf(stdout, "thread function %d: cnx ready sem UP!\n", num);
00308 }
00309 for(;;)
00310 {
00311 usleep(msg_dict[num].us_timeout);
00312 msg_dict[num].primary->sysbytes += 1;
00313 msg_dict[num].secondary->sysbytes += 1;
00314
00315 send_failure = hsmsd_cnx_send_msg(h, msg_dict[num].primary);
00316
00317 if(0 == send_failure)
00318 {
00319
00320 sem_post(&msg_dict[num].sent_sem);
00321 }
00322 else
00323 {
00324 fprintf(stdout, "thread function %d: error sending msg s%df%d - %s\n",
00325 num, msg_dict[num].primary->stream,
00326 msg_dict[num].primary->function,
00327 strerror(send_failure));
00328 sem_post(&msg_dict[num].err_sem);
00329
00330 break;
00331 }
00332 }
00333 }
00334 fprintf(stdout, "thread function %d: exit...\n", num);
00335 return NULL;
00336 }
00337
00338 void* thread_func0(void* data)
00339 {
00340 return thread_funcx(0, data);
00341 }
00342
00343 void* thread_func1(void* data)
00344 {
00345 return thread_funcx(1, data);
00346 }
00347
00348 void* thread_func2(void* data)
00349 {
00350 return thread_funcx(2, data);
00351 }
00352
00353 void* thread_func3(void* data)
00354 {
00355 return thread_funcx(3, data);
00356 }