From 5ec1b7756ba201839aac2dfbf9165a1888f93a0d Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Wed, 2 Nov 2011 08:27:58 +0100 Subject: [PATCH] Work on RSS Feed - when finalizing an http request, evaluate the reply of IO->SendDone() for what to do. - flip start/stop on cURL IO events *argl* - don't fork per message DB I/O contexts, but remember messages on parsing and save them sequential -> simpler & better controleable. --- .../modules/eventclient/serv_eventclient.c | 39 +++++--- citadel/modules/rssclient/rss_atom_parser.c | 26 +++-- citadel/modules/rssclient/rss_atom_parser.h | 18 +++- citadel/modules/rssclient/serv_rssclient.c | 98 +++++++++++-------- 4 files changed, 118 insertions(+), 63 deletions(-) diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index da79852d1..2f9c5bc85 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -134,12 +134,28 @@ gotstatus(int nnrun) IO->HttpReq.chnd = NULL; IO->HttpReq.attached = 0; - IO->SendDone(IO); - - FreeStrBuf(&IO->HttpReq.ReplyData); - FreeURL(&IO->ConnectMe); - RemoveContext(IO->CitContext); - IO->Terminate(IO); + switch(IO->SendDone(IO)) + { + case eDBQuery: + break; + case eSendDNSQuery: + case eReadDNSReply: + case eConnect: + case eSendReply: + case eSendMore: + case eSendFile: + case eReadMessage: + case eReadMore: + case eReadPayload: + case eReadFile: + break; + case eTerminateConnection: + case eAbort: + FreeStrBuf(&IO->HttpReq.ReplyData); + FreeURL(&IO->ConnectMe); + RemoveContext(IO->CitContext); + IO->Terminate(IO); + } } } } @@ -208,7 +224,6 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) AsyncIO *IO = (AsyncIO*) vIO; CURLcode sta; - EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action); if (IO == NULL) { sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); if (sta) { @@ -225,8 +240,8 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) ev_io_init(&IO->recv_event, &got_in, fd, EV_READ); ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE); curl_multi_assign(mhnd, fd, IO); - } + EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action); switch (action) { @@ -239,12 +254,12 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) ev_io_stop(event_base, &IO->send_event); break; case CURL_POLL_IN: - ev_io_stop(event_base, &IO->recv_event); - ev_io_start(event_base, &IO->send_event); + ev_io_start(event_base, &IO->recv_event); + ev_io_stop(event_base, &IO->send_event); break; case CURL_POLL_OUT: - ev_io_stop(event_base, &IO->send_event); - ev_io_start(event_base, &IO->recv_event); + ev_io_start(event_base, &IO->send_event); + ev_io_stop(event_base, &IO->recv_event); break; case CURL_POLL_INOUT: ev_io_start(event_base, &IO->send_event); diff --git a/citadel/modules/rssclient/rss_atom_parser.c b/citadel/modules/rssclient/rss_atom_parser.c index 7f1d612d6..d60d35ae2 100644 --- a/citadel/modules/rssclient/rss_atom_parser.c +++ b/citadel/modules/rssclient/rss_atom_parser.c @@ -609,11 +609,13 @@ size_t rss_libcurl_callback(void *ptr, size_t size, size_t nmemb, void *stream) eNextState ParseRSSReply(AsyncIO *IO) { + StrBuf *Buf; rss_aggregator *rssc; rss_item *ri; const char *at; char *ptr; long len; + const char *Key; rssc = IO->Data; pthread_mutex_lock(&RSSQueueMutex); @@ -647,10 +649,14 @@ eNextState ParseRSSReply(AsyncIO *IO) rssc->xp = XML_ParserCreateNS(ptr, ':'); if (!rssc->xp) { syslog(LOG_DEBUG, "Cannot create XML parser!\n"); - goto shutdown; + pthread_mutex_lock(&RSSQueueMutex); + rssc->RefCount --; + pthread_mutex_unlock(&RSSQueueMutex); + return eTerminateConnection; } FlushStrBuf(rssc->Key); + rssc->Messages = NewHash(1, Flathash); XML_SetElementHandler(rssc->xp, rss_xml_start, rss_xml_end); XML_SetCharacterDataHandler(rssc->xp, rss_xml_chardata); XML_SetUserData(rssc->xp, rssc); @@ -671,19 +677,23 @@ eNextState ParseRSSReply(AsyncIO *IO) XML_ErrorString( XML_GetErrorCode(rssc->xp))); -shutdown: XML_ParserFree(rssc->xp); - flush_rss_item(ri); FreeStrBuf(&rssc->CData); FreeStrBuf(&rssc->Key); - ///Cfg->next_poll = time(NULL) + config.c_net_freq; + Buf = NewStrBufDup(rssc->rooms); + rssc->recp.recp_room = SmashStrBuf(&Buf); + rssc->recp.num_room = rssc->roomlist_parts; + rssc->recp.recptypes_magic = RECPTYPES_MAGIC; - pthread_mutex_lock(&RSSQueueMutex); - rssc->RefCount --; - pthread_mutex_unlock(&RSSQueueMutex); - return eTerminateConnection; + rssc->Pos = GetNewHashPos(rssc->Messages, 1); + + ///Cfg->next_poll = time(NULL) + config.c_net_freq; + if (GetNextHashPos(rssc->Messages, rssc->Pos, &len, &Key, (void**) &rssc->ThisMsg)) + return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry); + else + return eAbort; } diff --git a/citadel/modules/rssclient/rss_atom_parser.h b/citadel/modules/rssclient/rss_atom_parser.h index b5bd79c4b..9cb40219b 100644 --- a/citadel/modules/rssclient/rss_atom_parser.h +++ b/citadel/modules/rssclient/rss_atom_parser.h @@ -61,6 +61,13 @@ struct rss_room_counter { long QRnumber; }; +typedef struct __networker_save_message { + struct CtdlMessage *Msg; + StrBuf *MsgGUID; + StrBuf *Message; + struct UseTable ut; +} networker_save_message; + struct rss_aggregator { AsyncIO IO; XML_Parser xp; @@ -78,16 +85,19 @@ struct rss_aggregator { StrBuf *CData; StrBuf *Key; - + rss_item *Item; - + struct recptypes recp; + HashPos *Pos; + HashList *Messages; + networker_save_message *ThisMsg; const rss_xml_handler *Current; }; - - eNextState ParseRSSReply(AsyncIO *IO); void rss_save_item(rss_item *ri, rss_aggregator *Cfg); + +eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO); diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index c125f6174..7e5cb481d 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -89,15 +89,6 @@ void AppendLink(StrBuf *Message, StrBuf *link, StrBuf *LinkTitle, const char *Ti StrBufAppendBufPlain(Message, HKEY("
\n"), 0); } } -typedef struct __networker_save_message { - AsyncIO IO; - struct CtdlMessage *Msg; - struct recptypes *recp; - rss_aggregator *Cfg; - StrBuf *MsgGUID; - StrBuf *Message; - struct UseTable ut; -} networker_save_message; void DeleteRoomReference(long QRnumber) @@ -162,7 +153,7 @@ void UnlinkRSSAggregator(rss_aggregator *Cfg) DeleteHashPos(&At); last_run = time(NULL); } - +/* eNextState FreeNetworkSaveMessage (AsyncIO *IO) { networker_save_message *Ctx = (networker_save_message *) IO->Data; @@ -187,6 +178,16 @@ eNextState FreeNetworkSaveMessage (AsyncIO *IO) last_run = time(NULL); return eAbort; } +*/ +void FreeNetworkSaveMessage (void *vMsg) +{ + networker_save_message *Msg = (networker_save_message *) vMsg; + + CtdlFreeMessage(Msg->Msg); + FreeStrBuf(&Msg->Message); + FreeStrBuf(&Msg->MsgGUID); + free(Msg); +} eNextState AbortNetworkSaveMessage (AsyncIO *IO) { @@ -195,39 +196,51 @@ eNextState AbortNetworkSaveMessage (AsyncIO *IO) eNextState RSSSaveMessage(AsyncIO *IO) { - networker_save_message *Ctx = (networker_save_message *) IO->Data; + long len; + const char *Key; + rss_aggregator *Ctx = (rss_aggregator *) IO->Data; - Ctx->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->Message); + Ctx->ThisMsg->Msg->cm_fields['M'] = SmashStrBuf(&Ctx->ThisMsg->Message); - CtdlSubmitMsg(Ctx->Msg, Ctx->recp, NULL, 0); + CtdlSubmitMsg(Ctx->ThisMsg->Msg, &Ctx->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, SKEY(Ctx->MsgGUID), &Ctx->ut, sizeof(struct UseTable) ); - - return eTerminateConnection; + cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); + + if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) + return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry); + else + return eAbort; } eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) { + const char *Key; + long len; struct cdbdata *cdbut; - networker_save_message *Ctx = (networker_save_message *) IO->Data; + rss_aggregator *Ctx = (rss_aggregator *) IO->Data; + /* Find out if we've already seen this item */ - strcpy(Ctx->ut.ut_msgid, ChrPtr(Ctx->MsgGUID)); /// TODO - Ctx->ut.ut_timestamp = time(NULL); + strcpy(Ctx->ThisMsg->ut.ut_msgid, ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO + Ctx->ThisMsg->ut.ut_timestamp = time(NULL); - cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->MsgGUID)); + cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); #ifndef DEBUG_RSS if (cdbut != NULL) { /* Item has already been seen */ - syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->MsgGUID)); + syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); cdb_free(cdbut); /* rewrite the record anyway, to update the timestamp */ cdb_store(CDB_USETABLE, - SKEY(Ctx->MsgGUID), - &Ctx->ut, sizeof(struct UseTable) ); - return eAbort; + SKEY(Ctx->ThisMsg->MsgGUID), + &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); + + if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) + return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry); + else + return eAbort; } else #endif @@ -236,7 +249,8 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) return eSendMore; } } -void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregator *Cfg) +/* +void RSSAddSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf *MsgGUID, StrBuf *MessageBody, rss_aggregat *Cfg) { networker_save_message *Ctx; @@ -259,7 +273,7 @@ void RSSQueueSaveMessage(struct CtdlMessage *Msg, struct recptypes *recp, StrBuf Ctx->IO.ShutdownAbort = AbortNetworkSaveMessage; QueueDBOperation(&Ctx->IO, RSS_FetchNetworkUsetableEntry); } - +*/ /* * Commit a fetched and parsed RSS item to disk @@ -270,21 +284,12 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) struct MD5Context md5context; u_char rawdigest[MD5_DIGEST_LEN]; struct CtdlMessage *msg; - struct recptypes *recp = NULL; int msglen = 0; StrBuf *Message; StrBuf *guid; - StrBuf *Buf; - - recp = (struct recptypes *) malloc(sizeof(struct recptypes)); - if (recp == NULL) return; - memset(recp, 0, sizeof(struct recptypes)); - Buf = NewStrBufDup(Cfg->rooms); - recp->recp_room = SmashStrBuf(&Buf); - recp->num_room = Cfg->roomlist_parts; - recp->recptypes_magic = RECPTYPES_MAGIC; + + int n; - Cfg->RefCount ++; /* Construct a GUID to use in the S_USETABLE table. * If one is not present in the item itself, make one up. */ @@ -420,7 +425,19 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) AppendLink(Message, ri->reLink, ri->reLinkTitle, "Reply to this"); StrBufAppendBufPlain(Message, HKEY("\n"), 0); - RSSQueueSaveMessage(msg, recp, guid, Message, Cfg); + + + networker_save_message *SaveMsg; + + SaveMsg = (networker_save_message *) malloc(sizeof(networker_save_message)); + memset(SaveMsg, 0, sizeof(networker_save_message)); + + SaveMsg->MsgGUID = guid; + SaveMsg->Message = Message; + SaveMsg->Msg = msg; + + n = GetCount(Cfg->Messages) + 1; + Put(Cfg->Messages, IKEY(n), SaveMsg, FreeNetworkSaveMessage); } @@ -525,7 +542,10 @@ eNextState RSSAggregatorTerminate(AsyncIO *IO) GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData); DeleteEntryFromHash(RSSFetchUrls, At); pthread_mutex_unlock(&RSSQueueMutex); - + DeleteHashPos (&rncptr->Pos); + DeleteHash (&rncptr->Messages); + if (rncptr->recp.recp_room != NULL) + free(rncptr->recp.recp_room); DeleteHashPos(&At); return eAbort; } -- 2.30.2