Logging: add timestamps for eventdriven IO
[citadel.git] / citadel / event_client.c
index 9f01e9a4aeddb193026e3411858c0b7e48a9718b..d0e0fd9aefe9f20295a8827fda8fb5dabcf3f26b 100644 (file)
 #include "ctdl_module.h"
 
 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
-
 static void IO_abort_shutdown_callback(struct ev_loop *loop,
                                       ev_cleanup *watcher,
-                                      int revents)
-{
-       AsyncIO *IO = watcher->data;
-       EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
-
-       assert(IO->ShutdownAbort);
-       IO->ShutdownAbort(IO);
-}
+                                      int revents);
 
 
 /*------------------------------------------------------------------------------
@@ -127,15 +119,16 @@ void ShutDownDBCLient(AsyncIO *IO)
        EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
        ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
 
-       assert(IO->Terminate);
-       IO->Terminate(IO);
+       assert(IO->DBTerminate);
+       IO->DBTerminate(IO);
 }
 
 void
 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
-       EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
+       IO->Now = ev_now(event_db);
+       EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
        become_session(IO->CitContext);
 
        ev_idle_stop(event_db, &IO->db_unwind_stack);
@@ -185,6 +178,17 @@ extern struct ev_loop *event_base;
 extern ev_async AddJob;
 extern ev_async ExitEventLoop;
 
+static void IO_abort_shutdown_callback(struct ev_loop *loop,
+                                      ev_cleanup *watcher,
+                                      int revents)
+{
+       AsyncIO *IO = watcher->data;
+       EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
+       IO->Now = ev_now(event_base);
+       assert(IO->ShutdownAbort);
+       IO->ShutdownAbort(IO);
+}
+
 
 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
 {
@@ -292,6 +296,37 @@ void ShutDownCLient(AsyncIO *IO)
        IO->Terminate(IO);
 }
 
+void PostInbound(AsyncIO *IO)
+{
+       switch (IO->NextState) {
+       case eSendFile:
+               ev_io_start(event_base, &IO->send_event);
+               break;
+       case eSendReply:
+       case eSendMore:
+               assert(IO->SendDone);
+               IO->NextState = IO->SendDone(IO);
+               ev_io_start(event_base, &IO->send_event);
+               break;
+       case eReadPayload:
+       case eReadMore:
+       case eReadFile:
+               ev_io_start(event_base, &IO->recv_event);
+               break;
+       case eTerminateConnection:
+               ShutDownCLient(IO);
+               break;
+       case eAbort:
+               ShutDownCLient(IO);
+               break;
+       case eSendDNSQuery:
+       case eReadDNSReply:
+       case eDBQuery:
+       case eConnect:
+       case eReadMessage:
+               break;
+       }
+}
 eReadState HandleInbound(AsyncIO *IO)
 {
        const char *Err = NULL;
@@ -344,34 +379,8 @@ eReadState HandleInbound(AsyncIO *IO)
                }
        }
 
-       switch (IO->NextState) {
-       case eSendFile:
-               ev_io_start(event_base, &IO->send_event);
-               break;
-       case eSendReply:
-       case eSendMore:
-               assert(IO->SendDone);
-               IO->NextState = IO->SendDone(IO);
-               ev_io_start(event_base, &IO->send_event);
-               break;
-       case eReadPayload:
-       case eReadMore:
-       case eReadFile:
-               ev_io_start(event_base, &IO->recv_event);
-               break;
-       case eTerminateConnection:
-               ShutDownCLient(IO);
-               break;
-       case eAbort:
-               ShutDownCLient(IO);
-               break;
-       case eSendDNSQuery:
-       case eReadDNSReply:
-       case eDBQuery:
-       case eConnect:
-       case eReadMessage:
-               break;
-       }
+       PostInbound(IO);
+
        return Finished;
 }
 
@@ -383,6 +392,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        AsyncIO *IO = watcher->data;
        const char *errmsg = NULL;
 
+       IO->Now = ev_now(event_base);
        become_session(IO->CitContext);
 #ifdef BIGBAD_IODBG
        {
@@ -540,6 +550,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_timer_stop (event_base, &IO->rw_timeout);
        become_session(IO->CitContext);
 
@@ -567,6 +578,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_timer_stop (event_base, &IO->conn_fail);
 
        if (IO->SendBuf.fd != 0)
@@ -598,6 +610,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop,
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_idle_stop (event_base, &IO->conn_fail_immediate);
 
        if (IO->SendBuf.fd != 0)
@@ -623,6 +636,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        ev_io_stop(loop, &IO->conn_event);
        ev_timer_stop (event_base, &IO->conn_fail);
        set_start_callback(loop, IO, revents);
@@ -634,6 +648,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        ssize_t nbytes;
        AsyncIO *IO = watcher->data;
 
+       IO->Now = ev_now(event_base);
        switch (IO->NextState) {
        case eReadFile:
                nbytes = FileRecvChunked(&IO->IOB, &errmsg);
@@ -644,6 +659,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                        if (IO->IOB.ChunkSendRemain == 0)
                        {
                                IO->NextState = eSendReply;
+                               assert(IO->ReadDone);
+                               ev_io_stop(event_base, &IO->recv_event);
+                               PostInbound(IO);
+                               return;
                        }
                        else
                                return;
@@ -685,15 +704,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        if (nbytes > 0) {
                HandleInbound(IO);
        } else if (nbytes == 0) {
-               assert(IO->Timeout);
-
-               switch (IO->Timeout(IO))
-               {
-               case eAbort:
-                       ShutDownCLient(IO);
-               default:
-                       break;
-               }
+               IO_Timeout_callback(loop, &IO->rw_timeout, revents);
                return;
        } else if (nbytes == -1) {
                // FD is gone. kick it. 
@@ -710,6 +721,7 @@ void
 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
+       IO->Now = ev_now(event_base);
        EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
        become_session(IO->CitContext);
        assert(IO->DNS.Query->PostDNS);
@@ -905,6 +917,7 @@ void InitIOStruct(AsyncIO *IO,
                  IO_CallBack SendDone,
                  IO_CallBack ReadDone,
                  IO_CallBack Terminate,
+                 IO_CallBack DBTerminate,
                  IO_CallBack ConnFail,
                  IO_CallBack Timeout,
                  IO_CallBack ShutdownAbort)
@@ -919,6 +932,7 @@ void InitIOStruct(AsyncIO *IO,
        IO->SendDone      = SendDone;
        IO->ReadDone      = ReadDone;
        IO->Terminate     = Terminate;
+       IO->DBTerminate   = DBTerminate;
        IO->LineReader    = LineReader;
        IO->ConnFail      = ConnFail;
        IO->Timeout       = Timeout;
@@ -942,6 +956,7 @@ int InitcURLIOStruct(AsyncIO *IO,
                     const char* Desc,
                     IO_CallBack SendDone,
                     IO_CallBack Terminate,
+                    IO_CallBack DBTerminate,
                     IO_CallBack ShutdownAbort)
 {
        IO->Data          = Data;
@@ -949,8 +964,9 @@ int InitcURLIOStruct(AsyncIO *IO,
        IO->CitContext    = CloneContext(CC);
        ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
 
-       IO->SendDone = SendDone;
-       IO->Terminate = Terminate;
+       IO->SendDone      = SendDone;
+       IO->Terminate     = Terminate;
+       IO->DBTerminate   = DBTerminate;
        IO->ShutdownAbort = ShutdownAbort;
 
        strcpy(IO->HttpReq.errdesc, Desc);