From 21624a786a61a7797e9bd1f93ccb76f20164bb18 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Mon, 31 Jan 2011 00:10:11 +0100 Subject: [PATCH] libev migration: use async methods to schedule jobs. thanks to Marc Lehmann for the friendly help. --- citadel/event_client.c | 19 ++--- .../modules/eventclient/serv_eventclient.c | 71 ++++++++++--------- 2 files changed, 40 insertions(+), 50 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index c1788a8a6..960ca4d55 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -67,10 +67,11 @@ #include "event_client.h" -extern int event_add_pipe[2]; extern citthread_mutex_t EventQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; +extern ev_async AddJob; +extern ev_async ExitEventLoop; int QueueEventContext(AsyncIO *IO, IO_CallBack CB) @@ -83,17 +84,12 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB) h->EvAttch = CB; citthread_mutex_lock(&EventQueueMutex); - if (event_add_pipe[1] == -1) { - citthread_mutex_unlock(&EventQueueMutex); - free (h); - return -1; - } CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n"); i = GetCount(InboundEventQueue); Put(InboundEventQueue, IKEY(i), h, NULL); citthread_mutex_unlock(&EventQueueMutex); - write(event_add_pipe[1], "+_", 1); + ev_async_send (event_base, &AddJob); CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n"); return 0; } @@ -102,14 +98,7 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB) int ShutDownEventQueue(void) { citthread_mutex_lock(&EventQueueMutex); - if (event_add_pipe[1] == -1) { - citthread_mutex_unlock(&EventQueueMutex); - - return -1; - } - write(event_add_pipe[1], "x_", 1); - close(event_add_pipe[1]); - event_add_pipe[1] = -1; + ev_async_send (EV_DEFAULT_ &ExitEventLoop); citthread_mutex_unlock(&EventQueueMutex); return 0; } diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index ec6af084e..8f05df409 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -66,11 +66,12 @@ HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; struct ev_loop *event_base; -struct ev_io queue_add_event; -static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int revents) +ev_async AddJob; +ev_async ExitEventLoop; + +static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) { - char buf[10]; HashList *q; void *v; HashPos *It; @@ -78,41 +79,39 @@ static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int reve const char *Key; /* get the control command... */ - read(watcher->fd, buf, 1); - switch (buf[0]) { - case '+': - citthread_mutex_lock(&EventQueueMutex); - - if (InboundEventQueues[0] == InboundEventQueue) { - InboundEventQueue = InboundEventQueues[1]; - q = InboundEventQueues[0]; - } - else { - InboundEventQueue = InboundEventQueues[0]; - q = InboundEventQueues[1]; - } - citthread_mutex_unlock(&EventQueueMutex); - - It = GetNewHashPos(q, 0); - while (GetNextHashPos(q, It, &len, &Key, &v)) - { - IOAddHandler *h = v; - h->EvAttch(h->IO); - } - DeleteHashPos(&It); - DeleteHashContent(&q); -/// TODO: add it to QueueEvents - break; - case 'x': - close(event_add_pipe[0]); -/// TODO; flush QueueEvents fd's and delete it. - ev_io_stop(event_base, &queue_add_event); - ev_unloop(event_base, EVUNLOOP_ALL); + citthread_mutex_lock(&EventQueueMutex); + + if (InboundEventQueues[0] == InboundEventQueue) { + InboundEventQueue = InboundEventQueues[1]; + q = InboundEventQueues[0]; + } + else { + InboundEventQueue = InboundEventQueues[0]; + q = InboundEventQueues[1]; + } + citthread_mutex_unlock(&EventQueueMutex); + + It = GetNewHashPos(q, 0); + while (GetNextHashPos(q, It, &len, &Key, &v)) + { + IOAddHandler *h = v; + h->EvAttch(h->IO); } + DeleteHashPos(&It); + DeleteHashContent(&q); CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n"); } +static void EventExitCallback(EV_P_ ev_async *w, int revents) +{ + ev_unloop(event_base, EVUNLOOP_ALL); + + CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n"); +} + + + void InitEventQueue(void) { struct rlimit LimitSet; @@ -147,8 +146,10 @@ void *client_event_thread(void *arg) event_base = ev_default_loop (EVFLAG_AUTO); - ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ); - ev_io_start(event_base, &queue_add_event); + ev_async_init(&AddJob, QueueEventAddCallback); + ev_async_start(event_base, &AddJob); + ev_async_init(&ExitEventLoop, EventExitCallback); + ev_async_start(event_base, &ExitEventLoop); ev_loop (event_base, 0); CtdlClearSystemContext(); -- 2.30.2