]> code.citadel.org Git - citadel.git/blobdiff - citadel/event_client.c
Switch handling to have a pointer to the target address
[citadel.git] / citadel / event_client.c
index 4dca578f7232122e63c0ab301d7cd5b6e1cfc5c8..dd0fa7c976bf9fbe2fd0f77a277dff8c1297b20f 100644 (file)
 
 #include "event_client.h"
 
-extern int event_add_pipe[2];
 extern citthread_mutex_t EventQueueMutex;
 extern HashList *InboundEventQueue;
 extern struct ev_loop *event_base;
+extern ev_async AddJob;   
+extern ev_async ExitEventLoop;
 
+static void
+IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents)
+{
+       CtdlLogPrintf(CTDL_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
+
+       AsyncIO *IO = watcher->data;
+       IO->ShutdownAbort(IO);
+}
        
 int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 {
@@ -81,19 +90,18 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
        h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
        h->IO = IO;
        h->EvAttch = CB;
+       ev_cleanup_init(&IO->abort_by_shutdown, 
+                       IO_abort_shutdown_callback);
+       IO->abort_by_shutdown.data = IO;
+       ev_cleanup_start(event_base, &IO->abort_by_shutdown);
 
        citthread_mutex_lock(&EventQueueMutex);
-       if (event_add_pipe[1] == -1) {
-               citthread_mutex_unlock(&EventQueueMutex);
-               free (h);
-               return -1;
-       }
        CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n");
        i = GetCount(InboundEventQueue);
        Put(InboundEventQueue, IKEY(i), h, NULL);
        citthread_mutex_unlock(&EventQueueMutex);
 
-       write(event_add_pipe[1], "+_", 1);
+       ev_async_send (event_base, &AddJob);
        CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n");
        return 0;
 }
@@ -102,14 +110,7 @@ int QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 int ShutDownEventQueue(void)
 {
        citthread_mutex_lock(&EventQueueMutex);
-       if (event_add_pipe[1] == -1) {
-               citthread_mutex_unlock(&EventQueueMutex);
-
-               return -1;
-       }
-       write(event_add_pipe[1], "x_", 1);
-       close(event_add_pipe[1]);
-       event_add_pipe[1] = -1;
+       ev_async_send (EV_DEFAULT_ &ExitEventLoop);
        citthread_mutex_unlock(&EventQueueMutex);
        return 0;
 }
@@ -119,7 +120,6 @@ void FreeAsyncIOContents(AsyncIO *IO)
        FreeStrBuf(&IO->IOBuf);
        FreeStrBuf(&IO->SendBuf.Buf);
        FreeStrBuf(&IO->RecvBuf.Buf);
-       ares_destroy(IO->DNSChannel);
 }
 
 
@@ -127,6 +127,8 @@ void ShutDownCLient(AsyncIO *IO)
 {
        CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock);
 
+       ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
+
        if (IO->sock != 0)
        {
                ev_io_stop(event_base, &IO->send_event);
@@ -137,6 +139,12 @@ void ShutDownCLient(AsyncIO *IO)
                IO->SendBuf.fd = 0;
                IO->RecvBuf.fd = 0;
        }
+       if (IO->DNSChannel != NULL) {
+               ares_destroy(IO->DNSChannel);
+               ev_io_stop(event_base, &IO->dns_recv_event);
+               ev_io_stop(event_base, &IO->dns_send_event);
+               IO->DNSChannel = NULL;
+       }
        assert(IO->Terminate);
        IO->Terminate(IO);
        
@@ -331,7 +339,14 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        }
 }
 
+void
+IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
+{
+       AsyncIO *IO = watcher->data;
+       CtdlLogPrintf(CTDL_DEBUG, "event: %s\n", __FUNCTION__);
 
+       IO->DNSQuery->PostDNS(IO);
+}
 
 eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout)
 {
@@ -379,11 +394,11 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r
        IO->conn_fail.data = IO;
        ev_timer_init(&IO->rw_timeout, IO_Timout_callback, first_rw_timeout, 0);
        IO->rw_timeout.data = IO;
-       ///struct sockaddr_in *addr = &IO->Addr;
+
        if (IO->IP6)
-               rc = connect(IO->sock, &IO->Addr, sizeof(struct in6_addr));
+               rc = connect(IO->sock, IO->Addr, sizeof(struct sockaddr_in6));
        else
-               rc = connect(IO->sock, (struct sockaddr_in *)&IO->Addr, sizeof(struct sockaddr_in));
+               rc = connect(IO->sock, (struct sockaddr_in *)IO->Addr, sizeof(struct sockaddr_in));
 
        if (rc >= 0){
 ////           freeaddrinfo(res);
@@ -430,7 +445,5 @@ eNextState InitEventIO(AsyncIO *IO,
        else {
                IO->NextState = eSendReply;
        }
-       IO->IP6 = IO->HEnt->h_addrtype == AF_INET6;
-//     IO->res = HEnt->h_addr_list[0];
        return event_connect_socket(IO, conn_timeout, first_rw_timeout);
 }