From: Wilfried Goesgens Date: Sat, 15 Jan 2011 19:57:34 +0000 (+0100) Subject: libev migration - timeouts seem to be working. X-Git-Tag: v8.11~1105 X-Git-Url: https://code.citadel.org/?p=citadel.git;a=commitdiff_plain;h=ddb2f5803b76a4a6120df8e845bcf165b3726f86 libev migration - timeouts seem to be working. - handle timeouts with libev timers --- diff --git a/citadel/event_client.c b/citadel/event_client.c index 9ad065422..f3730ca67 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -122,25 +122,6 @@ void FreeAsyncIOContents(AsyncIO *IO) ares_destroy(IO->DNSChannel); } -/* - static void - setup_signal_handlers(struct instance *instance) - { - signal(SIGPIPE, SIG_IGN); - - event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigterm_event, NULL); - - event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigint_event, NULL); - - event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigquit_event, NULL); - } -*/ void ShutDownCLient(AsyncIO *IO) { @@ -150,13 +131,15 @@ void ShutDownCLient(AsyncIO *IO) { ev_io_stop(event_base, &IO->send_event); ev_io_stop(event_base, &IO->recv_event); + ev_timer_stop (event_base, &IO->conn_fail); + ev_timer_stop (event_base, &IO->conn_timeout); close(IO->sock); IO->sock = 0; IO->SendBuf.fd = 0; IO->RecvBuf.fd = 0; } - IO->Terminate(IO->Data); + IO->Terminate(IO); } @@ -191,7 +174,7 @@ eReadState HandleInbound(AsyncIO *IO) if (Finished != eMustReadMore) { ev_io_stop(event_base, &IO->recv_event); - IO->NextState = IO->ReadDone(IO->Data); + IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); } } @@ -200,7 +183,7 @@ eReadState HandleInbound(AsyncIO *IO) if ((IO->NextState == eSendReply) || (IO->NextState == eSendMore)) { - IO->NextState = IO->SendDone(IO->Data); + IO->NextState = IO->SendDone(IO); ev_io_start(event_base, &IO->send_event); } else if ((IO->NextState == eTerminateConnection) || @@ -215,16 +198,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; AsyncIO *IO = watcher->data; -/* - CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d : [%s%s%s%s]\n", - watcher->fd, - (event&EV_TIMEOUT) ? " timeout" : "", - (event&EV_READ) ? " read" : "", - (event&EV_WRITE) ? " write" : "", - (event&EV_SIGNAL) ? " signal" : ""); -*/ -/// assert(fd == IO->sock); - + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); if (rc == 0) @@ -259,7 +233,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eSendReply: break; case eSendMore: - IO->NextState = IO->SendDone(IO->Data); + IO->NextState = IO->SendDone(IO); if ((IO->NextState == eTerminateConnection) || (IO->NextState == eAbort) ) @@ -321,12 +295,8 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) static void -set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents, int first_rw_timeout) +set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) { - ev_timer_init(&IO->conn_timeout, - IO_Timout_callback, - first_rw_timeout, 0); - IO->conn_timeout.data = IO; switch(IO->NextState) { case eReadMessage: @@ -386,6 +356,8 @@ IO->curr_ai->ai_family, IO->recv_event.data = IO; ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE); IO->send_event.data = IO; + ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0); + IO->conn_fail.data = IO; memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) ); @@ -402,23 +374,26 @@ IO->curr_ai->ai_family, sizeof(struct sockaddr_in)); if (rc >= 0){ //// freeaddrinfo(res); - set_start_callback(event_base, IO, 0, first_rw_timeout); + ev_timer_init(&IO->conn_timeout, IO_Timout_callback, first_rw_timeout, 0); + IO->conn_timeout.data = IO; + set_start_callback(event_base, IO, 0); + ev_timer_start(event_base, &IO->conn_timeout); return 0; } else if (errno == EINPROGRESS) { - set_start_callback(event_base, IO, 0, first_rw_timeout); - ev_timer_init(&IO->conn_fail, - IO_connfail_callback, - conn_timeout, 0); - IO->conn_fail.data = IO; + set_start_callback(event_base, IO, 0); + ev_timer_init(&IO->conn_timeout, + IO_Timout_callback, + conn_timeout + first_rw_timeout, 0); ev_timer_start(event_base, &IO->conn_fail); + ev_timer_start(event_base, &IO->conn_timeout); return 0; } else { CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); - close(IO->sock); +/// close(IO->sock); /* IO->curr_ai = IO->curr_ai->ai_next; if (IO->curr_ai != NULL) return event_connect_socket(IO); @@ -427,6 +402,14 @@ IO->curr_ai->ai_family, } } +void SetNextTimeout(AsyncIO *IO, int timeout) +{ + ev_timer_init(&IO->conn_timeout, + IO_Timout_callback, + timeout, 0); + ev_timer_again (event_base, &IO->conn_timeout); +} + void InitEventIO(AsyncIO *IO, void *pData, IO_CallBack ReadDone, diff --git a/citadel/event_client.h b/citadel/event_client.h index e6b41f3bd..c5761ffae 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -16,7 +16,7 @@ typedef enum _eNextState { }eNextState; typedef int (*EventContextAttach)(void *Data); -typedef eNextState (*IO_CallBack)(void *Data); +typedef eNextState (*IO_CallBack)(AsyncIO *IO); typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO); typedef void (*ParseDNSAnswerCb)(AsyncIO*, unsigned char*, int); typedef void (*FreeDNSReply)(void *DNSData); @@ -99,3 +99,5 @@ void InitEventIO(AsyncIO *IO, int QueueQuery(ns_type Type, char *name, AsyncIO *IO, IO_CallBack PostDNS); void StopClient(AsyncIO *IO); + +void SetNextTimeout(AsyncIO *IO, int timeout); diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index fc4dc20a6..f1f337128 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -110,6 +110,18 @@ typedef enum _eSMTP_C_States { const long SMTP_C_ConnTimeout = 60; /* wail 1 minute for connections... */ const long SMTP_C_ReadTimeouts[eMaxSMTPC] = { + 300, /* Greeting... */ + 30, /* EHLO */ + 30, /* HELO */ + 30, /* Auth */ + 30, /* From */ + 90, /* RCPT */ + 30, /* DATA */ + 90, /* DATABody */ + 0, /* end of body... */ + 30 /* QUIT */ +}; +const long SMTP_C_SendTimeouts[eMaxSMTPC] = { 90, /* Greeting... */ 30, /* EHLO */ 30, /* HELO */ @@ -125,16 +137,16 @@ const long SMTP_C_ReadTimeouts[eMaxSMTPC] = { const long SMTP_C_SendTimeouts[eMaxSMTPC] = { }; */ -const char *ReadErrors[eMaxSMTPC] = { - "Connection broken during SMTP conversation", - "Connection broken during SMTP EHLO", - "Connection broken during SMTP HELO", - "Connection broken during SMTP AUTH", - "Connection broken during SMTP MAIL FROM", - "Connection broken during SMTP RCPT", - "Connection broken during SMTP DATA", - "Connection broken during SMTP message transmit", - ""/* quit reply, don't care. */ +static const ConstStr ReadErrors[eMaxSMTPC] = { + {HKEY("Connection broken during SMTP conversation")}, + {HKEY("Connection broken during SMTP EHLO")}, + {HKEY("Connection broken during SMTP HELO")}, + {HKEY("Connection broken during SMTP AUTH")}, + {HKEY("Connection broken during SMTP MAIL FROM")}, + {HKEY("Connection broken during SMTP RCPT")}, + {HKEY("Connection broken during SMTP DATA")}, + {HKEY("Connection broken during SMTP message transmit")}, + {HKEY("")}/* quit reply, don't care. */ }; @@ -171,14 +183,14 @@ void DeleteSmtpOutMsg(void *v) FreeStrBuf(&Msg->msgtext); FreeAsyncIOContents(&Msg->IO); - free(Msg); +/// free(Msg); } -eNextState SMTP_C_Timeout(void *Data); -eNextState SMTP_C_ConnFail(void *Data); -eNextState SMTP_C_DispatchReadDone(void *Data); -eNextState SMTP_C_DispatchWriteDone(void *Data); -eNextState SMTP_C_Terminate(void *Data); +eNextState SMTP_C_Timeout(AsyncIO *IO); +eNextState SMTP_C_ConnFail(AsyncIO *IO); +eNextState SMTP_C_DispatchReadDone(AsyncIO *IO); +eNextState SMTP_C_DispatchWriteDone(AsyncIO *IO); +eNextState SMTP_C_Terminate(AsyncIO *IO); eReadState SMTP_C_ReadServerStatus(AsyncIO *IO); typedef eNextState (*SMTPReadHandler)(SmtpOutMsg *Msg); @@ -207,6 +219,8 @@ typedef eNextState (*SMTPSendHandler)(SmtpOutMsg *Msg); void FinalizeMessageSend(SmtpOutMsg *Msg) { + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + if (DecreaseQReference(Msg->MyQItem)) { int nRemain; @@ -255,6 +269,7 @@ void get_one_mx_host_ip_done(void *Ctx, { AsyncIO *IO = Ctx; SmtpOutMsg *SendMsg = IO->Data; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); if ((status == ARES_SUCCESS) && (hostent != NULL) ) { unsigned long psaddr; // TODO: IPV6 @@ -262,7 +277,7 @@ void get_one_mx_host_ip_done(void *Ctx, psaddr = ntohl(psaddr); CtdlLogPrintf(CTDL_DEBUG, - "SMTP client[%ld]: connecting to %s [%d.%d.%d.%d:%d] ...\n", + "SMTP client[%ld]: connecting to %s [%ld.%ld.%ld.%ld:%d] ...\n", SendMsg->n, SendMsg->mx_host, (psaddr >> 24) & 0xFF, @@ -273,8 +288,13 @@ void get_one_mx_host_ip_done(void *Ctx, SendMsg->MyQEntry->Status = 5; StrBufPrintf(SendMsg->MyQEntry->StatusMessage, - "Timeout while connecting %s", - SendMsg->mx_host); + "Timeout while connecting %s [%ld.%ld.%ld.%ld:%d] ", + SendMsg->mx_host, + (psaddr >> 24) & 0xFF, + (psaddr >> 16) & 0xFF, + (psaddr >> 8) & 0xFF, + (psaddr >> 0) & 0xFF, + SendMsg->IO.dport); SendMsg->IO.HEnt = hostent; InitEventIO(IO, SendMsg, @@ -297,6 +317,7 @@ void get_one_mx_host_ip(SmtpOutMsg *SendMsg) //char *endpart; //char buf[SIZ]; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); SendMsg->IO.dport = DefaultMXPort; @@ -328,7 +349,8 @@ void get_one_mx_host_ip(SmtpOutMsg *SendMsg) CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: looking up %s : %d ...\n", SendMsg->n, - SendMsg->mx_host); + SendMsg->mx_host, + SendMsg->IO.dport); ares_gethostbyname(SendMsg->IO.DNSChannel, SendMsg->mx_host, @@ -338,11 +360,12 @@ void get_one_mx_host_ip(SmtpOutMsg *SendMsg) } -eNextState smtp_resolve_mx_done(void *data) +eNextState smtp_resolve_mx_done(AsyncIO *IO) { - AsyncIO *IO = data; SmtpOutMsg * SendMsg = IO->Data; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); SendMsg->IO.IOBuf = NewStrBuf(); @@ -359,6 +382,8 @@ int resolve_mx_records(void *Ctx) { SmtpOutMsg * SendMsg = Ctx; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + if (!QueueQuery(ns_t_mx, SendMsg->node, &SendMsg->IO, @@ -381,6 +406,8 @@ int smtp_resolve_recipients(SmtpOutMsg *SendMsg) int lp, rp; int i; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + if ((SendMsg==NULL) || (SendMsg->MyQEntry == NULL) || (StrLength(SendMsg->MyQEntry->Recipient) == 0)) { @@ -461,6 +488,8 @@ void smtp_try(OneQueItem *MyQItem, { SmtpOutMsg * SendMsg; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + SendMsg = (SmtpOutMsg *) malloc(sizeof(SmtpOutMsg)); memset(SendMsg, 0, sizeof(SmtpOutMsg)); SendMsg->IO.sock = (-1); @@ -772,40 +801,69 @@ SMTPSendHandler SendHandlers[eMaxSMTPC] = { SMTPC_send_terminate_data_body, SMTPC_send_QUIT }; -eNextState SMTP_C_DispatchReadDone(void *Data) + +void SMTPSetTimeout(eNextState NextTCPState, SmtpOutMsg *pMsg) +{ + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + long Timeout; + switch (NextTCPState) { + case eSendReply: + case eSendMore: + Timeout = SMTP_C_SendTimeouts[pMsg->State]; + break; + case eReadMessage: + Timeout = SMTP_C_ReadTimeouts[pMsg->State]; + break; + case eTerminateConnection: + case eAbort: + return; + } + SetNextTimeout(&pMsg->IO, Timeout); +} +eNextState SMTP_C_DispatchReadDone(AsyncIO *IO) { - SmtpOutMsg *pMsg = Data; - eNextState rc = ReadHandlers[pMsg->State](pMsg); + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + SmtpOutMsg *pMsg = IO->Data; + eNextState rc; + + rc = ReadHandlers[pMsg->State](pMsg); pMsg->State++; + SMTPSetTimeout(rc, pMsg); return rc; } -eNextState SMTP_C_DispatchWriteDone(void *Data) +eNextState SMTP_C_DispatchWriteDone(AsyncIO *IO) { - SmtpOutMsg *pMsg = Data; - return SendHandlers[pMsg->State](pMsg); + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); + SmtpOutMsg *pMsg = IO->Data; + eNextState rc; + + rc = SendHandlers[pMsg->State](pMsg); + SMTPSetTimeout(rc, pMsg); + return rc; } /*****************************************************************************/ /* SMTP CLIENT ERROR CATCHERS */ /*****************************************************************************/ -eNextState SMTP_C_Terminate(void *Data) +eNextState SMTP_C_Terminate(AsyncIO *IO) { - AsyncIO *IO = Data; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); SmtpOutMsg *pMsg = IO->Data; FinalizeMessageSend(pMsg); return 0; } -eNextState SMTP_C_Timeout(void *Data) +eNextState SMTP_C_Timeout(AsyncIO *IO) { - AsyncIO *IO = Data; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); SmtpOutMsg *pMsg = IO->Data; + StrBufPlain(IO->ErrMsg, CKEY(ReadErrors[pMsg->State])); FinalizeMessageSend(pMsg); return 0; } -eNextState SMTP_C_ConnFail(void *Data) +eNextState SMTP_C_ConnFail(AsyncIO *IO) { - AsyncIO *IO = Data; + CtdlLogPrintf(CTDL_DEBUG, "SMTP: %s\n", __FUNCTION__); SmtpOutMsg *pMsg = IO->Data; FinalizeMessageSend(pMsg); return 0;