X-Git-Url: https://code.citadel.org/?p=citadel.git;a=blobdiff_plain;f=citadel%2Fevent_client.c;h=90c5e5dd986619f4f9f5426bea3393133a9cb7c1;hp=b9f90d00c848ec6ba0f63d7cdd808d582f45b49d;hb=70c486dc2216fb4e3342803080ce6a4204dd8672;hpb=444d3154beb9bac200c1f8ae1947ed2ac96cc76c diff --git a/citadel/event_client.c b/citadel/event_client.c index b9f90d00c..90c5e5dd9 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -1,20 +1,13 @@ /* + * Copyright (c) 1998-2012 by the citadel.org team * - * Copyright (c) 1998-2009 by the citadel.org team + * This program is open source software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 3. * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. */ #include "sysdep.h" @@ -46,6 +39,11 @@ #include #include #include +#include +#if HAVE_BACKTRACE +#include +#endif + #include #include "citadel.h" #include "server.h" @@ -63,157 +61,405 @@ #include "locate_host.h" #include "citadel_dirs.h" -#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT - #include "event_client.h" +#include "ctdl_module.h" + +static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents); +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents); + + +/*------------------------------------------------------------------------------ + * Server DB IO + *----------------------------------------------------------------------------*/ +extern int evdb_count; +extern pthread_mutex_t DBEventQueueMutex; +extern pthread_mutex_t DBEventExitQueueMutex; +extern HashList *DBInboundEventQueue; +extern struct ev_loop *event_db; +extern ev_async DBAddJob; +extern ev_async DBExitEventLoop; + +eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) +{ + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->IO = IO; + h->EvAttch = CB; + ev_cleanup_init(&IO->db_abort_by_shutdown, + IO_abort_shutdown_callback); + IO->db_abort_by_shutdown.data = IO; + + pthread_mutex_lock(&DBEventQueueMutex); + if (DBInboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n"); + pthread_mutex_unlock(&DBEventQueueMutex); + return eAbort; + } + EVM_syslog(LOG_DEBUG, "DBEVENT Q\n"); + i = ++evdb_count ; + Put(DBInboundEventQueue, IKEY(i), h, NULL); + pthread_mutex_unlock(&DBEventQueueMutex); + + pthread_mutex_lock(&DBEventExitQueueMutex); + if (event_db == NULL) + { + pthread_mutex_unlock(&DBEventExitQueueMutex); + return eAbort; + } + ev_async_send (event_db, &DBAddJob); + pthread_mutex_unlock(&DBEventExitQueueMutex); + + EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + return eDBQuery; +} + +void StopDBWatchers(AsyncIO *IO) +{ + ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + ev_idle_stop(event_db, &IO->db_unwind_stack); +} + +void ShutDownDBCLient(AsyncIO *IO) +{ + CitContext *Ctx =IO->CitContext; + become_session(Ctx); + + EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); + StopDBWatchers(IO); + + assert(IO->DBTerminate); + IO->DBTerminate(IO); +} + +void +DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_db); + EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__); + become_session(IO->CitContext); + + ev_idle_stop(event_db, &IO->db_unwind_stack); + + assert(IO->NextDBOperation); + switch (IO->NextDBOperation(IO)) + { + case eDBQuery: + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + break; + case eTerminateConnection: + case eAbort: + ev_idle_stop(event_db, &IO->db_unwind_stack); + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + ShutDownDBCLient(IO); + } +} + +eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) +{ + IO->NextDBOperation = CB; + ev_idle_init(&IO->db_unwind_stack, + DB_PerformNext); + IO->db_unwind_stack.data = IO; + ev_idle_start(event_db, &IO->db_unwind_stack); + return eDBQuery; +} -extern int event_add_pipe[2]; -extern citthread_mutex_t EventQueueMutex; +/*------------------------------------------------------------------------------ + * Client IO + *----------------------------------------------------------------------------*/ +extern int evbase_count; +extern pthread_mutex_t EventQueueMutex; +extern pthread_mutex_t EventExitQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; +extern ev_async AddJob; +extern ev_async ExitEventLoop; -#define SEND_EVENT 1 -#define RECV_EVENT 2 - -int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB) +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); + IO->Now = ev_now(event_base); + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} + + +eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) { IOAddHandler *h; int i; h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); - h->Ctx = Ctx; + h->IO = IO; h->EvAttch = CB; + ev_cleanup_init(&IO->abort_by_shutdown, + IO_abort_shutdown_callback); + IO->abort_by_shutdown.data = IO; - citthread_mutex_lock(&EventQueueMutex); - if (event_add_pipe[1] == -1) { - citthread_mutex_unlock(&EventQueueMutex); - free (h); - return -1; + pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + free(h); + /* shutting down... */ + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; } - CtdlLogPrintf(CTDL_DEBUG, "EVENT Q\n"); - i = GetCount(InboundEventQueue); + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); + i = ++evbase_count; Put(InboundEventQueue, IKEY(i), h, NULL); - citthread_mutex_unlock(&EventQueueMutex); + pthread_mutex_unlock(&EventQueueMutex); - write(event_add_pipe[1], "+_", 1); - CtdlLogPrintf(CTDL_DEBUG, "EVENT Q Done.\n"); - return 0; + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } + ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; } +extern eNextState evcurl_handle_start(AsyncIO *IO); -int ShutDownEventQueue(void) +eNextState QueueCurlContext(AsyncIO *IO) { - citthread_mutex_lock(&EventQueueMutex); - if (event_add_pipe[1] == -1) { - citthread_mutex_unlock(&EventQueueMutex); + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->IO = IO; + h->EvAttch = evcurl_handle_start; - return -1; + pthread_mutex_lock(&EventQueueMutex); + if (InboundEventQueue == NULL) + { + /* shutting down... */ + free(h); + EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n"); + pthread_mutex_unlock(&EventQueueMutex); + return eAbort; } - write(event_add_pipe[1], "x_", 1); - close(event_add_pipe[1]); - event_add_pipe[1] = -1; - citthread_mutex_unlock(&EventQueueMutex); - return 0; + + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); + i = ++evbase_count; + Put(InboundEventQueue, IKEY(i), h, NULL); + pthread_mutex_unlock(&EventQueueMutex); + + pthread_mutex_lock(&EventExitQueueMutex); + if (event_base == NULL) { + pthread_mutex_unlock(&EventExitQueueMutex); + return eAbort; + } + ev_async_send (event_base, &AddJob); + pthread_mutex_unlock(&EventExitQueueMutex); + + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; } +void DestructCAres(AsyncIO *IO); void FreeAsyncIOContents(AsyncIO *IO) { + CitContext *Ctx = IO->CitContext; + FreeStrBuf(&IO->IOBuf); FreeStrBuf(&IO->SendBuf.Buf); FreeStrBuf(&IO->RecvBuf.Buf); - ares_destroy(IO->DNSChannel); + + DestructCAres(IO); + + FreeURL(&IO->ConnectMe); + FreeStrBuf(&IO->HttpReq.ReplyData); + + if (Ctx) { + Ctx->state = CON_IDLE; + Ctx->kill_me = 1; + IO->CitContext = NULL; + } } -/* - static void - setup_signal_handlers(struct instance *instance) - { - signal(SIGPIPE, SIG_IGN); - event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigterm_event, NULL); +void StopClientWatchers(AsyncIO *IO, int CloseFD) +{ + ev_timer_stop (event_base, &IO->rw_timeout); + ev_timer_stop(event_base, &IO->conn_fail); + ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + + ev_io_stop(event_base, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + + if (CloseFD && (IO->SendBuf.fd > 0)) { + close(IO->SendBuf.fd); + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; + } +} + +void StopCurlWatchers(AsyncIO *IO) +{ + ev_timer_stop (event_base, &IO->rw_timeout); + ev_timer_stop(event_base, &IO->conn_fail); + ev_idle_stop(event_base, &IO->unwind_stack); + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); - event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigint_event, NULL); + ev_io_stop(event_base, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); - event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigquit_event, NULL); - } -*/ + if (IO->SendBuf.fd != 0) { + close(IO->SendBuf.fd); + } + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; +} void ShutDownCLient(AsyncIO *IO) { - CtdlLogPrintf(CTDL_DEBUG, "EVENT x %d\n", IO->sock); - switch (IO->active_event) { - case SEND_EVENT: - ev_io_stop(event_base, &IO->send_event); + CitContext *Ctx =IO->CitContext; + become_session(Ctx); + + EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); + + StopClientWatchers(IO, 1); + + if (IO->DNS.Channel != NULL) { + ares_destroy(IO->DNS.Channel); + EV_DNS_LOG_STOP(DNS.recv_event); + EV_DNS_LOG_STOP(DNS.send_event); + ev_io_stop(event_base, &IO->DNS.recv_event); + ev_io_stop(event_base, &IO->DNS.send_event); + IO->DNS.Channel = NULL; + } + assert(IO->Terminate); + IO->Terminate(IO); +} + +void PostInbound(AsyncIO *IO) +{ + switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); break; - case RECV_EVENT: - ev_io_stop(event_base, &IO->recv_event); + case eSendReply: + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + switch (IO->NextState) + { + case eSendFile: + case eSendReply: + case eSendMore: + case eReadMessage: + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->send_event); + break; + case eDBQuery: + StopClientWatchers(IO, 0); + default: + break; + } break; - case 0: - // no event active here; just bail out. + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eTerminateConnection: + ShutDownCLient(IO); + break; + case eAbort: + ShutDownCLient(IO); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + case eReadMessage: break; } - - IO->active_event = 0; - IO->Terminate(IO->Data); - } - eReadState HandleInbound(AsyncIO *IO) { + const char *Err = NULL; eReadState Finished = eBufferNotEmpty; - - while ((Finished == eBufferNotEmpty) && (IO->NextState == eReadMessage)){ - if (IO->RecvBuf.nBlobBytesWanted != 0) { - - } - else { /* Reading lines... */ -//// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if (IO->LineReader) - Finished = IO->LineReader(IO); - else - Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); - - switch (Finished) { - case eMustReadMore: /// read new from socket... - return Finished; - break; - case eBufferNotEmpty: /* shouldn't happen... */ - case eReadSuccess: /// done for now... - break; - case eReadFail: /// WHUT? - ///todo: shut down! - break; + + become_session(IO->CitContext); + + while ((Finished == eBufferNotEmpty) && + ((IO->NextState == eReadMessage)|| + (IO->NextState == eReadMore)|| + (IO->NextState == eReadFile)|| + (IO->NextState == eReadPayload))) + { + /* Reading lines... + * lex line reply in callback, + * or do it ourselves. + * i.e. as nnn-blabla means continue reading in SMTP + */ + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; } - } - + else if (IO->LineReader) + Finished = IO->LineReader(IO); + else + Finished = StrBufChunkSipLine(IO->IOBuf, + &IO->RecvBuf); + + switch (Finished) { + case eMustReadMore: /// read new from socket... + break; + case eBufferNotEmpty: /* shouldn't happen... */ + case eReadSuccess: /// done for now... + break; + case eReadFail: /// WHUT? + ///todo: shut down! + break; + } + if (Finished != eMustReadMore) { + assert(IO->ReadDone); ev_io_stop(event_base, &IO->recv_event); - IO->active_event = 0; - IO->NextState = IO->ReadDone(IO->Data); + IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); } } + PostInbound(IO); - if ((IO->NextState == eSendReply) || - (IO->NextState == eSendMore)) - { - IO->NextState = IO->SendDone(IO->Data); - ev_io_start(event_base, &IO->send_event); - - IO->active_event = SEND_EVENT; - - } - else if ((IO->NextState == eTerminateConnection) || - (IO->NextState == eAbort) ) - ShutDownCLient(IO); return Finished; } @@ -223,264 +469,752 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; AsyncIO *IO = watcher->data; -/* - CtdlLogPrintf(CTDL_DEBUG, "EVENT -> %d : [%s%s%s%s]\n", - watcher->fd, - (event&EV_TIMEOUT) ? " timeout" : "", - (event&EV_READ) ? " read" : "", - (event&EV_WRITE) ? " write" : "", - (event&EV_SIGNAL) ? " signal" : ""); -*/ -/// assert(fd == IO->sock); - - rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + const char *errmsg = NULL; - if (rc == 0) - { + IO->Now = ev_now(event_base); + become_session(IO->CitContext); #ifdef BIGBAD_IODBG - { - int rv = 0; - char fn [SIZ]; - FILE *fd; - const char *pch = ChrPtr(IO->SendBuf.Buf); - const char *pchh = IO->SendBuf.ReadWritePointer; - long nbytes; - - if (pchh == NULL) - pchh = pch; - - nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch); - snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", "smtpev", IO->sock); - - fd = fopen(fn, "a+"); - fprintf(fd, "Read: BufSize: %ld BufContent: [", - nbytes); - rv = fwrite(pchh, nbytes, 1, fd); - fprintf(fd, "]\n"); - - - fclose(fd); + { + int rv = 0; + char fn [SIZ]; + FILE *fd; + const char *pch = ChrPtr(IO->SendBuf.Buf); + const char *pchh = IO->SendBuf.ReadWritePointer; + long nbytes; + + if (pchh == NULL) + pchh = pch; + + nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch); + snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", + ((CitContext*)(IO->CitContext))->ServiceName, + IO->SendBuf.fd); + + fd = fopen(fn, "a+"); + fprintf(fd, "Send: BufSize: %ld BufContent: [", + nbytes); + rv = fwrite(pchh, nbytes, 1, fd); + if (!rv) printf("failed to write debug to %s!\n", fn); + fprintf(fd, "]\n"); +#endif + switch (IO->NextState) { + case eSendFile: + rc = FileSendChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd, + 0, + &IO->SendBuf); } + +#ifdef BIGBAD_IODBG + fprintf(fd, "Sent: BufSize: %d bytes.\n", rc); + fclose(fd); + } #endif + if (rc == 0) + { ev_io_stop(event_base, &IO->send_event); - IO->active_event = 0; switch (IO->NextState) { - case eSendReply: - break; case eSendMore: - IO->NextState = IO->SendDone(IO->Data); + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); if ((IO->NextState == eTerminateConnection) || (IO->NextState == eAbort) ) ShutDownCLient(IO); else { ev_io_start(event_base, &IO->send_event); + } + break; + case eSendFile: + if (IO->IOB.ChunkSendRemain > 0) { + ev_io_start(event_base, &IO->recv_event); + SetNextTimeout(IO, 100.0); - IO->active_event = SEND_EVENT; + } else { + assert(IO->ReadDone); + IO->NextState = IO->ReadDone(IO); + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + break; + case eSendReply: + case eSendMore: + case eSendFile: + ev_io_start(event_base, + &IO->send_event); + break; + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + break; + } } break; + case eSendReply: + if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) + break; + IO->NextState = eReadMore; + case eReadMore: case eReadMessage: - if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { + case eReadPayload: + case eReadFile: + if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) + { HandleInbound(IO); } else { ev_io_start(event_base, &IO->recv_event); - - IO->active_event = RECV_EVENT; } break; + case eDBQuery: + /* + * we now live in another queue, + * so we have to unregister. + */ + ev_cleanup_stop(loop, &IO->abort_by_shutdown); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: case eTerminateConnection: case eAbort: break; } } - else if (rc < 0) - IO->Timeout(IO); + else if (rc < 0) { + if (errno != EAGAIN) { + StopClientWatchers(IO, 1); + EV_syslog(LOG_DEBUG, + "IO_send_callback(): Socket Invalid! [%d] [%s] [%d]\n", + errno, strerror(errno), IO->SendBuf.fd); + StrBufPrintf(IO->ErrMsg, + "Socket Invalid! [%s]", + strerror(errno)); + SetNextTimeout(IO, 0.01); + } + } /* else : must write more. */ } +static void +set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) +{ + ev_timer_stop(event_base, &IO->conn_fail); + ev_timer_start(event_base, &IO->rw_timeout); + + switch(IO->NextState) { + case eReadMore: + case eReadMessage: + case eReadFile: + StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0); + ev_io_start(event_base, &IO->recv_event); + break; + case eSendReply: + case eSendMore: + case eReadPayload: + case eSendFile: + become_session(IO->CitContext); + IO_send_callback(loop, &IO->send_event, revents); + break; + case eDBQuery: + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eTerminateConnection: + case eAbort: + /// TODO: WHUT? + break; + } +} static void -IO_Timout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) +IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; - IO->Timeout(IO); + IO->Now = ev_now(event_base); + ev_timer_stop (event_base, &IO->rw_timeout); + become_session(IO->CitContext); + + if (IO->SendBuf.fd != 0) + { + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + ev_timer_stop (event_base, &IO->rw_timeout); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + } + + assert(IO->Timeout); + switch (IO->Timeout(IO)) + { + case eAbort: + ShutDownCLient(IO); + default: + break; + } } + static void IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) { AsyncIO *IO = watcher->data; - IO->ConnFail(IO); + IO->Now = ev_now(event_base); + ev_timer_stop (event_base, &IO->conn_fail); + + if (IO->SendBuf.fd != 0) + { + ev_io_stop(loop, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + ev_timer_stop (event_base, &IO->rw_timeout); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + } + become_session(IO->CitContext); + + assert(IO->ConnFail); + switch (IO->ConnFail(IO)) + { + case eAbort: + ShutDownCLient(IO); + default: + break; + + } } + +static void +IO_connfailimmediate_callback(struct ev_loop *loop, + ev_idle *watcher, + int revents) +{ + AsyncIO *IO = watcher->data; + + IO->Now = ev_now(event_base); + ev_idle_stop (event_base, &IO->conn_fail_immediate); + + if (IO->SendBuf.fd != 0) + { + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + } + become_session(IO->CitContext); + + assert(IO->ConnFail); + switch (IO->ConnFail(IO)) + { + case eAbort: + ShutDownCLient(IO); + default: + break; + + } +} + +static void +IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + int so_err = 0; + socklen_t lon = sizeof(so_err); + int err; + + IO->Now = ev_now(event_base); + EVM_syslog(LOG_DEBUG, "connect() succeeded.\n"); + + ev_io_stop(loop, &IO->conn_event); + ev_timer_stop(event_base, &IO->conn_fail); + + err = getsockopt(IO->SendBuf.fd, + SOL_SOCKET, + SO_ERROR, + (void*)&so_err, + &lon); + + if ((err == 0) && (so_err != 0)) + { + EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n", + so_err, + strerror(so_err)); + IO_connfail_callback(loop, &IO->conn_fail, revents); + + } + else + { + EVM_syslog(LOG_DEBUG, "connect() succeeded\n"); + set_start_callback(loop, IO, revents); + } +} + static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { + const char *errmsg; ssize_t nbytes; AsyncIO *IO = watcher->data; -// assert(fd == IO->sock); - -// assert(fd == sb->fd); -/* - CtdlLogPrintf(CTDL_DEBUG, "EVENT <- %d : [%s%s%s%s]\n", - (int) fd, - (event&EV_TIMEOUT) ? " timeout" : "", - (event&EV_READ) ? " read" : "", - (event&EV_WRITE) ? " write" : "", - (event&EV_SIGNAL) ? " signal" : ""); -*/ - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + IO->Now = ev_now(event_base); + switch (IO->NextState) { + case eReadFile: + nbytes = FileRecvChunked(&IO->IOB, &errmsg); + if (nbytes < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + else + { + if (IO->IOB.ChunkSendRemain == 0) + { + IO->NextState = eSendReply; + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + PostInbound(IO); + return; + } + else + return; + } + break; + default: + nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd, + 0, + &IO->RecvBuf); + break; + } + +#ifdef BIGBAD_IODBG + { + long nbytes; + int rv = 0; + char fn [SIZ]; + FILE *fd; + const char *pch = ChrPtr(IO->RecvBuf.Buf); + const char *pchh = IO->RecvBuf.ReadWritePointer; + + if (pchh == NULL) + pchh = pch; + + nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch); + snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", + ((CitContext*)(IO->CitContext))->ServiceName, + IO->SendBuf.fd); + + fd = fopen(fn, "a+"); + fprintf(fd, "Read: BufSize: %ld BufContent: [", + nbytes); + rv = fwrite(pchh, nbytes, 1, fd); + if (!rv) printf("failed to write debug to %s!\n", fn); + fprintf(fd, "]\n"); + fclose(fd); + } +#endif if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - IO->Timeout(IO); -/// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0); seems as if socket is gone then? + StopClientWatchers(IO, 1); + SetNextTimeout(IO, 0.01); return; } else if (nbytes == -1) { -/// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); + if (errno != EAGAIN) { + // FD is gone. kick it. + StopClientWatchers(IO, 1); + EV_syslog(LOG_DEBUG, + "IO_recv_callback(): Socket Invalid! [%d] [%s] [%d]\n", + errno, strerror(errno), IO->SendBuf.fd); + StrBufPrintf(IO->ErrMsg, + "Socket Invalid! [%s]", + strerror(errno)); + SetNextTimeout(IO, 0.01); + } return; } } - -static void -set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents, int first_rw_timeout) +void +IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { - ev_timer_init(&IO->conn_timeout, - IO_Timout_callback, - first_rw_timeout, 0); - IO->conn_timeout.data = IO; - - switch(IO->NextState) { - case eReadMessage: - ev_io_start(event_base, &IO->recv_event); - IO->active_event = RECV_EVENT; - break; - case eSendReply: - case eSendMore: - IO_send_callback(loop, &IO->send_event, revents); - break; - case eTerminateConnection: + AsyncIO *IO = watcher->data; + IO->Now = ev_now(event_base); + EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + become_session(IO->CitContext); + assert(IO->DNS.Query->PostDNS); + switch (IO->DNS.Query->PostDNS(IO)) + { case eAbort: - /// TODO: WHUT? + assert(IO->DNS.Fail); + switch (IO->DNS.Fail(IO)) { + case eAbort: +//// StopClientWatchers(IO); + ShutDownCLient(IO); + default: + break; + } + default: break; } } -int event_connect_socket(AsyncIO *IO, int conn_timeout, int first_rw_timeout) + +eNextState EvConnectSock(AsyncIO *IO, + double conn_timeout, + double first_rw_timeout, + int ReadFirst) { - struct sockaddr_in saddr; - int fdflags; + struct sockaddr_in egress_sin; + int fdflags; int rc = -1; - IO->SendBuf.fd = IO->RecvBuf.fd = - IO->sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); -/* -IO->curr_ai->ai_family, - IO->curr_ai->ai_socktype, - IO->curr_ai->ai_protocol); -*/ - if (IO->sock < 0) { - CtdlLogPrintf(CTDL_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); -// freeaddrinfo(res); - return -1; - } - fdflags = fcntl(IO->sock, F_GETFL); + become_session(IO->CitContext); + + if (ReadFirst) { + IO->NextState = eReadMessage; + } + else { + IO->NextState = eSendReply; + } + + IO->SendBuf.fd = IO->RecvBuf.fd = + socket( + (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, + SOCK_STREAM, + IPPROTO_TCP); + + if (IO->SendBuf.fd < 0) { + EV_syslog(LOG_ERR, + "EVENT: socket() failed: %s\n", + strerror(errno)); + + StrBufPrintf(IO->ErrMsg, + "Failed to create socket: %s", + strerror(errno)); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + return eAbort; + } + fdflags = fcntl(IO->SendBuf.fd, F_GETFL); if (fdflags < 0) { - CtdlLogPrintf(CTDL_DEBUG, - "EVENT: unable to get socket flags! %s \n", - strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); - return -1; + EV_syslog(LOG_ERR, + "EVENT: unable to get socket %d flags! %s \n", + IO->SendBuf.fd, + strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to get socket %d flags: %s", + IO->SendBuf.fd, + strerror(errno)); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + return eAbort; } fdflags = fdflags | O_NONBLOCK; - if (fcntl(IO->sock, F_SETFL, fdflags) < 0) { - CtdlLogPrintf(CTDL_DEBUG, - "EVENT: unable to set socket nonblocking flags! %s \n", - strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); - close(IO->sock); - return -1; - } -/* TODO: maye we could use offsetof() to calc the position of data... + if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { + EV_syslog( + LOG_ERR, + "EVENT: unable to set socket %d nonblocking flags! %s \n", + IO->SendBuf.fd, + strerror(errno)); + StrBufPrintf(IO->ErrMsg, + "Failed to set socket flags: %s", + strerror(errno)); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + return eAbort; + } +/* TODO: maye we could use offsetof() to calc the position of data... * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher */ - ev_io_init(&IO->recv_event, IO_recv_callback, IO->sock, EV_READ); + ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ); IO->recv_event.data = IO; - ev_io_init(&IO->send_event, IO_send_callback, IO->sock, EV_WRITE); + ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE); IO->send_event.data = IO; + + ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0); + IO->conn_fail.data = IO; + ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0); + IO->rw_timeout.data = IO; + + + + + /* for debugging you may bypass it like this: + * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = + * inet_addr("127.0.0.1"); + */ + if (IO->ConnectMe->IPv6) { + rc = connect(IO->SendBuf.fd, + &IO->ConnectMe->Addr, + sizeof(struct sockaddr_in6)); + } + else { + /* If citserver is bound to a specific IP address on the host, make + * sure we use that address for outbound connections. + */ - memset( (struct sockaddr_in *)&saddr, '\0', sizeof( saddr ) ); - - memcpy(&saddr.sin_addr, - IO->HEnt->h_addr_list[0], - sizeof(struct in_addr)); - -// saddr.sin_addr.s_addr = inet_addr("85.88.5.80"); - saddr.sin_family = AF_INET; - saddr.sin_port = htons(IO->dport); - rc = connect(IO->sock, - (struct sockaddr *) &saddr, -/// TODO: ipv6?? (IO->HEnt->h_addrtype == AF_INET6)? - /* sizeof(in6_addr):*/ - sizeof(struct sockaddr_in)); + memset(&egress_sin, 0, sizeof(egress_sin)); + egress_sin.sin_family = AF_INET; + if (!IsEmptyStr(config.c_ip_addr)) { + egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr); + if (egress_sin.sin_addr.s_addr == !INADDR_ANY) { + egress_sin.sin_addr.s_addr = INADDR_ANY; + } + + /* If this bind fails, no problem; we can still use INADDR_ANY */ + bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin)); + } + rc = connect(IO->SendBuf.fd, + (struct sockaddr_in *)&IO->ConnectMe->Addr, + sizeof(struct sockaddr_in)); + } + if (rc >= 0){ -//// freeaddrinfo(res); - set_start_callback(event_base, IO, 0, first_rw_timeout); - return 0; + EV_syslog(LOG_DEBUG, "connect() = %d immediate success.\n", IO->SendBuf.fd); + set_start_callback(event_base, IO, 0); + return IO->NextState; } else if (errno == EINPROGRESS) { - set_start_callback(event_base, IO, 0, first_rw_timeout); - ev_timer_init(&IO->conn_fail, - IO_connfail_callback, - conn_timeout, 0); - IO->conn_fail.data = IO; - - return 0; + EV_syslog(LOG_DEBUG, "connect() = %d have to wait now.\n", IO->SendBuf.fd); + + ev_io_init(&IO->conn_event, + IO_connestd_callback, + IO->SendBuf.fd, + EV_READ|EV_WRITE); + + IO->conn_event.data = IO; + + ev_io_start(event_base, &IO->conn_event); + ev_timer_start(event_base, &IO->conn_fail); + return IO->NextState; } else { - CtdlLogPrintf(CTDL_ERR, "connect() failed: %s\n", strerror(errno)); - StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); - close(IO->sock); -/* IO->curr_ai = IO->curr_ai->ai_next; - if (IO->curr_ai != NULL) - return event_connect_socket(IO); - else*/ - return -1; - } -} - -void InitEventIO(AsyncIO *IO, - void *pData, - IO_CallBack ReadDone, - IO_CallBack SendDone, - IO_CallBack Terminate, - IO_CallBack Timeout, - IO_CallBack ConnFail, - IO_LineReaderCallback LineReader, - int conn_timeout, - int first_rw_timeout, - int ReadFirst) + ev_idle_init(&IO->conn_fail_immediate, + IO_connfailimmediate_callback); + IO->conn_fail_immediate.data = IO; + ev_idle_start(event_base, &IO->conn_fail_immediate); + + EV_syslog(LOG_ERR, + "connect() = %d failed: %s\n", + IO->SendBuf.fd, + strerror(errno)); + + StrBufPrintf(IO->ErrMsg, + "Failed to connect: %s", + strerror(errno)); + return IO->NextState; + } + return IO->NextState; +} + +void SetNextTimeout(AsyncIO *IO, double timeout) +{ + IO->rw_timeout.repeat = timeout; + ev_timer_again (event_base, &IO->rw_timeout); +} + + +eNextState ReAttachIO(AsyncIO *IO, + void *pData, + int ReadFirst) { IO->Data = pData; - IO->SendDone = SendDone; - IO->ReadDone = ReadDone; - IO->Terminate = Terminate; - IO->LineReader = LineReader; - + become_session(IO->CitContext); + ev_cleanup_start(event_base, &IO->abort_by_shutdown); if (ReadFirst) { IO->NextState = eReadMessage; } else { IO->NextState = eSendReply; } - IO->IP6 = IO->HEnt->h_addrtype == AF_INET6; -// IO->res = HEnt->h_addr_list[0]; - event_connect_socket(IO, conn_timeout, first_rw_timeout); + set_start_callback(event_base, IO, 0); + + return IO->NextState; +} + +void InitIOStruct(AsyncIO *IO, + void *Data, + eNextState NextState, + IO_LineReaderCallback LineReader, + IO_CallBack DNS_Fail, + IO_CallBack SendDone, + IO_CallBack ReadDone, + IO_CallBack Terminate, + IO_CallBack DBTerminate, + IO_CallBack ConnFail, + IO_CallBack Timeout, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; + + IO->NextState = NextState; + + IO->SendDone = SendDone; + IO->ReadDone = ReadDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; + IO->LineReader = LineReader; + IO->ConnFail = ConnFail; + IO->Timeout = Timeout; + IO->ShutdownAbort = ShutdownAbort; + + IO->DNS.Fail = DNS_Fail; + + IO->SendBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->RecvBuf.Buf = NewStrBufPlain(NULL, 1024); + IO->IOBuf = NewStrBuf(); + EV_syslog(LOG_DEBUG, + "EVENT: Session lives at %p IO at %p \n", + Data, IO); + } +extern int evcurl_init(AsyncIO *IO); + +int InitcURLIOStruct(AsyncIO *IO, + void *Data, + const char* Desc, + IO_CallBack SendDone, + IO_CallBack Terminate, + IO_CallBack DBTerminate, + IO_CallBack ShutdownAbort) +{ + IO->Data = Data; + + IO->CitContext = CloneContext(CC); + IO->CitContext->session_specific_data = Data; + IO->CitContext->IO = IO; + IO->SendDone = SendDone; + IO->Terminate = Terminate; + IO->DBTerminate = DBTerminate; + IO->ShutdownAbort = ShutdownAbort; + + strcpy(IO->HttpReq.errdesc, Desc); + + + return evcurl_init(IO); + +} + + +typedef struct KillOtherSessionContext { + AsyncIO IO; + AsyncIO *OtherOne; +}KillOtherSessionContext; + +eNextState KillTerminate(AsyncIO *IO) +{ + long id; + KillOtherSessionContext *Ctx = (KillOtherSessionContext*)IO->Data; + EV_syslog(LOG_DEBUG, "%s Exit\n", __FUNCTION__); + id = IO->ID; + FreeAsyncIOContents(IO); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + IO->ID = id; /* just for the case we want to analyze it in a coredump */ + free(Ctx); + return eAbort; + +} + +eNextState KillShutdown(AsyncIO *IO) +{ + return eTerminateConnection; +} + +eNextState KillOtherContextNow(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx = IO->Data; + + if (Ctx->OtherOne->ShutdownAbort != NULL) + Ctx->OtherOne->ShutdownAbort(Ctx->OtherOne); + return eTerminateConnection; +} + +void KillAsyncIOContext(AsyncIO *IO) +{ + KillOtherSessionContext *Ctx; + + Ctx = (KillOtherSessionContext*) malloc(sizeof(KillOtherSessionContext)); + memset(Ctx, 0, sizeof(KillOtherSessionContext)); + + InitIOStruct(&Ctx->IO, + Ctx, + eReadMessage, + NULL, + NULL, + NULL, + NULL, + KillTerminate, + NULL, + NULL, + NULL, + KillShutdown); + + Ctx->OtherOne = IO; + + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + QueueEventContext(&Ctx->IO, KillOtherContextNow); + break; + case eDBQuery: + QueueDBOperation(&Ctx->IO, KillOtherContextNow); + break; + case eTerminateConnection: + case eAbort: + /*hm, its already dying, dunno which Queue its in... */ + free(Ctx); + } + +} + +extern int DebugEventLoopBacktrace; +void EV_backtrace(AsyncIO *IO) +{ +#ifdef HAVE_BACKTRACE + void *stack_frames[50]; + size_t size, i; + char **strings; + + if ((IO == NULL) || (DebugEventLoopBacktrace == 0)) + return; + size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*)); + strings = backtrace_symbols(stack_frames, size); + for (i = 0; i < size; i++) { + if (strings != NULL) { + EV_syslog(LOG_ALERT, " BT %s\n", strings[i]); + } + else { + EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]); + } + } + free(strings); #endif +} + + +ev_tstamp ctdl_ev_now (void) +{ + return ev_now(event_base); +}