X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=4ab63a4a292bbd1edb08e490b195698d75decddf;hb=4c274aa94a53d8850ed363e64c5863970fe629a4;hp=959aa542d14207c66d294d54174bb3817a244a22;hpb=29da39d15271b3e276956552759e05b1dcce2c2b;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 959aa542d..4ab63a4a2 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -26,6 +26,8 @@ #include "ctdl_module.h" #include "event_client.h" +#include "citserver.h" +#include "config.h" ConstStr IOStates[] = { {HKEY("DB Queue")}, @@ -155,7 +157,7 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) AsyncIO *IO = watcher->data; SetEVState(IO, eDBNext); - IO->Now = ev_now(event_db); + SET_EV_TIME(IO, event_db); EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); @@ -219,7 +221,7 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, SetEVState(IO, eIOAbort); EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); assert(IO->ShutdownAbort); IO->ShutdownAbort(IO); } @@ -518,7 +520,10 @@ eReadState HandleInbound(AsyncIO *IO) ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); if (IO->NextState == eDBQuery) { - return QueueAnDBOperation(IO); + if (QueueAnDBOperation(IO) == eAbort) + return eReadFail; + else + return eReadSuccess; } else { Finished = StrBufCheckBuffer(&IO->RecvBuf); @@ -539,7 +544,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -559,6 +564,11 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Send: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -711,7 +721,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) AsyncIO *IO = watcher->data; SetEVState(IO, eIOTimeout); - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -740,7 +750,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) AsyncIO *IO = watcher->data; SetEVState(IO, eIOConnfail); - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -773,7 +783,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, AsyncIO *IO = watcher->data; SetEVState(IO, eIOConnfailNow); - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -803,7 +813,7 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) int err; SetEVState(IO, eIOConnNow); - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); ev_io_stop(loop, &IO->conn_event); @@ -837,7 +847,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); @@ -882,6 +892,11 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Read: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -918,7 +933,7 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) AsyncIO *IO = watcher->data; SetEVState(IO, eCaresFinished); - IO->Now = ev_now(event_base); + SET_EV_TIME(IO, event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNS.Query->PostDNS); @@ -930,9 +945,18 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) case eAbort: //// StopClientWatchers(IO); ShutDownCLient(IO); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } @@ -1036,8 +1060,8 @@ eNextState EvConnectSock(AsyncIO *IO, memset(&egress_sin, 0, sizeof(egress_sin)); egress_sin.sin_family = AF_INET; - if (!IsEmptyStr(config.c_ip_addr)) { - egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr); + if (!IsEmptyStr(CtdlGetConfigStr("c_ip_addr"))) { + egress_sin.sin_addr.s_addr = inet_addr(CtdlGetConfigStr("c_ip_addr")); if (egress_sin.sin_addr.s_addr == !INADDR_ANY) { egress_sin.sin_addr.s_addr = INADDR_ANY; } @@ -1217,8 +1241,13 @@ eNextState KillOtherContextNow(AsyncIO *IO) SetEVState(IO, eKill); - if (Ctx->OtherOne->ShutdownAbort != NULL) - Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne); + if (Ctx->OtherOne->ShutdownAbort != NULL) { + Ctx->OtherOne->NextState = eAbort; + if (Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne) == eDBQuery) { + StopClientWatchers(Ctx->OtherOne, 0); + QueueAnDBOperation(Ctx->OtherOne); + } + } return eTerminateConnection; } @@ -1257,11 +1286,11 @@ void KillAsyncIOContext(AsyncIO *IO) case eReadMore: case eReadPayload: case eReadFile: - IO->ReAttachCB = KillOtherContextNow; + Ctx->IO.ReAttachCB = KillOtherContextNow; QueueAnEventContext(&Ctx->IO); break; case eDBQuery: - IO->ReAttachCB = KillOtherContextNow; + Ctx->IO.ReAttachCB = KillOtherContextNow; QueueAnDBOperation(&Ctx->IO); break; case eTerminateConnection: