Libev migration:
authorWilfried Goesgens <dothebart@citadel.org>
Sun, 16 Jan 2011 17:46:56 +0000 (18:46 +0100)
committerWilfried Goesgens <dothebart@citadel.org>
Sun, 16 Jan 2011 17:46:56 +0000 (18:46 +0100)
  - timeouts implemented & working
  - async connects working

citadel/event_client.c
citadel/event_client.h
citadel/modules/smtp/serv_smtpeventclient.c

index f3730ca67bc9dbb8f36abf6f57bc291ff7e2ddff..0dd97d4b95f964c414fc423c5e0822f374937425 100644 (file)
@@ -63,8 +63,6 @@
 #include "locate_host.h"
 #include "citadel_dirs.h"
 
-#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT
-
 #include "event_client.h"
 
 extern int event_add_pipe[2];
@@ -131,8 +129,7 @@ void ShutDownCLient(AsyncIO *IO)
        {
                ev_io_stop(event_base, &IO->send_event);
                ev_io_stop(event_base, &IO->recv_event);
-               ev_timer_stop (event_base, &IO->conn_fail);
-               ev_timer_stop (event_base, &IO->conn_timeout);
+               ev_timer_stop (event_base, &IO->rw_timeout);
                close(IO->sock);
                IO->sock = 0;
                IO->SendBuf.fd = 0;
@@ -143,6 +140,7 @@ void ShutDownCLient(AsyncIO *IO)
        
 }
 
+
 eReadState HandleInbound(AsyncIO *IO)
 {
        eReadState Finished = eBufferNotEmpty;
@@ -260,12 +258,31 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                IO->Timeout(IO);
        /* else : must write more. */
 }
+static void
+set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
+{
+       
+       switch(IO->NextState) {
+       case eReadMessage:
+               ev_io_start(event_base, &IO->recv_event);
+               break;
+       case eSendReply:
+       case eSendMore:
+               IO_send_callback(loop, &IO->send_event, revents);
+               break;
+       case eTerminateConnection:
+       case eAbort:
+               /// TODO: WHUT?
+               break;
+       }
+}
 
 static void
 IO_Timout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       ev_timer_stop (event_base, &IO->rw_timeout);
        IO->Timeout(IO);
 }
 static void
@@ -273,9 +290,20 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
 {
        AsyncIO *IO = watcher->data;
 
+       ev_timer_stop (event_base, &IO->conn_fail);
+       ev_io_stop(loop, &IO->conn_event);
        IO->ConnFail(IO);
 }
 static void
+IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
+{
+       AsyncIO *IO = watcher->data;
+
+       ev_io_stop(loop, &IO->conn_event);
+       ev_timer_stop (event_base, &IO->conn_fail);
+       set_start_callback(loop, IO, revents);
+}
+static void
 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        ssize_t nbytes;
@@ -294,26 +322,8 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 }
 
 
-static void
-set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
-{
-       
-       switch(IO->NextState) {
-       case eReadMessage:
-               ev_io_start(event_base, &IO->recv_event);
-               break;
-       case eSendReply:
-       case eSendMore:
-               IO_send_callback(loop, &IO->send_event, revents);
-               break;
-       case eTerminateConnection:
-       case eAbort:
-               /// TODO: WHUT?
-               break;
-       }
-}
 
-int event_connect_socket(AsyncIO *IO, int conn_timeout, int first_rw_timeout)
+int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout)
 {
        struct sockaddr_in  saddr;
        int fdflags; 
@@ -356,14 +366,18 @@ IO->curr_ai->ai_family,
        IO->recv_event.data = IO;
        ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE);
        IO->send_event.data = IO;
+
        ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
        IO->conn_fail.data = IO;
+       ev_timer_init(&IO->rw_timeout, IO_Timout_callback, first_rw_timeout, 0);
+       IO->rw_timeout.data = IO;
        
        memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) );
 
        memcpy(&saddr.sin_addr, 
               IO->HEnt->h_addr_list[0],
               sizeof(struct in_addr));
+       saddr.sin_addr.s_addr = inet_addr("127.0.0.1");
 
        saddr.sin_family = AF_INET;
        saddr.sin_port = htons(IO->dport);
