Merge branch 'master' of ssh://git.citadel.org/appl/gitroot/citadel
[citadel.git] / citadel / event_client.c
index 5a32cea9bca45878d533fe68cb1cec7f68700147..2e19b17b4ecaee4d9e41b355cf4faaeeda2889ae 100644 (file)
@@ -47,6 +47,9 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <assert.h>
+#if HAVE_BACKTRACE
+#include <execinfo.h>
+#endif
 
 #include <libcitadel.h>
 #include "citadel.h"
@@ -66,6 +69,9 @@
 #include "citadel_dirs.h"
 
 #include "event_client.h"
+#include "ctdl_module.h"
+
+static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
 
 static void IO_abort_shutdown_callback(struct ev_loop *loop,
                                       ev_cleanup *watcher,
@@ -121,15 +127,15 @@ void ShutDownDBCLient(AsyncIO *IO)
        EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
        ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
 
-       assert(IO->Terminate);
-       IO->Terminate(IO);
+       assert(IO->DBTerminate);
+       IO->DBTerminate(IO);
 }
 
 void
 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
-       EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
+       EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
        become_session(IO->CitContext);
 
        ev_idle_stop(event_db, &IO->db_unwind_stack);
@@ -256,7 +262,10 @@ void StopClientWatchers(AsyncIO *IO)
        ev_io_stop(event_base, &IO->conn_event);
        ev_io_stop(event_base, &IO->send_event);
        ev_io_stop(event_base, &IO->recv_event);
-       close(IO->SendBuf.fd);
+
+       if (IO->SendBuf.fd != 0) {
+               close(IO->SendBuf.fd);
+       }
        IO->SendBuf.fd = 0;
        IO->RecvBuf.fd = 0;
 }
@@ -273,6 +282,8 @@ void ShutDownCLient(AsyncIO *IO)
 
        if (IO->DNS.Channel != NULL) {
                ares_destroy(IO->DNS.Channel);
+               EV_DNS_LOG_STOP(DNS.recv_event);
+               EV_DNS_LOG_STOP(DNS.send_event);
                ev_io_stop(event_base, &IO->DNS.recv_event);
                ev_io_stop(event_base, &IO->DNS.send_event);
                IO->DNS.Channel = NULL;
@@ -281,6 +292,37 @@ void ShutDownCLient(AsyncIO *IO)
        IO->Terminate(IO);
 }
 
+void PostInbound(AsyncIO *IO)
+{
+       switch (IO->NextState) {
+       case eSendFile:
+               ev_io_start(event_base, &IO->send_event);
+               break;
+       case eSendReply:
+       case eSendMore:
+               assert(IO->SendDone);
+               IO->NextState = IO->SendDone(IO);
+               ev_io_start(event_base, &IO->send_event);
+               break;
+       case eReadPayload:
+       case eReadMore:
+       case eReadFile:
+               ev_io_start(event_base, &IO->recv_event);
+               break;
+       case eTerminateConnection:
+               ShutDownCLient(IO);
+               break;
+       case eAbort:
+               ShutDownCLient(IO);
+               break;
+       case eSendDNSQuery:
+       case eReadDNSReply:
+       case eDBQuery:
+       case eConnect:
+       case eReadMessage:
+               break;
+       }
+}
 eReadState HandleInbound(AsyncIO *IO)
 {
        const char *Err = NULL;
@@ -333,34 +375,8 @@ eReadState HandleInbound(AsyncIO *IO)
                }
        }
 
-       switch (IO->NextState) {
-       case eSendFile:
-               ev_io_start(event_base, &IO->send_event);
-               break;
-       case eSendReply:
-       case eSendMore:
-               assert(IO->SendDone);
-               IO->NextState = IO->SendDone(IO);
-               ev_io_start(event_base, &IO->send_event);
-               break;
-       case eReadPayload:
-       case eReadMore:
-       case eReadFile:
-               ev_io_start(event_base, &IO->recv_event);
-               break;
-       case eTerminateConnection:
-//////TODOxxxx
-               break;
-       case eAbort:
-               ShutDownCLient(IO);
-               break;
-       case eSendDNSQuery:
-       case eReadDNSReply:
-       case eDBQuery:
-       case eConnect:
-       case eReadMessage:
-               break;
-       }
+       PostInbound(IO);
+
        return Finished;
 }
 
