Work on event'ing the pop3 aggregator
authorWilfried Goesgens <dothebart@citadel.org>
Wed, 17 Aug 2011 17:43:15 +0000 (17:43 +0000)
committerWilfried Goesgens <dothebart@citadel.org>
Wed, 17 Aug 2011 17:43:15 +0000 (17:43 +0000)
  - POP3_C_ReAttachToFetchMessages(): we manage the transition DB-IO -> Network I/O
  - POP3C_GetListCommandState(): after receiving the list-OK we have to continue reading the list items
  - POP3C_GetListOneLine(): the server sends us the size of the message; save it for later use.
  - POP3C_GetOneMessagID(): after reading all list-items, start DB lookup whether we knew them already.
  - POP3C_GetOneMessageIDState(): if more items are in the list, call us again with the next message ID.
  - POP3_FetchNetworkUsetableEntry(): fetch one UT-entry per call; forward to POP3_C_ReAttachToFetchMessages() once we saw all items in the list.
  - POP3C_SendGetOneMsg(): cycle over the list till we find the next Mail we didn't know already & fetch it; if no more items in list -> goodbuye.
  - POP3C_ReadMessageBody() after the I/O layer read the message from the socket, parse it; move our context over to the DB-Queue.
  - POP3C_ReadMessageBodyFollowing(): On success initialize the async message reader; let the I/O layer read the message, then continue parsing & saving it.
  - POP3C_StoreMsgRead(): parse & save the message to the DB. Continue with remembering it in the UT table.
  - POP3C_SendDelete(): check whether we want to delete the mails on the server, if do; else continue with POP3C_SendGetOneMsg() to fetch next in list.
  - POP3_C_ReadServerStatus(): switch between line based read operation and reading whole messages.

we now can successfully aggregate one pop3 account.

citadel/modules/pop3client/serv_pop3client.c

index 827976a3c93f0b11a250d709f601d93cd0b76347..f654668f439a9f316d90c81974b9e363fe88449b 100644 (file)
@@ -68,9 +68,11 @@ typedef enum ePOP3_C_States {
        GetUserState,
        GetPassState,
        GetListCommandState,
+       GetListOneLine,
        GetOneMessageIDState,
        ReadMessageBodyFollowing,
        ReadMessageBody,
+       GetDeleteState,
        ReadQuitState,
        POP3C_MaxRead
 }ePOP3_C_States;
@@ -78,9 +80,11 @@ typedef enum ePOP3_C_States {
 
 typedef struct _FetchItem {
        long MSGID;
+       long MSGSize;
        StrBuf *MsgUIDL;
        StrBuf *MsgUID;
        int NeedFetch;
+       struct CtdlMessage *Msg;
 } FetchItem;
 
 void HfreeFetchItem(void *vItem)
@@ -137,6 +141,7 @@ eNextState POP3_C_DispatchReadDone(AsyncIO *IO);
 eNextState POP3_C_DispatchWriteDone(AsyncIO *IO);
 eNextState POP3_C_Terminate(AsyncIO *IO);
 eReadState POP3_C_ReadServerStatus(AsyncIO *IO);
+eNextState POP3_C_ReAttachToFetchMessages(AsyncIO *IO);
 
 eNextState FinalizePOP3AggrRun(AsyncIO *IO)
 {
@@ -212,12 +217,14 @@ eNextState POP3C_GetListCommandState(pop3aggr *RecvMsg)
        POP3C_DBG_READ();
        if (!POP3C_OK) return eTerminateConnection;
        RecvMsg->MsgNumbers = NewHash(1, NULL);
+       RecvMsg->State++;       
        return eReadMore;
 }
 
 
 eNextState POP3C_GetListOneLine(pop3aggr *RecvMsg)
 {
+       const char *pch;
        FetchItem *OneMsg = NULL;
        POP3C_DBG_READ();
 
@@ -237,72 +244,89 @@ eNextState POP3C_GetListOneLine(pop3aggr *RecvMsg)
        }
        OneMsg = (FetchItem*) malloc(sizeof(FetchItem));
        memset(OneMsg, 0, sizeof(FetchItem));
-       OneMsg->MSGID = atoi(ChrPtr(RecvMsg->IO.IOBuf));
+       OneMsg->MSGID = atol(ChrPtr(RecvMsg->IO.IOBuf));
+
+       pch = strchr(ChrPtr(RecvMsg->IO.IOBuf), ' ');
+       if (pch != NULL)
+       {
+               OneMsg->MSGSize = atol(pch + 1);
+       }
        Put(RecvMsg->MsgNumbers, LKEY(OneMsg->MSGID), OneMsg, HfreeFetchItem);
 
        //RecvMsg->State --; /* read next Line */
        return eReadMore;
 }
 
