From a706083cbf719d326b15edd89ca01fc134492169 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sat, 1 Jan 2011 19:14:51 +0100 Subject: [PATCH] libev migration - make connect() async; add wrappers in event_client to abstract it to our clients - switch libevent -> libev - start abstraction for dns asynchronisation via c-ares --- citadel/event_client.c | 185 ++++++++++++++---- citadel/event_client.h | 46 ++++- .../modules/eventclient/serv_eventclient.c | 30 +-- citadel/modules/smtp/serv_smtpeventclient.c | 53 +++-- 4 files changed, 236 insertions(+), 78 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index 8484cad4c..59be13321 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -65,15 +65,16 @@ #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT -#include #include "event_client.h" extern int event_add_pipe[2]; extern citthread_mutex_t EventQueueMutex; extern HashList *InboundEventQueue; +extern struct ev_loop *event_base; #define SEND_EVENT 1 #define RECV_EVENT 2 +#define CONN_EVENT 3 int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB) { @@ -149,23 +150,22 @@ void ShutDownCLient(AsyncIO *IO) CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock); switch (IO->active_event) { case SEND_EVENT: - event_del(&IO->send_event); + ev_io_stop(event_base, &IO->send_event); break; case RECV_EVENT: - event_del(&IO->recv_event); + ev_io_stop(event_base, &IO->recv_event); + break; + case CONN_EVENT: + ev_io_stop(event_base, &IO->conn_event); break; case 0: // no event active here; just bail out. break; } + IO->active_event = 0; IO->Terminate(IO->Data); - -// citthread_mutex_lock(&EventQueueMutex); - -///QueueEvents /// todo remove from hash. - -// citthread_mutex_unlock(&EventQueueMutex); + } eReadState HandleInbound(AsyncIO *IO) @@ -198,7 +198,7 @@ eReadState HandleInbound(AsyncIO *IO) } if (Finished != eMustReadMore) { - event_del(&IO->recv_event); + ev_io_stop(event_base, &IO->recv_event); IO->active_event = 0; IO->NextState = IO->ReadDone(IO->Data); Finished = StrBufCheckBuffer(&IO->RecvBuf); @@ -210,7 +210,8 @@ eReadState HandleInbound(AsyncIO *IO) (IO->NextState == eSendMore)) { IO->NextState = IO->SendDone(IO->Data); - event_add(&IO->send_event, NULL); + ev_io_start(event_base, &IO->send_event); + IO->active_event = SEND_EVENT; } @@ -222,23 +223,21 @@ eReadState HandleInbound(AsyncIO *IO) static void -IO_send_callback(int fd, short event, void *ctx) +IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; - AsyncIO *IO = ctx; - - (void)fd; - (void)event; + AsyncIO *IO = watcher->data; +/* CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d : [%s%s%s%s]\n", - (int) fd, + watcher->fd, (event&EV_TIMEOUT) ? " timeout" : "", (event&EV_READ) ? " read" : "", (event&EV_WRITE) ? " write" : "", (event&EV_SIGNAL) ? " signal" : ""); - +*/ /// assert(fd == IO->sock); - rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf); + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); if (rc == 0) { @@ -268,7 +267,7 @@ IO_send_callback(int fd, short event, void *ctx) fclose(fd); } #endif - event_del(&IO->send_event); + ev_io_stop(event_base, &IO->send_event); IO->active_event = 0; switch (IO->NextState) { case eSendReply: @@ -280,7 +279,8 @@ IO_send_callback(int fd, short event, void *ctx) (IO->NextState == eAbort) ) ShutDownCLient(IO); else { - event_add(&IO->send_event, NULL); + ev_io_start(event_base, &IO->send_event); + IO->active_event = SEND_EVENT; } break; @@ -289,11 +289,13 @@ IO_send_callback(int fd, short event, void *ctx) HandleInbound(IO); } else { - event_add(&IO->recv_event, NULL); + ev_io_start(event_base, &IO->recv_event); + IO->active_event = RECV_EVENT; } break; + case eTerminateConnection: case eAbort: break; } @@ -306,22 +308,23 @@ IO_send_callback(int fd, short event, void *ctx) static void -IO_recv_callback(int fd, short event, void *ctx) +IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { ssize_t nbytes; - AsyncIO *IO = ctx; + AsyncIO *IO = watcher->data; // assert(fd == IO->sock); // assert(fd == sb->fd); - +/* CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d : [%s%s%s%s]\n", (int) fd, (event&EV_TIMEOUT) ? " timeout" : "", (event&EV_READ) ? " read" : "", (event&EV_WRITE) ? " write" : "", (event&EV_SIGNAL) ? " signal" : ""); - nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf); +*/ + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { @@ -334,11 +337,125 @@ IO_recv_callback(int fd, short event, void *ctx) } } + +static void +set_start_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + switch(IO->NextState) { + case eReadMessage: + ev_io_start(event_base, &IO->recv_event); + IO->active_event = RECV_EVENT; + break; + case eSendReply: + case eSendMore: + IO_send_callback(loop, watcher, revents); + break; + case eTerminateConnection: + case eAbort: + /// TODO: WHUT? + break; + } +} + +static void +IO_connect_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + ev_io_stop(event_base, &IO->conn_event); + IO->active_event = 0; + set_start_callback(loop, watcher, revents); +} + +int event_connect_socket(AsyncIO *IO) +{ + int fdflags; + int rc = -1; + + IO->SendBuf.fd = + IO->RecvBuf.fd = + IO->sock = socket(IO->curr_ai->ai_family, + IO->curr_ai->ai_socktype, + IO->curr_ai->ai_protocol); + + if (IO->sock < 0) { + CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); +// freeaddrinfo(res); + return -1; + } + fdflags = fcntl(IO->sock, F_GETFL); + if (fdflags < 0) { + CtdlLogPrintf(CTDL_DEBUG, + "EVENT: unable to get socket flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); + return -1; + } + fdflags = fdflags | O_NONBLOCK; + if (fcntl(IO->sock, F_SETFL, fdflags) < 0) { + CtdlLogPrintf(CTDL_DEBUG, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); + close(IO->sock); + return -1; + } +/* TODO: maye we could use offsetof() to calc the position of data... + * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher + */ + ev_io_init(&IO->recv_event, IO_recv_callback, IO->sock, EV_READ); + IO->recv_event.data = IO; + ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE); + IO->send_event.data = IO; + + rc = connect(IO->sock, IO->curr_ai->ai_addr, IO->curr_ai->ai_addrlen); + if (rc >= 0) { +//// freeaddrinfo(res); + ev_io_init(&IO->conn_event, IO_send_callback, IO->sock, EV_WRITE); + IO->conn_event.data = IO; + set_start_callback(event_base, &IO->conn_event, 0); + + return 0; + } + else if (errno == EINPROGRESS) { + ev_io_init(&IO->conn_event, IO_connect_callback, IO->sock, EV_READ|EV_WRITE); + IO->conn_event.data = IO; +/* TODO + + event_set(&IO->conn_event, + IO->sock, + EV_READ|EV_WRITE|EV_PERSIST, + IO_connect_callback, + IO); +*/ + ev_io_start(event_base, &IO->conn_event); + + IO->active_event = CONN_EVENT; + return 1; + } + else { + CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); + close(IO->sock); + IO->curr_ai = IO->curr_ai->ai_next; + if (IO->curr_ai != NULL) + return event_connect_socket(IO); + else + return -1; + } +} + void InitEventIO(AsyncIO *IO, void *pData, IO_CallBack ReadDone, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack Timeout, + IO_CallBack ConnFail, + IO_CallBack CustomDNS, IO_LineReaderCallback LineReader, int ReadFirst) { @@ -348,26 +465,14 @@ void InitEventIO(AsyncIO *IO, IO->Terminate = Terminate; IO->LineReader = LineReader; - event_set(&IO->recv_event, - IO->sock, - EV_READ|EV_PERSIST, - IO_recv_callback, - IO); - - event_set(&IO->send_event, - IO->sock, - EV_WRITE|EV_PERSIST, - IO_send_callback, - IO); - if (ReadFirst) { IO->NextState = eReadMessage; - event_add(&IO->recv_event, NULL); - IO->active_event = RECV_EVENT; } else { IO->NextState = eSendReply; } + + event_connect_socket(IO); } diff --git a/citadel/event_client.h b/citadel/event_client.h index 34023a3c0..0e6cae953 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -1,4 +1,7 @@ -#include +#include +#include +#include +#include typedef struct AsyncIO AsyncIO; @@ -15,16 +18,40 @@ typedef eNextState (*IO_CallBack)(void *Data); typedef eReadState (*IO_LineReaderCallback)(AsyncIO *IO); struct AsyncIO { + StrBuf *Host; + char service[32]; + + /* To cycle through several possible services... */ + struct addrinfo *res; + struct addrinfo *curr_ai; + + /* connection related */ int sock; int active_event; - struct event recv_event, send_event; - IOBuffer SendBuf, RecvBuf; - IO_LineReaderCallback LineReader; - IO_CallBack ReadDone, SendDone, Terminate; - StrBuf *IOBuf; - void *Data; - DeleteHashDataFunc DeleteData; /* data is expected to contain AsyncIO... */ eNextState NextState; + ev_io recv_event, + send_event, + conn_event; + StrBuf *ErrMsg; /* if we fail to connect, or lookup, error goes here. */ + + /* read/send related... */ + StrBuf *IOBuf; + IOBuffer SendBuf, + RecvBuf; + + /* Citadel application callbacks... */ + IO_CallBack ReadDone, /* Theres new data to read... */ + SendDone, /* we may send more data */ + Terminate, /* shutting down... */ + Timeout, /* Timeout handler; may also be connection timeout */ + ConnFail, /* What to do when one connection failed? */ + CustomDNS; /* If the application wants to do custom dns functionality like cycle through different MX-Records */ + + IO_LineReaderCallback LineReader; /* if we have linereaders, maybe we want to read more lines before the real application logic is called? */ + + /* Custom data; its expected to contain AsyncIO so we can save malloc()s... */ + DeleteHashDataFunc DeleteData; /* so if we have to destroy you, what to do... */ + void *Data; /* application specific data */ }; typedef struct _IOAddHandler { @@ -42,5 +69,8 @@ void InitEventIO(AsyncIO *IO, IO_CallBack ReadDone, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack Timeout, + IO_CallBack ConnFail, + IO_CallBack CustomDNS, IO_LineReaderCallback LineReader, int ReadFirst); diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 68a45c725..29fb5bb51 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -65,10 +65,10 @@ HashList *QueueEvents = NULL; HashList *InboundEventQueue = NULL; HashList *InboundEventQueues[2] = { NULL, NULL }; -static struct event_base *event_base; -struct event queue_add_event; +struct ev_loop *event_base; +struct ev_io queue_add_event; -static void QueueEventAddCallback(int fd, short event, void *ctx) +static void QueueEventAddCallback(struct ev_loop *loop, ev_io *watcher, int revents) { char buf[10]; HashList *q; @@ -78,7 +78,7 @@ static void QueueEventAddCallback(int fd, short event, void *ctx) const char *Key; /* get the control command... */ - read(fd, buf, 1); + read(watcher->fd, buf, 1); switch (buf[0]) { case '+': citthread_mutex_lock(&EventQueueMutex); @@ -104,10 +104,10 @@ static void QueueEventAddCallback(int fd, short event, void *ctx) /// TODO: add it to QueueEvents break; case 'x': - event_del(&queue_add_event); + /////event_del(&queue_add_event); close(event_add_pipe[0]); /// TODO; flush QueueEvents fd's and delete it. - event_base_loopexit(event_base, NULL); + ev_io_stop(event_base, NULL); } /* Unblock the other side */ // read(fd, buf, 1); @@ -119,7 +119,7 @@ void InitEventQueue(void) { struct rlimit LimitSet; - event_base = event_init(); + event_base = ev_default_loop(0); /* base = event_base_new(); if (!base) @@ -128,7 +128,7 @@ void InitEventQueue(void) citthread_mutex_init(&EventQueueMutex, NULL); if (pipe(event_add_pipe) != 0) { - CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libevent queueing: %s\n", strerror(errno)); + CtdlLogPrintf(CTDL_EMERG, "Unable to create pipe for libev queueing: %s\n", strerror(errno)); abort(); } LimitSet.rlim_cur = 1; @@ -141,17 +141,18 @@ void InitEventQueue(void) InboundEventQueue = InboundEventQueues[0]; } /* - * this thread operates the select() etc. via libevent. + * this thread operates the select() etc. via libev. * * */ void *client_event_thread(void *arg) { struct CitContext libevent_client_CC; + CtdlFillSystemContext(&libevent_client_CC, "LibEvent Thread"); // citthread_setspecific(MyConKey, (void *)&smtp_queue_CC); CtdlLogPrintf(CTDL_DEBUG, "client_event_thread() initializing\n"); - +/* event_set(&queue_add_event, event_add_pipe[0], EV_READ|EV_PERSIST, @@ -159,11 +160,16 @@ void *client_event_thread(void *arg) NULL); event_add(&queue_add_event, NULL); +*/ + ev_io_init(&queue_add_event, QueueEventAddCallback, event_add_pipe[0], EV_READ); + ev_io_start(event_base, &queue_add_event); + + event_base = ev_default_loop (EVFLAG_AUTO); +/// ev_loop(event_base, 0); - event_dispatch(); CtdlClearSystemContext(); - event_base_free(event_base); + ev_default_destroy (); citthread_mutex_destroy(&EventQueueMutex); return(NULL); } diff --git a/citadel/modules/smtp/serv_smtpeventclient.c b/citadel/modules/smtp/serv_smtpeventclient.c index 2904e297f..5a4a51439 100644 --- a/citadel/modules/smtp/serv_smtpeventclient.c +++ b/citadel/modules/smtp/serv_smtpeventclient.c @@ -220,9 +220,12 @@ void DeleteSmtpOutMsg(void *v) free(Msg); } +eNextState SMTP_C_Timeout(void *Data); +eNextState SMTP_C_ConnFail(void *Data); eNextState SMTP_C_DispatchReadDone(void *Data); eNextState SMTP_C_DispatchWriteDone(void *Data); eNextState SMTP_C_Terminate(void *Data); +eNextState SMTP_C_MXLookup(void *Data); typedef eNextState (*SMTPReadHandler)(SmtpOutMsg *Msg); typedef eNextState (*SMTPSendHandler)(SmtpOutMsg *Msg); @@ -627,27 +630,20 @@ int connect_one_smtpsrv_xamine_result(void *Ctx) CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld]: connecting [%s:%s]!\n", SendMsg->n, SendMsg->mx_host, SendMsg->mx_port); + SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); + SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); + SendMsg->IO.IOBuf = NewStrBuf(); + SendMsg->IO.ErrMsg = SendMsg->MyQEntry->StatusMessage; + + SendMsg->IO.SendBuf.fd = SendMsg->IO.RecvBuf.fd = SendMsg->IO.sock = sock_connect(SendMsg->mx_host, SendMsg->mx_port); StrBufPrintf(SendMsg->MyQEntry->StatusMessage, "Could not connect: %s", strerror(errno)); - if (SendMsg->IO.sock >= 0) - { - CtdlLogPrintf(CTDL_DEBUG, "SMTP client[%ld:%ld]: connected!\n", SendMsg->n, SendMsg->IO.sock); - int fdflags; - fdflags = fcntl(SendMsg->IO.sock, F_GETFL); - if (fdflags < 0) - CtdlLogPrintf(CTDL_DEBUG, - "SMTP client[%ld]: unable to get socket flags! %s \n", - SendMsg->n, strerror(errno)); - fdflags = fdflags | O_NONBLOCK; - if (fcntl(SendMsg->IO.sock, F_SETFL, fdflags) < 0) - CtdlLogPrintf(CTDL_DEBUG, - "SMTP client[%ld]: unable to set socket nonblocking flags! %s \n", - SendMsg->n, strerror(errno)); - } + + if (SendMsg->IO.sock < 0) { if (errno > 0) { StrBufPlain(SendMsg->MyQEntry->StatusMessage, @@ -667,18 +663,19 @@ int connect_one_smtpsrv_xamine_result(void *Ctx) } - SendMsg->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); - SendMsg->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); - SendMsg->IO.IOBuf = NewStrBuf(); InitEventIO(&SendMsg->IO, SendMsg, SMTP_C_DispatchReadDone, SMTP_C_DispatchWriteDone, SMTP_C_Terminate, + SMTP_C_Timeout, + SMTP_C_ConnFail, + SMTP_C_MXLookup, SMTP_C_ReadServerStatus, 1); return 0; } + eNextState SMTPC_read_greeting(SmtpOutMsg *SendMsg) { /* Process the SMTP greeting from the server */ @@ -1289,6 +1286,21 @@ eNextState SMTP_C_Terminate(void *Data) FinalizeMessageSend(pMsg); } + +eNextState SMTP_C_Timeout(void *Data) +{ + SmtpOutMsg *pMsg = Data; + FinalizeMessageSend(pMsg); + +} + +eNextState SMTP_C_ConnFail(void *Data) +{ + SmtpOutMsg *pMsg = Data; + FinalizeMessageSend(pMsg); + +} + eNextState SMTP_C_DispatchReadDone(void *Data) { SmtpOutMsg *pMsg = Data; @@ -1304,6 +1316,11 @@ eNextState SMTP_C_DispatchWriteDone(void *Data) } +eNextState SMTP_C_MXLookup(void *Data) +{ + +} + #endif CTDL_MODULE_INIT(smtp_eventclient) -- 2.30.2