X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=68995ddf7153401a26f8aa7222285959b33f16ca;hb=63dc1de06b047b4be691541935e98845457c4c04;hp=79a2f0f9c213feb6d4b8517f0de0d7cbef88aae3;hpb=aa7365c86de8e26e796d3aa3fd605c85d8c26220;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 79a2f0f9c..68995ddf7 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -64,6 +64,49 @@ #include "event_client.h" #include "ctdl_module.h" + +ConstStr IOStates[] = { + {HKEY("DB Queue")}, + {HKEY("DB Q Next")}, + {HKEY("DB Attach")}, + {HKEY("DB Next")}, + {HKEY("DB Stop")}, + {HKEY("DB Exit")}, + {HKEY("DB Terminate")}, + {HKEY("IO Queue")}, + {HKEY("IO Attach")}, + {HKEY("IO Connect Socket")}, + {HKEY("IO Abort")}, + {HKEY("IO Timeout")}, + {HKEY("IO ConnFail")}, + {HKEY("IO ConnFail Now")}, + {HKEY("IO Conn Now")}, + {HKEY("IO Conn Wait")}, + {HKEY("Curl Q")}, + {HKEY("Curl Start")}, + {HKEY("Curl Shotdown")}, + {HKEY("Curl More IO")}, + {HKEY("Curl Got IO")}, + {HKEY("Curl Got Data")}, + {HKEY("Curl Got Status")}, + {HKEY("C-Ares Start")}, + {HKEY("C-Ares IO Done")}, + {HKEY("C-Ares Finished")}, + {HKEY("C-Ares exit")}, + {HKEY("Killing")}, + {HKEY("Exit")} +}; + +void SetEVState(AsyncIO *IO, eIOState State) +{ + + CitContext* CCC = IO->CitContext; + if (CCC != NULL) + memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1); + +} + + static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, @@ -86,6 +129,7 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) IOAddHandler *h; int i; + SetEVState(IO, eDBQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; @@ -122,6 +166,7 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) void StopDBWatchers(AsyncIO *IO) { + SetEVState(IO, eDBStop); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); ev_idle_stop(event_db, &IO->db_unwind_stack); } @@ -131,6 +176,7 @@ void ShutDownDBCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); + SetEVState(IO, eDBTerm); EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); StopDBWatchers(IO); @@ -142,6 +188,8 @@ void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eDBNext); IO->Now = ev_now(event_db); EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); @@ -175,6 +223,7 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) { + SetEVState(IO, eQDBNext); IO->NextDBOperation = CB; ev_idle_init(&IO->db_unwind_stack, DB_PerformNext); @@ -199,6 +248,8 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eIOAbort); EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); IO->Now = ev_now(event_base); assert(IO->ShutdownAbort); @@ -211,6 +262,7 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) IOAddHandler *h; int i; + SetEVState(IO, eIOQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; @@ -250,6 +302,7 @@ eNextState QueueCurlContext(AsyncIO *IO) IOAddHandler *h; int i; + SetEVState(IO, eCurlQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = evcurl_handle_start; @@ -298,6 +351,7 @@ void FreeAsyncIOContents(AsyncIO *IO) if (Ctx) { Ctx->state = CON_IDLE; Ctx->kill_me = 1; + IO->CitContext = NULL; } } @@ -315,8 +369,8 @@ void StopClientWatchers(AsyncIO *IO, int CloseFD) if (CloseFD && (IO->SendBuf.fd > 0)) { close(IO->SendBuf.fd); - IO->SendBuf.fd = -1; - IO->RecvBuf.fd = -1; + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; } } @@ -341,6 +395,8 @@ void StopCurlWatchers(AsyncIO *IO) void ShutDownCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; + + SetEVState(IO, eExit); become_session(Ctx); EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); @@ -369,7 +425,22 @@ void PostInbound(AsyncIO *IO) case eSendMore: assert(IO->SendDone); IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); + switch (IO->NextState) + { + case eSendFile: + case eSendReply: + case eSendMore: + case eReadMessage: + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->send_event); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + default: + break; + } break; case eReadPayload: case eReadMore: @@ -580,12 +651,12 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (errno != EAGAIN) { StopClientWatchers(IO, 1); EV_syslog(LOG_DEBUG, - "EVENT: Socket Invalid! [%d] [%s] [%d]\n", + "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n", errno, strerror(errno), IO->SendBuf.fd); StrBufPrintf(IO->ErrMsg, "Socket Invalid! [%s]", strerror(errno)); - SetNextTimeout(IO, 0.0); + SetNextTimeout(IO, 0.01); } } /* else : must write more. */ @@ -626,6 +697,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOTimeout); IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -654,6 +726,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOConnfail); IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); @@ -686,6 +759,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOConnfailNow); IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); @@ -715,6 +789,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) socklen_t lon = sizeof(so_err); int err; + SetEVState(IO, eIOConnNow); IO->Now = ev_now(event_base); EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); @@ -805,19 +880,20 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - SetNextTimeout(IO, 0.0); + StopClientWatchers(IO, 1); + SetNextTimeout(IO, 0.01); return; } else if (nbytes == -1) { if (errno != EAGAIN) { // FD is gone. kick it. StopClientWatchers(IO, 1); EV_syslog(LOG_DEBUG, - "EVENT: Socket Invalid! [%d] [%s] [%d]\n", + "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n", errno, strerror(errno), IO->SendBuf.fd); StrBufPrintf(IO->ErrMsg, "Socket Invalid! [%s]", strerror(errno)); - SetNextTimeout(IO, 0.0); + SetNextTimeout(IO, 0.01); } return; } @@ -827,6 +903,8 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eCaresFinished); IO->Now = ev_now(event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); @@ -857,6 +935,7 @@ eNextState EvConnectSock(AsyncIO *IO, int fdflags; int rc = -1; + SetEVState(IO, eIOConnectSock); become_session(IO->CitContext); if (ReadFirst) { @@ -886,10 +965,12 @@ eNextState EvConnectSock(AsyncIO *IO, fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { EV_syslog(LOG_ERR, - "EVENT: unable to get socket flags! %s \n", + "EVENT: unable to get socket %d flags! %s \n", + IO->SendBuf.fd, strerror(errno)); StrBufPrintf(IO->ErrMsg, - "Failed to get socket flags: %s", + "Failed to get socket %d flags: %s", + IO->SendBuf.fd, strerror(errno)); close(IO->SendBuf.fd); IO->SendBuf.fd = IO->RecvBuf.fd = 0; @@ -899,7 +980,8 @@ eNextState EvConnectSock(AsyncIO *IO, if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { EV_syslog( LOG_ERR, - "EVENT: unable to set socket nonblocking flags! %s \n", + "EVENT: unable to set socket %d nonblocking flags! %s \n", + IO->SendBuf.fd, strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", @@ -956,12 +1038,14 @@ eNextState EvConnectSock(AsyncIO *IO, } if (rc >= 0){ - EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); + SetEVState(IO, eIOConnNow); + EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd); set_start_callback(event_base, IO, 0); return IO->NextState; } else if (errno == EINPROGRESS) { - EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); + SetEVState(IO, eIOConnWait); + EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd); ev_io_init(&IO->conn_event, IO_connestd_callback, @@ -975,12 +1059,17 @@ eNextState EvConnectSock(AsyncIO *IO, return IO->NextState; } else { + SetEVState(IO, eIOConnfail); ev_idle_init(&IO->conn_fail_immediate, IO_connfailimmediate_callback); IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); + EV_syslog(LOG_ERR, + "connect() = %d failed: %s\n", + IO->SendBuf.fd, + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); @@ -1000,6 +1089,7 @@ eNextState ReAttachIO(AsyncIO *IO, void *pData, int ReadFirst) { + SetEVState(IO, eIOAttach); IO->Data = pData; become_session(IO->CitContext); ev_cleanup_start(event_base, &IO->abort_by_shutdown); @@ -1030,7 +1120,8 @@ void InitIOStruct(AsyncIO *IO, IO->Data = Data; IO->CitContext = CloneContext(CC); - ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; IO->NextState = NextState; @@ -1067,7 +1158,8 @@ int InitcURLIOStruct(AsyncIO *IO, IO->Data = Data; IO->CitContext = CloneContext(CC); - ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; IO->SendDone = SendDone; IO->Terminate = Terminate; @@ -1081,6 +1173,90 @@ int InitcURLIOStruct(AsyncIO *IO, } + +typedef struct KillOtherSessionContext { + AsyncIO IO; + AsyncIO *OtherOne; +}KillOtherSessionContext; + +eNextState KillTerminate(AsyncIO *IO) +{ + long id; + KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data; + EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__); + id = IO->ID; + FreeAsyncIOContents(IO); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + IO->ID = id; /* just for the case we want to analyze it in a coredump */ + free(Ctx); + return eAbort; + +} + +eNextState KillShutdown(AsyncIO *IO) +{ + return eTerminateConnection; +} + +eNextState KillOtherContextNow(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx = IO->Data; + + SetEVState(IO, eKill); + + if (Ctx->OtherOne->ShutdownAbort != NULL) + Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne); + return eTerminateConnection; +} + +void KillAsyncIOContext(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx; + + Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext)); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + + InitIOStruct(&Ctx->IO, + Ctx, + eReadMessage, + NULL, + NULL, + NULL, + NULL, + KillTerminate, + NULL, + NULL, + NULL, + KillShutdown); + + Ctx->OtherOne = IO; + + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + QueueEventContext(&Ctx->IO, KillOtherContextNow); + break; + case eDBQuery: + QueueDBOperation(&Ctx->IO, KillOtherContextNow); + break; + case eTerminateConnection: + case eAbort: + /*hm, its already dying, dunno which Queue its in... */ + free(Ctx); + } + +} + extern int DebugEventLoopBacktrace; void EV_backtrace(AsyncIO *IO) {