Refactoring: create central place to init AsyncIO
authorWilfried Goesgens <dothebart@citadel.org>
Sun, 25 Dec 2011 16:45:57 +0000 (17:45 +0100)
committerWilfried Goesgens <dothebart@citadel.org>
Sun, 25 Dec 2011 16:45:57 +0000 (17:45 +0100)
  - set the CC in the Queue Runners, so InitIOStruct() can use that
  - call InitIOStruct() to reduce duplicate code
  - InitIOStruct(): a central place knowing whats needed to be inside of AsyncIO.

citadel/event_client.c
citadel/event_client.h
citadel/modules/network/serv_networkclient.c
citadel/modules/pop3client/serv_pop3client.c
citadel/modules/smtp/serv_smtpeventclient.c

index 4a2f481a9cafc2412cc7c8fd2ed3b6d8e2a7e2aa..5f418fa6a33b396f2d76f41adb4ba875d06a2c85 100644 (file)
@@ -701,18 +701,16 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
 }
 
 
-eNextState EvConnectSock(AsyncIO *IO, 
-                        void *pData, 
-                        double conn_timeout, 
+eNextState EvConnectSock(AsyncIO *IO,
+                        double conn_timeout,
                         double first_rw_timeout,
                         int ReadFirst)
 {
-       int fdflags; 
+       int fdflags;
        int rc = -1;
 
-       IO->Data = pData;
        become_session(IO->CitContext);
-       
+
        if (ReadFirst) {
                IO->NextState = eReadMessage;
        }
@@ -720,36 +718,46 @@ eNextState EvConnectSock(AsyncIO *IO,
                IO->NextState = eSendReply;
        }
 
-       IO->SendBuf.fd = IO->RecvBuf.fd = 
+       IO->SendBuf.fd = IO->RecvBuf.fd =
                socket(
-                       (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, 
-                       SOCK_STREAM, 
+                       (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
+                       SOCK_STREAM,
                        IPPROTO_TCP);
 
        if (IO->SendBuf.fd < 0) {
-               EV_syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno));
-               StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno));
+               EV_syslog(LOG_ERR,
+                         "EVENT: socket() failed: %s\n",
+                         strerror(errno));
+
+               StrBufPrintf(IO->ErrMsg,
+                            "Failed to create socket: %s",
+                            strerror(errno));
                return eAbort;
        }
        fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
        if (fdflags < 0) {
-               EV_syslog(LOG_DEBUG, 
+               EV_syslog(LOG_DEBUG,
                          "EVENT: unable to get socket flags! %s \n",
                          strerror(errno));
-               StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno));
+               StrBufPrintf(IO->ErrMsg,
+                            "Failed to get socket flags: %s",
+                            strerror(errno));
                return eAbort;
        }
        fdflags = fdflags | O_NONBLOCK;
        if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
-               EV_syslog(LOG_DEBUG, 
-                         "EVENT: unable to set socket nonblocking flags! %s \n",
-                         strerror(errno));
-               StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno));
+               EV_syslog(
+                       LOG_DEBUG,
+                       "EVENT: unable to set socket nonblocking flags! %s \n",
+                       strerror(errno));
+               StrBufPrintf(IO->ErrMsg,
+                            "Failed to set socket flags: %s",
+                            strerror(errno));
                close(IO->SendBuf.fd);
                IO->SendBuf.fd = IO->RecvBuf.fd = -1;
                return eAbort;
        }
-/* TODO: maye we could use offsetof() to calc the position of data... 
+/* TODO: maye we could use offsetof() to calc the position of data...
  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
  */
        ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
