libev migration: use async methods to schedule jobs. thanks to Marc Lehmann for the...
[citadel.git] / citadel / modules / eventclient / serv_eventclient.c
index ec6af084e856aa51f08908fa2d4a29eac991c29b..8f05df409a020ef10fec2249bf162a81a5270c7b 100644 (file)
@@ -66,11 +66,12 @@ HashList *InboundEventQueue = NULL;
 HashList *InboundEventQueues[2] = { NULL, NULL };
 
 struct ev_loop *event_base;
-struct ev_io queue_add_event;
 
-static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int revents)
+ev_async AddJob;   
+ev_async ExitEventLoop;
+
+static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
 {
-       char buf[10];
        HashList *q;
        void *v;
        HashPos  *It;
@@ -78,41 +79,39 @@ static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int reve
        const char *Key;
 
        /* get the control command... */
-       read(watcher->fd, buf, 1);
-       switch (buf[0]) {
-       case '+':
-               citthread_mutex_lock(&EventQueueMutex);
-
-               if (InboundEventQueues[0] == InboundEventQueue) {
-                       InboundEventQueue = InboundEventQueues[1];
-                       q = InboundEventQueues[0];
-               }
-               else {
-                       InboundEventQueue = InboundEventQueues[0];
-                       q = InboundEventQueues[1];
-               }
-               citthread_mutex_unlock(&EventQueueMutex);
-
-               It = GetNewHashPos(q, 0);
-               while (GetNextHashPos(q, It, &len, &Key, &v))
-               {
-                       IOAddHandler *h = v;
-                       h->EvAttch(h->IO);
-               }
-               DeleteHashPos(&It);
-               DeleteHashContent(&q);
-/// TODO: add it to QueueEvents
-               break;
-       case 'x':
-               close(event_add_pipe[0]);
-/// TODO; flush QueueEvents fd's and delete it.
-               ev_io_stop(event_base, &queue_add_event);
-               ev_unloop(event_base, EVUNLOOP_ALL);
+       citthread_mutex_lock(&EventQueueMutex);
+
+       if (InboundEventQueues[0] == InboundEventQueue) {
+               InboundEventQueue = InboundEventQueues[1];
+               q = InboundEventQueues[0];
+       }
+       else {
+               InboundEventQueue = InboundEventQueues[0];
+               q = InboundEventQueues[1];
+       }
+       citthread_mutex_unlock(&EventQueueMutex);
+
+       It = GetNewHashPos(q, 0);
+       while (GetNextHashPos(q, It, &len, &Key, &v))
+       {
+               IOAddHandler *h = v;
+               h->EvAttch(h->IO);
        }
+       DeleteHashPos(&It);
+       DeleteHashContent(&q);
        CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
 }
 
 
+static void EventExitCallback(EV_P_ ev_async *w, int revents)
+{
+       ev_unloop(event_base, EVUNLOOP_ALL);
+
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
+}
+
+
+
 void InitEventQueue(void)
 {
        struct rlimit LimitSet;
@@ -147,8 +146,10 @@ void *client_event_thread(void *arg)
 
        event_base = ev_default_loop (EVFLAG_AUTO);
 
-       ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ);
-       ev_io_start(event_base, &queue_add_event);
+       ev_async_init(&AddJob, QueueEventAddCallback);
+       ev_async_start(event_base, &AddJob);
+       ev_async_init(&ExitEventLoop, EventExitCallback);
+       ev_async_start(event_base, &ExitEventLoop);
 
        ev_loop (event_base, 0);
        CtdlClearSystemContext();