X-Git-Url: https://code.citadel.org/?p=citadel.git;a=blobdiff_plain;f=citadel%2Fevent_client.c;h=90c5e5dd986619f4f9f5426bea3393133a9cb7c1;hp=0b40edcf4c9ea29ac619c0e4741792fab01f8a8b;hb=70c486dc2216fb4e3342803080ce6a4204dd8672;hpb=57b110370156d70f589924e130fd9c5a56296000 diff --git a/citadel/event_client.c b/citadel/event_client.c index 0b40edcf4..90c5e5dd9 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -1,20 +1,13 @@ /* - * * Copyright (c) 1998-2012 by the citadel.org team * - * This program is open source software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * This program is open source software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 3. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. */ #include "sysdep.h" @@ -82,6 +75,7 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, *----------------------------------------------------------------------------*/ extern int evdb_count; extern pthread_mutex_t DBEventQueueMutex; +extern pthread_mutex_t DBEventExitQueueMutex; extern HashList *DBInboundEventQueue; extern struct ev_loop *event_db; extern ev_async DBAddJob; @@ -98,26 +92,47 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) ev_cleanup_init(&IO->db_abort_by_shutdown, IO_abort_shutdown_callback); IO->db_abort_by_shutdown.data = IO; - ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); pthread_mutex_lock(&DBEventQueueMutex); + if (DBInboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n"); + pthread_mutex_unlock(&DBEventQueueMutex); + return eAbort; + } EVM_syslog(LOG_DEBUG, "DBEVENT Q\n"); i = ++evdb_count ; Put(DBInboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventExitQueueMutex); + if (event_db == NULL) + { + pthread_mutex_unlock(&DBEventExitQueueMutex); + return eAbort; + } ev_async_send (event_db, &DBAddJob); + pthread_mutex_unlock(&DBEventExitQueueMutex); + EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); return eDBQuery; } +void StopDBWatchers(AsyncIO *IO) +{ + ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + ev_idle_stop(event_db, &IO->db_unwind_stack); +} + void ShutDownDBCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; become_session(Ctx); EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); - ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + StopDBWatchers(IO); assert(IO->DBTerminate); IO->DBTerminate(IO); @@ -173,6 +188,7 @@ eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) *----------------------------------------------------------------------------*/ extern int evbase_count; extern pthread_mutex_t EventQueueMutex; +extern pthread_mutex_t EventExitQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; extern ev_async AddJob; @@ -201,15 +217,28 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) ev_cleanup_init(&IO->abort_by_shutdown, IO_abort_shutdown_callback); IO->abort_by_shutdown.data = IO; - ev_cleanup_start(event_base, &IO->abort_by_shutdown); pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + free(h); + /* shutting down... */ + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } @@ -226,12 +255,28 @@ eNextState QueueCurlContext(AsyncIO *IO) h->EvAttch = evcurl_handle_start; pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } @@ -253,15 +298,35 @@ void FreeAsyncIOContents(AsyncIO *IO) if (Ctx) { Ctx->state = CON_IDLE; Ctx->kill_me = 1; + IO->CitContext = NULL; } } -void StopClientWatchers(AsyncIO *IO) +void StopClientWatchers(AsyncIO *IO, int CloseFD) { ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + + ev_io_stop(event_base, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + + if (CloseFD && (IO->SendBuf.fd > 0)) { + close(IO->SendBuf.fd); + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; + } +} + +void StopCurlWatchers(AsyncIO *IO) +{ + ev_timer_stop (event_base, &IO->rw_timeout); + ev_timer_stop(event_base, &IO->conn_fail); + ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); ev_io_stop(event_base, &IO->conn_event); ev_io_stop(event_base, &IO->send_event); @@ -281,8 +346,7 @@ void ShutDownCLient(AsyncIO *IO) EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); - StopClientWatchers(IO); + StopClientWatchers(IO, 1); if (IO->DNS.Channel != NULL) { ares_destroy(IO->DNS.Channel); @@ -306,7 +370,22 @@ void PostInbound(AsyncIO *IO) case eSendMore: assert(IO->SendDone); IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); + switch (IO->NextState) + { + case eSendFile: + case eSendReply: + case eSendMore: + case eReadMessage: + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->send_event); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + default: + break; + } break; case eReadPayload: case eReadMore: @@ -425,8 +504,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) StrBufPlain(IO->ErrMsg, errmsg, -1); break; default: - rc = StrBuf_write_one_chunk_callback(watcher->fd, - 0/*TODO*/, + rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd, + 0, &IO->SendBuf); } @@ -514,17 +593,30 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } } else if (rc < 0) { - IO_Timeout_callback(loop, &IO->rw_timeout, revents); + if (errno != EAGAIN) { + StopClientWatchers(IO, 1); + EV_syslog(LOG_DEBUG, + "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n", + errno, strerror(errno), IO->SendBuf.fd); + StrBufPrintf(IO->ErrMsg, + "Socket Invalid! [%s]", + strerror(errno)); + SetNextTimeout(IO, 0.01); + } } /* else : must write more. */ } static void set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) { + ev_timer_stop(event_base, &IO->conn_fail); + ev_timer_start(event_base, &IO->rw_timeout); + switch(IO->NextState) { case eReadMore: case eReadMessage: case eReadFile: + StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0); ev_io_start(event_base, &IO->recv_event); break; case eSendReply: @@ -634,15 +726,38 @@ IO_connfailimmediate_callback(struct ev_loop *loop, static void IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) { - AsyncIO *IO = watcher->data; - - IO->Now = ev_now(event_base); - EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); - - ev_io_stop(loop, &IO->conn_event); - ev_timer_stop (event_base, &IO->conn_fail); - set_start_callback(loop, IO, revents); + AsyncIO *IO = watcher->data; + int so_err = 0; + socklen_t lon = sizeof(so_err); + int err; + + IO->Now = ev_now(event_base); + EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); + + ev_io_stop(loop, &IO->conn_event); + ev_timer_stop(event_base, &IO->conn_fail); + + err = getsockopt(IO->SendBuf.fd, + SOL_SOCKET, + SO_ERROR, + (void*)&so_err, + &lon); + + if ((err == 0) && (so_err != 0)) + { + EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n", + so_err, + strerror(so_err)); + IO_connfail_callback(loop, &IO->conn_fail, revents); + + } + else + { + EVM_syslog(LOG_DEBUG, "connect() succeeded\n"); + set_start_callback(loop, IO, revents); + } } + static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { @@ -671,8 +786,8 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) } break; default: - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, - 0 /*TODO */, + nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd, + 0, &IO->RecvBuf); break; } @@ -706,15 +821,21 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - IO_Timeout_callback(loop, &IO->rw_timeout, revents); + StopClientWatchers(IO, 1); + SetNextTimeout(IO, 0.01); return; } else if (nbytes == -1) { - // FD is gone. kick it. - StopClientWatchers(IO); - EV_syslog(LOG_DEBUG, - "EVENT: Socket Invalid! %s \n", - strerror(errno)); - ShutDownCLient(IO); + if (errno != EAGAIN) { + // FD is gone. kick it. + StopClientWatchers(IO, 1); + EV_syslog(LOG_DEBUG, + "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n", + errno, strerror(errno), IO->SendBuf.fd); + StrBufPrintf(IO->ErrMsg, + "Socket Invalid! [%s]", + strerror(errno)); + SetNextTimeout(IO, 0.01); + } return; } } @@ -782,10 +903,12 @@ eNextState EvConnectSock(AsyncIO *IO, fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { EV_syslog(LOG_ERR, - "EVENT: unable to get socket flags! %s \n", + "EVENT: unable to get socket %d flags! %s \n", + IO->SendBuf.fd, strerror(errno)); StrBufPrintf(IO->ErrMsg, - "Failed to get socket flags: %s", + "Failed to get socket %d flags: %s", + IO->SendBuf.fd, strerror(errno)); close(IO->SendBuf.fd); IO->SendBuf.fd = IO->RecvBuf.fd = 0; @@ -795,7 +918,8 @@ eNextState EvConnectSock(AsyncIO *IO, if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { EV_syslog( LOG_ERR, - "EVENT: unable to set socket nonblocking flags! %s \n", + "EVENT: unable to set socket %d nonblocking flags! %s \n", + IO->SendBuf.fd, strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", @@ -852,13 +976,12 @@ eNextState EvConnectSock(AsyncIO *IO, } if (rc >= 0){ - EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); + EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd); set_start_callback(event_base, IO, 0); - ev_timer_start(event_base, &IO->rw_timeout); return IO->NextState; } else if (errno == EINPROGRESS) { - EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); + EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd); ev_io_init(&IO->conn_event, IO_connestd_callback, @@ -877,7 +1000,11 @@ eNextState EvConnectSock(AsyncIO *IO, IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); + EV_syslog(LOG_ERR, + "connect() = %d failed: %s\n", + IO->SendBuf.fd, + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); @@ -927,7 +1054,8 @@ void InitIOStruct(AsyncIO *IO, IO->Data = Data; IO->CitContext = CloneContext(CC); - ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; IO->NextState = NextState; @@ -964,7 +1092,8 @@ int InitcURLIOStruct(AsyncIO *IO, IO->Data = Data; IO->CitContext = CloneContext(CC); - ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; IO->SendDone = SendDone; IO->Terminate = Terminate; @@ -978,6 +1107,88 @@ int InitcURLIOStruct(AsyncIO *IO, } + +typedef struct KillOtherSessionContext { + AsyncIO IO; + AsyncIO *OtherOne; +}KillOtherSessionContext; + +eNextState KillTerminate(AsyncIO *IO) +{ + long id; + KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data; + EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__); + id = IO->ID; + FreeAsyncIOContents(IO); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + IO->ID = id; /* just for the case we want to analyze it in a coredump */ + free(Ctx); + return eAbort; + +} + +eNextState KillShutdown(AsyncIO *IO) +{ + return eTerminateConnection; +} + +eNextState KillOtherContextNow(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx = IO->Data; + + if (Ctx->OtherOne->ShutdownAbort != NULL) + Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne); + return eTerminateConnection; +} + +void KillAsyncIOContext(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx; + + Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext)); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + + InitIOStruct(&Ctx->IO, + Ctx, + eReadMessage, + NULL, + NULL, + NULL, + NULL, + KillTerminate, + NULL, + NULL, + NULL, + KillShutdown); + + Ctx->OtherOne = IO; + + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + QueueEventContext(&Ctx->IO, KillOtherContextNow); + break; + case eDBQuery: + QueueDBOperation(&Ctx->IO, KillOtherContextNow); + break; + case eTerminateConnection: + case eAbort: + /*hm, its already dying, dunno which Queue its in... */ + free(Ctx); + } + +} + extern int DebugEventLoopBacktrace; void EV_backtrace(AsyncIO *IO) {