From 7cf6e5035af5106d7c9466e9e64e6b9439b841d3 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sun, 16 Jan 2011 18:46:56 +0100 Subject: [PATCH] Libev migration: - timeouts implemented & working - async connects working --- citadel/event_client.c | 99 +++++++++++---------- citadel/event_client.h | 9 +- citadel/modules/smtp/serv_smtpeventclient.c | 70 ++++++++------- 3 files changed, 95 insertions(+), 83 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index f3730ca67..0dd97d4b9 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -63,8 +63,6 @@ #include "locate_host.h" #include "citadel_dirs.h" -#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT - #include "event_client.h" extern int event_add_pipe[2]; @@ -131,8 +129,7 @@ void ShutDownCLient(AsyncIO *IO) { ev_io_stop(event_base, &IO->send_event); ev_io_stop(event_base, &IO->recv_event); - ev_timer_stop (event_base, &IO->conn_fail); - ev_timer_stop (event_base, &IO->conn_timeout); + ev_timer_stop (event_base, &IO->rw_timeout); close(IO->sock); IO->sock = 0; IO->SendBuf.fd = 0; @@ -143,6 +140,7 @@ void ShutDownCLient(AsyncIO *IO) } + eReadState HandleInbound(AsyncIO *IO) { eReadState Finished = eBufferNotEmpty; @@ -260,12 +258,31 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->Timeout(IO); /* else : must write more. */ } +static void +set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) +{ + + switch(IO->NextState) { + case eReadMessage: + ev_io_start(event_base, &IO->recv_event); + break; + case eSendReply: + case eSendMore: + IO_send_callback(loop, &IO->send_event, revents); + break; + case eTerminateConnection: + case eAbort: + /// TODO: WHUT? + break; + } +} static void IO_Timout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + ev_timer_stop (event_base, &IO->rw_timeout); IO->Timeout(IO); } static void @@ -273,9 +290,20 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + ev_timer_stop (event_base, &IO->conn_fail); + ev_io_stop(loop, &IO->conn_event); IO->ConnFail(IO); } static void +IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + ev_io_stop(loop, &IO->conn_event); + ev_timer_stop (event_base, &IO->conn_fail); + set_start_callback(loop, IO, revents); +} +static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { ssize_t nbytes; @@ -294,26 +322,8 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) } -static void -set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) -{ - - switch(IO->NextState) { - case eReadMessage: - ev_io_start(event_base, &IO->recv_event); - break; - case eSendReply: - case eSendMore: - IO_send_callback(loop, &IO->send_event, revents); - break; - case eTerminateConnection: - case eAbort: - /// TODO: WHUT? - break; - } -} -int event_connect_socket(AsyncIO *IO, int conn_timeout, int first_rw_timeout) +int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout) { struct sockaddr_in saddr; int fdflags; @@ -356,14 +366,18 @@ IO->curr_ai->ai_family, IO->recv_event.data = IO; ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE); IO->send_event.data = IO; + ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0); IO->conn_fail.data = IO; + ev_timer_init(&IO->rw_timeout, IO_Timout_callback, first_rw_timeout, 0); + IO->rw_timeout.data = IO; memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) ); memcpy(&saddr.sin_addr, IO->HEnt->h_addr_list[0], sizeof(struct in_addr)); + saddr.sin_addr.s_addr = inet_addr("127.0.0.1"); saddr.sin_family = AF_INET; saddr.sin_port = htons(IO->dport); @@ -374,40 +388,31 @@ IO->curr_ai->ai_family, sizeof(struct sockaddr_in)); if (rc >= 0){ //// freeaddrinfo(res); - ev_timer_init(&IO->conn_timeout, IO_Timout_callback, first_rw_timeout, 0); - IO->conn_timeout.data = IO; set_start_callback(event_base, IO, 0); - ev_timer_start(event_base, &IO->conn_timeout); + ev_timer_start(event_base, &IO->rw_timeout); return 0; } else if (errno == EINPROGRESS) { - set_start_callback(event_base, IO, 0); - ev_timer_init(&IO->conn_timeout, - IO_Timout_callback, - conn_timeout + first_rw_timeout, 0); + + ev_io_init(&IO->conn_event, IO_connestd_callback, IO->sock, EV_READ|EV_WRITE); + IO->conn_event.data = IO; + + ev_io_start(event_base, &IO->conn_event); ev_timer_start(event_base, &IO->conn_fail); - ev_timer_start(event_base, &IO->conn_timeout); - return 0; } else { CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); -/// close(IO->sock); -/* IO->curr_ai = IO->curr_ai->ai_next; - if (IO->curr_ai != NULL) - return event_connect_socket(IO); - else*/ - return -1; + IO->ConnFail(IO); + return -1; } } -void SetNextTimeout(AsyncIO *IO, int timeout) +void SetNextTimeout(AsyncIO *IO, double timeout) { - ev_timer_init(&IO->conn_timeout, - IO_Timout_callback, - timeout, 0); - ev_timer_again (event_base, &IO->conn_timeout); + IO->rw_timeout.repeat = timeout; + ev_timer_again (event_base, &IO->rw_timeout); } void InitEventIO(AsyncIO *IO, @@ -418,8 +423,8 @@ void InitEventIO(AsyncIO *IO, IO_CallBack Timeout, IO_CallBack ConnFail, IO_LineReaderCallback LineReader, - int conn_timeout, - int first_rw_timeout, + double conn_timeout, + double first_rw_timeout, int ReadFirst) { IO->Data = pData; @@ -428,6 +433,7 @@ void InitEventIO(AsyncIO *IO, IO->Terminate = Terminate; IO->LineReader = LineReader; IO->ConnFail = ConnFail; + IO->Timeout = Timeout; if (ReadFirst) { IO->NextState = eReadMessage; @@ -439,6 +445,3 @@ void InitEventIO(AsyncIO *IO, // IO->res = HEnt->h_addr_list[0]; event_connect_socket(IO, conn_timeout, first_rw_timeout); } - - -#endif diff --git a/citadel/event_client.h b/citadel/event_client.h index c5761ffae..208a71661 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -37,9 +37,10 @@ struct AsyncIO { eNextState NextState; ev_timer conn_fail, - conn_timeout; + rw_timeout; ev_io recv_event, - send_event; + send_event, + conn_event; StrBuf *ErrMsg; /* if we fail to connect, or lookup, error goes here. */ /* read/send related... */ @@ -93,11 +94,11 @@ void InitEventIO(AsyncIO *IO, IO_CallBack Timeout, IO_CallBack ConnFail, IO_LineReaderCallback LineReader, - int conn_timeout, int first_rw_timeout, + double conn_timeout, double first_rw_timeout, int ReadFirst); int QueueQuery(ns_type Type, char *name, AsyncIO *IO, IO_CallBack PostDNS); void StopClient(AsyncIO *IO); -void SetNextTimeout(AsyncIO *IO, int timeout); +void SetNextTimeout(AsyncIO *IO, double timeout); diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index f1f337128..ff99668bd 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -108,30 +108,30 @@ typedef enum _eSMTP_C_States { eMaxSMTPC } eSMTP_C_States; -const long SMTP_C_ConnTimeout = 60; /* wail 1 minute for connections... */ -const long SMTP_C_ReadTimeouts[eMaxSMTPC] = { - 300, /* Greeting... */ - 30, /* EHLO */ - 30, /* HELO */ - 30, /* Auth */ - 30, /* From */ - 90, /* RCPT */ - 30, /* DATA */ - 90, /* DATABody */ - 0, /* end of body... */ - 30 /* QUIT */ +const double SMTP_C_ConnTimeout = 60.; /* wail 1 minute for connections... */ +const double SMTP_C_ReadTimeouts[eMaxSMTPC] = { + 300., /* Greeting... */ + 30., /* EHLO */ + 30., /* HELO */ + 30., /* Auth */ + 30., /* From */ + 90., /* RCPT */ + 30., /* DATA */ + 90., /* DATABody */ + 90., /* end of body... */ + 30. /* QUIT */ }; -const long SMTP_C_SendTimeouts[eMaxSMTPC] = { - 90, /* Greeting... */ - 30, /* EHLO */ - 30, /* HELO */ - 30, /* Auth */ - 30, /* From */ - 30, /* RCPT */ - 30, /* DATA */ - 90, /* DATABody */ - 900, /* end of body... */ - 30 /* QUIT */ +const double SMTP_C_SendTimeouts[eMaxSMTPC] = { + 90., /* Greeting... */ + 30., /* EHLO */ + 30., /* HELO */ + 30., /* Auth */ + 30., /* From */ + 30., /* RCPT */ + 30., /* DATA */ + 90., /* DATABody */ + 900., /* end of body... */ + 30. /* QUIT */ }; /* const long SMTP_C_SendTimeouts[eMaxSMTPC] = { @@ -183,7 +183,7 @@ void DeleteSmtpOutMsg(void *v) FreeStrBuf(&Msg->msgtext); FreeAsyncIOContents(&Msg->IO); -/// free(Msg); + free(Msg); } eNextState SMTP_C_Timeout(AsyncIO *IO); @@ -244,15 +244,13 @@ void FinalizeMessageSend(SmtpOutMsg *Msg) msg->cm_anon_type = MES_NORMAL; msg->cm_format_type = FMT_RFC822; msg->cm_fields['M'] = SmashStrBuf(&MsgData); - /* Generate 'bounce' messages */ - smtp_do_bounce(msg->cm_fields['M'], - Msg->msgtext); - CtdlSubmitMsg(msg, NULL, SMTP_SPOOLOUT_ROOM, QP_EADDR); CtdlFreeMessage(msg); } - else + else { CtdlDeleteMessages(SMTP_SPOOLOUT_ROOM, &Msg->MyQItem->MessageID, 1, ""); + FreeStrBuf(&MsgData); + } RemoveQItem(Msg->MyQItem); } @@ -705,7 +703,6 @@ eNextState SMTPC_send_data_body(SmtpOutMsg *SendMsg) Buf = SendMsg->IO.SendBuf.Buf; SendMsg->IO.SendBuf.Buf = SendMsg->msgtext; SendMsg->msgtext = Buf; - //// TODO timeout like that: (SendMsg->msg_size / 128) + 50); SendMsg->State ++; return eSendMore; @@ -805,14 +802,25 @@ SMTPSendHandler SendHandlers[eMaxSMTPC] = { void SMTPSetTimeout(eNextState NextTCPState, SmtpOutMsg *pMsg) { CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); - long Timeout; + double Timeout; switch (NextTCPState) { case eSendReply: case eSendMore: Timeout = SMTP_C_SendTimeouts[pMsg->State]; + if (pMsg->State == eDATABody) { + /* if we're sending a huge message, we need more time. */ + Timeout += StrLength(pMsg->msgtext) / 1024; + } break; case eReadMessage: Timeout = SMTP_C_ReadTimeouts[pMsg->State]; + if (pMsg->State == eDATATerminateBody) { + /* + * some mailservers take a nap before accepting the message + * content inspection and such. + */ + Timeout += StrLength(pMsg->msgtext) / 1024; + } break; case eTerminateConnection: case eAbort: -- 2.30.2