X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=7b409fcf4a49404e85607402f5947faa046d642f;hb=2bebeb2bb6b26656a989f57fd757d718431f17a8;hp=57a6440102f12ca86d58b83aba96c0f1d1ad76f7;hpb=192056a6112602350c1f8a73eae2e134a31c7ba2;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index 57a644010..7b409fcf4 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -1,8 +1,8 @@ /* * - * Copyright (c) 1998-2009 by the citadel.org team + * Copyright (c) 1998-2012 by the citadel.org team * - * This program is free software; you can redistribute it and/or modify + * This program is open source software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. @@ -47,6 +47,9 @@ #include #include #include +#if HAVE_BACKTRACE +#include +#endif #include #include "citadel.h" @@ -66,25 +69,23 @@ #include "citadel_dirs.h" #include "event_client.h" +#include "ctdl_module.h" -static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents) -{ - syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - - AsyncIO *IO = watcher->data; - assert(IO->ShutdownAbort); - IO->ShutdownAbort(IO); -} +static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents); -/*-------------------------------------------------------------------------------- - * Server DB IO - */ +/*------------------------------------------------------------------------------ + * Server DB IO + *----------------------------------------------------------------------------*/ extern int evdb_count; extern pthread_mutex_t DBEventQueueMutex; +extern pthread_mutex_t DBEventExitQueueMutex; extern HashList *DBInboundEventQueue; extern struct ev_loop *event_db; -extern ev_async DBAddJob; +extern ev_async DBAddJob; extern ev_async DBExitEventLoop; eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) @@ -95,44 +96,63 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; - ev_cleanup_init(&IO->db_abort_by_shutdown, + ev_cleanup_init(&IO->db_abort_by_shutdown, IO_abort_shutdown_callback); IO->db_abort_by_shutdown.data = IO; - ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); pthread_mutex_lock(&DBEventQueueMutex); - syslog(LOG_DEBUG, "DBEVENT Q\n"); + if (DBInboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n"); + pthread_mutex_unlock(&DBEventQueueMutex); + return eAbort; + } + EVM_syslog(LOG_DEBUG, "DBEVENT Q\n"); i = ++evdb_count ; Put(DBInboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_lock(&DBEventExitQueueMutex); + if (event_db == NULL) + { + pthread_mutex_unlock(&DBEventExitQueueMutex); + return eAbort; + } ev_async_send (event_db, &DBAddJob); - syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + pthread_mutex_unlock(&DBEventExitQueueMutex); + + EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); return eDBQuery; } +void StopDBWatchers(AsyncIO *IO) +{ + ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + ev_idle_stop(event_db, &IO->db_unwind_stack); +} + void ShutDownDBCLient(AsyncIO *IO) { CitContext *Ctx =IO->CitContext; become_session(Ctx); - syslog(LOG_DEBUG, "DBEVENT\n"); - ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); - - assert(IO->Terminate); - IO->Terminate(IO); + EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); + StopDBWatchers(IO); - Ctx->state = CON_IDLE; - Ctx->kill_me = 1; + assert(IO->DBTerminate); + IO->DBTerminate(IO); } void DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + IO->Now = ev_now(event_db); + EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); become_session(IO->CitContext); - + ev_idle_stop(event_db, &IO->db_unwind_stack); assert(IO->NextDBOperation); @@ -143,10 +163,10 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) case eSendDNSQuery: case eReadDNSReply: case eConnect: - case eSendReply: + case eSendReply: case eSendMore: case eSendFile: - case eReadMessage: + case eReadMessage: case eReadMore: case eReadPayload: case eReadFile: @@ -154,7 +174,9 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) break; case eTerminateConnection: case eAbort: - ShutDownDBCLient(IO); + ev_idle_stop(event_db, &IO->db_unwind_stack); + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + ShutDownDBCLient(IO); } } @@ -168,16 +190,28 @@ eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) return eDBQuery; } -/*-------------------------------------------------------------------------------- - * Client IO - */ +/*------------------------------------------------------------------------------ + * Client IO + *----------------------------------------------------------------------------*/ extern int evbase_count; extern pthread_mutex_t EventQueueMutex; +extern pthread_mutex_t EventExitQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; -extern ev_async AddJob; +extern ev_async AddJob; extern ev_async ExitEventLoop; +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} + eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) { @@ -187,19 +221,32 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; - ev_cleanup_init(&IO->abort_by_shutdown, + ev_cleanup_init(&IO->abort_by_shutdown, IO_abort_shutdown_callback); IO->abort_by_shutdown.data = IO; - ev_cleanup_start(event_base, &IO->abort_by_shutdown); pthread_mutex_lock(&EventQueueMutex); - syslog(LOG_DEBUG, "EVENT Q\n"); + if (InboundEventQueue == NULL) + { + free(h); + /* shutting down... */ + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); - syslog(LOG_DEBUG, "EVENT Q Done.\n"); + pthread_mutex_unlock(&EventExitQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); return eSendReply; } @@ -215,46 +262,85 @@ eNextState QueueCurlContext(AsyncIO *IO) h->EvAttch = evcurl_handle_start; pthread_mutex_lock(&EventQueueMutex); - syslog(LOG_DEBUG, "EVENT Q\n"); + if (InboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; + } + + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); pthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } ev_async_send (event_base, &AddJob); - syslog(LOG_DEBUG, "EVENT Q Done.\n"); - return eSendReply; -} - -int ShutDownEventQueue(void) -{ - pthread_mutex_lock(&DBEventQueueMutex); - ev_async_send (event_db, &DBExitEventLoop); - pthread_mutex_unlock(&DBEventQueueMutex); + pthread_mutex_unlock(&EventExitQueueMutex); - pthread_mutex_lock(&EventQueueMutex); - ev_async_send (EV_DEFAULT_ &ExitEventLoop); - pthread_mutex_unlock(&EventQueueMutex); - return 0; + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; } +void DestructCAres(AsyncIO *IO); void FreeAsyncIOContents(AsyncIO *IO) { + CitContext *Ctx = IO->CitContext; + FreeStrBuf(&IO->IOBuf); FreeStrBuf(&IO->SendBuf.Buf); FreeStrBuf(&IO->RecvBuf.Buf); + + DestructCAres(IO); + + FreeURL(&IO->ConnectMe); + FreeStrBuf(&IO->HttpReq.ReplyData); + + if (Ctx) { + Ctx->state = CON_IDLE; + Ctx->kill_me = 1; + } } void StopClientWatchers(AsyncIO *IO) { + ev_timer_stop (event_base, &IO->rw_timeout); ev_timer_stop(event_base, &IO->conn_fail); - ev_io_stop(event_base, &IO->conn_event); ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + ev_io_stop(event_base, &IO->conn_event); ev_io_stop(event_base, &IO->send_event); ev_io_stop(event_base, &IO->recv_event); + + if (IO->SendBuf.fd != 0) { + close(IO->SendBuf.fd); + } + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; +} + +void StopCurlWatchers(AsyncIO *IO) +{ ev_timer_stop (event_base, &IO->rw_timeout); - close(IO->SendBuf.fd); + ev_timer_stop(event_base, &IO->conn_fail); + ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + + ev_io_stop(event_base, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + + if (IO->SendBuf.fd != 0) { + close(IO->SendBuf.fd); + } IO->SendBuf.fd = 0; IO->RecvBuf.fd = 0; } @@ -264,77 +350,24 @@ void ShutDownCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); - syslog(LOG_DEBUG, "EVENT x %d\n", IO->SendBuf.fd); + EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); - ev_cleanup_stop(event_base, &IO->abort_by_shutdown); StopClientWatchers(IO); - if (IO->DNSChannel != NULL) { - ares_destroy(IO->DNSChannel); - ev_io_stop(event_base, &IO->dns_recv_event); - ev_io_stop(event_base, &IO->dns_send_event); - IO->DNSChannel = NULL; + if (IO->DNS.Channel != NULL) { + ares_destroy(IO->DNS.Channel); + EV_DNS_LOG_STOP(DNS.recv_event); + EV_DNS_LOG_STOP(DNS.send_event); + ev_io_stop(event_base, &IO->DNS.recv_event); + ev_io_stop(event_base, &IO->DNS.send_event); + IO->DNS.Channel = NULL; } assert(IO->Terminate); IO->Terminate(IO); - Ctx->state = CON_IDLE; - Ctx->kill_me = 1; } - -eReadState HandleInbound(AsyncIO *IO) +void PostInbound(AsyncIO *IO) { - const char *Err = NULL; - eReadState Finished = eBufferNotEmpty; - - become_session(IO->CitContext); - - while ((Finished == eBufferNotEmpty) && - ((IO->NextState == eReadMessage)|| - (IO->NextState == eReadMore)|| - (IO->NextState == eReadFile)|| - (IO->NextState == eReadPayload))) - { - if (IO->RecvBuf.nBlobBytesWanted != 0) { - - } - else { /* Reading lines... */ -//// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if ((IO->NextState == eReadFile) && - (Finished == eBufferNotEmpty)) - { - Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); - if (Finished == eReadSuccess) - { - IO->NextState = eSendReply; - } - } - else if (IO->LineReader) - Finished = IO->LineReader(IO); - else - Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); - - switch (Finished) { - case eMustReadMore: /// read new from socket... - break; - case eBufferNotEmpty: /* shouldn't happen... */ - case eReadSuccess: /// done for now... - break; - case eReadFail: /// WHUT? - ///todo: shut down! - break; - } - - } - - if (Finished != eMustReadMore) { - assert(IO->ReadDone); - ev_io_stop(event_base, &IO->recv_event); - IO->NextState = IO->ReadDone(IO); - Finished = StrBufCheckBuffer(&IO->RecvBuf); - } - } - switch (IO->NextState) { case eSendFile: ev_io_start(event_base, &IO->send_event); @@ -351,7 +384,7 @@ eReadState HandleInbound(AsyncIO *IO) ev_io_start(event_base, &IO->recv_event); break; case eTerminateConnection: -//////TODOxxxx + ShutDownCLient(IO); break; case eAbort: ShutDownCLient(IO); @@ -363,6 +396,61 @@ eReadState HandleInbound(AsyncIO *IO) case eReadMessage: break; } +} +eReadState HandleInbound(AsyncIO *IO) +{ + const char *Err = NULL; + eReadState Finished = eBufferNotEmpty; + + become_session(IO->CitContext); + + while ((Finished == eBufferNotEmpty) && + ((IO->NextState == eReadMessage)|| + (IO->NextState == eReadMore)|| + (IO->NextState == eReadFile)|| + (IO->NextState == eReadPayload))) + { + /* Reading lines... + * lex line reply in callback, + * or do it ourselves. + * i.e. as nnn-blabla means continue reading in SMTP + */ + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } + else if (IO->LineReader) + Finished = IO->LineReader(IO); + else + Finished = StrBufChunkSipLine(IO->IOBuf, + &IO->RecvBuf); + + switch (Finished) { + case eMustReadMore: /// read new from socket... + break; + case eBufferNotEmpty: /* shouldn't happen... */ + case eReadSuccess: /// done for now... + break; + case eReadFail: /// WHUT? + ///todo: shut down! + break; + } + + if (Finished != eMustReadMore) { + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + IO->NextState = IO->ReadDone(IO); + Finished = StrBufCheckBuffer(&IO->RecvBuf); + } + } + + PostInbound(IO); + return Finished; } @@ -374,6 +462,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) AsyncIO *IO = watcher->data; const char *errmsg = NULL; + IO->Now = ev_now(event_base); become_session(IO->CitContext); #ifdef BIGBAD_IODBG { @@ -383,15 +472,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) const char *pch = ChrPtr(IO->SendBuf.Buf); const char *pchh = IO->SendBuf.ReadWritePointer; long nbytes; - + if (pchh == NULL) pchh = pch; - + nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch); snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", ((CitContext*)(IO->CitContext))->ServiceName, IO->SendBuf.fd); - + fd = fopen(fn, "a+"); fprintf(fd, "Send: BufSize: %ld BufContent: [", nbytes); @@ -406,7 +495,9 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) StrBufPlain(IO->ErrMsg, errmsg, -1); break; default: - rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + rc = StrBuf_write_one_chunk_callback(watcher->fd, + 0/*TODO*/, + &IO->SendBuf); } #ifdef BIGBAD_IODBG @@ -432,6 +523,8 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eSendFile: if (IO->IOB.ChunkSendRemain > 0) { ev_io_start(event_base, &IO->recv_event); + SetNextTimeout(IO, 100.0); + } else { assert(IO->ReadDone); IO->NextState = IO->ReadDone(IO); @@ -441,12 +534,13 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eDBQuery: case eConnect: break; - case eSendReply: + case eSendReply: case eSendMore: case eSendFile: - ev_io_start(event_base, &IO->send_event); + ev_io_start(event_base, + &IO->send_event); break; - case eReadMessage: + case eReadMessage: case eReadMore: case eReadPayload: case eReadFile: @@ -458,14 +552,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } break; case eSendReply: - if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) + if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) break; IO->NextState = eReadMore; case eReadMore: case eReadMessage: case eReadPayload: case eReadFile: - if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { + if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) + { HandleInbound(IO); } else { @@ -474,7 +569,10 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) break; case eDBQuery: - /* we now live in another queue, so we have to unregister. */ + /* + * we now live in another queue, + * so we have to unregister. + */ ev_cleanup_stop(loop, &IO->abort_by_shutdown); break; case eSendDNSQuery: @@ -486,19 +584,18 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } } else if (rc < 0) { - assert(IO->Timeout); - IO->Timeout(IO); + IO_Timeout_callback(loop, &IO->rw_timeout, revents); } /* else : must write more. */ } static void set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) { - switch(IO->NextState) { case eReadMore: case eReadMessage: case eReadFile: + StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0); ev_io_start(event_base, &IO->recv_event); break; case eSendReply: @@ -524,6 +621,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->rw_timeout); become_session(IO->CitContext); @@ -537,7 +635,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) } assert(IO->Timeout); - switch (IO->Timeout(IO)) + switch (IO->Timeout(IO)) { case eAbort: ShutDownCLient(IO); @@ -551,6 +649,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_timer_stop (event_base, &IO->conn_fail); if (IO->SendBuf.fd != 0) @@ -565,7 +664,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) become_session(IO->CitContext); assert(IO->ConnFail); - switch (IO->ConnFail(IO)) + switch (IO->ConnFail(IO)) { case eAbort: ShutDownCLient(IO); @@ -576,10 +675,13 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) } static void -IO_connfailimmediate_callback(struct ev_loop *loop, ev_idle *watcher, int revents) +IO_connfailimmediate_callback(struct ev_loop *loop, + ev_idle *watcher, + int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); ev_idle_stop (event_base, &IO->conn_fail_immediate); if (IO->SendBuf.fd != 0) @@ -590,7 +692,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, ev_idle *watcher, int revent become_session(IO->CitContext); assert(IO->ConnFail); - switch (IO->ConnFail(IO)) + switch (IO->ConnFail(IO)) { case eAbort: ShutDownCLient(IO); @@ -605,6 +707,9 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) { AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); + EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); + ev_io_stop(loop, &IO->conn_event); ev_timer_stop (event_base, &IO->conn_fail); set_start_callback(loop, IO, revents); @@ -616,72 +721,70 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) ssize_t nbytes; AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); switch (IO->NextState) { case eReadFile: nbytes = FileRecvChunked(&IO->IOB, &errmsg); if (nbytes < 0) StrBufPlain(IO->ErrMsg, errmsg, -1); - else + else { if (IO->IOB.ChunkSendRemain == 0) { IO->NextState = eSendReply; + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + PostInbound(IO); + return; } else return; } break; default: - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, + 0 /*TODO */, + &IO->RecvBuf); break; } #ifdef BIGBAD_IODBG { + long nbytes; int rv = 0; char fn [SIZ]; FILE *fd; const char *pch = ChrPtr(IO->RecvBuf.Buf); const char *pchh = IO->RecvBuf.ReadWritePointer; - long nbytes; - + if (pchh == NULL) pchh = pch; - + nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch); snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", - ((CitContext*)(IO->CitContext))->ServiceName, + ((CitContext*)(IO->CitContext))->ServiceName, IO->SendBuf.fd); - + fd = fopen(fn, "a+"); fprintf(fd, "Read: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); if (!rv) printf("failed to write debug to %s!\n", fn); fprintf(fd, "]\n"); - - fclose(fd); } #endif if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - assert(IO->Timeout); - - switch (IO->Timeout(IO)) - { - case eAbort: - ShutDownCLient(IO); - default: - break; - } + IO_Timeout_callback(loop, &IO->rw_timeout, revents); return; } else if (nbytes == -1) { -/// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); - syslog(LOG_DEBUG, - "EVENT: Socket Invalid! %s \n", - strerror(errno)); + // FD is gone. kick it. + StopClientWatchers(IO); + EV_syslog(LOG_DEBUG, + "EVENT: Socket Invalid! %s \n", + strerror(errno)); ShutDownCLient(IO); return; } @@ -691,54 +794,88 @@ void IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { AsyncIO *IO = watcher->data; - syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); - assert(IO->DNSFail); - assert(IO->DNSQuery->PostDNS); - switch (IO->DNSQuery->PostDNS(IO)) + assert(IO->DNS.Query->PostDNS); + switch (IO->DNS.Query->PostDNS(IO)) { case eAbort: - IO->DNSFail(IO); + assert(IO->DNS.Fail); + switch (IO->DNS.Fail(IO)) { + case eAbort: +//// StopClientWatchers(IO); + ShutDownCLient(IO); + default: + break; + } default: - break; + break; } } -eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_rw_timeout) + +eNextState EvConnectSock(AsyncIO *IO, + double conn_timeout, + double first_rw_timeout, + int ReadFirst) { - int fdflags; + struct sockaddr_in egress_sin; + int fdflags; int rc = -1; - IO->SendBuf.fd = IO->RecvBuf.fd = + become_session(IO->CitContext); + + if (ReadFirst) { + IO->NextState = eReadMessage; + } + else { + IO->NextState = eSendReply; + } + + IO->SendBuf.fd = IO->RecvBuf.fd = socket( - (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, - SOCK_STREAM, + (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, + SOCK_STREAM, IPPROTO_TCP); if (IO->SendBuf.fd < 0) { - syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); + EV_syslog(LOG_ERR, + "EVENT: socket() failed: %s\n", + strerror(errno)); + + StrBufPrintf(IO->ErrMsg, + "Failed to create socket: %s", + strerror(errno)); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; return eAbort; } fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - syslog(LOG_DEBUG, - "EVENT: unable to get socket flags! %s \n", - strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); + EV_syslog(LOG_ERR, + "EVENT: unable to get socket flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to get socket flags: %s", + strerror(errno)); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; return eAbort; } fdflags = fdflags | O_NONBLOCK; if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { - syslog(LOG_DEBUG, - "EVENT: unable to set socket nonblocking flags! %s \n", - strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); + EV_syslog( + LOG_ERR, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to set socket flags: %s", + strerror(errno)); close(IO->SendBuf.fd); - IO->SendBuf.fd = IO->RecvBuf.fd = -1; + IO->SendBuf.fd = IO->RecvBuf.fd = 0; return eAbort; } -/* TODO: maye we could use offsetof() to calc the position of data... +/* TODO: maye we could use offsetof() to calc the position of data... * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher */ ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ); @@ -748,27 +885,57 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0); IO->conn_fail.data = IO; - ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout, 0); + ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0); IO->rw_timeout.data = IO; - /* Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ -/// ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1"); - if (IO->ConnectMe->IPv6) - rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6)); - else - rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); + + + /* for debugging you may bypass it like this: + * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = + * inet_addr("127.0.0.1"); + */ + if (IO->ConnectMe->IPv6) { + rc = connect(IO->SendBuf.fd, + &IO->ConnectMe->Addr, + sizeof(struct sockaddr_in6)); + } + else { + /* If citserver is bound to a specific IP address on the host, make + * sure we use that address for outbound connections. + */ + + memset(&egress_sin, 0, sizeof(egress_sin)); + egress_sin.sin_family = AF_INET; + if (!IsEmptyStr(config.c_ip_addr)) { + egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr); + if (egress_sin.sin_addr.s_addr == !INADDR_ANY) { + egress_sin.sin_addr.s_addr = INADDR_ANY; + } + + /* If this bind fails, no problem; we can still use INADDR_ANY */ + bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin)); + } + rc = connect(IO->SendBuf.fd, + (struct sockaddr_in *)&IO->ConnectMe->Addr, + sizeof(struct sockaddr_in)); + } if (rc >= 0){ - syslog(LOG_DEBUG, "connect() immediate success.\n"); + EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); set_start_callback(event_base, IO, 0); ev_timer_start(event_base, &IO->rw_timeout); return IO->NextState; } else if (errno == EINPROGRESS) { - syslog(LOG_DEBUG, "connect() have to wait now.\n"); + EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); + + ev_io_init(&IO->conn_event, + IO_connestd_callback, + IO->SendBuf.fd, + EV_READ|EV_WRITE); - ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE); IO->conn_event.data = IO; ev_io_start(event_base, &IO->conn_event); @@ -780,9 +947,11 @@ eNextState event_connect_socket(AsyncIO *IO, double conn_timeout, double first_r IO_connfailimmediate_callback); IO->conn_fail_immediate.data = IO; ev_idle_start(event_base, &IO->conn_fail_immediate); - - syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); + + EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to connect: %s", + strerror(errno)); return IO->NextState; } return IO->NextState; @@ -794,26 +963,9 @@ void SetNextTimeout(AsyncIO *IO, double timeout) ev_timer_again (event_base, &IO->rw_timeout); } -eNextState InitEventIO(AsyncIO *IO, - void *pData, - double conn_timeout, - double first_rw_timeout, - int ReadFirst) -{ - IO->Data = pData; - become_session(IO->CitContext); - - if (ReadFirst) { - IO->NextState = eReadMessage; - } - else { - IO->NextState = eSendReply; - } - return event_connect_socket(IO, conn_timeout, first_rw_timeout); -} -eNextState ReAttachIO(AsyncIO *IO, - void *pData, +eNextState ReAttachIO(AsyncIO *IO, + void *pData, int ReadFirst) { IO->Data = pData; @@ -829,3 +981,100 @@ eNextState ReAttachIO(AsyncIO *IO, return IO->NextState; } + +void InitIOStruct(AsyncIO *IO, + void *Data, + eNextState NextState, + IO_LineReaderCallback LineReader, + IO_CallBack DNS_Fail, + IO_CallBack SendDone, + IO_CallBack ReadDone, + IO_CallBack Terminate, + IO_CallBack DBTerminate, + IO_CallBack ConnFail, + IO_CallBack Timeout, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + + IO->NextState = NextState; + + IO->SendDone = SendDone; + IO->ReadDone = ReadDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; + IO->LineReader = LineReader; + IO->ConnFail = ConnFail; + IO->Timeout = Timeout; + IO->ShutdownAbort = ShutdownAbort; + + IO->DNS.Fail = DNS_Fail; + + IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->IOBuf = NewStrBuf(); + EV_syslog(LOG_DEBUG, + "EVENT: Session lives at %p IO at %p \n", + Data, IO); + +} + +extern int evcurl_init(AsyncIO *IO); + +int InitcURLIOStruct(AsyncIO *IO, + void *Data, + const char* Desc, + IO_CallBack SendDone, + IO_CallBack Terminate, + IO_CallBack DBTerminate, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data; + + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; + IO->ShutdownAbort = ShutdownAbort; + + strcpy(IO->HttpReq.errdesc, Desc); + + + return evcurl_init(IO); + +} + +extern int DebugEventLoopBacktrace; +void EV_backtrace(AsyncIO *IO) +{ +#ifdef HAVE_BACKTRACE + void *stack_frames[50]; + size_t size, i; + char **strings; + + if ((IO == NULL) || (DebugEventLoopBacktrace == 0)) + return; + size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*)); + strings = backtrace_symbols(stack_frames, size); + for (i = 0; i < size; i++) { + if (strings != NULL) { + EV_syslog(LOG_ALERT, " BT %s\n", strings[i]); + } + else { + EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]); + } + } + free(strings); +#endif +} + + +ev_tstamp ctdl_ev_now (void) +{ + return ev_now(event_base); +}