Finalize Networker; native modern linux.
authorWilfried Goesgens <dothebart@citadel.org>
Sun, 16 Oct 2011 21:30:50 +0000 (23:30 +0200)
committerWilfried Goesgens <dothebart@citadel.org>
Sun, 16 Oct 2011 21:30:50 +0000 (23:30 +0200)
  - use sendfile to upload spool files from disk to the remote party
  - use splice() to download spoolfiles from the remote site.

citadel/event_client.c
citadel/modules/network/serv_networkclient.c
citadel/modules/pop3client/serv_pop3client.c
libcitadel/lib/libcitadel.h
libcitadel/lib/stringbuf.c

index 9a7566f..91dcabf 100644 (file)
@@ -270,6 +270,7 @@ eReadState HandleInbound(AsyncIO *IO)
        while ((Finished == eBufferNotEmpty) && 
               ((IO->NextState == eReadMessage)||
                (IO->NextState == eReadMore)||
+               (IO->NextState == eReadFile)||
                (IO->NextState == eReadPayload)))
        {
                if (IO->RecvBuf.nBlobBytesWanted != 0) { 
@@ -277,7 +278,16 @@ eReadState HandleInbound(AsyncIO *IO)
                }
                else { /* Reading lines... */
 //// lex line reply in callback, or do it ourselves. as nnn-blabla means continue reading in SMTP
-                       if (IO->LineReader)
+                       if ((IO->NextState == eReadFile) && 
+                           (Finished == eBufferNotEmpty))
+                       {
+                               Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
+                               if (Finished == eReadSuccess)
+                               {
+                                       IO->NextState = eSendReply;                             
+                               }
+                       }
+                       else if (IO->LineReader)
                                Finished = IO->LineReader(IO);
                        else 
                                Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
@@ -300,15 +310,6 @@ eReadState HandleInbound(AsyncIO *IO)
                        ev_io_stop(event_base, &IO->recv_event);
                        IO->NextState = IO->ReadDone(IO);
                        Finished = StrBufCheckBuffer(&IO->RecvBuf);
-                       if ((IO->NextState == eReadFile) && 
-                           (Finished == eBufferNotEmpty))
-                       {
-                               Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
-                               if (Finished == eReadSuccess)
-                               {
-                                       IO->NextState = eSendReply;                             
-                               }
-                       }
                }
        }
 
@@ -590,20 +591,32 @@ static void
 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
 {
        const char *errmsg;
-       long rc;
        ssize_t nbytes;
        AsyncIO *IO = watcher->data;
 
        switch (IO->NextState) {
        case eReadFile:
-               rc = FileRecvChunked(&IO->IOB, &errmsg);
-               if (rc < 0)
+               nbytes = FileRecvChunked(&IO->IOB, &errmsg);
+               syslog(LOG_DEBUG, "****************nbytes: %ld ChunkRemain: %ldx.\n", 
+                      nbytes, IO->IOB.ChunkSendRemain);
+               if (nbytes < 0)
                        StrBufPlain(IO->ErrMsg, errmsg, -1);
+               else 
+               {
+                       if (IO->IOB.ChunkSendRemain == 0)
+                       {
+                               IO->NextState = eSendReply;
+                               syslog(LOG_DEBUG, "***********************************xxxx.\n");
+                       }
+                       else
+                               return;
+               }
                break;
        default:
                nbytes = StrBuf_read_one_chunk_callback(watcher->fd, 0 /*TODO */, &IO->RecvBuf);
                break;
        }
+
 #ifdef BIGBAD_IODBG
        {
                int rv = 0;
@@ -650,6 +663,7 @@ IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
                syslog(LOG_DEBUG, 
                       "EVENT: Socket Invalid! %s \n",
                       strerror(errno));
+               ShutDownCLient(IO);
                return;
        }
 }
@@ -661,6 +675,7 @@ IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
        syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
        become_session(IO->CitContext);
        assert(IO->DNSFail);
+       assert(IO->DNSQuery->PostDNS);
        switch (IO->DNSQuery->PostDNS(IO))
        {
        case eAbort:
index 455e47f..66d65cc 100644 (file)
@@ -125,15 +125,34 @@ eNextState nwc_connect_ip(AsyncIO *IO);
 eNextState NWC_SendQUIT(AsyncNetworker *NW);
 eNextState NWC_DispatchWriteDone(AsyncIO *IO);
 
-
-void DestroyNetworker(AsyncNetworker *NW)
+void DeleteNetworker(void *vptr)
 {
+       AsyncNetworker *NW = (AsyncNetworker *)vptr;
+        FreeStrBuf(&NW->SpoolFileName);
+        FreeStrBuf(&NW->tempFileName);
+       FreeStrBuf(&NW->node);
+       FreeStrBuf(&NW->host);
+       FreeStrBuf(&NW->port);
+       FreeStrBuf(&NW->secret);
+       FreeStrBuf(&NW->Url);
+       FreeAsyncIOContents(&NW->IO);
+       free(NW);
 }
 
 #define NWC_DBG_SEND() syslog(LOG_DEBUG, "NW client[%ld]: > %s", NW->n, ChrPtr(NW->IO.SendBuf.Buf))
 #define NWC_DBG_READ() syslog(LOG_DEBUG, "NW client[%ld]: < %s\n", NW->n, ChrPtr(NW->IO.IOBuf))
 #define NWC_OK (strncasecmp(ChrPtr(NW->IO.IOBuf), "+OK", 3) == 0)
 
+eNextState FinalizeNetworker(AsyncIO *IO)
+{
+       AsyncNetworker *NW = (AsyncNetworker *)IO->Data;
+
+       network_talking_to(ChrPtr(NW->node), NTT_REMOVE);
+
+       DeleteNetworker(IO->Data);
+       return eAbort;
+}
+
 eNextState NWC_ReadGreeting(AsyncNetworker *NW)
 {
        char connected_to[SIZ];
@@ -245,6 +264,8 @@ eNextState NWC_ReadNDOPReply(AsyncNetworker *NW)
 
 eNextState NWC_SendREAD(AsyncNetworker *NW)
 {
+       eNextState rc;
+
        if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
        {
                /*
@@ -254,22 +275,29 @@ eNextState NWC_SendREAD(AsyncNetworker *NW)
                if (server_shutting_down)
                {
                        close(NW->IO.IOB.OtherFD);
-//////                 unlink(ChrPtr(NW->tempFileName));
+                       unlink(ChrPtr(NW->tempFileName));
                        return eAbort;
                }
                StrBufPrintf(NW->IO.SendBuf.Buf, "READ %ld|%ld\n",
                             NW->IO.IOB.TotalSentAlready,
+                            NW->IO.IOB.TotalSendSize);
+/*
                             ((NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready > IGNET_PACKET_SIZE)
                              ? IGNET_PACKET_SIZE : 
                              (NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready))
                        );
+*/
+               NWC_DBG_SEND();
                return eSendReply;
+       }
+       else 
+       {
+               NW->State = eCLOS;
+               rc = NWC_DispatchWriteDone(&NW->IO);
+               NWC_DBG_SEND();
 
-
-
+               return rc;
        }
-       else {} // continue sending
-       return eSendReply;
 }
 
 eNextState NWC_ReadREADState(AsyncNetworker *NW)
@@ -279,8 +307,6 @@ eNextState NWC_ReadREADState(AsyncNetworker *NW)
        {
                NW->IO.IOB.ChunkSendRemain = 
                        NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
-///            NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
-/// TODO               StrBufReadjustIOBuffer(NW->IO.RecvBuf, NW->BlobReadSize);
                return eReadFile;
        }
        return eAbort;
@@ -288,21 +314,34 @@ eNextState NWC_ReadREADState(AsyncNetworker *NW)
 eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW);
 eNextState NWC_ReadREADBlob(AsyncNetworker *NW)
 {
-       /// FlushIOBuffer(NW->IO.RecvBuf); /// TODO
-
-       ///NW->bytes_received += NW->IO.IOB.ChunkSize;
-
-       if (NW->IO.IOB.TotalSentAlready < NW->IO.IOB.TotalSendSize)
+       NWC_DBG_READ();
+       if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
        {
-               NW->State = eREAD - 1;
-               return eSendReply;/* now fetch next chunk*/
+               NW->State ++;
+
+               close(NW->IO.IOB.OtherFD);
+               
+               if (link(ChrPtr(NW->SpoolFileName), ChrPtr(NW->tempFileName)) != 0) {
+                       syslog(LOG_ALERT, 
+                              "Could not link %s to %s: %s\n",
+                              ChrPtr(NW->tempFileName), 
+                              ChrPtr(NW->SpoolFileName), 
+                              strerror(errno));
+               }
+       
+               unlink(ChrPtr(NW->tempFileName));
+               return NWC_DispatchWriteDone(&NW->IO);
+       }
+       else {
+               NW->State --;
+               NW->IO.IOB.ChunkSendRemain = NW->IO.IOB.ChunkSize;
+               return NWC_DispatchWriteDone(&NW->IO);
        }
-       else
-               return NWC_ReadREADBlobDone(NW);
 }
 
 eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW)
 {
+       NWC_DBG_READ();
        if (NW->IO.IOB.TotalSendSize == NW->IO.IOB.TotalSentAlready)
        {
                NW->State ++;
@@ -317,24 +356,25 @@ eNextState NWC_ReadREADBlobDone(AsyncNetworker *NW)
                               strerror(errno));
                }
        
