X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=fad9eefeccaa5dea30651ba65cc482a1baa19d65;hb=08932aebd49f355220da46a3e1ea58d569e46a95;hp=d9e09b48b12448604c4d88a1ea97efaa0e06251e;hpb=1965fe005619a4a759f23253408cfc3ec9a3ebd2;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index d9e09b48b..fad9eefec 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -1,138 +1,164 @@ /* - * * Copyright (c) 1998-2012 by the citadel.org team * - * This program is open source software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * This program is open source software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 3. * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. */ - #include "sysdep.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#if TIME_WITH_SYS_TIME -# include -# include -#else -# if HAVE_SYS_TIME_H -# include -# else -# include -# endif -#endif -#include -#include +#include #include -#include +#include +#include #include #include #include -#include #if HAVE_BACKTRACE #include #endif #include -#include "citadel.h" -#include "server.h" -#include "citserver.h" -#include "support.h" -#include "config.h" -#include "control.h" -#include "user_ops.h" -#include "database.h" -#include "msgbase.h" -#include "internet_addressing.h" -#include "genstamp.h" -#include "domain.h" -#include "clientsocket.h" -#include "locate_host.h" -#include "citadel_dirs.h" +#include "ctdl_module.h" #include "event_client.h" +#include "citserver.h" +#include "config.h" -static void IO_abort_shutdown_callback(struct ev_loop *loop, - ev_cleanup *watcher, - int revents) +ConstStr IOStates[] = { + {HKEY("DB Queue")}, + {HKEY("DB Q Next")}, + {HKEY("DB Attach")}, + {HKEY("DB Next")}, + {HKEY("DB Stop")}, + {HKEY("DB Exit")}, + {HKEY("DB Terminate")}, + {HKEY("IO Queue")}, + {HKEY("IO Attach")}, + {HKEY("IO Connect Socket")}, + {HKEY("IO Abort")}, + {HKEY("IO Timeout")}, + {HKEY("IO ConnFail")}, + {HKEY("IO ConnFail Now")}, + {HKEY("IO Conn Now")}, + {HKEY("IO Conn Wait")}, + {HKEY("Curl Q")}, + {HKEY("Curl Start")}, + {HKEY("Curl Shotdown")}, + {HKEY("Curl More IO")}, + {HKEY("Curl Got IO")}, + {HKEY("Curl Got Data")}, + {HKEY("Curl Got Status")}, + {HKEY("C-Ares Start")}, + {HKEY("C-Ares IO Done")}, + {HKEY("C-Ares Finished")}, + {HKEY("C-Ares exit")}, + {HKEY("Killing")}, + {HKEY("Exit")} +}; + +void SetEVState(AsyncIO *IO, eIOState State) { - AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - assert(IO->ShutdownAbort); - IO->ShutdownAbort(IO); + CitContext* CCC = IO->CitContext; + if (CCC != NULL) + memcpy(CCC->lastcmdname, IOStates[State].Key, IOStates[State].len + 1); + } +eNextState QueueAnEventContext(AsyncIO *IO); +static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents); + /*------------------------------------------------------------------------------ * Server DB IO *----------------------------------------------------------------------------*/ extern int evdb_count; extern pthread_mutex_t DBEventQueueMutex; +extern pthread_mutex_t DBEventExitQueueMutex; extern HashList *DBInboundEventQueue; extern struct ev_loop *event_db; extern ev_async DBAddJob; extern ev_async DBExitEventLoop; -eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) +eNextState QueueAnDBOperation(AsyncIO *IO) { IOAddHandler *h; int i; + SetEVState(IO, eDBQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; - h->EvAttch = CB; + + assert(IO->ReAttachCB != NULL); + + h->EvAttch = IO->ReAttachCB; ev_cleanup_init(&IO->db_abort_by_shutdown, IO_abort_shutdown_callback); IO->db_abort_by_shutdown.data = IO; - ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); pthread_mutex_lock(&DBEventQueueMutex); + if (DBInboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n"); + pthread_mutex_unlock(&DBEventQueueMutex); + return eAbort; + } EVM_syslog(LOG_DEBUG, "DBEVENT Q\n"); i = ++evdb_count ; Put(DBInboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventExitQueueMutex); + if (event_db == NULL) + { + pthread_mutex_unlock(&DBEventExitQueueMutex); + return eAbort; + } ev_async_send (event_db, &DBAddJob); - EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + pthread_mutex_unlock(&DBEventExitQueueMutex); + + EVQM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); return eDBQuery; } +void StopDBWatchers(AsyncIO *IO) +{ + SetEVState(IO, eDBStop); + ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + ev_idle_stop(event_db, &IO->db_unwind_stack); +} + void ShutDownDBCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; become_session(Ctx); + SetEVState(IO, eDBTerm); EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); - ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + StopDBWatchers(IO); - assert(IO->Terminate); - IO->Terminate(IO); + assert(IO->DBTerminate); + IO->DBTerminate(IO); } void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + + SetEVState(IO, eDBNext); + IO->CitContext->lastcmd = IO->Now = ev_now(event_db); + EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); ev_idle_stop(event_db, &IO->db_unwind_stack); @@ -140,12 +166,15 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) assert(IO->NextDBOperation); switch (IO->NextDBOperation(IO)) { + case eSendReply: + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + QueueAnEventContext(IO); + break; case eDBQuery: break; case eSendDNSQuery: case eReadDNSReply: case eConnect: - case eSendReply: case eSendMore: case eSendFile: case eReadMessage: @@ -164,6 +193,7 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) { + SetEVState(IO, eQDBNext); IO->NextDBOperation = CB; ev_idle_init(&IO->db_unwind_stack, DB_PerformNext); @@ -177,36 +207,87 @@ eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) *----------------------------------------------------------------------------*/ extern int evbase_count; extern pthread_mutex_t EventQueueMutex; +extern pthread_mutex_t EventExitQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; extern ev_async AddJob; extern ev_async ExitEventLoop; +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; -eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) + SetEVState(IO, eIOAbort); + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} + + +eNextState QueueAnEventContext(AsyncIO *IO) { IOAddHandler *h; int i; + SetEVState(IO, eIOQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; - h->EvAttch = CB; + + assert(IO->ReAttachCB != NULL); + + h->EvAttch = IO->ReAttachCB; + ev_cleanup_init(&IO->abort_by_shutdown, IO_abort_shutdown_callback); IO->abort_by_shutdown.data = IO; - ev_cleanup_start(event_base, &IO->abort_by_shutdown); pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + free(h); + /* shutting down... */ + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } +eNextState EventQueueDBOperation(AsyncIO *IO, IO_CallBack CB, int CloseFDs) +{ + StopClientWatchers(IO, CloseFDs); + IO->ReAttachCB = CB; + return eDBQuery; +} +eNextState DBQueueEventContext(AsyncIO *IO, IO_CallBack CB) +{ + StopDBWatchers(IO); + IO->ReAttachCB = CB; + return eSendReply; +} + +eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) +{ + IO->ReAttachCB = CB; + return QueueAnEventContext(IO); +} + extern eNextState evcurl_handle_start(AsyncIO *IO); eNextState QueueCurlContext(AsyncIO *IO) @@ -214,22 +295,46 @@ eNextState QueueCurlContext(AsyncIO *IO) IOAddHandler *h; int i; + SetEVState(IO, eCurlQ); h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = evcurl_handle_start; pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } -void DestructCAres(AsyncIO *IO); +eNextState CurlQueueDBOperation(AsyncIO *IO, IO_CallBack CB) +{ + StopCurlWatchers(IO); + IO->ReAttachCB = CB; + return eDBQuery; +} + + void FreeAsyncIOContents(AsyncIO *IO) { CitContext *Ctx = IO->CitContext; @@ -238,28 +343,56 @@ void FreeAsyncIOContents(AsyncIO *IO) FreeStrBuf(&IO->SendBuf.Buf); FreeStrBuf(&IO->RecvBuf.Buf); - DestructCAres(IO); - FreeURL(&IO->ConnectMe); FreeStrBuf(&IO->HttpReq.ReplyData); if (Ctx) { Ctx->state = CON_IDLE; Ctx->kill_me = 1; + IO->CitContext = NULL; } } -void StopClientWatchers(AsyncIO *IO) +void DestructCAres(AsyncIO *IO); +void StopClientWatchers(AsyncIO *IO, int CloseFD) +{ + EVM_syslog(LOG_DEBUG, "EVENT StopClientWatchers"); + + DestructCAres(IO); + + ev_timer_stop (event_base, &IO->rw_timeout); + ev_timer_stop(event_base, &IO->conn_fail); + ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + + ev_io_stop(event_base, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + + if (CloseFD && (IO->SendBuf.fd > 0)) { + close(IO->SendBuf.fd); + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; + } +} + +void StopCurlWatchers(AsyncIO *IO) { + EVM_syslog(LOG_DEBUG, "EVENT StopCurlWatchers \n"); + ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); ev_io_stop(event_base, &IO->conn_event); ev_io_stop(event_base, &IO->send_event); ev_io_stop(event_base, &IO->recv_event); + curl_easy_cleanup(IO->HttpReq.chnd); + IO->HttpReq.chnd = NULL; + if (IO->SendBuf.fd != 0) { close(IO->SendBuf.fd); } @@ -267,15 +400,16 @@ void StopClientWatchers(AsyncIO *IO) IO->RecvBuf.fd = 0; } -void ShutDownCLient(AsyncIO *IO) +eNextState ShutDownCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; + + SetEVState(IO, eExit); become_session(Ctx); EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); - StopClientWatchers(IO); + StopClientWatchers(IO, 1); if (IO->DNS.Channel != NULL) { ares_destroy(IO->DNS.Channel); @@ -286,9 +420,58 @@ void ShutDownCLient(AsyncIO *IO) IO->DNS.Channel = NULL; } assert(IO->Terminate); - IO->Terminate(IO); + return IO->Terminate(IO); } +void PostInbound(AsyncIO *IO) +{ + + switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eSendReply: + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + switch (IO->NextState) + { + case eSendFile: + case eSendReply: + case eSendMore: + case eReadMessage: + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->send_event); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + default: + break; + } + break; + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eTerminateConnection: + case eAbort: + if (ShutDownCLient(IO) == eDBQuery) { + QueueAnDBOperation(IO); + } + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eReadMessage: + break; + case eDBQuery: + QueueAnDBOperation(IO); + } +} eReadState HandleInbound(AsyncIO *IO) { const char *Err = NULL; @@ -334,41 +517,22 @@ eReadState HandleInbound(AsyncIO *IO) } if (Finished != eMustReadMore) { - assert(IO->ReadDone); ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); - Finished = StrBufCheckBuffer(&IO->RecvBuf); + if (IO->NextState == eDBQuery) { + if (QueueAnDBOperation(IO) == eAbort) + return eReadFail; + else + return eReadSuccess; + } + else { + Finished = StrBufCheckBuffer(&IO->RecvBuf); + } } } - switch (IO->NextState) { - case eSendFile: - ev_io_start(event_base, &IO->send_event); - break; - case eSendReply: - case eSendMore: - assert(IO->SendDone); - IO->NextState = IO->SendDone(IO); - ev_io_start(event_base, &IO->send_event); - break; - case eReadPayload: - case eReadMore: - case eReadFile: - ev_io_start(event_base, &IO->recv_event); - break; - case eTerminateConnection: -//////TODOxxxx - break; - case eAbort: - ShutDownCLient(IO); - break; - case eSendDNSQuery: - case eReadDNSReply: - case eDBQuery: - case eConnect: - case eReadMessage: - break; - } + PostInbound(IO); + return Finished; } @@ -380,6 +544,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -399,6 +564,11 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Send: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -412,8 +582,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) StrBufPlain(IO->ErrMsg, errmsg, -1); break; default: - rc = StrBuf_write_one_chunk_callback(watcher->fd, - 0/*TODO*/, + rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd, + 0, &IO->SendBuf); } @@ -440,6 +610,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eSendFile: if (IO->IOB.ChunkSendRemain > 0) { ev_io_start(event_base, &IO->recv_event); + SetNextTimeout(IO, 100.0); + } else { assert(IO->ReadDone); IO->NextState = IO->ReadDone(IO); @@ -499,18 +671,30 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } } else if (rc < 0) { - assert(IO->Timeout); - IO->Timeout(IO); + if (errno != EAGAIN) { + StopClientWatchers(IO, 1); + EV_syslog(LOG_DEBUG, + "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n", + errno, strerror(errno), IO->SendBuf.fd); + StrBufPrintf(IO->ErrMsg, + "Socket Invalid! [%s]", + strerror(errno)); + SetNextTimeout(IO, 0.01); + } } /* else : must write more. */ } static void set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) { + ev_timer_stop(event_base, &IO->conn_fail); + ev_timer_start(event_base, &IO->rw_timeout); + switch(IO->NextState) { case eReadMore: case eReadMessage: case eReadFile: + StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0); ev_io_start(event_base, &IO->recv_event); break; case eSendReply: @@ -536,6 +720,8 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOTimeout); + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -563,6 +749,8 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOConnfail); + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -594,6 +782,8 @@ IO_connfailimmediate_callback(struct ev_loop *loop, { AsyncIO *IO = watcher->data; + SetEVState(IO, eIOConnfailNow); + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -617,12 +807,39 @@ IO_connfailimmediate_callback(struct ev_loop *loop, static void IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) { - AsyncIO *IO = watcher->data; - - ev_io_stop(loop, &IO->conn_event); - ev_timer_stop (event_base, &IO->conn_fail); - set_start_callback(loop, IO, revents); + AsyncIO *IO = watcher->data; + int so_err = 0; + socklen_t lon = sizeof(so_err); + int err; + + SetEVState(IO, eIOConnNow); + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); + EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); + + ev_io_stop(loop, &IO->conn_event); + ev_timer_stop(event_base, &IO->conn_fail); + + err = getsockopt(IO->SendBuf.fd, + SOL_SOCKET, + SO_ERROR, + (void*)&so_err, + &lon); + + if ((err == 0) && (so_err != 0)) + { + EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n", + so_err, + strerror(so_err)); + IO_connfail_callback(loop, &IO->conn_fail, revents); + + } + else + { + EVM_syslog(LOG_DEBUG, "connect() succeeded\n"); + set_start_callback(loop, IO, revents); + } } + static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { @@ -630,6 +847,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); @@ -640,14 +858,18 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (IO->IOB.ChunkSendRemain == 0) { IO->NextState = eSendReply; + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + PostInbound(IO); + return; } else return; } break; default: - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, - 0 /*TODO */, + nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd, + 0, &IO->RecvBuf); break; } @@ -670,6 +892,11 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) IO->SendBuf.fd); fd = fopen(fn, "a+"); + if (fd == NULL) { + syslog(LOG_EMERG, "failed to open file %s: %s", fn, strerror(errno)); + cit_backtrace(); + exit(1); + } fprintf(fd, "Read: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); @@ -681,23 +908,21 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - assert(IO->Timeout); - - switch (IO->Timeout(IO)) - { - case eAbort: - ShutDownCLient(IO); - default: - break; - } + StopClientWatchers(IO, 1); + SetNextTimeout(IO, 0.01); return; } else if (nbytes == -1) { - // FD is gone. kick it. - StopClientWatchers(IO); - EV_syslog(LOG_DEBUG, - "EVENT: Socket Invalid! %s \n", - strerror(errno)); - ShutDownCLient(IO); + if (errno != EAGAIN) { + // FD is gone. kick it. + StopClientWatchers(IO, 1); + EV_syslog(LOG_DEBUG, + "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n", + errno, strerror(errno), IO->SendBuf.fd); + StrBufPrintf(IO->ErrMsg, + "Socket Invalid! [%s]", + strerror(errno)); + SetNextTimeout(IO, 0.01); + } return; } } @@ -706,20 +931,32 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; + + SetEVState(IO, eCaresFinished); + IO->CitContext->lastcmd = IO->Now = ev_now(event_base); EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); - assert(IO->DNS.Fail); assert(IO->DNS.Query->PostDNS); switch (IO->DNS.Query->PostDNS(IO)) { case eAbort: + assert(IO->DNS.Fail); switch (IO->DNS.Fail(IO)) { case eAbort: //// StopClientWatchers(IO); ShutDownCLient(IO); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } + case eDBQuery: + StopClientWatchers(IO, 0); + QueueAnDBOperation(IO); + break; default: break; } @@ -731,9 +968,11 @@ eNextState EvConnectSock(AsyncIO *IO, double first_rw_timeout, int ReadFirst) { + struct sockaddr_in egress_sin; int fdflags; int rc = -1; + SetEVState(IO, eIOConnectSock); become_session(IO->CitContext); if (ReadFirst) { @@ -762,11 +1001,13 @@ eNextState EvConnectSock(AsyncIO *IO, } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - EV_syslog(LOG_DEBUG, - "EVENT: unable to get socket flags! %s \n", + EV_syslog(LOG_ERR, + "EVENT: unable to get socket %d flags! %s \n", + IO->SendBuf.fd, strerror(errno)); StrBufPrintf(IO->ErrMsg, - "Failed to get socket flags: %s", + "Failed to get socket %d flags: %s", + IO->SendBuf.fd, strerror(errno)); close(IO->SendBuf.fd); IO->SendBuf.fd = IO->RecvBuf.fd = 0; @@ -775,8 +1016,9 @@ eNextState EvConnectSock(AsyncIO *IO, fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { EV_syslog( - LOG_DEBUG, - "EVENT: unable to set socket nonblocking flags! %s \n", + LOG_ERR, + "EVENT: unable to set socket %d nonblocking flags! %s \n", + IO->SendBuf.fd, strerror(errno)); StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", @@ -799,28 +1041,48 @@ eNextState EvConnectSock(AsyncIO *IO, IO->rw_timeout.data = IO; + + /* for debugging you may bypass it like this: * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = * inet_addr("127.0.0.1"); */ - if (IO->ConnectMe->IPv6) + if (IO->ConnectMe->IPv6) { rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6)); - else + } + else { + /* If citserver is bound to a specific IP address on the host, make + * sure we use that address for outbound connections. + */ + + memset(&egress_sin, 0, sizeof(egress_sin)); + egress_sin.sin_family = AF_INET; + if (!IsEmptyStr(CtdlGetConfigStr("c_ip_addr"))) { + egress_sin.sin_addr.s_addr = inet_addr(CtdlGetConfigStr("c_ip_addr")); + if (egress_sin.sin_addr.s_addr == !INADDR_ANY) { + egress_sin.sin_addr.s_addr = INADDR_ANY; + } + + /* If this bind fails, no problem; we can still use INADDR_ANY */ + bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin)); + } rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); + } if (rc >= 0){ - EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); + SetEVState(IO, eIOConnNow); + EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd); set_start_callback(event_base, IO, 0); - ev_timer_start(event_base, &IO->rw_timeout); return IO->NextState; } else if (errno == EINPROGRESS) { - EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); + SetEVState(IO, eIOConnWait); + EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd); ev_io_init(&IO->conn_event, IO_connestd_callback, @@ -834,12 +1096,17 @@ eNextState EvConnectSock(AsyncIO *IO, return IO->NextState; } else { + SetEVState(IO, eIOConnfail); ev_idle_init(&IO->conn_fail_immediate, IO_connfailimmediate_callback); IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); + EV_syslog(LOG_ERR, + "connect() = %d failed: %s\n", + IO->SendBuf.fd, + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); @@ -859,6 +1126,7 @@ eNextState ReAttachIO(AsyncIO *IO, void *pData, int ReadFirst) { + SetEVState(IO, eIOAttach); IO->Data = pData; become_session(IO->CitContext); ev_cleanup_start(event_base, &IO->abort_by_shutdown); @@ -881,6 +1149,7 @@ void InitIOStruct(AsyncIO *IO, IO_CallBack SendDone, IO_CallBack ReadDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ConnFail, IO_CallBack Timeout, IO_CallBack ShutdownAbort) @@ -888,13 +1157,15 @@ void InitIOStruct(AsyncIO *IO, IO->Data = Data; IO->CitContext = CloneContext(CC); - ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; IO->NextState = NextState; IO->SendDone = SendDone; IO->ReadDone = ReadDone; IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->LineReader = LineReader; IO->ConnFail = ConnFail; IO->Timeout = Timeout; @@ -918,15 +1189,18 @@ int InitcURLIOStruct(AsyncIO *IO, const char* Desc, IO_CallBack SendDone, IO_CallBack Terminate, + IO_CallBack DBTerminate, IO_CallBack ShutdownAbort) { IO->Data = Data; IO->CitContext = CloneContext(CC); - ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; - IO->SendDone = SendDone; - IO->Terminate = Terminate; + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; IO->ShutdownAbort = ShutdownAbort; strcpy(IO->HttpReq.errdesc, Desc); @@ -936,6 +1210,98 @@ int InitcURLIOStruct(AsyncIO *IO, } + +typedef struct KillOtherSessionContext { + AsyncIO IO; + AsyncIO *OtherOne; +}KillOtherSessionContext; + +eNextState KillTerminate(AsyncIO *IO) +{ + long id; + KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data; + EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__); + id = IO->ID; + FreeAsyncIOContents(IO); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + IO->ID = id; /* just for the case we want to analyze it in a coredump */ + free(Ctx); + return eAbort; + +} + +eNextState KillShutdown(AsyncIO *IO) +{ + return eTerminateConnection; +} + +eNextState KillOtherContextNow(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx = IO->Data; + + SetEVState(IO, eKill); + + if (Ctx->OtherOne->ShutdownAbort != NULL) { + Ctx->OtherOne->NextState = eAbort; + if (Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne) == eDBQuery) { + StopClientWatchers(Ctx->OtherOne, 0); + QueueAnDBOperation(Ctx->OtherOne); + } + } + return eTerminateConnection; +} + +void KillAsyncIOContext(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx; + + Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext)); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + + InitIOStruct(&Ctx->IO, + Ctx, + eReadMessage, + NULL, + NULL, + NULL, + NULL, + KillTerminate, + NULL, + NULL, + NULL, + KillShutdown); + + Ctx->OtherOne = IO; + + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + Ctx->IO.ReAttachCB = KillOtherContextNow; + QueueAnEventContext(&Ctx->IO); + break; + case eDBQuery: + Ctx->IO.ReAttachCB = KillOtherContextNow; + QueueAnDBOperation(&Ctx->IO); + break; + case eTerminateConnection: + case eAbort: + /*hm, its already dying, dunno which Queue its in... */ + free(Ctx); + } + +} + +extern int DebugEventLoopBacktrace; void EV_backtrace(AsyncIO *IO) { #ifdef HAVE_BACKTRACE @@ -943,15 +1309,24 @@ void EV_backtrace(AsyncIO *IO) size_t size, i; char **strings; - + if ((IO == NULL) || (DebugEventLoopBacktrace == 0)) + return; size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*)); strings = backtrace_symbols(stack_frames, size); for (i = 0; i < size; i++) { - if (strings != NULL) + if (strings != NULL) { EV_syslog(LOG_ALERT, " BT %s\n", strings[i]); - else + } + else { EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]); + } } free(strings); #endif } + + +ev_tstamp ctdl_ev_now (void) +{ + return ev_now(event_base); +}