Implement asynchroneous networking.
authorWilfried Goesgens <dothebart@citadel.org>
Sat, 8 Oct 2011 21:59:49 +0000 (23:59 +0200)
committerWilfried Goesgens <dothebart@citadel.org>
Sat, 8 Oct 2011 21:59:49 +0000 (23:59 +0200)
citadel/event_client.c
citadel/event_client.h
citadel/modules/network/serv_networkclient.c

index f1abcbef7ff9396550809c03be7b3af22edec87e..9a7566f7d4d84f777eac6c1a2e8085b631467695 100644 (file)
@@ -145,9 +145,11 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
        case eConnect:
        case eSendReply: 
        case eSendMore:
+       case eSendFile:
        case eReadMessage: 
        case eReadMore:
        case eReadPayload:
+       case eReadFile:
                ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
                break;
        case eTerminateConnection:
@@ -260,6 +262,7 @@ void ShutDownCLient(AsyncIO *IO)
 
 eReadState HandleInbound(AsyncIO *IO)
 {
+       const char *Err = NULL;
        eReadState Finished = eBufferNotEmpty;
        
        become_session(IO->CitContext);
@@ -297,10 +300,22 @@ eReadState HandleInbound(AsyncIO *IO)
                        ev_io_stop(event_base, &IO->recv_event);
                        IO->NextState = IO->ReadDone(IO);
                        Finished = StrBufCheckBuffer(&IO->RecvBuf);
+                       if ((IO->NextState == eReadFile) && 
+                           (Finished == eBufferNotEmpty))
+                       {
+                               Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
+                               if (Finished == eReadSuccess)
+                               {
+                                       IO->NextState = eSendReply;                             
+                               }
+                       }
                }
        }
 
        switch (IO->NextState) {
+       case eSendFile:
+               ev_io_start(event_base, &IO->send_event);
+               break;
        case eSendReply:
        case eSendMore:
                assert(IO->SendDone);
@@ -309,9 +324,12 @@ eReadState HandleInbound(AsyncIO *IO)
                break;
        case eReadPayload:
        case eReadMore:
+       case eReadFile:
                ev_io_start(event_base, &IO->recv_event);
                break;
        case eTerminateConnection:
+//////TODOxxxx
+               break;
        case eAbort:
                ShutDownCLient(IO);
                break;
@@ -331,6 +349,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        int rc;
        AsyncIO *IO = watcher->data;
+       const char *errmsg = NULL;
 
        become_session(IO->CitContext);
 #ifdef BIGBAD_IODBG
@@ -357,7 +376,15 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                if (!rv) printf("failed to write debug to %s!\n", fn);
                fprintf(fd, "]\n");
 #endif
-               rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+               switch (IO->NextState) {
+               case eSendFile:
+                       rc = FileSendChunked(&IO->IOB, &errmsg);
+                       if (rc < 0)
+                               StrBufPlain(IO->ErrMsg, errmsg, -1);
+                       break;
+               default:
+                       rc = StrBuf_write_one_chunk_callback(watcher->fd, 0/*TODO*/, &IO->SendBuf);
+               }
 
 #ifdef BIGBAD_IODBG
                fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
@@ -379,6 +406,34 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                                ev_io_start(event_base, &IO->send_event);
                        }
                        break;
+               case eSendFile:
+                       if (IO->IOB.ChunkSendRemain > 0) {
+                               ev_io_start(event_base, &IO->recv_event);
+                       } else {
+                               assert(IO->ReadDone);
+                               IO->NextState = IO->ReadDone(IO);
+                               switch(IO->NextState) {
+                               case eSendDNSQuery:
+                               case eReadDNSReply:
+                               case eDBQuery:
+                               case eConnect:
+                                       break;
+                               case eSendReply: 
+                               case eSendMore:
+                               case eSendFile:
+                                       ev_io_start(event_base, &IO->send_event);
+                                       break;
+                               case eReadMessage: 
+                               case eReadMore:
+                               case eReadPayload:
+                               case eReadFile:
+                                       break;
+                               case eTerminateConnection:
+                               case eAbort:
+                                       break;
+                               }
+                       }
+                       break;
                case eSendReply:
                    if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess) 
                        break;
@@ -386,6 +441,7 @@ IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                case eReadMore:
                case eReadMessage:
                case eReadPayload:
+               case eReadFile:
                        if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty) {
                                HandleInbound(IO);
                        }