-eNextState POP3C_GetOneMessagID(pop3aggr *RecvMsg)
+eNextState POP3_FetchNetworkUsetableEntry(AsyncIO *IO)
 {
        long HKLen;
        const char *HKey;
        void *vData;
-
-       if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData))
-       {
-               RecvMsg->CurrMsg = (FetchItem*) vData;
-               /* Find out the UIDL of the message, to determine whether we've already downloaded it */
-               StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
-                            "UIDL %ld\r\n", RecvMsg->CurrMsg->MSGID);
-               POP3C_DBG_SEND();
-       }
-       else
-       {
-               DeleteHashPos(&RecvMsg->Pos);
-               /// done receiving uidls.. start looking them up now.
-               RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0);
-               
-       }
-       return eReadMore; /* TODO */
-}
-
-#if 0
-eNextState FetchNetworkUsetableEntry(AsyncIO *IO)
-{
        struct cdbdata *cdbut;
-       networker_save_message *Ctx = (networker_save_message *) IO->Data;
+       pop3aggr *RecvMsg = (pop3aggr *) IO->Data;
 
        if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData))
        {
+               struct UseTable ut;
 
+               RecvMsg->CurrMsg = (FetchItem*) vData;
                /* Find out if we've already seen this item */
-               strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
-               Ctx->ut.ut_timestamp = time(NULL);
+               safestrncpy(ut.ut_msgid, 
+                           ChrPtr(RecvMsg->CurrMsg->MsgUIDL),
+                           sizeof(ut.ut_msgid));
+               ut.ut_timestamp = time(NULL);/// TODO: libev timestamp!
                
-               cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
+               cdbut = cdb_fetch(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUIDL));
                if (cdbut != NULL) {
                        /* Item has already been seen */
-                       CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
+                       CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(RecvMsg->CurrMsg->MsgUIDL));
                        cdb_free(cdbut);
                
                        /* rewrite the record anyway, to update the timestamp */
                        cdb_store(CDB_USETABLE, 
-                                 SKEY(Ctx->MsgGUID), 
-                                 &Ctx->ut, sizeof(struct UseTable) );
-                       return eAbort;
+                                 SKEY(RecvMsg->CurrMsg->MsgUIDL), 
+                                 &ut, sizeof(struct UseTable) );
+                       RecvMsg->CurrMsg->NeedFetch = 0;
                }
                else
                {
-                       NextDBOperation(IO, RSSSaveMessage);
-                       return eSendMore;
+                       RecvMsg->CurrMsg->NeedFetch = 1;
                }
+               return NextDBOperation(&RecvMsg->IO, POP3_FetchNetworkUsetableEntry);
+       }
+       else
+       {
+               /* ok, now we know them all, continue with reading the actual messages. */
+               DeleteHashPos(&RecvMsg->Pos);
+
+               return QueueEventContext(IO, POP3_C_ReAttachToFetchMessages);
        }
-       return eReadMessage;
 }
-#endif
 
+eNextState POP3C_GetOneMessagID(pop3aggr *RecvMsg)
+{
+       long HKLen;
+       const char *HKey;
+       void *vData;
+
+       if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData))
+       {
+               RecvMsg->CurrMsg = (FetchItem*) vData;
+               /* Find out the UIDL of the message, to determine whether we've already downloaded it */
+               StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
+                            "UIDL %ld\r\n", RecvMsg->CurrMsg->MSGID);
+               POP3C_DBG_SEND();
+       }
+       else
+       {
+           RecvMsg->State++;
+               DeleteHashPos(&RecvMsg->Pos);
+               /// done receiving uidls.. start looking them up now.
+               RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0);
+               return QueueDBOperation(&RecvMsg->IO, POP3_FetchNetworkUsetableEntry);
+       }
+       return eReadMore; /* TODO */
+}
 
 eNextState POP3C_GetOneMessageIDState(pop3aggr *RecvMsg)
 {
@@ -317,7 +341,8 @@ eNextState POP3C_GetOneMessageIDState(pop3aggr *RecvMsg)
                     ChrPtr(RecvMsg->RoomName), 
                     ChrPtr(RecvMsg->CurrMsg->MsgUIDL),
                     RecvMsg->Pop3Host.Host);
-       return eReadMessage;
+       RecvMsg->State --;
+       return eSendReply;
 }
 
 eNextState POP3C_GetOneMessageIDFromUseTable(pop3aggr *RecvMsg)