@@ -374,40 +388,31 @@ IO->curr_ai->ai_family,
                     sizeof(struct sockaddr_in));
        if (rc >= 0){
 ////           freeaddrinfo(res);
-               ev_timer_init(&IO->conn_timeout, IO_Timout_callback, first_rw_timeout, 0);
-               IO->conn_timeout.data = IO;
                set_start_callback(event_base, IO, 0);
-               ev_timer_start(event_base, &IO->conn_timeout);
+               ev_timer_start(event_base, &IO->rw_timeout);
                return 0;
        }
        else if (errno == EINPROGRESS) {
-               set_start_callback(event_base, IO, 0);
-               ev_timer_init(&IO->conn_timeout, 
-                             IO_Timout_callback, 
-                             conn_timeout + first_rw_timeout, 0);
+
+               ev_io_init(&IO->conn_event, IO_connestd_callback, IO->sock, EV_READ|EV_WRITE);
+               IO->conn_event.data = IO;
+
+               ev_io_start(event_base, &IO->conn_event);
                ev_timer_start(event_base, &IO->conn_fail);
-               ev_timer_start(event_base, &IO->conn_timeout);
-               
                return 0;
        }
        else {
                CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno));
                StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno));
-///            close(IO->sock);
-/*             IO->curr_ai = IO->curr_ai->ai_next;
-               if (IO->curr_ai != NULL)
-                       return event_connect_socket(IO);
-                       else*/
-                       return -1;
+               IO->ConnFail(IO);
+               return -1;
        }
 }
 
