From 7cced4381b0497cf3d99a489bbb1a4f5375ded32 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sun, 25 Dec 2011 17:45:57 +0100 Subject: [PATCH] Refactoring: create central place to init AsyncIO - set the CC in the Queue Runners, so InitIOStruct() can use that - call InitIOStruct() to reduce duplicate code - InitIOStruct(): a central place knowing whats needed to be inside of AsyncIO. --- citadel/event_client.c | 105 ++++++++++++++----- citadel/event_client.h | 13 ++- citadel/modules/network/serv_networkclient.c | 43 ++++---- citadel/modules/pop3client/serv_pop3client.c | 58 ++++------ citadel/modules/smtp/serv_smtpeventclient.c | 67 +++++------- 5 files changed, 156 insertions(+), 130 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index 4a2f481a9..5f418fa6a 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -701,18 +701,16 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) } -eNextState EvConnectSock(AsyncIO *IO, - void *pData, - double conn_timeout, +eNextState EvConnectSock(AsyncIO *IO, + double conn_timeout, double first_rw_timeout, int ReadFirst) { - int fdflags; + int fdflags; int rc = -1; - IO->Data = pData; become_session(IO->CitContext); - + if (ReadFirst) { IO->NextState = eReadMessage; } @@ -720,36 +718,46 @@ eNextState EvConnectSock(AsyncIO *IO, IO->NextState = eSendReply; } - IO->SendBuf.fd = IO->RecvBuf.fd = + IO->SendBuf.fd = IO->RecvBuf.fd = socket( - (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, - SOCK_STREAM, + (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, + SOCK_STREAM, IPPROTO_TCP); if (IO->SendBuf.fd < 0) { - EV_syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); + EV_syslog(LOG_ERR, + "EVENT: socket() failed: %s\n", + strerror(errno)); + + StrBufPrintf(IO->ErrMsg, + "Failed to create socket: %s", + strerror(errno)); return eAbort; } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - EV_syslog(LOG_DEBUG, + EV_syslog(LOG_DEBUG, "EVENT: unable to get socket flags! %s \n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to get socket flags: %s", + strerror(errno)); return eAbort; } fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { - EV_syslog(LOG_DEBUG, - "EVENT: unable to set socket nonblocking flags! %s \n", - strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); + EV_syslog( + LOG_DEBUG, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to set socket flags: %s", + strerror(errno)); close(IO->SendBuf.fd); IO->SendBuf.fd = IO->RecvBuf.fd = -1; return eAbort; } -/* TODO: maye we could use offsetof() to calc the position of data... +/* TODO: maye we could use offsetof() to calc the position of data... * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher */ ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ); @@ -759,16 +767,20 @@ eNextState EvConnectSock(AsyncIO *IO, ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0); IO->conn_fail.data = IO; - ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout, 0); + ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0); IO->rw_timeout.data = IO; /* Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ /// ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1"); if (IO->ConnectMe->IPv6) - rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6)); + rc = connect(IO->SendBuf.fd, + &IO->ConnectMe->Addr, + sizeof(struct sockaddr_in6)); else - rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); + rc = connect(IO->SendBuf.fd, + (struct sockaddr_in *)&IO->ConnectMe->Addr, + sizeof(struct sockaddr_in)); if (rc >= 0){ EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); @@ -779,7 +791,11 @@ eNextState EvConnectSock(AsyncIO *IO, else if (errno == EINPROGRESS) { EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); - ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE); + ev_io_init(&IO->conn_event, + IO_connestd_callback, + IO->SendBuf.fd, + EV_READ|EV_WRITE); + IO->conn_event.data = IO; ev_io_start(event_base, &IO->conn_event); @@ -791,9 +807,11 @@ eNextState EvConnectSock(AsyncIO *IO, IO_connfailimmediate_callback); IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - + EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to connect: %s", + strerror(errno)); return IO->NextState; } return IO->NextState; @@ -806,8 +824,8 @@ void SetNextTimeout(AsyncIO *IO, double timeout) } -eNextState ReAttachIO(AsyncIO *IO, - void *pData, +eNextState ReAttachIO(AsyncIO *IO, + void *pData, int ReadFirst) { IO->Data = pData; @@ -823,3 +841,38 @@ eNextState ReAttachIO(AsyncIO *IO, return IO->NextState; } + +void InitIOStruct(AsyncIO *IO, + void *Data, + eNextState NextState, + IO_LineReaderCallback LineReader, + IO_CallBack DNS_Fail, + IO_CallBack SendDone, + IO_CallBack ReadDone, + IO_CallBack Terminate, + IO_CallBack ConnFail, + IO_CallBack Timeout, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + + IO->NextState = NextState; + + IO->SendDone = SendDone; + IO->ReadDone = ReadDone; + IO->Terminate = Terminate; + IO->LineReader = LineReader; + IO->ConnFail = ConnFail; + IO->Timeout = Timeout; + IO->ShutdownAbort = ShutdownAbort; + + IO->DNS.Fail = DNS_Fail; + + IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->IOBuf = NewStrBuf(); + +} diff --git a/citadel/event_client.h b/citadel/event_client.h index c8a4bafe3..c949a6ce9 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -166,7 +166,6 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB); eNextState QueueCurlContext(AsyncIO *IO); eNextState EvConnectSock(AsyncIO *IO, - void *pData, double conn_timeout, double first_rw_timeout, int ReadFirst); @@ -201,6 +200,18 @@ int evcurl_init(AsyncIO *IO, IO_CallBack Terminate, IO_CallBack ShutdownAbort); +void InitIOStruct(AsyncIO *IO, + void *Data, + eNextState NextState, + IO_LineReaderCallback LineReader, + IO_CallBack DNS_Fail, + IO_CallBack SendDone, + IO_CallBack ReadDone, + IO_CallBack Terminate, + IO_CallBack ConnFail, + IO_CallBack Timeout, + IO_CallBack ShutdownAbort); + eNextState ReAttachIO(AsyncIO *IO, void *pData, int ReadFirst); diff --git a/citadel/modules/network/serv_networkclient.c b/citadel/modules/network/serv_networkclient.c index d0e9e6d19..63c6e6541 100644 --- a/citadel/modules/network/serv_networkclient.c +++ b/citadel/modules/network/serv_networkclient.c @@ -789,40 +789,31 @@ eNextState nwc_connect_ip(AsyncIO *IO) ChrPtr(NW->host), ChrPtr(NW->port)); - return EvConnectSock(IO, NW, - NWC_ConnTimeout, + return EvConnectSock(IO, + NWC_ConnTimeout, NWC_ReadTimeouts[0], 1); } void RunNetworker(AsyncNetworker *NW) { - CitContext *SubC; - ParseURL(&NW->IO.ConnectMe, NW->Url, 504); - NW->IO.Data = NW; - NW->IO.SendDone = NWC_DispatchWriteDone; - NW->IO.ReadDone = NWC_DispatchReadDone; - NW->IO.Terminate = NWC_Terminate; - NW->IO.LineReader = NWC_ReadServerStatus; - NW->IO.ConnFail = NWC_ConnFail; - NW->IO.DNS.Fail = NWC_DNSFail; - NW->IO.Timeout = NWC_Timeout; - NW->IO.ShutdownAbort = NWC_Shutdown; - - NW->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); - NW->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); - NW->IO.IOBuf = NewStrBuf(); - - NW->IO.NextState = eReadMessage; - SubC = CloneContext (&networker_client_CC); - SubC->session_specific_data = (char*) NW; - NW->IO.CitContext = SubC; - - safestrncpy(SubC->cs_host, + InitIOStruct(&NW->IO, + NW, + eReadMessage, + NWC_ReadServerStatus, + NWC_DNSFail, + NWC_DispatchWriteDone, + NWC_DispatchReadDone, + NWC_Terminate, + NWC_ConnFail, + NWC_Timeout, + NWC_Shutdown); + + safestrncpy(((CitContext *)NW->IO.CitContext)->cs_host, ChrPtr(NW->host), - sizeof(SubC->cs_host)); + sizeof(((CitContext *)NW->IO.CitContext)->cs_host)); if (NW->IO.ConnectMe->IsIP) { QueueEventContext(&NW->IO, @@ -854,6 +845,8 @@ void network_poll_other_citadel_nodes(int full_poll, char *working_ignetcfg) syslog(LOG_DEBUG, "network: no neighbor nodes are configured - not polling.\n"); return; } + become_session(&networker_client_CC); + CfgData = NewStrBufPlain(working_ignetcfg, -1); Line = NewStrBufPlain(NULL, StrLength(CfgData)); Done = 0; diff --git a/citadel/modules/pop3client/serv_pop3client.c b/citadel/modules/pop3client/serv_pop3client.c index 8a0d23c38..32426e0c3 100644 --- a/citadel/modules/pop3client/serv_pop3client.c +++ b/citadel/modules/pop3client/serv_pop3client.c @@ -757,17 +757,10 @@ eNextState POP3_C_ReAttachToFetchMessages(AsyncIO *IO) eNextState pop3_connect_ip(AsyncIO *IO) { - pop3aggr *cpptr = IO->Data; - syslog(LOG_DEBUG, "POP3: %s\n", __FUNCTION__); - -//// IO->ConnectMe = &cpptr->Pop3Host; - /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ - /////// SetConnectStatus(IO); - - return EvConnectSock(IO, cpptr, - POP3_C_ConnTimeout, + return EvConnectSock(IO, + POP3_C_ConnTimeout, POP3_C_ReadTimeouts[0], 1); } @@ -843,41 +836,27 @@ eNextState pop3_get_one_host_ip(AsyncIO *IO) int pop3_do_fetching(pop3aggr *cpptr) { - CitContext *SubC; - - cpptr->IO.Data = cpptr; - - cpptr->IO.SendDone = POP3_C_DispatchWriteDone; - cpptr->IO.ReadDone = POP3_C_DispatchReadDone; - cpptr->IO.Terminate = POP3_C_Terminate; - cpptr->IO.LineReader = POP3_C_ReadServerStatus; - cpptr->IO.ConnFail = POP3_C_ConnFail; - cpptr->IO.DNS.Fail = POP3_C_DNSFail; - cpptr->IO.Timeout = POP3_C_Timeout; - cpptr->IO.ShutdownAbort = POP3_C_Shutdown; - - cpptr->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); - cpptr->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); - cpptr->IO.IOBuf = NewStrBuf(); - - cpptr->IO.NextState = eReadMessage; -/* TODO - syslog(LOG_DEBUG, "POP3: %s %s %s \n", roomname, pop3host, pop3user); - syslog(LOG_DEBUG, "Connecting to <%s>\n", pop3host); -*/ - - SubC = CloneContext (&pop3_client_CC); - SubC->session_specific_data = (char*) cpptr; - cpptr->IO.CitContext = SubC; - safestrncpy(SubC->cs_host, + InitIOStruct(&cpptr->IO, + cpptr, + eReadMessage, + POP3_C_ReadServerStatus, + POP3_C_DNSFail, + POP3_C_DispatchWriteDone, + POP3_C_DispatchReadDone, + POP3_C_Terminate, + POP3_C_ConnFail, + POP3_C_Timeout, + POP3_C_Shutdown); + + safestrncpy(((CitContext *)cpptr->IO.CitContext)->cs_host, ChrPtr(cpptr->Url), - sizeof(SubC->cs_host)); + sizeof(((CitContext *)cpptr->IO.CitContext)->cs_host)); if (cpptr->IO.ConnectMe->IsIP) { QueueEventContext(&cpptr->IO, pop3_connect_ip); } - else { /* uneducated admin has chosen to add DNS to the equation... */ + else { QueueEventContext(&cpptr->IO, pop3_get_one_host_ip); } @@ -1053,7 +1032,6 @@ static int doing_pop3client = 0; void pop3client_scan(void) { static time_t last_run = 0L; -/// struct pop3aggr *pptr; time_t fastest_scan; HashPos *it; long len; @@ -1061,6 +1039,8 @@ void pop3client_scan(void) { void *vrptr; pop3aggr *cptr; + become_session(&pop3_client_CC); + if (config.c_pop3_fastest < config.c_pop3_fetch) fastest_scan = config.c_pop3_fastest; else diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index af5d12d94..f3a1566bd 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -210,7 +210,6 @@ eNextState FailOneAttempt(AsyncIO *IO) void SetConnectStatus(AsyncIO *IO) { - SmtpOutMsg *SendMsg = IO->Data; char buf[256]; void *src; @@ -258,14 +257,13 @@ eNextState mx_connect_ip(AsyncIO *IO) SmtpOutMsg *SendMsg = IO->Data; EVS_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__); - + IO->ConnectMe = SendMsg->pCurrRelay; - /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ SetConnectStatus(IO); - return EvConnectSock(IO, SendMsg, - SMTP_C_ConnTimeout, + return EvConnectSock(IO, + SMTP_C_ConnTimeout, SMTP_C_ReadTimeouts[0], 1); } @@ -437,7 +435,7 @@ eNextState resolve_mx_records(AsyncIO *IO) ******************************************************************************/ SmtpOutMsg *new_smtp_outmsg(OneQueItem *MyQItem, - MailQEntry *MyQEntry, + MailQEntry *MyQEntry, int MsgCount) { SmtpOutMsg * SendMsg; @@ -450,51 +448,44 @@ SmtpOutMsg *new_smtp_outmsg(OneQueItem *MyQItem, SendMsg->MyQItem = MyQItem; SendMsg->pCurrRelay = MyQItem->URL; - SendMsg->IO.Data = SendMsg; - - SendMsg->IO.SendDone = SMTP_C_DispatchWriteDone; - SendMsg->IO.ReadDone = SMTP_C_DispatchReadDone; - SendMsg->IO.Terminate = SMTP_C_Terminate; - SendMsg->IO.LineReader = SMTP_C_ReadServerStatus; - SendMsg->IO.ConnFail = SMTP_C_ConnFail; - SendMsg->IO.DNS.Fail = SMTP_C_DNSFail; - SendMsg->IO.Timeout = SMTP_C_Timeout; - SendMsg->IO.ShutdownAbort = SMTP_C_Shutdown; - - SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); - SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); - SendMsg->IO.IOBuf = NewStrBuf(); - - SendMsg->IO.NextState = eReadMessage; + InitIOStruct(&SendMsg->IO, + SendMsg, + eReadMessage, + SMTP_C_ReadServerStatus, + SMTP_C_DNSFail, + SMTP_C_DispatchWriteDone, + SMTP_C_DispatchReadDone, + SMTP_C_Terminate, + SMTP_C_ConnFail, + SMTP_C_Timeout, + SMTP_C_Shutdown); return SendMsg; } void smtp_try_one_queue_entry(OneQueItem *MyQItem, - MailQEntry *MyQEntry, - StrBuf *MsgText, + MailQEntry *MyQEntry, + StrBuf *MsgText, int KeepMsgText, /* KeepMsgText allows us to use MsgText as ours. */ int MsgCount) { - AsyncIO *IO; SmtpOutMsg *SendMsg; syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__); SendMsg = new_smtp_outmsg(MyQItem, MyQEntry, MsgCount); - IO = &SendMsg->IO; if (KeepMsgText) SendMsg->msgtext = MsgText; - else SendMsg->msgtext = NewStrBufDup(MsgText); - + else SendMsg->msgtext = NewStrBufDup(MsgText); + if (smtp_resolve_recipients(SendMsg)) { - CitContext *SubC; - SubC = CloneContext (CC); - SubC->session_specific_data = (char*) SendMsg; - SendMsg->IO.CitContext = SubC; - - safestrncpy(SubC->cs_host, SendMsg->node, sizeof(SubC->cs_host)); + + safestrncpy( + ((CitContext *)SendMsg->IO.CitContext)->cs_host, + SendMsg->node, + sizeof(((CitContext *)SendMsg->IO.CitContext)->cs_host)); + syslog(LOG_DEBUG, "SMTP Starting: [%ld] <%s> CC <%d> \n", - SendMsg->MyQItem->MessageID, + SendMsg->MyQItem->MessageID, ChrPtr(SendMsg->MyQEntry->Recipient), ((CitContext*)SendMsg->IO.CitContext)->cs_pid); if (SendMsg->pCurrRelay == NULL) @@ -513,10 +504,10 @@ void smtp_try_one_queue_entry(OneQueItem *MyQItem, } else { /* No recipients? well fail then. */ - if ((SendMsg==NULL) || + if ((SendMsg==NULL) || (SendMsg->MyQEntry == NULL)) { SendMsg->MyQEntry->Status = 5; - StrBufPlain(SendMsg->MyQEntry->StatusMessage, + StrBufPlain(SendMsg->MyQEntry->StatusMessage, HKEY("Invalid Recipient!")); } FinalizeMessageSend(SendMsg); @@ -627,8 +618,6 @@ eNextState SMTP_C_ConnFail(AsyncIO *IO) } eNextState SMTP_C_DNSFail(AsyncIO *IO) { - SmtpOutMsg *pMsg = IO->Data; - EVS_syslog(LOG_DEBUG, "SMTP: %s\n", __FUNCTION__); return FailOneAttempt(IO); } -- 2.30.2