@@ -343,11 +368,27 @@ eNextState POP3C_GetOneMessageIDFromUseTable(pop3aggr *RecvMsg)
 
 eNextState POP3C_SendGetOneMsg(pop3aggr *RecvMsg)
 {
-       /* Message has not been seen. Tell the server to fetch the message... */
-       StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
-                    "RETR %ld\r\n", RecvMsg->CurrMsg->MSGID);
-       POP3C_DBG_SEND();
-       return eReadMessage;
+       long HKLen;
+       const char *HKey;
+       void *vData;
+
+       RecvMsg->CurrMsg = NULL;
+       while (GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData) && 
+              (RecvMsg->CurrMsg = (FetchItem*) vData, RecvMsg->CurrMsg->NeedFetch == 0))
+       {}
+
+       if ((RecvMsg->CurrMsg != NULL ) && (RecvMsg->CurrMsg->NeedFetch == 1))
+       {
+               /* Message has not been seen. Tell the server to fetch the message... */
+               StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
+                            "RETR %ld\r\n", RecvMsg->CurrMsg->MSGID);
+               POP3C_DBG_SEND();
+               return eReadMessage;
+       }
+       else {
+               RecvMsg->State = ReadQuitState;
+               return POP3_C_DispatchWriteDone(&RecvMsg->IO);
+       }
 }
 
 
@@ -355,56 +396,78 @@ eNextState POP3C_ReadMessageBodyFollowing(pop3aggr *RecvMsg)
 {
        POP3C_DBG_READ();
        if (!POP3C_OK) return eTerminateConnection;
-       else return eSendReply;
+       RecvMsg->IO.ReadMsg = NewAsyncMsg(HKEY("."), 
+                                         RecvMsg->CurrMsg->MSGSize,
+                                         config.c_maxmsglen, 
+                                         NULL, -1,
+                                         1);
+
+       return eReadPayload;
 }      
 
 
+eNextState POP3C_StoreMsgRead(AsyncIO *IO)
+{
+       pop3aggr *RecvMsg = (pop3aggr *) IO->Data;
+       struct UseTable ut;
 
-eNextState POP3C_ReadMessageBody(pop3aggr *RecvMsg)
+       safestrncpy(ut.ut_msgid, 
+                   ChrPtr(RecvMsg->CurrMsg->MsgUID),
+                   sizeof(ut.ut_msgid));
+       ut.ut_timestamp = time(NULL); /* TODO: use libev time */
+       cdb_store(CDB_USETABLE, 
+                 ChrPtr(RecvMsg->CurrMsg->MsgUID), 
+                 StrLength(RecvMsg->CurrMsg->MsgUID),
+                 &ut, 
+                 sizeof(struct UseTable) );
+
+       return QueueEventContext(&RecvMsg->IO, POP3_C_ReAttachToFetchMessages);
+}
+eNextState POP3C_SaveMsg(AsyncIO *IO)
 {
-#if 0
-//TODO
-       /* If we get to this point, the message is on its way.  Read it. */
-       body = CtdlReadMessageBody(HKEY("."), config.c_maxmsglen, NULL, 1, &sock);
-       if (body == NULL) goto bail;
-       
-       CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n");
-       msg = convert_internet_message(body);
-       body = NULL;    /* yes, this should be dereferenced, NOT freed */
-       
-                       /* Do Something With It (tm) */
-       msgnum = CtdlSubmitMsg(msg, NULL, roomname, 0);
+       long msgnum;
+       pop3aggr *RecvMsg = (pop3aggr *) IO->Data;
+
+       /* Do Something With It (tm) */
+       msgnum = CtdlSubmitMsg(RecvMsg->CurrMsg->Msg, 
+                              NULL, 
+                              ChrPtr(RecvMsg->RoomName), 
+                              0);
        if (msgnum > 0L) {
                /* Message has been committed to the store */
                /* write the uidl to the use table so we don't fetch this message again */
        }
-       CtdlFreeMessage(msg);
-#endif
+       CtdlFreeMessage(RecvMsg->CurrMsg->Msg);
+
+       return NextDBOperation(&RecvMsg->IO, POP3C_StoreMsgRead);
        return eReadMessage;
 }
 
-eNextState POP3C_StoreMsgRead(pop3aggr *RecvMsg)
+eNextState POP3C_ReadMessageBody(pop3aggr *RecvMsg)
 {
-#if 0
-       strcpy(ut.ut_msgid, utmsgid);
-       ut.ut_timestamp = time(NULL);
-       cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid),
-                 &ut, sizeof(struct UseTable) );
-#endif
-       return eReadMessage;/// TODO
+       CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n");
+       RecvMsg->CurrMsg->Msg = convert_internet_message_buf(&RecvMsg->IO.ReadMsg->MsgBuf);
+
+       return QueueDBOperation(&RecvMsg->IO, POP3C_SaveMsg);
 }
