Work on RSS Feed
author Wilfried Goesgens <dothebart@citadel.org>
Wed, 2 Nov 2011 07:27:58 +0000 (08:27 +0100)
committer Wilfried Goesgens <dothebart@citadel.org>
Wed, 2 Nov 2011 07:27:58 +0000 (08:27 +0100)
  - when finalizing an http request, evaluate the reply of IO->SendDone() for what to do.
  - flip start/stop on cURL IO events *argl*
  - don't fork per-message DB I/O contexts, but remember messages on parsing and save them sequentially
    -> simpler & better controllable.

citadel/modules/eventclient/serv_eventclient.c
citadel/modules/rssclient/rss_atom_parser.c
citadel/modules/rssclient/rss_atom_parser.h
citadel/modules/rssclient/serv_rssclient.c

index da79852d127904874b38bf5f9505e24a867bd267..2f9c5bc856aefc0e5ce67b13f2775e0d0670007e 100644 (file)
@@ -134,12 +134,28 @@ gotstatus(int nnrun)
                        IO->HttpReq.chnd = NULL;
 
                         IO->HttpReq.attached = 0;
-                        IO->SendDone(IO);
-
-                        FreeStrBuf(&IO->HttpReq.ReplyData);
-                        FreeURL(&IO->ConnectMe);
-                        RemoveContext(IO->CitContext);
-                        IO->Terminate(IO);
+                        switch(IO->SendDone(IO))
+                       {
+                       case eDBQuery:
+                               break;
+                       case eSendDNSQuery:
+                       case eReadDNSReply:
+                       case eConnect:
+                       case eSendReply: 
+                       case eSendMore:
+                       case eSendFile:
+                       case eReadMessage: 
+                       case eReadMore:
+                       case eReadPayload:
+                       case eReadFile:
+                               break;
+                       case eTerminateConnection:
+                       case eAbort:
+                               FreeStrBuf(&IO->HttpReq.ReplyData);
+                               FreeURL(&IO->ConnectMe);
+                               RemoveContext(IO->CitContext);
+                               IO->Terminate(IO);
+                       }
                 }
         }
 }
@@ -208,7 +224,6 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO)
         AsyncIO *IO = (AsyncIO*) vIO;
         CURLcode sta;
 
-        EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
        if (IO == NULL) {
                 sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
                 if (sta) {
@@ -225,8 +240,8 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO)
                ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
                ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
                curl_multi_assign(mhnd, fd, IO);
-
        }
+        EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action);
 
        switch (action)
        {
@@ -239,12 +254,12 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO)
                 ev_io_stop(event_base, &IO->send_event);
                break;
        case CURL_POLL_IN:
-                ev_io_stop(event_base, &IO->recv_event);
-                ev_io_start(event_base, &IO->send_event);
+                ev_io_start(event_base, &IO->recv_event);
+                ev_io_stop(event_base, &IO->send_event);
                break;
        case CURL_POLL_OUT:
-                ev_io_stop(event_base, &IO->send_event);
-                ev_io_start(event_base, &IO->recv_event);
+                ev_io_start(event_base, &IO->send_event);
+                ev_io_stop(event_base, &IO->recv_event);
                break;
        case CURL_POLL_INOUT:
                 ev_io_start(event_base, &IO->send_event);
index 7f1d612d624dbac9ff667d7ce454fd7d2a113fec..d60d35ae2b0c1644ea51b4a77aa00a5afb5ab120 100644 (file)
@@ -609,11 +609,13 @@ size_t rss_libcurl_callback(void *ptr, size_t size, size_t nmemb, void *stream)
 
 eNextState ParseRSSReply(AsyncIO *IO)
 {
+       StrBuf *Buf;
        rss_aggregator *rssc;
        rss_item *ri;
        const char *at;
        char *ptr;
        long len;
+       const char *Key;
 
        rssc = IO->Data;
        pthread_mutex_lock(&RSSQueueMutex);
@@ -647,10 +649,14 @@ eNextState ParseRSSReply(AsyncIO *IO)
        rssc->xp = XML_ParserCreateNS(ptr, ':');
        if (!rssc->xp) {
                syslog(LOG_DEBUG, "Cannot create XML parser!\n");
-               goto shutdown;
+               pthread_mutex_lock(&RSSQueueMutex);
+               rssc->RefCount --;
+               pthread_mutex_unlock(&RSSQueueMutex);
+               return eTerminateConnection;
        }
        FlushStrBuf(rssc->Key);
 
+       rssc->Messages = NewHash(1, Flathash);
        XML_SetElementHandler(rssc->xp, rss_xml_start, rss_xml_end);
        XML_SetCharacterDataHandler(rssc->xp, rss_xml_chardata);
        XML_SetUserData(rssc->xp, rssc);
@@ -671,19 +677,23 @@ eNextState ParseRSSReply(AsyncIO *IO)
                      XML_ErrorString(
                              XML_GetErrorCode(rssc->xp)));
 
-shutdown:
        XML_ParserFree(rssc->xp);
-
        flush_rss_item(ri);
        FreeStrBuf(&rssc->CData);
        FreeStrBuf(&rssc->Key);
 
-        ///Cfg->next_poll = time(NULL) + config.c_net_freq; 
+       Buf = NewStrBufDup(rssc->rooms);
+       rssc->recp.recp_room = SmashStrBuf(&Buf);
+       rssc->recp.num_room = rssc->roomlist_parts;
+       rssc->recp.recptypes_magic = RECPTYPES_MAGIC;
 
-       pthread_mutex_lock(&RSSQueueMutex);
-       rssc->RefCount --;
-       pthread_mutex_unlock(&RSSQueueMutex);
-       return eTerminateConnection;
+       rssc->Pos = GetNewHashPos(rssc->Messages, 1);
+
+        ///Cfg->next_poll = time(NULL) + config.c_net_freq; 
+       if (GetNextHashPos(rssc->Messages, rssc->Pos, &len, &Key, (void**) &rssc->ThisMsg))
+               return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+       else
+               return eAbort;
 }
 
 
index b5bd79c4b9b66322f30520329f86d0d800554b3a..9cb40219b44e62da59f72da84b7ee52ed9d30129 100644 (file)
@@ -61,6 +61,13 @@ struct rss_room_counter {
        long QRnumber;
 };
 
+typedef struct __networker_save_message {
+       struct CtdlMessage *Msg;
+       StrBuf *MsgGUID;
+       StrBuf *Message;
+       struct UseTable ut;
+} networker_save_message;
+
 struct rss_aggregator {
        AsyncIO          IO;
        XML_Parser       xp;
@@ -78,16 +85,19 @@ struct rss_aggregator {
                        
        StrBuf          *CData;
        StrBuf          *Key;
-                       
+
        rss_item        *Item;
-       
+       struct recptypes recp;
+       HashPos         *Pos;
+       HashList        *Messages;
+       networker_save_message *ThisMsg;
        const rss_xml_handler *Current;
 };
 
 
 
-
-
 eNextState ParseRSSReply(AsyncIO *IO);
 
 void rss_save_item(rss_item *ri, rss_aggregator *Cfg);
+
+eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO);
index c125f61748aa956a38d48a3915135e1e737db01e..7e5cb481dc338636f25bcb8f322db873f8f6f92b 100644 (file)
@@ -89,15 +89,6 @@ void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Ti
                StrBufAppendBufPlain(Message, HKEY("</a><br>\n"), 0);
        }
 }
-typedef struct __networker_save_message {
-       AsyncIO IO;
-       struct CtdlMessage *Msg;
-       struct recptypes *recp;
-       rss_aggregator *Cfg;
-       StrBuf *MsgGUID;
-       StrBuf *Message;
-       struct UseTable ut;
-} networker_save_message;
 
 
 void DeleteRoomReference(long QRnumber)
@@ -162,7 +153,7 @@ void UnlinkRSSAggregator(rss_aggregator *Cfg)
        DeleteHashPos(&At);
        last_run = time(NULL);
 }
