From 271924aeff4c786b31ec293ab48c861f7fe77bd0 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sun, 6 May 2012 22:15:30 +0200 Subject: [PATCH] Eventhandling: fix shutdownhandlers - add a mutex for securely accessing the eventqueues from outside via ev_async_send() (s.b. might want to shut down meanwhile) - remove unused add-pipes we used for signaling before switching to libev4 - start the shutdown handlers only from within the eventqueue --- citadel/event_client.c | 78 ++++++++++++++++++- citadel/event_client.h | 1 + citadel/modules/c-ares-dns/serv_c-ares-dns.c | 18 ++++- .../modules/eventclient/serv_eventclient.c | 67 ++++++++-------- citadel/modules/rssclient/serv_rssclient.c | 10 +-- 5 files changed, 123 insertions(+), 51 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index 0b40edcf4..058e87c50 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -82,6 +82,7 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, *----------------------------------------------------------------------------*/ extern int evdb_count; extern pthread_mutex_t DBEventQueueMutex; +extern pthread_mutex_t DBEventExitQueueMutex; extern HashList *DBInboundEventQueue; extern struct ev_loop *event_db; extern ev_async DBAddJob; @@ -98,26 +99,47 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) ev_cleanup_init(&IO->db_abort_by_shutdown, IO_abort_shutdown_callback); IO->db_abort_by_shutdown.data = IO; - ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); pthread_mutex_lock(&DBEventQueueMutex); + if (DBInboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n"); + pthread_mutex_unlock(&DBEventQueueMutex); + return eAbort; + } EVM_syslog(LOG_DEBUG, "DBEVENT Q\n"); i = ++evdb_count ; Put(DBInboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventExitQueueMutex); + if (event_db == NULL) + { + pthread_mutex_unlock(&DBEventExitQueueMutex); + return eAbort; + } ev_async_send (event_db, &DBAddJob); + pthread_mutex_unlock(&DBEventExitQueueMutex); + EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); return eDBQuery; } +void StopDBWatchers(AsyncIO *IO) +{ + ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + ev_idle_stop(event_db, &IO->db_unwind_stack); +} + void ShutDownDBCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; become_session(Ctx); EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); - ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + StopDBWatchers(IO); assert(IO->DBTerminate); IO->DBTerminate(IO); @@ -173,6 +195,7 @@ eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) *----------------------------------------------------------------------------*/ extern int evbase_count; extern pthread_mutex_t EventQueueMutex; +extern pthread_mutex_t EventExitQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; extern ev_async AddJob; @@ -201,15 +224,28 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) ev_cleanup_init(&IO->abort_by_shutdown, IO_abort_shutdown_callback); IO->abort_by_shutdown.data = IO; - ev_cleanup_start(event_base, &IO->abort_by_shutdown); pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + free(h); + /* shutting down... */ + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } @@ -226,12 +262,28 @@ eNextState QueueCurlContext(AsyncIO *IO) h->EvAttch = evcurl_handle_start; pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } @@ -262,6 +314,25 @@ void StopClientWatchers(AsyncIO *IO) ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + + ev_io_stop(event_base, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + + if (IO->SendBuf.fd != 0) { + close(IO->SendBuf.fd); + } + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; +} + +void StopCurlWatchers(AsyncIO *IO) +{ + ev_timer_stop (event_base, &IO->rw_timeout); + ev_timer_stop(event_base, &IO->conn_fail); + ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); ev_io_stop(event_base, &IO->conn_event); ev_io_stop(event_base, &IO->send_event); @@ -281,7 +352,6 @@ void ShutDownCLient(AsyncIO *IO) EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); StopClientWatchers(IO); if (IO->DNS.Channel != NULL) { diff --git a/citadel/event_client.h b/citadel/event_client.h index d5c2741e0..20c2bfe40 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -241,6 +241,7 @@ void FreeAsyncIOContents(AsyncIO *IO); eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB); eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB); +void StopDBWatchers(AsyncIO *IO); eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB); eNextState QueueCurlContext(AsyncIO *IO); diff --git a/citadel/modules/c-ares-dns/serv_c-ares-dns.c b/citadel/modules/c-ares-dns/serv_c-ares-dns.c index 8d6546260..a06e2e5a1 100644 --- a/citadel/modules/c-ares-dns/serv_c-ares-dns.c +++ b/citadel/modules/c-ares-dns/serv_c-ares-dns.c @@ -72,8 +72,8 @@ static void HostByAddrCb(void *data, AsyncIO *IO = data; EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__); - EV_DNS_LOGT_STOP(DNS.timeout); + EV_DNS_LOGT_STOP(DNS.timeout); ev_timer_stop (event_base, &IO->DNS.timeout); IO->DNS.Query->DNSStatus = status; @@ -269,8 +269,8 @@ void QueryCb(void *arg, AsyncIO *IO = arg; EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__); - EV_DNS_LOGT_STOP(DNS.timeout); + EV_DNS_LOGT_STOP(DNS.timeout); ev_timer_stop (event_base, &IO->DNS.timeout); IO->DNS.Query->DNSStatus = status; @@ -295,21 +295,28 @@ void QueryCb(void *arg, void QueryCbDone(AsyncIO *IO) { EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__); + EV_DNS_LOGT_STOP(DNS.timeout); + ev_timer_stop (event_base, &IO->DNS.timeout); + EV_DNS_LOGT_STOP(unwind_stack); ev_idle_stop(event_base, &IO->unwind_stack); } void DestructCAres(AsyncIO *IO) { EV_DNS_syslog(LOG_DEBUG, "C-ARES: %s\n", __FUNCTION__); - EV_DNS_LOGT_STOP(DNS.timeout); EV_DNS_LOG_STOP(DNS.recv_event); ev_io_stop(event_base, &IO->DNS.recv_event); + EV_DNS_LOG_STOP(DNS.send_event); ev_io_stop(event_base, &IO->DNS.send_event); + + EV_DNS_LOGT_STOP(DNS.timeout); ev_timer_stop (event_base, &IO->DNS.timeout); + + EV_DNS_LOGT_STOP(unwind_stack); ev_idle_stop(event_base, &IO->unwind_stack); ares_destroy_options(&IO->DNS.Options); } @@ -369,13 +376,16 @@ void QueueGetHostByNameDone(void *Ctx, IO->DNS.Query->VParsedDNSReply = hostent; IO->DNS.Query->DNSReplyFree = (FreeDNSReply) ares_free_hostent; + EV_DNS_LOGT_STOP(DNS.timeout); + ev_timer_stop (event_base, &IO->DNS.timeout); + ev_idle_init(&IO->unwind_stack, IO_postdns_callback); IO->unwind_stack.data = IO; EV_DNS_LOGT_INIT(unwind_stack); EV_DNS_LOGT_START(unwind_stack); ev_idle_start(event_base, &IO->unwind_stack); - ev_timer_stop (event_base, &IO->DNS.timeout); + } void QueueGetHostByName(AsyncIO *IO, diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index b0d9a3c32..f991cf905 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -141,6 +141,12 @@ gotstatus(int nnrun) continue; } IO = (AsyncIO *)chandle; + if (IO->ID == 0) { + EVCURL_syslog(LOG_ERR, + "Error, invalid IO context %p\n", + IO); + continue; + } EVCURLM_syslog(LOG_DEBUG, "request complete\n"); @@ -166,7 +172,7 @@ gotstatus(int nnrun) "error asking curl for " "response code from request: %s\n", curl_easy_strerror(sta)); - EVCURL_syslog(LOG_ERR, + EVCURL_syslog(LOG_DEBUG, "http response code was %ld\n", (long)IO->HttpReq.httpcode); @@ -180,7 +186,7 @@ gotstatus(int nnrun) "%s\n", curl_multi_strerror(msta)); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); IO->HttpReq.attached = 0; switch(IO->SendDone(IO)) @@ -486,6 +492,9 @@ static void IOcurl_abort_shutdown_callback(struct ev_loop *loop, { CURLMcode msta; AsyncIO *IO = watcher->data; + + if (IO == NULL) + return; IO->Now = ev_now(event_base); EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__); @@ -549,10 +558,12 @@ evcurl_handle_start(AsyncIO *IO) IO->HttpReq.attached = 1; ev_async_send (event_base, &WakeupCurl); + ev_cleanup_init(&IO->abort_by_shutdown, IOcurl_abort_shutdown_callback); ev_cleanup_start(event_base, &IO->abort_by_shutdown); + return eReadMessage; } @@ -577,8 +588,8 @@ static void evcurl_shutdown (void) * this currently is the main loop (which may change in some future?) */ int evbase_count = 0; -int event_add_pipe[2] = {-1, -1}; pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */ HashList *QueueEvents = NULL; HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; @@ -617,6 +628,7 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) } if (h->IO->StartIO == 0.0) h->IO->StartIO = Now; + h->IO->Now = Now; h->EvAttch(h->IO); } @@ -637,19 +649,8 @@ static void EventExitCallback(EV_P_ ev_async *w, int revents) void InitEventQueue(void) { - struct rlimit LimitSet; - pthread_mutex_init(&EventQueueMutex, NULL); - - if (pipe(event_add_pipe) != 0) { - syslog(LOG_EMERG, - "Unable to create pipe for libev queueing: %s\n", - strerror(errno)); - abort(); - } - LimitSet.rlim_cur = 1; - LimitSet.rlim_max = 1; - setrlimit(event_add_pipe[1], &LimitSet); + pthread_mutex_init(&EventExitQueueMutex, NULL); QueueEvents = NewHash(1, Flathash); InboundEventQueues[0] = NewHash(1, Flathash); @@ -685,19 +686,20 @@ void *client_event_thread(void *arg) EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n"); ///what todo here? CtdlClearSystemContext(); + pthread_mutex_lock(&EventExitQueueMutex); ev_loop_destroy (EV_DEFAULT_UC); + event_base = NULL; DeleteHash(&QueueEvents); InboundEventQueue = NULL; DeleteHash(&InboundEventQueues[0]); DeleteHash(&InboundEventQueues[1]); /* citthread_mutex_destroy(&EventQueueMutex); TODO */ evcurl_shutdown(); - close(event_add_pipe[0]); - close(event_add_pipe[1]); CtdlDestroyEVCleanupHooks(); - EVQShutDown = 1; + pthread_mutex_unlock(&EventExitQueueMutex); + EVQShutDown = 1; return(NULL); } @@ -708,8 +710,8 @@ void *client_event_thread(void *arg) */ ev_loop *event_db; int evdb_count = 0; -int evdb_add_pipe[2] = {-1, -1}; pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */ +pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */ HashList *DBQueueEvents = NULL; HashList *DBInboundEventQueue = NULL; HashList *DBInboundEventQueues[2] = { NULL, NULL }; @@ -752,6 +754,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) if (h->IO->StartDB == 0.0) h->IO->StartDB = Now; h->IO->Now = Now; + ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown); rc = h->EvAttch(h->IO); switch (rc) { @@ -777,17 +780,8 @@ static void DBEventExitCallback(EV_P_ ev_async *w, int revents) void DBInitEventQueue(void) { - struct rlimit LimitSet; - pthread_mutex_init(&DBEventQueueMutex, NULL); - - if (pipe(evdb_add_pipe) != 0) { - EVQ_syslog(LOG_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); - abort(); - } - LimitSet.rlim_cur = 1; - LimitSet.rlim_max = 1; - setrlimit(evdb_add_pipe[1], &LimitSet); + pthread_mutex_init(&DBEventExitQueueMutex, NULL); DBQueueEvents = NewHash(1, Flathash); DBInboundEventQueues[0] = NewHash(1, Flathash); @@ -800,13 +794,14 @@ void DBInitEventQueue(void) */ void *db_event_thread(void *arg) { + ev_loop *tmp; struct CitContext libev_msg_CC; CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread"); -// citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); + EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n"); - event_db = ev_loop_new (EVFLAG_AUTO); + tmp = event_db = ev_loop_new (EVFLAG_AUTO); ev_async_init(&DBAddJob, DBQueueEventAddCallback); ev_async_start(event_db, &DBAddJob); @@ -815,20 +810,20 @@ void *db_event_thread(void *arg) ev_run (event_db, 0); - EVQM_syslog(LOG_DEBUG, "dbevent_thread() exiting\n"); + pthread_mutex_lock(&DBEventExitQueueMutex); -//// what to do here? CtdlClearSystemContext(); - ev_loop_destroy (event_db); + event_db = NULL; + EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n"); DeleteHash(&DBQueueEvents); DBInboundEventQueue = NULL; DeleteHash(&DBInboundEventQueues[0]); DeleteHash(&DBInboundEventQueues[1]); - close(evdb_add_pipe[0]); - close(evdb_add_pipe[1]); /* citthread_mutex_destroy(&DBEventQueueMutex); TODO */ + ev_loop_destroy (tmp); + pthread_mutex_unlock(&DBEventExitQueueMutex); return(NULL); } diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index f6561e6b1..00c32971d 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -186,6 +186,7 @@ void DeleteRssCfg(void *vptr) } FreeAsyncIOContents(&RSSAggr->IO); + memset(RSSAggr, 0, sizeof(rss_aggregator)); free(RSSAggr); } @@ -195,7 +196,7 @@ eNextState RSSAggregator_Terminate(AsyncIO *IO) EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n"); - + StopCurlWatchers(IO); UnlinkRSSAggregator(RSSAggr); return eAbort; } @@ -207,6 +208,7 @@ eNextState RSSAggregator_TerminateDB(AsyncIO *IO) EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n"); + StopDBWatchers(&RSSAggr->IO); UnlinkRSSAggregator(RSSAggr); return eAbort; } @@ -227,12 +229,6 @@ eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO) return eAbort; } - -eNextState AbortNetworkSaveMessage (AsyncIO *IO) -{ - return eAbort; ///TODO -} - eNextState RSSSaveMessage(AsyncIO *IO) { long len; -- 2.30.2