+
 eNextState POP3C_SendDelete(pop3aggr *RecvMsg)
 {
        if (!RecvMsg->keep) {
                StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
                             "DELE %ld\r\n", RecvMsg->CurrMsg->MSGID);
                POP3C_DBG_SEND();
+               return eReadMessage;
+       }
+       else {
+               RecvMsg->State = ReadMessageBodyFollowing;
+               return POP3_C_DispatchWriteDone(&RecvMsg->IO);
        }
-       return eReadMessage;
 }
 eNextState POP3C_ReadDeleteState(pop3aggr *RecvMsg)
 {
        POP3C_DBG_READ();
+       RecvMsg->State = GetOneMessageIDState;
        return eReadMessage;
 }
 
@@ -432,9 +495,11 @@ Pop3ClientHandler POP3C_ReadHandlers[] = {
        POP3C_GetUserState,
        POP3C_GetPassState,
        POP3C_GetListCommandState,
+       POP3C_GetListOneLine,
        POP3C_GetOneMessageIDState,
        POP3C_ReadMessageBodyFollowing,
        POP3C_ReadMessageBody,
+       POP3C_ReadDeleteState,
        POP3C_ReadQuitState,
 };
 
@@ -464,9 +529,10 @@ Pop3ClientHandler POP3C_SendHandlers[] = {
        POP3C_SendUser,
        POP3C_SendPassword,
        POP3C_SendListCommand,
-       POP3C_GetListOneLine,
+       NULL,
        POP3C_GetOneMessagID,
        POP3C_SendGetOneMsg,
+       NULL,
        POP3C_SendDelete,
        POP3C_SendQuit
 };
@@ -516,10 +582,15 @@ void POP3SetTimeout(eNextState NextTCPState, pop3aggr *pMsg)
   }
 */
                break;
+       case eReadPayload:
+               Timeout = 100000;
+               /* TODO!!! */
+               break;
        case eSendDNSQuery:
        case eReadDNSReply:
        case eConnect:
        case eTerminateConnection:
+       case eDBQuery:
        case eAbort:
        case eReadMore://// TODO
                return;
@@ -533,7 +604,8 @@ eNextState POP3_C_DispatchReadDone(AsyncIO *IO)
        eNextState rc;
 
        rc = POP3C_ReadHandlers[pMsg->State](pMsg);
-       pMsg->State++;
+       if (rc != eReadMore)
+           pMsg->State++;
        POP3SetTimeout(rc, pMsg);
        return rc;
 }
@@ -595,26 +667,24 @@ eReadState POP3_C_ReadServerStatus(AsyncIO *IO)
 {
        eReadState Finished = eBufferNotEmpty; 
 
-       while (Finished == eBufferNotEmpty) {
+       switch (IO->NextState) {
+       case eSendDNSQuery:
+       case eReadDNSReply:
+       case eDBQuery:
+       case eConnect:
+       case eTerminateConnection:
+       case eAbort:
+               Finished = eReadFail;
+               break;
+       case eSendReply: 
+       case eSendMore:
+       case eReadMore:
+       case eReadMessage: 
                Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
-               
-               switch (Finished) {
-               case eMustReadMore: /// read new from socket... 
-                       return Finished;
-                       break;
-               case eBufferNotEmpty: /* shouldn't happen... */
-               case eReadSuccess: /// done for now...
-                       if (StrLength(IO->IOBuf) < 4)
-                               continue;
-                       if (ChrPtr(IO->IOBuf)[3] == '-')
-                               Finished = eBufferNotEmpty;
-                       else 
-                               return Finished;
-                       break;
-               case eReadFail: /// WHUT?
-                       ///todo: shut down! 
-                       break;
-               }
+               break;
+       case eReadPayload:
+               Finished = CtdlReadMessageBodyAsync(IO);
+               break;
        }
        return Finished;
 }
@@ -622,6 +692,21 @@ eReadState POP3_C_ReadServerStatus(AsyncIO *IO)
 /*****************************************************************************
  * So we connect our Server IP here.                                         *
  *****************************************************************************/
+eNextState POP3_C_ReAttachToFetchMessages(AsyncIO *IO)
+{
+       pop3aggr *cpptr = IO->Data;
+
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+////???        cpptr->State ++;
+       if (cpptr->Pos == NULL)
+               cpptr->Pos = GetNewHashPos(cpptr->MsgNumbers, 0);
+
+       POP3_C_DispatchWriteDone(IO);
+       ReAttachIO(IO, cpptr, 0);
+       IO->NextState = eReadMessage;
+       return IO->NextState;
+}
+
 eNextState connect_ip(AsyncIO *IO)
 {
        pop3aggr *cpptr = IO->Data;