X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=d0e0fd9aefe9f20295a8827fda8fb5dabcf3f26b;hb=b1493ab199a2cf39b919cb8d0536263ae1d280aa;hp=dbaafd4e89a3dacc7f7b8370d10aa7ad6651ed31;hpb=8990e921a64ea9d19ce355aad74d820f95c7ee9b;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index dbaafd4e8..d0e0fd9ae 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -69,17 +69,12 @@ #include "citadel_dirs.h" #include "event_client.h" +#include "ctdl_module.h" +static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, - int revents) -{ - AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - - assert(IO->ShutdownAbort); - IO->ShutdownAbort(IO); -} + int revents); /*------------------------------------------------------------------------------ @@ -124,15 +119,16 @@ void ShutDownDBCLient(AsyncIO *IO) EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); - assert(IO->Terminate); - IO->Terminate(IO); + assert(IO->DBTerminate); + IO->DBTerminate(IO); } void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + IO->Now = ev_now(event_db); + EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); ev_idle_stop(event_db, &IO->db_unwind_stack); @@ -182,6 +178,17 @@ extern struct ev_loop *event_base; extern ev_async AddJob; extern ev_async ExitEventLoop; +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} + eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) { @@ -289,6 +296,37 @@ void ShutDownCLient(AsyncIO *IO) IO->Terminate(IO); } +void PostInbound(AsyncIO *IO) +{ + switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eSendReply: + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + ev_io_start(event_base, &IO->send_event); + break; + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eTerminateConnection: + ShutDownCLient(IO); + break; + case eAbort: + ShutDownCLient(IO); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + case eReadMessage: + break; + } +} eReadState HandleInbound(AsyncIO *IO) { const char *Err = NULL; @@ -341,34 +379,8 @@ eReadState HandleInbound(AsyncIO *IO) } } - switch (IO->NextState) { - case eSendFile: - ev_io_start(event_base, &IO->send_event); - break; - case eSendReply: - case eSendMore: - assert(IO->SendDone); - IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); - break; - case eReadPayload: - case eReadMore: - case eReadFile: - ev_io_start(event_base, &IO->recv_event); - break; - case eTerminateConnection: - ShutDownCLient(IO); - break; - case eAbort: - ShutDownCLient(IO); - break; - case eSendDNSQuery: - case eReadDNSReply: - case eDBQuery: - case eConnect: - case eReadMessage: - break; - } + PostInbound(IO); + return Finished; } @@ -380,6 +392,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; + IO->Now = ev_now(event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -440,6 +453,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eSendFile: if (IO->IOB.ChunkSendRemain > 0) { ev_io_start(event_base, &IO->recv_event); + SetNextTimeout(IO, 100.0); + } else { assert(IO->ReadDone); IO->NextState = IO->ReadDone(IO); @@ -499,8 +514,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } } else if (rc < 0) { - assert(IO->Timeout); - IO->Timeout(IO); + IO_Timeout_callback(loop, &IO->rw_timeout, revents); } /* else : must write more. */ } @@ -536,6 +550,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -563,6 +578,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -594,6 +610,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -619,6 +636,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_io_stop(loop, &IO->conn_event); ev_timer_stop (event_base, &IO->conn_fail); set_start_callback(loop, IO, revents); @@ -630,6 +648,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); @@ -640,6 +659,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (IO->IOB.ChunkSendRemain == 0) { IO->NextState = eSendReply; + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + PostInbound(IO); + return; } else return; @@ -681,15 +704,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - assert(IO->Timeout); - - switch (IO->Timeout(IO)) - { - case eAbort: - ShutDownCLient(IO); - default: - break; - } + IO_Timeout_callback(loop, &IO->rw_timeout, revents); return; } else if (nbytes == -1) { // FD is gone. kick it. @@ -706,6 +721,7 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNS.Query->PostDNS); @@ -731,6 +747,7 @@ eNextState EvConnectSock(AsyncIO *IO, double first_rw_timeout, int ReadFirst) { + struct sockaddr_in egress_sin; int fdflags; int rc = -1; @@ -799,19 +816,38 @@ eNextState EvConnectSock(AsyncIO *IO, IO->rw_timeout.data = IO; + + /* for debugging you may bypass it like this: * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = * inet_addr("127.0.0.1"); */ - if (IO->ConnectMe->IPv6) + if (IO->ConnectMe->IPv6) { rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6)); - else + } + else { + /* If citserver is bound to a specific IP address on the host, make + * sure we use that address for outbound connections. + */ + + memset(&egress_sin, 0, sizeof(egress_sin)); + egress_sin.sin_family = AF_INET; + if (!IsEmptyStr(config.c_ip_addr)) { + egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr); + if (egress_sin.sin_addr.s_addr == !INADDR_ANY) { + egress_sin.sin_addr.s_addr = INADDR_ANY; + } + + /* If this bind fails, no problem; we can still use INADDR_ANY */ + bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin)); + } rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); + } if (rc >= 0){ EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); @@ -881,6 +917,7 @@ void InitIOStruct(AsyncIO *IO, IO_CallBack SendDone, IO_CallBack ReadDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ConnFail, IO_CallBack Timeout, IO_CallBack ShutdownAbort) @@ -895,6 +932,7 @@ void InitIOStruct(AsyncIO *IO, IO->SendDone = SendDone; IO->ReadDone = ReadDone; IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->LineReader = LineReader; IO->ConnFail = ConnFail; IO->Timeout = Timeout; @@ -918,6 +956,7 @@ int InitcURLIOStruct(AsyncIO *IO, const char* Desc, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ShutdownAbort) { IO->Data = Data; @@ -925,8 +964,9 @@ int InitcURLIOStruct(AsyncIO *IO, IO->CitContext = CloneContext(CC); ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; - IO->SendDone = SendDone; - IO->Terminate = Terminate; + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->ShutdownAbort = ShutdownAbort; strcpy(IO->HttpReq.errdesc, Desc);