From c06372f95e9313f29a331c7d76ef812e480f3c84 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Sat, 8 Oct 2011 23:59:49 +0200 Subject: [PATCH] Implement asynchroneous networking. --- citadel/event_client.c | 73 +++++++++- citadel/event_client.h | 8 ++ citadel/modules/network/serv_networkclient.c | 134 +++++++++++-------- 3 files changed, 160 insertions(+), 55 deletions(-) diff --git a/citadel/event_client.c b/citadel/event_client.c index f1abcbef7..9a7566f7d 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -145,9 +145,11 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) case eConnect: case eSendReply: case eSendMore: + case eSendFile: case eReadMessage: case eReadMore: case eReadPayload: + case eReadFile: ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); break; case eTerminateConnection: @@ -260,6 +262,7 @@ void ShutDownCLient(AsyncIO *IO) eReadState HandleInbound(AsyncIO *IO) { + const char *Err = NULL; eReadState Finished = eBufferNotEmpty; become_session(IO->CitContext); @@ -297,10 +300,22 @@ eReadState HandleInbound(AsyncIO *IO) ev_io_stop(event_base, &IO->recv_event); IO->NextState = IO->ReadDone(IO); Finished = StrBufCheckBuffer(&IO->RecvBuf); + if ((IO->NextState == eReadFile) && + (Finished == eBufferNotEmpty)) + { + Finished = WriteIOBAlreadyRead(&IO->IOB, &Err); + if (Finished == eReadSuccess) + { + IO->NextState = eSendReply; + } + } } } switch (IO->NextState) { + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; case eSendReply: case eSendMore: assert(IO->SendDone); @@ -309,9 +324,12 @@ eReadState HandleInbound(AsyncIO *IO) break; case eReadPayload: case eReadMore: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eTerminateConnection: +//////TODOxxxx + break; case eAbort: ShutDownCLient(IO); break; @@ -331,6 +349,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) { int rc; AsyncIO *IO = watcher->data; + const char *errmsg = NULL; become_session(IO->CitContext); #ifdef BIGBAD_IODBG @@ -357,7 +376,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) if (!rv) printf("failed to write debug to %s!\n", fn); fprintf(fd, "]\n"); #endif - rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + switch (IO->NextState) { + case eSendFile: + rc = FileSendChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf); + } #ifdef BIGBAD_IODBG fprintf(fd, "Sent: BufSize: %d bytes.\n", rc); @@ -379,6 +406,34 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) ev_io_start(event_base, &IO->send_event); } break; + case eSendFile: + if (IO->IOB.ChunkSendRemain > 0) { + ev_io_start(event_base, &IO->recv_event); + } else { + assert(IO->ReadDone); + IO->NextState = IO->ReadDone(IO); + switch(IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + break; + case eSendReply: + case eSendMore: + case eSendFile: + ev_io_start(event_base, &IO->send_event); + break; + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + break; + } + } + break; case eSendReply: if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) break; @@ -386,6 +441,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents) case eReadMore: case eReadMessage: case eReadPayload: + case eReadFile: if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) { HandleInbound(IO); } @@ -419,11 +475,13 @@ set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents) switch(IO->NextState) { case eReadMore: case eReadMessage: + case eReadFile: ev_io_start(event_base, &IO->recv_event); break; case eSendReply: case eSendMore: case eReadPayload: + case eSendFile: become_session(IO->CitContext); IO_send_callback(loop, &IO->send_event, revents); break; @@ -531,10 +589,21 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents) static void IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents) { + const char *errmsg; + long rc; ssize_t nbytes; AsyncIO *IO = watcher->data; - nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + switch (IO->NextState) { + case eReadFile: + rc = FileRecvChunked(&IO->IOB, &errmsg); + if (rc < 0) + StrBufPlain(IO->ErrMsg, errmsg, -1); + break; + default: + nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf); + break; + } #ifdef BIGBAD_IODBG { int rv = 0; diff --git a/citadel/event_client.h b/citadel/event_client.h index b2c368db2..1e8ba4fb5 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -14,13 +14,19 @@ typedef struct AsyncIO AsyncIO; typedef enum _eNextState { eSendDNSQuery, eReadDNSReply, + eDBQuery, + eConnect, eSendReply, eSendMore, + eSendFile, + eReadMessage, eReadMore, eReadPayload, + eReadFile, + eTerminateConnection, eAbort }eNextState; @@ -82,6 +88,8 @@ struct AsyncIO { IOBuffer SendBuf, RecvBuf; + FDIOBuffer IOB; /* when sending from / reading into files, this is used. */ + /* our events... */ ev_cleanup abort_by_shutdown, /* server wants to go down... */ db_abort_by_shutdown; /* server wants to go down... */ diff --git a/citadel/modules/network/serv_networkclient.c b/citadel/modules/network/serv_networkclient.c index 7515c7926..455e47f02 100644 --- a/citadel/modules/network/serv_networkclient.c +++ b/citadel/modules/network/serv_networkclient.c @@ -107,10 +107,7 @@ typedef struct _async_networker { AsyncIO IO; DNSQueryParts HostLookup; eNWCState State; - int fd; long n; - FILE *TmpFile; - long bytes_received; StrBuf *SpoolFileName; StrBuf *tempFileName; StrBuf *node; @@ -118,11 +115,6 @@ typedef struct _async_networker { StrBuf *port; StrBuf *secret; StrBuf *Url; - - long download_len; - int BlobReadSize; - long bytes_written; - long bytes_to_write; } AsyncNetworker; typedef eNextState(*NWClientHandler)(AsyncNetworker* NW); @@ -131,7 +123,7 @@ eNextState nwc_get_one_host_ip(AsyncIO *IO); eNextState nwc_connect_ip(AsyncIO *IO); eNextState NWC_SendQUIT(AsyncNetworker *NW); - +eNextState NWC_DispatchWriteDone(AsyncIO *IO); void DestroyNetworker(AsyncNetworker *NW) @@ -215,17 +207,23 @@ eNextState NWC_SendNDOP(AsyncNetworker *NW) eNextState NWC_ReadNDOPReply(AsyncNetworker *NW) { + int TotalSendSize; NWC_DBG_READ(); if (ChrPtr(NW->IO.IOBuf)[0] == '2') { - NW->download_len = atol (ChrPtr(NW->IO.IOBuf) + 4); - syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->download_len); - if (NW->download_len <= 0) { + + NW->IO.IOB.TotalSentAlready = 0; + TotalSendSize = atol (ChrPtr(NW->IO.IOBuf) + 4); + syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->IO.IOB.TotalSendSize); + if (TotalSendSize <= 0) { NW->State = eNUOP - 1; } else { - NW->TmpFile = fopen(ChrPtr(NW->SpoolFileName), "w"); - if (NW->TmpFile == NULL) + int fd; + fd = open(ChrPtr(NW->SpoolFileName), + O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY, + S_IRUSR|S_IWUSR); + if (fd < 0) { syslog(LOG_CRIT, "cannot open %s: %s\n", @@ -233,7 +231,9 @@ eNextState NWC_ReadNDOPReply(AsyncNetworker *NW) strerror(errno)); NW->State = eQUIT - 1; + return eAbort; } + FDIOBufferInit(&NW->IO.IOB, &NW->IO.RecvBuf, fd, TotalSendSize); } return eSendReply; } @@ -245,7 +245,7 @@ eNextState NWC_ReadNDOPReply(AsyncNetworker *NW) eNextState NWC_SendREAD(AsyncNetworker *NW) { - if (NW->bytes_received < NW->download_len) + if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize) { /* * If shutting down we can exit here and unlink the temp file. @@ -253,15 +253,15 @@ eNextState NWC_SendREAD(AsyncNetworker *NW) */ if (server_shutting_down) { - fclose(NW->TmpFile); + close(NW->IO.IOB.OtherFD); ////// unlink(ChrPtr(NW->tempFileName)); return eAbort; } StrBufPrintf(NW->IO.SendBuf.Buf, "READ %ld|%ld\n", - NW->bytes_received, - ((NW->download_len - NW->bytes_received > IGNET_PACKET_SIZE) + NW->IO.IOB.TotalSentAlready, + ((NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready > IGNET_PACKET_SIZE) ? IGNET_PACKET_SIZE : - (NW->download_len - NW->bytes_received)) + (NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready)) ); return eSendReply; @@ -277,28 +277,37 @@ eNextState NWC_ReadREADState(AsyncNetworker *NW) NWC_DBG_READ(); if (ChrPtr(NW->IO.IOBuf)[0] == '6') { - NW->BlobReadSize = atol(ChrPtr(NW->IO.IOBuf)+4); - NW->bytes_received += NW->BlobReadSize; + NW->IO.IOB.ChunkSendRemain = + NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4); +/// NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize; /// TODO StrBufReadjustIOBuffer(NW->IO.RecvBuf, NW->BlobReadSize); - return eReadPayload; + return eReadFile; } return eAbort; } +eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW); eNextState NWC_ReadREADBlob(AsyncNetworker *NW) { /// FlushIOBuffer(NW->IO.RecvBuf); /// TODO - fwrite(NW->IO.RecvBuf.ReadWritePointer, - 1, - NW->BlobReadSize, - NW->TmpFile); - if (NW->bytes_received < NW->download_len) + + ///NW->bytes_received += NW->IO.IOB.ChunkSize; + + if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize) { NW->State = eREAD - 1; return eSendReply;/* now fetch next chunk*/ } - else + else + return NWC_ReadREADBlobDone(NW); +} + +eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW) +{ + if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready) { - fclose(NW->TmpFile); + NW->State ++; + + close(NW->IO.IOB.OtherFD); if (link(ChrPtr(NW->SpoolFileName), ChrPtr(NW->tempFileName)) != 0) { syslog(LOG_ALERT, @@ -309,11 +318,13 @@ eNextState NWC_ReadREADBlob(AsyncNetworker *NW) } ///// unlink(ChrPtr(NW->tempFileName)); - return eSendReply; //// TODO: step forward. + return NWC_DispatchWriteDone(&NW->IO); + } + else { + NW->State --; + return NWC_DispatchWriteDone(&NW->IO); } } - - eNextState NWC_SendCLOS(AsyncNetworker *NW) { StrBufPlain(NW->IO.SendBuf.Buf, HKEY("CLOS\n")); @@ -332,14 +343,16 @@ eNextState NWC_ReadCLOSReply(AsyncNetworker *NW) eNextState NWC_SendNUOP(AsyncNetworker *NW) { + long TotalSendSize; struct stat statbuf; + int fd; StrBufPrintf(NW->tempFileName, "%s/%s", ctdl_netout_dir, ChrPtr(NW->node)); - NW->fd = open(ChrPtr(NW->tempFileName), O_RDONLY); - if (NW->fd < 0) { + fd = open(ChrPtr(NW->tempFileName), O_RDONLY); + if (fd < 0) { if (errno != ENOENT) { syslog(LOG_CRIT, "cannot open %s: %s\n", @@ -350,23 +363,23 @@ eNextState NWC_SendNUOP(AsyncNetworker *NW) return NWC_SendQUIT(NW); } - if (fstat(NW->fd, &statbuf) == -1) { + if (fstat(fd, &statbuf) == -1) { syslog(9, "FSTAT FAILED %s [%s]--\n", ChrPtr(NW->tempFileName), strerror(errno)); - if (NW->fd > 0) close(NW->fd); + if (fd > 0) close(fd); return eAbort; } - - NW->download_len = statbuf.st_size; - if (NW->download_len == 0) { + TotalSendSize = statbuf.st_size; + if (TotalSendSize == 0) { syslog(LOG_DEBUG, "Nothing to send.\n"); NW->State = eQUIT; return NWC_SendQUIT(NW); } + FDIOBufferInit(&NW->IO.IOB, &NW->IO.SendBuf, fd, TotalSendSize); - NW->bytes_written = 0; +//// NW->bytes_written = 0; StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NUOP\n")); return eSendReply; @@ -382,7 +395,8 @@ eNextState NWC_ReadNUOPReply(AsyncNetworker *NW) eNextState NWC_SendWRIT(AsyncNetworker *NW) { - StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n", NW->bytes_to_write); + StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n", + NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready); return eSendReply; } @@ -394,17 +408,29 @@ eNextState NWC_ReadWRITReply(AsyncNetworker *NW) return eAbort; } - NW->BlobReadSize = atol(ChrPtr(NW->IO.IOBuf)+4); - return eSendMore; + NW->IO.IOB.ChunkSendRemain = + NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4); +/// NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize; + return eSendFile; } -eNextState NWC_SendBlob(AsyncNetworker *NW) +eNextState NWC_SendBlobDone(AsyncNetworker *NW) { + eNextState rc; + if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready) + { + NW->State ++; - /// bytes_to_write -= thisblock; - /// bytes_written += thisblock; - - return eReadMessage; + close(NW->IO.IOB.OtherFD); +//// TODO: unlink networker file? + rc = NWC_DispatchWriteDone(&NW->IO); + NW->State --; + return rc; + } + else { + NW->State --; + return NWC_DispatchWriteDone(&NW->IO); + } } eNextState NWC_SendUCLS(AsyncNetworker *NW) @@ -417,7 +443,7 @@ eNextState NWC_ReadUCLS(AsyncNetworker *NW) { NWC_DBG_READ(); - syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->bytes_written, ChrPtr(NW->node)); + syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->IO.IOB.ChunkSize, ChrPtr(NW->node)); /// syslog(LOG_DEBUG, "<%s\n", buf); if (ChrPtr(NW->IO.IOBuf)[0] == '2') { syslog(LOG_DEBUG, "Removing <%s>\n", ChrPtr(NW->tempFileName)); @@ -451,7 +477,7 @@ NWClientHandler NWC_ReadHandlers[] = { NWC_ReadCLOSReply, NWC_ReadNUOPReply, NWC_ReadWRITReply, - NULL, + NWC_SendBlobDone, NWC_ReadUCLS, NWC_ReadQUIT }; @@ -482,11 +508,11 @@ NWClientHandler NWC_SendHandlers[] = { NWC_SendAuth, NWC_SendNDOP, NWC_SendREAD, - NULL, + NWC_ReadREADBlobDone, NWC_SendCLOS, NWC_SendNUOP, NWC_SendWRIT, - NWC_SendBlob, + NWC_SendBlobDone, NWC_SendUCLS, NWC_SendQUIT }; @@ -597,8 +623,10 @@ eReadState NWC_ReadServerStatus(AsyncIO *IO) case eReadMessage: Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); break; + case eReadFile: + case eSendFile: case eReadPayload: - Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize; +////TODO Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize; break; } return Finished; -- 2.30.2