From: Wilfried Goesgens Date: Wed, 17 Aug 2011 17:43:15 +0000 (+0000) Subject: Work on event'ing the pop3 aggregator X-Git-Tag: v8.11~1018 X-Git-Url: https://code.citadel.org/?p=citadel.git;a=commitdiff_plain;h=5c73f399000adbee40452735a93684267d54c8bd Work on event'ing the pop3 aggregator - POP3_C_ReAttachToFetchMessages(): we manage the transition DB-IO -> Network I/O - POP3C_GetListCommandState(): after receiving the list-OK we have to continue reading the list items - POP3C_GetListOneLine(): the server sends us the size of the message; save it for later use. - POP3C_GetOneMessagID(): after reading all list-items, start DB lookup whether we knew them already. - POP3C_GetOneMessageIDState(): if more items are in the list, call us again with the next message ID. - POP3_FetchNetworkUsetableEntry(): fetch one UT-entry per call; forward to POP3_C_ReAttachToFetchMessages() once we saw all items in the list. - POP3C_SendGetOneMsg(): cycle over the list till we find the next Mail we didn't know already & fetch it; if no more items in list -> goodbuye. - POP3C_ReadMessageBody() after the I/O layer read the message from the socket, parse it; move our context over to the DB-Queue. - POP3C_ReadMessageBodyFollowing(): On success initialize the async message reader; let the I/O layer read the message, then continue parsing & saving it. - POP3C_StoreMsgRead(): parse & save the message to the DB. Continue with remembering it in the UT table. - POP3C_SendDelete(): check whether we want to delete the mails on the server, if do; else continue with POP3C_SendGetOneMsg() to fetch next in list. - POP3_C_ReadServerStatus(): switch between line based read operation and reading whole messages. we now can successfully aggregate one pop3 account. --- diff --git a/citadel/modules/pop3client/serv_pop3client.c b/citadel/modules/pop3client/serv_pop3client.c index 827976a3c..f654668f4 100644 --- a/citadel/modules/pop3client/serv_pop3client.c +++ b/citadel/modules/pop3client/serv_pop3client.c @@ -68,9 +68,11 @@ typedef enum ePOP3_C_States { GetUserState, GetPassState, GetListCommandState, + GetListOneLine, GetOneMessageIDState, ReadMessageBodyFollowing, ReadMessageBody, + GetDeleteState, ReadQuitState, POP3C_MaxRead }ePOP3_C_States; @@ -78,9 +80,11 @@ typedef enum ePOP3_C_States { typedef struct _FetchItem { long MSGID; + long MSGSize; StrBuf *MsgUIDL; StrBuf *MsgUID; int NeedFetch; + struct CtdlMessage *Msg; } FetchItem; void HfreeFetchItem(void *vItem) @@ -137,6 +141,7 @@ eNextState POP3_C_DispatchReadDone(AsyncIO *IO); eNextState POP3_C_DispatchWriteDone(AsyncIO *IO); eNextState POP3_C_Terminate(AsyncIO *IO); eReadState POP3_C_ReadServerStatus(AsyncIO *IO); +eNextState POP3_C_ReAttachToFetchMessages(AsyncIO *IO); eNextState FinalizePOP3AggrRun(AsyncIO *IO) { @@ -212,12 +217,14 @@ eNextState POP3C_GetListCommandState(pop3aggr *RecvMsg) POP3C_DBG_READ(); if (!POP3C_OK) return eTerminateConnection; RecvMsg->MsgNumbers = NewHash(1, NULL); + RecvMsg->State++; return eReadMore; } eNextState POP3C_GetListOneLine(pop3aggr *RecvMsg) { + const char *pch; FetchItem *OneMsg = NULL; POP3C_DBG_READ(); @@ -237,72 +244,89 @@ eNextState POP3C_GetListOneLine(pop3aggr *RecvMsg) } OneMsg = (FetchItem*) malloc(sizeof(FetchItem)); memset(OneMsg, 0, sizeof(FetchItem)); - OneMsg->MSGID = atoi(ChrPtr(RecvMsg->IO.IOBuf)); + OneMsg->MSGID = atol(ChrPtr(RecvMsg->IO.IOBuf)); + + pch = strchr(ChrPtr(RecvMsg->IO.IOBuf), ' '); + if (pch != NULL) + { + OneMsg->MSGSize = atol(pch + 1); + } Put(RecvMsg->MsgNumbers, LKEY(OneMsg->MSGID), OneMsg, HfreeFetchItem); //RecvMsg->State --; /* read next Line */ return eReadMore; } -eNextState POP3C_GetOneMessagID(pop3aggr *RecvMsg) +eNextState POP3_FetchNetworkUsetableEntry(AsyncIO *IO) { long HKLen; const char *HKey; void *vData; - - if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData)) - { - RecvMsg->CurrMsg = (FetchItem*) vData; - /* Find out the UIDL of the message, to determine whether we've already downloaded it */ - StrBufPrintf(RecvMsg->IO.SendBuf.Buf, - "UIDL %ld\r\n", RecvMsg->CurrMsg->MSGID); - POP3C_DBG_SEND(); - } - else - { - DeleteHashPos(&RecvMsg->Pos); - /// done receiving uidls.. start looking them up now. - RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0); - - } - return eReadMore; /* TODO */ -} - -#if 0 -eNextState FetchNetworkUsetableEntry(AsyncIO *IO) -{ struct cdbdata *cdbut; - networker_save_message *Ctx = (networker_save_message *) IO->Data; + pop3aggr *RecvMsg = (pop3aggr *) IO->Data; if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData)) { + struct UseTable ut; + RecvMsg->CurrMsg = (FetchItem*) vData; /* Find out if we've already seen this item */ - strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO - Ctx->ut.ut_timestamp = time(NULL); + safestrncpy(ut.ut_msgid, + ChrPtr(RecvMsg->CurrMsg->MsgUIDL), + sizeof(ut.ut_msgid)); + ut.ut_timestamp = time(NULL);/// TODO: libev timestamp! - cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID)); + cdbut = cdb_fetch(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUIDL)); if (cdbut != NULL) { /* Item has already been seen */ - CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID)); + CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(RecvMsg->CurrMsg->MsgUIDL)); cdb_free(cdbut); /* rewrite the record anyway, to update the timestamp */ cdb_store(CDB_USETABLE, - SKEY(Ctx->MsgGUID), - &Ctx->ut, sizeof(struct UseTable) ); - return eAbort; + SKEY(RecvMsg->CurrMsg->MsgUIDL), + &ut, sizeof(struct UseTable) ); + RecvMsg->CurrMsg->NeedFetch = 0; } else { - NextDBOperation(IO, RSSSaveMessage); - return eSendMore; + RecvMsg->CurrMsg->NeedFetch = 1; } + return NextDBOperation(&RecvMsg->IO, POP3_FetchNetworkUsetableEntry); + } + else + { + /* ok, now we know them all, continue with reading the actual messages. */ + DeleteHashPos(&RecvMsg->Pos); + + return QueueEventContext(IO, POP3_C_ReAttachToFetchMessages); } - return eReadMessage; } -#endif +eNextState POP3C_GetOneMessagID(pop3aggr *RecvMsg) +{ + long HKLen; + const char *HKey; + void *vData; + + if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData)) + { + RecvMsg->CurrMsg = (FetchItem*) vData; + /* Find out the UIDL of the message, to determine whether we've already downloaded it */ + StrBufPrintf(RecvMsg->IO.SendBuf.Buf, + "UIDL %ld\r\n", RecvMsg->CurrMsg->MSGID); + POP3C_DBG_SEND(); + } + else + { + RecvMsg->State++; + DeleteHashPos(&RecvMsg->Pos); + /// done receiving uidls.. start looking them up now. + RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0); + return QueueDBOperation(&RecvMsg->IO, POP3_FetchNetworkUsetableEntry); + } + return eReadMore; /* TODO */ +} eNextState POP3C_GetOneMessageIDState(pop3aggr *RecvMsg) { @@ -317,7 +341,8 @@ eNextState POP3C_GetOneMessageIDState(pop3aggr *RecvMsg) ChrPtr(RecvMsg->RoomName), ChrPtr(RecvMsg->CurrMsg->MsgUIDL), RecvMsg->Pop3Host.Host); - return eReadMessage; + RecvMsg->State --; + return eSendReply; } eNextState POP3C_GetOneMessageIDFromUseTable(pop3aggr *RecvMsg) @@ -343,11 +368,27 @@ eNextState POP3C_GetOneMessageIDFromUseTable(pop3aggr *RecvMsg) eNextState POP3C_SendGetOneMsg(pop3aggr *RecvMsg) { - /* Message has not been seen. Tell the server to fetch the message... */ - StrBufPrintf(RecvMsg->IO.SendBuf.Buf, - "RETR %ld\r\n", RecvMsg->CurrMsg->MSGID); - POP3C_DBG_SEND(); - return eReadMessage; + long HKLen; + const char *HKey; + void *vData; + + RecvMsg->CurrMsg = NULL; + while (GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData) && + (RecvMsg->CurrMsg = (FetchItem*) vData, RecvMsg->CurrMsg->NeedFetch == 0)) + {} + + if ((RecvMsg->CurrMsg != NULL ) && (RecvMsg->CurrMsg->NeedFetch == 1)) + { + /* Message has not been seen. Tell the server to fetch the message... */ + StrBufPrintf(RecvMsg->IO.SendBuf.Buf, + "RETR %ld\r\n", RecvMsg->CurrMsg->MSGID); + POP3C_DBG_SEND(); + return eReadMessage; + } + else { + RecvMsg->State = ReadQuitState; + return POP3_C_DispatchWriteDone(&RecvMsg->IO); + } } @@ -355,56 +396,78 @@ eNextState POP3C_ReadMessageBodyFollowing(pop3aggr *RecvMsg) { POP3C_DBG_READ(); if (!POP3C_OK) return eTerminateConnection; - else return eSendReply; + RecvMsg->IO.ReadMsg = NewAsyncMsg(HKEY("."), + RecvMsg->CurrMsg->MSGSize, + config.c_maxmsglen, + NULL, -1, + 1); + + return eReadPayload; } +eNextState POP3C_StoreMsgRead(AsyncIO *IO) +{ + pop3aggr *RecvMsg = (pop3aggr *) IO->Data; + struct UseTable ut; -eNextState POP3C_ReadMessageBody(pop3aggr *RecvMsg) + safestrncpy(ut.ut_msgid, + ChrPtr(RecvMsg->CurrMsg->MsgUID), + sizeof(ut.ut_msgid)); + ut.ut_timestamp = time(NULL); /* TODO: use libev time */ + cdb_store(CDB_USETABLE, + ChrPtr(RecvMsg->CurrMsg->MsgUID), + StrLength(RecvMsg->CurrMsg->MsgUID), + &ut, + sizeof(struct UseTable) ); + + return QueueEventContext(&RecvMsg->IO, POP3_C_ReAttachToFetchMessages); +} +eNextState POP3C_SaveMsg(AsyncIO *IO) { -#if 0 -//TODO - /* If we get to this point, the message is on its way. Read it. */ - body = CtdlReadMessageBody(HKEY("."), config.c_maxmsglen, NULL, 1, &sock); - if (body == NULL) goto bail; - - CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n"); - msg = convert_internet_message(body); - body = NULL; /* yes, this should be dereferenced, NOT freed */ - - /* Do Something With It (tm) */ - msgnum = CtdlSubmitMsg(msg, NULL, roomname, 0); + long msgnum; + pop3aggr *RecvMsg = (pop3aggr *) IO->Data; + + /* Do Something With It (tm) */ + msgnum = CtdlSubmitMsg(RecvMsg->CurrMsg->Msg, + NULL, + ChrPtr(RecvMsg->RoomName), + 0); if (msgnum > 0L) { /* Message has been committed to the store */ /* write the uidl to the use table so we don't fetch this message again */ } - CtdlFreeMessage(msg); -#endif + CtdlFreeMessage(RecvMsg->CurrMsg->Msg); + + return NextDBOperation(&RecvMsg->IO, POP3C_StoreMsgRead); return eReadMessage; } -eNextState POP3C_StoreMsgRead(pop3aggr *RecvMsg) +eNextState POP3C_ReadMessageBody(pop3aggr *RecvMsg) { -#if 0 - strcpy(ut.ut_msgid, utmsgid); - ut.ut_timestamp = time(NULL); - cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid), - &ut, sizeof(struct UseTable) ); -#endif - return eReadMessage;/// TODO + CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n"); + RecvMsg->CurrMsg->Msg = convert_internet_message_buf(&RecvMsg->IO.ReadMsg->MsgBuf); + + return QueueDBOperation(&RecvMsg->IO, POP3C_SaveMsg); } + eNextState POP3C_SendDelete(pop3aggr *RecvMsg) { if (!RecvMsg->keep) { StrBufPrintf(RecvMsg->IO.SendBuf.Buf, "DELE %ld\r\n", RecvMsg->CurrMsg->MSGID); POP3C_DBG_SEND(); + return eReadMessage; + } + else { + RecvMsg->State = ReadMessageBodyFollowing; + return POP3_C_DispatchWriteDone(&RecvMsg->IO); } - return eReadMessage; } eNextState POP3C_ReadDeleteState(pop3aggr *RecvMsg) { POP3C_DBG_READ(); + RecvMsg->State = GetOneMessageIDState; return eReadMessage; } @@ -432,9 +495,11 @@ Pop3ClientHandler POP3C_ReadHandlers[] = { POP3C_GetUserState, POP3C_GetPassState, POP3C_GetListCommandState, + POP3C_GetListOneLine, POP3C_GetOneMessageIDState, POP3C_ReadMessageBodyFollowing, POP3C_ReadMessageBody, + POP3C_ReadDeleteState, POP3C_ReadQuitState, }; @@ -464,9 +529,10 @@ Pop3ClientHandler POP3C_SendHandlers[] = { POP3C_SendUser, POP3C_SendPassword, POP3C_SendListCommand, - POP3C_GetListOneLine, + NULL, POP3C_GetOneMessagID, POP3C_SendGetOneMsg, + NULL, POP3C_SendDelete, POP3C_SendQuit }; @@ -516,10 +582,15 @@ void POP3SetTimeout(eNextState NextTCPState, pop3aggr *pMsg) } */ break; + case eReadPayload: + Timeout = 100000; + /* TODO!!! */ + break; case eSendDNSQuery: case eReadDNSReply: case eConnect: case eTerminateConnection: + case eDBQuery: case eAbort: case eReadMore://// TODO return; @@ -533,7 +604,8 @@ eNextState POP3_C_DispatchReadDone(AsyncIO *IO) eNextState rc; rc = POP3C_ReadHandlers[pMsg->State](pMsg); - pMsg->State++; + if (rc != eReadMore) + pMsg->State++; POP3SetTimeout(rc, pMsg); return rc; } @@ -595,26 +667,24 @@ eReadState POP3_C_ReadServerStatus(AsyncIO *IO) { eReadState Finished = eBufferNotEmpty; - while (Finished == eBufferNotEmpty) { + switch (IO->NextState) { + case eSendDNSQuery: + case eReadDNSReply: + case eDBQuery: + case eConnect: + case eTerminateConnection: + case eAbort: + Finished = eReadFail; + break; + case eSendReply: + case eSendMore: + case eReadMore: + case eReadMessage: Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); - - switch (Finished) { - case eMustReadMore: /// read new from socket... - return Finished; - break; - case eBufferNotEmpty: /* shouldn't happen... */ - case eReadSuccess: /// done for now... - if (StrLength(IO->IOBuf) < 4) - continue; - if (ChrPtr(IO->IOBuf)[3] == '-') - Finished = eBufferNotEmpty; - else - return Finished; - break; - case eReadFail: /// WHUT? - ///todo: shut down! - break; - } + break; + case eReadPayload: + Finished = CtdlReadMessageBodyAsync(IO); + break; } return Finished; } @@ -622,6 +692,21 @@ eReadState POP3_C_ReadServerStatus(AsyncIO *IO) /***************************************************************************** * So we connect our Server IP here. * *****************************************************************************/ +eNextState POP3_C_ReAttachToFetchMessages(AsyncIO *IO) +{ + pop3aggr *cpptr = IO->Data; + + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); +////??? cpptr->State ++; + if (cpptr->Pos == NULL) + cpptr->Pos = GetNewHashPos(cpptr->MsgNumbers, 0); + + POP3_C_DispatchWriteDone(IO); + ReAttachIO(IO, cpptr, 0); + IO->NextState = eReadMessage; + return IO->NextState; +} + eNextState connect_ip(AsyncIO *IO) { pop3aggr *cpptr = IO->Data;