-/////          unlink(ChrPtr(NW->tempFileName));
+               unlink(ChrPtr(NW->tempFileName));
                return NWC_DispatchWriteDone(&NW->IO);
        }
        else {
                NW->State --;
+               NW->IO.IOB.ChunkSendRemain = NW->IO.IOB.ChunkSize;
                return NWC_DispatchWriteDone(&NW->IO);
        }
 }
 eNextState NWC_SendCLOS(AsyncNetworker *NW)
 {
        StrBufPlain(NW->IO.SendBuf.Buf, HKEY("CLOS\n"));
-////   unlink(ChrPtr(NW->tempFileName));
-       return eReadMessage;
+       NWC_DBG_SEND();
+       return eSendReply;
 }
 
 eNextState NWC_ReadCLOSReply(AsyncNetworker *NW)
 {
-/// todo
+       NWC_DBG_READ();
        if (ChrPtr(NW->IO.IOBuf)[0] != '2')
                return eTerminateConnection;
        return eSendReply;
@@ -343,6 +383,7 @@ eNextState NWC_ReadCLOSReply(AsyncNetworker *NW)
 
 eNextState NWC_SendNUOP(AsyncNetworker *NW)
 {
+       eNextState rc;
        long TotalSendSize;
        struct stat statbuf;
        int fd;
@@ -360,7 +401,8 @@ eNextState NWC_SendNUOP(AsyncNetworker *NW)
                               strerror(errno));
                }
                NW->State = eQUIT;
-               return NWC_SendQUIT(NW);
+               rc = NWC_SendQUIT(NW);
+               NWC_DBG_SEND();
        }
 
        if (fstat(fd, &statbuf) == -1) {
@@ -375,13 +417,14 @@ eNextState NWC_SendNUOP(AsyncNetworker *NW)
                syslog(LOG_DEBUG,
                       "Nothing to send.\n");
                NW->State = eQUIT;
-               return NWC_SendQUIT(NW);
+               rc = NWC_SendQUIT(NW);
+               NWC_DBG_SEND();
+               return rc;
        }
        FDIOBufferInit(&NW->IO.IOB, &NW->IO.SendBuf, fd, TotalSendSize);
 
-////   NW->bytes_written = 0;
-
        StrBufPlain(NW->IO.SendBuf.Buf, HKEY("NUOP\n"));
+       NWC_DBG_SEND();
        return eSendReply;
 
 }
@@ -397,7 +440,7 @@ eNextState NWC_SendWRIT(AsyncNetworker *NW)
 {
        StrBufPrintf(NW->IO.SendBuf.Buf, "WRIT %ld\n", 
                     NW->IO.IOB.TotalSendSize - NW->IO.IOB.TotalSentAlready);
-
+       NWC_DBG_SEND();
        return eSendReply;
 }
 eNextState NWC_ReadWRITReply(AsyncNetworker *NW)
@@ -410,7 +453,6 @@ eNextState NWC_ReadWRITReply(AsyncNetworker *NW)
 
        NW->IO.IOB.ChunkSendRemain = 
                NW->IO.IOB.ChunkSize = atol(ChrPtr(NW->IO.IOBuf)+4);
-///    NW->IO.IOB.TotalSentAlready += NW->IO.IOB.ChunkSize;
        return eSendFile;
 }
 
@@ -422,13 +464,13 @@ eNextState NWC_SendBlobDone(AsyncNetworker *NW)
                NW->State ++;
 
                close(NW->IO.IOB.OtherFD);
-//// TODO: unlink networker file?              
                rc =  NWC_DispatchWriteDone(&NW->IO);
                NW->State --;
                return rc;
        }
        else {
                NW->State --;
+               NW->IO.IOB.ChunkSendRemain = NW->IO.IOB.ChunkSize;
                return NWC_DispatchWriteDone(&NW->IO);
        }
 }
@@ -436,6 +478,7 @@ eNextState NWC_SendBlobDone(AsyncNetworker *NW)
 eNextState NWC_SendUCLS(AsyncNetworker *NW)
 {
        StrBufPlain(NW->IO.SendBuf.Buf, HKEY("UCLS 1\n"));
+       NWC_DBG_SEND();
        return eSendReply;
 
 }
@@ -444,10 +487,9 @@ eNextState NWC_ReadUCLS(AsyncNetworker *NW)
        NWC_DBG_READ();
 
        syslog(LOG_NOTICE, "Sent %ld octets to <%s>\n", NW->IO.IOB.ChunkSize, ChrPtr(NW->node));
-///    syslog(LOG_DEBUG, "<%s\n", buf);
        if (ChrPtr(NW->IO.IOBuf)[0] == '2') {
                syslog(LOG_DEBUG, "Removing <%s>\n", ChrPtr(NW->tempFileName));
-///            unlink(ChrPtr(NW->tempFileName));
+               unlink(ChrPtr(NW->tempFileName));
        }
        return eSendReply;
 }
@@ -456,7 +498,7 @@ eNextState NWC_SendQUIT(AsyncNetworker *NW)
 {
        StrBufPlain(NW->IO.SendBuf.Buf, HKEY("QUIT\n"));
 
-       network_talking_to(ChrPtr(NW->node), NTT_REMOVE);
+       NWC_DBG_SEND();
        return eSendReply;
 }
 
@@ -464,7 +506,7 @@ eNextState NWC_ReadQUIT(AsyncNetworker *NW)
 {
        NWC_DBG_READ();
 
-       return eTerminateConnection;
+       return eAbort;
 }
 
 