@@ -759,16 +767,20 @@ eNextState EvConnectSock(AsyncIO *IO,
 
        ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
        IO->conn_fail.data = IO;
-       ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout, 0);
+       ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
        IO->rw_timeout.data = IO;
 
 
        /*  Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
 ///    ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1");
        if (IO->ConnectMe->IPv6)
-               rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6));
+               rc = connect(IO->SendBuf.fd,
+                            &IO->ConnectMe->Addr,
+                            sizeof(struct sockaddr_in6));
        else
-               rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in));
+               rc = connect(IO->SendBuf.fd,
+                            (struct sockaddr_in *)&IO->ConnectMe->Addr,
+                            sizeof(struct sockaddr_in));
 
        if (rc >= 0){
                EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
@@ -779,7 +791,11 @@ eNextState EvConnectSock(AsyncIO *IO,
        else if (errno == EINPROGRESS) {
                EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n");
 
-               ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE);
+               ev_io_init(&IO->conn_event,
+                          IO_connestd_callback,
+                          IO->SendBuf.fd,
+                          EV_READ|EV_WRITE);
+
                IO->conn_event.data = IO;
 
                ev_io_start(event_base, &IO->conn_event);
@@ -791,9 +807,11 @@ eNextState EvConnectSock(AsyncIO *IO,
                             IO_connfailimmediate_callback);
                IO->conn_fail_immediate.data = IO;
                ev_idle_start(event_base, &IO->conn_fail_immediate);
-               
+
                EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno));
-               StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno));
+               StrBufPrintf(IO->ErrMsg,
+                            "Failed to connect: %s",
+                            strerror(errno));
                return IO->NextState;
        }
        return IO->NextState;
@@ -806,8 +824,8 @@ void SetNextTimeout(AsyncIO *IO, double timeout)
 }
 
 
-eNextState ReAttachIO(AsyncIO *IO, 
-                     void *pData, 
+eNextState ReAttachIO(AsyncIO *IO,
+                     void *pData,
                      int ReadFirst)
 {
        IO->Data = pData;
@@ -823,3 +841,38 @@ eNextState ReAttachIO(AsyncIO *IO,
 
        return IO->NextState;
 }
+
+void InitIOStruct(AsyncIO *IO,
+                 void *Data,
+                 eNextState NextState,
+                 IO_LineReaderCallback LineReader,
+                 IO_CallBack DNS_Fail,
+                 IO_CallBack SendDone,
+                 IO_CallBack ReadDone,
+                 IO_CallBack Terminate,
+                 IO_CallBack ConnFail,
+                 IO_CallBack Timeout,
+                 IO_CallBack ShutdownAbort)
+{
+       IO->Data          = Data;
+
+       IO->CitContext    = CloneContext(CC);
+       ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
+
+       IO->NextState     = NextState;
+
+       IO->SendDone      = SendDone;
+       IO->ReadDone      = ReadDone;
+       IO->Terminate     = Terminate;
+       IO->LineReader    = LineReader;
+       IO->ConnFail      = ConnFail;
+       IO->Timeout       = Timeout;
+       IO->ShutdownAbort = ShutdownAbort;
+
+       IO->DNS.Fail      = DNS_Fail;
+
+       IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
+       IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
+       IO->IOBuf         = NewStrBuf();
+
+}
index c8a4bafe3e41eeb02220b9dfbed8c5ed992de30e..c949a6ce937c3517fe826b9db1463ce984c7ad41 100644 (file)
@@ -166,7 +166,6 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB);
 eNextState QueueCurlContext(AsyncIO *IO);
 
 eNextState EvConnectSock(AsyncIO *IO, 
-                        void *pData, 
                         double conn_timeout, 
                         double first_rw_timeout,
                         int ReadFirst);
@@ -201,6 +200,18 @@ int evcurl_init(AsyncIO *IO,
                 IO_CallBack Terminate, 
                IO_CallBack ShutdownAbort);
 
+void InitIOStruct(AsyncIO *IO,
+                 void *Data,
+                 eNextState NextState,
+                 IO_LineReaderCallback LineReader,
+                 IO_CallBack DNS_Fail,
+                 IO_CallBack SendDone,
+                 IO_CallBack ReadDone,
+                 IO_CallBack Terminate,
+                 IO_CallBack ConnFail,
+                 IO_CallBack Timeout,
+                 IO_CallBack ShutdownAbort);
+
 eNextState ReAttachIO(AsyncIO *IO, 
                      void *pData, 
                      int ReadFirst);
index d0e9e6d197afd729d7ab38660907affe0e1b971c..63c6e65412e2165b29eb2852dd1f8ace014eb7de 100644 (file)
@@ -789,40 +789,31 @@ eNextState nwc_connect_ip(AsyncIO *IO)
               ChrPtr(NW->host),
               ChrPtr(NW->port));
        
-       return EvConnectSock(IO, NW, 
-                            NWC_ConnTimeout, 
+       return EvConnectSock(IO,
+                            NWC_ConnTimeout,
                             NWC_ReadTimeouts[0],
                             1);
 }
 
 void RunNetworker(AsyncNetworker *NW)
 {
-       CitContext *SubC;
-
        ParseURL(&NW->IO.ConnectMe, NW->Url, 504);
 
-       NW->IO.Data          = NW;
-       NW->IO.SendDone      = NWC_DispatchWriteDone;
-       NW->IO.ReadDone      = NWC_DispatchReadDone;
-       NW->IO.Terminate     = NWC_Terminate;
-       NW->IO.LineReader    = NWC_ReadServerStatus;
-       NW->IO.ConnFail      = NWC_ConnFail;
-       NW->IO.DNS.Fail      = NWC_DNSFail;
-       NW->IO.Timeout       = NWC_Timeout;
-       NW->IO.ShutdownAbort = NWC_Shutdown;
-       
-       NW->IO.SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
-       NW->IO.RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
-       NW->IO.IOBuf         = NewStrBuf();
-       
-       NW->IO.NextState     = eReadMessage;
-       SubC = CloneContext (&networker_client_CC);
-       SubC->session_specific_data = (char*) NW;
-       NW->IO.CitContext = SubC;
-
-       safestrncpy(SubC->cs_host, 
+       InitIOStruct(&NW->IO,
+                    NW,
+                    eReadMessage,
+                    NWC_ReadServerStatus,
+                    NWC_DNSFail,
+                    NWC_DispatchWriteDone,
+                    NWC_DispatchReadDone,
+                    NWC_Terminate,
+                    NWC_ConnFail,
+                    NWC_Timeout,
+                    NWC_Shutdown);
+
+       safestrncpy(((CitContext *)NW->IO.CitContext)->cs_host, 
                    ChrPtr(NW->host),
-                   sizeof(SubC->cs_host)); 
+                   sizeof(((CitContext *)NW->IO.CitContext)->cs_host)); 
 
        if (NW->IO.ConnectMe->IsIP) {
                QueueEventContext(&NW->IO,
@@ -854,6 +845,8 @@ void network_poll_other_citadel_nodes(int full_poll, char *working_ignetcfg)
                syslog(LOG_DEBUG, "network: no neighbor nodes are configured - not polling.\n");
                return;
        }
+       become_session(&networker_client_CC);
+
        CfgData = NewStrBufPlain(working_ignetcfg, -1);
        Line = NewStrBufPlain(NULL, StrLength(CfgData));
        Done = 0;
index 8a0d23c38f6c64537d9d2f44573082169509a43a..32426e0c3c3d148e3bf542b2bf8bd4584b9adffb 100644 (file)
@@ -757,17 +757,10 @@ eNextState POP3_C_ReAttachToFetchMessages(AsyncIO *IO)
 
 eNextState pop3_connect_ip(AsyncIO *IO)
 {
-       pop3aggr *cpptr = IO->Data;
-
        syslog(LOG_DEBUG, "POP3: %s\n", __FUNCTION__);
-       
-////   IO->ConnectMe = &cpptr->Pop3Host;
-       /*  Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
 
-       /////// SetConnectStatus(IO);
-
-       return EvConnectSock(IO, cpptr, 
-                            POP3_C_ConnTimeout, 
+       return EvConnectSock(IO,
+                            POP3_C_ConnTimeout,
                             POP3_C_ReadTimeouts[0],
                             1);
 }
@@ -843,41 +836,27 @@ eNextState pop3_get_one_host_ip(AsyncIO *IO)
 
 int pop3_do_fetching(pop3aggr *cpptr)
 {
-       CitContext *SubC;
-
-       cpptr->IO.Data          = cpptr;
-
-       cpptr->IO.SendDone      = POP3_C_DispatchWriteDone;
-       cpptr->IO.ReadDone      = POP3_C_DispatchReadDone;
-       cpptr->IO.Terminate     = POP3_C_Terminate;
-       cpptr->IO.LineReader    = POP3_C_ReadServerStatus;
-       cpptr->IO.ConnFail      = POP3_C_ConnFail;
-       cpptr->IO.DNS.Fail      = POP3_C_DNSFail;
-       cpptr->IO.Timeout       = POP3_C_Timeout;
-       cpptr->IO.ShutdownAbort = POP3_C_Shutdown;
-       
-       cpptr->IO.SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
-       cpptr->IO.RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
-       cpptr->IO.IOBuf         = NewStrBuf();
-       
-       cpptr->IO.NextState     = eReadMessage;
-/* TODO
-   syslog(LOG_DEBUG, "POP3: %s %s %s <password>\n", roomname, pop3host, pop3user);
-   syslog(LOG_DEBUG, "Connecting to <%s>\n", pop3host);
-*/
-       
-       SubC = CloneContext (&pop3_client_CC);
-       SubC->session_specific_data = (char*) cpptr;
-       cpptr->IO.CitContext = SubC;
-       safestrncpy(SubC->cs_host, 
+       InitIOStruct(&cpptr->IO,
+                    cpptr,
+                    eReadMessage,
+                    POP3_C_ReadServerStatus,
+                    POP3_C_DNSFail,
+                    POP3_C_DispatchWriteDone,
+                    POP3_C_DispatchReadDone,
+                    POP3_C_Terminate,
+                    POP3_C_ConnFail,
+                    POP3_C_Timeout,
+                    POP3_C_Shutdown);
+
+       safestrncpy(((CitContext *)cpptr->IO.CitContext)->cs_host,
                    ChrPtr(cpptr->Url),
-                   sizeof(SubC->cs_host)); 
+                   sizeof(((CitContext *)cpptr->IO.CitContext)->cs_host));
 
        if (cpptr->IO.ConnectMe->IsIP) {
                QueueEventContext(&cpptr->IO,
                                  pop3_connect_ip);
        }
