X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=822c42ba532a9d35212291f41c1e683585b6f8f1;hb=aa8b1021b70043469d32645ea2961794fbccaa32;hp=19e0458367d43be834ec212d07162c6132ea2140;hpb=7da250e1c674a7f08080afe2a3b95d2c51617500;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 19e045836..822c42ba5 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -71,16 +71,10 @@ #include "event_client.h" #include "ctdl_module.h" +static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, - int revents) -{ - AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - - assert(IO->ShutdownAbort); - IO->ShutdownAbort(IO); -} + int revents); /*------------------------------------------------------------------------------ @@ -125,15 +119,16 @@ void ShutDownDBCLient(AsyncIO *IO) EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); - assert(IO->Terminate); - IO->Terminate(IO); + assert(IO->DBTerminate); + IO->DBTerminate(IO); } void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + IO->Now = ev_now(event_db); + EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); ev_idle_stop(event_db, &IO->db_unwind_stack); @@ -183,6 +178,17 @@ extern struct ev_loop *event_base; extern ev_async AddJob; extern ev_async ExitEventLoop; +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} + eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) { @@ -290,6 +296,37 @@ void ShutDownCLient(AsyncIO *IO) IO->Terminate(IO); } +void PostInbound(AsyncIO *IO) +{ + switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eSendReply: + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + ev_io_start(event_base, &IO->send_event); + break; + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eTerminateConnection: + ShutDownCLient(IO); + break; + case eAbort: + ShutDownCLient(IO); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + case eReadMessage: + break; + } +} eReadState HandleInbound(AsyncIO *IO) { const char *Err = NULL; @@ -342,34 +379,8 @@ eReadState HandleInbound(AsyncIO *IO) } } - switch (IO->NextState) { - case eSendFile: - ev_io_start(event_base, &IO->send_event); - break; - case eSendReply: - case eSendMore: - assert(IO->SendDone); - IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); - break; - case eReadPayload: - case eReadMore: - case eReadFile: - ev_io_start(event_base, &IO->recv_event); - break; - case eTerminateConnection: - ShutDownCLient(IO); - break; - case eAbort: - ShutDownCLient(IO); - break; - case eSendDNSQuery: - case eReadDNSReply: - case eDBQuery: - case eConnect: - case eReadMessage: - break; - } + PostInbound(IO); + return Finished; } @@ -381,6 +392,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; + IO->Now = ev_now(event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -441,6 +453,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eSendFile: if (IO->IOB.ChunkSendRemain > 0) { ev_io_start(event_base, &IO->recv_event); + SetNextTimeout(IO, 100.0); + } else { assert(IO->ReadDone); IO->NextState = IO->ReadDone(IO); @@ -500,8 +514,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } } else if (rc < 0) { - assert(IO->Timeout); - IO->Timeout(IO); + IO_Timeout_callback(loop, &IO->rw_timeout, revents); } /* else : must write more. */ } @@ -537,6 +550,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -564,6 +578,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -595,6 +610,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -620,6 +636,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_io_stop(loop, &IO->conn_event); ev_timer_stop (event_base, &IO->conn_fail); set_start_callback(loop, IO, revents); @@ -631,6 +648,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); @@ -641,6 +659,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (IO->IOB.ChunkSendRemain == 0) { IO->NextState = eSendReply; + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + PostInbound(IO); + return; } else return; @@ -682,15 +704,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - assert(IO->Timeout); - - switch (IO->Timeout(IO)) - { - case eAbort: - ShutDownCLient(IO); - default: - break; - } + IO_Timeout_callback(loop, &IO->rw_timeout, revents); return; } else if (nbytes == -1) { // FD is gone. kick it. @@ -707,6 +721,7 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNS.Query->PostDNS); @@ -764,7 +779,7 @@ eNextState EvConnectSock(AsyncIO *IO, } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - EV_syslog(LOG_DEBUG, + EV_syslog(LOG_ERR, "EVENT: unable to get socket flags! %s \n", strerror(errno)); StrBufPrintf(IO->ErrMsg, @@ -777,7 +792,7 @@ eNextState EvConnectSock(AsyncIO *IO, fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { EV_syslog( - LOG_DEBUG, + LOG_ERR, "EVENT: unable to set socket nonblocking flags! %s \n", strerror(errno)); StrBufPrintf(IO->ErrMsg, @@ -902,6 +917,7 @@ void InitIOStruct(AsyncIO *IO, IO_CallBack SendDone, IO_CallBack ReadDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ConnFail, IO_CallBack Timeout, IO_CallBack ShutdownAbort) @@ -916,6 +932,7 @@ void InitIOStruct(AsyncIO *IO, IO->SendDone = SendDone; IO->ReadDone = ReadDone; IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->LineReader = LineReader; IO->ConnFail = ConnFail; IO->Timeout = Timeout; @@ -939,6 +956,7 @@ int InitcURLIOStruct(AsyncIO *IO, const char* Desc, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ShutdownAbort) { IO->Data = Data; @@ -946,8 +964,9 @@ int InitcURLIOStruct(AsyncIO *IO, IO->CitContext = CloneContext(CC); ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; - IO->SendDone = SendDone; - IO->Terminate = Terminate; + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->ShutdownAbort = ShutdownAbort; strcpy(IO->HttpReq.errdesc, Desc); @@ -957,6 +976,7 @@ int InitcURLIOStruct(AsyncIO *IO, } +extern int DebugEventLoopBacktrace; void EV_backtrace(AsyncIO *IO) { #ifdef HAVE_BACKTRACE @@ -964,14 +984,17 @@ void EV_backtrace(AsyncIO *IO) size_t size, i; char **strings; - + if ((IO == NULL) || (DebugEventLoopBacktrace == 0)) + return; size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*)); strings = backtrace_symbols(stack_frames, size); for (i = 0; i < size; i++) { - if (strings != NULL) + if (strings != NULL) { EV_syslog(LOG_ALERT, " BT %s\n", strings[i]); - else + } + else { EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]); + } } free(strings); #endif