#include "event_client.h"
-extern int event_add_pipe[2];
extern citthread_mutex_t EventQueueMutex;
extern HashList *InboundEventQueue;
extern struct ev_loop *event_base;
+extern ev_async AddJob;
+extern ev_async ExitEventLoop;
+static void
+IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents)
+{
+ CtdlLogPrintf(CTDL_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
+
+ AsyncIO *IO = watcher->data;
+ IO->ShutdownAbort(IO);
+}
int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
{
h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
h->IO = IO;
h->EvAttch = CB;
+ ev_cleanup_init(&IO->abort_by_shutdown,
+ IO_abort_shutdown_callback);
+ IO->abort_by_shutdown.data = IO;
+ ev_cleanup_start(event_base, &IO->abort_by_shutdown);
citthread_mutex_lock(&EventQueueMutex);
- if (event_add_pipe[1] == -1) {
- citthread_mutex_unlock(&EventQueueMutex);
- free (h);
- return -1;
- }
CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
i = GetCount(InboundEventQueue);
Put(InboundEventQueue, IKEY(i), h, NULL);
citthread_mutex_unlock(&EventQueueMutex);
- write(event_add_pipe[1], "+_", 1);
+ ev_async_send (event_base, &AddJob);
CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
return 0;
}
int ShutDownEventQueue(void)
{
citthread_mutex_lock(&EventQueueMutex);
- if (event_add_pipe[1] == -1) {
- citthread_mutex_unlock(&EventQueueMutex);
-
- return -1;
- }
- write(event_add_pipe[1], "x_", 1);
- close(event_add_pipe[1]);
- event_add_pipe[1] = -1;
+ ev_async_send (EV_DEFAULT_ &ExitEventLoop);
citthread_mutex_unlock(&EventQueueMutex);
return 0;
}
{
CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock);
+ ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
+
if (IO->sock != 0)
{
ev_io_stop(event_base, &IO->send_event);
}
void
-IO_postdns_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
+IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
{
AsyncIO *IO = watcher->data;
- IO->PostDNS(IO);
+ CtdlLogPrintf(CTDL_DEBUG, "event: %s\n", __FUNCTION__);
+
+ IO->DNSQuery->PostDNS(IO);
}
eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout)
IO->rw_timeout.data = IO;
if (IO->IP6)
- rc = connect(IO->sock, &IO->Addr, sizeof(struct sockaddr_in6));
+ rc = connect(IO->sock, IO->Addr, sizeof(struct sockaddr_in6));
else
- rc = connect(IO->sock, (struct sockaddr_in *)&IO->Addr, sizeof(struct sockaddr_in));
+ rc = connect(IO->sock, (struct sockaddr_in *)IO->Addr, sizeof(struct sockaddr_in));
if (rc >= 0){
//// freeaddrinfo(res);