X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=4dca578f7232122e63c0ab301d7cd5b6e1cfc5c8;hb=2b1af802b6361c71b7d2376cc8b5812918beb47f;hp=6316a1c4ce809d9447116e25526a9dd90413996a;hpb=c12b418a64b44be9d08cae0e5dd25c988a522b90;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 6316a1c4c..4dca578f7 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -46,6 +46,8 @@ #include #include #include +#include + #include #include "citadel.h" #include "server.h" @@ -135,7 +137,7 @@ void ShutDownCLient(AsyncIO *IO) IO->SendBuf.fd = 0; IO->RecvBuf.fd = 0; } - + assert(IO->Terminate); IO->Terminate(IO); } @@ -144,7 +146,7 @@ void ShutDownCLient(AsyncIO *IO) eReadState HandleInbound(AsyncIO *IO) { eReadState Finished = eBufferNotEmpty; - + while ((Finished == eBufferNotEmpty) && (IO->NextState == eReadMessage)){ if (IO->RecvBuf.nBlobBytesWanted != 0) { @@ -171,6 +173,7 @@ eReadState HandleInbound(AsyncIO *IO) } if (Finished != eMustReadMore) { + assert(IO->ReadDone); ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); @@ -181,6 +184,7 @@ eReadState HandleInbound(AsyncIO *IO) if ((IO->NextState == eSendReply) || (IO->NextState == eSendMore)) { + assert(IO->SendDone); IO->NextState = IO->SendDone(IO); ev_io_start(event_base, &IO->send_event); } @@ -231,6 +235,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eSendReply: break; case eSendMore: + assert(IO->SendDone); IO->NextState = IO->SendDone(IO); if ((IO->NextState == eTerminateConnection) || @@ -254,8 +259,10 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) break; } } - else if (rc < 0) + else if (rc < 0) { + assert(IO->Timeout); IO->Timeout(IO); + } /* else : must write more. */ } static void @@ -283,6 +290,7 @@ IO_Timout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) AsyncIO *IO = watcher->data; ev_timer_stop (event_base, &IO->rw_timeout); + assert(IO->Timeout); IO->Timeout(IO); } static void @@ -292,6 +300,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) ev_timer_stop (event_base, &IO->conn_fail); ev_io_stop(loop, &IO->conn_event); + assert(IO->ConnFail); IO->ConnFail(IO); } static void @@ -313,6 +322,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { + assert(IO->Timeout); IO->Timeout(IO); /* this is a timeout... */ return; } else if (nbytes == -1) { @@ -323,7 +333,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) -int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout) +eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout) { int fdflags; int rc = -1; @@ -338,7 +348,7 @@ int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeo CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); // freeaddrinfo(res); - return -1; + return eAbort; } fdflags = fcntl(IO->sock, F_GETFL); if (fdflags < 0) { @@ -346,7 +356,7 @@ int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeo "EVENT: unable to get socket flags! %s \n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); - return -1; + return eAbort; } fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->sock, F_SETFL, fdflags) < 0) { @@ -355,7 +365,7 @@ int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeo strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); close(IO->sock); - return -1; + return eAbort; } /* TODO: maye we could use offsetof() to calc the position of data... * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher @@ -369,17 +379,17 @@ int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeo IO->conn_fail.data = IO; ev_timer_init(&IO->rw_timeout, IO_Timout_callback, first_rw_timeout, 0); IO->rw_timeout.data = IO; - - rc = connect(IO->sock, - (struct sockaddr *) &IO->Addr, - (IO->IP6)? ///HEnt->h_addrtype == AF_INET6)? - sizeof(struct in6_addr): - sizeof(struct sockaddr_in)); + ///struct sockaddr_in *addr = &IO->Addr; + if (IO->IP6) + rc = connect(IO->sock, &IO->Addr, sizeof(struct in6_addr)); + else + rc = connect(IO->sock, (struct sockaddr_in *)&IO->Addr, sizeof(struct sockaddr_in)); + if (rc >= 0){ //// freeaddrinfo(res); set_start_callback(event_base, IO, 0); ev_timer_start(event_base, &IO->rw_timeout); - return 0; + return IO->NextState; } else if (errno == EINPROGRESS) { @@ -388,14 +398,16 @@ int event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeo ev_io_start(event_base, &IO->conn_event); ev_timer_start(event_base, &IO->conn_fail); - return 0; + return IO->NextState; } else { CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); + assert(IO->ConnFail); IO->ConnFail(IO); - return -1; + return eAbort; } + return IO->NextState; } void SetNextTimeout(AsyncIO *IO, double timeout) @@ -404,11 +416,11 @@ void SetNextTimeout(AsyncIO *IO, double timeout) ev_timer_again (event_base, &IO->rw_timeout); } -void InitEventIO(AsyncIO *IO, - void *pData, - double conn_timeout, - double first_rw_timeout, - int ReadFirst) +eNextState InitEventIO(AsyncIO *IO, + void *pData, + double conn_timeout, + double first_rw_timeout, + int ReadFirst) { IO->Data = pData; @@ -420,5 +432,5 @@ void InitEventIO(AsyncIO *IO, } IO->IP6 = IO->HEnt->h_addrtype == AF_INET6; // IO->res = HEnt->h_addr_list[0]; - event_connect_socket(IO, conn_timeout, first_rw_timeout); + return event_connect_socket(IO, conn_timeout, first_rw_timeout); }