X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fevent_client.c;h=9a7566f7d4d84f777eac6c1a2e8085b631467695;hb=c06372f95e9313f29a331c7d76ef812e480f3c84;hp=f1abcbef7ff9396550809c03be7b3af22edec87e;hpb=aa1bc013ff54eb118855365945aae4218d099003;p=citadel.git diff --git a/citadel/event_client.c b/citadel/event_client.c index f1abcbef7..9a7566f7d 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -145,9 +145,11 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) case eConnect: case eSendReply: case eSendMore: + case eSendFile: case eReadMessage: case eReadMore: case eReadPayload: + case eReadFile: ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); break; case eTerminateConnection: @@ -260,6 +262,7 @@ void ShutDownCLient(AsyncIO *IO) eReadState HandleInbound(AsyncIO *IO) { + const char *Err = NULL; eReadState Finished = eBufferNotEmpty; become_session(IO->CitContext); @@ -297,10 +300,22 @@ eReadState HandleInbound(AsyncIO *IO) ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } } } switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; case eSendReply: case eSendMore: assert(IO->SendDone); @@ -309,9 +324,12 @@ eReadState HandleInbound(AsyncIO *IO) break; case eReadPayload: case eReadMore: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eTerminateConnection: +//////TODOxxxx + break; case eAbort: ShutDownCLient(IO); break; @@ -331,6 +349,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; AsyncIO *IO = watcher->data; + const char *errmsg = NULL; become_session(IO->CitContext); #ifdef BIGBAD_IODBG @@ -357,7 +376,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (!rv) printf("failed to write debug to %s!\n", fn); fprintf(fd, "]\n"); #endif - rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + switch (IO->NextState) { + case eSendFile: + rc = FileSendChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + } #ifdef BIGBAD_IODBG fprintf(fd, "Sent: BufSize: %d bytes.\n", rc); @@ -379,6 +406,34 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) ev_io_start(event_base, &IO->send_event); } break; + case eSendFile: + if (IO->IOB.ChunkSendRemain > 0) { + ev_io_start(event_base, &IO->recv_event); + } else { + assert(IO->ReadDone); + IO->NextState = IO->ReadDone(IO); + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + break; + case eSendReply: + case eSendMore: + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + break; + } + } + break; case eSendReply: if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) break; @@ -386,6 +441,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eReadMore: case eReadMessage: case eReadPayload: + case eReadFile: if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { HandleInbound(IO); } @@ -419,11 +475,13 @@ set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) switch(IO->NextState) { case eReadMore: case eReadMessage: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eSendReply: case eSendMore: case eReadPayload: + case eSendFile: become_session(IO->CitContext); IO_send_callback(loop, &IO->send_event, revents); break; @@ -531,10 +589,21 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { + const char *errmsg; + long rc; ssize_t nbytes; AsyncIO *IO = watcher->data; - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + switch (IO->NextState) { + case eReadFile: + rc = FileRecvChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + break; + } #ifdef BIGBAD_IODBG { int rv = 0;