libev migration - timeouts seem to be working.
authorWilfried Goesgens <dothebart@citadel.org>
Sat, 15 Jan 2011 19:57:34 +0000 (20:57 +0100)
committerWilfried Goesgens <dothebart@citadel.org>
Sat, 15 Jan 2011 19:57:34 +0000 (20:57 +0100)
  - handle timeouts with libev timers

citadel/event_client.c
citadel/event_client.h
citadel/modules/smtp/serv_smtpeventclient.c

index 9ad065422341a98ec8da5035c0dd31245c74c3bc..f3730ca67bc9dbb8f36abf6f57bc291ff7e2ddff 100644 (file)
@@ -122,25 +122,6 @@ void FreeAsyncIOContents(AsyncIO *IO)
        ares_destroy(IO->DNSChannel);
 }
 
-/*
-  static void
-  setup_signal_handlers(struct instance *instance)
-  {
-  signal(SIGPIPE, SIG_IGN);
-
-  event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST,
-  exit_event_callback, instance);
-  event_add(&instance->sigterm_event, NULL);
-
-  event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST,
-  exit_event_callback, instance);
-  event_add(&instance->sigint_event, NULL);
-
-  event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST,
-  exit_event_callback, instance);
-  event_add(&instance->sigquit_event, NULL);
-  }
-*/
 
 void ShutDownCLient(AsyncIO *IO)
 {
@@ -150,13 +131,15 @@ void ShutDownCLient(AsyncIO *IO)
        {
                ev_io_stop(event_base, &IO->send_event);
                ev_io_stop(event_base, &IO->recv_event);
+               ev_timer_stop (event_base, &IO->conn_fail);
+               ev_timer_stop (event_base, &IO->conn_timeout);
                close(IO->sock);
                IO->sock = 0;
                IO->SendBuf.fd = 0;
                IO->RecvBuf.fd = 0;
        }
 
-       IO->Terminate(IO->Data);
+       IO->Terminate(IO);
        
 }
 
@@ -191,7 +174,7 @@ eReadState HandleInbound(AsyncIO *IO)
                        
                if (Finished != eMustReadMore) {
                        ev_io_stop(event_base, &IO->recv_event);
-                       IO->NextState = IO->ReadDone(IO->Data);
+                       IO->NextState = IO->ReadDone(IO);
                        Finished = StrBufCheckBuffer(&IO->RecvBuf);
                }
        }
@@ -200,7 +183,7 @@ eReadState HandleInbound(AsyncIO *IO)
        if ((IO->NextState == eSendReply) ||
            (IO->NextState == eSendMore))
        {
-               IO->NextState = IO->SendDone(IO->Data);
+               IO->NextState = IO->SendDone(IO);
                ev_io_start(event_base, &IO->send_event);
        }
        else if ((IO->NextState == eTerminateConnection) ||
@@ -215,16 +198,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        int rc;
        AsyncIO *IO = watcher->data;
-/*
-       CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d  : [%s%s%s%s]\n",
-                     watcher->fd,
-                     (event&EV_TIMEOUT) ? " timeout" : "",
-                     (event&EV_READ)    ? " read" : "",
-                     (event&EV_WRITE)   ? " write" : "",
-                     (event&EV_SIGNAL)  ? " signal" : "");
-*/
-///    assert(fd == IO->sock);
-       
+
        rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
 
        if (rc == 0)
@@ -259,7 +233,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                case eSendReply:
                        break;
                case eSendMore:
-                       IO->NextState = IO->SendDone(IO->Data);
+                       IO->NextState = IO->SendDone(IO);
 
                        if ((IO->NextState == eTerminateConnection) ||
                            (IO->NextState == eAbort) )
@@ -321,12 +295,8 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 
 
 static void
-set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents, int first_rw_timeout)
+set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
 {
-       ev_timer_init(&IO->conn_timeout, 
-                     IO_Timout_callback, 
-                     first_rw_timeout, 0);
-       IO->conn_timeout.data = IO;
        
        switch(IO->NextState) {
        case eReadMessage:
@@ -386,6 +356,8 @@ IO->curr_ai->ai_family,
        IO->recv_event.data = IO;
        ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE);
        IO->send_event.data = IO;
+       ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
+       IO->conn_fail.data = IO;
        
        memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) );
 
