From 969d199c6fb0d1f54220dc906202946a0b43f387 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Wed, 10 Aug 2011 14:02:00 +0000 Subject: [PATCH] work on the pop3 aggregator - rewrite the config parser similar to the rss-configparser - copy over handler logic from async smtp connector - chop worker into handy bits so the handler logic can control it. --- citadel/event_client.h | 5 + citadel/modules/pop3client/serv_pop3client.c | 1010 ++++++++++++++---- 2 files changed, 825 insertions(+), 190 deletions(-) diff --git a/citadel/event_client.h b/citadel/event_client.h index 5939ad914..9ed88c01e 100644 --- a/citadel/event_client.h +++ b/citadel/event_client.h @@ -1,3 +1,5 @@ +#ifndef __EVENT_CLIENT_H__ +#define __EVENT_CLIENT_H__ #define EV_COMPAT3 0 #include #include @@ -16,6 +18,7 @@ typedef enum _eNextState { eSendReply, eSendMore, eReadMessage, + eReadMore, eTerminateConnection, eAbort }eNextState; @@ -151,3 +154,5 @@ int evcurl_init(AsyncIO *IO, IO_CallBack Terminate); void evcurl_handle_start(AsyncIO *IO); + +#endif /* __EVENT_CLIENT_H__ */ diff --git a/citadel/modules/pop3client/serv_pop3client.c b/citadel/modules/pop3client/serv_pop3client.c index 40add1484..827976a3c 100644 --- a/citadel/modules/pop3client/serv_pop3client.c +++ b/citadel/modules/pop3client/serv_pop3client.c @@ -50,262 +50,879 @@ #include "internet_addressing.h" #include "database.h" #include "citadel_dirs.h" +#include "event_client.h" -struct pop3aggr { - struct pop3aggr *next; - char roomname[ROOMNAMELEN]; - char pop3host[128]; - char pop3user[128]; - char pop3pass[128]; + + +citthread_mutex_t POP3QueueMutex; /* locks the access to the following vars: */ +HashList *POP3QueueRooms = NULL; /* rss_room_counter */ +HashList *POP3FetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */ + +typedef struct __pop3_room_counter { + int count; + long QRnumber; +}pop3_room_counter; + +typedef enum ePOP3_C_States { + ReadGreeting, + GetUserState, + GetPassState, + GetListCommandState, + GetOneMessageIDState, + ReadMessageBodyFollowing, + ReadMessageBody, + ReadQuitState, + POP3C_MaxRead +}ePOP3_C_States; + + +typedef struct _FetchItem { + long MSGID; + StrBuf *MsgUIDL; + StrBuf *MsgUID; + int NeedFetch; +} FetchItem; + +void HfreeFetchItem(void *vItem) +{ + FetchItem *Item = (FetchItem*) vItem; + FreeStrBuf(&Item->MsgUIDL); + FreeStrBuf(&Item->MsgUID); + free(Item); +} + +typedef struct __pop3aggr { + AsyncIO IO; + + long n; + long RefCount; + ParsedURL Pop3Host; + DNSQueryParts HostLookup; + + StrBuf *rooms; + long QRnumber; + HashList *OtherQRnumbers; + + StrBuf *Url; +/// StrBuf *pop3host; -> URL + StrBuf *pop3user; + StrBuf *pop3pass; + StrBuf *RoomName; // TODO: fill me int keep; time_t interval; -}; + ePOP3_C_States State; + HashList *MsgNumbers; + HashPos *Pos; + FetchItem *CurrMsg; +} pop3aggr; + +void DeletePOP3Aggregator(void *vptr) +{ + pop3aggr *ptr = vptr; + DeleteHashPos(&ptr->Pos); + DeleteHash(&ptr->MsgNumbers); + FreeStrBuf(&ptr->rooms); + FreeStrBuf(&ptr->pop3user); + FreeStrBuf(&ptr->pop3pass); + FreeStrBuf(&ptr->RoomName); +} -struct pop3aggr *palist = NULL; +typedef eNextState(*Pop3ClientHandler)(pop3aggr* RecvMsg); -void pop3_do_fetching(char *roomname, char *pop3host, char *pop3user, char *pop3pass, int keep) +eNextState POP3_C_Shutdown(AsyncIO *IO); +eNextState POP3_C_Timeout(AsyncIO *IO); +eNextState POP3_C_ConnFail(AsyncIO *IO); +eNextState POP3_C_DispatchReadDone(AsyncIO *IO); +eNextState POP3_C_DispatchWriteDone(AsyncIO *IO); +eNextState POP3_C_Terminate(AsyncIO *IO); +eReadState POP3_C_ReadServerStatus(AsyncIO *IO); + +eNextState FinalizePOP3AggrRun(AsyncIO *IO) { - int sock; - char buf[SIZ]; - int msg_to_fetch = 0; - int *msglist = NULL; - int num_msgs = 0; - int alloc_msgs = 0; - int i; - char *body = NULL; - struct CtdlMessage *msg = NULL; - long msgnum = 0; - char this_uidl[64]; - char utmsgid[SIZ]; - struct cdbdata *cdbut; - struct UseTable ut; - CitContext *CCC=CC; - CtdlLogPrintf(CTDL_DEBUG, "POP3: %s %s %s \n", roomname, pop3host, pop3user); - CtdlLogPrintf(CTDL_NOTICE, "Connecting to <%s>\n", pop3host); - - if (CtdlThreadCheckStop()) - return; - - sock = sock_connect(pop3host, "110"); - if (sock < 0) { - CtdlLogPrintf(CTDL_ERR, "Could not connect: %s\n", strerror(errno)); - return; - } - - if (CtdlThreadCheckStop()) - goto bail; + return eAbort; +} - CtdlLogPrintf(CTDL_DEBUG, "Connected!\n"); - CCC->SBuf.Buf = NewStrBuf(); - CCC->sMigrateBuf = NewStrBuf(); - CCC->SBuf.ReadWritePointer = NULL; +eNextState FailAggregationRun(AsyncIO *IO) +{ + return eAbort; +} + +#define POP3C_DBG_SEND() CtdlLogPrintf(CTDL_DEBUG, "POP3 client[%ld]: > %s\n", RecvMsg->n, ChrPtr(RecvMsg->IO.SendBuf.Buf)) +#define POP3C_DBG_READ() CtdlLogPrintf(CTDL_DEBUG, "POP3 client[%ld]: < %s\n", RecvMsg->n, ChrPtr(RecvMsg->IO.IOBuf)) +#define POP3C_OK (strncasecmp(ChrPtr(RecvMsg->IO.IOBuf), "+OK", 3) == 0) +eNextState POP3C_ReadGreeting(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); /* Read the server greeting */ - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); - if (strncasecmp(buf, "+OK", 3)) goto bail; + if (!POP3C_OK) return eTerminateConnection; + else return eSendReply; +} - if (CtdlThreadCheckStop()) - goto bail; +eNextState POP3C_SendUser(pop3aggr *RecvMsg) +{ /* Identify ourselves. NOTE: we have to append a CR to each command. The LF will * automatically be appended by sock_puts(). Believe it or not, leaving out the CR * will cause problems if the server happens to be Exchange, which is so b0rken it * actually barfs on LF-terminated newlines. */ - snprintf(buf, sizeof buf, "USER %s\r", pop3user); - CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf); - if (sock_puts(&sock, buf) <0) goto bail; - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); - if (strncasecmp(buf, "+OK", 3)) goto bail; + StrBufPrintf(RecvMsg->IO.SendBuf.Buf, + "USER %s\r\n", ChrPtr(RecvMsg->pop3user)); + POP3C_DBG_SEND(); + return eReadMessage; +} - if (CtdlThreadCheckStop()) - goto bail; +eNextState POP3C_GetUserState(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); + if (!POP3C_OK) return eTerminateConnection; + else return eSendReply; +} +eNextState POP3C_SendPassword(pop3aggr *RecvMsg) +{ /* Password */ - snprintf(buf, sizeof buf, "PASS %s\r", pop3pass); + StrBufPrintf(RecvMsg->IO.SendBuf.Buf, + "PASS %s\r\n", ChrPtr(RecvMsg->pop3pass)); CtdlLogPrintf(CTDL_DEBUG, "\n"); - if (sock_puts(&sock, buf) <0) goto bail; - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); - if (strncasecmp(buf, "+OK", 3)) goto bail; +// POP3C_DBG_SEND(); + return eReadMessage; +} - if (CtdlThreadCheckStop()) - goto bail; +eNextState POP3C_GetPassState(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); + if (!POP3C_OK) return eTerminateConnection; + else return eSendReply; +} +eNextState POP3C_SendListCommand(pop3aggr *RecvMsg) +{ /* Get the list of messages */ - snprintf(buf, sizeof buf, "LIST\r"); - CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf); - if (sock_puts(&sock, buf) <0) goto bail; - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); - if (strncasecmp(buf, "+OK", 3)) goto bail; + StrBufPlain(RecvMsg->IO.SendBuf.Buf, HKEY("LIST\r\n")); + POP3C_DBG_SEND(); + return eReadMessage; +} - if (CtdlThreadCheckStop()) - goto bail; - - do { - if (CtdlThreadCheckStop()) - goto bail; - - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); - msg_to_fetch = atoi(buf); - if (msg_to_fetch > 0) { - if (alloc_msgs == 0) { - alloc_msgs = 100; - msglist = malloc((alloc_msgs * (sizeof(int)))); - } - else if (num_msgs >= alloc_msgs) { - alloc_msgs = alloc_msgs * 2; - msglist = realloc(msglist, (alloc_msgs * sizeof(int))); - } - if (msglist == NULL) goto bail; - msglist[num_msgs++] = msg_to_fetch; +eNextState POP3C_GetListCommandState(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); + if (!POP3C_OK) return eTerminateConnection; + RecvMsg->MsgNumbers = NewHash(1, NULL); + return eReadMore; +} + + +eNextState POP3C_GetListOneLine(pop3aggr *RecvMsg) +{ + FetchItem *OneMsg = NULL; + POP3C_DBG_READ(); + + if ((StrLength(RecvMsg->IO.IOBuf) == 1) && + (ChrPtr(RecvMsg->IO.IOBuf)[0] == '.')) + { + if (GetCount(RecvMsg->MsgNumbers) == 0) + { + //// RecvMsg->Sate = ReadQuitState; + } + else + { + RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0); } - } while (buf[0] != '.'); + return eSendReply; + + } + OneMsg = (FetchItem*) malloc(sizeof(FetchItem)); + memset(OneMsg, 0, sizeof(FetchItem)); + OneMsg->MSGID = atoi(ChrPtr(RecvMsg->IO.IOBuf)); + Put(RecvMsg->MsgNumbers, LKEY(OneMsg->MSGID), OneMsg, HfreeFetchItem); + + //RecvMsg->State --; /* read next Line */ + return eReadMore; +} - if (num_msgs) for (i=0; iMsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData)) + { + RecvMsg->CurrMsg = (FetchItem*) vData; /* Find out the UIDL of the message, to determine whether we've already downloaded it */ - snprintf(buf, sizeof buf, "UIDL %d\r", msglist[i]); - CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf); - if (sock_puts(&sock, buf) <0) goto bail; - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); - if (strncasecmp(buf, "+OK", 3)) goto bail; - extract_token(this_uidl, buf, 2, ' ', sizeof this_uidl); + StrBufPrintf(RecvMsg->IO.SendBuf.Buf, + "UIDL %ld\r\n", RecvMsg->CurrMsg->MSGID); + POP3C_DBG_SEND(); + } + else + { + DeleteHashPos(&RecvMsg->Pos); + /// done receiving uidls.. start looking them up now. + RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0); + + } + return eReadMore; /* TODO */ +} - snprintf(utmsgid, sizeof utmsgid, "pop3/%s/%s@%s", roomname, this_uidl, pop3host); +#if 0 +eNextState FetchNetworkUsetableEntry(AsyncIO *IO) +{ + struct cdbdata *cdbut; + networker_save_message *Ctx = (networker_save_message *) IO->Data; - if (CtdlThreadCheckStop()) - goto bail; + if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData)) + { - cdbut = cdb_fetch(CDB_USETABLE, utmsgid, strlen(utmsgid)); + /* Find out if we've already seen this item */ + strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO + Ctx->ut.ut_timestamp = time(NULL); + + cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID)); if (cdbut != NULL) { - /* message has already been seen */ - CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", utmsgid); + /* Item has already been seen */ + CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID)); cdb_free(cdbut); - + /* rewrite the record anyway, to update the timestamp */ - strcpy(ut.ut_msgid, utmsgid); - ut.ut_timestamp = time(NULL); - cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid), &ut, sizeof(struct UseTable) ); + cdb_store(CDB_USETABLE, + SKEY(Ctx->MsgGUID), + &Ctx->ut, sizeof(struct UseTable) ); + return eAbort; } - else { - /* Message has not been seen. Tell the server to fetch the message... */ - snprintf(buf, sizeof buf, "RETR %d\r", msglist[i]); - CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf); - if (sock_puts(&sock, buf) <0) goto bail; - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); - if (strncasecmp(buf, "+OK", 3)) goto bail; - - if (CtdlThreadCheckStop()) - goto bail; + else + { + NextDBOperation(IO, RSSSaveMessage); + return eSendMore; + } + } + return eReadMessage; +} +#endif - /* If we get to this point, the message is on its way. Read it. */ - body = CtdlReadMessageBody(HKEY("."), config.c_maxmsglen, NULL, 1, &sock); - if (body == NULL) goto bail; + +eNextState POP3C_GetOneMessageIDState(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); + if (!POP3C_OK) return eTerminateConnection; + RecvMsg->CurrMsg->MsgUIDL = NewStrBufPlain(NULL, StrLength(RecvMsg->IO.IOBuf)); + RecvMsg->CurrMsg->MsgUID = NewStrBufPlain(NULL, StrLength(RecvMsg->IO.IOBuf) * 2); + + StrBufExtract_token(RecvMsg->CurrMsg->MsgUIDL, RecvMsg->IO.IOBuf, 2, ' '); + StrBufPrintf(RecvMsg->CurrMsg->MsgUID, + "pop3/%s/%s@%s", + ChrPtr(RecvMsg->RoomName), + ChrPtr(RecvMsg->CurrMsg->MsgUIDL), + RecvMsg->Pop3Host.Host); + return eReadMessage; +} + +eNextState POP3C_GetOneMessageIDFromUseTable(pop3aggr *RecvMsg) +{ + + struct cdbdata *cdbut; + struct UseTable ut; + + cdbut = cdb_fetch(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUID)); + if (cdbut != NULL) { + /* message has already been seen */ + CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(RecvMsg->CurrMsg->MsgUID)); + cdb_free(cdbut); + + /* rewrite the record anyway, to update the timestamp */ + strcpy(ut.ut_msgid, ChrPtr(RecvMsg->CurrMsg->MsgUID)); + ut.ut_timestamp = time(NULL); + cdb_store(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUID), &ut, sizeof(struct UseTable) ); + } + + return eReadMessage; +} + +eNextState POP3C_SendGetOneMsg(pop3aggr *RecvMsg) +{ + /* Message has not been seen. Tell the server to fetch the message... */ + StrBufPrintf(RecvMsg->IO.SendBuf.Buf, + "RETR %ld\r\n", RecvMsg->CurrMsg->MSGID); + POP3C_DBG_SEND(); + return eReadMessage; +} + + +eNextState POP3C_ReadMessageBodyFollowing(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); + if (!POP3C_OK) return eTerminateConnection; + else return eSendReply; +} + + + +eNextState POP3C_ReadMessageBody(pop3aggr *RecvMsg) +{ +#if 0 +//TODO + /* If we get to this point, the message is on its way. Read it. */ + body = CtdlReadMessageBody(HKEY("."), config.c_maxmsglen, NULL, 1, &sock); + if (body == NULL) goto bail; - CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n"); - msg = convert_internet_message(body); - body = NULL; /* yes, this should be dereferenced, NOT freed */ + CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n"); + msg = convert_internet_message(body); + body = NULL; /* yes, this should be dereferenced, NOT freed */ /* Do Something With It (tm) */ - msgnum = CtdlSubmitMsg(msg, NULL, roomname, 0); - if (msgnum > 0L) { - /* Message has been committed to the store */ - - if (!keep) { - snprintf(buf, sizeof buf, "DELE %d\r", msglist[i]); - CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf); - if (sock_puts(&sock, buf) <0) goto bail; - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); /* errors here are non-fatal */ - } + msgnum = CtdlSubmitMsg(msg, NULL, roomname, 0); + if (msgnum > 0L) { + /* Message has been committed to the store */ + /* write the uidl to the use table so we don't fetch this message again */ + } + CtdlFreeMessage(msg); +#endif + return eReadMessage; +} - /* write the uidl to the use table so we don't fetch this message again */ - strcpy(ut.ut_msgid, utmsgid); - ut.ut_timestamp = time(NULL); - cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid), - &ut, sizeof(struct UseTable) ); - } - CtdlFreeMessage(msg); - } +eNextState POP3C_StoreMsgRead(pop3aggr *RecvMsg) +{ +#if 0 + strcpy(ut.ut_msgid, utmsgid); + ut.ut_timestamp = time(NULL); + cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid), + &ut, sizeof(struct UseTable) ); +#endif + return eReadMessage;/// TODO +} +eNextState POP3C_SendDelete(pop3aggr *RecvMsg) +{ + if (!RecvMsg->keep) { + StrBufPrintf(RecvMsg->IO.SendBuf.Buf, + "DELE %ld\r\n", RecvMsg->CurrMsg->MSGID); + POP3C_DBG_SEND(); } + return eReadMessage; +} +eNextState POP3C_ReadDeleteState(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); + return eReadMessage; +} +eNextState POP3C_SendQuit(pop3aggr *RecvMsg) +{ /* Log out */ - snprintf(buf, sizeof buf, "QUIT\r"); - CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf); - if (sock_puts(&sock, buf) <0) goto bail; - if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail; - CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); -bail: - FreeStrBuf(&CCC->SBuf.Buf); - FreeStrBuf(&CCC->sMigrateBuf); + StrBufPlain(RecvMsg->IO.SendBuf.Buf, + HKEY("QUIT\r\n3)")); + POP3C_DBG_SEND(); + return eReadMessage; +} + - if (sock != -1) - sock_close(sock); - if (msglist) free(msglist); +eNextState POP3C_ReadQuitState(pop3aggr *RecvMsg) +{ + POP3C_DBG_READ(); + return eAbort; } +const long POP3_C_ConnTimeout = 1000; +const long DefaultPOP3Port = 110; + +Pop3ClientHandler POP3C_ReadHandlers[] = { + POP3C_ReadGreeting, + POP3C_GetUserState, + POP3C_GetPassState, + POP3C_GetListCommandState, + POP3C_GetOneMessageIDState, + POP3C_ReadMessageBodyFollowing, + POP3C_ReadMessageBody, + POP3C_ReadQuitState, +}; + +const long POP3_C_SendTimeouts[POP3C_MaxRead] = { + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100 +}; +const ConstStr POP3C_ReadErrors[POP3C_MaxRead] = { + {HKEY("Connection broken during ")}, + {HKEY("Connection broken during ")}, + {HKEY("Connection broken during ")}, + {HKEY("Connection broken during ")}, + {HKEY("Connection broken during ")}, + {HKEY("Connection broken during ")}, + {HKEY("Connection broken during ")}, + {HKEY("Connection broken during ")} +}; + +Pop3ClientHandler POP3C_SendHandlers[] = { + NULL, /* we don't send a greeting */ + POP3C_SendUser, + POP3C_SendPassword, + POP3C_SendListCommand, + POP3C_GetListOneLine, + POP3C_GetOneMessagID, + POP3C_SendGetOneMsg, + POP3C_SendDelete, + POP3C_SendQuit +}; + +const long POP3_C_ReadTimeouts[] = { + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100, + 100 +}; +/*****************************************************************************/ +/* POP3 CLIENT DISPATCHER */ +/*****************************************************************************/ + +void POP3SetTimeout(eNextState NextTCPState, pop3aggr *pMsg) +{ + double Timeout = 0.0; + + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + + switch (NextTCPState) { + case eSendReply: + case eSendMore: + Timeout = POP3_C_SendTimeouts[pMsg->State]; +/* + if (pMsg->State == eDATABody) { + / * if we're sending a huge message, we need more time. * / + Timeout += StrLength(pMsg->msgtext) / 1024; + } +*/ + break; + case eReadMessage: + Timeout = POP3_C_ReadTimeouts[pMsg->State]; +/* + if (pMsg->State == eDATATerminateBody) { + / * + * some mailservers take a nap before accepting the message + * content inspection and such. + * / + Timeout += StrLength(pMsg->msgtext) / 1024; + } +*/ + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eTerminateConnection: + case eAbort: + case eReadMore://// TODO + return; + } + SetNextTimeout(&pMsg->IO, Timeout); +} +eNextState POP3_C_DispatchReadDone(AsyncIO *IO) +{ + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + pop3aggr *pMsg = IO->Data; + eNextState rc; + + rc = POP3C_ReadHandlers[pMsg->State](pMsg); + pMsg->State++; + POP3SetTimeout(rc, pMsg); + return rc; +} +eNextState POP3_C_DispatchWriteDone(AsyncIO *IO) +{ + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + pop3aggr *pMsg = IO->Data; + eNextState rc; + + rc = POP3C_SendHandlers[pMsg->State](pMsg); + POP3SetTimeout(rc, pMsg); + return rc; +} + + +/*****************************************************************************/ +/* POP3 CLIENT ERROR CATCHERS */ +/*****************************************************************************/ +eNextState POP3_C_Terminate(AsyncIO *IO) +{ +/// pop3aggr *pMsg = (pop3aggr *)IO->Data; + + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + FinalizePOP3AggrRun(IO); + return eAbort; +} +eNextState POP3_C_Timeout(AsyncIO *IO) +{ + pop3aggr *pMsg = IO->Data; + + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); + return FailAggregationRun(IO); +} +eNextState POP3_C_ConnFail(AsyncIO *IO) +{ + pop3aggr *pMsg = (pop3aggr *)IO->Data; + + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State])); + return FailAggregationRun(IO); +} +eNextState POP3_C_Shutdown(AsyncIO *IO) +{ + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); +//// pop3aggr *pMsg = IO->Data; + + ////pMsg->MyQEntry->Status = 3; + ///StrBufPlain(pMsg->MyQEntry->StatusMessage, HKEY("server shutdown during message retrieval.")); + FinalizePOP3AggrRun(IO); + return eAbort; +} + + +/** + * @brief lineread Handler; understands when to read more POP3 lines, and when this is a one-lined reply. + */ +eReadState POP3_C_ReadServerStatus(AsyncIO *IO) +{ + eReadState Finished = eBufferNotEmpty; + + while (Finished == eBufferNotEmpty) { + Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf); + + switch (Finished) { + case eMustReadMore: /// read new from socket... + return Finished; + break; + case eBufferNotEmpty: /* shouldn't happen... */ + case eReadSuccess: /// done for now... + if (StrLength(IO->IOBuf) < 4) + continue; + if (ChrPtr(IO->IOBuf)[3] == '-') + Finished = eBufferNotEmpty; + else + return Finished; + break; + case eReadFail: /// WHUT? + ///todo: shut down! + break; + } + } + return Finished; +} + +/***************************************************************************** + * So we connect our Server IP here. * + *****************************************************************************/ +eNextState connect_ip(AsyncIO *IO) +{ + pop3aggr *cpptr = IO->Data; + + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + +//// IO->ConnectMe = &cpptr->Pop3Host; + /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ + + /////// SetConnectStatus(IO); + + return InitEventIO(IO, cpptr, + POP3_C_ConnTimeout, + POP3_C_ReadTimeouts[0], + 1); +} + +eNextState get_one_host_ip_done(AsyncIO *IO) +{ + pop3aggr *cpptr = IO->Data; + struct hostent *hostent; + + QueryCbDone(IO); + + hostent = cpptr->HostLookup.VParsedDNSReply; + if ((cpptr->HostLookup.DNSStatus == ARES_SUCCESS) && + (hostent != NULL) ) { + memset(&cpptr->Pop3Host.Addr, 0, sizeof(struct in6_addr)); + if (cpptr->Pop3Host.IPv6) { + memcpy(&cpptr->Pop3Host.Addr.sin6_addr.s6_addr, + &hostent->h_addr_list[0], + sizeof(struct in6_addr)); + + cpptr->Pop3Host.Addr.sin6_family = hostent->h_addrtype; + cpptr->Pop3Host.Addr.sin6_port = htons(DefaultPOP3Port); + } + else { + struct sockaddr_in *addr = (struct sockaddr_in*) &cpptr->Pop3Host.Addr; + /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */ +// addr->sin_addr.s_addr = htonl((uint32_t)&hostent->h_addr_list[0]); + memcpy(&addr->sin_addr.s_addr, + hostent->h_addr_list[0], + sizeof(uint32_t)); + + addr->sin_family = hostent->h_addrtype; + addr->sin_port = htons(DefaultPOP3Port); + + } + return connect_ip(IO); + } + else + return eAbort; +} + +eNextState get_one_host_ip(AsyncIO *IO) +{ + pop3aggr *cpptr = IO->Data; + /* + * here we start with the lookup of one host. it might be... + * - the relay host *sigh* + * - the direct hostname if there was no mx record + * - one of the mx'es + */ + + InitC_ares_dns(IO); + + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__); + + CtdlLogPrintf(CTDL_DEBUG, + "POP3 client[%ld]: looking up %s-Record %s : %d ...\n", + cpptr->n, + (cpptr->Pop3Host.IPv6)? "aaaa": "a", + cpptr->Pop3Host.Host, + cpptr->Pop3Host.Port); + + if (!QueueQuery((cpptr->Pop3Host.IPv6)? ns_t_aaaa : ns_t_a, + cpptr->Pop3Host.Host, + &cpptr->IO, + &cpptr->HostLookup, + get_one_host_ip_done)) + { +// cpptr->MyQEntry->Status = 5; +// StrBufPrintf(SendMsg->MyQEntry->StatusMessage, +// "No MX hosts found for <%s>", SendMsg->node); + cpptr->IO.NextState = eTerminateConnection; + return IO->NextState; + } + IO->NextState = eReadDNSReply; + return IO->NextState; +} + + + +int pop3_do_fetching(pop3aggr *cpptr) +{ + CitContext *SubC; + + cpptr->IO.Data = cpptr; + + cpptr->IO.SendDone = POP3_C_DispatchWriteDone; + cpptr->IO.ReadDone = POP3_C_DispatchReadDone; + cpptr->IO.Terminate = POP3_C_Terminate; + cpptr->IO.LineReader = POP3_C_ReadServerStatus; + cpptr->IO.ConnFail = POP3_C_ConnFail; + cpptr->IO.Timeout = POP3_C_Timeout; + cpptr->IO.ShutdownAbort = POP3_C_Shutdown; + + cpptr->IO.SendBuf.Buf = NewStrBufPlain(NULL, 1024); + cpptr->IO.RecvBuf.Buf = NewStrBufPlain(NULL, 1024); + cpptr->IO.IOBuf = NewStrBuf(); + + cpptr->IO.NextState = eReadMessage; +/* TODO + CtdlLogPrintf(CTDL_DEBUG, "POP3: %s %s %s \n", roomname, pop3host, pop3user); + CtdlLogPrintf(CTDL_NOTICE, "Connecting to <%s>\n", pop3host); +*/ + + SubC = CloneContext (CC); + SubC->session_specific_data = (char*) cpptr; + cpptr->IO.CitContext = SubC; + + if (cpptr->IO.ConnectMe->IsIP) { + QueueEventContext(&cpptr->IO, + connect_ip); + } + else { /* uneducated admin has chosen to add DNS to the equation... */ + QueueEventContext(&cpptr->IO, + get_one_host_ip); + } + return 1; +} /* * Scan a room's netconfig to determine whether it requires POP3 aggregation */ void pop3client_scan_room(struct ctdlroom *qrbuf, void *data) { + StrBuf *CfgData; + StrBuf *CfgType; + StrBuf *Line; + + struct stat statbuf; char filename[PATH_MAX]; - char buf[1024]; - char instr[32]; - FILE *fp; - struct pop3aggr *pptr; + int fd; + int Done; + void *vptr; + const char *CfgPtr, *lPtr; + const char *Err; - if (CtdlThreadCheckStop()) + pop3_room_counter *Count = NULL; +// pop3aggr *cpptr; + + citthread_mutex_lock(&POP3QueueMutex); + if (GetHash(POP3QueueRooms, LKEY(qrbuf->QRnumber), &vptr)) + { + CtdlLogPrintf(CTDL_DEBUG, + "pop3client: [%ld] %s already in progress.\n", + qrbuf->QRnumber, + qrbuf->QRname); + citthread_mutex_unlock(&POP3QueueMutex); return; + } + citthread_mutex_unlock(&POP3QueueMutex); assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); + if (CtdlThreadCheckStop()) + return; + /* Only do net processing for rooms that have netconfigs */ - fp = fopen(filename, "r"); - if (fp == NULL) { + fd = open(filename, 0); + if (fd <= 0) { + //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname); + return; + } + if (CtdlThreadCheckStop()) + return; + if (fstat(fd, &statbuf) == -1) { + CtdlLogPrintf(CTDL_DEBUG, "ERROR: could not stat configfile '%s' - %s\n", + filename, strerror(errno)); return; } + if (CtdlThreadCheckStop()) + return; + CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1); + if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) { + close(fd); + FreeStrBuf(&CfgData); + CtdlLogPrintf(CTDL_DEBUG, "ERROR: reading config '%s' - %s
\n", + filename, strerror(errno)); + return; + } + close(fd); + if (CtdlThreadCheckStop()) + return; + + CfgPtr = NULL; + CfgType = NewStrBuf(); + Line = NewStrBufPlain(NULL, StrLength(CfgData)); + Done = 0; + + while (!Done) + { + Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0; + if (StrLength(Line) > 0) + { + lPtr = NULL; + StrBufExtract_NextToken(CfgType, Line, &lPtr, '|'); + if (!strcasecmp("pop3client", ChrPtr(CfgType))) + { + pop3aggr *cptr; + + if (Count == NULL) + { + Count = malloc(sizeof(pop3_room_counter)); + Count->count = 0; + } + Count->count ++; + cptr = (pop3aggr *) malloc(sizeof(pop3aggr)); + memset(cptr, 0, sizeof(pop3aggr)); + /// TODO do we need this? cptr->roomlist_parts = 1; + cptr->rooms = NewStrBufPlain(qrbuf->QRname, -1); + cptr->pop3user = NewStrBufPlain(NULL, StrLength(Line)); + cptr->pop3pass = NewStrBufPlain(NULL, StrLength(Line)); + cptr->Url = NewStrBuf(); + + StrBufExtract_NextToken(cptr->Url, Line, &lPtr, '|'); + StrBufExtract_NextToken(cptr->pop3user, Line, &lPtr, '|'); + StrBufExtract_NextToken(cptr->pop3pass, Line, &lPtr, '|'); + cptr->keep = StrBufExtractNext_long(Line, &lPtr, '|'); + cptr->interval = StrBufExtractNext_long(Line, &lPtr, '|'); + + ParseURL(&cptr->IO.ConnectMe, cptr->Url, 110); + + cptr->IO.ConnectMe->CurlCreds = cptr->pop3user; + cptr->IO.ConnectMe->User = ChrPtr(cptr->IO.ConnectMe->CurlCreds); + cptr->IO.ConnectMe->UrlWithoutCred = cptr->pop3pass; + cptr->IO.ConnectMe->Pass = ChrPtr(cptr->IO.ConnectMe->UrlWithoutCred); + + + +#if 0 +/* todo: we need to reunite the url to be shure. */ + + citthread_mutex_lock(&POP3ueueMutex); + GetHash(POP3FetchUrls, SKEY(ptr->Url), &vptr); + use_this_cptr = (pop3aggr *)vptr; + + if (use_this_rncptr != NULL) + { + /* mustn't attach to an active session */ + if (use_this_cptr->RefCount > 0) + { + DeletePOP3Cfg(cptr); + Count->count--; + } + else + { + long *QRnumber; + StrBufAppendBufPlain(use_this_cptr->rooms, + qrbuf->QRname, + -1, 0); + if (use_this_cptr->roomlist_parts == 1) + { + use_this_cptr->OtherQRnumbers = NewHash(1, lFlathash); + } + QRnumber = (long*)malloc(sizeof(long)); + *QRnumber = qrbuf->QRnumber; + Put(use_this_cptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); + use_this_cptr->roomlist_parts++; + } + citthread_mutex_unlock(&POP3QueueMutex); + continue; + } + citthread_mutex_unlock(&RSSQueueMutex); +#endif + + citthread_mutex_lock(&POP3QueueMutex); + Put(POP3FetchUrls, SKEY(cptr->Url), cptr, DeletePOP3Aggregator); + citthread_mutex_unlock(&POP3QueueMutex); - while (fgets(buf, sizeof buf, fp) != NULL) { - buf[strlen(buf)-1] = 0; - - extract_token(instr, buf, 0, '|', sizeof instr); - if (!strcasecmp(instr, "pop3client")) { - pptr = (struct pop3aggr *) malloc(sizeof(struct pop3aggr)); - if (pptr != NULL) { - safestrncpy(pptr->roomname, qrbuf->QRname, sizeof pptr->roomname); - extract_token(pptr->pop3host, buf, 1, '|', sizeof pptr->pop3host); - extract_token(pptr->pop3user, buf, 2, '|', sizeof pptr->pop3user); - extract_token(pptr->pop3pass, buf, 3, '|', sizeof pptr->pop3pass); - pptr->keep = extract_int(buf, 4); - pptr->interval = extract_long(buf, 5); - pptr->next = palist; - palist = pptr; } - } - } + } - fclose(fp); + ///fclose(fp); + } } void pop3client_scan(void) { static time_t last_run = 0L; static int doing_pop3client = 0; - struct pop3aggr *pptr; +/// struct pop3aggr *pptr; time_t fastest_scan; + HashPos *it; + long len; + const char *Key; + void *vrptr; + pop3aggr *cptr; if (config.c_pop3_fastest < config.c_pop3_fetch) fastest_scan = config.c_pop3_fastest; @@ -331,15 +948,18 @@ void pop3client_scan(void) { CtdlLogPrintf(CTDL_DEBUG, "pop3client started\n"); CtdlForEachRoom(pop3client_scan_room, NULL); - while (palist != NULL && !CtdlThreadCheckStop()) { - if ((palist->interval && time(NULL) > (last_run + palist->interval)) - || (time(NULL) > last_run + config.c_pop3_fetch)) - pop3_do_fetching(palist->roomname, palist->pop3host, - palist->pop3user, palist->pop3pass, palist->keep); - pptr = palist; - palist = palist->next; - free(pptr); + + citthread_mutex_lock(&POP3QueueMutex); + it = GetNewHashPos(POP3FetchUrls, 0); + while (GetNextHashPos(POP3FetchUrls, it, &len, &Key, &vrptr) && + (vrptr != NULL)) { + cptr = (pop3aggr *)vrptr; + if (cptr->RefCount == 0) + if (!pop3_do_fetching(cptr)) + DeletePOP3Aggregator(cptr);////TODO } + DeleteHashPos(&it); + citthread_mutex_unlock(&POP3QueueMutex); CtdlLogPrintf(CTDL_DEBUG, "pop3client ended\n"); last_run = time(NULL); @@ -347,13 +967,23 @@ void pop3client_scan(void) { } +void pop3_cleanup(void) +{ + citthread_mutex_destroy(&POP3QueueMutex); + DeleteHash(&POP3FetchUrls); + DeleteHash(&POP3QueueRooms); +} + CTDL_MODULE_INIT(pop3client) { if (!threading) { + citthread_mutex_init(&POP3QueueMutex, NULL); + POP3QueueRooms = NewHash(1, lFlathash); + POP3FetchUrls = NewHash(1, NULL); CtdlRegisterSessionHook(pop3client_scan, EVT_TIMER); + CtdlRegisterCleanupHook(pop3_cleanup); } - /* return our Subversion id for the Log */ return "pop3client"; } -- 2.30.2