X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=2fc22de8c600eecf3bafbe315c8f40b5a549e6e8;hb=7fece55050c597542f70c6e84db5fef4b5213cf0;hp=f1abcbef7ff9396550809c03be7b3af22edec87e;hpb=36111d9ace0953d536650249f7339a52b0bed77d;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index f1abcbef7..2fc22de8c 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -145,9 +145,11 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) case eConnect: case eSendReply: case eSendMore: + case eSendFile: case eReadMessage: case eReadMore: case eReadPayload: + case eReadFile: ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); break; case eTerminateConnection: @@ -260,6 +262,7 @@ void ShutDownCLient(AsyncIO *IO) eReadState HandleInbound(AsyncIO *IO) { + const char *Err = NULL; eReadState Finished = eBufferNotEmpty; become_session(IO->CitContext); @@ -267,6 +270,7 @@ eReadState HandleInbound(AsyncIO *IO) while ((Finished == eBufferNotEmpty) && ((IO->NextState == eReadMessage)|| (IO->NextState == eReadMore)|| + (IO->NextState == eReadFile)|| (IO->NextState == eReadPayload))) { if (IO->RecvBuf.nBlobBytesWanted != 0) { @@ -274,7 +278,16 @@ eReadState HandleInbound(AsyncIO *IO) } else { /* Reading lines... */ //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP - if (IO->LineReader) + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } + else if (IO->LineReader) Finished = IO->LineReader(IO); else Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); @@ -301,6 +314,9 @@ eReadState HandleInbound(AsyncIO *IO) } switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; case eSendReply: case eSendMore: assert(IO->SendDone); @@ -309,9 +325,12 @@ eReadState HandleInbound(AsyncIO *IO) break; case eReadPayload: case eReadMore: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eTerminateConnection: +//////TODOxxxx + break; case eAbort: ShutDownCLient(IO); break; @@ -331,6 +350,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; AsyncIO *IO = watcher->data; + const char *errmsg = NULL; become_session(IO->CitContext); #ifdef BIGBAD_IODBG @@ -357,7 +377,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (!rv) printf("failed to write debug to %s!\n", fn); fprintf(fd, "]\n"); #endif - rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + switch (IO->NextState) { + case eSendFile: + rc = FileSendChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + } #ifdef BIGBAD_IODBG fprintf(fd, "Sent: BufSize: %d bytes.\n", rc); @@ -379,6 +407,34 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) ev_io_start(event_base, &IO->send_event); } break; + case eSendFile: + if (IO->IOB.ChunkSendRemain > 0) { + ev_io_start(event_base, &IO->recv_event); + } else { + assert(IO->ReadDone); + IO->NextState = IO->ReadDone(IO); + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + break; + case eSendReply: + case eSendMore: + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + break; + } + } + break; case eSendReply: if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) break; @@ -386,6 +442,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eReadMore: case eReadMessage: case eReadPayload: + case eReadFile: if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { HandleInbound(IO); } @@ -419,11 +476,13 @@ set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) switch(IO->NextState) { case eReadMore: case eReadMessage: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eSendReply: case eSendMore: case eReadPayload: + case eSendFile: become_session(IO->CitContext); IO_send_callback(loop, &IO->send_event, revents); break; @@ -531,10 +590,30 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { + const char *errmsg; ssize_t nbytes; AsyncIO *IO = watcher->data; - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + switch (IO->NextState) { + case eReadFile: + nbytes = FileRecvChunked(&IO->IOB, &errmsg); + if (nbytes < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + else + { + if (IO->IOB.ChunkSendRemain == 0) + { + IO->NextState = eSendReply; + } + else + return; + } + break; + default: + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + break; + } + #ifdef BIGBAD_IODBG { int rv = 0; @@ -581,6 +660,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) syslog(LOG_DEBUG, "EVENT: Socket Invalid! %s \n", strerror(errno)); + ShutDownCLient(IO); return; } } @@ -592,6 +672,7 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents) syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__); become_session(IO->CitContext); assert(IO->DNSFail); + assert(IO->DNSQuery->PostDNS); switch (IO->DNSQuery->PostDNS(IO)) { case eAbort: