From f6877197c739711b051826923760edec66006376 Mon Sep 17 00:00:00 2001 From: Wilfried Goesgens Date: Fri, 4 Nov 2011 00:23:27 +0100 Subject: [PATCH] Work on evented RSS client & libev+libcurl integration - finalize simplification of when to delete what - add more logging --- citadel/context.c | 2 +- citadel/event_client.c | 6 +- .../modules/eventclient/serv_eventclient.c | 34 ++++++-- citadel/modules/rssclient/rss_atom_parser.c | 18 ++-- citadel/modules/rssclient/rss_atom_parser.h | 1 - citadel/modules/rssclient/serv_rssclient.c | 82 +++++++------------ 6 files changed, 75 insertions(+), 68 deletions(-) diff --git a/citadel/context.c b/citadel/context.c index 0b433502e..dd51112cc 100644 --- a/citadel/context.c +++ b/citadel/context.c @@ -345,7 +345,7 @@ void RemoveContext (CitContext *con) c = "WTF?"; } syslog(LOG_DEBUG, "RemoveContext(%s) session %d", c, con->cs_pid); - cit_backtrace(); +/// cit_backtrace(); /* Run any cleanup routines registered by loadable modules. * Note: We have to "become_session()" because the cleanup functions diff --git a/citadel/event_client.c b/citadel/event_client.c index e360a124a..94777828a 100644 --- a/citadel/event_client.c +++ b/citadel/event_client.c @@ -116,7 +116,7 @@ void ShutDownDBCLient(AsyncIO *IO) CitContext *Ctx =IO->CitContext; become_session(Ctx); - EVM_syslog(LOG_DEBUG, "DBEVENT\n"); + EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n"); ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown); assert(IO->Terminate); @@ -154,7 +154,9 @@ DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents) break; case eTerminateConnection: case eAbort: - ShutDownDBCLient(IO); + ev_idle_stop(event_db, &IO->db_unwind_stack); + ev_cleanup_stop(loop, &IO->db_abort_by_shutdown); + ShutDownDBCLient(IO); } } diff --git a/citadel/modules/eventclient/serv_eventclient.c b/citadel/modules/eventclient/serv_eventclient.c index 2f9c5bc85..153f1847e 100644 --- a/citadel/modules/eventclient/serv_eventclient.c +++ b/citadel/modules/eventclient/serv_eventclient.c @@ -60,7 +60,7 @@ ev_loop *event_base; -long EvIDSource = 0; +long EvIDSource = 1; /***************************************************************************** * libevent / curl integration * *****************************************************************************/ @@ -223,6 +223,7 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) char *f; AsyncIO *IO = (AsyncIO*) vIO; CURLcode sta; + const char *Action; if (IO == NULL) { sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f); @@ -231,6 +232,7 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) return -1; } IO = (AsyncIO *) f; + EV_syslog(LOG_DEBUG, "EVCURL: got socket for URL: %s\n", IO->ConnectMe->PlainUrl); if (IO->SendBuf.fd != 0) { ev_io_stop(event_base, &IO->recv_event); @@ -241,7 +243,29 @@ gotwatchsock(CURL *easy, curl_socket_t fd, int action, void *cglobal, void *vIO) ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE); curl_multi_assign(mhnd, fd, IO); } - EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%d\n", (int)fd, action); + + Action = ""; + switch (action) + { + case CURL_POLL_NONE: + Action = "CURL_POLL_NONE"; + break; + case CURL_POLL_REMOVE: + Action = "CURL_POLL_REMOVE"; + break; + case CURL_POLL_IN: + Action = "CURL_POLL_IN"; + break; + case CURL_POLL_OUT: + Action = "CURL_POLL_OUT"; + break; + case CURL_POLL_INOUT: + Action = "CURL_POLL_INOUT"; + break; + } + + + EV_syslog(LOG_DEBUG, "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n", (int)fd, Action, action); switch (action) { @@ -329,7 +353,7 @@ int evcurl_init(AsyncIO *IO, OPT(NOSIGNAL, (long)1); OPT(FAILONERROR, (long)1); OPT(ENCODING, ""); - OPT(FOLLOWLOCATION, (long)1); + OPT(FOLLOWLOCATION, (long)0); OPT(MAXREDIRS, (long)7); OPT(USERAGENT, CITADEL); @@ -455,7 +479,7 @@ static void QueueEventAddCallback(EV_P_ ev_async *w, int revents) } DeleteHashPos(&It); DeleteHashContent(&q); - syslog(LOG_DEBUG, "EVENT Q Read done.\n"); + syslog(LOG_DEBUG, "EVENT Q Add done.\n"); } @@ -583,7 +607,7 @@ static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents) } DeleteHashPos(&It); DeleteHashContent(&q); - syslog(LOG_DEBUG, "DBEVENT Q Read done.\n"); + syslog(LOG_DEBUG, "DBEVENT Q Add done.\n"); } diff --git a/citadel/modules/rssclient/rss_atom_parser.c b/citadel/modules/rssclient/rss_atom_parser.c index d60d35ae2..9bbeb8a1f 100644 --- a/citadel/modules/rssclient/rss_atom_parser.c +++ b/citadel/modules/rssclient/rss_atom_parser.c @@ -617,10 +617,17 @@ eNextState ParseRSSReply(AsyncIO *IO) long len; const char *Key; + + if (IO->HttpReq.httpcode != 200) + { + + EV_syslog(LOG_DEBUG, "need a 200, got a %ld !\n", + IO->HttpReq.httpcode); +// TODO: aide error message with rate limit + return eAbort; + } + rssc = IO->Data; - pthread_mutex_lock(&RSSQueueMutex); - rssc->RefCount ++; - pthread_mutex_unlock(&RSSQueueMutex); ri = rssc->Item; rssc->CData = NewStrBufPlain(NULL, SIZ); rssc->Key = NewStrBuf(); @@ -649,10 +656,7 @@ eNextState ParseRSSReply(AsyncIO *IO) rssc->xp = XML_ParserCreateNS(ptr, ':'); if (!rssc->xp) { syslog(LOG_DEBUG, "Cannot create XML parser!\n"); - pthread_mutex_lock(&RSSQueueMutex); - rssc->RefCount --; - pthread_mutex_unlock(&RSSQueueMutex); - return eTerminateConnection; + return eAbort; } FlushStrBuf(rssc->Key); diff --git a/citadel/modules/rssclient/rss_atom_parser.h b/citadel/modules/rssclient/rss_atom_parser.h index 9cb40219b..482c7c627 100644 --- a/citadel/modules/rssclient/rss_atom_parser.h +++ b/citadel/modules/rssclient/rss_atom_parser.h @@ -72,7 +72,6 @@ struct rss_aggregator { AsyncIO IO; XML_Parser xp; - int RefCount; int ItemType; int roomlist_parts; diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index 7e5cb481d..24e3d6097 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -208,7 +208,7 @@ eNextState RSSSaveMessage(AsyncIO *IO) cdb_store(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID), &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) - return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry); + return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry); else return eAbort; } @@ -229,7 +229,7 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) #ifndef DEBUG_RSS if (cdbut != NULL) { /* Item has already been seen */ - syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); + EV_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); cdb_free(cdbut); /* rewrite the record anyway, to update the timestamp */ @@ -238,7 +238,7 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); if (GetNextHashPos(Ctx->Messages, Ctx->Pos, &len, &Key, (void**) &Ctx->ThisMsg)) - return QueueDBOperation(IO, RSS_FetchNetworkUsetableEntry); + return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry); else return eAbort; } @@ -287,6 +287,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) int msglen = 0; StrBuf *Message; StrBuf *guid; + AsyncIO *IO = &Cfg->IO; int n; @@ -314,7 +315,7 @@ void rss_save_item(rss_item *ri, rss_aggregator *Cfg) } /* translate Item into message. */ - syslog(LOG_DEBUG, "RSS: translating item...\n"); + EVM_syslog(LOG_DEBUG, "RSS: translating item...\n"); if (ri->description == NULL) ri->description = NewStrBufPlain(HKEY("")); StrBufSpaceToBlank(ri->description); msg = malloc(sizeof(struct CtdlMessage)); @@ -456,7 +457,6 @@ int rss_do_fetching(rss_aggregator *Cfg) if ((Cfg->next_poll != 0) && (now < Cfg->next_poll)) return 0; - Cfg->RefCount++; ri = (rss_item*) malloc(sizeof(rss_item)); memset(ri, 0, sizeof(rss_item)); @@ -489,6 +489,8 @@ int rss_do_fetching(rss_aggregator *Cfg) void DeleteRssCfg(void *vptr) { rss_aggregator *rncptr = (rss_aggregator *)vptr; + AsyncIO *IO = &rncptr->IO; + EVM_syslog(LOG_DEBUG, "RSS: destroying\n"); FreeStrBuf(&rncptr->Url); FreeStrBuf(&rncptr->rooms); @@ -498,6 +500,12 @@ void DeleteRssCfg(void *vptr) DeleteHash(&rncptr->OtherQRnumbers); FreeURL(&rncptr->IO.ConnectMe); + DeleteHashPos (&rncptr->Pos); + DeleteHash (&rncptr->Messages); + if (rncptr->recp.recp_room != NULL) + free(rncptr->recp.recp_room); + + if (rncptr->Item != NULL) { FreeStrBuf(&rncptr->Item->guid); @@ -520,33 +528,11 @@ void DeleteRssCfg(void *vptr) eNextState RSSAggregatorTerminate(AsyncIO *IO) { rss_aggregator *rncptr = (rss_aggregator *)IO->Data; - - HashPos *At; - long HKLen; - const char *HK; - void *vData; - pthread_mutex_lock(&RSSQueueMutex); - rncptr->RefCount --; - if (rncptr->RefCount == 0) - { - UnlinkRSSAggregator(rncptr); - - } - pthread_mutex_unlock(&RSSQueueMutex); + EVM_syslog(LOG_DEBUG, "RSS: Terminating.\n"); - At = GetNewHashPos(RSSFetchUrls, 0); - pthread_mutex_lock(&RSSQueueMutex); - GetHashPosFromKey(RSSFetchUrls, SKEY(rncptr->Url), At); - GetHashPos(RSSFetchUrls, At, &HKLen, &HK, &vData); - DeleteEntryFromHash(RSSFetchUrls, At); - pthread_mutex_unlock(&RSSQueueMutex); - DeleteHashPos (&rncptr->Pos); - DeleteHash (&rncptr->Messages); - if (rncptr->recp.recp_room != NULL) - free(rncptr->recp.recp_room); - DeleteHashPos(&At); + UnlinkRSSAggregator(rncptr); return eAbort; } @@ -648,29 +634,20 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) use_this_rncptr = (rss_aggregator *)vptr; if (use_this_rncptr != NULL) { - /* mustn't attach to an active session */ - if (use_this_rncptr->RefCount > 0) - { - DeleteRssCfg(rncptr); - Count->count--; - } - else + long *QRnumber; + StrBufAppendBufPlain(use_this_rncptr->rooms, + qrbuf->QRname, + -1, 0); + if (use_this_rncptr->roomlist_parts == 1) { - long *QRnumber; - StrBufAppendBufPlain(use_this_rncptr->rooms, - qrbuf->QRname, - -1, 0); - if (use_this_rncptr->roomlist_parts == 1) - { - use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash); - } - QRnumber = (long*)malloc(sizeof(long)); - *QRnumber = qrbuf->QRnumber; - Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); - use_this_rncptr->roomlist_parts++; + use_this_rncptr->OtherQRnumbers = NewHash(1, lFlathash); } - pthread_mutex_unlock(&RSSQueueMutex); + QRnumber = (long*)malloc(sizeof(long)); + *QRnumber = qrbuf->QRnumber; + Put(use_this_rncptr->OtherQRnumbers, LKEY(qrbuf->QRnumber), QRnumber, NULL); + use_this_rncptr->roomlist_parts++; + pthread_mutex_unlock(&RSSQueueMutex); FreeStrBuf(&rncptr->Url); free(rncptr); @@ -727,6 +704,8 @@ void rssclient_scan(void) { */ if (doing_rssclient) return; doing_rssclient = 1; + if ((GetCount(RSSQueueRooms) > 0) || (GetCount(RSSFetchUrls) > 0)) + return; syslog(LOG_DEBUG, "rssclient started\n"); CtdlForEachRoom(rssclient_scan_room, NULL); @@ -738,9 +717,8 @@ void rssclient_scan(void) { GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && (vrptr != NULL)) { rptr = (rss_aggregator *)vrptr; - if (rptr->RefCount == 0) - if (!rss_do_fetching(rptr)) - UnlinkRSSAggregator(rptr); + if (!rss_do_fetching(rptr)) + UnlinkRSSAggregator(rptr); } DeleteHashPos(&it); pthread_mutex_unlock(&RSSQueueMutex); -- 2.30.2