X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=4a2f481a9cafc2412cc7c8fd2ed3b6d8e2a7e2aa;hb=cf7cb2463d47a4a9ed36c8d1c13f188418389437;hp=ba18107d76c1307c46ab7754c4c39796eb28dc02;hpb=3de7cf767571d26d4f4852577b080f9264d34f91;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index ba18107d7..4a2f481a9 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -2,7 +2,7 @@ * * Copyright (c) 1998-2009 by the citadel.org team * - * This program is free software; you can redistribute it and/or modify + * This program is open source software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. @@ -46,6 +46,8 @@ #include #include #include +#include + #include #include "citadel.h" #include "server.h" @@ -63,110 +65,247 @@ #include "locate_host.h" #include "citadel_dirs.h" -#ifdef EXPERIMENTAL_SMTP_EVENT_CLIENT - -#include #include "event_client.h" -extern int event_add_pipe[2]; -extern citthread_mutex_t EventQueueMutex; -extern void *QueueEventAddPtr; -extern AsyncIO *QueueThisIO; -extern EventContextAttach EventContextAttachPtr; - -int QueueEventContext(void *Ctx, AsyncIO *IO, EventContextAttach CB) +static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents) { - citthread_mutex_lock(&EventQueueMutex); - if (event_add_pipe[1] == -1) { - citthread_mutex_unlock(&EventQueueMutex); + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); - return -1; - } + assert(IO->ShutdownAbort); + IO->ShutdownAbort(IO); +} - QueueEventAddPtr = Ctx; - EventContextAttachPtr = CB; - QueueThisIO = IO; - write(event_add_pipe[1], "+_", 1); - citthread_mutex_unlock(&EventQueueMutex); - return 0; +/*-------------------------------------------------------------------------------- + * Server DB IO + */ +extern int evdb_count; +extern pthread_mutex_t DBEventQueueMutex; +extern HashList *DBInboundEventQueue; +extern struct ev_loop *event_db; +extern ev_async DBAddJob; +extern ev_async DBExitEventLoop; + +eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) +{ + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->IO = IO; + h->EvAttch = CB; + ev_cleanup_init(&IO->db_abort_by_shutdown, + IO_abort_shutdown_callback); + IO->db_abort_by_shutdown.data = IO; + ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); + + pthread_mutex_lock(&DBEventQueueMutex); + EVM_syslog(LOG_DEBUG, "DBEVENT Q\n"); + i = ++evdb_count ; + Put(DBInboundEventQueue, IKEY(i), h, NULL); + pthread_mutex_unlock(&DBEventQueueMutex); + + ev_async_send (event_db, &DBAddJob); + EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n"); + return eDBQuery; } +void ShutDownDBCLient(AsyncIO *IO) +{ + CitContext *Ctx =IO->CitContext; + become_session(Ctx); + + EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); + ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); + + assert(IO->Terminate); + IO->Terminate(IO); -int ShutDownEventQueue(void) + Ctx->state = CON_IDLE; + Ctx->kill_me = 1; +} + +void +DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) { - citthread_mutex_lock(&EventQueueMutex); - if (event_add_pipe[1] == -1) { - citthread_mutex_unlock(&EventQueueMutex); + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + become_session(IO->CitContext); + + ev_idle_stop(event_db, &IO->db_unwind_stack); - return -1; + assert(IO->NextDBOperation); + switch (IO->NextDBOperation(IO)) + { + case eDBQuery: + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + break; + case eTerminateConnection: + case eAbort: + ev_idle_stop(event_db, &IO->db_unwind_stack); + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + ShutDownDBCLient(IO); } - write(event_add_pipe[1], "x_", 1); - close(event_add_pipe[1]); - event_add_pipe[1] = -1; - citthread_mutex_unlock(&EventQueueMutex); - return 0; } -void FreeAsyncIOContents(AsyncIO *IO) +eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) { - FreeStrBuf(&IO->IOBuf); - FreeStrBuf(&IO->SendBuf.Buf); - FreeStrBuf(&IO->RecvBuf.Buf); + IO->NextDBOperation = CB; + ev_idle_init(&IO->db_unwind_stack, + DB_PerformNext); + IO->db_unwind_stack.data = IO; + ev_idle_start(event_db, &IO->db_unwind_stack); + return eDBQuery; +} + +/*-------------------------------------------------------------------------------- + * Client IO + */ +extern int evbase_count; +extern pthread_mutex_t EventQueueMutex; +extern HashList *InboundEventQueue; +extern struct ev_loop *event_base; +extern ev_async AddJob; +extern ev_async ExitEventLoop; + +eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) +{ + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->IO = IO; + h->EvAttch = CB; + ev_cleanup_init(&IO->abort_by_shutdown, + IO_abort_shutdown_callback); + IO->abort_by_shutdown.data = IO; + ev_cleanup_start(event_base, &IO->abort_by_shutdown); + + pthread_mutex_lock(&EventQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); + i = ++evbase_count; + Put(InboundEventQueue, IKEY(i), h, NULL); + pthread_mutex_unlock(&EventQueueMutex); + + ev_async_send (event_base, &AddJob); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; } -/* -static void -setup_signal_handlers(struct instance *instance) +extern eNextState evcurl_handle_start(AsyncIO *IO); + +eNextState QueueCurlContext(AsyncIO *IO) { - signal(SIGPIPE, SIG_IGN); + IOAddHandler *h; + int i; + + h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); + h->IO = IO; + h->EvAttch = evcurl_handle_start; + + pthread_mutex_lock(&EventQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Q\n"); + i = ++evbase_count; + Put(InboundEventQueue, IKEY(i), h, NULL); + pthread_mutex_unlock(&EventQueueMutex); + + ev_async_send (event_base, &AddJob); + EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n"); + return eSendReply; +} - event_set(&instance->sigterm_event, SIGTERM, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigterm_event, NULL); +void FreeAsyncIOContents(AsyncIO *IO) +{ + FreeStrBuf(&IO->IOBuf); + FreeStrBuf(&IO->SendBuf.Buf); + FreeStrBuf(&IO->RecvBuf.Buf); +} - event_set(&instance->sigint_event, SIGINT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigint_event, NULL); - event_set(&instance->sigquit_event, SIGQUIT, EV_SIGNAL|EV_PERSIST, - exit_event_callback, instance); - event_add(&instance->sigquit_event, NULL); +void StopClientWatchers(AsyncIO *IO) +{ + ev_timer_stop(event_base, &IO->conn_fail); + ev_io_stop(event_base, &IO->conn_event); + ev_idle_stop(event_base, &IO->unwind_stack); + + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + ev_timer_stop (event_base, &IO->rw_timeout); + close(IO->SendBuf.fd); + IO->SendBuf.fd = 0; + IO->RecvBuf.fd = 0; } -*/ void ShutDownCLient(AsyncIO *IO) { - event_del(&IO->send_event); - event_del(&IO->recv_event); - IO->Terminate(IO->Data); + CitContext *Ctx =IO->CitContext; + become_session(Ctx); -// citthread_mutex_lock(&EventQueueMutex); + EVM_syslog(LOG_DEBUG, "EVENT Terminating \n"); -///QueueEvents /// todo remove from hash. + ev_cleanup_stop(event_base, &IO->abort_by_shutdown); + StopClientWatchers(IO); -// citthread_mutex_unlock(&EventQueueMutex); + if (IO->DNS.Channel != NULL) { + ares_destroy(IO->DNS.Channel); + ev_io_stop(event_base, &IO->DNS.recv_event); + ev_io_stop(event_base, &IO->DNS.send_event); + IO->DNS.Channel = NULL; + } + assert(IO->Terminate); + IO->Terminate(IO); + Ctx->state = CON_IDLE; + Ctx->kill_me = 1; } + eReadState HandleInbound(AsyncIO *IO) { + const char *Err = NULL; eReadState Finished = eBufferNotEmpty; - - while ((Finished == eBufferNotEmpty) && (IO->NextState == eReadMessage)){ + + become_session(IO->CitContext); + + while ((Finished == eBufferNotEmpty) && + ((IO->NextState == eReadMessage)|| + (IO->NextState == eReadMore)|| + (IO->NextState == eReadFile)|| + (IO->NextState == eReadPayload))) + { if (IO->RecvBuf.nBlobBytesWanted != 0) { } else { /* Reading lines... */ //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if (IO->LineReader) + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } + else if (IO->LineReader) Finished = IO->LineReader(IO); else Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); switch (Finished) { case eMustReadMore: /// read new from socket... - return Finished; break; case eBufferNotEmpty: /* shouldn't happen... */ case eReadSuccess: /// done for now... @@ -179,140 +318,508 @@ eReadState HandleInbound(AsyncIO *IO) } if (Finished != eMustReadMore) { - event_del(&IO->recv_event); - IO->NextState = IO->ReadDone(IO->Data); + assert(IO->ReadDone); + ev_io_stop(event_base, &IO->recv_event); + IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); } } - - if ((IO->NextState == eSendReply) || - (IO->NextState == eSendMore)) - { - IO->NextState = IO->SendDone(IO->Data); - event_add(&IO->send_event, NULL); - - } - else if ((IO->NextState == eTerminateConnection) || - (IO->NextState == eAbort) ) + switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eSendReply: + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + ev_io_start(event_base, &IO->send_event); + break; + case eReadPayload: + case eReadMore: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eTerminateConnection: +//////TODOxxxx + break; + case eAbort: ShutDownCLient(IO); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + case eReadMessage: + break; + } return Finished; } static void -IO_send_callback(int fd, short event, void *ctx) +IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; - AsyncIO *IO = ctx; + AsyncIO *IO = watcher->data; + const char *errmsg = NULL; - (void)fd; - (void)event; - -/// assert(fd == IO->sock); - - rc = StrBuf_write_one_chunk_callback(fd, event, &IO->SendBuf); + become_session(IO->CitContext); +#ifdef BIGBAD_IODBG + { + int rv = 0; + char fn [SIZ]; + FILE *fd; + const char *pch = ChrPtr(IO->SendBuf.Buf); + const char *pchh = IO->SendBuf.ReadWritePointer; + long nbytes; + + if (pchh == NULL) + pchh = pch; + + nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch); + snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", + ((CitContext*)(IO->CitContext))->ServiceName, + IO->SendBuf.fd); + + fd = fopen(fn, "a+"); + fprintf(fd, "Send: BufSize: %ld BufContent: [", + nbytes); + rv = fwrite(pchh, nbytes, 1, fd); + if (!rv) printf("failed to write debug to %s!\n", fn); + fprintf(fd, "]\n"); +#endif + switch (IO->NextState) { + case eSendFile: + rc = FileSendChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + } +#ifdef BIGBAD_IODBG + fprintf(fd, "Sent: BufSize: %d bytes.\n", rc); + fclose(fd); + } +#endif if (rc == 0) { - event_del(&IO->send_event); - switch (IO->NextState) { - case eSendReply: - break; - case eSendMore: - IO->NextState = IO->SendDone(IO->Data); - - if ((IO->NextState == eTerminateConnection) || - (IO->NextState == eAbort) ) - ShutDownCLient(IO); - else - event_add(&IO->send_event, NULL); - break; - case eReadMessage: - if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { - HandleInbound(IO); - } - else { - event_add(&IO->recv_event, NULL); - } - - break; - case eAbort: - break; - } + ev_io_stop(event_base, &IO->send_event); + switch (IO->NextState) { + case eSendMore: + assert(IO->SendDone); + IO->NextState = IO->SendDone(IO); + + if ((IO->NextState == eTerminateConnection) || + (IO->NextState == eAbort) ) + ShutDownCLient(IO); + else { + ev_io_start(event_base, &IO->send_event); + } + break; + case eSendFile: + if (IO->IOB.ChunkSendRemain > 0) { + ev_io_start(event_base, &IO->recv_event); + } else { + assert(IO->ReadDone); + IO->NextState = IO->ReadDone(IO); + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + break; + case eSendReply: + case eSendMore: + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + break; + } + } + break; + case eSendReply: + if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) + break; + IO->NextState = eReadMore; + case eReadMore: + case eReadMessage: + case eReadPayload: + case eReadFile: + if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { + HandleInbound(IO); + } + else { + ev_io_start(event_base, &IO->recv_event); + } + + break; + case eDBQuery: + /* we now live in another queue, so we have to unregister. */ + ev_cleanup_stop(loop, &IO->abort_by_shutdown); + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eTerminateConnection: + case eAbort: + break; + } + } + else if (rc < 0) { + assert(IO->Timeout); + IO->Timeout(IO); + } + /* else : must write more. */ +} +static void +set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) +{ + + switch(IO->NextState) { + case eReadMore: + case eReadMessage: + case eReadFile: + ev_io_start(event_base, &IO->recv_event); + break; + case eSendReply: + case eSendMore: + case eReadPayload: + case eSendFile: + become_session(IO->CitContext); + IO_send_callback(loop, &IO->send_event, revents); + break; + case eDBQuery: + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eTerminateConnection: + case eAbort: + /// TODO: WHUT? + break; + } +} + +static void +IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + ev_timer_stop (event_base, &IO->rw_timeout); + become_session(IO->CitContext); + + if (IO->SendBuf.fd != 0) + { + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + ev_timer_stop (event_base, &IO->rw_timeout); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + } + + assert(IO->Timeout); + switch (IO->Timeout(IO)) + { + case eAbort: + ShutDownCLient(IO); + default: + break; } - else if (rc > 0) - return; -// else - ///abort! } +static void +IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + ev_timer_stop (event_base, &IO->conn_fail); + + if (IO->SendBuf.fd != 0) + { + ev_io_stop(loop, &IO->conn_event); + ev_io_stop(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); + ev_timer_stop (event_base, &IO->rw_timeout); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + } + become_session(IO->CitContext); + + assert(IO->ConnFail); + switch (IO->ConnFail(IO)) + { + case eAbort: + ShutDownCLient(IO); + default: + break; + + } +} static void -IO_recv_callback(int fd, short event, void *ctx) +IO_connfailimmediate_callback(struct ev_loop *loop, ev_idle *watcher, int revents) { - ssize_t nbytes; - AsyncIO *IO = ctx; + AsyncIO *IO = watcher->data; -// assert(fd == IO->sock); - -// assert(fd == sb->fd); + ev_idle_stop (event_base, &IO->conn_fail_immediate); + + if (IO->SendBuf.fd != 0) + { + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = 0; + } + become_session(IO->CitContext); + + assert(IO->ConnFail); + switch (IO->ConnFail(IO)) + { + case eAbort: + ShutDownCLient(IO); + default: + break; + + } +} + +static void +IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + + ev_io_stop(loop, &IO->conn_event); + ev_timer_stop (event_base, &IO->conn_fail); + set_start_callback(loop, IO, revents); +} +static void +IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) +{ + const char *errmsg; + ssize_t nbytes; + AsyncIO *IO = watcher->data; + + switch (IO->NextState) { + case eReadFile: + nbytes = FileRecvChunked(&IO->IOB, &errmsg); + if (nbytes < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + else + { + if (IO->IOB.ChunkSendRemain == 0) + { + IO->NextState = eSendReply; + } + else + return; + } + break; + default: + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + break; + } - nbytes = StrBuf_read_one_chunk_callback(fd, event, &IO->RecvBuf); +#ifdef BIGBAD_IODBG + { + int rv = 0; + char fn [SIZ]; + FILE *fd; + const char *pch = ChrPtr(IO->RecvBuf.Buf); + const char *pchh = IO->RecvBuf.ReadWritePointer; + long nbytes; + + if (pchh == NULL) + pchh = pch; + + nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch); + snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", + ((CitContext*)(IO->CitContext))->ServiceName, + IO->SendBuf.fd); + + fd = fopen(fn, "a+"); + fprintf(fd, "Read: BufSize: %ld BufContent: [", + nbytes); + rv = fwrite(pchh, nbytes, 1, fd); + if (!rv) printf("failed to write debug to %s!\n", fn); + fprintf(fd, "]\n"); + + + fclose(fd); + } +#endif if (nbytes > 0) { HandleInbound(IO); } else if (nbytes == 0) { - /// TODO: this is a timeout??? sock_buff_invoke_free(sb, 0); + assert(IO->Timeout); + + switch (IO->Timeout(IO)) + { + case eAbort: + ShutDownCLient(IO); + default: + break; + } return; } else if (nbytes == -1) { /// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); + EV_syslog(LOG_DEBUG, + "EVENT: Socket Invalid! %s \n", + strerror(errno)); + ShutDownCLient(IO); return; } } -void IOReadNextLine(AsyncIO *IO, int timeout) +void +IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) +{ + AsyncIO *IO = watcher->data; + EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); + become_session(IO->CitContext); + assert(IO->DNS.Fail); + assert(IO->DNS.Query->PostDNS); + switch (IO->DNS.Query->PostDNS(IO)) + { + case eAbort: + switch (IO->DNS.Fail(IO)) { + case eAbort: + ShutDownCLient(IO); + default: + break; + + } + default: + break; + } +} + + +eNextState EvConnectSock(AsyncIO *IO, + void *pData, + double conn_timeout, + double first_rw_timeout, + int ReadFirst) { + int fdflags; + int rc = -1; + + IO->Data = pData; + become_session(IO->CitContext); + + if (ReadFirst) { + IO->NextState = eReadMessage; + } + else { + IO->NextState = eSendReply; + } + + IO->SendBuf.fd = IO->RecvBuf.fd = + socket( + (IO->ConnectMe->IPv6)?PF_INET6:PF_INET, + SOCK_STREAM, + IPPROTO_TCP); + + if (IO->SendBuf.fd < 0) { + EV_syslog(LOG_ERR, "EVENT: socket() failed: %s\n", strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to create socket: %s", strerror(errno)); + return eAbort; + } + fdflags = fcntl(IO->SendBuf.fd, F_GETFL); + if (fdflags < 0) { + EV_syslog(LOG_DEBUG, + "EVENT: unable to get socket flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to get socket flags: %s", strerror(errno)); + return eAbort; + } + fdflags = fdflags | O_NONBLOCK; + if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) { + EV_syslog(LOG_DEBUG, + "EVENT: unable to set socket nonblocking flags! %s \n", + strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to set socket flags: %s", strerror(errno)); + close(IO->SendBuf.fd); + IO->SendBuf.fd = IO->RecvBuf.fd = -1; + return eAbort; + } +/* TODO: maye we could use offsetof() to calc the position of data... + * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher + */ + ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ); + IO->recv_event.data = IO; + ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE); + IO->send_event.data = IO; + + ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0); + IO->conn_fail.data = IO; + ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout, 0); + IO->rw_timeout.data = IO; + + + /* Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ +/// ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1"); + if (IO->ConnectMe->IPv6) + rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr, sizeof(struct sockaddr_in6)); + else + rc = connect(IO->SendBuf.fd, (struct sockaddr_in *)&IO->ConnectMe->Addr, sizeof(struct sockaddr_in)); + + if (rc >= 0){ + EVM_syslog(LOG_DEBUG, "connect() immediate success.\n"); + set_start_callback(event_base, IO, 0); + ev_timer_start(event_base, &IO->rw_timeout); + return IO->NextState; + } + else if (errno == EINPROGRESS) { + EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n"); + + ev_io_init(&IO->conn_event, IO_connestd_callback, IO->SendBuf.fd, EV_READ|EV_WRITE); + IO->conn_event.data = IO; + ev_io_start(event_base, &IO->conn_event); + ev_timer_start(event_base, &IO->conn_fail); + return IO->NextState; + } + else { + ev_idle_init(&IO->conn_fail_immediate, + IO_connfailimmediate_callback); + IO->conn_fail_immediate.data = IO; + ev_idle_start(event_base, &IO->conn_fail_immediate); + + EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno)); + StrBufPrintf(IO->ErrMsg, "Failed to connect: %s", strerror(errno)); + return IO->NextState; + } + return IO->NextState; } -void IOReadNextBLOB(AsyncIO *IO, int timeout, long size) +void SetNextTimeout(AsyncIO *IO, double timeout) { + IO->rw_timeout.repeat = timeout; + ev_timer_again (event_base, &IO->rw_timeout); } -void InitEventIO(AsyncIO *IO, - void *pData, - IO_CallBack ReadDone, - IO_CallBack SendDone, - IO_CallBack Terminate, - IO_LineReaderCallback LineReader, - int ReadFirst) + +eNextState ReAttachIO(AsyncIO *IO, + void *pData, + int ReadFirst) { IO->Data = pData; - IO->SendDone = SendDone; - IO->ReadDone = ReadDone; - IO->Terminate = Terminate; - IO->LineReader = LineReader; - - event_set(&IO->recv_event, - IO->sock, - EV_READ|EV_PERSIST, - IO_recv_callback, - IO); - - event_set(&IO->send_event, - IO->sock, - EV_WRITE|EV_PERSIST, - IO_send_callback, - IO); - + become_session(IO->CitContext); + ev_cleanup_start(event_base, &IO->abort_by_shutdown); if (ReadFirst) { IO->NextState = eReadMessage; - event_add(&IO->recv_event, NULL); } else { IO->NextState = eSendReply; } -} - + set_start_callback(event_base, IO, 0); -#endif + return IO->NextState; +}