Logging: add timestamps for eventdriven IO
[citadel.git] / citadel / event_client.c
index 0b6ffabb315f131cb133f7ee0ce2de936d845c58..d0e0fd9aefe9f20295a8827fda8fb5dabcf3f26b 100644 (file)
 #include "citadel_dirs.h"
 
 #include "event_client.h"
+#include "ctdl_module.h"
 
+static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
 static void IO_abort_shutdown_callback(struct ev_loop *loop,
                                       ev_cleanup *watcher,
-                                      int revents)
-{
-       AsyncIO *IO = watcher->data;
-       EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
-
-       assert(IO->ShutdownAbort);
-       IO->ShutdownAbort(IO);
-}
+                                      int revents);
 
 
 /*------------------------------------------------------------------------------
@@ -124,15 +119,16 @@ void ShutDownDBCLient(AsyncIO *IO)
        EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
        ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
 
-       assert(IO->Terminate);
-       IO->Terminate(IO);
+       assert(IO->DBTerminate);
+       IO->DBTerminate(IO);
 }
 
 void
 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
-       EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
+       IO->Now = ev_now(event_db);
+       EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
        become_session(IO->CitContext);
 
        ev_idle_stop(event_db, &IO->db_unwind_stack);
@@ -182,6 +178,17 @@ extern struct ev_loop *event_base;
 extern ev_async AddJob;
 extern ev_async ExitEventLoop;
 
+static void IO_abort_shutdown_callback(struct ev_loop *loop,
+                                      ev_cleanup *watcher,
+                                      int revents)
+{
+       AsyncIO *IO = watcher->data;
+       EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
+       IO->Now = ev_now(event_base);
+       assert(IO->ShutdownAbort);
+       IO->ShutdownAbort(IO);
+}
+
 
 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 {
@@ -289,6 +296,37 @@ void ShutDownCLient(AsyncIO *IO)
        IO->Terminate(IO);
 }
 
+void PostInbound(AsyncIO *IO)
+{
+       switch (IO->NextState) {
+       case eSendFile:
+               ev_io_start(event_base, &IO->send_event);
+               break;
+       case eSendReply:
+       case eSendMore:
+               assert(IO->SendDone);
+               IO->NextState = IO->SendDone(IO);
+               ev_io_start(event_base, &IO->send_event);
+               break;
+       case eReadPayload:
+       case eReadMore:
+       case eReadFile:
+               ev_io_start(event_base, &IO->recv_event);
+               break;
+       case eTerminateConnection:
+               ShutDownCLient(IO);
+               break;
+       case eAbort:
+               ShutDownCLient(IO);
+               break;
+       case eSendDNSQuery:
+       case eReadDNSReply:
+       case eDBQuery:
+       case eConnect:
+       case eReadMessage:
+               break;
+       }
+}
 eReadState HandleInbound(AsyncIO *IO)
 {
        const char *Err = NULL;
@@ -341,34 +379,8 @@ eReadState HandleInbound(AsyncIO *IO)
                }
        }
 
-       switch (IO->NextState) {
-       case eSendFile:
-               ev_io_start(event_base, &IO->send_event);
-               break;
-       case eSendReply:
-       case eSendMore:
-               assert(IO->SendDone);
-               IO->NextState = IO->SendDone(IO);
-               ev_io_start(event_base, &IO->send_event);
-               break;
-       case eReadPayload:
-       case eReadMore:
-       case eReadFile:
-               ev_io_start(event_base, &IO->recv_event);
-               break;
-       case eTerminateConnection:
-               ShutDownCLient(IO);
-               break;
-       case eAbort:
-               ShutDownCLient(IO);
-               break;
-       case eSendDNSQuery:
-       case eReadDNSReply:
-       case eDBQuery:
-       case eConnect:
-       case eReadMessage:
-               break;
-       }
+       PostInbound(IO);
+
        return Finished;
 }
 
@@ -380,6 +392,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        AsyncIO *IO = watcher->data;
        const char *errmsg = NULL;
 
+       IO->Now = ev_now(event_base);
        become_session(IO->CitContext);
 #ifdef BIGBAD_IODBG
        {
@@ -440,6 +453,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                case eSendFile:
                        if (IO->IOB.ChunkSendRemain > 0) {
                                ev_io_start(event_base, &IO->recv_event);
+                               SetNextTimeout(IO, 100.0);
+
                        } else {
                                assert(IO->ReadDone);
                                IO->NextState = IO->ReadDone(IO);
@@ -499,8 +514,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                }
        }
        else if (rc < 0) {
-               assert(IO->Timeout);
-               IO->Timeout(IO);
+               IO_Timeout_callback(loop, &IO->rw_timeout, revents);
        }
        /* else : must write more. */
 }
@@ -536,6 +550,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_timer_stop (event_base, &IO->rw_timeout);
        become_session(IO->CitContext);
 
@@ -563,6 +578,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_timer_stop (event_base, &IO->conn_fail);
 
        if (IO->SendBuf.fd != 0)
@@ -594,6 +610,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop,
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_idle_stop (event_base, &IO->conn_fail_immediate);
 
        if (IO->SendBuf.fd != 0)
@@ -619,6 +636,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_io_stop(loop, &IO->conn_event);
        ev_timer_stop (event_base, &IO->conn_fail);
        set_start_callback(loop, IO, revents);
@@ -630,6 +648,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        ssize_t nbytes;
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        switch (IO->NextState) {
        case eReadFile:
                nbytes = FileRecvChunked(&IO->IOB, &errmsg);
@@ -640,6 +659,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                        if (IO->IOB.ChunkSendRemain == 0)
                        {
                                IO->NextState = eSendReply;
+                               assert(IO->ReadDone);
+                               ev_io_stop(event_base, &IO->recv_event);
+                               PostInbound(IO);
+                               return;
                        }
                        else
                                return;
@@ -681,15 +704,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        if (nbytes > 0) {
                HandleInbound(IO);
        } else if (nbytes == 0) {
-               assert(IO->Timeout);
-
-               switch (IO->Timeout(IO))
-               {
-               case eAbort:
-                       ShutDownCLient(IO);
-               default:
-                       break;
-               }
+               IO_Timeout_callback(loop, &IO->rw_timeout, revents);
                return;
        } else if (nbytes == -1) {
                // FD is gone. kick it. 
@@ -706,13 +721,14 @@ void
 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
+       IO->Now = ev_now(event_base);
        EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
        become_session(IO->CitContext);
-       assert(IO->DNS.Fail);
        assert(IO->DNS.Query->PostDNS);
        switch (IO->DNS.Query->PostDNS(IO))
        {
        case eAbort:
+               assert(IO->DNS.Fail);
                switch (IO->DNS.Fail(IO)) {
                case eAbort:
 ////                   StopClientWatchers(IO);
@@ -731,6 +747,7 @@ eNextState EvConnectSock(AsyncIO *IO,
                         double first_rw_timeout,
                         int ReadFirst)
 {
+       struct sockaddr_in egress_sin;
        int fdflags;
        int rc = -1;
 
@@ -799,19 +816,38 @@ eNextState EvConnectSock(AsyncIO *IO,
        IO->rw_timeout.data = IO;
 
 
+
+
        /* for debugging you may bypass it like this:
         * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
         * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
         *   inet_addr("127.0.0.1");
         */
-       if (IO->ConnectMe->IPv6)
+       if (IO->ConnectMe->IPv6) {
                rc = connect(IO->SendBuf.fd,
                             &IO->ConnectMe->Addr,
                             sizeof(struct sockaddr_in6));
-       else
+       }
+       else {
+               /* If citserver is bound to a specific IP address on the host, make
+                * sure we use that address for outbound connections.
+                */
+       
+               memset(&egress_sin, 0, sizeof(egress_sin));
+               egress_sin.sin_family = AF_INET;
+               if (!IsEmptyStr(config.c_ip_addr)) {
+                       egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
+                       if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
+                               egress_sin.sin_addr.s_addr = INADDR_ANY;
+                       }
+
+                       /* If this bind fails, no problem; we can still use INADDR_ANY */
+                       bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
+               }
                rc = connect(IO->SendBuf.fd,
                             (struct sockaddr_in *)&IO->ConnectMe->Addr,
                             sizeof(struct sockaddr_in));
+       }
 
        if (rc >= 0){
                EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
@@ -881,6 +917,7 @@ void InitIOStruct(AsyncIO *IO,
                  IO_CallBack SendDone,
                  IO_CallBack ReadDone,
                  IO_CallBack Terminate,
+                 IO_CallBack DBTerminate,
                  IO_CallBack ConnFail,
                  IO_CallBack Timeout,
                  IO_CallBack ShutdownAbort)
@@ -895,6 +932,7 @@ void InitIOStruct(AsyncIO *IO,
        IO->SendDone      = SendDone;
        IO->ReadDone      = ReadDone;
        IO->Terminate     = Terminate;
+       IO->DBTerminate   = DBTerminate;
        IO->LineReader    = LineReader;
        IO->ConnFail      = ConnFail;
        IO->Timeout       = Timeout;
@@ -918,6 +956,7 @@ int InitcURLIOStruct(AsyncIO *IO,
                     const char* Desc,
                     IO_CallBack SendDone,
                     IO_CallBack Terminate,
+                    IO_CallBack DBTerminate,
                     IO_CallBack ShutdownAbort)
 {
        IO->Data          = Data;
@@ -925,8 +964,9 @@ int InitcURLIOStruct(AsyncIO *IO,
        IO->CitContext    = CloneContext(CC);
        ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
 
-       IO->SendDone = SendDone;
-       IO->Terminate = Terminate;
+       IO->SendDone      = SendDone;
+       IO->Terminate     = Terminate;
+       IO->DBTerminate   = DBTerminate;
        IO->ShutdownAbort = ShutdownAbort;
 
        strcpy(IO->HttpReq.errdesc, Desc);
@@ -955,3 +995,9 @@ void EV_backtrace(AsyncIO *IO)
        free(strings);
 #endif
 }
+
+
+ev_tstamp ctdl_ev_now (void)
+{
+       return ev_now(event_base);
+}