-       else { /* uneducated admin has chosen to add DNS to the equation... */
+       else {
                QueueEventContext(&cpptr->IO,
                                  pop3_get_one_host_ip);
        }
@@ -1053,7 +1032,6 @@ static int doing_pop3client = 0;
 
 void pop3client_scan(void) {
        static time_t last_run = 0L;
-///    struct pop3aggr *pptr;
        time_t fastest_scan;
        HashPos *it;
        long len;
@@ -1061,6 +1039,8 @@ void pop3client_scan(void) {
        void *vrptr;
        pop3aggr *cptr;
 
+       become_session(&pop3_client_CC);
+
        if (config.c_pop3_fastest < config.c_pop3_fetch)
                fastest_scan = config.c_pop3_fastest;
        else
index af5d12d9467328f4ac9f2bb51fb32b07efcbd2a0..f3a1566bdefa8f83caac66159bbaa80c2eacf706 100644 (file)
@@ -210,7 +210,6 @@ eNextState FailOneAttempt(AsyncIO *IO)
 
 void SetConnectStatus(AsyncIO *IO)
 {
-       
        SmtpOutMsg *SendMsg = IO->Data;
        char buf[256];
        void *src;
@@ -258,14 +257,13 @@ eNextState mx_connect_ip(AsyncIO *IO)
        SmtpOutMsg *SendMsg = IO->Data;
 
        EVS_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__);
-       
+
        IO->ConnectMe = SendMsg->pCurrRelay;
-       /*  Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
 
        SetConnectStatus(IO);
 
-       return EvConnectSock(IO, SendMsg, 
-                            SMTP_C_ConnTimeout, 
+       return EvConnectSock(IO,
+                            SMTP_C_ConnTimeout,
                             SMTP_C_ReadTimeouts[0],
                             1);
 }
@@ -437,7 +435,7 @@ eNextState resolve_mx_records(AsyncIO *IO)
  ******************************************************************************/
 
 SmtpOutMsg *new_smtp_outmsg(OneQueItem *MyQItem, 
-                           MailQEntry *MyQEntry, 
+                           MailQEntry *MyQEntry,
                            int MsgCount)
 {
        SmtpOutMsg * SendMsg;
@@ -450,51 +448,44 @@ SmtpOutMsg *new_smtp_outmsg(OneQueItem *MyQItem,
        SendMsg->MyQItem          = MyQItem;
        SendMsg->pCurrRelay       = MyQItem->URL;
 
-       SendMsg->IO.Data          = SendMsg;
-
-       SendMsg->IO.SendDone      = SMTP_C_DispatchWriteDone;
-       SendMsg->IO.ReadDone      = SMTP_C_DispatchReadDone;
-       SendMsg->IO.Terminate     = SMTP_C_Terminate;
-       SendMsg->IO.LineReader    = SMTP_C_ReadServerStatus;
-       SendMsg->IO.ConnFail      = SMTP_C_ConnFail;
-       SendMsg->IO.DNS.Fail      = SMTP_C_DNSFail;
-       SendMsg->IO.Timeout       = SMTP_C_Timeout;
-       SendMsg->IO.ShutdownAbort = SMTP_C_Shutdown;
-
-       SendMsg->IO.SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
-       SendMsg->IO.RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
-       SendMsg->IO.IOBuf         = NewStrBuf();
-
-       SendMsg->IO.NextState     = eReadMessage;
+       InitIOStruct(&SendMsg->IO,
+                    SendMsg,
+                    eReadMessage,
+                    SMTP_C_ReadServerStatus,
+                    SMTP_C_DNSFail,
+                    SMTP_C_DispatchWriteDone,
+                    SMTP_C_DispatchReadDone,
+                    SMTP_C_Terminate,
+                    SMTP_C_ConnFail,
+                    SMTP_C_Timeout,
+                    SMTP_C_Shutdown);
 
        return SendMsg;
 }
 
 void smtp_try_one_queue_entry(OneQueItem *MyQItem, 
-                             MailQEntry *MyQEntry, 
-                             StrBuf *MsgText, 
+                             MailQEntry *MyQEntry,
+                             StrBuf *MsgText,
                              int KeepMsgText,  /* KeepMsgText allows us to use MsgText as ours. */
                              int MsgCount)
 {
-       AsyncIO *IO;
        SmtpOutMsg *SendMsg;
 
        syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__);
 
        SendMsg = new_smtp_outmsg(MyQItem, MyQEntry, MsgCount);
-       IO = &SendMsg->IO;
        if (KeepMsgText) SendMsg->msgtext = MsgText;
-       else             SendMsg->msgtext = NewStrBufDup(MsgText);
-       
+       else             SendMsg->msgtext = NewStrBufDup(MsgText);
+
        if (smtp_resolve_recipients(SendMsg)) {
-               CitContext *SubC;
-               SubC = CloneContext (CC);
-               SubC->session_specific_data = (char*) SendMsg;
-               SendMsg->IO.CitContext = SubC;
-               
-               safestrncpy(SubC->cs_host, SendMsg->node, sizeof(SubC->cs_host));
+
+               safestrncpy(
+                       ((CitContext *)SendMsg->IO.CitContext)->cs_host,
+                       SendMsg->node,
+                       sizeof(((CitContext *)SendMsg->IO.CitContext)->cs_host));
+
                syslog(LOG_DEBUG, "SMTP Starting: [%ld] <%s> CC <%d> \n",
-                      SendMsg->MyQItem->MessageID, 
+                      SendMsg->MyQItem->MessageID,
                       ChrPtr(SendMsg->MyQEntry->Recipient),
                       ((CitContext*)SendMsg->IO.CitContext)->cs_pid);
                if (SendMsg->pCurrRelay == NULL)
@@ -513,10 +504,10 @@ void smtp_try_one_queue_entry(OneQueItem *MyQItem,
        }
        else {
                /* No recipients? well fail then. */
-               if ((SendMsg==NULL) || 
+               if ((SendMsg==NULL) ||
                    (SendMsg->MyQEntry == NULL)) {
                        SendMsg->MyQEntry->Status = 5;
-                       StrBufPlain(SendMsg->MyQEntry->StatusMessage, 
+                       StrBufPlain(SendMsg->MyQEntry->StatusMessage,
                                    HKEY("Invalid Recipient!"));
                }
                FinalizeMessageSend(SendMsg);
@@ -627,8 +618,6 @@ eNextState SMTP_C_ConnFail(AsyncIO *IO)
 }
 eNextState SMTP_C_DNSFail(AsyncIO *IO)
 {
-       SmtpOutMsg *pMsg = IO->Data;
-
        EVS_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__);
        return FailOneAttempt(IO);
 }