@@ -479,8 +521,9 @@ NWClientHandler NWC_ReadHandlers[] = {
        NWC_ReadWRITReply,
        NWC_SendBlobDone,
        NWC_ReadUCLS,
-       NWC_ReadQUIT
-};
+       NWC_ReadQUIT};
+
+long NWC_ConnTimeout = 100;
 
 const long NWC_SendTimeouts[] = {
        100,
@@ -575,10 +618,7 @@ eNextState nwc_get_one_host_ip(AsyncIO *IO)
 {
        AsyncNetworker *NW = IO->Data;
        /* 
-        * here we start with the lookup of one host. it might be...
-        * - the relay host *sigh*
-        * - the direct hostname if there was no mx record
-        * - one of the mx'es
+        * here we start with the lookup of one host.
         */ 
 
        InitC_ares_dns(IO);
@@ -605,7 +645,7 @@ eNextState nwc_get_one_host_ip(AsyncIO *IO)
  */
 eReadState NWC_ReadServerStatus(AsyncIO *IO)
 {
-       AsyncNetworker *NW = IO->Data;
+//     AsyncNetworker *NW = IO->Data;
        eReadState Finished = eBufferNotEmpty; 
 
        switch (IO->NextState) {
@@ -626,7 +666,6 @@ eReadState NWC_ReadServerStatus(AsyncIO *IO)
        case eReadFile:
        case eSendFile:
        case eReadPayload:
-////TODO               Finished = IOBufferStrLength(&IO->RecvBuf) >= NW->BlobReadSize;
                break;
        }
        return Finished;
@@ -636,7 +675,39 @@ eReadState NWC_ReadServerStatus(AsyncIO *IO)
 
 eNextState NWC_FailNetworkConnection(AsyncIO *IO)
 {
-       return eTerminateConnection;
+       return eAbort;
+}
+
+void NWC_SetTimeout(eNextState NextTCPState, AsyncNetworker *NW)
+{
+       double Timeout = 0.0;
+
+       syslog(LOG_DEBUG, "NWC3: %s\n", __FUNCTION__);
+
+       switch (NextTCPState) {
+       case eSendReply:
+       case eSendMore:
+               break;
+       case eReadFile:
+       case eReadMessage:
+               Timeout = NWC_ReadTimeouts[NW->State];
+               break;
+       case eReadPayload:
+               Timeout = 100000;
+               /* TODO!!! */
+               break;
+       case eSendDNSQuery:
+       case eReadDNSReply:
+       case eConnect:
+       case eSendFile:
+//TODO
+       case eTerminateConnection:
+       case eDBQuery:
+       case eAbort:
+       case eReadMore://// TODO
+               return;
+       }
+       SetNextTimeout(&NW->IO, Timeout);
 }
 
 
@@ -649,7 +720,7 @@ eNextState NWC_DispatchReadDone(AsyncIO *IO)
        rc = NWC_ReadHandlers[NW->State](NW);
        if (rc != eReadMore)
                NW->State++;
-       ////NWCSetTimeout(rc, NW);
+       NWC_SetTimeout(rc, NW);
        return rc;
 }
 eNextState NWC_DispatchWriteDone(AsyncIO *IO)
@@ -659,7 +730,7 @@ eNextState NWC_DispatchWriteDone(AsyncIO *IO)
        eNextState rc;
 
        rc = NWC_SendHandlers[NW->State](NW);
-       ////NWCSetTimeout(rc, NW);
+       NWC_SetTimeout(rc, NW);
        return rc;
 }
 
@@ -669,16 +740,14 @@ eNextState NWC_DispatchWriteDone(AsyncIO *IO)
 eNextState NWC_Terminate(AsyncIO *IO)
 {
        syslog(LOG_DEBUG, "Nw: %s\n", __FUNCTION__);
-///    FinalizeNetworker(IO); TODO
+       FinalizeNetworker(IO);
        return eAbort;
 }
 
 eNextState NWC_Timeout(AsyncIO *IO)
 {
-//     AsyncNetworker *NW = IO->Data;
-
        syslog(LOG_DEBUG, "NW: %s\n", __FUNCTION__);
-//     StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); todo
+
        return NWC_FailNetworkConnection(IO);
 }
 eNextState NWC_ConnFail(AsyncIO *IO)
@@ -702,9 +771,7 @@ eNextState NWC_Shutdown(AsyncIO *IO)
        syslog(LOG_DEBUG, "NW: %s\n", __FUNCTION__);
 ////   pop3aggr *pMsg = IO->Data;
 
-       ////pMsg->MyQEntry->Status = 3;
-       ///StrBufPlain(pMsg->MyQEntry->StatusMessage, HKEY("server shutdown during message retrieval."));
-///    FinalizePOP3AggrRun(IO); todo
+       FinalizeNetworker(IO);
        return eAbort;
 }
 
@@ -720,15 +787,10 @@ eNextState nwc_connect_ip(AsyncIO *IO)
               ChrPtr(NW->host),
               ChrPtr(NW->port));
        
