X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=4ab63a4a292bbd1edb08e490b195698d75decddf;hb=4c274aa94a53d8850ed363e64c5863970fe629a4;hp=6b39829f9285f736507e952c66ad463df21faef4;hpb=1fec17eb5ce7c785521d4a860a475e18400520f7;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 6b39829f9..4ab63a4a2 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -9,61 +9,68 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. */ - #include "sysdep.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#if TIME_WITH_SYS_TIME -# include -# include -#else -# if HAVE_SYS_TIME_H -# include -# else -# include -# endif -#endif -#include -#include +#include #include -#include +#include +#include #include #include #include -#include #if HAVE_BACKTRACE #include #endif #include -#include "citadel.h" -#include "server.h" + +#include "ctdl_module.h" +#include "event_client.h" #include "citserver.h" -#include "support.h" #include "config.h" -#include "control.h" -#include "user_ops.h" -#include "database.h" -#include "msgbase.h" -#include "internet_addressing.h" -#include "genstamp.h" -#include "domain.h" -#include "clientsocket.h" -#include "locate_host.h" -#include "citadel_dirs.h" -#include "event_client.h" -#include "ctdl_module.h" +ConstStr IOStates[] = { + {HKEY("DB Queue")}, + {HKEY("DB Q Next")}, + {HKEY("DB Attach")}, + {HKEY("DB Next")}, + {HKEY("DB Stop")}, + {HKEY("DB Exit")}, + {HKEY("DB Terminate")}, + {HKEY("IO Queue")}, + {HKEY("IO Attach")}, + {HKEY("IO Connect Socket")}, + {HKEY("IO Abort")}, + {HKEY("IO Timeout")}, + {HKEY("IO ConnFail")}, + {HKEY("IO ConnFail Now")}, + {HKEY("IO Conn Now")}, + {HKEY("IO Conn Wait")}, + {HKEY("Curl Q")}, + {HKEY("Curl Start")}, + {HKEY("Curl Shotdown")}, + {HKEY("Curl More IO")}, + {HKEY("Curl Got IO")}, + {HKEY("Curl Got Data")}, + {HKEY("Curl Got Status")}, + {HKEY("C-Ares Start")}, + {HKEY("C-Ares IO Done")}, + {HKEY("C-Ares Finished")}, + {HKEY("C-Ares exit")}, + {HKEY("Killing")}, + {HKEY("Exit")} +}; + +void SetEVState(AsyncIO *IO, eIOState State) +{ + + CitContext* CCC = IO->CitContext; + if (CCC != NULL) + memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1); +} + +eNextState QueueAnEventContext(AsyncIO *IO); static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, @@ -81,14 +88,18 @@ extern struct ev_loop *event_db; extern ev_async DBAddJob; extern ev_async DBExitEventLoop; -eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) +eNextState QueueAnDBOperation(AsyncIO *IO) { IOAddHandler *h; int i; + SetEVState(IO, eDBQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; - h->EvAttch = CB; + + assert(IO->ReAttachCB != NULL); + + h->EvAttch = IO->ReAttachCB; ev_cleanup_init(&IO->db_abort_by_shutdown, IO_abort_shutdown_callback); IO->db_abort_by_shutdown.data = IO; @@ -116,12 +127,13 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) ev_async_send (event_db, &DBAddJob); pthread_mutex_unlock(&DBEventExitQueueMutex); - EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + EVQM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); return eDBQuery; } void StopDBWatchers(AsyncIO *IO) { + SetEVState(IO, eDBStop); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); ev_idle_stop(event_db, &IO->db_unwind_stack); } @@ -131,6 +143,7 @@ void ShutDownDBCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); + SetEVState(IO, eDBTerm); EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); StopDBWatchers(IO); @@ -142,7 +155,9 @@ void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - IO->Now = ev_now(event_db); + + SetEVState(IO, eDBNext); + SET_EV_TIME(IO, event_db); EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); @@ -151,12 +166,15 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) assert(IO->NextDBOperation); switch (IO->NextDBOperation(IO)) { + case eSendReply: + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + QueueAnEventContext(IO); + break; case eDBQuery: break; case eSendDNSQuery: case eReadDNSReply: case eConnect: - case eSendReply: case eSendMore: case eSendFile: case eReadMessage: @@ -175,6 +193,7 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) { + SetEVState(IO, eQDBNext); IO->NextDBOperation = CB; ev_idle_init(&IO->db_unwind_stack, DB_PerformNext); @@ -199,21 +218,28 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eIOAbort); EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); assert(IO->ShutdownAbort); IO->ShutdownAbort(IO); } -eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) +eNextState QueueAnEventContext(AsyncIO *IO) { IOAddHandler *h; int i; + SetEVState(IO, eIOQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; - h->EvAttch = CB; + + assert(IO->ReAttachCB != NULL); + + h->EvAttch = IO->ReAttachCB; + ev_cleanup_init(&IO->abort_by_shutdown, IO_abort_shutdown_callback); IO->abort_by_shutdown.data = IO; @@ -243,6 +269,25 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) return eSendReply; } +eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB, int CloseFDs) +{ + StopClientWatchers(IO, CloseFDs); + IO->ReAttachCB = CB; + return eDBQuery; +} +eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB) +{ + StopDBWatchers(IO); + IO->ReAttachCB = CB; + return eSendReply; +} + +eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) +{ + IO->ReAttachCB = CB; + return QueueAnEventContext(IO); +} + extern eNextState evcurl_handle_start(AsyncIO *IO); eNextState QueueCurlContext(AsyncIO *IO) @@ -250,6 +295,7 @@ eNextState QueueCurlContext(AsyncIO *IO) IOAddHandler *h; int i; + SetEVState(IO, eCurlQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = evcurl_handle_start; @@ -281,7 +327,14 @@ eNextState QueueCurlContext(AsyncIO *IO) return eSendReply; } -void DestructCAres(AsyncIO *IO); +eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB) +{ + StopCurlWatchers(IO); + IO->ReAttachCB = CB; + return eDBQuery; +} + + void FreeAsyncIOContents(AsyncIO *IO) { CitContext *Ctx = IO->CitContext; @@ -290,8 +343,6 @@ void FreeAsyncIOContents(AsyncIO *IO) FreeStrBuf(&IO->SendBuf.Buf); FreeStrBuf(&IO->RecvBuf.Buf); - DestructCAres(IO); - FreeURL(&IO->ConnectMe); FreeStrBuf(&IO->HttpReq.ReplyData); @@ -303,8 +354,13 @@ void FreeAsyncIOContents(AsyncIO *IO) } +void DestructCAres(AsyncIO *IO); void StopClientWatchers(AsyncIO *IO, int CloseFD) { + EVM_syslog(LOG_DEBUG, "EVENT StopClientWatchers"); + + DestructCAres(IO); + ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); ev_idle_stop(event_base, &IO->unwind_stack); @@ -323,6 +379,8 @@ void StopClientWatchers(AsyncIO *IO, int CloseFD) void StopCurlWatchers(AsyncIO *IO) { + EVM_syslog(LOG_DEBUG, "EVENT StopCurlWatchers \n"); + ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); ev_idle_stop(event_base, &IO->unwind_stack); @@ -332,6 +390,9 @@ void StopCurlWatchers(AsyncIO *IO) ev_io_stop(event_base, &IO->send_event); ev_io_stop(event_base, &IO->recv_event); + curl_easy_cleanup(IO->HttpReq.chnd); + IO->HttpReq.chnd = NULL; + if (IO->SendBuf.fd != 0) { close(IO->SendBuf.fd); } @@ -339,9 +400,11 @@ void StopCurlWatchers(AsyncIO *IO) IO->RecvBuf.fd = 0; } -void ShutDownCLient(AsyncIO *IO) +eNextState ShutDownCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; + + SetEVState(IO, eExit); become_session(Ctx); EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); @@ -357,11 +420,12 @@ void ShutDownCLient(AsyncIO *IO) IO->DNS.Channel = NULL; } assert(IO->Terminate); - IO->Terminate(IO); + return IO->Terminate(IO); } void PostInbound(AsyncIO *IO) { + switch (IO->NextState) { case eSendFile: ev_io_start(event_base, &IO->send_event); @@ -370,7 +434,23 @@ void PostInbound(AsyncIO *IO) case eSendMore: assert(IO->SendDone); IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); + switch (IO->NextState) + { + case eSendFile: + case eSendReply: + case eSendMore: + case eReadMessage: + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->send_event); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + default: + break; + } break; case eReadPayload: case eReadMore: @@ -378,17 +458,18 @@ void PostInbound(AsyncIO *IO) ev_io_start(event_base, &IO->recv_event); break; case eTerminateConnection: - ShutDownCLient(IO); - break; case eAbort: - ShutDownCLient(IO); + if (ShutDownCLient(IO) == eDBQuery) { + QueueAnDBOperation(IO); + } break; case eSendDNSQuery: case eReadDNSReply: - case eDBQuery: case eConnect: case eReadMessage: break; + case eDBQuery: + QueueAnDBOperation(IO); } } eReadState HandleInbound(AsyncIO *IO) @@ -436,10 +517,17 @@ eReadState HandleInbound(AsyncIO *IO) } if (Finished != eMustReadMore) { - assert(IO->ReadDone); ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); - Finished = StrBufCheckBuffer(&IO->RecvBuf); + if (IO->NextState == eDBQuery) { + if (QueueAnDBOperation(IO) == eAbort) + return eReadFail; + else + return eReadSuccess; + } + else { + Finished = StrBufCheckBuffer(&IO->RecvBuf); + } } } @@ -456,7 +544,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -476,6 +564,11 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Send: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -581,7 +674,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (errno != EAGAIN) { StopClientWatchers(IO, 1); EV_syslog(LOG_DEBUG, - "EVENT: Socket Invalid! [%d] [%s] [%d]\n", + "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n", errno, strerror(errno), IO->SendBuf.fd); StrBufPrintf(IO->ErrMsg, "Socket Invalid! [%s]", @@ -627,7 +720,8 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; - IO->Now = ev_now(event_base); + SetEVState(IO, eIOTimeout); + SET_EV_TIME(IO, event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -655,7 +749,8 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; - IO->Now = ev_now(event_base); + SetEVState(IO, eIOConnfail); + SET_EV_TIME(IO, event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -687,7 +782,8 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; - IO->Now = ev_now(event_base); + SetEVState(IO, eIOConnfailNow); + SET_EV_TIME(IO, event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -716,7 +812,8 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) socklen_t lon = sizeof(so_err); int err; - IO->Now = ev_now(event_base); + SetEVState(IO, eIOConnNow); + SET_EV_TIME(IO, event_base); EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); ev_io_stop(loop, &IO->conn_event); @@ -750,7 +847,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); @@ -795,6 +892,11 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Read: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -814,7 +916,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) // FD is gone. kick it. StopClientWatchers(IO, 1); EV_syslog(LOG_DEBUG, - "EVENT: Socket Invalid! [%d] [%s] [%d]\n", + "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n", errno, strerror(errno), IO->SendBuf.fd); StrBufPrintf(IO->ErrMsg, "Socket Invalid! [%s]", @@ -829,7 +931,9 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - IO->Now = ev_now(event_base); + + SetEVState(IO, eCaresFinished); + SET_EV_TIME(IO, event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNS.Query->PostDNS); @@ -841,9 +945,18 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) case eAbort: //// StopClientWatchers(IO); ShutDownCLient(IO); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } @@ -859,6 +972,7 @@ eNextState EvConnectSock(AsyncIO *IO, int fdflags; int rc = -1; + SetEVState(IO, eIOConnectSock); become_session(IO->CitContext); if (ReadFirst) { @@ -946,8 +1060,8 @@ eNextState EvConnectSock(AsyncIO *IO, memset(&egress_sin, 0, sizeof(egress_sin)); egress_sin.sin_family = AF_INET; - if (!IsEmptyStr(config.c_ip_addr)) { - egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr); + if (!IsEmptyStr(CtdlGetConfigStr("c_ip_addr"))) { + egress_sin.sin_addr.s_addr = inet_addr(CtdlGetConfigStr("c_ip_addr")); if (egress_sin.sin_addr.s_addr == !INADDR_ANY) { egress_sin.sin_addr.s_addr = INADDR_ANY; } @@ -961,11 +1075,13 @@ eNextState EvConnectSock(AsyncIO *IO, } if (rc >= 0){ + SetEVState(IO, eIOConnNow); EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd); set_start_callback(event_base, IO, 0); return IO->NextState; } else if (errno == EINPROGRESS) { + SetEVState(IO, eIOConnWait); EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd); ev_io_init(&IO->conn_event, @@ -980,6 +1096,7 @@ eNextState EvConnectSock(AsyncIO *IO, return IO->NextState; } else { + SetEVState(IO, eIOConnfail); ev_idle_init(&IO->conn_fail_immediate, IO_connfailimmediate_callback); IO->conn_fail_immediate.data = IO; @@ -1009,6 +1126,7 @@ eNextState ReAttachIO(AsyncIO *IO, void *pData, int ReadFirst) { + SetEVState(IO, eIOAttach); IO->Data = pData; become_session(IO->CitContext); ev_cleanup_start(event_base, &IO->abort_by_shutdown); @@ -1039,7 +1157,7 @@ void InitIOStruct(AsyncIO *IO, IO->Data = Data; IO->CitContext = CloneContext(CC); - IO->CitContext->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; IO->CitContext->IO = IO; IO->NextState = NextState; @@ -1077,7 +1195,7 @@ int InitcURLIOStruct(AsyncIO *IO, IO->Data = Data; IO->CitContext = CloneContext(CC); - IO->CitContext->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; IO->CitContext->IO = IO; IO->SendDone = SendDone; @@ -1100,10 +1218,13 @@ typedef struct KillOtherSessionContext { eNextState KillTerminate(AsyncIO *IO) { + long id; KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data; EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__); + id = IO->ID; FreeAsyncIOContents(IO); memset(Ctx, 0, sizeof(KillOtherSessionContext)); + IO->ID = id; /* just for the case we want to analyze it in a coredump */ free(Ctx); return eAbort; @@ -1118,8 +1239,15 @@ eNextState KillOtherContextNow(AsyncIO *IO) { KillOtherSessionContext *Ctx = IO->Data; - if (Ctx->OtherOne->ShutdownAbort != NULL) - Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne); + SetEVState(IO, eKill); + + if (Ctx->OtherOne->ShutdownAbort != NULL) { + Ctx->OtherOne->NextState = eAbort; + if (Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne) == eDBQuery) { + StopClientWatchers(Ctx->OtherOne, 0); + QueueAnDBOperation(Ctx->OtherOne); + } + } return eTerminateConnection; } @@ -1158,10 +1286,12 @@ void KillAsyncIOContext(AsyncIO *IO) case eReadMore: case eReadPayload: case eReadFile: - QueueEventContext(&Ctx->IO, KillOtherContextNow); + Ctx->IO.ReAttachCB = KillOtherContextNow; + QueueAnEventContext(&Ctx->IO); break; case eDBQuery: - QueueDBOperation(&Ctx->IO, KillOtherContextNow); + Ctx->IO.ReAttachCB = KillOtherContextNow; + QueueAnDBOperation(&Ctx->IO); break; case eTerminateConnection: case eAbort: