X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=999c9e4546c1b989efc6fb93c6f6ab8df8394b8f;hb=8357d67fb22adec3b854d61bdbd898dcfcc91959;hp=c9c149f8e156fa002493565a05943d27fd2e7233;hpb=15fc2bf9cd4d2a34fd91aa16c4f632ee46e72dd8;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index c9c149f8e..999c9e454 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -9,61 +9,24 @@ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. */ - #include "sysdep.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#if TIME_WITH_SYS_TIME -# include -# include -#else -# if HAVE_SYS_TIME_H -# include -# else -# include -# endif -#endif -#include -#include +#include #include -#include +#include +#include #include #include #include -#include #if HAVE_BACKTRACE #include #endif #include -#include "citadel.h" -#include "server.h" -#include "citserver.h" -#include "support.h" -#include "config.h" -#include "control.h" -#include "user_ops.h" -#include "database.h" -#include "msgbase.h" -#include "internet_addressing.h" -#include "genstamp.h" -#include "domain.h" -#include "clientsocket.h" -#include "locate_host.h" -#include "citadel_dirs.h" -#include "event_client.h" #include "ctdl_module.h" - +#include "event_client.h" +#include "citserver.h" ConstStr IOStates[] = { {HKEY("DB Queue")}, @@ -106,7 +69,7 @@ void SetEVState(AsyncIO *IO, eIOState State) } - +eNextState QueueAnEventContext(AsyncIO *IO); static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, @@ -124,7 +87,7 @@ extern struct ev_loop *event_db; extern ev_async DBAddJob; extern ev_async DBExitEventLoop; -eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) +eNextState QueueAnDBOperation(AsyncIO *IO) { IOAddHandler *h; int i; @@ -132,7 +95,10 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) SetEVState(IO, eDBQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; - h->EvAttch = CB; + + assert(IO->ReAttachCB != NULL); + + h->EvAttch = IO->ReAttachCB; ev_cleanup_init(&IO->db_abort_by_shutdown, IO_abort_shutdown_callback); IO->db_abort_by_shutdown.data = IO; @@ -160,7 +126,7 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) ev_async_send (event_db, &DBAddJob); pthread_mutex_unlock(&DBEventExitQueueMutex); - EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + EVQM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); return eDBQuery; } @@ -199,12 +165,15 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) assert(IO->NextDBOperation); switch (IO->NextDBOperation(IO)) { + case eSendReply: + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + QueueAnEventContext(IO); + break; case eDBQuery: break; case eSendDNSQuery: case eReadDNSReply: case eConnect: - case eSendReply: case eSendMore: case eSendFile: case eReadMessage: @@ -257,7 +226,7 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, } -eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) +eNextState QueueAnEventContext(AsyncIO *IO) { IOAddHandler *h; int i; @@ -265,7 +234,11 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) SetEVState(IO, eIOQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; - h->EvAttch = CB; + + assert(IO->ReAttachCB != NULL); + + h->EvAttch = IO->ReAttachCB; + ev_cleanup_init(&IO->abort_by_shutdown, IO_abort_shutdown_callback); IO->abort_by_shutdown.data = IO; @@ -295,15 +268,23 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) return eSendReply; } -eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB) +eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB, int CloseFDs) { - StopClientWatchers(IO, 0); - return QueueDBOperation(IO, CB); + StopClientWatchers(IO, CloseFDs); + IO->ReAttachCB = CB; + return eDBQuery; } eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB) { StopDBWatchers(IO); - return QueueEventContext(IO, CB); + IO->ReAttachCB = CB; + return eSendReply; +} + +eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) +{ + IO->ReAttachCB = CB; + return QueueAnEventContext(IO); } extern eNextState evcurl_handle_start(AsyncIO *IO); @@ -348,11 +329,11 @@ eNextState QueueCurlContext(AsyncIO *IO) eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB) { StopCurlWatchers(IO); - return QueueDBOperation(IO, CB); + IO->ReAttachCB = CB; + return eDBQuery; } -void DestructCAres(AsyncIO *IO); void FreeAsyncIOContents(AsyncIO *IO) { CitContext *Ctx = IO->CitContext; @@ -361,8 +342,6 @@ void FreeAsyncIOContents(AsyncIO *IO) FreeStrBuf(&IO->SendBuf.Buf); FreeStrBuf(&IO->RecvBuf.Buf); - DestructCAres(IO); - FreeURL(&IO->ConnectMe); FreeStrBuf(&IO->HttpReq.ReplyData); @@ -374,8 +353,13 @@ void FreeAsyncIOContents(AsyncIO *IO) } +void DestructCAres(AsyncIO *IO); void StopClientWatchers(AsyncIO *IO, int CloseFD) { + EVM_syslog(LOG_DEBUG, "EVENT StopClientWatchers"); + + DestructCAres(IO); + ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); ev_idle_stop(event_base, &IO->unwind_stack); @@ -394,6 +378,8 @@ void StopClientWatchers(AsyncIO *IO, int CloseFD) void StopCurlWatchers(AsyncIO *IO) { + EVM_syslog(LOG_DEBUG, "EVENT StopCurlWatchers \n"); + ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); ev_idle_stop(event_base, &IO->unwind_stack); @@ -403,6 +389,9 @@ void StopCurlWatchers(AsyncIO *IO) ev_io_stop(event_base, &IO->send_event); ev_io_stop(event_base, &IO->recv_event); + curl_easy_cleanup(IO->HttpReq.chnd); + IO->HttpReq.chnd = NULL; + if (IO->SendBuf.fd != 0) { close(IO->SendBuf.fd); } @@ -410,7 +399,7 @@ void StopCurlWatchers(AsyncIO *IO) IO->RecvBuf.fd = 0; } -void ShutDownCLient(AsyncIO *IO) +eNextState ShutDownCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; @@ -430,11 +419,12 @@ void ShutDownCLient(AsyncIO *IO) IO->DNS.Channel = NULL; } assert(IO->Terminate); - IO->Terminate(IO); + return IO->Terminate(IO); } void PostInbound(AsyncIO *IO) { + switch (IO->NextState) { case eSendFile: ev_io_start(event_base, &IO->send_event); @@ -456,6 +446,7 @@ void PostInbound(AsyncIO *IO) break; case eDBQuery: StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); default: break; } @@ -466,17 +457,18 @@ void PostInbound(AsyncIO *IO) ev_io_start(event_base, &IO->recv_event); break; case eTerminateConnection: - ShutDownCLient(IO); - break; case eAbort: - ShutDownCLient(IO); + if (ShutDownCLient(IO) == eDBQuery) { + QueueAnDBOperation(IO); + } break; case eSendDNSQuery: case eReadDNSReply: - case eDBQuery: case eConnect: case eReadMessage: break; + case eDBQuery: + QueueAnDBOperation(IO); } } eReadState HandleInbound(AsyncIO *IO) @@ -524,10 +516,17 @@ eReadState HandleInbound(AsyncIO *IO) } if (Finished != eMustReadMore) { - assert(IO->ReadDone); ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); - Finished = StrBufCheckBuffer(&IO->RecvBuf); + if (IO->NextState == eDBQuery) { + if (QueueAnDBOperation(IO) == eAbort) + return eReadFail; + else + return eReadSuccess; + } + else { + Finished = StrBufCheckBuffer(&IO->RecvBuf); + } } } @@ -564,6 +563,11 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Send: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -887,6 +891,11 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Read: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -935,9 +944,18 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) case eAbort: //// StopClientWatchers(IO); ShutDownCLient(IO); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } @@ -1222,8 +1240,13 @@ eNextState KillOtherContextNow(AsyncIO *IO) SetEVState(IO, eKill); - if (Ctx->OtherOne->ShutdownAbort != NULL) - Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne); + if (Ctx->OtherOne->ShutdownAbort != NULL) { + Ctx->OtherOne->NextState = eAbort; + if (Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne) == eDBQuery) { + StopClientWatchers(Ctx->OtherOne, 0); + QueueAnDBOperation(Ctx->OtherOne); + } + } return eTerminateConnection; } @@ -1262,10 +1285,12 @@ void KillAsyncIOContext(AsyncIO *IO) case eReadMore: case eReadPayload: case eReadFile: - QueueEventContext(&Ctx->IO, KillOtherContextNow); + Ctx->IO.ReAttachCB = KillOtherContextNow; + QueueAnEventContext(&Ctx->IO); break; case eDBQuery: - QueueDBOperation(&Ctx->IO, KillOtherContextNow); + Ctx->IO.ReAttachCB = KillOtherContextNow; + QueueAnDBOperation(&Ctx->IO); break; case eTerminateConnection: case eAbort: