X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Frssclient%2Fserv_rssclient.c;h=b2a069fd6a2ecabe0ccc172e84a9dcc86d0145f8;hb=d262741cce924786b2de5f3f10f5470e5d45743e;hp=82e206792bb77ea32d404e609769be0911341929;hpb=222d2f63b0f523ce6b79e4b2ca9767f80e0bf85e;p=citadel.git diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index 82e206792..b2a069fd6 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -69,7 +69,7 @@ struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; int RSSClientDebugEnabled = 0; -#define N ((rss_aggregator*)IO->Data)->QRnumber +#define N ((rss_aggregator*)IO->Data)->Cfg.QRnumber #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0)) @@ -93,6 +93,28 @@ int RSSClientDebugEnabled = 0; DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT, \ IO->ID, N) +typedef enum _RSSState { + eRSSCreated, + eRSSFetching, + eRSSFailure, + eRSSParsing, + eRSSUT +} RSSState; +ConstStr RSSStates[] = { + {HKEY("Aggregator created")}, + {HKEY("Fetching content")}, + {HKEY("Failed")}, + {HKEY("parsing content")}, + {HKEY("checking usetable")} +}; + +static void SetRSSState(AsyncIO *IO, RSSState State) +{ + CitContext* CCC = IO->CitContext; + if (CCC != NULL) + memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1); +} + void DeleteRoomReference(long QRnumber) { HashPos *At; @@ -119,7 +141,7 @@ void DeleteRoomReference(long QRnumber) void UnlinkRooms(rss_aggregator *RSSAggr) { - DeleteRoomReference(RSSAggr->QRnumber); + DeleteRoomReference(RSSAggr->Cfg.QRnumber); if (RSSAggr->OtherQRnumbers != NULL) { long HKLen; @@ -135,8 +157,8 @@ void UnlinkRooms(rss_aggregator *RSSAggr) &vData) && (vData != NULL)) { - long *lData = (long*) vData; - DeleteRoomReference(*lData); + pRSSConfig *Data = (pRSSConfig*) vData; + DeleteRoomReference(Data->QRnumber); } DeleteHashPos(&At); @@ -164,7 +186,9 @@ void DeleteRssCfg(void *vptr) { rss_aggregator *RSSAggr = (rss_aggregator *)vptr; AsyncIO *IO = &RSSAggr->IO; - EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n"); + + if (IO->CitContext != NULL) + EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n"); FreeStrBuf(&RSSAggr->Url); FreeStrBuf(&RSSAggr->rooms); @@ -369,10 +393,8 @@ eNextState RSSSaveMessage(AsyncIO *IO) CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0); /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, - SKEY(RSSAggr->ThisMsg->MsgGUID), - &RSSAggr->ThisMsg->ut, - sizeof(struct UseTable) ); + + CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, IO->Now, 0, eWrite, IO->ID, CCID); if (GetNextHashPos(RSSAggr->Messages, RSSAggr->Pos, @@ -387,27 +409,25 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) { const char *Key; long len; - struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; /* Find out if we've already seen this item */ - strcpy(Ctx->ThisMsg->ut.ut_msgid, - ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO - Ctx->ThisMsg->ut.ut_timestamp = time(NULL); - - cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); +// todo: expiry? #ifndef DEBUG_RSS - if (cdbut != NULL) { + SetRSSState(IO, eRSSUT); + if (CheckIfAlreadySeen("RSS Item Seen", + Ctx->ThisMsg->MsgGUID, + IO->Now, + IO->Now - USETABLE_ANTIEXPIRE, + eCheckUpdate, + IO->ID, CCID) + != 0) + { /* Item has already been seen */ EVRSSC_syslog(LOG_DEBUG, "%s has already been seen\n", ChrPtr(Ctx->ThisMsg->MsgGUID)); - cdb_free(cdbut); - - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(Ctx->ThisMsg->MsgGUID), - &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); + SetRSSState(IO, eRSSParsing); if (GetNextHashPos(Ctx->Messages, Ctx->Pos, @@ -422,18 +442,49 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) else #endif { + SetRSSState(IO, eRSSParsing); + NextDBOperation(IO, RSSSaveMessage); return eSendMore; } + return eSendMore; +} + +void UpdateLastKnownGood(pRSSConfig *pCfg, time_t now) +{ + OneRoomNetCfg* pRNCfg; + begin_critical_section(S_NETCONFIGS); + pRNCfg = CtdlGetNetCfgForRoom (pCfg->QRnumber); + if (pRNCfg != NULL) + { + RSSCfgLine *RSSCfg = (RSSCfgLine *)pRNCfg->NetConfigs[rssclient]; + + while (RSSCfg != NULL) + { + if (RSSCfg == pCfg->pCfg) + break; + + RSSCfg = RSSCfg->Next; + } + if (RSSCfg != NULL) + { + pRNCfg->changed = 1; + RSSCfg->last_known_good = now; + } + } + + end_critical_section(S_NETCONFIGS); } eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) { - struct UseTable ut; + HashPos *it = NULL; + long len; + const char *Key; + pRSSConfig *pCfg; u_char rawdigest[MD5_DIGEST_LEN]; struct MD5Context md5context; StrBuf *guid; - struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; if (IO->HttpReq.httpcode != 200) @@ -442,6 +493,7 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) long lens[2]; const char *strs[2]; + SetRSSState(IO, eRSSFailure); ErrMsg = NewStrBuf(); EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n", IO->HttpReq.httpcode); @@ -462,11 +514,41 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) CtdlAideFPMessage( ChrPtr(ErrMsg), "RSS Aggregation run failure", - 2, strs, (long*) &lens); + 2, strs, (long*) &lens, + IO->Now, + IO->ID, CCID); + FreeStrBuf(&ErrMsg); + EVRSSC_syslog(LOG_DEBUG, + "RSS feed returned an invalid http status code. <%s>\n", + ChrPtr(Ctx->Url), + IO->HttpReq.httpcode); return eAbort; } + pCfg = &Ctx->Cfg; + + while (pCfg != NULL) + { + UpdateLastKnownGood (pCfg, IO->Now); + if ((Ctx->roomlist_parts > 1) && + (it == NULL)) + { + it = GetNewHashPos(RSSFetchUrls, 0); + } + if (it != NULL) + { + void *vptr; + GetNextHashPos(Ctx->OtherQRnumbers, it, &len, &Key, &vptr); + pCfg = vptr; + } + else + pCfg = NULL; + } + DeleteHashPos (&it); + + SetRSSState(IO, eRSSUT); + MD5Init(&md5context); MD5Update(&md5context, @@ -483,26 +565,25 @@ eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) if (StrLength(guid) > 40) StrBufCutAt(guid, 40, NULL); /* Find out if we've already seen this item */ - memcpy(ut.ut_msgid, SKEY(guid)); - ut.ut_timestamp = time(NULL); - cdbut = cdb_fetch(CDB_USETABLE, SKEY(guid)); #ifndef DEBUG_RSS - if (cdbut != NULL) { - /* Item has already been seen */ - EVRSSC_syslog(LOG_DEBUG, - "%s has already been seen\n", - ChrPtr(Ctx->Url)); - cdb_free(cdbut); - } - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(guid), - &ut, sizeof(struct UseTable) ); + if (CheckIfAlreadySeen("RSS Whole", + guid, + IO->Now, + IO->Now - USETABLE_ANTIEXPIRE, + eCheckUpdate, + IO->ID, CCID) + != 0) + { + FreeStrBuf(&guid); + + EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>\n", ChrPtr(Ctx->Url)); + return eAbort; + } FreeStrBuf(&guid); - if (cdbut != NULL) return eAbort; #endif + SetRSSState(IO, eRSSParsing); return RSSAggregator_ParseReply(IO); } @@ -540,6 +621,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr) EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n"); return 0; } + SetRSSState(IO, eRSSCreated); safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host, ChrPtr(RSSAggr->Url), @@ -549,6 +631,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr) ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80); CurlPrepareURL(RSSAggr->IO.ConnectMe); + SetRSSState(IO, eRSSFetching); QueueCurlContext(&RSSAggr->IO); return 1; } @@ -556,21 +639,12 @@ int rss_do_fetching(rss_aggregator *RSSAggr) /* * Scan a room's netconfig to determine whether it is requesting any RSS feeds */ -void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) +void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG) { - StrBuf *CfgData=NULL; - StrBuf *CfgType; - StrBuf *Line; - rss_room_counter *Count = NULL; - struct stat statbuf; - char filename[PATH_MAX]; - int fd; - int Done; + const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient]; rss_aggregator *RSSAggr = NULL; rss_aggregator *use_this_RSSAggr = NULL; void *vptr; - const char *CfgPtr, *lPtr; - const char *Err; pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) @@ -584,143 +658,72 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) } pthread_mutex_unlock(&RSSQueueMutex); - assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); + if (server_shutting_down) return; - if (server_shutting_down) - return; + while (RSSCfg != NULL) + { + pthread_mutex_lock(&RSSQueueMutex); + GetHash(RSSFetchUrls, + SKEY(RSSCfg->Url), + &vptr); - /* Only do net processing for rooms that have netconfigs */ - fd = open(filename, 0); - if (fd <= 0) { - /* syslog(LOG_DEBUG, - "rssclient: %s no config.\n", - qrbuf->QRname); */ - return; - } + use_this_RSSAggr = (rss_aggregator *)vptr; + if (use_this_RSSAggr != NULL) + { + pRSSConfig *pRSSCfg; - if (server_shutting_down) - return; + StrBufAppendBufPlain( + use_this_RSSAggr->rooms, + qrbuf->QRname, + -1, 0); + if (use_this_RSSAggr->roomlist_parts==1) + { + use_this_RSSAggr->OtherQRnumbers + = NewHash(1, lFlathash); + } - if (fstat(fd, &statbuf) == -1) { - EVRSSQ_syslog(LOG_DEBUG, - "ERROR: could not stat configfile '%s' - %s\n", - filename, - strerror(errno)); - return; - } + pRSSCfg = (pRSSConfig *) malloc(sizeof(pRSSConfig)); - if (server_shutting_down) - return; + pRSSCfg->QRnumber = qrbuf->QRnumber; + pRSSCfg->pCfg = RSSCfg; - CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1); + Put(use_this_RSSAggr->OtherQRnumbers, + LKEY(qrbuf->QRnumber), + pRSSCfg, + NULL); + use_this_RSSAggr->roomlist_parts++; - if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) { - close(fd); - FreeStrBuf(&CfgData); - EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s
\n", - filename, strerror(errno)); - return; - } - close(fd); - if (server_shutting_down) - return; + pthread_mutex_unlock(&RSSQueueMutex); - CfgPtr = NULL; - CfgType = NewStrBuf(); - Line = NewStrBufPlain(NULL, StrLength(CfgData)); - Done = 0; - while (!Done) - { - Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0; - if (StrLength(Line) > 0) - { - lPtr = NULL; - StrBufExtract_NextToken(CfgType, Line, &lPtr, '|'); - if (!strcasecmp("rssclient", ChrPtr(CfgType))) - { - if (Count == NULL) - { - Count = malloc( - sizeof(rss_room_counter)); - Count->count = 0; - } - Count->count ++; - RSSAggr = (rss_aggregator *) malloc( - sizeof(rss_aggregator)); - - memset (RSSAggr, 0, sizeof(rss_aggregator)); - RSSAggr->QRnumber = qrbuf->QRnumber; - RSSAggr->roomlist_parts = 1; - RSSAggr->Url = NewStrBuf(); - - StrBufExtract_NextToken(RSSAggr->Url, - Line, - &lPtr, - '|'); - - pthread_mutex_lock(&RSSQueueMutex); - GetHash(RSSFetchUrls, - SKEY(RSSAggr->Url), - &vptr); - - use_this_RSSAggr = (rss_aggregator *)vptr; - if (use_this_RSSAggr != NULL) - { - long *QRnumber; - StrBufAppendBufPlain( - use_this_RSSAggr->rooms, - qrbuf->QRname, - -1, 0); - if (use_this_RSSAggr->roomlist_parts==1) - { - use_this_RSSAggr->OtherQRnumbers - = NewHash(1, lFlathash); - } - QRnumber = (long*)malloc(sizeof(long)); - *QRnumber = qrbuf->QRnumber; - Put(use_this_RSSAggr->OtherQRnumbers, - LKEY(qrbuf->QRnumber), - QRnumber, - NULL); - use_this_RSSAggr->roomlist_parts++; - - pthread_mutex_unlock(&RSSQueueMutex); - - FreeStrBuf(&RSSAggr->Url); - free(RSSAggr); - RSSAggr = NULL; - continue; - } - pthread_mutex_unlock(&RSSQueueMutex); - - RSSAggr->ItemType = RSS_UNSET; - - RSSAggr->rooms = NewStrBufPlain( - qrbuf->QRname, -1); - - pthread_mutex_lock(&RSSQueueMutex); - - Put(RSSFetchUrls, - SKEY(RSSAggr->Url), - RSSAggr, - DeleteRssCfg); - - pthread_mutex_unlock(&RSSQueueMutex); - } + RSSCfg = RSSCfg->Next; + continue; } - } - if (Count != NULL) - { - Count->QRnumber = qrbuf->QRnumber; + pthread_mutex_unlock(&RSSQueueMutex); + + RSSAggr = (rss_aggregator *) malloc( + sizeof(rss_aggregator)); + + memset (RSSAggr, 0, sizeof(rss_aggregator)); + RSSAggr->Cfg.QRnumber = qrbuf->QRnumber; + RSSAggr->Cfg.pCfg = RSSCfg; + RSSAggr->roomlist_parts = 1; + RSSAggr->Url = NewStrBufDup(RSSCfg->Url); + + RSSAggr->ItemType = RSS_UNSET; + + RSSAggr->rooms = NewStrBufPlain( + qrbuf->QRname, -1); + pthread_mutex_lock(&RSSQueueMutex); - EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n", - qrbuf->QRnumber, qrbuf->QRname); - Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL); + + Put(RSSFetchUrls, + SKEY(RSSAggr->Url), + RSSAggr, + DeleteRssCfg); + pthread_mutex_unlock(&RSSQueueMutex); + RSSCfg = RSSCfg->Next; } - FreeStrBuf(&CfgData); - FreeStrBuf(&CfgType); - FreeStrBuf(&Line); } /* @@ -763,21 +766,29 @@ void rssclient_scan(void) { } become_session(&rss_CC); - EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n"); - CtdlForEachRoom(rssclient_scan_room, NULL); - - pthread_mutex_lock(&RSSQueueMutex); + EVRSSQM_syslog(LOG_DEBUG, "rssclient started"); + CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient); - it = GetNewHashPos(RSSFetchUrls, 0); - while (!server_shutting_down && - GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && - (vrptr != NULL)) { - rptr = (rss_aggregator *)vrptr; - if (!rss_do_fetching(rptr)) - UnlinkRSSAggregator(rptr); + if (GetCount(RSSFetchUrls) > 0) + { + pthread_mutex_lock(&RSSQueueMutex); + EVRSSQ_syslog(LOG_DEBUG, + "rssclient starting %d Clients", + GetCount(RSSFetchUrls)); + + it = GetNewHashPos(RSSFetchUrls, 0); + while (!server_shutting_down && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + (vrptr != NULL)) { + rptr = (rss_aggregator *)vrptr; + if (!rss_do_fetching(rptr)) + UnlinkRSSAggregator(rptr); + } + DeleteHashPos(&it); + pthread_mutex_unlock(&RSSQueueMutex); } - DeleteHashPos(&it); - pthread_mutex_unlock(&RSSQueueMutex); + else + EVRSSQM_syslog(LOG_DEBUG, "Nothing to do."); EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n"); return; @@ -795,11 +806,93 @@ void LogDebugEnableRSSClient(const int n) RSSClientDebugEnabled = n; } + +typedef struct __RSSVetoInfo { + StrBuf *ErrMsg; + time_t Now; + int Veto; +}RSSVetoInfo; + +void rssclient_veto_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG) +{ + RSSVetoInfo *Info = (RSSVetoInfo *) data; + const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient]; + + while (RSSCfg != NULL) + { + if ((RSSCfg->last_known_good != 0) && + (RSSCfg->last_known_good + USETABLE_ANTIEXPIRE < Info->Now)) + { + StrBufAppendPrintf(Info->ErrMsg, + "RSS feed not seen for a %d days:: <", + (Info->Now - RSSCfg->last_known_good) / (24 * 60 * 60)); + + StrBufAppendBuf(Info->ErrMsg, RSSCfg->Url, 0); + StrBufAppendBufPlain(Info->ErrMsg, HKEY(">\n"), 0); + } + RSSCfg = RSSCfg->Next; + } +} + +int RSSCheckUsetableVeto(StrBuf *ErrMsg) +{ + RSSVetoInfo Info; + + Info.ErrMsg = ErrMsg; + Info.Now = time (NULL); + Info.Veto = 0; + + CtdlForEachNetCfgRoom(rssclient_veto_scan_room, &Info, rssclient); + + return Info.Veto;; +} + + + + +void ParseRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, OneRoomNetCfg *OneRNCFG) +{ + RSSCfgLine *RSSCfg; + + RSSCfg = (RSSCfgLine *) malloc (sizeof(RSSCfgLine)); + RSSCfg->Url = NewStrBufPlain (NULL, StrLength (Line)); + + + StrBufExtract_NextToken(RSSCfg->Url, Line, &LinePos, '|'); + RSSCfg->last_known_good = StrBufExtractNext_long(Line, &LinePos, '|'); + + + RSSCfg->Next = (RSSCfgLine *)OneRNCFG->NetConfigs[ThisOne->C]; + OneRNCFG->NetConfigs[ThisOne->C] = (RoomNetCfgLine*) RSSCfg; +} + +void SerializeRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *OutputBuffer, OneRoomNetCfg *RNCfg, RoomNetCfgLine *data) +{ + RSSCfgLine *RSSCfg = (RSSCfgLine*) data; + + StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0); + StrBufAppendBufPlain(OutputBuffer, HKEY("|"), 0); + StrBufAppendBufPlain(OutputBuffer, SKEY(RSSCfg->Url), 0); + StrBufAppendPrintf(OutputBuffer, "|%ld\n", RSSCfg->last_known_good); +} + +void DeleteRSSClientCfgLine(const CfgLineType *ThisOne, RoomNetCfgLine **data) +{ + RSSCfgLine *RSSCfg = (RSSCfgLine*) data; + + FreeStrBuf(&RSSCfg->Url); + free(*data); + *data = NULL; +} + + CTDL_MODULE_INIT(rssclient) { - if (threading) + if (!threading) { - CtdlFillSystemContext(&rss_CC, "rssclient"); + CtdlRegisterTDAPVetoHook (RSSCheckUsetableVeto, CDB_USETABLE, 0); + + CtdlREGISTERRoomCfgType(rssclient, ParseRSSClientCfgLine, 0, 1, SerializeRSSClientCfgLine, DeleteRSSClientCfgLine); pthread_mutex_init(&RSSQueueMutex, NULL); RSSQueueRooms = NewHash(1, lFlathash); RSSFetchUrls = NewHash(1, NULL); @@ -808,5 +901,9 @@ CTDL_MODULE_INIT(rssclient) CtdlRegisterEVCleanupHook(rss_cleanup); CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled); } + else + { + CtdlFillSystemContext(&rss_CC, "rssclient"); + } return "rssclient"; }