-
+/*
 eNextState FreeNetworkSaveMessage (AsyncIO *IO)
 {
        networker_save_message *Ctx = (networker_save_message *) IO->Data;
@@ -187,6 +178,16 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO)
        last_run = time(NULL);
        return eAbort;
 }
+*/
+void FreeNetworkSaveMessage (void *vMsg)
+{
+       networker_save_message *Msg = (networker_save_message *) vMsg;
+
+       CtdlFreeMessage(Msg->Msg);
+       FreeStrBuf(&Msg->Message);
+       FreeStrBuf(&Msg->MsgGUID);
+       free(Msg);
+}
 
 eNextState AbortNetworkSaveMessage (AsyncIO *IO)
 {
@@ -195,39 +196,51 @@ eNextState AbortNetworkSaveMessage (AsyncIO *IO)
 
 eNextState RSSSaveMessage(AsyncIO *IO)
 {
-       networker_save_message *Ctx = (networker_save_message *) IO->Data;
+       long len;
+       const char *Key;
+       rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
 
-       Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message);
+       Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message);
 
-       CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0);
+       CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0);
 
        /* write the uidl to the use table so we don't store this item again */
-       cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) );
-
-       return eTerminateConnection;
+       cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
+       
+       if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
+               return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+       else
+               return eAbort;
 }
 
 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
 {
+       const char *Key;
+       long len;
        struct cdbdata *cdbut;
-       networker_save_message *Ctx = (networker_save_message *) IO->Data;
+       rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
+
 
        /* Find out if we've already seen this item */
-       strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO
-       Ctx->ut.ut_timestamp = time(NULL);
+       strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO
+       Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
 
-       cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID));
+       cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
 #ifndef DEBUG_RSS
        if (cdbut != NULL) {
                /* Item has already been seen */
-               syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID));
+               syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID));
                cdb_free(cdbut);
 
                /* rewrite the record anyway, to update the timestamp */
                cdb_store(CDB_USETABLE, 
-                         SKEY(Ctx->MsgGUID), 
-                         &Ctx->ut, sizeof(struct UseTable) );
-               return eAbort;
+                         SKEY(Ctx->ThisMsg->MsgGUID), 
+                         &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
+
+               if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg))
+                       return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry);
+               else
+                       return eAbort;
        }
        else
 #endif
@@ -236,7 +249,8 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
                return eSendMore;
        }
 }
-void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregator *Cfg)
+/*
+void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg)
 {
        networker_save_message *Ctx;
 
@@ -259,7 +273,7 @@ void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf
        Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage;
        QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry);
 }
-
+*/
 
 /*
  * Commit a fetched and parsed RSS item to disk
@@ -270,21 +284,12 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
        struct MD5Context md5context;
        u_char rawdigest[MD5_DIGEST_LEN];
        struct CtdlMessage *msg;
-       struct recptypes *recp = NULL;
        int msglen = 0;
        StrBuf *Message;
        StrBuf *guid;
-       StrBuf *Buf;
-
-       recp = (struct recptypes *) malloc(sizeof(struct recptypes));
-       if (recp == NULL) return;
-       memset(recp, 0, sizeof(struct recptypes));
-       Buf = NewStrBufDup(Cfg->rooms);
-       recp->recp_room = SmashStrBuf(&Buf);
-       recp->num_room = Cfg->roomlist_parts;
-       recp->recptypes_magic = RECPTYPES_MAGIC;
+
+       int n;
    
-       Cfg->RefCount ++;
        /* Construct a GUID to use in the S_USETABLE table.
         * If one is not present in the item itself, make one up.
         */
@@ -420,7 +425,19 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg)
        AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this");
        StrBufAppendBufPlain(Message, HKEY("</body></html>\n"), 0);
 
-       RSSQueueSaveMessage(msg, recp, guid, Message, Cfg);
+
+
+       networker_save_message *SaveMsg;
+
+       SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message));
+       memset(SaveMsg, 0, sizeof(networker_save_message));
+       
+       SaveMsg->MsgGUID = guid;
+       SaveMsg->Message = Message;
+       SaveMsg->Msg = msg;
+
+       n = GetCount(Cfg->Messages) + 1;
+       Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage);
 }
 
 
@@ -525,7 +542,10 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO)
        GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData);
        DeleteEntryFromHash(RSSFetchUrls, At);
        pthread_mutex_unlock(&RSSQueueMutex);
-
+       DeleteHashPos (&rncptr->Pos);
+       DeleteHash (&rncptr->Messages);
+       if (rncptr->recp.recp_room != NULL)
+               free(rncptr->recp.recp_room);
        DeleteHashPos(&At);
        return eAbort;
 }