test4_main.c

00001 /*   
00002  *   (c) Copyright 2008 Philipp Skadorov (philipp_s@users.sourceforge.net)
00003  *
00004  *   This file is part of FREESECS.
00005  *
00006  *   FREESECS is free software: you can redistribute it and/or modify
00007  *   it under the terms of the GNU General Public License as published by
00008  *   the Free Software Foundation, either version 3 of the License, or
00009  *   (at your option) any later version.
00010  *
00011  *   FREESECS is distributed in the hope that it will be useful,
00012  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *   GNU General Public License for more details.
00015  *
00016  *   You should have received a copy of the GNU General Public License
00017  *   along with FREESECS, see COPYING.
00018  *   If not, see <http://www.gnu.org/licenses/>.
00019  */
00020 #include <stdio.h>
00021 #include <stdlib.h>
00022 #include <unistd.h>
00023 #include <string.h>
00024 #include <pthread.h>
00025 #include <semaphore.h>
00026 #include "hsms_msgs.hpp"
00027 #include "hsmsd_cli.h"
00028 
00029 static void active_hsms_msg_handler(hsmsd_msg_t* msg);
00030 static void passive_hsms_msg_handler(hsmsd_msg_t* msg);
00031 static void cnx_state_handler(hsmsd_cnx_state_t state);
00032 static void cnx_state_handler(hsmsd_cnx_state_t state);
00033 static void prepare_dict(void);
00034 static void start_threads(void*);
00035 static void* thread_func0(void*);
00036 static void* thread_func1(void*);
00037 static void* thread_func2(void*);
00038 static void* thread_func3(void*);
00039 static void* thread_funcx(int num, void* data);
00040 
00041 static pthread_cond_t cnx_ready;
00042 static pthread_mutex_t cnx_mx = PTHREAD_MUTEX_INITIALIZER;
00043 
00044 static struct
00045 {
00046     sem_t sent_sem;
00047     sem_t recv_sem;
00048     sem_t err_sem;
00049     unsigned long us_timeout;
00050     unsigned int transaction_id;
00051     hsmsd_msg_t* primary;
00052     hsmsd_msg_t* secondary;
00053 }msg_dict[4] = 
00054 {
00055     {.us_timeout = 1, .transaction_id = 0 << 24, 
00056      .primary = (hsmsd_msg_t*)&s1f13_msg,   .secondary = (hsmsd_msg_t*)&s1f14_msg},
00057     {.us_timeout = 1, .transaction_id = 1 << 24, 
00058      .primary = (hsmsd_msg_t*)&s3f17_msg,   .secondary = (hsmsd_msg_t*)&s3f18_msg},
00059     {.us_timeout = 1, .transaction_id = 2 << 24, 
00060      .primary = (hsmsd_msg_t*)&s16f15_msg,  .secondary = (hsmsd_msg_t*)&s16f16_msg},
00061     {.us_timeout = 1, .transaction_id = 3 << 24, 
00062      .primary = (hsmsd_msg_t*)&s14f9_msg,   .secondary = (hsmsd_msg_t*)&s14f10_msg}
00063 };
00064 
00065 
00066 static hsmsd_handle_t active_cnx, passive_cnx;
00067 
00068 int main(int argc, char **argv)
00069 {
00070     int r, sent[4], recv[4], err[4];
00071     int is_active = 0;
00072 
00073     hsmsd_handle_t              *pcnx;
00074     hsmsd_cnx_state_t           state;
00075     hsmsd_msg_handler_t         msg_handler;
00076     const char                  *cnx_name;
00077     int                         close_timeout, open_timeout;
00078     int                         counter = 0;
00079 
00080 
00081     if(argc < 2)
00082     {
00083         printf("usage: hsmsd_test4 -[a,p]\n");
00084         exit(-1);
00085     }
00086     if(0 == strcmp(argv[1], "-a"))
00087     {
00088         is_active = 1;
00089     }
00090     else if(0 == strcmp(argv[1], "-p"))
00091     {
00092         is_active = 0;
00093     }
00094     else
00095     {
00096         printf("usage: hsmsd_test4 -[a,p]\n");
00097         exit(-1);
00098     }
00099 
00100     //sem_init(&cnx_ready_sem, 0, 0);
00101     pthread_cond_init(&cnx_ready, NULL);
00102 
00103     if(is_active)
00104     {
00105         pcnx            = &active_cnx;
00106         msg_handler     = active_hsms_msg_handler;
00107         cnx_name        = "hsms_test4_active";
00108         close_timeout   = 3;
00109         open_timeout    = 3;
00110     }
00111     else
00112     {
00113         pcnx            = &passive_cnx;
00114         msg_handler     = passive_hsms_msg_handler;
00115         cnx_name        = "hsms_test4_passive";
00116         close_timeout   = 10;
00117         open_timeout    = 10;
00118     }
00119 
00120     r = hsmsd_alloc_handle(pcnx, cnx_name);
00121     printf("alloc cnx returned %d\n", r);
00122     if(r) exit(-1);
00123 
00124     r = hsmsd_subscribe_for_msgs(*pcnx,msg_handler);
00125     fprintf(stdout, "hsmsd_subscribe_for_msgs returned %d\n", r);
00126     if(r) exit(-3);
00127 
00128     r = hsmsd_subscribe_for_cnx_state(*pcnx, cnx_state_handler);
00129     fprintf(stdout, "hsmsd_subscribe_for_cnx_state returned %d\n", r);
00130     if(r) exit(-5);
00131 
00132     prepare_dict();
00133     fprintf(stdout, "dict prepared\n");
00134 
00135     start_threads((void*)pcnx);
00136     fprintf(stdout, "threads started\n");
00137     
00138     r = hsmsd_cnx_start(*pcnx);
00139     fprintf(stdout, "hsmsd_cnx_start returned %d\n", r);
00140     if(r) exit(-8);
00141     
00142     pthread_mutex_lock(&cnx_mx);
00143     pthread_cond_wait(&cnx_ready, &cnx_mx);
00144     pthread_mutex_unlock(&cnx_mx);
00145 
00146     while(1)
00147     {
00148         counter++;
00149         sleep(1);
00150 
00151         sem_getvalue(&msg_dict[0].sent_sem, &sent[0]);
00152         sem_getvalue(&msg_dict[1].sent_sem, &sent[1]);
00153         sem_getvalue(&msg_dict[2].sent_sem, &sent[2]);
00154         sem_getvalue(&msg_dict[3].sent_sem, &sent[3]);
00155 
00156         sem_getvalue(&msg_dict[0].recv_sem, &recv[0]);
00157         sem_getvalue(&msg_dict[1].recv_sem, &recv[1]);
00158         sem_getvalue(&msg_dict[2].recv_sem, &recv[2]);
00159         sem_getvalue(&msg_dict[3].recv_sem, &recv[3]);
00160 
00161         sem_getvalue(&msg_dict[0].err_sem, &err[0]);
00162         sem_getvalue(&msg_dict[1].err_sem, &err[1]);
00163         sem_getvalue(&msg_dict[2].err_sem, &err[2]);
00164         sem_getvalue(&msg_dict[3].err_sem, &err[3]);
00165         
00166         fprintf(stdout, "sent:\t%d\t|\t%d\t|\t%d\t|\t%d\n", 
00167                                  sent[0], sent[1], sent[2], sent[3]);
00168         fprintf(stdout, "recv:\t%d\t|\t%d\t|\t%d\t|\t%d\n", 
00169                                  recv[0], recv[1], recv[2], recv[3]);
00170         fprintf(stdout, "err:\t%d\t|\t%d\t|\t%d\t|\t%d\n", 
00171                                  err[0], err[1], err[2], err[3]);
00172         fflush(stdout);
00173 
00174         if(close_timeout > 0)
00175         {
00176             printf("main: get_cnx_state -->\n");
00177             hsmsd_cnx_get_state(*pcnx, &state);
00178             printf("main: get_cnx_state: %d <--\n", state);
00179             printf("main: counter: %d\n", counter);
00180             if(CNX_SELECTED == state && counter >= close_timeout)
00181             {
00182                 counter = 0;
00183                 printf("stopping %s...\n", cnx_name);
00184                 fflush(stdout);
00185                 hsmsd_cnx_stop(*pcnx);
00186             }
00187             else if(CNX_NOT_CONNECTED == state && counter >= open_timeout)
00188             {
00189                 counter = 0;
00190                 printf("starting %s...\n", cnx_name);
00191                 fflush(stdout);
00192                 hsmsd_cnx_start(*pcnx);
00193 
00194                 fprintf(stdout, "main thread: waiting for cnx ready sem...\n");
00195                 fflush(stdout);
00196                 pthread_mutex_lock(&cnx_mx);
00197                 pthread_cond_wait(&cnx_ready, &cnx_mx);
00198                 pthread_mutex_unlock(&cnx_mx);
00199                 fprintf(stdout, "main thread: cnx ready sem UP!\n");
00200                 fflush(stdout);
00201             }
00202         }
00203     }
00204 
00205     return 0;
00206 }
00207 
00208 void active_hsms_msg_handler(hsmsd_msg_t* msg)
00209 {
00210     int pool_num = msg->sysbytes >> 24;
00211     if(4 > pool_num)
00212     {
00213         sem_post(&msg_dict[pool_num].recv_sem);
00214     }
00215     else
00216     {
00217         fprintf(stdout, ">>>!!!!!unknown id: %d!!!!!<<<\n",pool_num);
00218     }
00219     free(msg);
00220 
00221 }
00222 
00223 void passive_hsms_msg_handler(hsmsd_msg_t* msg)
00224 {
00225     int r;
00226     switch(msg->function)
00227     {
00228         case 9:
00229             r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s14f10_msg);
00230         break;
00231         case 13:
00232             r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s1f14_msg);
00233         break;
00234         case 15:
00235             r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s16f16_msg);
00236         break;
00237         case 17:
00238             r = hsmsd_cnx_send_msg(passive_cnx, (hsmsd_msg_t*)&s3f18_msg);
00239         break;
00240         default:
00241         break;
00242     }
00243     free(msg);
00244 }
00245 
00246 static char str_state[][32] = {"NOT CONNECTED", "NOT SELECTED", "SELECTED"};
00247 static hsmsd_cnx_state_t cached_state;
00248 void cnx_state_handler(hsmsd_cnx_state_t state)
00249 {
00250     cached_state = state;
00251     if(CNX_SELECTED == state)
00252     {
00253         pthread_mutex_lock(&cnx_mx);
00254         pthread_cond_broadcast(&cnx_ready);
00255         pthread_mutex_unlock(&cnx_mx);
00256     }
00257     fprintf(stdout, "cnx_state_handler: %s\n", str_state[state]);
00258     fflush(stdout);
00259 }
00260 
00261 
00262 void prepare_dict(void)
00263 {
00264     sem_init(&msg_dict[0].sent_sem, 0, 0);
00265     sem_init(&msg_dict[1].sent_sem, 0, 0);
00266     sem_init(&msg_dict[2].sent_sem, 0, 0);
00267     sem_init(&msg_dict[3].sent_sem, 0, 0);
00268 
00269     sem_init(&msg_dict[0].recv_sem, 0, 0);
00270     sem_init(&msg_dict[1].recv_sem, 0, 0);
00271     sem_init(&msg_dict[2].recv_sem, 0, 0);
00272     sem_init(&msg_dict[3].recv_sem, 0, 0);
00273 
00274     sem_init(&(msg_dict[0].err_sem), 0, 0);
00275     sem_init(&(msg_dict[1].err_sem), 0, 0);
00276     sem_init(&(msg_dict[2].err_sem), 0, 0);
00277     sem_init(&(msg_dict[3].err_sem), 0, 0);
00278 }
00279 
00280 void start_threads(void* pdata)
00281 {
00282     pthread_t id0, id1, id2, id3;
00283     pthread_create(&id0, NULL, thread_func0, pdata);
00284     pthread_create(&id1, NULL, thread_func1, pdata);
00285     pthread_create(&id2, NULL, thread_func2, pdata);
00286     pthread_create(&id3, NULL, thread_func3, pdata);
00287 }
00288 
00289 void* thread_funcx(int num, void* data)
00290 {
00291     int send_failure = 0;
00292     hsmsd_cnx_state_t state;
00293 
00294     hsmsd_handle_t h = *((hsmsd_handle_t*)data);
00295 
00296     fprintf(stdout, "thread function %d\n", num);
00297 
00298     for(;;)
00299     {
00300         hsmsd_cnx_get_state(h, &state);
00301         if(CNX_SELECTED != state)
00302         {
00303             fprintf(stdout, "thread function %d: waiting for cnx ready sem...\n", num);
00304             pthread_mutex_lock(&cnx_mx);
00305             pthread_cond_wait(&cnx_ready, &cnx_mx);
00306             pthread_mutex_unlock(&cnx_mx);
00307             fprintf(stdout, "thread function %d: cnx ready sem UP!\n", num);
00308         }
00309         for(;;)
00310         {
00311             usleep(msg_dict[num].us_timeout);
00312             msg_dict[num].primary->sysbytes += 1;
00313             msg_dict[num].secondary->sysbytes += 1;
00314 
00315             send_failure = hsmsd_cnx_send_msg(h, msg_dict[num].primary);
00316 
00317             if(0 == send_failure)
00318             {
00319 
00320                 sem_post(&msg_dict[num].sent_sem);
00321             }
00322             else
00323             {
00324                 fprintf(stdout, "thread function %d: error sending msg s%df%d - %s\n", 
00325                             num, msg_dict[num].primary->stream, 
00326                             msg_dict[num].primary->function,
00327                             strerror(send_failure));
00328                 sem_post(&msg_dict[num].err_sem);
00329                 //pthread_exit(NULL);
00330                 break;//go through cnx state check
00331             }
00332         }
00333     }
00334     fprintf(stdout, "thread function %d: exit...\n", num);
00335     return NULL;
00336 }
00337 
00338 void* thread_func0(void* data)
00339 {
00340     return thread_funcx(0, data);
00341 }
00342 
00343 void* thread_func1(void* data)
00344 {
00345     return thread_funcx(1, data);
00346 }
00347 
00348 void* thread_func2(void* data)
00349 {
00350     return thread_funcx(2, data);
00351 }
00352 
00353 void* thread_func3(void* data)
00354 {
00355     return thread_funcx(3, data);
00356 }

Generated on Fri Oct 3 15:30:01 2008 for FREESECS hsms by  doxygen 1.5.1