X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=0b40edcf4c9ea29ac619c0e4741792fab01f8a8b;hb=57b110370156d70f589924e130fd9c5a56296000;hp=0b6ffabb315f131cb133f7ee0ce2de936d845c58;hpb=35dd960d37f8d5dfcc9572a5606fa281d0036e2d;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 0b6ffabb3..0b40edcf4 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -69,17 +69,12 @@ #include "citadel_dirs.h" #include "event_client.h" +#include "ctdl_module.h" +static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, - int revents) -{ - AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - - assert(IO->ShutdownAbort); - IO->ShutdownAbort(IO); -} + int revents); /*------------------------------------------------------------------------------ @@ -124,15 +119,16 @@ void ShutDownDBCLient(AsyncIO *IO) EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); - assert(IO->Terminate); - IO->Terminate(IO); + assert(IO->DBTerminate); + IO->DBTerminate(IO); } void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + IO->Now = ev_now(event_db); + EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); ev_idle_stop(event_db, &IO->db_unwind_stack); @@ -182,6 +178,17 @@ extern struct ev_loop *event_base; extern ev_async AddJob; extern ev_async ExitEventLoop; +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} + eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) { @@ -289,6 +296,37 @@ void ShutDownCLient(AsyncIO *IO) IO->Terminate(IO); } +void PostInbound(AsyncIO *IO) +{ + switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eSendReply: + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + ev_io_start(event_base, &IO->send_event); + break; + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eTerminateConnection: + ShutDownCLient(IO); + break; + case eAbort: + ShutDownCLient(IO); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + case eReadMessage: + break; + } +} eReadState HandleInbound(AsyncIO *IO) { const char *Err = NULL; @@ -341,34 +379,8 @@ eReadState HandleInbound(AsyncIO *IO) } } - switch (IO->NextState) { - case eSendFile: - ev_io_start(event_base, &IO->send_event); - break; - case eSendReply: - case eSendMore: - assert(IO->SendDone); - IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); - break; - case eReadPayload: - case eReadMore: - case eReadFile: - ev_io_start(event_base, &IO->recv_event); - break; - case eTerminateConnection: - ShutDownCLient(IO); - break; - case eAbort: - ShutDownCLient(IO); - break; - case eSendDNSQuery: - case eReadDNSReply: - case eDBQuery: - case eConnect: - case eReadMessage: - break; - } + PostInbound(IO); + return Finished; } @@ -380,6 +392,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; + IO->Now = ev_now(event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -440,6 +453,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eSendFile: if (IO->IOB.ChunkSendRemain > 0) { ev_io_start(event_base, &IO->recv_event); + SetNextTimeout(IO, 100.0); + } else { assert(IO->ReadDone); IO->NextState = IO->ReadDone(IO); @@ -499,8 +514,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } } else if (rc < 0) { - assert(IO->Timeout); - IO->Timeout(IO); + IO_Timeout_callback(loop, &IO->rw_timeout, revents); } /* else : must write more. */ } @@ -536,6 +550,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -563,6 +578,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -594,6 +610,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -619,6 +636,9 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); + EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); + ev_io_stop(loop, &IO->conn_event); ev_timer_stop (event_base, &IO->conn_fail); set_start_callback(loop, IO, revents); @@ -630,6 +650,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); @@ -640,6 +661,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (IO->IOB.ChunkSendRemain == 0) { IO->NextState = eSendReply; + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + PostInbound(IO); + return; } else return; @@ -681,15 +706,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - assert(IO->Timeout); - - switch (IO->Timeout(IO)) - { - case eAbort: - ShutDownCLient(IO); - default: - break; - } + IO_Timeout_callback(loop, &IO->rw_timeout, revents); return; } else if (nbytes == -1) { // FD is gone. kick it. @@ -706,13 +723,14 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); - assert(IO->DNS.Fail); assert(IO->DNS.Query->PostDNS); switch (IO->DNS.Query->PostDNS(IO)) { case eAbort: + assert(IO->DNS.Fail); switch (IO->DNS.Fail(IO)) { case eAbort: //// StopClientWatchers(IO); @@ -731,6 +749,7 @@ eNextState EvConnectSock(AsyncIO *IO, double first_rw_timeout, int ReadFirst) { + struct sockaddr_in egress_sin; int fdflags; int rc = -1; @@ -762,7 +781,7 @@ eNextState EvConnectSock(AsyncIO *IO, } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - EV_syslog(LOG_DEBUG, + EV_syslog(LOG_ERR, "EVENT: unable to get socket flags! %s \n", strerror(errno)); StrBufPrintf(IO->ErrMsg, @@ -775,7 +794,7 @@ eNextState EvConnectSock(AsyncIO *IO, fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { EV_syslog( - LOG_DEBUG, + LOG_ERR, "EVENT: unable to set socket nonblocking flags! %s \n", strerror(errno)); StrBufPrintf(IO->ErrMsg, @@ -799,19 +818,38 @@ eNextState EvConnectSock(AsyncIO *IO, IO->rw_timeout.data = IO; + + /* for debugging you may bypass it like this: * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = * inet_addr("127.0.0.1"); */ - if (IO->ConnectMe->IPv6) + if (IO->ConnectMe->IPv6) { rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6)); - else + } + else { + /* If citserver is bound to a specific IP address on the host, make + * sure we use that address for outbound connections. + */ + + memset(&egress_sin, 0, sizeof(egress_sin)); + egress_sin.sin_family = AF_INET; + if (!IsEmptyStr(config.c_ip_addr)) { + egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr); + if (egress_sin.sin_addr.s_addr == !INADDR_ANY) { + egress_sin.sin_addr.s_addr = INADDR_ANY; + } + + /* If this bind fails, no problem; we can still use INADDR_ANY */ + bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin)); + } rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); + } if (rc >= 0){ EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); @@ -881,6 +919,7 @@ void InitIOStruct(AsyncIO *IO, IO_CallBack SendDone, IO_CallBack ReadDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ConnFail, IO_CallBack Timeout, IO_CallBack ShutdownAbort) @@ -895,6 +934,7 @@ void InitIOStruct(AsyncIO *IO, IO->SendDone = SendDone; IO->ReadDone = ReadDone; IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->LineReader = LineReader; IO->ConnFail = ConnFail; IO->Timeout = Timeout; @@ -918,6 +958,7 @@ int InitcURLIOStruct(AsyncIO *IO, const char* Desc, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ShutdownAbort) { IO->Data = Data; @@ -925,8 +966,9 @@ int InitcURLIOStruct(AsyncIO *IO, IO->CitContext = CloneContext(CC); ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; - IO->SendDone = SendDone; - IO->Terminate = Terminate; + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->ShutdownAbort = ShutdownAbort; strcpy(IO->HttpReq.errdesc, Desc); @@ -936,6 +978,7 @@ int InitcURLIOStruct(AsyncIO *IO, } +extern int DebugEventLoopBacktrace; void EV_backtrace(AsyncIO *IO) { #ifdef HAVE_BACKTRACE @@ -943,15 +986,24 @@ void EV_backtrace(AsyncIO *IO) size_t size, i; char **strings; - + if ((IO == NULL) || (DebugEventLoopBacktrace == 0)) + return; size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*)); strings = backtrace_symbols(stack_frames, size); for (i = 0; i < size; i++) { - if (strings != NULL) + if (strings != NULL) { EV_syslog(LOG_ALERT, " BT %s\n", strings[i]); - else + } + else { EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]); + } } free(strings); #endif } + + +ev_tstamp ctdl_ev_now (void) +{ + return ev_now(event_base); +}