work on the pop3 aggregator
authorWilfried Goesgens <dothebart@citadel.org>
Wed, 10 Aug 2011 14:02:00 +0000 (14:02 +0000)
committerWilfried Goesgens <dothebart@citadel.org>
Wed, 10 Aug 2011 14:02:00 +0000 (14:02 +0000)
  - rewrite the config parser similar to the rss-configparser
  - copy over handler logic from async smtp connector
  - chop worker into handy bits so the handler logic can control it.

citadel/event_client.h
citadel/modules/pop3client/serv_pop3client.c

index 5939ad914ce96db005e862437ab3f83220d02ec7..9ed88c01e15401a16d00ed26242d34c08eb3653f 100644 (file)
@@ -1,3 +1,5 @@
+#ifndef __EVENT_CLIENT_H__
+#define __EVENT_CLIENT_H__
 #define EV_COMPAT3 0
 #include <ev.h>
 #include <sys/types.h>
@@ -16,6 +18,7 @@ typedef enum _eNextState {
        eSendReply, 
        eSendMore,
        eReadMessage, 
+       eReadMore,
        eTerminateConnection,
        eAbort
 }eNextState;
@@ -151,3 +154,5 @@ int evcurl_init(AsyncIO *IO,
                IO_CallBack Terminate);
 
 void evcurl_handle_start(AsyncIO *IO);
+
+#endif /* __EVENT_CLIENT_H__ */
index 40add14844bc815ec0486c27debf7b634bf9e8ac..827976a3c93f0b11a250d709f601d93cd0b76347 100644 (file)
 #include "internet_addressing.h"
 #include "database.h"
 #include "citadel_dirs.h"
+#include "event_client.h"
 
-struct pop3aggr {
-       struct pop3aggr *next;
-       char roomname[ROOMNAMELEN];
-       char pop3host[128];
-       char pop3user[128];
-       char pop3pass[128];
+
+
+citthread_mutex_t POP3QueueMutex; /* locks the access to the following vars: */
+HashList *POP3QueueRooms = NULL; /* rss_room_counter */
+HashList *POP3FetchUrls = NULL; /* -> rss_aggregator; ->RefCount access to be locked too. */
+
+typedef struct __pop3_room_counter {
+       int count;
+       long QRnumber;
+}pop3_room_counter;
+
+typedef enum ePOP3_C_States {
+       ReadGreeting,
+       GetUserState,
+       GetPassState,
+       GetListCommandState,
+       GetOneMessageIDState,
+       ReadMessageBodyFollowing,
+       ReadMessageBody,
+       ReadQuitState,
+       POP3C_MaxRead
+}ePOP3_C_States;
+
+
+typedef struct _FetchItem {
+       long MSGID;
+       StrBuf *MsgUIDL;
+       StrBuf *MsgUID;
+       int NeedFetch;
+} FetchItem;
+
+void HfreeFetchItem(void *vItem)
+{
+       FetchItem *Item = (FetchItem*) vItem;
+       FreeStrBuf(&Item->MsgUIDL);
+       FreeStrBuf(&Item->MsgUID);
+       free(Item);
+}
+
+typedef struct __pop3aggr {
+       AsyncIO          IO;
+
+       long n;
+       long RefCount;
+       ParsedURL Pop3Host;
+       DNSQueryParts HostLookup;
+
+       StrBuf          *rooms;
+       long             QRnumber;
+       HashList        *OtherQRnumbers;
+
+       StrBuf          *Url;
+///    StrBuf *pop3host; -> URL
+       StrBuf *pop3user;
+       StrBuf *pop3pass;
+       StrBuf *RoomName; // TODO: fill me
        int keep;
        time_t interval;
-};
+       ePOP3_C_States State;
+       HashList *MsgNumbers;
+       HashPos *Pos;
+       FetchItem *CurrMsg;
+} pop3aggr;
+
+void DeletePOP3Aggregator(void *vptr)
+{
+       pop3aggr *ptr = vptr;
+       DeleteHashPos(&ptr->Pos);
+       DeleteHash(&ptr->MsgNumbers);
+       FreeStrBuf(&ptr->rooms);
+       FreeStrBuf(&ptr->pop3user);
+       FreeStrBuf(&ptr->pop3pass);
+       FreeStrBuf(&ptr->RoomName);
+}
 
-struct pop3aggr *palist = NULL;
 
+typedef eNextState(*Pop3ClientHandler)(pop3aggr* RecvMsg);
 
-void pop3_do_fetching(char *roomname, char *pop3host, char *pop3user, char *pop3pass, int keep)
+eNextState POP3_C_Shutdown(AsyncIO *IO);
+eNextState POP3_C_Timeout(AsyncIO *IO);
+eNextState POP3_C_ConnFail(AsyncIO *IO);
+eNextState POP3_C_DispatchReadDone(AsyncIO *IO);
+eNextState POP3_C_DispatchWriteDone(AsyncIO *IO);
+eNextState POP3_C_Terminate(AsyncIO *IO);
+eReadState POP3_C_ReadServerStatus(AsyncIO *IO);
+
+eNextState FinalizePOP3AggrRun(AsyncIO *IO)
 {
-       int sock;
-       char buf[SIZ];
-       int msg_to_fetch = 0;
-       int *msglist = NULL;
-       int num_msgs = 0;
-       int alloc_msgs = 0;
-       int i;
-       char *body = NULL;
-       struct CtdlMessage *msg = NULL;
-       long msgnum = 0;
-       char this_uidl[64];
-       char utmsgid[SIZ];
-       struct cdbdata *cdbut;
-       struct UseTable ut;
-       CitContext *CCC=CC;
 
-       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s %s %s <password>\n", roomname, pop3host, pop3user);
-       CtdlLogPrintf(CTDL_NOTICE, "Connecting to <%s>\n", pop3host);
-       
-       if (CtdlThreadCheckStop())
-               return;
-               
-       sock = sock_connect(pop3host, "110");
-       if (sock < 0) {
-               CtdlLogPrintf(CTDL_ERR, "Could not connect: %s\n", strerror(errno));
-               return;
-       }
-       
-       if (CtdlThreadCheckStop())
-               goto bail;
+       return eAbort;
+}
 
-       CtdlLogPrintf(CTDL_DEBUG, "Connected!\n");
-       CCC->SBuf.Buf = NewStrBuf();
-       CCC->sMigrateBuf = NewStrBuf();
-       CCC->SBuf.ReadWritePointer = NULL;
+eNextState FailAggregationRun(AsyncIO *IO)
+{
+       return eAbort;
+}
+
+#define POP3C_DBG_SEND() CtdlLogPrintf(CTDL_DEBUG, "POP3 client[%ld]: > %s\n", RecvMsg->n, ChrPtr(RecvMsg->IO.SendBuf.Buf))
+#define POP3C_DBG_READ() CtdlLogPrintf(CTDL_DEBUG, "POP3 client[%ld]: < %s\n", RecvMsg->n, ChrPtr(RecvMsg->IO.IOBuf))
+#define POP3C_OK (strncasecmp(ChrPtr(RecvMsg->IO.IOBuf), "+OK", 3) == 0)
 
+eNextState POP3C_ReadGreeting(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
        /* Read the server greeting */
-       if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-       CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-       if (strncasecmp(buf, "+OK", 3)) goto bail;
+       if (!POP3C_OK) return eTerminateConnection;
+       else return eSendReply;
+}
 
-       if (CtdlThreadCheckStop())
-               goto bail;
 
+eNextState POP3C_SendUser(pop3aggr *RecvMsg)
+{
        /* Identify ourselves.  NOTE: we have to append a CR to each command.  The LF will
         * automatically be appended by sock_puts().  Believe it or not, leaving out the CR
         * will cause problems if the server happens to be Exchange, which is so b0rken it
         * actually barfs on LF-terminated newlines.
         */
-       snprintf(buf, sizeof buf, "USER %s\r", pop3user);
-       CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf);
-       if (sock_puts(&sock, buf) <0) goto bail;
-       if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-       CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-       if (strncasecmp(buf, "+OK", 3)) goto bail;
+       StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
+                    "USER %s\r\n", ChrPtr(RecvMsg->pop3user));
+       POP3C_DBG_SEND();
+       return eReadMessage;
+}
 
-       if (CtdlThreadCheckStop())
-               goto bail;
+eNextState POP3C_GetUserState(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
+       if (!POP3C_OK) return eTerminateConnection;
+       else return eSendReply;
+}
 
+eNextState POP3C_SendPassword(pop3aggr *RecvMsg)
+{
        /* Password */
-       snprintf(buf, sizeof buf, "PASS %s\r", pop3pass);
+       StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
+                    "PASS %s\r\n", ChrPtr(RecvMsg->pop3pass));
        CtdlLogPrintf(CTDL_DEBUG, "<PASS <password>\n");
-       if (sock_puts(&sock, buf) <0) goto bail;
-       if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-       CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-       if (strncasecmp(buf, "+OK", 3)) goto bail;
+//     POP3C_DBG_SEND();
+       return eReadMessage;
+}
 
-       if (CtdlThreadCheckStop())
-               goto bail;
+eNextState POP3C_GetPassState(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
+       if (!POP3C_OK) return eTerminateConnection;
+       else return eSendReply;
+}
 
+eNextState POP3C_SendListCommand(pop3aggr *RecvMsg)
+{
        /* Get the list of messages */
-       snprintf(buf, sizeof buf, "LIST\r");
-       CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf);
-       if (sock_puts(&sock, buf) <0) goto bail;
-       if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-       CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-       if (strncasecmp(buf, "+OK", 3)) goto bail;
+       StrBufPlain(RecvMsg->IO.SendBuf.Buf, HKEY("LIST\r\n"));
+       POP3C_DBG_SEND();
+       return eReadMessage;
+}
 
-       if (CtdlThreadCheckStop())
-               goto bail;
-
-       do {
-               if (CtdlThreadCheckStop())
-                       goto bail;
-
-               if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-               CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-               msg_to_fetch = atoi(buf);
-               if (msg_to_fetch > 0) {
-                       if (alloc_msgs == 0) {
-                               alloc_msgs = 100;
-                               msglist = malloc((alloc_msgs * (sizeof(int))));
-                       }
-                       else if (num_msgs >= alloc_msgs) {
-                               alloc_msgs = alloc_msgs * 2;
-                               msglist = realloc(msglist, (alloc_msgs * sizeof(int)));
-                       }
-                       if (msglist == NULL) goto bail;
-                       msglist[num_msgs++] = msg_to_fetch;
+eNextState POP3C_GetListCommandState(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
+       if (!POP3C_OK) return eTerminateConnection;
+       RecvMsg->MsgNumbers = NewHash(1, NULL);
+       return eReadMore;
+}
+
+
+eNextState POP3C_GetListOneLine(pop3aggr *RecvMsg)
+{
+       FetchItem *OneMsg = NULL;
+       POP3C_DBG_READ();
+
+       if ((StrLength(RecvMsg->IO.IOBuf) == 1) && 
+           (ChrPtr(RecvMsg->IO.IOBuf)[0] == '.'))
+       {
+               if (GetCount(RecvMsg->MsgNumbers) == 0)
+               {
+                       ////    RecvMsg->Sate = ReadQuitState;
+               }
+               else
+               {
+                       RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0);
                }
-       } while (buf[0] != '.');
+               return eSendReply;
+
+       }
+       OneMsg = (FetchItem*) malloc(sizeof(FetchItem));
+       memset(OneMsg, 0, sizeof(FetchItem));
+       OneMsg->MSGID = atoi(ChrPtr(RecvMsg->IO.IOBuf));
+       Put(RecvMsg->MsgNumbers, LKEY(OneMsg->MSGID), OneMsg, HfreeFetchItem);
+
+       //RecvMsg->State --; /* read next Line */
+       return eReadMore;
+}
 
-       if (num_msgs) for (i=0; i<num_msgs; ++i) {
+eNextState POP3C_GetOneMessagID(pop3aggr *RecvMsg)
+{
+       long HKLen;
+       const char *HKey;
+       void *vData;
 
+       if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData))
+       {
+               RecvMsg->CurrMsg = (FetchItem*) vData;
                /* Find out the UIDL of the message, to determine whether we've already downloaded it */
-               snprintf(buf, sizeof buf, "UIDL %d\r", msglist[i]);
-               CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf);
-               if (sock_puts(&sock, buf) <0) goto bail;
-               if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-               CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-               if (strncasecmp(buf, "+OK", 3)) goto bail;
-               extract_token(this_uidl, buf, 2, ' ', sizeof this_uidl);
+               StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
+                            "UIDL %ld\r\n", RecvMsg->CurrMsg->MSGID);
+               POP3C_DBG_SEND();
+       }
+       else
+       {
+               DeleteHashPos(&RecvMsg->Pos);
+               /// done receiving uidls.. start looking them up now.
+               RecvMsg->Pos = GetNewHashPos(RecvMsg->MsgNumbers, 0);
+               
+       }
+       return eReadMore; /* TODO */
+}
 
-               snprintf(utmsgid, sizeof utmsgid, "pop3/%s/%s@%s", roomname, this_uidl, pop3host);
+#if 0
+eNextState FetchNetworkUsetableEntry(AsyncIO *IO)
+{
+       struct cdbdata *cdbut;
+       networker_save_message *Ctx = (networker_save_message *) IO->Data;
 
-               if (CtdlThreadCheckStop())
-                       goto bail;
+       if(GetNextHashPos(RecvMsg->MsgNumbers, RecvMsg->Pos, &HKLen, &HKey, &vData))
+       {
 
-               cdbut = cdb_fetch(CDB_USETABLE, utmsgid, strlen(utmsgid));
+               /* Find out if we've already seen this item */
+               strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
+               Ctx->ut.ut_timestamp = time(NULL);
+               
+               cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
                if (cdbut != NULL) {
-                       /* message has already been seen */
-                       CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", utmsgid);
+                       /* Item has already been seen */
+                       CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
                        cdb_free(cdbut);
-
+               
                        /* rewrite the record anyway, to update the timestamp */
-                       strcpy(ut.ut_msgid, utmsgid);
-                       ut.ut_timestamp = time(NULL);
-                       cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid), &ut, sizeof(struct UseTable) );
+                       cdb_store(CDB_USETABLE, 
+                                 SKEY(Ctx->MsgGUID), 
+                                 &Ctx->ut, sizeof(struct UseTable) );
+                       return eAbort;
                }
-               else {
-                       /* Message has not been seen. Tell the server to fetch the message... */
-                       snprintf(buf, sizeof buf, "RETR %d\r", msglist[i]);
-                       CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf);
-                       if (sock_puts(&sock, buf) <0) goto bail;
-                       if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-                       CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-                       if (strncasecmp(buf, "+OK", 3)) goto bail;
-       
-                       if (CtdlThreadCheckStop())
-                               goto bail;
+               else
+               {
+                       NextDBOperation(IO, RSSSaveMessage);
+                       return eSendMore;
+               }
+       }
+       return eReadMessage;
+}
+#endif
 
