libev migration: use async methods to schedule jobs. thanks to Marc Lehmann for the...
authorWilfried Goesgens <dothebart@citadel.org>
Sun, 30 Jan 2011 23:10:11 +0000 (00:10 +0100)
committerWilfried Goesgens <dothebart@citadel.org>
Sun, 30 Jan 2011 23:10:11 +0000 (00:10 +0100)
citadel/event_client.c
citadel/modules/eventclient/serv_eventclient.c

index c1788a8a6e0dc9f590862efc74b2aa2b90b9cd65..960ca4d55ab2610556236afc06d8afe71aa06330 100644 (file)
 
 #include "event_client.h"
 
-extern int event_add_pipe[2];
 extern citthread_mutex_t EventQueueMutex;
 extern HashList *InboundEventQueue;
 extern struct ev_loop *event_base;
+extern ev_async AddJob;   
+extern ev_async ExitEventLoop;
 
        
 int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
@@ -83,17 +84,12 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
        h->EvAttch = CB;
 
        citthread_mutex_lock(&EventQueueMutex);
-       if (event_add_pipe[1] == -1) {
-               citthread_mutex_unlock(&EventQueueMutex);
-               free (h);
-               return -1;
-       }
        CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
        i = GetCount(InboundEventQueue);
        Put(InboundEventQueue, IKEY(i), h, NULL);
        citthread_mutex_unlock(&EventQueueMutex);
 
-       write(event_add_pipe[1], "+_", 1);
+       ev_async_send (event_base, &AddJob);
        CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
        return 0;
 }
@@ -102,14 +98,7 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 int ShutDownEventQueue(void)
 {
        citthread_mutex_lock(&EventQueueMutex);
-       if (event_add_pipe[1] == -1) {
-               citthread_mutex_unlock(&EventQueueMutex);
-
-               return -1;
-       }
-       write(event_add_pipe[1], "x_", 1);
-       close(event_add_pipe[1]);
-       event_add_pipe[1] = -1;
+       ev_async_send (EV_DEFAULT_ &ExitEventLoop);
        citthread_mutex_unlock(&EventQueueMutex);
        return 0;
 }
index ec6af084e856aa51f08908fa2d4a29eac991c29b..8f05df409a020ef10fec2249bf162a81a5270c7b 100644 (file)
@@ -66,11 +66,12 @@ HashList *InboundEventQueue = NULL;
 HashList *InboundEventQueues[2] = { NULL, NULL };
 
 struct ev_loop *event_base;
-struct ev_io queue_add_event;
 
-static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int revents)
+ev_async AddJob;   
+ev_async ExitEventLoop;
+
+static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
 {
-       char buf[10];
        HashList *q;
        void *v;
        HashPos  *It;
@@ -78,41 +79,39 @@ static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int reve
        const char *Key;
 
        /* get the control command... */
-       read(watcher->fd, buf, 1);
-       switch (buf[0]) {
-       case '+':
-               citthread_mutex_lock(&EventQueueMutex);
-
-               if (InboundEventQueues[0] == InboundEventQueue) {
-                       InboundEventQueue = InboundEventQueues[1];
-                       q = InboundEventQueues[0];
-               }
-               else {
-                       InboundEventQueue = InboundEventQueues[0];
-                       q = InboundEventQueues[1];
-               }
-               citthread_mutex_unlock(&EventQueueMutex);
-
-               It = GetNewHashPos(q, 0);
-               while (GetNextHashPos(q, It, &len, &Key, &v))
-               {
-                       IOAddHandler *h = v;
-                       h->EvAttch(h->IO);
-               }
-               DeleteHashPos(&It);
-               DeleteHashContent(&q);
-/// TODO: add it to QueueEvents
-               break;
-       case 'x':
-               close(event_add_pipe[0]);
-/// TODO; flush QueueEvents fd's and delete it.
-               ev_io_stop(event_base, &queue_add_event);
-               ev_unloop(event_base, EVUNLOOP_ALL);
+       citthread_mutex_lock(&EventQueueMutex);
+
+       if (InboundEventQueues[0] == InboundEventQueue) {
+               InboundEventQueue = InboundEventQueues[1];
+               q = InboundEventQueues[0];
+       }
+       else {
+               InboundEventQueue = InboundEventQueues[0];
+               q = InboundEventQueues[1];
+       }
+       citthread_mutex_unlock(&EventQueueMutex);
+
+       It = GetNewHashPos(q, 0);
+       while (GetNextHashPos(q, It, &len, &Key, &v))
+       {
+               IOAddHandler *h = v;
+               h->EvAttch(h->IO);
        }
+       DeleteHashPos(&It);
+       DeleteHashContent(&q);
        CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
 }
 
 
+static void EventExitCallback(EV_P_ ev_async *w, int revents)
+{
+       ev_unloop(event_base, EVUNLOOP_ALL);
+
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
+}
+
+
+
 void InitEventQueue(void)
 {
        struct rlimit LimitSet;
@@ -147,8 +146,10 @@ void *client_event_thread(void *arg)
 
        event_base = ev_default_loop (EVFLAG_AUTO);
 
-       ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ);
-       ev_io_start(event_base, &queue_add_event);
+       ev_async_init(&AddJob, QueueEventAddCallback);
+       ev_async_start(event_base, &AddJob);
+       ev_async_init(&ExitEventLoop, EventExitCallback);
+       ev_async_start(event_base, &ExitEventLoop);
 
        ev_loop (event_base, 0);
        CtdlClearSystemContext();