X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Frssclient%2Fserv_rssclient.c;h=dfed6bc992d878905ef5b9c00ec91a5e64d46979;hb=f57854388b4698ef8b1e260bea207ba1709bb8a2;hp=c2d95d34a88de0e68b0510524129fcfccafa16ba;hpb=b160dd583e9165da4689e1a09cd4d675dedb6ca4;p=citadel.git diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index c2d95d34a..dfed6bc99 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -69,7 +69,7 @@ struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; int RSSClientDebugEnabled = 0; -#define N ((rss_aggregator*)IO->Data)->QRnumber +#define N ((rss_aggregator*)IO->Data)->Cfg.QRnumber #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0)) @@ -93,6 +93,28 @@ int RSSClientDebugEnabled = 0; DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT, \ IO->ID, N) +typedef enum _RSSState { + eRSSCreated, + eRSSFetching, + eRSSFailure, + eRSSParsing, + eRSSUT +} RSSState; +ConstStr RSSStates[] = { + {HKEY("Aggregator created")}, + {HKEY("Fetching content")}, + {HKEY("Failed")}, + {HKEY("parsing content")}, + {HKEY("checking usetable")} +}; + +static void SetRSSState(AsyncIO *IO, RSSState State) +{ + CitContext* CCC = IO->CitContext; + if (CCC != NULL) + memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1); +} + void DeleteRoomReference(long QRnumber) { HashPos *At; @@ -119,7 +141,7 @@ void DeleteRoomReference(long QRnumber) void UnlinkRooms(rss_aggregator *RSSAggr) { - DeleteRoomReference(RSSAggr->QRnumber); + DeleteRoomReference(RSSAggr->Cfg.QRnumber); if (RSSAggr->OtherQRnumbers != NULL) { long HKLen; @@ -135,8 +157,8 @@ void UnlinkRooms(rss_aggregator *RSSAggr) &vData) && (vData != NULL)) { - long *lData = (long*) vData; - DeleteRoomReference(*lData); + pRSSConfig *Data = (pRSSConfig*) vData; + DeleteRoomReference(Data->QRnumber); } DeleteHashPos(&At); @@ -164,7 +186,9 @@ void DeleteRssCfg(void *vptr) { rss_aggregator *RSSAggr = (rss_aggregator *)vptr; AsyncIO *IO = &RSSAggr->IO; - EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n"); + + if (IO->CitContext != NULL) + EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n"); FreeStrBuf(&RSSAggr->Url); FreeStrBuf(&RSSAggr->rooms); @@ -369,10 +393,8 @@ eNextState RSSSaveMessage(AsyncIO *IO) CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, - SKEY(RSSAggr->ThisMsg->MsgGUID), - &RSSAggr->ThisMsg->ut, - sizeof(struct UseTable) ); + + CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, IO->Now, 0, eWrite, IO->ID, CCID); if (GetNextHashPos(RSSAggr->Messages, RSSAggr->Pos, @@ -387,27 +409,25 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) { const char *Key; long len; - struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; /* Find out if we've already seen this item */ - strcpy(Ctx->ThisMsg->ut.ut_msgid, - ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO - Ctx->ThisMsg->ut.ut_timestamp = time(NULL); - - cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); +// todo: expiry? #ifndef DEBUG_RSS - if (cdbut != NULL) { + SetRSSState(IO, eRSSUT); + if (CheckIfAlreadySeen("RSS Item Seen", + Ctx->ThisMsg->MsgGUID, + IO->Now, + IO->Now - USETABLE_ANTIEXPIRE, + eCheckUpdate, + IO->ID, CCID) + != 0) + { /* Item has already been seen */ EVRSSC_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); - cdb_free(cdbut); - - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(Ctx->ThisMsg->MsgGUID), - &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); + SetRSSState(IO, eRSSParsing); if (GetNextHashPos(Ctx->Messages, Ctx->Pos, @@ -422,18 +442,49 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) else #endif { + SetRSSState(IO, eRSSParsing); + NextDBOperation(IO, RSSSaveMessage); return eSendMore; } + return eSendMore; +} + +void UpdateLastKnownGood(pRSSConfig *pCfg, time_t now) +{ + OneRoomNetCfg* pRNCfg; + begin_critical_section(S_NETCONFIGS); + pRNCfg = CtdlGetNetCfgForRoom (pCfg->QRnumber); + if (pRNCfg != NULL) + { + RSSCfgLine *RSSCfg = (RSSCfgLine *)pRNCfg->NetConfigs[rssclient]; + + while (RSSCfg != NULL) + { + if (RSSCfg == pCfg->pCfg) + break; + + RSSCfg = RSSCfg->next; + } + if (RSSCfg != NULL) + { + pRNCfg->changed = 1; + RSSCfg->last_known_good = now; + } + } + + end_critical_section(S_NETCONFIGS); } eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) { - struct UseTable ut; + HashPos *it = NULL; + long len; + const char *Key; + pRSSConfig *pCfg; u_char rawdigest[MD5_DIGEST_LEN]; struct MD5Context md5context; StrBuf *guid; - struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; if (IO->HttpReq.httpcode != 200) @@ -442,6 +493,7 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) long lens[2]; const char *strs[2]; + SetRSSState(IO, eRSSFailure); ErrMsg = NewStrBuf(); EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n", IO->HttpReq.httpcode); @@ -462,11 +514,43 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) CtdlAideFPMessage( ChrPtr(ErrMsg), "RSS Aggregation run failure", - 2, strs, (long*) &lens); + 2, strs, (long*) &lens, + IO->Now, + IO->ID, CCID); + FreeStrBuf(&ErrMsg); + EVRSSC_syslog(LOG_DEBUG, + "RSS feed returned an invalid http status code. <%s>\n", + ChrPtr(Ctx->Url), + IO->HttpReq.httpcode); return eAbort; } + pCfg = &Ctx->Cfg; + + while (pCfg != NULL) + { + UpdateLastKnownGood (pCfg, IO->Now); + if ((Ctx->roomlist_parts > 1) && + (it == NULL)) + { + it = GetNewHashPos(RSSFetchUrls, 0); + } + if (it != NULL) + { + void *vptr; + if (GetNextHashPos(Ctx->OtherQRnumbers, it, &len, &Key, &vptr)) + pCfg = vptr; + else + pCfg = NULL; + } + else + pCfg = NULL; + } + DeleteHashPos (&it); + + SetRSSState(IO, eRSSUT); + MD5Init(&md5context); MD5Update(&md5context, @@ -483,31 +567,31 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) if (StrLength(guid) > 40) StrBufCutAt(guid, 40, NULL); /* Find out if we've already seen this item */ - memcpy(ut.ut_msgid, SKEY(guid)); - ut.ut_timestamp = time(NULL); - cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid)); #ifndef DEBUG_RSS - if (cdbut != NULL) { - /* Item has already been seen */ - EVRSSC_syslog(LOG_DEBUG, - "%s has already been seen\n", - ChrPtr(Ctx->Url)); - cdb_free(cdbut); - } - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(guid), - &ut, sizeof(struct UseTable) ); + if (CheckIfAlreadySeen("RSS Whole", + guid, + IO->Now, + IO->Now - USETABLE_ANTIEXPIRE, + eCheckUpdate, + IO->ID, CCID) + != 0) + { + FreeStrBuf(&guid); + + EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>\n", ChrPtr(Ctx->Url)); + return eAbort; + } FreeStrBuf(&guid); - if (cdbut != NULL) return eAbort; #endif + SetRSSState(IO, eRSSParsing); return RSSAggregator_ParseReply(IO); } eNextState RSSAggregator_FinishHttp(AsyncIO *IO) { + StopCurlWatchers(IO); return QueueDBOperation(IO, RSSAggregator_AnalyseReply); } @@ -540,6 +624,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr) EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n"); return 0; } + SetRSSState(IO, eRSSCreated); safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host, ChrPtr(RSSAggr->Url), @@ -549,6 +634,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr) ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80); CurlPrepareURL(RSSAggr->IO.ConnectMe); + SetRSSState(IO, eRSSFetching); QueueCurlContext(&RSSAggr->IO); return 1; } @@ -558,8 +644,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr) */ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG) { - const RoomNetCfgLine *pLine; - rss_room_counter *Count = NULL; + const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient]; rss_aggregator *RSSAggr = NULL; rss_aggregator *use_this_RSSAggr = NULL; void *vptr; @@ -578,34 +663,18 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneR if (server_shutting_down) return; - pLine = OneRNCFG->NetConfigs[pop3client]; - - while (pLine != NULL) + while (RSSCfg != NULL) { - if (Count == NULL) - { - Count = malloc( - sizeof(rss_room_counter)); - Count->count = 0; - } - Count->count ++; - RSSAggr = (rss_aggregator *) malloc( - sizeof(rss_aggregator)); - - memset (RSSAggr, 0, sizeof(rss_aggregator)); - RSSAggr->QRnumber = qrbuf->QRnumber; - RSSAggr->roomlist_parts = 1; - RSSAggr->Url = NewStrBufDup(pLine->Value[1]); - pthread_mutex_lock(&RSSQueueMutex); GetHash(RSSFetchUrls, - SKEY(RSSAggr->Url), + SKEY(RSSCfg->Url), &vptr); use_this_RSSAggr = (rss_aggregator *)vptr; if (use_this_RSSAggr != NULL) { - long *QRnumber; + pRSSConfig *pRSSCfg; + StrBufAppendBufPlain( use_this_RSSAggr->rooms, qrbuf->QRname, @@ -615,23 +684,34 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneR use_this_RSSAggr->OtherQRnumbers = NewHash(1, lFlathash); } - QRnumber = (long*)malloc(sizeof(long)); - *QRnumber = qrbuf->QRnumber; + + pRSSCfg = (pRSSConfig *) malloc(sizeof(pRSSConfig)); + + pRSSCfg->QRnumber = qrbuf->QRnumber; + pRSSCfg->pCfg = RSSCfg; + Put(use_this_RSSAggr->OtherQRnumbers, LKEY(qrbuf->QRnumber), - QRnumber, + pRSSCfg, NULL); use_this_RSSAggr->roomlist_parts++; pthread_mutex_unlock(&RSSQueueMutex); - FreeStrBuf(&RSSAggr->Url); - free(RSSAggr); - RSSAggr = NULL; + RSSCfg = RSSCfg->next; continue; } pthread_mutex_unlock(&RSSQueueMutex); + RSSAggr = (rss_aggregator *) malloc( + sizeof(rss_aggregator)); + + memset (RSSAggr, 0, sizeof(rss_aggregator)); + RSSAggr->Cfg.QRnumber = qrbuf->QRnumber; + RSSAggr->Cfg.pCfg = RSSCfg; + RSSAggr->roomlist_parts = 1; + RSSAggr->Url = NewStrBufDup(RSSCfg->Url); + RSSAggr->ItemType = RSS_UNSET; RSSAggr->rooms = NewStrBufPlain( @@ -645,6 +725,7 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneR DeleteRssCfg); pthread_mutex_unlock(&RSSQueueMutex); + RSSCfg = RSSCfg->next; } } @@ -688,21 +769,29 @@ void rssclient_scan(void) { } become_session(&rss_CC); - EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n"); + EVRSSQM_syslog(LOG_DEBUG, "rssclient started"); CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient); - pthread_mutex_lock(&RSSQueueMutex); - - it = GetNewHashPos(RSSFetchUrls, 0); - while (!server_shutting_down && - GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && - (vrptr != NULL)) { - rptr = (rss_aggregator *)vrptr; - if (!rss_do_fetching(rptr)) - UnlinkRSSAggregator(rptr); + if (GetCount(RSSFetchUrls) > 0) + { + pthread_mutex_lock(&RSSQueueMutex); + EVRSSQ_syslog(LOG_DEBUG, + "rssclient starting %d Clients", + GetCount(RSSFetchUrls)); + + it = GetNewHashPos(RSSFetchUrls, 0); + while (!server_shutting_down && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + (vrptr != NULL)) { + rptr = (rss_aggregator *)vrptr; + if (!rss_do_fetching(rptr)) + UnlinkRSSAggregator(rptr); + } + DeleteHashPos(&it); + pthread_mutex_unlock(&RSSQueueMutex); } - DeleteHashPos(&it); - pthread_mutex_unlock(&RSSQueueMutex); + else + EVRSSQM_syslog(LOG_DEBUG, "Nothing to do."); EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n"); return; @@ -720,11 +809,93 @@ void LogDebugEnableRSSClient(const int n) RSSClientDebugEnabled = n; } + +typedef struct __RSSVetoInfo { + StrBuf *ErrMsg; + time_t Now; + int Veto; +}RSSVetoInfo; + +void rssclient_veto_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG) +{ + RSSVetoInfo *Info = (RSSVetoInfo *) data; + const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient]; + + while (RSSCfg != NULL) + { + if ((RSSCfg->last_known_good != 0) && + (RSSCfg->last_known_good + USETABLE_ANTIEXPIRE < Info->Now)) + { + StrBufAppendPrintf(Info->ErrMsg, + "RSS feed not seen for a %d days:: <", + (Info->Now - RSSCfg->last_known_good) / (24 * 60 * 60)); + + StrBufAppendBuf(Info->ErrMsg, RSSCfg->Url, 0); + StrBufAppendBufPlain(Info->ErrMsg, HKEY(">\n"), 0); + } + RSSCfg = RSSCfg->next; + } +} + +int RSSCheckUsetableVeto(StrBuf *ErrMsg) +{ + RSSVetoInfo Info; + + Info.ErrMsg = ErrMsg; + Info.Now = time (NULL); + Info.Veto = 0; + + CtdlForEachNetCfgRoom(rssclient_veto_scan_room, &Info, rssclient); + + return Info.Veto;; +} + + + + +void ParseRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, OneRoomNetCfg *OneRNCFG) +{ + RSSCfgLine *RSSCfg; + + RSSCfg = (RSSCfgLine *) malloc (sizeof(RSSCfgLine)); + RSSCfg->Url = NewStrBufPlain (NULL, StrLength (Line)); + + + StrBufExtract_NextToken(RSSCfg->Url, Line, &LinePos, '|'); + RSSCfg->last_known_good = StrBufExtractNext_long(Line, &LinePos, '|'); + + + RSSCfg->next = (RSSCfgLine *)OneRNCFG->NetConfigs[ThisOne->C]; + OneRNCFG->NetConfigs[ThisOne->C] = (RoomNetCfgLine*) RSSCfg; +} + +void SerializeRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *OutputBuffer, OneRoomNetCfg *RNCfg, RoomNetCfgLine *data) +{ + RSSCfgLine *RSSCfg = (RSSCfgLine*) data; + + StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0); + StrBufAppendBufPlain(OutputBuffer, HKEY("|"), 0); + StrBufAppendBufPlain(OutputBuffer, SKEY(RSSCfg->Url), 0); + StrBufAppendPrintf(OutputBuffer, "|%ld\n", RSSCfg->last_known_good); +} + +void DeleteRSSClientCfgLine(const CfgLineType *ThisOne, RoomNetCfgLine **data) +{ + RSSCfgLine *RSSCfg = (RSSCfgLine*) *data; + + FreeStrBuf(&RSSCfg->Url); + free(*data); + *data = NULL; +} + + CTDL_MODULE_INIT(rssclient) { if (!threading) { - CtdlREGISTERRoomCfgType(rssclient, ParseGeneric, 0, 1, SerializeGeneric, DeleteGenericCfgLine); /// todo: implement rss specific parser + CtdlRegisterTDAPVetoHook (RSSCheckUsetableVeto, CDB_USETABLE, 0); + + CtdlREGISTERRoomCfgType(rssclient, ParseRSSClientCfgLine, 0, 1, SerializeRSSClientCfgLine, DeleteRSSClientCfgLine); pthread_mutex_init(&RSSQueueMutex, NULL); RSSQueueRooms = NewHash(1, lFlathash); RSSFetchUrls = NewHash(1, NULL);