X-Git-Url: https://code.citadel.org/?p=citadel.git;a=blobdiff_plain;f=citadel%2Fevent_client.c;h=833a270a75f316b970133c962dd0ce1116aa5591;hp=cea9a24f748e47515b3ff61ff63e6b93c48c3e03;hb=66f72c07b70ed9500c49e8ff3c3f895e5269d339;hpb=25743b6534e474b532dfe17167ce925bd94e163c diff --git a/citadel/event_client.c b/citadel/event_client.c index cea9a24f7..833a270a7 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -1,6 +1,6 @@ /* * - * Copyright (c) 1998-2009 by the citadel.org team + * Copyright (c) 1998-2012 by the citadel.org team * * This program is open source software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -67,7 +67,9 @@ #include "event_client.h" -static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher, int revents) +static void IO_abort_shutdown_callback(struct ev_loop *loop, + ev_cleanup *watcher, + int revents) { AsyncIO *IO = watcher->data; EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__); @@ -77,14 +79,14 @@ static void IO_abort_shutdown_callback(struct ev_loop *loop, ev_cleanup *watcher } -/*-------------------------------------------------------------------------------- - * Server DB IO - */ +/*------------------------------------------------------------------------------ + * Server DB IO + *----------------------------------------------------------------------------*/ extern int evdb_count; extern pthread_mutex_t DBEventQueueMutex; extern HashList *DBInboundEventQueue; extern struct ev_loop *event_db; -extern ev_async DBAddJob; +extern ev_async DBAddJob; extern ev_async DBExitEventLoop; eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) @@ -95,7 +97,7 @@ eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB) h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; - ev_cleanup_init(&IO->db_abort_by_shutdown, + ev_cleanup_init(&IO->db_abort_by_shutdown, IO_abort_shutdown_callback); IO->db_abort_by_shutdown.data = IO; ev_cleanup_start(event_db, &IO->db_abort_by_shutdown); @@ -120,7 +122,7 @@ void ShutDownDBCLient(AsyncIO *IO) ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); assert(IO->Terminate); - IO->Terminate(IO); + IO->Terminate(IO); } void @@ -129,7 +131,7 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) AsyncIO *IO = watcher->data; EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); - + ev_idle_stop(event_db, &IO->db_unwind_stack); assert(IO->NextDBOperation); @@ -140,10 +142,10 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) case eSendDNSQuery: case eReadDNSReply: case eConnect: - case eSendReply: + case eSendReply: case eSendMore: case eSendFile: - case eReadMessage: + case eReadMessage: case eReadMore: case eReadPayload: case eReadFile: @@ -167,14 +169,14 @@ eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB) return eDBQuery; } -/*-------------------------------------------------------------------------------- - * Client IO - */ +/*------------------------------------------------------------------------------ + * Client IO + *----------------------------------------------------------------------------*/ extern int evbase_count; extern pthread_mutex_t EventQueueMutex; extern HashList *InboundEventQueue; extern struct ev_loop *event_base; -extern ev_async AddJob; +extern ev_async AddJob; extern ev_async ExitEventLoop; @@ -186,7 +188,7 @@ eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB) h = (IOAddHandler*)malloc(sizeof(IOAddHandler)); h->IO = IO; h->EvAttch = CB; - ev_cleanup_init(&IO->abort_by_shutdown, + ev_cleanup_init(&IO->abort_by_shutdown, IO_abort_shutdown_callback); IO->abort_by_shutdown.data = IO; ev_cleanup_start(event_base, &IO->abort_by_shutdown); @@ -282,47 +284,46 @@ eReadState HandleInbound(AsyncIO *IO) { const char *Err = NULL; eReadState Finished = eBufferNotEmpty; - + become_session(IO->CitContext); - while ((Finished == eBufferNotEmpty) && + while ((Finished == eBufferNotEmpty) && ((IO->NextState == eReadMessage)|| (IO->NextState == eReadMore)|| (IO->NextState == eReadFile)|| (IO->NextState == eReadPayload))) { - if (IO->RecvBuf.nBlobBytesWanted != 0) { - - } - else { /* Reading lines... */ -//// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if ((IO->NextState == eReadFile) && - (Finished == eBufferNotEmpty)) + /* Reading lines... + * lex line reply in callback, + * or do it ourselves. + * i.e. as nnn-blabla means continue reading in SMTP + */ + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) { - Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); - if (Finished == eReadSuccess) - { - IO->NextState = eSendReply; - } - } - else if (IO->LineReader) - Finished = IO->LineReader(IO); - else - Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); - - switch (Finished) { - case eMustReadMore: /// read new from socket... - break; - case eBufferNotEmpty: /* shouldn't happen... */ - case eReadSuccess: /// done for now... - break; - case eReadFail: /// WHUT? - ///todo: shut down! - break; + IO->NextState = eSendReply; } - } - + else if (IO->LineReader) + Finished = IO->LineReader(IO); + else + Finished = StrBufChunkSipLine(IO->IOBuf, + &IO->RecvBuf); + + switch (Finished) { + case eMustReadMore: /// read new from socket... + break; + case eBufferNotEmpty: /* shouldn't happen... */ + case eReadSuccess: /// done for now... + break; + case eReadFail: /// WHUT? + ///todo: shut down! + break; + } + if (Finished != eMustReadMore) { assert(IO->ReadDone); ev_io_stop(event_base, &IO->recv_event); @@ -379,15 +380,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) const char *pch = ChrPtr(IO->SendBuf.Buf); const char *pchh = IO->SendBuf.ReadWritePointer; long nbytes; - + if (pchh == NULL) pchh = pch; - + nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch); snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", ((CitContext*)(IO->CitContext))->ServiceName, IO->SendBuf.fd); - + fd = fopen(fn, "a+"); fprintf(fd, "Send: BufSize: %ld BufContent: [", nbytes); @@ -402,7 +403,9 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) StrBufPlain(IO->ErrMsg, errmsg, -1); break; default: - rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + rc = StrBuf_write_one_chunk_callback(watcher->fd, + 0/*TODO*/, + &IO->SendBuf); } #ifdef BIGBAD_IODBG @@ -437,12 +440,13 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eDBQuery: case eConnect: break; - case eSendReply: + case eSendReply: case eSendMore: case eSendFile: - ev_io_start(event_base, &IO->send_event); + ev_io_start(event_base, + &IO->send_event); break; - case eReadMessage: + case eReadMessage: case eReadMore: case eReadPayload: case eReadFile: @@ -454,14 +458,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) } break; case eSendReply: - if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) + if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) break; IO->NextState = eReadMore; case eReadMore: case eReadMessage: case eReadPayload: case eReadFile: - if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { + if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) + { HandleInbound(IO); } else { @@ -470,7 +475,10 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) break; case eDBQuery: - /* we now live in another queue, so we have to unregister. */ + /* + * we now live in another queue, + * so we have to unregister. + */ ev_cleanup_stop(loop, &IO->abort_by_shutdown); break; case eSendDNSQuery: @@ -490,7 +498,6 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) static void set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) { - switch(IO->NextState) { case eReadMore: case eReadMessage: @@ -533,7 +540,7 @@ IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents) } assert(IO->Timeout); - switch (IO->Timeout(IO)) + switch (IO->Timeout(IO)) { case eAbort: ShutDownCLient(IO); @@ -561,7 +568,7 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) become_session(IO->CitContext); assert(IO->ConnFail); - switch (IO->ConnFail(IO)) + switch (IO->ConnFail(IO)) { case eAbort: ShutDownCLient(IO); @@ -572,7 +579,9 @@ IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents) } static void -IO_connfailimmediate_callback(struct ev_loop *loop, ev_idle *watcher, int revents) +IO_connfailimmediate_callback(struct ev_loop *loop, + ev_idle *watcher, + int revents) { AsyncIO *IO = watcher->data; @@ -586,7 +595,7 @@ IO_connfailimmediate_callback(struct ev_loop *loop, ev_idle *watcher, int revent become_session(IO->CitContext); assert(IO->ConnFail); - switch (IO->ConnFail(IO)) + switch (IO->ConnFail(IO)) { case eAbort: ShutDownCLient(IO); @@ -617,7 +626,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) nbytes = FileRecvChunked(&IO->IOB, &errmsg); if (nbytes < 0) StrBufPlain(IO->ErrMsg, errmsg, -1); - else + else { if (IO->IOB.ChunkSendRemain == 0) { @@ -628,35 +637,35 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) } break; default: - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, + 0 /*TODO */, + &IO->RecvBuf); break; } #ifdef BIGBAD_IODBG { + long nbytes; int rv = 0; char fn [SIZ]; FILE *fd; const char *pch = ChrPtr(IO->RecvBuf.Buf); const char *pchh = IO->RecvBuf.ReadWritePointer; - long nbytes; - + if (pchh == NULL) pchh = pch; - + nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch); snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d", - ((CitContext*)(IO->CitContext))->ServiceName, + ((CitContext*)(IO->CitContext))->ServiceName, IO->SendBuf.fd); - + fd = fopen(fn, "a+"); fprintf(fd, "Read: BufSize: %ld BufContent: [", nbytes); rv = fwrite(pchh, nbytes, 1, fd); if (!rv) printf("failed to write debug to %s!\n", fn); fprintf(fd, "]\n"); - - fclose(fd); } #endif @@ -675,7 +684,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) return; } else if (nbytes == -1) { /// TODO: FD is gone. kick it. sock_buff_invoke_free(sb, errno); - EV_syslog(LOG_DEBUG, + EV_syslog(LOG_DEBUG, "EVENT: Socket Invalid! %s \n", strerror(errno)); ShutDownCLient(IO); @@ -776,8 +785,11 @@ eNextState EvConnectSock(AsyncIO *IO, IO->rw_timeout.data = IO; - /* Bypass it like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ -/// ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = inet_addr("127.0.0.1"); + /* for debugging you may bypass it like this: + * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr = + * inet_addr("127.0.0.1"); + */ if (IO->ConnectMe->IPv6) rc = connect(IO->SendBuf.fd, &IO->ConnectMe->Addr,