-void SetNextTimeout(AsyncIO *IO, int timeout)
+void SetNextTimeout(AsyncIO *IO, double timeout)
 {
-       ev_timer_init(&IO->conn_timeout, 
-                     IO_Timout_callback, 
-                     timeout, 0);
-       ev_timer_again (event_base, &IO->conn_timeout);
+       IO->rw_timeout.repeat = timeout;
+       ev_timer_again (event_base,  &IO->rw_timeout);
 }
 
 void InitEventIO(AsyncIO *IO, 
@@ -418,8 +423,8 @@ void InitEventIO(AsyncIO *IO,
                 IO_CallBack Timeout, 
                 IO_CallBack ConnFail, 
                 IO_LineReaderCallback LineReader,
-                int conn_timeout, 
-                int first_rw_timeout,
+                double conn_timeout, 
+                double first_rw_timeout,
                 int ReadFirst)
 {
        IO->Data = pData;
@@ -428,6 +433,7 @@ void InitEventIO(AsyncIO *IO,
        IO->Terminate = Terminate;
        IO->LineReader = LineReader;
        IO->ConnFail = ConnFail;
+       IO->Timeout = Timeout;
        
        if (ReadFirst) {
                IO->NextState = eReadMessage;
@@ -439,6 +445,3 @@ void InitEventIO(AsyncIO *IO,
 //     IO->res = HEnt->h_addr_list[0];
        event_connect_socket(IO, conn_timeout, first_rw_timeout);
 }
-
-
-#endif
index c5761ffae482c27ceee0cc73e04f09f329b1b981..208a716616a7122ca8340490d2fa2511ffcc9d15 100644 (file)
@@ -37,9 +37,10 @@ struct AsyncIO {
                eNextState NextState;
 
        ev_timer conn_fail, 
-               conn_timeout;
+               rw_timeout;
        ev_io recv_event, 
-               send_event;
+               send_event, 
+               conn_event;
        StrBuf *ErrMsg; /* if we fail to connect, or lookup, error goes here. */
 
        /* read/send related... */
@@ -93,11 +94,11 @@ void InitEventIO(AsyncIO *IO,
                 IO_CallBack Timeout, 
                 IO_CallBack ConnFail, 
                 IO_LineReaderCallback LineReader,
-                int conn_timeout, int first_rw_timeout,
+                double conn_timeout, double first_rw_timeout,
                 int ReadFirst);
 
 int QueueQuery(ns_type Type, char *name, AsyncIO *IO, IO_CallBack PostDNS);
 
 void StopClient(AsyncIO *IO);
 
-void SetNextTimeout(AsyncIO *IO, int timeout);
+void SetNextTimeout(AsyncIO *IO, double timeout);
index f1f3371282e984046c30c0cfaa0c37af15041e94..ff99668bd1efbf983e10de642eac2246c2c911b8 100644 (file)
@@ -108,30 +108,30 @@ typedef enum _eSMTP_C_States {
        eMaxSMTPC
 } eSMTP_C_States;
 
-const long SMTP_C_ConnTimeout = 60; /* wail 1 minute for connections... */
-const long SMTP_C_ReadTimeouts[eMaxSMTPC] = {
-       300, /* Greeting... */
-       30, /* EHLO */
-       30, /* HELO */
-       30, /* Auth */
-       30, /* From */
-       90, /* RCPT */
-       30, /* DATA */
-       90, /* DATABody */
-       0, /* end of body... */
-       30  /* QUIT */
+const double SMTP_C_ConnTimeout = 60.; /* wail 1 minute for connections... */
+const double SMTP_C_ReadTimeouts[eMaxSMTPC] = {
+       300., /* Greeting... */
+       30., /* EHLO */
+       30., /* HELO */
+       30., /* Auth */
+       30., /* From */
+       90., /* RCPT */
+       30., /* DATA */
+       90., /* DATABody */
+       90., /* end of body... */
+       30.  /* QUIT */
 };
-const long SMTP_C_SendTimeouts[eMaxSMTPC] = {
-       90, /* Greeting... */
-       30, /* EHLO */
-       30, /* HELO */
-       30, /* Auth */
-       30, /* From */
-       30, /* RCPT */
-       30, /* DATA */
-       90, /* DATABody */
-       900, /* end of body... */
-       30  /* QUIT */
+const double SMTP_C_SendTimeouts[eMaxSMTPC] = {
+       90., /* Greeting... */
+       30., /* EHLO */
+       30., /* HELO */
+       30., /* Auth */
+       30., /* From */
+       30., /* RCPT */
+       30., /* DATA */
+       90., /* DATABody */
+       900., /* end of body... */
+       30.  /* QUIT */
 };
 /*
 const long SMTP_C_SendTimeouts[eMaxSMTPC] = {
@@ -183,7 +183,7 @@ void DeleteSmtpOutMsg(void *v)
        
        FreeStrBuf(&Msg->msgtext);
        FreeAsyncIOContents(&Msg->IO);
-///    free(Msg);
+       free(Msg);
 }
 
 eNextState SMTP_C_Timeout(AsyncIO *IO);
@@ -244,15 +244,13 @@ void FinalizeMessageSend(SmtpOutMsg *Msg)
                        msg->cm_anon_type = MES_NORMAL;
                        msg->cm_format_type = FMT_RFC822;
                        msg->cm_fields['M'] = SmashStrBuf(&MsgData);
-                       /* Generate 'bounce' messages */
-                       smtp_do_bounce(msg->cm_fields['M'],
-                                      Msg->msgtext); 
-
                        CtdlSubmitMsg(msg, NULL, SMTP_SPOOLOUT_ROOM, QP_EADDR);
                        CtdlFreeMessage(msg);
                }
-               else 
+               else {
                        CtdlDeleteMessages(SMTP_SPOOLOUT_ROOM, &Msg->MyQItem->MessageID, 1, "");
+                       FreeStrBuf(&MsgData);
+               }
 
                RemoveQItem(Msg->MyQItem);
        }
@@ -705,7 +703,6 @@ eNextState SMTPC_send_data_body(SmtpOutMsg *SendMsg)
        Buf = SendMsg->IO.SendBuf.Buf;
        SendMsg->IO.SendBuf.Buf = SendMsg->msgtext;
        SendMsg->msgtext = Buf;
-       //// TODO timeout like that: (SendMsg->msg_size / 128) + 50);
        SendMsg->State ++;
 
        return eSendMore;
@@ -805,14 +802,25 @@ SMTPSendHandler SendHandlers[eMaxSMTPC] = {
 void SMTPSetTimeout(eNextState NextTCPState, SmtpOutMsg *pMsg)
 {
        CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__);
-       long Timeout;
+       double Timeout;
        switch (NextTCPState) {
        case eSendReply:
        case eSendMore:
                Timeout = SMTP_C_SendTimeouts[pMsg->State];
+               if (pMsg->State == eDATABody) {
+                       /* if we're sending a huge message, we need more time. */
+                       Timeout += StrLength(pMsg->msgtext) / 1024;
+               }
                break;
        case eReadMessage:
                Timeout = SMTP_C_ReadTimeouts[pMsg->State];
+               if (pMsg->State == eDATATerminateBody) {
+                       /* 
+                        * some mailservers take a nap before accepting the message
+                        * content inspection and such.
+                        */
+                       Timeout += StrLength(pMsg->msgtext) / 1024;
+               }
                break;
        case eTerminateConnection:
        case eAbort: