X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Feventclient%2Fserv_eventclient.c;h=f991cf90587480a53b73735a9ab5fe9387c6ce0c;hb=271924aeff4c786b31ec293ab48c861f7fe77bd0;hp=ce5d7f0412c12246e8663e6d2025b992cd6992f7;hpb=b1493ab199a2cf39b919cb8d0536263ae1d280aa;p=citadel.git diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index ce5d7f041..f991cf905 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -60,22 +60,42 @@ #include "serv_curl.h" ev_loop *event_base; +int DebugEventLoop = 0; +int DebugEventLoopBacktrace = 0; +int DebugCurl = 0; long EvIDSource = 1; /***************************************************************************** * libevent / curl integration * *****************************************************************************/ -#define MOPT(s, v)\ +#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0)) + +#define EVCURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, \ + IO->ID, CCID, __VA_ARGS__) + +#define EVCURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, \ + IO->ID, CCID) + +#define CURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__) + +#define CURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT) + +#define MOPT(s, v) \ do { \ sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \ if (sta) { \ - syslog(LOG_ERR, "EVCURL: error setting option " \ + EVQ_syslog(LOG_ERR, "error setting option " \ #s " on curl multi handle: %s\n", \ curl_easy_strerror(sta)); \ exit (1); \ } \ } while (0) + typedef struct _evcurl_global_data { int magic; CURLM *mhnd; @@ -94,16 +114,16 @@ gotstatus(int nnrun) global.nrun = nnrun; - syslog(LOG_DEBUG, - "CURLEV: gotstatus(): about to call curl_multi_info_read\n"); + CURLM_syslog(LOG_DEBUG, + "gotstatus(): about to call curl_multi_info_read\n"); while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) { - syslog(LOG_ERR, - "EVCURL: got curl multi_info message msg=%d\n", - msg->msg); + CURL_syslog(LOG_DEBUG, + "got curl multi_info message msg=%d\n", + msg->msg); if (CURLMSG_DONE == msg->msg) { CURL *chnd; - char *chandle; + char *chandle = NULL; CURLcode sta; CURLMcode msta; AsyncIO*IO; @@ -113,13 +133,22 @@ gotstatus(int nnrun) sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle); - syslog(LOG_ERR, "EVCURL: request complete\n"); - if (sta) - syslog(LOG_ERR, - "EVCURL: error asking curl for private" - " cookie of curl handle: %s\n", - curl_easy_strerror(sta)); + if (sta) { + EVCURL_syslog(LOG_ERR, + "error asking curl for private" + " cookie of curl handle: %s\n", + curl_easy_strerror(sta)); + continue; + } IO = (AsyncIO *)chandle; + if (IO->ID == 0) { + EVCURL_syslog(LOG_ERR, + "Error, invalid IO context %p\n", + IO); + continue; + } + + EVCURLM_syslog(LOG_DEBUG, "request complete\n"); IO->Now = ev_now(event_base); @@ -128,36 +157,36 @@ gotstatus(int nnrun) sta = msg->data.result; if (sta) { - EV_syslog(LOG_ERR, - "EVCURL: error description: %s\n", - IO->HttpReq.errdesc); - EV_syslog(LOG_ERR, - "EVCURL: error performing request: %s\n", - curl_easy_strerror(sta)); + EVCURL_syslog(LOG_ERR, + "error description: %s\n", + IO->HttpReq.errdesc); + EVCURL_syslog(LOG_ERR, + "error performing request: %s\n", + curl_easy_strerror(sta)); } sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, &IO->HttpReq.httpcode); if (sta) - EV_syslog(LOG_ERR, - "EVCURL: error asking curl for " - "response code from request: %s\n", - curl_easy_strerror(sta)); - EV_syslog(LOG_ERR, - "EVCURL: http response code was %ld\n", - (long)IO->HttpReq.httpcode); + EVCURL_syslog(LOG_ERR, + "error asking curl for " + "response code from request: %s\n", + curl_easy_strerror(sta)); + EVCURL_syslog(LOG_DEBUG, + "http response code was %ld\n", + (long)IO->HttpReq.httpcode); curl_slist_free_all(IO->HttpReq.headers); msta = curl_multi_remove_handle(global.mhnd, chnd); if (msta) - EV_syslog(LOG_ERR, - "EVCURL: warning problem detaching " - "completed handle from curl multi: " - "%s\n", - curl_multi_strerror(msta)); + EVCURL_syslog(LOG_ERR, + "warning problem detaching " + "completed handle from curl multi: " + "%s\n", + curl_multi_strerror(msta)); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); IO->HttpReq.attached = 0; switch(IO->SendDone(IO)) @@ -203,13 +232,13 @@ stepmulti(void *data, curl_socket_t fd, int which) which, &running_handles); - syslog(LOG_DEBUG, "EVCURL: stepmulti(): calling gotstatus()\n"); + CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n"); if (msta) - syslog(LOG_ERR, - "EVCURL: error in curl processing events" - "on multi handle, fd %d: %s\n", - (int)fd, - curl_multi_strerror(msta)); + CURL_syslog(LOG_ERR, + "error in curl processing events" + "on multi handle, fd %d: %s\n", + (int)fd, + curl_multi_strerror(msta)); if (global.nrun != running_handles) gotstatus(running_handles); @@ -218,16 +247,16 @@ stepmulti(void *data, curl_socket_t fd, int which) static void gottime(struct ev_loop *loop, ev_timer *timeev, int events) { - syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n"); + CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n"); stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0); } static void got_in(struct ev_loop *loop, ev_io *ioev, int events) { - syslog(LOG_DEBUG, - "EVCURL: waking up curl for io on fd %d\n", - (int)ioev->fd); + CURL_syslog(LOG_DEBUG, + "EVCURL: waking up curl for io on fd %d\n", + (int)ioev->fd); stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN); } @@ -235,9 +264,9 @@ got_in(struct ev_loop *loop, ev_io *ioev, int events) static void got_out(struct ev_loop *loop, ev_io *ioev, int events) { - syslog(LOG_DEBUG, - "EVCURL: waking up curl for io on fd %d\n", - (int)ioev->fd); + CURL_syslog(LOG_DEBUG, + "waking up curl for io on fd %d\n", + (int)ioev->fd); stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT); } @@ -259,7 +288,7 @@ gotdata(void *data, size_t size, size_t nmemb, void *cglobal) { static int gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) { - syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms); + CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms); evcurl_global_data *global = cglobal; ev_timer_stop(EV_DEFAULT, &global->timeev); if (tblock_ms < 0 || 14000 < tblock_ms) @@ -287,16 +316,16 @@ gotwatchsock(CURL *easy, if (IO == NULL) { sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); if (sta) { - EV_syslog(LOG_ERR, - "EVCURL: error asking curl for private " - "cookie of curl handle: %s\n", - curl_easy_strerror(sta)); + EVCURL_syslog(LOG_ERR, + "EVCURL: error asking curl for private " + "cookie of curl handle: %s\n", + curl_easy_strerror(sta)); return -1; } IO = (AsyncIO *) f; - EV_syslog(LOG_DEBUG, - "EVCURL: got socket for URL: %s\n", - IO->ConnectMe->PlainUrl); + EVCURL_syslog(LOG_DEBUG, + "EVCURL: got socket for URL: %s\n", + IO->ConnectMe->PlainUrl); if (IO->SendBuf.fd != 0) { @@ -332,21 +361,21 @@ gotwatchsock(CURL *easy, } - EV_syslog(LOG_DEBUG, - "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", - (int)fd, Action, action); + EVCURL_syslog(LOG_DEBUG, + "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", + (int)fd, Action, action); switch (action) { case CURL_POLL_NONE: - EVM_syslog(LOG_ERR, - "EVCURL: called first time " - "to register this sockwatcker\n"); + EVCURLM_syslog(LOG_DEBUG, + "called first time " + "to register this sockwatcker\n"); break; case CURL_POLL_REMOVE: - EVM_syslog(LOG_ERR, - "EVCURL: called last time to unregister " - "this sockwatcher\n"); + EVCURLM_syslog(LOG_DEBUG, + "called last time to unregister " + "this sockwatcher\n"); ev_io_stop(event_base, &IO->recv_event); ev_io_stop(event_base, &IO->send_event); break; @@ -377,17 +406,17 @@ void curl_init_connectionpool(void) if (sta) { - syslog(LOG_ERR, - "EVCURL: error initializing curl library: %s\n", - curl_easy_strerror(sta)); + CURL_syslog(LOG_ERR, + "error initializing curl library: %s\n", + curl_easy_strerror(sta)); exit(1); } mhnd = global.mhnd = curl_multi_init(); if (!mhnd) { - syslog(LOG_ERR, - "EVCURL: error initializing curl multi handle\n"); + CURLM_syslog(LOG_ERR, + "error initializing curl multi handle\n"); exit(3); } @@ -404,12 +433,12 @@ int evcurl_init(AsyncIO *IO) CURLcode sta; CURL *chnd; - EVM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n"); + EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n"); IO->HttpReq.attached = 0; chnd = IO->HttpReq.chnd = curl_easy_init(); if (!chnd) { - EVM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n"); + EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n"); return 0; } @@ -463,17 +492,20 @@ static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, { CURLMcode msta; AsyncIO *IO = watcher->data; + + if (IO == NULL) + return; IO->Now = ev_now(event_base); - EV_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); + EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); curl_slist_free_all(IO->HttpReq.headers); msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd); if (msta) { - EV_syslog(LOG_ERR, - "EVCURL: warning problem detaching completed handle " - "from curl multi: %s\n", - curl_multi_strerror(msta)); + EVCURL_syslog(LOG_ERR, + "EVCURL: warning problem detaching completed handle " + "from curl multi: %s\n", + curl_multi_strerror(msta)); } curl_easy_cleanup(IO->HttpReq.chnd); @@ -492,7 +524,7 @@ evcurl_handle_start(AsyncIO *IO) CURL *chnd; chnd = IO->HttpReq.chnd; - EV_syslog(LOG_DEBUG, + EVCURL_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); OPT(URL, IO->ConnectMe->PlainUrl); if (StrLength(IO->ConnectMe->CurlCreds)) @@ -515,27 +547,29 @@ evcurl_handle_start(AsyncIO *IO) OPT(HTTPHEADER, IO->HttpReq.headers); IO->NextState = eConnect; - EVM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n"); + EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n"); msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd); if (msta) { - EV_syslog(LOG_ERR, + EVCURL_syslog(LOG_ERR, "EVCURL: error attaching to curl multi handle: %s\n", curl_multi_strerror(msta)); } IO->HttpReq.attached = 1; ev_async_send (event_base, &WakeupCurl); + ev_cleanup_init(&IO->abort_by_shutdown, IOcurl_abort_shutdown_callback); ev_cleanup_start(event_base, &IO->abort_by_shutdown); + return eReadMessage; } static void WakeupCurlCallback(EV_P_ ev_async *w, int revents) { - syslog(LOG_DEBUG, "EVCURL: waking up curl multi handle\n"); + CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n"); curl_multi_perform(&global, CURL_POLL_NONE); } @@ -544,7 +578,7 @@ static void evcurl_shutdown (void) { curl_global_cleanup(); curl_multi_cleanup(global.mhnd); - syslog(LOG_DEBUG, "client_event_thread() initializing\n"); + CURLM_syslog(LOG_DEBUG, "exiting\n"); } /***************************************************************************** * libevent integration * @@ -554,8 +588,8 @@ static void evcurl_shutdown (void) * this currently is the main loop (which may change in some future?) */ int evbase_count = 0; -int event_add_pipe[2] = {-1, -1}; pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */ HashList *QueueEvents = NULL; HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; @@ -594,12 +628,13 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) } if (h->IO->StartIO == 0.0) h->IO->StartIO = Now; + h->IO->Now = Now; h->EvAttch(h->IO); } DeleteHashPos(&It); DeleteHashContent(&q); - syslog(LOG_DEBUG, "EVENT Q Add done.\n"); + EVQM_syslog(LOG_DEBUG, "EVENT Q Add done.\n"); } @@ -607,26 +642,15 @@ static void EventExitCallback(EV_P_ ev_async *w, int revents) { ev_break(event_base, EVBREAK_ALL); - syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); } void InitEventQueue(void) { - struct rlimit LimitSet; - pthread_mutex_init(&EventQueueMutex, NULL); - - if (pipe(event_add_pipe) != 0) { - syslog(LOG_EMERG, - "Unable to create pipe for libev queueing: %s\n", - strerror(errno)); - abort(); - } - LimitSet.rlim_cur = 1; - LimitSet.rlim_max = 1; - setrlimit(event_add_pipe[1], &LimitSet); + pthread_mutex_init(&EventExitQueueMutex, NULL); QueueEvents = NewHash(1, Flathash); InboundEventQueues[0] = NewHash(1, Flathash); @@ -645,7 +669,7 @@ void *client_event_thread(void *arg) CtdlFillSystemContext(&libev_client_CC, "LibEv Thread"); // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - syslog(LOG_DEBUG, "client_event_thread() initializing\n"); + EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n"); event_base = ev_default_loop (EVFLAG_AUTO); ev_async_init(&AddJob, QueueEventAddCallback); @@ -659,22 +683,23 @@ void *client_event_thread(void *arg) ev_run (event_base, 0); - syslog(LOG_DEBUG, "client_event_thread() exiting\n"); + EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n"); ///what todo here? CtdlClearSystemContext(); + pthread_mutex_lock(&EventExitQueueMutex); ev_loop_destroy (EV_DEFAULT_UC); + event_base = NULL; DeleteHash(&QueueEvents); InboundEventQueue = NULL; DeleteHash(&InboundEventQueues[0]); DeleteHash(&InboundEventQueues[1]); /* citthread_mutex_destroy(&EventQueueMutex); TODO */ evcurl_shutdown(); - close(event_add_pipe[0]); - close(event_add_pipe[1]); CtdlDestroyEVCleanupHooks(); - EVQShutDown = 1; + pthread_mutex_unlock(&EventExitQueueMutex); + EVQShutDown = 1; return(NULL); } @@ -685,8 +710,8 @@ void *client_event_thread(void *arg) */ ev_loop *event_db; int evdb_count = 0; -int evdb_add_pipe[2] = {-1, -1}; pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */ HashList *DBQueueEvents = NULL; HashList *DBInboundEventQueue = NULL; HashList *DBInboundEventQueues[2] = { NULL, NULL }; @@ -729,6 +754,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) if (h->IO->StartDB == 0.0) h->IO->StartDB = Now; h->IO->Now = Now; + ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown); rc = h->EvAttch(h->IO); switch (rc) { @@ -740,13 +766,13 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) } DeleteHashPos(&It); DeleteHashContent(&q); - syslog(LOG_DEBUG, "DBEVENT Q Add done.\n"); + EVQM_syslog(LOG_DEBUG, "DBEVENT Q Add done.\n"); } static void DBEventExitCallback(EV_P_ ev_async *w, int revents) { - syslog(LOG_DEBUG, "DB EVENT Q exiting.\n"); + EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n"); ev_break(event_db, EVBREAK_ALL); } @@ -754,17 +780,8 @@ static void DBEventExitCallback(EV_P_ ev_async *w, int revents) void DBInitEventQueue(void) { - struct rlimit LimitSet; - pthread_mutex_init(&DBEventQueueMutex, NULL); - - if (pipe(evdb_add_pipe) != 0) { - syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); - abort(); - } - LimitSet.rlim_cur = 1; - LimitSet.rlim_max = 1; - setrlimit(evdb_add_pipe[1], &LimitSet); + pthread_mutex_init(&DBEventExitQueueMutex, NULL); DBQueueEvents = NewHash(1, Flathash); DBInboundEventQueues[0] = NewHash(1, Flathash); @@ -777,13 +794,14 @@ void DBInitEventQueue(void) */ void *db_event_thread(void *arg) { + ev_loop *tmp; struct CitContext libev_msg_CC; CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread"); -// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - syslog(LOG_DEBUG, "dbevent_thread() initializing\n"); - event_db = ev_loop_new (EVFLAG_AUTO); + EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n"); + + tmp = event_db = ev_loop_new (EVFLAG_AUTO); ev_async_init(&DBAddJob, DBQueueEventAddCallback); ev_async_start(event_db, &DBAddJob); @@ -792,26 +810,26 @@ void *db_event_thread(void *arg) ev_run (event_db, 0); - syslog(LOG_DEBUG, "dbevent_thread() exiting\n"); + pthread_mutex_lock(&DBEventExitQueueMutex); -//// what to do here? CtdlClearSystemContext(); - ev_loop_destroy (event_db); + event_db = NULL; + EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n"); DeleteHash(&DBQueueEvents); DBInboundEventQueue = NULL; DeleteHash(&DBInboundEventQueues[0]); DeleteHash(&DBInboundEventQueues[1]); - close(evdb_add_pipe[0]); - close(evdb_add_pipe[1]); /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */ + ev_loop_destroy (tmp); + pthread_mutex_unlock(&DBEventExitQueueMutex); return(NULL); } void ShutDownEventQueues(void) { - syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n"); + EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n"); pthread_mutex_lock(&DBEventQueueMutex); ev_async_send (event_db, &DBExitEventLoop); @@ -822,10 +840,27 @@ void ShutDownEventQueues(void) pthread_mutex_unlock(&EventQueueMutex); } +void DebugEventloopEnable(const int n) +{ + DebugEventLoop = n; +} +void DebugEventloopBacktraceEnable(const int n) +{ + DebugEventLoopBacktrace = n; +} + +void DebugCurlEnable(const int n) +{ + DebugCurl = n; +} + CTDL_MODULE_INIT(event_client) { if (!threading) { + CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop); + CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace); + CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl); InitEventQueue(); DBInitEventQueue(); CtdlThreadCreate(client_event_thread);