@@ -402,23 +374,26 @@ IO->curr_ai->ai_family,
                     sizeof(struct sockaddr_in));
        if (rc >= 0){
 ////           freeaddrinfo(res);
-               set_start_callback(event_base, IO, 0, first_rw_timeout);
+               ev_timer_init(&IO->conn_timeout, IO_Timout_callback, first_rw_timeout, 0);
+               IO->conn_timeout.data = IO;
+               set_start_callback(event_base, IO, 0);
+               ev_timer_start(event_base, &IO->conn_timeout);
                return 0;
        }
        else if (errno == EINPROGRESS) {
-               set_start_callback(event_base, IO, 0, first_rw_timeout);
-               ev_timer_init(&IO->conn_fail, 
-                             IO_connfail_callback, 
-                             conn_timeout, 0);
-               IO->conn_fail.data = IO;
+               set_start_callback(event_base, IO, 0);
+               ev_timer_init(&IO->conn_timeout, 
+                             IO_Timout_callback, 
+                             conn_timeout + first_rw_timeout, 0);
                ev_timer_start(event_base, &IO->conn_fail);
+               ev_timer_start(event_base, &IO->conn_timeout);
                
                return 0;
        }
        else {
                CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno));
                StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno));
-               close(IO->sock);
+///            close(IO->sock);
 /*             IO->curr_ai = IO->curr_ai->ai_next;
                if (IO->curr_ai != NULL)
                        return event_connect_socket(IO);
@@ -427,6 +402,14 @@ IO->curr_ai->ai_family,
        }
 }
 
+void SetNextTimeout(AsyncIO *IO, int timeout)
+{
+       ev_timer_init(&IO->conn_timeout, 
+                     IO_Timout_callback, 
+                     timeout, 0);
+       ev_timer_again (event_base, &IO->conn_timeout);
+}
+
 void InitEventIO(AsyncIO *IO, 
                 void *pData, 
                 IO_CallBack ReadDone, 
index e6b41f3bd97401ccbc041b538874dd772ca082f6..c5761ffae482c27ceee0cc73e04f09f329b1b981 100644 (file)
@@ -16,7 +16,7 @@ typedef enum _eNextState {
 }eNextState;
 
 typedef int (*EventContextAttach)(void *Data);
-typedef eNextState (*IO_CallBack)(void *Data);
+typedef eNextState (*IO_CallBack)(AsyncIO *IO);
 typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO);
 typedef void (*ParseDNSAnswerCb)(AsyncIO*, unsigned char*, int);
 typedef void (*FreeDNSReply)(void *DNSData);
@@ -99,3 +99,5 @@ void InitEventIO(AsyncIO *IO,
 int QueueQuery(ns_type Type, char *name, AsyncIO *IO, IO_CallBack PostDNS);
 
 void StopClient(AsyncIO *IO);
+
+void SetNextTimeout(AsyncIO *IO, int timeout);
index fc4dc20a64ed1ee83704760ee4fc4a130f3cb088..f1f3371282e984046c30c0cfaa0c37af15041e94 100644 (file)
@@ -110,6 +110,18 @@ typedef enum _eSMTP_C_States {
 
 const long SMTP_C_ConnTimeout = 60; /* wail 1 minute for connections... */
 const long SMTP_C_ReadTimeouts[eMaxSMTPC] = {
+       300, /* Greeting... */
+       30, /* EHLO */
+       30, /* HELO */
+       30, /* Auth */
+       30, /* From */
+       90, /* RCPT */
+       30, /* DATA */
+       90, /* DATABody */
+       0, /* end of body... */
+       30  /* QUIT */
+};
+const long SMTP_C_SendTimeouts[eMaxSMTPC] = {
        90, /* Greeting... */
        30, /* EHLO */
        30, /* HELO */
@@ -125,16 +137,16 @@ const long SMTP_C_ReadTimeouts[eMaxSMTPC] = {
 const long SMTP_C_SendTimeouts[eMaxSMTPC] = {
 
 }; */
-const char *ReadErrors[eMaxSMTPC] = {
-       "Connection broken during SMTP conversation",
-       "Connection broken during SMTP EHLO",
-       "Connection broken during SMTP HELO",
-       "Connection broken during SMTP AUTH",
-       "Connection broken during SMTP MAIL FROM",
-       "Connection broken during SMTP RCPT",
-       "Connection broken during SMTP DATA",
-       "Connection broken during SMTP message transmit",
-       ""/* quit reply, don't care. */
+static const ConstStr ReadErrors[eMaxSMTPC] = {
+       {HKEY("Connection broken during SMTP conversation")},
+       {HKEY("Connection broken during SMTP EHLO")},
+       {HKEY("Connection broken during SMTP HELO")},
+       {HKEY("Connection broken during SMTP AUTH")},
+       {HKEY("Connection broken during SMTP MAIL FROM")},
+       {HKEY("Connection broken during SMTP RCPT")},
+       {HKEY("Connection broken during SMTP DATA")},
+       {HKEY("Connection broken during SMTP message transmit")},
+       {HKEY("")}/* quit reply, don't care. */
 };
 
 
@@ -171,14 +183,14 @@ void DeleteSmtpOutMsg(void *v)
        
        FreeStrBuf(&Msg->msgtext);
        FreeAsyncIOContents(&Msg->IO);
-       free(Msg);
+///    free(Msg);
 }
 
-eNextState SMTP_C_Timeout(void *Data);
-eNextState SMTP_C_ConnFail(void *Data);
-eNextState SMTP_C_DispatchReadDone(void *Data);
-eNextState SMTP_C_DispatchWriteDone(void *Data);
-eNextState SMTP_C_Terminate(void *Data);
+eNextState SMTP_C_Timeout(AsyncIO *IO);
+eNextState SMTP_C_ConnFail(AsyncIO *IO);
+eNextState SMTP_C_DispatchReadDone(AsyncIO *IO);
+eNextState SMTP_C_DispatchWriteDone(AsyncIO *IO);
+eNextState SMTP_C_Terminate(AsyncIO *IO);
 eReadState SMTP_C_ReadServerStatus(AsyncIO *IO);
 
 typedef eNextState (*SMTPReadHandler)(SmtpOutMsg *Msg);
@@ -207,6 +219,8 @@ typedef eNextState (*SMTPSendHandler)(SmtpOutMsg *Msg);
 
 void FinalizeMessageSend(SmtpOutMsg *Msg)
 {
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+       
        if (DecreaseQReference(Msg->MyQItem)) 
        {
                int nRemain;
@@ -255,6 +269,7 @@ void get_one_mx_host_ip_done(void *Ctx,
 {
        AsyncIO *IO = Ctx;
        SmtpOutMsg *SendMsg = IO->Data;
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
        if ((status == ARES_SUCCESS) && (hostent != NULL) ) {
                unsigned long psaddr;
                // TODO: IPV6
@@ -262,7 +277,7 @@ void get_one_mx_host_ip_done(void *Ctx,
                psaddr = ntohl(psaddr); 
 
                CtdlLogPrintf(CTDL_DEBUG, 
-                             "SMTP client[%ld]: connecting to %s [%d.%d.%d.%d:%d] ...\n", 
+                             "SMTP client[%ld]: connecting to %s [%ld.%ld.%ld.%ld:%d] ...\n", 
                              SendMsg->n, 
                              SendMsg->mx_host, 
                              (psaddr >> 24) & 0xFF,
@@ -273,8 +288,13 @@ void get_one_mx_host_ip_done(void *Ctx,
 
                SendMsg->MyQEntry->Status = 5; 
                StrBufPrintf(SendMsg->MyQEntry->StatusMessage, 
-                            "Timeout while connecting %s", 
-                            SendMsg->mx_host);
+                            "Timeout while connecting %s [%ld.%ld.%ld.%ld:%d] ", 
+                            SendMsg->mx_host,
+                            (psaddr >> 24) & 0xFF,
+                            (psaddr >> 16) & 0xFF,
+                            (psaddr >>  8) & 0xFF,
+                            (psaddr >>  0) & 0xFF,
+                            SendMsg->IO.dport);
 
                SendMsg->IO.HEnt = hostent;
                InitEventIO(IO, SendMsg, 
@@ -297,6 +317,7 @@ void get_one_mx_host_ip(SmtpOutMsg *SendMsg)
        //char *endpart;
        //char buf[SIZ];
 
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
        SendMsg->IO.dport = DefaultMXPort;
 
 
@@ -328,7 +349,8 @@ void get_one_mx_host_ip(SmtpOutMsg *SendMsg)
        CtdlLogPrintf(CTDL_DEBUG, 
                      "SMTP client[%ld]: looking up %s : %d ...\n", 
                      SendMsg->n, 
-                     SendMsg->mx_host);
+                     SendMsg->mx_host, 
+                     SendMsg->IO.dport);
 
        ares_gethostbyname(SendMsg->IO.DNSChannel,
                           SendMsg->mx_host,   
@@ -338,11 +360,12 @@ void get_one_mx_host_ip(SmtpOutMsg *SendMsg)
 }
 
 
-eNextState smtp_resolve_mx_done(void *data)
+eNextState smtp_resolve_mx_done(AsyncIO *IO)
 {
-       AsyncIO *IO = data;
        SmtpOutMsg * SendMsg = IO->Data;
 
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+
        SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024);
        SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024);
        SendMsg->IO.IOBuf = NewStrBuf();
@@ -359,6 +382,8 @@ int resolve_mx_records(void *Ctx)
 {
        SmtpOutMsg * SendMsg = Ctx;
 
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+
        if (!QueueQuery(ns_t_mx, 
                        SendMsg->node, 
                        &SendMsg->IO, 
@@ -381,6 +406,8 @@ int smtp_resolve_recipients(SmtpOutMsg *SendMsg)
        int lp, rp;
        int i;
 
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+
        if ((SendMsg==NULL) || 
            (SendMsg->MyQEntry == NULL) || 
            (StrLength(SendMsg->MyQEntry->Recipient) == 0)) {
@@ -461,6 +488,8 @@ void smtp_try(OneQueItem *MyQItem,
 {
        SmtpOutMsg * SendMsg;
 
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+
        SendMsg = (SmtpOutMsg *) malloc(sizeof(SmtpOutMsg));
        memset(SendMsg, 0, sizeof(SmtpOutMsg));
        SendMsg->IO.sock = (-1);
@@ -772,40 +801,69 @@ SMTPSendHandler SendHandlers[eMaxSMTPC] = {
        SMTPC_send_terminate_data_body,
        SMTPC_send_QUIT
 };
-eNextState SMTP_C_DispatchReadDone(void *Data)
+
+void SMTPSetTimeout(eNextState NextTCPState, SmtpOutMsg *pMsg)
+{
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+       long Timeout;
+       switch (NextTCPState) {
+       case eSendReply:
+       case eSendMore:
+               Timeout = SMTP_C_SendTimeouts[pMsg->State];
+               break;
+       case eReadMessage:
+               Timeout = SMTP_C_ReadTimeouts[pMsg->State];
+               break;
+       case eTerminateConnection:
+       case eAbort:
+               return;
+       }
+       SetNextTimeout(&pMsg->IO, Timeout);
+}
+eNextState SMTP_C_DispatchReadDone(AsyncIO *IO)
 {
-       SmtpOutMsg *pMsg = Data;
-       eNextState rc = ReadHandlers[pMsg->State](pMsg);
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+       SmtpOutMsg *pMsg = IO->Data;
+       eNextState rc;
+
+       rc = ReadHandlers[pMsg->State](pMsg);
        pMsg->State++;
+       SMTPSetTimeout(rc, pMsg);
        return rc;
 }
-eNextState SMTP_C_DispatchWriteDone(void *Data)
+eNextState SMTP_C_DispatchWriteDone(AsyncIO *IO)
 {
-       SmtpOutMsg *pMsg = Data;
-       return SendHandlers[pMsg->State](pMsg); 
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
+       SmtpOutMsg *pMsg = IO->Data;
+       eNextState rc;
+
+       rc = SendHandlers[pMsg->State](pMsg);
+       SMTPSetTimeout(rc, pMsg);
+       return rc;
 }
 
 
 /*****************************************************************************/
 /*                     SMTP CLIENT ERROR CATCHERS                            */
 /*****************************************************************************/
-eNextState SMTP_C_Terminate(void *Data)
+eNextState SMTP_C_Terminate(AsyncIO *IO)
 {
-       AsyncIO *IO = Data;
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
        SmtpOutMsg *pMsg = IO->Data;
        FinalizeMessageSend(pMsg);
        return 0;
 }
-eNextState SMTP_C_Timeout(void *Data)
+eNextState SMTP_C_Timeout(AsyncIO *IO)
 {
-       AsyncIO *IO = Data;
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
        SmtpOutMsg *pMsg = IO->Data;
+       StrBufPlain(IO->ErrMsg, CKEY(ReadErrors[pMsg->State]));
        FinalizeMessageSend(pMsg);
        return 0;
 }
-eNextState SMTP_C_ConnFail(void *Data)
+eNextState SMTP_C_ConnFail(AsyncIO *IO)
 {
-       AsyncIO *IO = Data;
+       CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
        SmtpOutMsg *pMsg = IO->Data;
        FinalizeMessageSend(pMsg);
        return 0;