X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Feventclient%2Fserv_eventclient.c;h=4a27c2482effb42bd47f7cbb99054218d58e63c8;hb=f9692bdc53d7094bbb7e5c5aba06a8dc8d19c852;hp=adf896972a0eb702a79dadd689179c3db5268774;hpb=96f09a277a33193bf175b81fbf79e676ab228615;p=citadel.git diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index adf896972..4a27c2482 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -63,6 +63,7 @@ ev_loop *event_base; int DebugEventLoop = 0; int DebugEventLoopBacktrace = 0; int DebugCurl = 0; +pthread_key_t evConKey; long EvIDSource = 1; /***************************************************************************** @@ -71,12 +72,12 @@ long EvIDSource = 1; #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0)) #define EVCURL_syslog(LEVEL, FORMAT, ...) \ - DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, \ - IO->ID, CCID, __VA_ARGS__) + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT, \ + IOSTR, IO->ID, CCID, __VA_ARGS__) #define EVCURLM_syslog(LEVEL, FORMAT) \ - DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:IO[%ld]CC[%d] " FORMAT, \ - IO->ID, CCID) + DBGLOG (LEVEL) syslog(LEVEL, "EVCURL:%s[%ld]CC[%d] " FORMAT, \ + IOSTR, IO->ID, CCID) #define CURL_syslog(LEVEL, FORMAT, ...) \ DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT, __VA_ARGS__) @@ -147,6 +148,7 @@ gotstatus(int nnrun) IO); continue; } + SetEVState(IO, eCurlGotStatus); EVCURLM_syslog(LOG_DEBUG, "request complete\n"); @@ -160,9 +162,15 @@ gotstatus(int nnrun) EVCURL_syslog(LOG_ERR, "error description: %s\n", IO->HttpReq.errdesc); + IO->HttpReq.CurlError = curl_easy_strerror(sta); EVCURL_syslog(LOG_ERR, "error performing request: %s\n", - curl_easy_strerror(sta)); + IO->HttpReq.CurlError); + if (sta == CURLE_OPERATION_TIMEDOUT) + { + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; + } } sta = curl_easy_getinfo(chnd, CURLINFO_RESPONSE_CODE, @@ -186,15 +194,12 @@ gotstatus(int nnrun) "%s\n", curl_multi_strerror(msta)); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); IO->HttpReq.attached = 0; switch(IO->SendDone(IO)) { case eDBQuery: - curl_easy_cleanup(IO->HttpReq.chnd); - IO->HttpReq.chnd = NULL; - break; case eSendDNSQuery: case eReadDNSReply: case eConnect: @@ -205,8 +210,6 @@ gotstatus(int nnrun) case eReadMore: case eReadPayload: case eReadFile: - curl_easy_cleanup(IO->HttpReq.chnd); - IO->HttpReq.chnd = NULL; break; case eTerminateConnection: case eAbort: @@ -272,9 +275,11 @@ got_out(struct ev_loop *loop, ev_io *ioev, int events) } static size_t -gotdata(void *data, size_t size, size_t nmemb, void *cglobal) { +gotdata(void *data, size_t size, size_t nmemb, void *cglobal) +{ AsyncIO *IO = (AsyncIO*) cglobal; + SetEVState(IO, eCurlGotData); if (IO->HttpReq.ReplyData == NULL) { IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ); @@ -316,13 +321,14 @@ gotwatchsock(CURL *easy, if (IO == NULL) { sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); if (sta) { - EVCURL_syslog(LOG_ERR, - "EVCURL: error asking curl for private " - "cookie of curl handle: %s\n", - curl_easy_strerror(sta)); + CURL_syslog(LOG_ERR, + "EVCURL: error asking curl for private " + "cookie of curl handle: %s\n", + curl_easy_strerror(sta)); return -1; } IO = (AsyncIO *) f; + SetEVState(IO, eCurlNewIO); EVCURL_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl); @@ -338,6 +344,7 @@ gotwatchsock(CURL *easy, curl_multi_assign(mhnd, fd, IO); } + SetEVState(IO, eCurlGotIO); IO->Now = ev_now(event_base); Action = ""; @@ -495,6 +502,8 @@ static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, if (IO == NULL) return; + + SetEVState(IO, eCurlShutdown); IO->Now = ev_now(event_base); EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); @@ -523,6 +532,7 @@ evcurl_handle_start(AsyncIO *IO) CURLcode sta; CURL *chnd; + SetEVState(IO, eCurlStart); chnd = IO->HttpReq.chnd; EVCURL_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); @@ -593,6 +603,7 @@ pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue poin HashList *QueueEvents = NULL; HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; +extern void ShutDownCLient(AsyncIO *IO); ev_async AddJob; ev_async ExitEventLoop; @@ -600,6 +611,8 @@ ev_async ExitEventLoop; static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) { CitContext *Ctx; + long IOID = -1; + long count = 0; ev_tstamp Now; HashList *q; void *v; @@ -624,21 +637,43 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) while (GetNextHashPos(q, It, &len, &Key, &v)) { IOAddHandler *h = v; + count ++; if (h->IO->ID == 0) { h->IO->ID = EvIDSource++; } + IOID = h->IO->ID; if (h->IO->StartIO == 0.0) h->IO->StartIO = Now; + SetEVState(h->IO, eIOAttach); + Ctx = h->IO->CitContext; become_session(Ctx); h->IO->Now = Now; - h->EvAttch(h->IO); + switch (h->EvAttch(h->IO)) + { + case eReadMore: + case eReadMessage: + case eReadFile: + case eSendReply: + case eSendMore: + case eReadPayload: + case eSendFile: + case eDBQuery: + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + break; + case eTerminateConnection: + case eAbort: + ShutDownCLient(h->IO); + break; + } } DeleteHashPos(&It); DeleteHashContent(&q); - EVQM_syslog(LOG_DEBUG, "EVENT Q Add done.\n"); + EVQ_syslog(LOG_DEBUG, "%s CC[%ld] EVENT Q Add %ld done.", IOSTR, IOID, count); } @@ -664,6 +699,7 @@ void InitEventQueue(void) extern void CtdlDestroyEVCleanupHooks(void); extern int EVQShutDown; +const char *IOLog = "IO"; /* * this thread operates the select() etc. via libev. */ @@ -672,7 +708,9 @@ void *client_event_thread(void *arg) struct CitContext libev_client_CC; CtdlFillSystemContext(&libev_client_CC, "LibEv Thread"); -// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); + + pthread_setspecific(evConKey, IOLog); + EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n"); event_base = ev_default_loop (EVFLAG_AUTO); @@ -728,6 +766,8 @@ extern void ShutDownDBCLient(AsyncIO *IO); static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) { CitContext *Ctx; + long IOID = -1; + long count = 0;; ev_tstamp Now; HashList *q; void *v; @@ -754,12 +794,15 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) { IOAddHandler *h = v; eNextState rc; + count ++; if (h->IO->ID == 0) h->IO->ID = EvIDSource++; + IOID = h->IO->ID; if (h->IO->StartDB == 0.0) h->IO->StartDB = Now; h->IO->Now = Now; + SetEVState(h->IO, eDBAttach); Ctx = h->IO->CitContext; become_session(Ctx); ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown); @@ -774,7 +817,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) } DeleteHashPos(&It); DeleteHashContent(&q); - EVQM_syslog(LOG_DEBUG, "DBEVENT Q Add done.\n"); + EVQ_syslog(LOG_DEBUG, "%s CC[%ld] DBEVENT Q Add %ld done.", IOSTR, IOID, count); } @@ -797,6 +840,8 @@ void DBInitEventQueue(void) DBInboundEventQueue = DBInboundEventQueues[0]; } +const char *DBLog = "BD"; + /* * this thread operates writing to the message database via libev. */ @@ -805,6 +850,8 @@ void *db_event_thread(void *arg) ev_loop *tmp; struct CitContext libev_msg_CC; + pthread_setspecific(evConKey, DBLog); + CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread"); EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n"); @@ -862,10 +909,16 @@ void DebugCurlEnable(const int n) DebugCurl = n; } +const char *WLog = "WX"; CTDL_MODULE_INIT(event_client) { if (!threading) { + if (pthread_key_create(&evConKey, NULL) != 0) { + syslog(LOG_CRIT, "Can't create TSD key: %s", strerror(errno)); + } + pthread_setspecific(evConKey, WLog); + CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop); CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace); CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);