-                       /* If we get to this point, the message is on its way.  Read it. */
-                       body = CtdlReadMessageBody(HKEY("."), config.c_maxmsglen, NULL, 1, &sock);
-                       if (body == NULL) goto bail;
+
+eNextState POP3C_GetOneMessageIDState(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
+       if (!POP3C_OK) return eTerminateConnection;
+       RecvMsg->CurrMsg->MsgUIDL = NewStrBufPlain(NULL, StrLength(RecvMsg->IO.IOBuf));
+       RecvMsg->CurrMsg->MsgUID = NewStrBufPlain(NULL, StrLength(RecvMsg->IO.IOBuf) * 2);
+
+       StrBufExtract_token(RecvMsg->CurrMsg->MsgUIDL, RecvMsg->IO.IOBuf, 2, ' ');
+       StrBufPrintf(RecvMsg->CurrMsg->MsgUID, 
+                    "pop3/%s/%s@%s", 
+                    ChrPtr(RecvMsg->RoomName), 
+                    ChrPtr(RecvMsg->CurrMsg->MsgUIDL),
+                    RecvMsg->Pop3Host.Host);
+       return eReadMessage;
+}
+
+eNextState POP3C_GetOneMessageIDFromUseTable(pop3aggr *RecvMsg)
+{
+
+       struct cdbdata *cdbut;
+       struct UseTable ut;
+
+       cdbut = cdb_fetch(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUID));
+       if (cdbut != NULL) {
+               /* message has already been seen */
+               CtdlLogPrintf(CTDL_DEBUG, "%s has already been seen\n", ChrPtr(RecvMsg->CurrMsg->MsgUID));
+               cdb_free(cdbut);
+
+               /* rewrite the record anyway, to update the timestamp */
+               strcpy(ut.ut_msgid, ChrPtr(RecvMsg->CurrMsg->MsgUID));
+               ut.ut_timestamp = time(NULL);
+               cdb_store(CDB_USETABLE, SKEY(RecvMsg->CurrMsg->MsgUID), &ut, sizeof(struct UseTable) );
+       }
+
+       return eReadMessage;
+}
+
+eNextState POP3C_SendGetOneMsg(pop3aggr *RecvMsg)
+{
+       /* Message has not been seen. Tell the server to fetch the message... */
+       StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
+                    "RETR %ld\r\n", RecvMsg->CurrMsg->MSGID);
+       POP3C_DBG_SEND();
+       return eReadMessage;
+}
+
+
+eNextState POP3C_ReadMessageBodyFollowing(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
+       if (!POP3C_OK) return eTerminateConnection;
+       else return eSendReply;
+}      
+
+
+
+eNextState POP3C_ReadMessageBody(pop3aggr *RecvMsg)
+{
+#if 0
+//TODO
+       /* If we get to this point, the message is on its way.  Read it. */
+       body = CtdlReadMessageBody(HKEY("."), config.c_maxmsglen, NULL, 1, &sock);
+       if (body == NULL) goto bail;
        
-                       CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n");
-                       msg = convert_internet_message(body);
-                       body = NULL;    /* yes, this should be dereferenced, NOT freed */
+       CtdlLogPrintf(CTDL_DEBUG, "Converting message...\n");
+       msg = convert_internet_message(body);
+       body = NULL;    /* yes, this should be dereferenced, NOT freed */
        
                        /* Do Something With It (tm) */
-                       msgnum = CtdlSubmitMsg(msg, NULL, roomname, 0);
-                       if (msgnum > 0L) {
-                               /* Message has been committed to the store */
-       
-                               if (!keep) {
-                                       snprintf(buf, sizeof buf, "DELE %d\r", msglist[i]);
-                                       CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf);
-                                       if (sock_puts(&sock, buf) <0) goto bail;
-                                       if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-                                       CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf); /* errors here are non-fatal */
-                               }
+       msgnum = CtdlSubmitMsg(msg, NULL, roomname, 0);
+       if (msgnum > 0L) {
+               /* Message has been committed to the store */
+               /* write the uidl to the use table so we don't fetch this message again */
+       }
+       CtdlFreeMessage(msg);
+#endif
+       return eReadMessage;
+}
 
-                               /* write the uidl to the use table so we don't fetch this message again */
-                               strcpy(ut.ut_msgid, utmsgid);
-                               ut.ut_timestamp = time(NULL);
-                               cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid),
-                                               &ut, sizeof(struct UseTable) );
-                       }
-                       CtdlFreeMessage(msg);
-               }
+eNextState POP3C_StoreMsgRead(pop3aggr *RecvMsg)
+{
+#if 0
+       strcpy(ut.ut_msgid, utmsgid);
+       ut.ut_timestamp = time(NULL);
+       cdb_store(CDB_USETABLE, utmsgid, strlen(utmsgid),
+                 &ut, sizeof(struct UseTable) );
+#endif
+       return eReadMessage;/// TODO
+}
+eNextState POP3C_SendDelete(pop3aggr *RecvMsg)
+{
+       if (!RecvMsg->keep) {
+               StrBufPrintf(RecvMsg->IO.SendBuf.Buf,
+                            "DELE %ld\r\n", RecvMsg->CurrMsg->MSGID);
+               POP3C_DBG_SEND();
        }
+       return eReadMessage;
+}
+eNextState POP3C_ReadDeleteState(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
+       return eReadMessage;
+}
 
+eNextState POP3C_SendQuit(pop3aggr *RecvMsg)
+{
        /* Log out */
-       snprintf(buf, sizeof buf, "QUIT\r");
-       CtdlLogPrintf(CTDL_DEBUG, "<%s\n", buf);
-       if (sock_puts(&sock, buf) <0) goto bail;
-       if (sock_getln(&sock, buf, sizeof buf) < 0) goto bail;
-       CtdlLogPrintf(CTDL_DEBUG, ">%s\n", buf);
-bail:  
-       FreeStrBuf(&CCC->SBuf.Buf);
-       FreeStrBuf(&CCC->sMigrateBuf);
+       StrBufPlain(RecvMsg->IO.SendBuf.Buf,
+                   HKEY("QUIT\r\n3)"));
+       POP3C_DBG_SEND();
+       return eReadMessage;
+}
+
 
-       if (sock != -1)
-               sock_close(sock);
-       if (msglist) free(msglist);
+eNextState POP3C_ReadQuitState(pop3aggr *RecvMsg)
+{
+       POP3C_DBG_READ();
+       return eAbort;
 }
 
