From 9563ce7303d1d0d20b963d643f9afc06b53c8e4d Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sun, 12 May 2013 18:36:33 +0200 Subject: [PATCH] EV: expose current event queue state to the RWHO command --- citadel/event_client.c | 67 +++++++++++++++++++ citadel/event_client.h | 34 ++++++++++ citadel/modules/c-ares-dns/serv_c-ares-dns.c | 3 + .../modules/eventclient/serv_eventclient.c | 15 ++++- 4 files changed, 117 insertions(+), 2 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index 90c5e5dd9..fdd66d7ae 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -64,6 +64,48 @@ #include "event_client.h" #include "ctdl_module.h" + +ConstStr IOStates[] = { + {HKEY("DB Queue")}, + {HKEY("DB Q Next")}, + {HKEY("DB Attach")}, + {HKEY("DB Next")}, + {HKEY("DB Stop")}, + {HKEY("DB Exit")}, + {HKEY("DB Terminate")}, + {HKEY("IO Queue")}, + {HKEY("IO Attach")}, + {HKEY("IO Connect Socket")}, + {HKEY("IO Abort")}, + {HKEY("IO Timeout")}, + {HKEY("IO ConnFail")}, + {HKEY("IO ConnFail Now")}, + {HKEY("IO Conn Now")}, + {HKEY("IO Conn Wait")}, + {HKEY("Curl Q")}, + {HKEY("Curl Start")}, + {HKEY("Curl Shotdown")}, + {HKEY("Curl More IO")}, + {HKEY("Curl Got IO")}, + {HKEY("Curl Got Data")}, + {HKEY("Curl Got Status")}, + {HKEY("C-Ares Start")}, + {HKEY("C-Ares IO Done")}, + {HKEY("C-Ares Finished")}, + {HKEY("C-Ares exit")}, + {HKEY("Killing")}, + {HKEY("Exit")} +}; + +void SetEVState(AsyncIO *IO, eIOState State) +{ + + CitContext* CCC = IO->CitContext; + memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1); + +} + + static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, @@ -86,6 +128,7 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) IOAddHandler *h; int i; + SetEVState(IO, eDBQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; @@ -122,6 +165,7 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) void StopDBWatchers(AsyncIO *IO) { + SetEVState(IO, eDBStop); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); ev_idle_stop(event_db, &IO->db_unwind_stack); } @@ -131,6 +175,7 @@ void ShutDownDBCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); + SetEVState(IO, eDBTerm); EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); StopDBWatchers(IO); @@ -142,6 +187,8 @@ void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eDBNext); IO->Now = ev_now(event_db); EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); @@ -175,6 +222,7 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) { + SetEVState(IO, eQDBNext); IO->NextDBOperation = CB; ev_idle_init(&IO->db_unwind_stack, DB_PerformNext); @@ -199,6 +247,8 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eIOAbort); EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); IO->Now = ev_now(event_base); assert(IO->ShutdownAbort); @@ -211,6 +261,7 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) IOAddHandler *h; int i; + SetEVState(IO, eIOQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; @@ -250,6 +301,7 @@ eNextState QueueCurlContext(AsyncIO *IO) IOAddHandler *h; int i; + SetEVState(IO, eCurlQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = evcurl_handle_start; @@ -342,6 +394,8 @@ void StopCurlWatchers(AsyncIO *IO) void ShutDownCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; + + SetEVState(IO, eExit); become_session(Ctx); EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); @@ -642,6 +696,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOTimeout); IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -670,6 +725,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOConnfail); IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); @@ -702,6 +758,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOConnfailNow); IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); @@ -731,6 +788,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) socklen_t lon = sizeof(so_err); int err; + SetEVState(IO, eIOConnNow); IO->Now = ev_now(event_base); EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); @@ -844,6 +902,8 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eCaresFinished); IO->Now = ev_now(event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); @@ -874,6 +934,7 @@ eNextState EvConnectSock(AsyncIO *IO, int fdflags; int rc = -1; + SetEVState(IO, eIOConnectSock); become_session(IO->CitContext); if (ReadFirst) { @@ -976,11 +1037,13 @@ eNextState EvConnectSock(AsyncIO *IO, } if (rc >= 0){ + SetEVState(IO, eIOConnNow); EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd); set_start_callback(event_base, IO, 0); return IO->NextState; } else if (errno == EINPROGRESS) { + SetEVState(IO, eIOConnWait); EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd); ev_io_init(&IO->conn_event, @@ -995,6 +1058,7 @@ eNextState EvConnectSock(AsyncIO *IO, return IO->NextState; } else { + SetEVState(IO, eIOConnfail); ev_idle_init(&IO->conn_fail_immediate, IO_connfailimmediate_callback); IO->conn_fail_immediate.data = IO; @@ -1024,6 +1088,7 @@ eNextState ReAttachIO(AsyncIO *IO, void *pData, int ReadFirst) { + SetEVState(IO, eIOAttach); IO->Data = pData; become_session(IO->CitContext); ev_cleanup_start(event_base, &IO->abort_by_shutdown); @@ -1136,6 +1201,8 @@ eNextState KillOtherContextNow(AsyncIO *IO) { KillOtherSessionContext *Ctx = IO->Data; + SetEVState(IO, eKill); + if (Ctx->OtherOne->ShutdownAbort != NULL) Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne); return eTerminateConnection; diff --git a/citadel/event_client.h b/citadel/event_client.h index bf4aa4ea4..1dce3d9cc 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -32,6 +32,38 @@ typedef struct AsyncIO AsyncIO; typedef struct CitContext CitContext; #endif +typedef enum __eIOState { + eDBQ, + eQDBNext, + eDBAttach, + eDBNext, + eDBStop, + eDBX, + eDBTerm, + eIOQ, + eIOAttach, + eIOConnectSock, + eIOAbort, + eIOTimeout, + eIOConnfail, + eIOConnfailNow, + eIOConnNow, + eIOConnWait, + eCurlQ, + eCurlStart, + eCurlShutdown, + eCurlNewIO, + eCurlGotIO, + eCurlGotData, + eCurlGotStatus, + eCaresStart, + eCaresDoneIO, + eCaresFinished, + eCaresX, + eKill, + eExit +}eIOState; + typedef enum _eNextState { eSendDNSQuery, eReadDNSReply, @@ -52,6 +84,8 @@ typedef enum _eNextState { eAbort }eNextState; +void SetEVState(AsyncIO *IO, eIOState State); + typedef eNextState (*IO_CallBack)(AsyncIO *IO); typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO); typedef void (*ParseDNSAnswerCb)(AsyncIO*, unsigned char*, int); diff --git a/citadel/modules/c-ares-dns/serv_c-ares-dns.c b/citadel/modules/c-ares-dns/serv_c-ares-dns.c index 846720b35..94592a161 100644 --- a/citadel/modules/c-ares-dns/serv_c-ares-dns.c +++ b/citadel/modules/c-ares-dns/serv_c-ares-dns.c @@ -268,6 +268,7 @@ void QueryCb(void *arg, { AsyncIO *IO = arg; + SetEVState(IO, eCaresStart); EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__); EV_DNS_LOGT_STOP(DNS.timeout); @@ -298,6 +299,7 @@ void QueryCb(void *arg, void QueryCbDone(AsyncIO *IO) { + SetEVState(IO, eCaresDoneIO); EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__); EV_DNS_LOGT_STOP(DNS.timeout); @@ -309,6 +311,7 @@ void QueryCbDone(AsyncIO *IO) void DestructCAres(AsyncIO *IO) { + SetEVState(IO, eCaresX); EVNC_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__); EVNC_syslog(LOG_DEBUG, "C-ARES: - stopping %s %d %p \n", "DNS.recv_event", IO->DNS.recv_event.fd, &IO->DNS.recv_event); diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 01492e56c..e08a39f28 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -147,6 +147,7 @@ gotstatus(int nnrun) IO); continue; } + SetEVState(IO, eCurlGotStatus); EVCURLM_syslog(LOG_DEBUG, "request complete\n"); @@ -186,7 +187,7 @@ gotstatus(int nnrun) "%s\n", curl_multi_strerror(msta)); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); IO->HttpReq.attached = 0; switch(IO->SendDone(IO)) @@ -272,9 +273,11 @@ got_out(struct ev_loop *loop, ev_io *ioev, int events) } static size_t -gotdata(void *data, size_t size, size_t nmemb, void *cglobal) { +gotdata(void *data, size_t size, size_t nmemb, void *cglobal) +{ AsyncIO *IO = (AsyncIO*) cglobal; + SetEVState(IO, eCurlGotData); if (IO->HttpReq.ReplyData == NULL) { IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ); @@ -323,6 +326,7 @@ gotwatchsock(CURL *easy, return -1; } IO = (AsyncIO *) f; + SetEVState(IO, eCurlNewIO); EVCURL_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl); @@ -338,6 +342,7 @@ gotwatchsock(CURL *easy, curl_multi_assign(mhnd, fd, IO); } + SetEVState(IO, eCurlGotIO); IO->Now = ev_now(event_base); Action = ""; @@ -495,6 +500,8 @@ static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, if (IO == NULL) return; + + SetEVState(IO, eCurlShutdown); IO->Now = ev_now(event_base); EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); @@ -523,6 +530,7 @@ evcurl_handle_start(AsyncIO *IO) CURLcode sta; CURL *chnd; + SetEVState(IO, eCurlStart); chnd = IO->HttpReq.chnd; EVCURL_syslog(LOG_DEBUG, "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl); @@ -631,6 +639,8 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) if (h->IO->StartIO == 0.0) h->IO->StartIO = Now; + SetEVState(h->IO, eIOAttach); + Ctx = h->IO->CitContext; become_session(Ctx); @@ -779,6 +789,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) h->IO->StartDB = Now; h->IO->Now = Now; + SetEVState(h->IO, eDBAttach); Ctx = h->IO->CitContext; become_session(Ctx); ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown); -- 2.30.2