@@ -419,11 +475,13 @@ set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
        switch(IO->NextState) {
        case eReadMore:
        case eReadMessage:
+       case eReadFile:
                ev_io_start(event_base, &IO->recv_event);
                break;
        case eSendReply:
        case eSendMore:
        case eReadPayload:
+       case eSendFile:
                become_session(IO->CitContext);
                IO_send_callback(loop, &IO->send_event, revents);
                break;
@@ -531,10 +589,21 @@ IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 static void
 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
+       const char *errmsg;
+       long rc;
        ssize_t nbytes;
        AsyncIO *IO = watcher->data;
 
-       nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+       switch (IO->NextState) {
+       case eReadFile:
+               rc = FileRecvChunked(&IO->IOB, &errmsg);
+               if (rc < 0)
+                       StrBufPlain(IO->ErrMsg, errmsg, -1);
+               break;
+       default:
+               nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
+               break;
+       }
 #ifdef BIGBAD_IODBG
        {
                int rv = 0;
index b2c368db2400c7d6bacd93b8df58a9271ff36c49..1e8ba4fb500db62b7766b183dc7cd47835c105bd 100644 (file)
@@ -14,13 +14,19 @@ typedef struct AsyncIO AsyncIO;
 typedef enum _eNextState {
        eSendDNSQuery,
        eReadDNSReply,
+
        eDBQuery,
+
        eConnect,
        eSendReply, 
        eSendMore,
+       eSendFile,
+
        eReadMessage, 
        eReadMore,
        eReadPayload,
+       eReadFile,
+
        eTerminateConnection,
        eAbort
 }eNextState;
@@ -82,6 +88,8 @@ struct AsyncIO {
        IOBuffer SendBuf, 
                RecvBuf;
 
+       FDIOBuffer IOB; /* when sending from / reading into files, this is used. */
+
        /* our events... */
        ev_cleanup abort_by_shutdown, /* server wants to go down... */
                db_abort_by_shutdown; /* server wants to go down... */
index 7515c7926d6c02e5b15c10f8cb0c41bf995b05d8..455e47f02e8e2bda9903b0f810143ddcfba55942 100644 (file)
@@ -107,10 +107,7 @@ typedef struct _async_networker {
         AsyncIO IO;
        DNSQueryParts HostLookup;
        eNWCState State;
-       int fd;
        long n;
-        FILE *TmpFile;
-        long bytes_received;
         StrBuf *SpoolFileName;
         StrBuf *tempFileName;
        StrBuf *node;
@@ -118,11 +115,6 @@ typedef struct _async_networker {
        StrBuf *port;
        StrBuf *secret;
        StrBuf          *Url;
-
-       long download_len;
-       int BlobReadSize;
-       long bytes_written;
-       long bytes_to_write;
 } AsyncNetworker;
 
 typedef eNextState(*NWClientHandler)(AsyncNetworker* NW);
@@ -131,7 +123,7 @@ eNextState nwc_get_one_host_ip(AsyncIO *IO);
 eNextState nwc_connect_ip(AsyncIO *IO);
 
 eNextState NWC_SendQUIT(AsyncNetworker *NW);
-
+eNextState NWC_DispatchWriteDone(AsyncIO *IO);
 
 
 void DestroyNetworker(AsyncNetworker *NW)
@@ -215,17 +207,23 @@ eNextState NWC_SendNDOP(AsyncNetworker *NW)
 
 eNextState NWC_ReadNDOPReply(AsyncNetworker *NW)
 {
+       int TotalSendSize;
        NWC_DBG_READ();
        if (ChrPtr(NW->IO.IOBuf)[0] == '2')
        {
-               NW->download_len = atol (ChrPtr(NW->IO.IOBuf) + 4);
-               syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->download_len);
-               if (NW->download_len <= 0) {
+
+               NW->IO.IOB.TotalSentAlready = 0;
+               TotalSendSize = atol (ChrPtr(NW->IO.IOBuf) + 4);
+               syslog(LOG_DEBUG, "Expecting to transfer %ld bytes\n", NW->IO.IOB.TotalSendSize);
+               if (TotalSendSize <= 0) {
                        NW->State = eNUOP - 1;
                }
                else {
-                       NW->TmpFile = fopen(ChrPtr(NW->SpoolFileName), "w");
-                       if (NW->TmpFile == NULL)
+                       int fd;
+                       fd = open(ChrPtr(NW->SpoolFileName), 
+                                 O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY, 
+                                 S_IRUSR|S_IWUSR);
+                       if (fd < 0)
                        {
                                syslog(LOG_CRIT,
                                       "cannot open %s: %s\n", 
@@ -233,7 +231,9 @@ eNextState NWC_ReadNDOPReply(AsyncNetworker *NW)
                                       strerror(errno));
 
                                NW->State = eQUIT - 1;
+                               return eAbort;
                        }
+                       FDIOBufferInit(&NW->IO.IOB, &NW->IO.RecvBuf, fd, TotalSendSize);
                }
                return eSendReply;
        }
@@ -245,7 +245,7 @@ eNextState NWC_ReadNDOPReply(AsyncNetworker *NW)
 
 eNextState NWC_SendREAD(AsyncNetworker *NW)
 {
-       if (NW->bytes_received < NW->download_len)
+       if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
        {
                /*
                 * If shutting down we can exit here and unlink the temp file.
@@ -253,15 +253,15 @@ eNextState NWC_SendREAD(AsyncNetworker *NW)
                 */
                if (server_shutting_down)
                {
-                       fclose(NW->TmpFile);
+                       close(NW->IO.IOB.OtherFD);
 //////                 unlink(ChrPtr(NW->tempFileName));
                        return eAbort;
                }
                StrBufPrintf(NW->IO.SendBuf.Buf, "READ %ld|%ld\n",
-                            NW->bytes_received,
-                            ((NW->download_len - NW->bytes_received > IGNET_PACKET_SIZE)
+                            NW->IO.IOB.TotalSentAlready,
+                            ((NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready > IGNET_PACKET_SIZE)
                              ? IGNET_PACKET_SIZE : 
-                             (NW->download_len - NW->bytes_received))
+                             (NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready))
                        );
                return eSendReply;
 
@@ -277,28 +277,37 @@ eNextState NWC_ReadREADState(AsyncNetworker *NW)
        NWC_DBG_READ();
        if (ChrPtr(NW->IO.IOBuf)[0] == '6')
        {
-               NW->BlobReadSize = atol(ChrPtr(NW->IO.IOBuf)+4);
-               NW->bytes_received += NW->BlobReadSize;
+               NW->IO.IOB.ChunkSendRemain = 
+                       NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
+///            NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
 /// TODO               StrBufReadjustIOBuffer(NW->IO.RecvBuf, NW->BlobReadSize);
-               return eReadPayload;
+               return eReadFile;
        }
        return eAbort;
 }
+eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW);
 eNextState NWC_ReadREADBlob(AsyncNetworker *NW)
 {
        /// FlushIOBuffer(NW->IO.RecvBuf); /// TODO
-       fwrite(NW->IO.RecvBuf.ReadWritePointer,
-              1, 
-              NW->BlobReadSize, 
-              NW->TmpFile);
-       if (NW->bytes_received < NW->download_len)
+
+       ///NW->bytes_received += NW->IO.IOB.ChunkSize;
+
+       if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
        {
                NW->State = eREAD - 1;
                return eSendReply;/* now fetch next chunk*/
        }
-       else 
+       else
+               return NWC_ReadREADBlobDone(NW);
+}
+
+eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW)
+{
+       if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
        {
-               fclose(NW->TmpFile);
+               NW->State ++;
+
+               close(NW->IO.IOB.OtherFD);
                
                if (link(ChrPtr(NW->SpoolFileName), ChrPtr(NW->tempFileName)) != 0) {
                        syslog(LOG_ALERT, 
@@ -309,11 +318,13 @@ eNextState NWC_ReadREADBlob(AsyncNetworker *NW)
                }
        
 /////          unlink(ChrPtr(NW->tempFileName));
-               return eSendReply; //// TODO: step forward.
+               return NWC_DispatchWriteDone(&NW->IO);
+       }
+       else {
+               NW->State --;
+               return NWC_DispatchWriteDone(&NW->IO);
        }
 }
-
-
 eNextState NWC_SendCLOS(AsyncNetworker *NW)
 {
        StrBufPlain(NW->IO.SendBuf.Buf, HKEY("CLOS\n"));
@@ -332,14 +343,16 @@ eNextState NWC_ReadCLOSReply(AsyncNetworker *NW)
 
 eNextState NWC_SendNUOP(AsyncNetworker *NW)
 {
+       long TotalSendSize;
        struct stat statbuf;
+       int fd;
 
        StrBufPrintf(NW->tempFileName,
                     "%s/%s",
                     ctdl_netout_dir,
                     ChrPtr(NW->node));
-       NW->fd = open(ChrPtr(NW->tempFileName), O_RDONLY);
-       if (NW->fd < 0) {
+       fd = open(ChrPtr(NW->tempFileName), O_RDONLY);
+       if (fd < 0) {
                if (errno != ENOENT) {
                        syslog(LOG_CRIT,
                               "cannot open %s: %s\n", 
@@ -350,23 +363,23 @@ eNextState NWC_SendNUOP(AsyncNetworker *NW)
                return NWC_SendQUIT(NW);
        }
 
-       if (fstat(NW->fd, &statbuf) == -1) {
+       if (fstat(fd, &statbuf) == -1) {
                syslog(9, "FSTAT FAILED %s [%s]--\n", 
                       ChrPtr(NW->tempFileName), 
                       strerror(errno));
-               if (NW->fd > 0) close(NW->fd);
+               if (fd > 0) close(fd);
                return eAbort;
        }
-       
-       NW->download_len = statbuf.st_size;
-       if (NW->download_len == 0) {
+       TotalSendSize = statbuf.st_size;
+       if (TotalSendSize == 0) {
                syslog(LOG_DEBUG,
                       "Nothing to send.\n");
                NW->State = eQUIT;
                return NWC_SendQUIT(NW);
        }
+       FDIOBufferInit(&NW->IO.IOB, &NW->IO.SendBuf, fd, TotalSendSize);
 
-       NW->bytes_written = 0;
+////   NW->bytes_written = 0;
 
        StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NUOP\n"));
        return eSendReply;
@@ -382,7 +395,8 @@ eNextState NWC_ReadNUOPReply(AsyncNetworker *NW)
 
 eNextState NWC_SendWRIT(AsyncNetworker *NW)
 {
-       StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n", NW->bytes_to_write);
+       StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n", 
+                    NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready);
 
        return eSendReply;
 }
@@ -394,17 +408,29 @@ eNextState NWC_ReadWRITReply(AsyncNetworker *NW)
                return eAbort;
        }
 
-       NW->BlobReadSize = atol(ChrPtr(NW->IO.IOBuf)+4);
-       return eSendMore;
+       NW->IO.IOB.ChunkSendRemain = 
+               NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
+///    NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
+       return eSendFile;
 }
 
-eNextState NWC_SendBlob(AsyncNetworker *NW)
+eNextState NWC_SendBlobDone(AsyncNetworker *NW)
 {
+       eNextState rc;
+       if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
+       {
+               NW->State ++;
 
-       ///                     bytes_to_write -= thisblock;
-       ///                     bytes_written += thisblock;
-
-       return eReadMessage;
+               close(NW->IO.IOB.OtherFD);
+//// TODO: unlink networker file?              
+               rc =  NWC_DispatchWriteDone(&NW->IO);
+               NW->State --;
+               return rc;
+       }
+       else {
+               NW->State --;
+               return NWC_DispatchWriteDone(&NW->IO);
+       }
 }
 
 eNextState NWC_SendUCLS(AsyncNetworker *NW)
@@ -417,7 +443,7 @@ eNextState NWC_ReadUCLS(AsyncNetworker *NW)
 {
        NWC_DBG_READ();
 
-       syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->bytes_written, ChrPtr(NW->node));
+       syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->IO.IOB.ChunkSize, ChrPtr(NW->node));
 ///    syslog(LOG_DEBUG, "<%s\n", buf);
        if (ChrPtr(NW->IO.IOBuf)[0] == '2') {
                syslog(LOG_DEBUG, "Removing <%s>\n", ChrPtr(NW->tempFileName));
@@ -451,7 +477,7 @@ NWClientHandler NWC_ReadHandlers[] = {
        NWC_ReadCLOSReply,
        NWC_ReadNUOPReply,
        NWC_ReadWRITReply,
-       NULL,
+       NWC_SendBlobDone,
        NWC_ReadUCLS,
        NWC_ReadQUIT
 };
@@ -482,11 +508,11 @@ NWClientHandler NWC_SendHandlers[] = {
        NWC_SendAuth,
        NWC_SendNDOP,
        NWC_SendREAD,
-       NULL,
+       NWC_ReadREADBlobDone,
        NWC_SendCLOS,
        NWC_SendNUOP,
        NWC_SendWRIT,
-       NWC_SendBlob,
+       NWC_SendBlobDone,
        NWC_SendUCLS,
        NWC_SendQUIT
 };
@@ -597,8 +623,10 @@ eReadState NWC_ReadServerStatus(AsyncIO *IO)
        case eReadMessage: 
                Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
                break;
+       case eReadFile:
+       case eSendFile:
        case eReadPayload:
-               Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize;
+////TODO               Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize;
                break;
        }
        return Finished;