#include "event_client.h"
-extern int event_add_pipe[2];
extern citthread_mutex_t EventQueueMutex;
extern HashList *InboundEventQueue;
extern struct ev_loop *event_base;
+extern ev_async AddJob;
+extern ev_async ExitEventLoop;
int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
h->EvAttch = CB;
citthread_mutex_lock(&EventQueueMutex);
- if (event_add_pipe[1] == -1) {
- citthread_mutex_unlock(&EventQueueMutex);
- free (h);
- return -1;
- }
CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
i = GetCount(InboundEventQueue);
Put(InboundEventQueue, IKEY(i), h, NULL);
citthread_mutex_unlock(&EventQueueMutex);
- write(event_add_pipe[1], "+_", 1);
+ ev_async_send (event_base, &AddJob);
CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
return 0;
}
int ShutDownEventQueue(void)
{
citthread_mutex_lock(&EventQueueMutex);
- if (event_add_pipe[1] == -1) {
- citthread_mutex_unlock(&EventQueueMutex);
-
- return -1;
- }
- write(event_add_pipe[1], "x_", 1);
- close(event_add_pipe[1]);
- event_add_pipe[1] = -1;
+ ev_async_send (EV_DEFAULT_ &ExitEventLoop);
citthread_mutex_unlock(&EventQueueMutex);
return 0;
}
HashList *InboundEventQueues[2] = { NULL, NULL };
struct ev_loop *event_base;
-struct ev_io queue_add_event;
-static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int revents)
+ev_async AddJob;
+ev_async ExitEventLoop;
+
+static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
{
- char buf[10];
HashList *q;
void *v;
HashPos *It;
const char *Key;
/* get the control command... */
- read(watcher->fd, buf, 1);
- switch (buf[0]) {
- case '+':
- citthread_mutex_lock(&EventQueueMutex);
-
- if (InboundEventQueues[0] == InboundEventQueue) {
- InboundEventQueue = InboundEventQueues[1];
- q = InboundEventQueues[0];
- }
- else {
- InboundEventQueue = InboundEventQueues[0];
- q = InboundEventQueues[1];
- }
- citthread_mutex_unlock(&EventQueueMutex);
-
- It = GetNewHashPos(q, 0);
- while (GetNextHashPos(q, It, &len, &Key, &v))
- {
- IOAddHandler *h = v;
- h->EvAttch(h->IO);
- }
- DeleteHashPos(&It);
- DeleteHashContent(&q);
-/// TODO: add it to QueueEvents
- break;
- case 'x':
- close(event_add_pipe[0]);
-/// TODO; flush QueueEvents fd's and delete it.
- ev_io_stop(event_base, &queue_add_event);
- ev_unloop(event_base, EVUNLOOP_ALL);
+ citthread_mutex_lock(&EventQueueMutex);
+
+ if (InboundEventQueues[0] == InboundEventQueue) {
+ InboundEventQueue = InboundEventQueues[1];
+ q = InboundEventQueues[0];
+ }
+ else {
+ InboundEventQueue = InboundEventQueues[0];
+ q = InboundEventQueues[1];
+ }
+ citthread_mutex_unlock(&EventQueueMutex);
+
+ It = GetNewHashPos(q, 0);
+ while (GetNextHashPos(q, It, &len, &Key, &v))
+ {
+ IOAddHandler *h = v;
+ h->EvAttch(h->IO);
}
+ DeleteHashPos(&It);
+ DeleteHashContent(&q);
CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Read done.\n");
}
+static void EventExitCallback(EV_P_ ev_async *w, int revents)
+{
+ ev_unloop(event_base, EVUNLOOP_ALL);
+
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT Q exiting.\n");
+}
+
+
+
void InitEventQueue(void)
{
struct rlimit LimitSet;
event_base = ev_default_loop (EVFLAG_AUTO);
- ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ);
- ev_io_start(event_base, &queue_add_event);
+ ev_async_init(&AddJob, QueueEventAddCallback);
+ ev_async_start(event_base, &AddJob);
+ ev_async_init(&ExitEventLoop, EventExitCallback);
+ ev_async_start(event_base, &ExitEventLoop);
ev_loop (event_base, 0);
CtdlClearSystemContext();