X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Feventclient%2Fserv_eventclient.c;h=f7e286a9ab14cf6188f352b52ac036224df99982;hb=8357d67fb22adec3b854d61bdbd898dcfcc91959;hp=39bb108a7af1dce3e03fc2ef65f8fe39e1a5eb70;hpb=ef347e598ab670b87e178af7fc6b00795494303a;p=citadel.git diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 39bb108a7..f7e286a9a 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -61,36 +61,41 @@ ev_loop *event_base; int DebugEventLoop = 0; +int DebugEventLoopBacktrace = 0; int DebugCurl = 0; +pthread_key_t evConKey; long EvIDSource = 1; /***************************************************************************** * libevent / curl integration * *****************************************************************************/ -#define MOPT(s, v)\ +#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0)) + +#define EVCURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT, \ + IOSTR, IO->ID, CCID, __VA_ARGS__) + +#define EVCURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT, \ + IOSTR, IO->ID, CCID) + +#define CURL_syslog(LEVEL, FORMAT, ...) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__) + +#define CURLM_syslog(LEVEL, FORMAT) \ + DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT) + +#define MOPT(s, v) \ do { \ sta = curl_multi_setopt(mhnd, (CURLMOPT_##s), (v)); \ if (sta) { \ - syslog(LOG_ERR, "EVCURL: error setting option " \ + EVQ_syslog(LOG_ERR, "error setting option " \ #s " on curl multi handle: %s\n", \ curl_easy_strerror(sta)); \ exit (1); \ } \ } while (0) -#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0)) - -#define EVCURL_syslog(LEVEL, FORMAT, ...) \ - DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, IO->ID, CCID, __VA_ARGS__) - -#define EVCURLM_syslog(LEVEL, FORMAT) \ - DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, IO->ID, CCID) - -#define CURL_syslog(LEVEL, FORMAT, ...) \ - DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__) - -#define CURLM_syslog(LEVEL, FORMAT) \ - DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT) typedef struct _evcurl_global_data { int magic; @@ -102,6 +107,8 @@ typedef struct _evcurl_global_data { ev_async WakeupCurl; evcurl_global_data global; +eNextState QueueAnDBOperation(AsyncIO *IO); + static void gotstatus(int nnrun) { @@ -119,7 +126,7 @@ gotstatus(int nnrun) if (CURLMSG_DONE == msg->msg) { CURL *chnd; - char *chandle = NULL; + void *chandle = NULL; CURLcode sta; CURLMcode msta; AsyncIO*IO; @@ -129,15 +136,23 @@ gotstatus(int nnrun) sta = curl_easy_getinfo(chnd, CURLINFO_PRIVATE, &chandle); - EVCURLM_syslog(LOG_DEBUG, "request complete\n"); if (sta) { - EVCURL_syslog(LOG_ERR, - "error asking curl for private" - " cookie of curl handle: %s\n", - curl_easy_strerror(sta)); + syslog(LOG_ERR, + "error asking curl for private" + " cookie of curl handle: %s\n", + curl_easy_strerror(sta)); continue; } IO = (AsyncIO *)chandle; + if (IO->ID == 0) { + EVCURL_syslog(LOG_ERR, + "Error, invalid IO context %p\n", + IO); + continue; + } + SetEVState(IO, eCurlGotStatus); + + EVCURLM_syslog(LOG_DEBUG, "request complete\n"); IO->Now = ev_now(event_base); @@ -149,9 +164,15 @@ gotstatus(int nnrun) EVCURL_syslog(LOG_ERR, "error description: %s\n", IO->HttpReq.errdesc); + IO->HttpReq.CurlError = curl_easy_strerror(sta); EVCURL_syslog(LOG_ERR, "error performing request: %s\n", - curl_easy_strerror(sta)); + IO->HttpReq.CurlError); + if (sta == CURLE_OPERATION_TIMEDOUT) + { + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; + } } sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, @@ -161,7 +182,7 @@ gotstatus(int nnrun) "error asking curl for " "response code from request: %s\n", curl_easy_strerror(sta)); - EVCURL_syslog(LOG_ERR, + EVCURL_syslog(LOG_DEBUG, "http response code was %ld\n", (long)IO->HttpReq.httpcode); @@ -181,8 +202,8 @@ gotstatus(int nnrun) switch(IO->SendDone(IO)) { case eDBQuery: - curl_easy_cleanup(IO->HttpReq.chnd); - IO->HttpReq.chnd = NULL; + FreeURL(&IO->ConnectMe); + QueueAnDBOperation(IO); break; case eSendDNSQuery: case eReadDNSReply: @@ -194,8 +215,6 @@ gotstatus(int nnrun) case eReadMore: case eReadPayload: case eReadFile: - curl_easy_cleanup(IO->HttpReq.chnd); - IO->HttpReq.chnd = NULL; break; case eTerminateConnection: case eAbort: @@ -261,9 +280,11 @@ got_out(struct ev_loop *loop, ev_io *ioev, int events) } static size_t -gotdata(void *data, size_t size, size_t nmemb, void *cglobal) { +gotdata(void *data, size_t size, size_t nmemb, void *cglobal) +{ AsyncIO *IO = (AsyncIO*) cglobal; + SetEVState(IO, eCurlGotData); if (IO->HttpReq.ReplyData == NULL) { IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ); @@ -297,7 +318,7 @@ gotwatchsock(CURL *easy, { evcurl_global_data *global = cglobal; CURLM *mhnd = global->mhnd; - char *f; + void *f; AsyncIO *IO = (AsyncIO*) vIO; CURLcode sta; const char *Action; @@ -305,13 +326,14 @@ gotwatchsock(CURL *easy, if (IO == NULL) { sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); if (sta) { - EVCURL_syslog(LOG_ERR, - "EVCURL: error asking curl for private " - "cookie of curl handle: %s\n", - curl_easy_strerror(sta)); + CURL_syslog(LOG_ERR, + "EVCURL: error asking curl for private " + "cookie of curl handle: %s\n", + curl_easy_strerror(sta)); return -1; } IO = (AsyncIO *) f; + SetEVState(IO, eCurlNewIO); EVCURL_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl); @@ -327,6 +349,7 @@ gotwatchsock(CURL *easy, curl_multi_assign(mhnd, fd, IO); } + SetEVState(IO, eCurlGotIO); IO->Now = ev_now(event_base); Action = ""; @@ -357,12 +380,12 @@ gotwatchsock(CURL *easy, switch (action) { case CURL_POLL_NONE: - EVCURLM_syslog(LOG_ERR, + EVCURLM_syslog(LOG_DEBUG, "called first time " "to register this sockwatcker\n"); break; case CURL_POLL_REMOVE: - EVCURLM_syslog(LOG_ERR, + EVCURLM_syslog(LOG_DEBUG, "called last time to unregister " "this sockwatcher\n"); ev_io_stop(event_base, &IO->recv_event); @@ -481,6 +504,11 @@ static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, { CURLMcode msta; AsyncIO *IO = watcher->data; + + if (IO == NULL) + return; + + SetEVState(IO, eCurlShutdown); IO->Now = ev_now(event_base); EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); @@ -509,6 +537,7 @@ evcurl_handle_start(AsyncIO *IO) CURLcode sta; CURL *chnd; + SetEVState(IO, eCurlStart); chnd = IO->HttpReq.chnd; EVCURL_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); @@ -544,10 +573,12 @@ evcurl_handle_start(AsyncIO *IO) IO->HttpReq.attached = 1; ev_async_send (event_base, &WakeupCurl); + ev_cleanup_init(&IO->abort_by_shutdown, IOcurl_abort_shutdown_callback); ev_cleanup_start(event_base, &IO->abort_by_shutdown); + return eReadMessage; } @@ -572,17 +603,21 @@ static void evcurl_shutdown (void) * this currently is the main loop (which may change in some future?) */ int evbase_count = 0; -int event_add_pipe[2] = {-1, -1}; pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */ HashList *QueueEvents = NULL; HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; +extern void ShutDownCLient(AsyncIO *IO); ev_async AddJob; ev_async ExitEventLoop; static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) { + CitContext *Ctx; + long IOID = -1; + long count = 0; ev_tstamp Now; HashList *q; void *v; @@ -607,17 +642,43 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) while (GetNextHashPos(q, It, &len, &Key, &v)) { IOAddHandler *h = v; + count ++; if (h->IO->ID == 0) { h->IO->ID = EvIDSource++; } + IOID = h->IO->ID; if (h->IO->StartIO == 0.0) h->IO->StartIO = Now; + + SetEVState(h->IO, eIOAttach); + + Ctx = h->IO->CitContext; + become_session(Ctx); + h->IO->Now = Now; - h->EvAttch(h->IO); + switch (h->EvAttch(h->IO)) + { + case eReadMore: + case eReadMessage: + case eReadFile: + case eSendReply: + case eSendMore: + case eReadPayload: + case eSendFile: + case eDBQuery: + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + break; + case eTerminateConnection: + case eAbort: + ShutDownCLient(h->IO); + break; + } } DeleteHashPos(&It); DeleteHashContent(&q); - syslog(LOG_DEBUG, "EVENT Q Add done.\n"); + EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld done.", IOSTR, IOID, count); } @@ -625,26 +686,15 @@ static void EventExitCallback(EV_P_ ev_async *w, int revents) { ev_break(event_base, EVBREAK_ALL); - syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); } void InitEventQueue(void) { - struct rlimit LimitSet; - pthread_mutex_init(&EventQueueMutex, NULL); - - if (pipe(event_add_pipe) != 0) { - syslog(LOG_EMERG, - "Unable to create pipe for libev queueing: %s\n", - strerror(errno)); - abort(); - } - LimitSet.rlim_cur = 1; - LimitSet.rlim_max = 1; - setrlimit(event_add_pipe[1], &LimitSet); + pthread_mutex_init(&EventExitQueueMutex, NULL); QueueEvents = NewHash(1, Flathash); InboundEventQueues[0] = NewHash(1, Flathash); @@ -654,6 +704,7 @@ void InitEventQueue(void) extern void CtdlDestroyEVCleanupHooks(void); extern int EVQShutDown; +const char *IOLog = "IO"; /* * this thread operates the select() etc. via libev. */ @@ -662,8 +713,10 @@ void *client_event_thread(void *arg) struct CitContext libev_client_CC; CtdlFillSystemContext(&libev_client_CC, "LibEv Thread"); -// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - syslog(LOG_DEBUG, "client_event_thread() initializing\n"); + + pthread_setspecific(evConKey, IOLog); + + EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n"); event_base = ev_default_loop (EVFLAG_AUTO); ev_async_init(&AddJob, QueueEventAddCallback); @@ -677,22 +730,23 @@ void *client_event_thread(void *arg) ev_run (event_base, 0); - syslog(LOG_DEBUG, "client_event_thread() exiting\n"); + EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n"); ///what todo here? CtdlClearSystemContext(); + pthread_mutex_lock(&EventExitQueueMutex); ev_loop_destroy (EV_DEFAULT_UC); + event_base = NULL; DeleteHash(&QueueEvents); InboundEventQueue = NULL; DeleteHash(&InboundEventQueues[0]); DeleteHash(&InboundEventQueues[1]); /* citthread_mutex_destroy(&EventQueueMutex); TODO */ evcurl_shutdown(); - close(event_add_pipe[0]); - close(event_add_pipe[1]); CtdlDestroyEVCleanupHooks(); - EVQShutDown = 1; + pthread_mutex_unlock(&EventExitQueueMutex); + EVQShutDown = 1; return(NULL); } @@ -703,8 +757,8 @@ void *client_event_thread(void *arg) */ ev_loop *event_db; int evdb_count = 0; -int evdb_add_pipe[2] = {-1, -1}; pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */ HashList *DBQueueEvents = NULL; HashList *DBInboundEventQueue = NULL; HashList *DBInboundEventQueues[2] = { NULL, NULL }; @@ -716,6 +770,9 @@ extern void ShutDownDBCLient(AsyncIO *IO); static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) { + CitContext *Ctx; + long IOID = -1; + long count = 0;; ev_tstamp Now; HashList *q; void *v; @@ -742,11 +799,18 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) { IOAddHandler *h = v; eNextState rc; + count ++; if (h->IO->ID == 0) h->IO->ID = EvIDSource++; + IOID = h->IO->ID; if (h->IO->StartDB == 0.0) h->IO->StartDB = Now; h->IO->Now = Now; + + SetEVState(h->IO, eDBAttach); + Ctx = h->IO->CitContext; + become_session(Ctx); + ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown); rc = h->EvAttch(h->IO); switch (rc) { @@ -758,13 +822,13 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) } DeleteHashPos(&It); DeleteHashContent(&q); - syslog(LOG_DEBUG, "DBEVENT Q Add done.\n"); + EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count); } static void DBEventExitCallback(EV_P_ ev_async *w, int revents) { - syslog(LOG_DEBUG, "DB EVENT Q exiting.\n"); + EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n"); ev_break(event_db, EVBREAK_ALL); } @@ -772,17 +836,8 @@ static void DBEventExitCallback(EV_P_ ev_async *w, int revents) void DBInitEventQueue(void) { - struct rlimit LimitSet; - pthread_mutex_init(&DBEventQueueMutex, NULL); - - if (pipe(evdb_add_pipe) != 0) { - syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); - abort(); - } - LimitSet.rlim_cur = 1; - LimitSet.rlim_max = 1; - setrlimit(evdb_add_pipe[1], &LimitSet); + pthread_mutex_init(&DBEventExitQueueMutex, NULL); DBQueueEvents = NewHash(1, Flathash); DBInboundEventQueues[0] = NewHash(1, Flathash); @@ -790,18 +845,23 @@ void DBInitEventQueue(void) DBInboundEventQueue = DBInboundEventQueues[0]; } +const char *DBLog = "BD"; + /* * this thread operates writing to the message database via libev. */ void *db_event_thread(void *arg) { + ev_loop *tmp; struct CitContext libev_msg_CC; + pthread_setspecific(evConKey, DBLog); + CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread"); -// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); - syslog(LOG_DEBUG, "dbevent_thread() initializing\n"); - event_db = ev_loop_new (EVFLAG_AUTO); + EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n"); + + tmp = event_db = ev_loop_new (EVFLAG_AUTO); ev_async_init(&DBAddJob, DBQueueEventAddCallback); ev_async_start(event_db, &DBAddJob); @@ -810,26 +870,26 @@ void *db_event_thread(void *arg) ev_run (event_db, 0); - syslog(LOG_DEBUG, "dbevent_thread() exiting\n"); + pthread_mutex_lock(&DBEventExitQueueMutex); -//// what to do here? CtdlClearSystemContext(); - ev_loop_destroy (event_db); + event_db = NULL; + EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n"); DeleteHash(&DBQueueEvents); DBInboundEventQueue = NULL; DeleteHash(&DBInboundEventQueues[0]); DeleteHash(&DBInboundEventQueues[1]); - close(evdb_add_pipe[0]); - close(evdb_add_pipe[1]); /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */ + ev_loop_destroy (tmp); + pthread_mutex_unlock(&DBEventExitQueueMutex); return(NULL); } void ShutDownEventQueues(void) { - syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n"); + EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n"); pthread_mutex_lock(&DBEventQueueMutex); ev_async_send (event_db, &DBExitEventLoop); @@ -840,22 +900,33 @@ void ShutDownEventQueues(void) pthread_mutex_unlock(&EventQueueMutex); } -void DebugEventloopEnable(void) +void DebugEventloopEnable(const int n) { - DebugEventLoop = 1; + DebugEventLoop = n; +} +void DebugEventloopBacktraceEnable(const int n) +{ + DebugEventLoopBacktrace = n; } -void DebugCurlEnable(void) +void DebugCurlEnable(const int n) { - DebugCurl = 1; + DebugCurl = n; } +const char *WLog = "WX"; CTDL_MODULE_INIT(event_client) { if (!threading) { - CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable); - CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable); + if (pthread_key_create(&evConKey, NULL) != 0) { + syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno)); + } + pthread_setspecific(evConKey, WLog); + + CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop); + CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace); + CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl); InitEventQueue(); DBInitEventQueue(); CtdlThreadCreate(client_event_thread);