-////   IO->ConnectMe = &NW->Pop3Host;
-       /*  Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
-
-       /////// SetConnectStatus(IO);
-
-       return InitEventIO(IO, NW, 100, 100, 1); /*
-                                                NWC_ConnTimeout, 
-                                                NWC_ReadTimeouts[0],
-                                                1);*/
+       return InitEventIO(IO, NW, 
+                          NWC_ConnTimeout, 
+                          NWC_ReadTimeouts[0],
+                          1);
 }
 
 void RunNetworker(AsyncNetworker *NW)
@@ -842,9 +904,11 @@ void network_poll_other_citadel_nodes(int full_poll, char *working_ignetcfg)
                                        continue;
                                }
                        }
-                       DestroyNetworker(NW);
+                       DeleteNetworker(NW);
                }
        }
+       FreeStrBuf(&CfgData);
+       FreeStrBuf(&Line);
 
 }
 
index 18e4bdf..331a4d7 100644 (file)
@@ -136,6 +136,7 @@ void DeletePOP3Aggregator(void *vptr)
        FreeStrBuf(&ptr->IO.SendBuf.Buf);
        FreeStrBuf(&ptr->IO.RecvBuf.Buf);
        DeleteAsyncMsg(&ptr->IO.ReadMsg);
+       FreeAsyncIOContents(&ptr->IO);
        free(ptr);
 }
 
index 0ee29b2..7021ed4 100644 (file)
@@ -240,6 +240,7 @@ typedef struct _file_buffer {
 typedef struct __fd_iobuffer {
        IOBuffer *IOB;
        int OtherFD;
+       int SplicePipe[2];
        long TotalSendSize;
        long TotalSentAlready;
        long ChunkSize;
index 08e904b..7dc07af 100644 (file)
@@ -16,6 +16,7 @@
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  */
 
+#define _GNU_SOURCE
 #include "sysdep.h"
 #include <ctype.h>
 #include <errno.h>
@@ -29,6 +30,7 @@
 #define SHOW_ME_VAPPEND_PRINTF
 #include <stdarg.h>
 #ifndef LINUX_SENDFILE
+#include <bits/fcntl.h>
 #include <sys/sendfile.h>
 #endif
 #include "libcitadel.h"
@@ -3799,6 +3801,8 @@ void FDIOBufferInit(FDIOBuffer *FDB, IOBuffer *IO, int FD, long TotalSendSize)
        FDB->IOB = IO;
 #ifndef LINUX_SENDFILE
        FDB->ChunkBuffer = NewStrBuf();
+#else
+       pipe(FDB->SplicePipe);
 #endif
        FDB->OtherFD = FD;
 }
@@ -3825,15 +3829,29 @@ int FileRecvChunked(FDIOBuffer *FDB, const char **Err)
 {
 
 #ifdef LINUX_SENDFILE
-       ssize_t sent;
-       sent = sendfile(FDB->OtherFD, FDB->IOB->fd, &FDB->TotalSentAlready, FDB->ChunkSendRemain);
+       ssize_t sent, pipesize;
+       long foo = 0;
+
+       pipesize = splice(FDB->IOB->fd, NULL, 
+                         FDB->SplicePipe[1], NULL, 
+                         FDB->ChunkSendRemain, 
+                         SPLICE_F_MORE | SPLICE_F_MOVE|SPLICE_F_NONBLOCK);
+       if (pipesize == -1)
+       {
+               *Err = strerror(errno);
+               return pipesize;
+       }
+       
+       sent = splice(FDB->SplicePipe[0], NULL, 
+                     FDB->OtherFD, &FDB->TotalSentAlready, 
+                     pipesize, SPLICE_F_MORE | SPLICE_F_MOVE);
        if (sent == -1)
        {
                *Err = strerror(errno);
                return sent;
        }
        FDB->ChunkSendRemain -= sent;
-       return FDB->ChunkSendRemain;
+       return sent;
 #else
 #endif
        return 0;