@@ -432,6 +448,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                case eSendFile:
                        if (IO->IOB.ChunkSendRemain > 0) {
                                ev_io_start(event_base, &IO->recv_event);
+                               SetNextTimeout(IO, 100.0);
+
                        } else {
                                assert(IO->ReadDone);
                                IO->NextState = IO->ReadDone(IO);
@@ -491,8 +509,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                }
        }
        else if (rc < 0) {
-               assert(IO->Timeout);
-               IO->Timeout(IO);
+               IO_Timeout_callback(loop, &IO->rw_timeout, revents);
        }
        /* else : must write more. */
 }
@@ -632,6 +649,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                        if (IO->IOB.ChunkSendRemain == 0)
                        {
                                IO->NextState = eSendReply;
+                               assert(IO->ReadDone);
+                               ev_io_stop(event_base, &IO->recv_event);
+                               PostInbound(IO);
+                               return;
                        }
                        else
                                return;
@@ -673,18 +694,11 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
        if (nbytes > 0) {
                HandleInbound(IO);
        } else if (nbytes == 0) {
-               assert(IO->Timeout);
-
-               switch (IO->Timeout(IO))
-               {
-               case eAbort:
-                       ShutDownCLient(IO);
-               default:
-                       break;
-               }
+               IO_Timeout_callback(loop, &IO->rw_timeout, revents);
                return;
        } else if (nbytes == -1) {
-/// TODO: FD is gone. kick it.        sock_buff_invoke_free(sb, errno);
+               // FD is gone. kick it. 
+               StopClientWatchers(IO);
                EV_syslog(LOG_DEBUG,
                          "EVENT: Socket Invalid! %s \n",
                          strerror(errno));
@@ -699,13 +713,14 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
        AsyncIO *IO = watcher->data;
        EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
        become_session(IO->CitContext);
-       assert(IO->DNS.Fail);
        assert(IO->DNS.Query->PostDNS);
        switch (IO->DNS.Query->PostDNS(IO))
        {
        case eAbort:
+               assert(IO->DNS.Fail);
                switch (IO->DNS.Fail(IO)) {
                case eAbort:
+////                   StopClientWatchers(IO);
                        ShutDownCLient(IO);
                default:
                        break;
@@ -721,6 +736,7 @@ eNextState EvConnectSock(AsyncIO *IO,
                         double first_rw_timeout,
                         int ReadFirst)
 {
+       struct sockaddr_in egress_sin;
        int fdflags;
        int rc = -1;
 
@@ -747,6 +763,7 @@ eNextState EvConnectSock(AsyncIO *IO,
                StrBufPrintf(IO->ErrMsg,
                             "Failed to create socket: %s",
                             strerror(errno));
+               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
                return eAbort;
        }
        fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
@@ -757,6 +774,8 @@ eNextState EvConnectSock(AsyncIO *IO,
                StrBufPrintf(IO->ErrMsg,
                             "Failed to get socket flags: %s",
                             strerror(errno));
+               close(IO->SendBuf.fd);
+               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
                return eAbort;
        }
        fdflags = fdflags | O_NONBLOCK;
@@ -769,7 +788,7 @@ eNextState EvConnectSock(AsyncIO *IO,
                             "Failed to set socket flags: %s",
                             strerror(errno));
                close(IO->SendBuf.fd);
-               IO->SendBuf.fd = IO->RecvBuf.fd = -1;
+               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
                return eAbort;
        }
 /* TODO: maye we could use offsetof() to calc the position of data...
@@ -786,19 +805,38 @@ eNextState EvConnectSock(AsyncIO *IO,
        IO->rw_timeout.data = IO;
 
 
+
+
        /* for debugging you may bypass it like this:
         * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
         * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
         *   inet_addr("127.0.0.1");
         */
-       if (IO->ConnectMe->IPv6)
+       if (IO->ConnectMe->IPv6) {
                rc = connect(IO->SendBuf.fd,
                             &IO->ConnectMe->Addr,
                             sizeof(struct sockaddr_in6));
-       else
+       }
+       else {
+               /* If citserver is bound to a specific IP address on the host, make
+                * sure we use that address for outbound connections.
+                */
+       
+               memset(&egress_sin, 0, sizeof(egress_sin));
+               egress_sin.sin_family = AF_INET;
+               if (!IsEmptyStr(config.c_ip_addr)) {
+                       egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
+                       if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
+                               egress_sin.sin_addr.s_addr = INADDR_ANY;
+                       }
+
+                       /* If this bind fails, no problem; we can still use INADDR_ANY */
+                       bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
+               }
                rc = connect(IO->SendBuf.fd,
                             (struct sockaddr_in *)&IO->ConnectMe->Addr,
                             sizeof(struct sockaddr_in));
+       }
 
        if (rc >= 0){
                EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
@@ -868,6 +906,7 @@ void InitIOStruct(AsyncIO *IO,
                  IO_CallBack SendDone,
                  IO_CallBack ReadDone,
                  IO_CallBack Terminate,
+                 IO_CallBack DBTerminate,
                  IO_CallBack ConnFail,
                  IO_CallBack Timeout,
                  IO_CallBack ShutdownAbort)
@@ -882,6 +921,7 @@ void InitIOStruct(AsyncIO *IO,
        IO->SendDone      = SendDone;
        IO->ReadDone      = ReadDone;
        IO->Terminate     = Terminate;
+       IO->DBTerminate   = DBTerminate;
        IO->LineReader    = LineReader;
        IO->ConnFail      = ConnFail;
        IO->Timeout       = Timeout;
@@ -892,6 +932,9 @@ void InitIOStruct(AsyncIO *IO,
        IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
        IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
        IO->IOBuf         = NewStrBuf();
+       EV_syslog(LOG_DEBUG,
+                 "EVENT: Session lives at %p IO at %p \n",
+                 Data, IO);
 
 }
 
@@ -902,6 +945,7 @@ int InitcURLIOStruct(AsyncIO *IO,
                     const char* Desc,
                     IO_CallBack SendDone,
                     IO_CallBack Terminate,
+                    IO_CallBack DBTerminate,
                     IO_CallBack ShutdownAbort)
 {
        IO->Data          = Data;
@@ -909,8 +953,9 @@ int InitcURLIOStruct(AsyncIO *IO,
        IO->CitContext    = CloneContext(CC);
        ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
 
-       IO->SendDone = SendDone;
-       IO->Terminate = Terminate;
+       IO->SendDone      = SendDone;
+       IO->Terminate     = Terminate;
+       IO->DBTerminate   = DBTerminate;
        IO->ShutdownAbort = ShutdownAbort;
 
        strcpy(IO->HttpReq.errdesc, Desc);
@@ -919,3 +964,29 @@ int InitcURLIOStruct(AsyncIO *IO,
        return  evcurl_init(IO);
 
 }
+
+void EV_backtrace(AsyncIO *IO)
+{
+#ifdef HAVE_BACKTRACE
+       void *stack_frames[50];
+       size_t size, i;
+       char **strings;
+
+
+       size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
+       strings = backtrace_symbols(stack_frames, size);
+       for (i = 0; i < size; i++) {
+               if (strings != NULL)
+                       EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
+               else
+                       EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
+       }
+       free(strings);
+#endif
+}
+
+
+ev_tstamp ctdl_ev_now (void)
+{
+       return ev_now(event_base);
+}