X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=59be13321d5baabc1badaef54f5a2d88285e3bf0;hb=a706083cbf719d326b15edd89ca01fc134492169;hp=8484cad4cfb9437e52682fcebce43655755e4bfe;hpb=03ca46c6f61467f62cb600d345383d70ca4fc953;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 8484cad4c..59be13321 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -65,15 +65,16 @@ #ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT -#include #include "event_client.h" extern int event_add_pipe[2]; extern citthread_mutex_t EventQueueMutex; extern HashList *InboundEventQueue; +extern struct ev_loop *event_base; #define SEND_EVENT 1 #define RECV_EVENT 2 +#define CONN_EVENT 3 int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB) { @@ -149,23 +150,22 @@ void ShutDownCLient(AsyncIO *IO) CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock); switch (IO->active_event) { case SEND_EVENT: - event_del(&IO->send_event); + ev_io_stop(event_base, &IO->send_event); break; case RECV_EVENT: - event_del(&IO->recv_event); + ev_io_stop(event_base, &IO->recv_event); + break; + case CONN_EVENT: + ev_io_stop(event_base, &IO->conn_event); break; case 0: // no event active here; just bail out. break; } + IO->active_event = 0; IO->Terminate(IO->Data); - -// citthread_mutex_lock(&EventQueueMutex); - -///QueueEvents /// todo remove from hash. - -// citthread_mutex_unlock(&EventQueueMutex); + } eReadState HandleInbound(AsyncIO *IO) @@ -198,7 +198,7 @@ eReadState HandleInbound(AsyncIO *IO) } if (Finished != eMustReadMore) { - event_del(&IO->recv_event); + ev_io_stop(event_base, &IO->recv_event); IO->active_event = 0; IO->NextState = IO->ReadDone(IO->Data); Finished = StrBufCheckBuffer(&IO->RecvBuf); @@ -210,7 +210,8 @@ eReadState HandleInbound(AsyncIO *IO) (IO->NextState == eSendMore)) { IO->NextState = IO->SendDone(IO->Data); - event_add(&IO->send_event, NULL); + ev_io_start(event_base, &IO->send_event); + IO->active_event = SEND_EVENT; } @@ -222,23 +223,21 @@ eReadState HandleInbound(AsyncIO *IO) static void -IO_send_callback(int fd, short event, void *ctx) +IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; - AsyncIO *IO = ctx; - - (void)fd; - (void)event; + AsyncIO *IO = watcher->data; +/* CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d : [%s%s%s%s]\n", - (int) fd, + watcher->fd, (event&EV_TIMEOUT) ? " timeout" : "", (event&EV_READ) ? " read" : "", (event&EV_WRITE) ? " write" : "", (event&EV_SIGNAL) ? " signal" : ""); - +*/ /// assert(fd == IO->sock); - rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf); + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); if (rc == 0) { @@ -268,7 +267,7 @@ IO_send_callback(int fd, short event, void *ctx) fclose(fd); } #endif - event_del(&IO->send_event); + ev_io_stop(event_base, &IO->send_event); IO->active_event = 0; switch (IO->NextState) { case eSendReply: @@ -280,7 +279,8 @@ IO_send_callback(int fd, short event, void *ctx) (IO->NextState == eAbort) ) ShutDownCLient(IO); else { - event_add(&IO->send_event, NULL); + ev_io_start(event_base, &IO->send_event); + IO->active_event = SEND_EVENT; } break; @@ -289,11 +289,13 @@ IO_send_callback(int fd, short event, void *ctx) HandleInbound(IO); } else { - event_add(&IO->recv_event, NULL); + ev_io_start(event_base, &IO->recv_event); + IO->active_event = RECV_EVENT; } break; + case eTerminateConnection: case eAbort: break; } @@ -306,22 +308,23 @@ IO_send_callback(int fd, short event, void *ctx) static void -IO_recv_callback(int fd, short event, void *ctx) +IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { ssize_t nbytes; - AsyncIO *IO = ctx; + AsyncIO *IO = watcher->data; // assert(fd == IO->sock); // assert(fd == sb->fd); - +/* CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d : [%s%s%s%s]\n", (int) fd, (event&EV_TIMEOUT) ? " timeout" : "", (event&EV_READ) ? " read" : "", (event&EV_WRITE) ? " write" : "", (event&EV_SIGNAL) ? " signal" : ""); - nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf); +*/ + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { @@ -334,11 +337,125 @@ IO_recv_callback(int fd, short event, void *ctx) } } + +static void +set_start_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + switch(IO->NextState) { + case eReadMessage: + ev_io_start(event_base, &IO->recv_event); + IO->active_event = RECV_EVENT; + break; + case eSendReply: + case eSendMore: + IO_send_callback(loop, watcher, revents); + break; + case eTerminateConnection: + case eAbort: + /// TODO: WHUT? + break; + } +} + +static void +IO_connect_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + ev_io_stop(event_base, &IO->conn_event); + IO->active_event = 0; + set_start_callback(loop, watcher, revents); +} + +int event_connect_socket(AsyncIO *IO) +{ + int fdflags; + int rc = -1; + + IO->SendBuf.fd = + IO->RecvBuf.fd = + IO->sock = socket(IO->curr_ai->ai_family, + IO->curr_ai->ai_socktype, + IO->curr_ai->ai_protocol); + + if (IO->sock < 0) { + CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); +// freeaddrinfo(res); + return -1; + } + fdflags = fcntl(IO->sock, F_GETFL); + if (fdflags < 0) { + CtdlLogPrintf(CTDL_DEBUG, + "EVENT: unable to get socket flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); + return -1; + } + fdflags = fdflags | O_NONBLOCK; + if (fcntl(IO->sock, F_SETFL, fdflags) < 0) { + CtdlLogPrintf(CTDL_DEBUG, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); + close(IO->sock); + return -1; + } +/* TODO: maye we could use offsetof() to calc the position of data... + * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher + */ + ev_io_init(&IO->recv_event, IO_recv_callback, IO->sock, EV_READ); + IO->recv_event.data = IO; + ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE); + IO->send_event.data = IO; + + rc = connect(IO->sock, IO->curr_ai->ai_addr, IO->curr_ai->ai_addrlen); + if (rc >= 0) { +//// freeaddrinfo(res); + ev_io_init(&IO->conn_event, IO_send_callback, IO->sock, EV_WRITE); + IO->conn_event.data = IO; + set_start_callback(event_base, &IO->conn_event, 0); + + return 0; + } + else if (errno == EINPROGRESS) { + ev_io_init(&IO->conn_event, IO_connect_callback, IO->sock, EV_READ|EV_WRITE); + IO->conn_event.data = IO; +/* TODO + + event_set(&IO->conn_event, + IO->sock, + EV_READ|EV_WRITE|EV_PERSIST, + IO_connect_callback, + IO); +*/ + ev_io_start(event_base, &IO->conn_event); + + IO->active_event = CONN_EVENT; + return 1; + } + else { + CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); + close(IO->sock); + IO->curr_ai = IO->curr_ai->ai_next; + if (IO->curr_ai != NULL) + return event_connect_socket(IO); + else + return -1; + } +} + void InitEventIO(AsyncIO *IO, void *pData, IO_CallBack ReadDone, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack Timeout, + IO_CallBack ConnFail, + IO_CallBack CustomDNS, IO_LineReaderCallback LineReader, int ReadFirst) { @@ -348,26 +465,14 @@ void InitEventIO(AsyncIO *IO, IO->Terminate = Terminate; IO->LineReader = LineReader; - event_set(&IO->recv_event, - IO->sock, - EV_READ|EV_PERSIST, - IO_recv_callback, - IO); - - event_set(&IO->send_event, - IO->sock, - EV_WRITE|EV_PERSIST, - IO_send_callback, - IO); - if (ReadFirst) { IO->NextState = eReadMessage; - event_add(&IO->recv_event, NULL); - IO->active_event = RECV_EVENT; } else { IO->NextState = eSendReply; } + + event_connect_socket(IO); }