X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=15b5f44a7270b4d903a90f0bc88e944e4b356864;hb=72a4e9f304cff9f487b334f0d70f09142fee4183;hp=9f01e9a4aeddb193026e3411858c0b7e48a9718b;hpb=5e2e7b131ad76aeebc88ec3a0839219214f7386b;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 9f01e9a4a..15b5f44a7 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -72,17 +72,9 @@ #include "ctdl_module.h" static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); - static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, - int revents) -{ - AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - - assert(IO->ShutdownAbort); - IO->ShutdownAbort(IO); -} + int revents); /*------------------------------------------------------------------------------ @@ -127,15 +119,16 @@ void ShutDownDBCLient(AsyncIO *IO) EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); - assert(IO->Terminate); - IO->Terminate(IO); + assert(IO->DBTerminate); + IO->DBTerminate(IO); } void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + IO->Now = ev_now(event_db); + EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); ev_idle_stop(event_db, &IO->db_unwind_stack); @@ -185,6 +178,17 @@ extern struct ev_loop *event_base; extern ev_async AddJob; extern ev_async ExitEventLoop; +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} + eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) { @@ -292,6 +296,37 @@ void ShutDownCLient(AsyncIO *IO) IO->Terminate(IO); } +void PostInbound(AsyncIO *IO) +{ + switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eSendReply: + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + ev_io_start(event_base, &IO->send_event); + break; + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eTerminateConnection: + ShutDownCLient(IO); + break; + case eAbort: + ShutDownCLient(IO); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + case eReadMessage: + break; + } +} eReadState HandleInbound(AsyncIO *IO) { const char *Err = NULL; @@ -344,34 +379,8 @@ eReadState HandleInbound(AsyncIO *IO) } } - switch (IO->NextState) { - case eSendFile: - ev_io_start(event_base, &IO->send_event); - break; - case eSendReply: - case eSendMore: - assert(IO->SendDone); - IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); - break; - case eReadPayload: - case eReadMore: - case eReadFile: - ev_io_start(event_base, &IO->recv_event); - break; - case eTerminateConnection: - ShutDownCLient(IO); - break; - case eAbort: - ShutDownCLient(IO); - break; - case eSendDNSQuery: - case eReadDNSReply: - case eDBQuery: - case eConnect: - case eReadMessage: - break; - } + PostInbound(IO); + return Finished; } @@ -383,6 +392,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; + IO->Now = ev_now(event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -540,6 +550,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -567,6 +578,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -598,6 +610,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -623,6 +636,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_io_stop(loop, &IO->conn_event); ev_timer_stop (event_base, &IO->conn_fail); set_start_callback(loop, IO, revents); @@ -634,6 +648,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); @@ -644,6 +659,10 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (IO->IOB.ChunkSendRemain == 0) { IO->NextState = eSendReply; + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + PostInbound(IO); + return; } else return; @@ -685,15 +704,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - assert(IO->Timeout); - - switch (IO->Timeout(IO)) - { - case eAbort: - ShutDownCLient(IO); - default: - break; - } + IO_Timeout_callback(loop, &IO->rw_timeout, revents); return; } else if (nbytes == -1) { // FD is gone. kick it. @@ -710,6 +721,7 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNS.Query->PostDNS); @@ -905,6 +917,7 @@ void InitIOStruct(AsyncIO *IO, IO_CallBack SendDone, IO_CallBack ReadDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ConnFail, IO_CallBack Timeout, IO_CallBack ShutdownAbort) @@ -919,6 +932,7 @@ void InitIOStruct(AsyncIO *IO, IO->SendDone = SendDone; IO->ReadDone = ReadDone; IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->LineReader = LineReader; IO->ConnFail = ConnFail; IO->Timeout = Timeout; @@ -942,6 +956,7 @@ int InitcURLIOStruct(AsyncIO *IO, const char* Desc, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ShutdownAbort) { IO->Data = Data; @@ -949,8 +964,9 @@ int InitcURLIOStruct(AsyncIO *IO, IO->CitContext = CloneContext(CC); ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; - IO->SendDone = SendDone; - IO->Terminate = Terminate; + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->ShutdownAbort = ShutdownAbort; strcpy(IO->HttpReq.errdesc, Desc); @@ -971,10 +987,12 @@ void EV_backtrace(AsyncIO *IO) size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*)); strings = backtrace_symbols(stack_frames, size); for (i = 0; i < size; i++) { - if (strings != NULL) + if (strings != NULL) { EV_syslog(LOG_ALERT, " BT %s\n", strings[i]); - else + } + else { EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]); + } } free(strings); #endif