+const long POP3_C_ConnTimeout = 1000;
+const long DefaultPOP3Port = 110;
+
+Pop3ClientHandler POP3C_ReadHandlers[] = {
+       POP3C_ReadGreeting,
+       POP3C_GetUserState,
+       POP3C_GetPassState,
+       POP3C_GetListCommandState,
+       POP3C_GetOneMessageIDState,
+       POP3C_ReadMessageBodyFollowing,
+       POP3C_ReadMessageBody,
+       POP3C_ReadQuitState,
+};
+
+const long POP3_C_SendTimeouts[POP3C_MaxRead] = {
+       100,
+       100,
+       100,
+       100,
+       100,
+       100,
+       100,
+       100
+};
+const ConstStr POP3C_ReadErrors[POP3C_MaxRead] = {
+       {HKEY("Connection broken during ")},
+       {HKEY("Connection broken during ")},
+       {HKEY("Connection broken during ")},
+       {HKEY("Connection broken during ")},
+       {HKEY("Connection broken during ")},
+       {HKEY("Connection broken during ")},
+       {HKEY("Connection broken during ")},
+       {HKEY("Connection broken during ")}
+};
+
+Pop3ClientHandler POP3C_SendHandlers[] = {
+       NULL, /* we don't send a greeting */
+       POP3C_SendUser,
+       POP3C_SendPassword,
+       POP3C_SendListCommand,
+       POP3C_GetListOneLine,
+       POP3C_GetOneMessagID,
+       POP3C_SendGetOneMsg,
+       POP3C_SendDelete,
+       POP3C_SendQuit
+};
+
+const long POP3_C_ReadTimeouts[] = {
+       100,
+       100,
+       100,
+       100,
+       100,
+       100,
+       100,
+       100,
+       100,
+       100
+};
+/*****************************************************************************/
+/*                     POP3 CLIENT DISPATCHER                                */
+/*****************************************************************************/
+
+void POP3SetTimeout(eNextState NextTCPState, pop3aggr *pMsg)
+{
+       double Timeout = 0.0;
+
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+
+       switch (NextTCPState) {
+       case eSendReply:
+       case eSendMore:
+               Timeout = POP3_C_SendTimeouts[pMsg->State];
+/*
+  if (pMsg->State == eDATABody) {
+  / * if we're sending a huge message, we need more time. * /
+  Timeout += StrLength(pMsg->msgtext) / 1024;
+  }
+*/
+               break;
+       case eReadMessage:
+               Timeout = POP3_C_ReadTimeouts[pMsg->State];
+/*
+  if (pMsg->State == eDATATerminateBody) {
+  / * 
+  * some mailservers take a nap before accepting the message
+  * content inspection and such.
+  * /
+  Timeout += StrLength(pMsg->msgtext) / 1024;
+  }
+*/
+               break;
+       case eSendDNSQuery:
+       case eReadDNSReply:
+       case eConnect:
+       case eTerminateConnection:
+       case eAbort:
+       case eReadMore://// TODO
+               return;
+       }
+       SetNextTimeout(&pMsg->IO, Timeout);
+}
+eNextState POP3_C_DispatchReadDone(AsyncIO *IO)
+{
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+       pop3aggr *pMsg = IO->Data;
+       eNextState rc;
+
+       rc = POP3C_ReadHandlers[pMsg->State](pMsg);
+       pMsg->State++;
+       POP3SetTimeout(rc, pMsg);
+       return rc;
+}
+eNextState POP3_C_DispatchWriteDone(AsyncIO *IO)
+{
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+       pop3aggr *pMsg = IO->Data;
+       eNextState rc;
+
+       rc = POP3C_SendHandlers[pMsg->State](pMsg);
+       POP3SetTimeout(rc, pMsg);
+       return rc;
+}
+
+
+/*****************************************************************************/
+/*                     POP3 CLIENT ERROR CATCHERS                            */
+/*****************************************************************************/
+eNextState POP3_C_Terminate(AsyncIO *IO)
+{
+///    pop3aggr *pMsg = (pop3aggr *)IO->Data;
+
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+       FinalizePOP3AggrRun(IO);
+       return eAbort;
+}
+eNextState POP3_C_Timeout(AsyncIO *IO)
+{
+       pop3aggr *pMsg = IO->Data;
+
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+       StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State]));
+       return FailAggregationRun(IO);
+}
+eNextState POP3_C_ConnFail(AsyncIO *IO)
+{
+       pop3aggr *pMsg = (pop3aggr *)IO->Data;
+
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+       StrBufPlain(IO->ErrMsg, CKEY(POP3C_ReadErrors[pMsg->State]));
+       return FailAggregationRun(IO);
+}
+eNextState POP3_C_Shutdown(AsyncIO *IO)
+{
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+////   pop3aggr *pMsg = IO->Data;
+
+       ////pMsg->MyQEntry->Status = 3;
+       ///StrBufPlain(pMsg->MyQEntry->StatusMessage, HKEY("server shutdown during message retrieval."));
+       FinalizePOP3AggrRun(IO);
+       return eAbort;
+}
+
+
+/**
+ * @brief lineread Handler; understands when to read more POP3 lines, and when this is a one-lined reply.
+ */
+eReadState POP3_C_ReadServerStatus(AsyncIO *IO)
+{
+       eReadState Finished = eBufferNotEmpty; 
+
+       while (Finished == eBufferNotEmpty) {
+               Finished = StrBufChunkSipLine(IO->IOBuf, &IO->RecvBuf);
+               
+               switch (Finished) {
+               case eMustReadMore: /// read new from socket... 
+                       return Finished;
+                       break;
+               case eBufferNotEmpty: /* shouldn't happen... */
+               case eReadSuccess: /// done for now...
+                       if (StrLength(IO->IOBuf) < 4)
+                               continue;
+                       if (ChrPtr(IO->IOBuf)[3] == '-')
+                               Finished = eBufferNotEmpty;
+                       else 
+                               return Finished;
+                       break;
+               case eReadFail: /// WHUT?
+                       ///todo: shut down! 
+                       break;
+               }
+       }
+       return Finished;
+}
+
+/*****************************************************************************
+ * So we connect our Server IP here.                                         *
+ *****************************************************************************/
+eNextState connect_ip(AsyncIO *IO)
+{
+       pop3aggr *cpptr = IO->Data;
+
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+       
+////   IO->ConnectMe = &cpptr->Pop3Host;
+       /*  Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
+
+       /////// SetConnectStatus(IO);
+
+       return InitEventIO(IO, cpptr, 
+                          POP3_C_ConnTimeout, 
+                          POP3_C_ReadTimeouts[0],
+                          1);
+}
+
+eNextState get_one_host_ip_done(AsyncIO *IO)
+{
+       pop3aggr *cpptr = IO->Data;
+       struct hostent *hostent;
+
+       QueryCbDone(IO);
+
+       hostent = cpptr->HostLookup.VParsedDNSReply;
+       if ((cpptr->HostLookup.DNSStatus == ARES_SUCCESS) && 
+           (hostent != NULL) ) {
+               memset(&cpptr->Pop3Host.Addr, 0, sizeof(struct in6_addr));
+               if (cpptr->Pop3Host.IPv6) {
+                       memcpy(&cpptr->Pop3Host.Addr.sin6_addr.s6_addr, 
+                              &hostent->h_addr_list[0],
+                              sizeof(struct in6_addr));
+                       
+                       cpptr->Pop3Host.Addr.sin6_family = hostent->h_addrtype;
+                       cpptr->Pop3Host.Addr.sin6_port   = htons(DefaultPOP3Port);
+               }
+               else {
+                       struct sockaddr_in *addr = (struct sockaddr_in*) &cpptr->Pop3Host.Addr;
+                       /* Bypass the ns lookup result like this: IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1"); */
+//                     addr->sin_addr.s_addr = htonl((uint32_t)&hostent->h_addr_list[0]);
+                       memcpy(&addr->sin_addr.s_addr, 
+                              hostent->h_addr_list[0], 
+                              sizeof(uint32_t));
+                       
+                       addr->sin_family = hostent->h_addrtype;
+                       addr->sin_port   = htons(DefaultPOP3Port);
+                       
+               }
+               return connect_ip(IO);
+       }
+       else
+               return eAbort;
+}
+
+eNextState get_one_host_ip(AsyncIO *IO)
+{
+       pop3aggr *cpptr = IO->Data;
+       /* 
+        * here we start with the lookup of one host. it might be...
+        * - the relay host *sigh*
+        * - the direct hostname if there was no mx record
+        * - one of the mx'es
+        */ 
+
+       InitC_ares_dns(IO);
+
+       CtdlLogPrintf(CTDL_DEBUG, "POP3: %s\n", __FUNCTION__);
+
+       CtdlLogPrintf(CTDL_DEBUG, 
+                     "POP3 client[%ld]: looking up %s-Record %s : %d ...\n", 
+                     cpptr->n, 
+                     (cpptr->Pop3Host.IPv6)? "aaaa": "a",
+                     cpptr->Pop3Host.Host, 
+                     cpptr->Pop3Host.Port);
+
+       if (!QueueQuery((cpptr->Pop3Host.IPv6)? ns_t_aaaa : ns_t_a, 
+                       cpptr->Pop3Host.Host, 
+                       &cpptr->IO, 
+                       &cpptr->HostLookup, 
+                       get_one_host_ip_done))
+       {
+//             cpptr->MyQEntry->Status = 5;
+//             StrBufPrintf(SendMsg->MyQEntry->StatusMessage, 
+//                          "No MX hosts found for <%s>", SendMsg->node);
+               cpptr->IO.NextState = eTerminateConnection;
+               return IO->NextState;
+       }
+       IO->NextState = eReadDNSReply;
+       return IO->NextState;
+}
+
+
+
+int pop3_do_fetching(pop3aggr *cpptr)
+{
+       CitContext *SubC;
+
+       cpptr->IO.Data          = cpptr;
+
+       cpptr->IO.SendDone      = POP3_C_DispatchWriteDone;
+       cpptr->IO.ReadDone      = POP3_C_DispatchReadDone;
+       cpptr->IO.Terminate     = POP3_C_Terminate;
+       cpptr->IO.LineReader    = POP3_C_ReadServerStatus;
+       cpptr->IO.ConnFail      = POP3_C_ConnFail;
+       cpptr->IO.Timeout       = POP3_C_Timeout;
+       cpptr->IO.ShutdownAbort = POP3_C_Shutdown;
+       
+       cpptr->IO.SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
+       cpptr->IO.RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
+       cpptr->IO.IOBuf         = NewStrBuf();
+       
+       cpptr->IO.NextState     = eReadMessage;
+/* TODO
+   CtdlLogPrintf(CTDL_DEBUG, "POP3: %s %s %s <password>\n", roomname, pop3host, pop3user);
+   CtdlLogPrintf(CTDL_NOTICE, "Connecting to <%s>\n", pop3host);
+*/
+       
+       SubC = CloneContext (CC);
+       SubC->session_specific_data = (char*) cpptr;
+       cpptr->IO.CitContext = SubC;
+
+       if (cpptr->IO.ConnectMe->IsIP) {
+               QueueEventContext(&cpptr->IO,
+                                 connect_ip);
+       }
+       else { /* uneducated admin has chosen to add DNS to the equation... */
+               QueueEventContext(&cpptr->IO,
+                                 get_one_host_ip);
+       }
+       return 1;
+}
 
 /*
  * Scan a room's netconfig to determine whether it requires POP3 aggregation
  */
 void pop3client_scan_room(struct ctdlroom *qrbuf, void *data)
 {
+       StrBuf *CfgData;
+       StrBuf *CfgType;
+       StrBuf *Line;
+
+       struct stat statbuf;
        char filename[PATH_MAX];
-       char buf[1024];
-       char instr[32];
-       FILE *fp;
-       struct pop3aggr *pptr;
+       int  fd;
+       int Done;
+       void *vptr;
+       const char *CfgPtr, *lPtr;
+       const char *Err;
 
-       if (CtdlThreadCheckStop())
+       pop3_room_counter *Count = NULL;
+//     pop3aggr *cpptr;
+
+       citthread_mutex_lock(&POP3QueueMutex);
+       if (GetHash(POP3QueueRooms, LKEY(qrbuf->QRnumber), &vptr))
+       {
+               CtdlLogPrintf(CTDL_DEBUG, 
+                             "pop3client: [%ld] %s already in progress.\n", 
+                             qrbuf->QRnumber, 
+                             qrbuf->QRname);
+               citthread_mutex_unlock(&POP3QueueMutex);
                return;
+       }
+       citthread_mutex_unlock(&POP3QueueMutex);
 
        assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
 
+       if (CtdlThreadCheckStop())
+               return;
+               
        /* Only do net processing for rooms that have netconfigs */
-       fp = fopen(filename, "r");
-       if (fp == NULL) {
+       fd = open(filename, 0);
+       if (fd <= 0) {
+               //CtdlLogPrintf(CTDL_DEBUG, "rssclient: %s no config.\n", qrbuf->QRname);
+               return;
+       }
+       if (CtdlThreadCheckStop())
+               return;
+       if (fstat(fd, &statbuf) == -1) {
+               CtdlLogPrintf(CTDL_DEBUG,  "ERROR: could not stat configfile '%s' - %s\n",
+                             filename, strerror(errno));
                return;
        }
+       if (CtdlThreadCheckStop())
+               return;
+       CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
+       if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
+               close(fd);
+               FreeStrBuf(&CfgData);
+               CtdlLogPrintf(CTDL_DEBUG,  "ERROR: reading config '%s' - %s<br>\n",
+                             filename, strerror(errno));
+               return;
+       }
+       close(fd);
+       if (CtdlThreadCheckStop())
+               return;
+       
+       CfgPtr = NULL;
+       CfgType = NewStrBuf();
+       Line = NewStrBufPlain(NULL, StrLength(CfgData));
+       Done = 0;
+
+       while (!Done)
+       {
+               Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
+               if (StrLength(Line) > 0)
+               {
+                       lPtr = NULL;
+                       StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
+                       if (!strcasecmp("pop3client", ChrPtr(CfgType)))
+                       {
+                               pop3aggr *cptr;
+
+                               if (Count == NULL)
+                               {
+                                       Count = malloc(sizeof(pop3_room_counter));
+                                       Count->count = 0;
+                               }
+                               Count->count ++;
+                               cptr = (pop3aggr *) malloc(sizeof(pop3aggr));
+                               memset(cptr, 0, sizeof(pop3aggr));
+                               /// TODO do we need this? cptr->roomlist_parts = 1;
+                               cptr->rooms = NewStrBufPlain(qrbuf->QRname, -1);
+                               cptr->pop3user = NewStrBufPlain(NULL, StrLength(Line));
+                               cptr->pop3pass = NewStrBufPlain(NULL, StrLength(Line));
+                               cptr->Url = NewStrBuf();
+
+                               StrBufExtract_NextToken(cptr->Url, Line, &lPtr, '|');
+                               StrBufExtract_NextToken(cptr->pop3user, Line, &lPtr, '|');
+                               StrBufExtract_NextToken(cptr->pop3pass, Line, &lPtr, '|');
+                               cptr->keep = StrBufExtractNext_long(Line, &lPtr, '|');
+                               cptr->interval = StrBufExtractNext_long(Line, &lPtr, '|');
+                   
+                               ParseURL(&cptr->IO.ConnectMe, cptr->Url, 110);
+
+                               cptr->IO.ConnectMe->CurlCreds = cptr->pop3user;
+                               cptr->IO.ConnectMe->User = ChrPtr(cptr->IO.ConnectMe->CurlCreds);
+                               cptr->IO.ConnectMe->UrlWithoutCred = cptr->pop3pass;
+                               cptr->IO.ConnectMe->Pass = ChrPtr(cptr->IO.ConnectMe->UrlWithoutCred);
+
+
+
+#if 0 
+/* todo: we need to reunite the url to be shure. */
+                               
+                               citthread_mutex_lock(&POP3ueueMutex);
+                               GetHash(POP3FetchUrls, SKEY(ptr->Url), &vptr);
+                               use_this_cptr = (pop3aggr *)vptr;
+                               
+                               if (use_this_rncptr != NULL)
+                               {
+                                       /* mustn't attach to an active session */
+                                       if (use_this_cptr->RefCount > 0)
+                                       {
+                                               DeletePOP3Cfg(cptr);
+                                               Count->count--;
+                                       }
+                                       else 
+                                       {
+                                               long *QRnumber;
+                                               StrBufAppendBufPlain(use_this_cptr->rooms, 
+                                                                    qrbuf->QRname, 
+                                                                    -1, 0);
+                                               if (use_this_cptr->roomlist_parts == 1)
+                                               {
+                                                       use_this_cptr->OtherQRnumbers = NewHash(1, lFlathash);
+                                               }
+                                               QRnumber = (long*)malloc(sizeof(long));
+                                               *QRnumber = qrbuf->QRnumber;
+                                               Put(use_this_cptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL);
+                                               use_this_cptr->roomlist_parts++;
+                                       }
+                                       citthread_mutex_unlock(&POP3QueueMutex);
+                                       continue;
+                               }
+                               citthread_mutex_unlock(&RSSQueueMutex);
+#endif
+
+                               citthread_mutex_lock(&POP3QueueMutex);
+                               Put(POP3FetchUrls, SKEY(cptr->Url), cptr, DeletePOP3Aggregator);
+                               citthread_mutex_unlock(&POP3QueueMutex);
 
-       while (fgets(buf, sizeof buf, fp) != NULL) {
-               buf[strlen(buf)-1] = 0;
-
-               extract_token(instr, buf, 0, '|', sizeof instr);
-               if (!strcasecmp(instr, "pop3client")) {
-                       pptr = (struct pop3aggr *) malloc(sizeof(struct pop3aggr));
-                       if (pptr != NULL) {
-                               safestrncpy(pptr->roomname, qrbuf->QRname, sizeof pptr->roomname);
-                               extract_token(pptr->pop3host, buf, 1, '|', sizeof pptr->pop3host);
-                               extract_token(pptr->pop3user, buf, 2, '|', sizeof pptr->pop3user);
-                               extract_token(pptr->pop3pass, buf, 3, '|', sizeof pptr->pop3pass);
-                               pptr->keep = extract_int(buf, 4);
-                               pptr->interval = extract_long(buf, 5);
-                               pptr->next = palist;
-                               palist = pptr;
                        }
-               }
 
-       }
+               }
 
-       fclose(fp);
+               ///fclose(fp);
 
+       }
 }
 
 
 void pop3client_scan(void) {
        static time_t last_run = 0L;
        static int doing_pop3client = 0;
-       struct pop3aggr *pptr;
+///    struct pop3aggr *pptr;
        time_t fastest_scan;
+       HashPos *it;
+       long len;
+       const char *Key;
+       void *vrptr;
+       pop3aggr *cptr;
 
        if (config.c_pop3_fastest < config.c_pop3_fetch)
                fastest_scan = config.c_pop3_fastest;
@@ -331,15 +948,18 @@ void pop3client_scan(void) {
        CtdlLogPrintf(CTDL_DEBUG, "pop3client started\n");
        CtdlForEachRoom(pop3client_scan_room, NULL);
 
-       while (palist != NULL && !CtdlThreadCheckStop()) {
-               if ((palist->interval && time(NULL) > (last_run + palist->interval))
-                       || (time(NULL) > last_run + config.c_pop3_fetch))
-                               pop3_do_fetching(palist->roomname, palist->pop3host,
-                                       palist->pop3user, palist->pop3pass, palist->keep);
-               pptr = palist;
-               palist = palist->next;
-               free(pptr);
+
+       citthread_mutex_lock(&POP3QueueMutex);
+       it = GetNewHashPos(POP3FetchUrls, 0);
+       while (GetNextHashPos(POP3FetchUrls, it, &len, &Key, &vrptr) && 
+              (vrptr != NULL)) {
+               cptr = (pop3aggr *)vrptr;
+               if (cptr->RefCount == 0) 
+                       if (!pop3_do_fetching(cptr))
+                               DeletePOP3Aggregator(cptr);////TODO
        }
+       DeleteHashPos(&it);
+       citthread_mutex_unlock(&POP3QueueMutex);
 
        CtdlLogPrintf(CTDL_DEBUG, "pop3client ended\n");
        last_run = time(NULL);
@@ -347,13 +967,23 @@ void pop3client_scan(void) {
 }
 
 
+void pop3_cleanup(void)
+{
+       citthread_mutex_destroy(&POP3QueueMutex);
+       DeleteHash(&POP3FetchUrls);
+       DeleteHash(&POP3QueueRooms);
+}
+
 CTDL_MODULE_INIT(pop3client)
 {
        if (!threading)
        {
+               citthread_mutex_init(&POP3QueueMutex, NULL);
+               POP3QueueRooms = NewHash(1, lFlathash);
+               POP3FetchUrls = NewHash(1, NULL);
                CtdlRegisterSessionHook(pop3client_scan, EVT_TIMER);
+                CtdlRegisterCleanupHook(pop3_cleanup);
        }
-       
        /* return our Subversion id for the Log */
         return "pop3client";
 }