X-Git-Url: https://code.citadel.org/?a=blobdiff_plain;f=citadel%2Fmodules%2Frssclient%2Fserv_rssclient.c;h=acae637871a5830a0b3aac56ead9ab2662b36738;hb=d268ace45e855ddadeea5c837a31dea33f0f26bd;hp=638300feb439abdbee4bb7cb42afa3fbcd825f82;hpb=c064b240ff457c4bcdc119aaf7a6a09ff31952e2;p=citadel.git diff --git a/citadel/modules/rssclient/serv_rssclient.c b/citadel/modules/rssclient/serv_rssclient.c index 638300feb..acae63787 100644 --- a/citadel/modules/rssclient/serv_rssclient.c +++ b/citadel/modules/rssclient/serv_rssclient.c @@ -1,7 +1,7 @@ /* * Bring external RSS feeds into rooms. * - * Copyright (c) 2007-2012 by the citadel.org team + * Copyright (c) 2007-2016 by the citadel.org team * * This program is open source software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 3. @@ -69,19 +69,19 @@ struct CitContext rss_CC; struct rssnetcfg *rnclist = NULL; int RSSClientDebugEnabled = 0; -#define N ((rss_aggregator*)IO->Data)->QRnumber +#define N ((rss_aggregator*)IO->Data)->Cfg.QRnumber #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0)) #define EVRSSC_syslog(LEVEL, FORMAT, ...) \ DBGLOG(LEVEL) syslog(LEVEL, \ - "IO[%ld]CC[%d][%ld]RSS" FORMAT, \ - IO->ID, CCID, N, __VA_ARGS__) + "%s[%ld]CC[%d][%ld]RSS" FORMAT, \ + IOSTR, IO->ID, CCID, N, __VA_ARGS__) #define EVRSSCM_syslog(LEVEL, FORMAT) \ DBGLOG(LEVEL) syslog(LEVEL, \ - "IO[%ld]CC[%d][%ld]RSS" FORMAT, \ - IO->ID, CCID, N) + "%s[%ld]CC[%d][%ld]RSS" FORMAT, \ + IOSTR, IO->ID, CCID, N) #define EVRSSQ_syslog(LEVEL, FORMAT, ...) \ DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT, \ @@ -90,8 +90,61 @@ int RSSClientDebugEnabled = 0; DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT) #define EVRSSCSM_syslog(LEVEL, FORMAT) \ - DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT, \ - IO->ID, N) + DBGLOG(LEVEL) syslog(LEVEL, "%s[%ld][%ld]RSS" FORMAT, \ + IOSTR, IO->ID, N) + +typedef enum _RSSState { + eRSSCreated, + eRSSFetching, + eRSSFailure, + eRSSParsing, + eRSSUT +} RSSState; +ConstStr RSSStates[] = { + {HKEY("Aggregator created")}, + {HKEY("Fetching content")}, + {HKEY("Failed")}, + {HKEY("parsing content")}, + {HKEY("checking usetable")} +}; + + +static size_t GetLocationString( void *ptr, size_t size, size_t nmemb, void *userdata) +{ +#define LOCATION "location" + if (strncasecmp((char*)ptr, LOCATION, sizeof(LOCATION) - 1) == 0) + { + AsyncIO *IO = (AsyncIO *) userdata; + rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data; + + char *pch = (char*) ptr; + char *pche; + + pche = pch + (size * nmemb); + pch += sizeof(LOCATION); + + while (isspace(*pch) || (*pch == ':')) + pch ++; + + while (isspace(*pche) || (*pche == '\0')) + pche--; + if (RSSAggr->RedirectUrl == NULL) { + RSSAggr->RedirectUrl = NewStrBufPlain(pch, pche - pch + 1); + } + else { + FlushStrBuf(RSSAggr->RedirectUrl); + StrBufPlain(RSSAggr->RedirectUrl, pch, pche - pch + 1); + } + } + return size * nmemb; +} + +static void SetRSSState(AsyncIO *IO, RSSState State) +{ + CitContext* CCC = IO->CitContext; + if (CCC != NULL) + memcpy(CCC->cs_clientname, RSSStates[State].Key, RSSStates[State].len + 1); +} void DeleteRoomReference(long QRnumber) { @@ -119,7 +172,7 @@ void DeleteRoomReference(long QRnumber) void UnlinkRooms(rss_aggregator *RSSAggr) { - DeleteRoomReference(RSSAggr->QRnumber); + DeleteRoomReference(RSSAggr->Cfg.QRnumber); if (RSSAggr->OtherQRnumbers != NULL) { long HKLen; @@ -135,8 +188,8 @@ void UnlinkRooms(rss_aggregator *RSSAggr) &vData) && (vData != NULL)) { - long *lData = (long*) vData; - DeleteRoomReference(*lData); + pRSSConfig *Data = (pRSSConfig*) vData; + DeleteRoomReference(Data->QRnumber); } DeleteHashPos(&At); @@ -164,9 +217,12 @@ void DeleteRssCfg(void *vptr) { rss_aggregator *RSSAggr = (rss_aggregator *)vptr; AsyncIO *IO = &RSSAggr->IO; - EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n"); + + if (IO->CitContext != NULL) + EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n"); FreeStrBuf(&RSSAggr->Url); + FreeStrBuf(&RSSAggr->RedirectUrl); FreeStrBuf(&RSSAggr->rooms); FreeStrBuf(&RSSAggr->CData); FreeStrBuf(&RSSAggr->Key); @@ -186,6 +242,7 @@ void DeleteRssCfg(void *vptr) } FreeAsyncIOContents(&RSSAggr->IO); + memset(RSSAggr, 0, sizeof(rss_aggregator)); free(RSSAggr); } @@ -195,7 +252,7 @@ eNextState RSSAggregator_Terminate(AsyncIO *IO) EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n"); - + StopCurlWatchers(IO); UnlinkRSSAggregator(RSSAggr); return eAbort; } @@ -207,6 +264,7 @@ eNextState RSSAggregator_TerminateDB(AsyncIO *IO) EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n"); + StopDBWatchers(&RSSAggr->IO); UnlinkRSSAggregator(RSSAggr); return eAbort; } @@ -222,15 +280,148 @@ eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO) EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl); - + StopCurlWatchers(IO); UnlinkRSSAggregator(RSSAggr); return eAbort; } +void AppendLink(StrBuf *Message, + StrBuf *link, + StrBuf *LinkTitle, + const char *Title) +{ + if (StrLength(link) > 0) + { + StrBufAppendBufPlain(Message, HKEY(""), 0); + if (StrLength(LinkTitle) > 0) + StrBufAppendBuf(Message, LinkTitle, 0); + else if ((Title != NULL) && !IsEmptyStr(Title)) + StrBufAppendBufPlain(Message, Title, -1, 0); + else + StrBufAppendBuf(Message, link, 0); + StrBufAppendBufPlain(Message, HKEY("
\n"), 0); + } +} + -eNextState AbortNetworkSaveMessage (AsyncIO *IO) +int rss_format_item(AsyncIO *IO, networker_save_message *SaveMsg) { - return eAbort; ///TODO + StrBuf *Message; + int msglen = 0; + + if (StrLength(SaveMsg->description) + + StrLength(SaveMsg->link) + + StrLength(SaveMsg->linkTitle) + + StrLength(SaveMsg->reLink) + + StrLength(SaveMsg->reLinkTitle) + + StrLength(SaveMsg->title) == 0) + { + EVRSSCM_syslog(LOG_INFO, "Refusing to save empty message."); + return 0; + } + + CM_Flush(&SaveMsg->Msg); + + if (SaveMsg->author_or_creator != NULL) { + + char *From; + StrBuf *Encoded = NULL; + int FromAt; + + From = html_to_ascii(ChrPtr(SaveMsg->author_or_creator), + StrLength(SaveMsg->author_or_creator), + 512, 0); + StrBufPlain(SaveMsg->author_or_creator, From, -1); + StrBufTrim(SaveMsg->author_or_creator); + free(From); + + FromAt = strchr(ChrPtr(SaveMsg->author_or_creator), '@') != NULL; + if (!FromAt && StrLength (SaveMsg->author_email) > 0) + { + StrBufRFC2047encode(&Encoded, SaveMsg->author_or_creator); + CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded); + CM_SetAsFieldSB(&SaveMsg->Msg, eMessagePath, &SaveMsg->author_email); + } + else + { + if (FromAt) + { + CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &SaveMsg->author_or_creator); + CM_CopyField(&SaveMsg->Msg, eMessagePath, eAuthor); + } + else + { + StrBufRFC2047encode(&Encoded, + SaveMsg->author_or_creator); + CM_SetAsFieldSB(&SaveMsg->Msg, eAuthor, &Encoded); + CM_SetField(&SaveMsg->Msg, eMessagePath, HKEY("rss@localhost")); + + } + } + } + else { + CM_SetField(&SaveMsg->Msg, eAuthor, HKEY("rss")); + } + + CM_SetField(&SaveMsg->Msg, eNodeName, CtdlGetConfigStr("c_nodename"), strlen(CtdlGetConfigStr("c_nodename"))); + if (SaveMsg->title != NULL) { + long len; + char *Sbj; + StrBuf *Encoded, *QPEncoded; + + QPEncoded = NULL; + StrBufSpaceToBlank(SaveMsg->title); + len = StrLength(SaveMsg->title); + Sbj = html_to_ascii(ChrPtr(SaveMsg->title), len, 512, 0); + if (!IsEmptyStr(Sbj)) { + len = strlen(Sbj); + if ((Sbj[len - 1] == '\n')) + { + len --; + Sbj[len] = '\0'; + } + Encoded = NewStrBufPlain(Sbj, len); + + + StrBufTrim(Encoded); + StrBufRFC2047encode(&QPEncoded, Encoded); + + CM_SetAsFieldSB(&SaveMsg->Msg, eMsgSubject, &QPEncoded); + FreeStrBuf(&Encoded); + } + if (Sbj != NULL) { + free(Sbj); + } + } + if (SaveMsg->link == NULL) + SaveMsg->link = NewStrBufPlain(HKEY("")); + +#if 0 /* temporarily disable shorter urls. */ + SaveMsg->Msg.cm_fields[TMP_SHORTER_URLS] = + GetShorterUrls(SaveMsg->description); +#endif + + msglen += 1024 + StrLength(SaveMsg->link) + StrLength(SaveMsg->description) ; + + Message = NewStrBufPlain(NULL, msglen); + + StrBufPlain(Message, HKEY( + "Content-type: text/html; charset=\"UTF-8\"\r\n\r\n" + "\n")); +#if 0 /* disable shorter url for now. */ + SaveMsg->Msg.cm_fields[TMP_SHORTER_URL_OFFSET] = StrLength(Message); +#endif + StrBufAppendBuf(Message, SaveMsg->description, 0); + StrBufAppendBufPlain(Message, HKEY("

\n"), 0); + + AppendLink(Message, SaveMsg->link, SaveMsg->linkTitle, NULL); + AppendLink(Message, SaveMsg->reLink, SaveMsg->reLinkTitle, "Reply to this"); + StrBufAppendBufPlain(Message, HKEY("\n"), 0); + + SaveMsg->Message = Message; + return 1; } eNextState RSSSaveMessage(AsyncIO *IO) @@ -239,16 +430,17 @@ eNextState RSSSaveMessage(AsyncIO *IO) const char *Key; rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data; - RSSAggr->ThisMsg->Msg.cm_fields['M'] = - SmashStrBuf(&RSSAggr->ThisMsg->Message); - - CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0); - - /* write the uidl to the use table so we don't store this item again */ - cdb_store(CDB_USETABLE, - SKEY(RSSAggr->ThisMsg->MsgGUID), - &RSSAggr->ThisMsg->ut, - sizeof(struct UseTable) ); + if (rss_format_item(IO, RSSAggr->ThisMsg)) + { + CM_SetAsFieldSB(&RSSAggr->ThisMsg->Msg, eMesageText, + &RSSAggr->ThisMsg->Message); + + CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0); + + /* write the uidl to the use table so we don't store this item again */ + + CheckIfAlreadySeen("RSS Item Insert", RSSAggr->ThisMsg->MsgGUID, EvGetNow(IO), 0, eWrite, CCID, IO->ID); + } if (GetNextHashPos(RSSAggr->Messages, RSSAggr->Pos, @@ -261,29 +453,31 @@ eNextState RSSSaveMessage(AsyncIO *IO) eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) { + static const time_t antiExpire = USETABLE_ANTIEXPIRE_HIRES; +#ifndef DEBUG_RSS + time_t seenstamp = 0; const char *Key; long len; - struct cdbdata *cdbut; rss_aggregator *Ctx = (rss_aggregator *) IO->Data; /* Find out if we've already seen this item */ - strcpy(Ctx->ThisMsg->ut.ut_msgid, - ChrPtr(Ctx->ThisMsg->MsgGUID)); /// TODO - Ctx->ThisMsg->ut.ut_timestamp = time(NULL); - - cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID)); -#ifndef DEBUG_RSS - if (cdbut != NULL) { +// todo: expiry? + SetRSSState(IO, eRSSUT); + seenstamp = CheckIfAlreadySeen("RSS Item Seen", + Ctx->ThisMsg->MsgGUID, + EvGetNow(IO), + antiExpire, + eCheckUpdate, + CCID, IO->ID); + if (seenstamp != 0) + { /* Item has already been seen */ EVRSSC_syslog(LOG_DEBUG, - "%s has already been seen\n", - ChrPtr(Ctx->ThisMsg->MsgGUID)); - cdb_free(cdbut); + "%s has already been seen - %ld < %ld", + ChrPtr(Ctx->ThisMsg->MsgGUID), + seenstamp, antiExpire); - /* rewrite the record anyway, to update the timestamp */ - cdb_store(CDB_USETABLE, - SKEY(Ctx->ThisMsg->MsgGUID), - &Ctx->ThisMsg->ut, sizeof(struct UseTable) ); + SetRSSState(IO, eRSSParsing); if (GetNextHashPos(Ctx->Messages, Ctx->Pos, @@ -298,9 +492,224 @@ eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO) else #endif { + /* Item has already been seen */ + EVRSSC_syslog(LOG_DEBUG, + "%s Parsing - %ld >= %ld", + ChrPtr(Ctx->ThisMsg->MsgGUID), + seenstamp, antiExpire); + SetRSSState(IO, eRSSParsing); + NextDBOperation(IO, RSSSaveMessage); return eSendMore; } + return eSendMore; +} + +void UpdateLastKnownGood(pRSSConfig *pCfg, time_t now) +{ + OneRoomNetCfg *pRNCfg; + begin_critical_section(S_NETCONFIGS); + pRNCfg = CtdlGetNetCfgForRoom(pCfg->QRnumber); + if (pRNCfg != NULL) + { + RSSCfgLine *RSSCfg = (RSSCfgLine *)pRNCfg->NetConfigs[rssclient]; + + while (RSSCfg != NULL) + { + if (RSSCfg == pCfg->pCfg) + break; + + RSSCfg = RSSCfg->next; + } + if (RSSCfg != NULL) + { + pRNCfg->changed = 1; + RSSCfg->last_known_good = now; + } + } + + SaveRoomNetConfigFile(pRNCfg, pCfg->QRnumber); + FreeRoomNetworkStruct(&pRNCfg); + end_critical_section(S_NETCONFIGS); +} + +eNextState RSSAggregator_AnalyseReply(AsyncIO *IO) +{ + HashPos *it = NULL; + long len; + const char *Key; + pRSSConfig *pCfg; + u_char rawdigest[MD5_DIGEST_LEN]; + struct MD5Context md5context; + StrBuf *guid; + rss_aggregator *Ctx = (rss_aggregator *) IO->Data; + + + if ((IO->HttpReq.httpcode >= 300) && + (IO->HttpReq.httpcode < 400) && + (Ctx->RedirectUrl != NULL)) { + + StrBuf *ErrMsg; + long lens[2]; + const char *strs[2]; + + SetRSSState(IO, eRSSFailure); + ErrMsg = NewStrBuf(); + if (IO) EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n", + IO->HttpReq.httpcode); + strs[0] = ChrPtr(Ctx->Url); + lens[0] = StrLength(Ctx->Url); + + strs[1] = ChrPtr(Ctx->rooms); + lens[1] = StrLength(Ctx->rooms); + + if (IO->HttpReq.CurlError == NULL) + IO->HttpReq.CurlError = ""; + + StrBufPrintf(ErrMsg, + "Error while RSS-Aggregation Run of %s\n" + " need a 200, got a %ld !\n" + " Curl Error message: \n%s / %s\n" + " Redirect header points to: %s\n" + " Response text was: \n" + " \n %s\n", + ChrPtr(Ctx->Url), + IO->HttpReq.httpcode, + IO->HttpReq.errdesc, + IO->HttpReq.CurlError, + ChrPtr(Ctx->RedirectUrl), + ChrPtr(IO->HttpReq.ReplyData) + ); + + CtdlAideFPMessage( + ChrPtr(ErrMsg), + "RSS Aggregation run failure", + 2, strs, (long*) &lens, + CCID, + IO->ID, + EvGetNow(IO)); + + FreeStrBuf(&ErrMsg); + EVRSSC_syslog(LOG_DEBUG, + "RSS feed returned an invalid http status code. <%s>\n", + ChrPtr(Ctx->Url), + IO->HttpReq.httpcode); + return eAbort; + } + else if (IO->HttpReq.httpcode != 200) + { + StrBuf *ErrMsg; + long lens[2]; + const char *strs[2]; + + SetRSSState(IO, eRSSFailure); + ErrMsg = NewStrBuf(); + if (IO) EVRSSC_syslog(LOG_ALERT, "need a 200, got a %ld !\n", + IO->HttpReq.httpcode); + strs[0] = ChrPtr(Ctx->Url); + lens[0] = StrLength(Ctx->Url); + + strs[1] = ChrPtr(Ctx->rooms); + lens[1] = StrLength(Ctx->rooms); + + if (IO->HttpReq.CurlError == NULL) + IO->HttpReq.CurlError = ""; + + StrBufPrintf(ErrMsg, + "Error while RSS-Aggregation Run of %s\n" + " need a 200, got a %ld !\n" + " Curl Error message: \n%s / %s\n" + " Response text was: \n" + " \n %s\n", + ChrPtr(Ctx->Url), + IO->HttpReq.httpcode, + IO->HttpReq.errdesc, + IO->HttpReq.CurlError, + ChrPtr(IO->HttpReq.ReplyData) + ); + + CtdlAideFPMessage( + ChrPtr(ErrMsg), + "RSS Aggregation run failure", + 2, strs, (long*) &lens, + CCID, + IO->ID, + EvGetNow(IO)); + + FreeStrBuf(&ErrMsg); + EVRSSC_syslog(LOG_DEBUG, + "RSS feed returned an invalid http status code. <%s>\n", + ChrPtr(Ctx->Url), + IO->HttpReq.httpcode); + return eAbort; + } + + pCfg = &Ctx->Cfg; + + while (pCfg != NULL) + { + UpdateLastKnownGood (pCfg, EvGetNow(IO)); + if ((Ctx->roomlist_parts > 1) && + (it == NULL)) + { + it = GetNewHashPos(RSSFetchUrls, 0); + } + if (it != NULL) + { + void *vptr; + if (GetNextHashPos(Ctx->OtherQRnumbers, it, &len, &Key, &vptr)) + pCfg = vptr; + else + pCfg = NULL; + } + else + pCfg = NULL; + } + DeleteHashPos (&it); + + SetRSSState(IO, eRSSUT); + + MD5Init(&md5context); + + MD5Update(&md5context, + (const unsigned char*)SKEY(IO->HttpReq.ReplyData)); + + MD5Update(&md5context, + (const unsigned char*)SKEY(Ctx->Url)); + + MD5Final(rawdigest, &md5context); + guid = NewStrBufPlain(NULL, + MD5_DIGEST_LEN * 2 + 12 /* _rss2ctdl*/); + StrBufHexEscAppend(guid, NULL, rawdigest, MD5_DIGEST_LEN); + StrBufAppendBufPlain(guid, HKEY("_rssFM"), 0); + if (StrLength(guid) > 40) + StrBufCutAt(guid, 40, NULL); + /* Find out if we've already seen this item */ + +#ifndef DEBUG_RSS + + if (CheckIfAlreadySeen("RSS Whole", + guid, + EvGetNow(IO), + EvGetNow(IO) - USETABLE_ANTIEXPIRE, + eUpdate, + CCID, IO->ID) + != 0) + { + FreeStrBuf(&guid); + + EVRSSC_syslog(LOG_DEBUG, "RSS feed already seen. <%s>\n", ChrPtr(Ctx->Url)); + return eAbort; + } + FreeStrBuf(&guid); +#endif + SetRSSState(IO, eRSSParsing); + return RSSAggregator_ParseReply(IO); +} + +eNextState RSSAggregator_FinishHttp(AsyncIO *IO) +{ + return CurlQueueDBOperation(IO, RSSAggregator_AnalyseReply); } /* @@ -311,6 +720,9 @@ int rss_do_fetching(rss_aggregator *RSSAggr) AsyncIO *IO = &RSSAggr->IO; rss_item *ri; time_t now; + CURLcode sta; + CURL *chnd; + now = time(NULL); @@ -324,7 +736,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr) if (! InitcURLIOStruct(&RSSAggr->IO, RSSAggr, "Citadel RSS Client", - RSSAggregator_ParseReply, + RSSAggregator_FinishHttp, RSSAggregator_Terminate, RSSAggregator_TerminateDB, RSSAggregator_ShutdownAbort)) @@ -332,6 +744,10 @@ int rss_do_fetching(rss_aggregator *RSSAggr) EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n"); return 0; } + chnd = IO->HttpReq.chnd; + OPT(HEADERDATA, IO); + OPT(HEADERFUNCTION, GetLocationString); + SetRSSState(IO, eRSSCreated); safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host, ChrPtr(RSSAggr->Url), @@ -341,6 +757,7 @@ int rss_do_fetching(rss_aggregator *RSSAggr) ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80); CurlPrepareURL(RSSAggr->IO.ConnectMe); + SetRSSState(IO, eRSSFetching); QueueCurlContext(&RSSAggr->IO); return 1; } @@ -348,21 +765,14 @@ int rss_do_fetching(rss_aggregator *RSSAggr) /* * Scan a room's netconfig to determine whether it is requesting any RSS feeds */ -void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) +void rssclient_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG) { - StrBuf *CfgData=NULL; - StrBuf *CfgType; - StrBuf *Line; - rss_room_counter *Count = NULL; - struct stat statbuf; - char filename[PATH_MAX]; - int fd; - int Done; + const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient]; rss_aggregator *RSSAggr = NULL; rss_aggregator *use_this_RSSAggr = NULL; void *vptr; - const char *CfgPtr, *lPtr; - const char *Err; + + TRACE; pthread_mutex_lock(&RSSQueueMutex); if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr)) @@ -376,143 +786,72 @@ void rssclient_scan_room(struct ctdlroom *qrbuf, void *data) } pthread_mutex_unlock(&RSSQueueMutex); - assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir); + if (server_shutting_down) return; - if (server_shutting_down) - return; + while (RSSCfg != NULL) + { + pthread_mutex_lock(&RSSQueueMutex); + GetHash(RSSFetchUrls, + SKEY(RSSCfg->Url), + &vptr); - /* Only do net processing for rooms that have netconfigs */ - fd = open(filename, 0); - if (fd <= 0) { - /* syslog(LOG_DEBUG, - "rssclient: %s no config.\n", - qrbuf->QRname); */ - return; - } + use_this_RSSAggr = (rss_aggregator *)vptr; + if (use_this_RSSAggr != NULL) + { + pRSSConfig *pRSSCfg; - if (server_shutting_down) - return; + StrBufAppendBufPlain( + use_this_RSSAggr->rooms, + qrbuf->QRname, + -1, 0); + if (use_this_RSSAggr->roomlist_parts==1) + { + use_this_RSSAggr->OtherQRnumbers + = NewHash(1, lFlathash); + } - if (fstat(fd, &statbuf) == -1) { - EVRSSQ_syslog(LOG_DEBUG, - "ERROR: could not stat configfile '%s' - %s\n", - filename, - strerror(errno)); - return; - } + pRSSCfg = (pRSSConfig *) malloc(sizeof(pRSSConfig)); - if (server_shutting_down) - return; + pRSSCfg->QRnumber = qrbuf->QRnumber; + pRSSCfg->pCfg = RSSCfg; - CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1); + Put(use_this_RSSAggr->OtherQRnumbers, + LKEY(qrbuf->QRnumber), + pRSSCfg, + NULL); + use_this_RSSAggr->roomlist_parts++; - if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) { - close(fd); - FreeStrBuf(&CfgData); - EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s
\n", - filename, strerror(errno)); - return; - } - close(fd); - if (server_shutting_down) - return; + pthread_mutex_unlock(&RSSQueueMutex); - CfgPtr = NULL; - CfgType = NewStrBuf(); - Line = NewStrBufPlain(NULL, StrLength(CfgData)); - Done = 0; - while (!Done) - { - Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0; - if (StrLength(Line) > 0) - { - lPtr = NULL; - StrBufExtract_NextToken(CfgType, Line, &lPtr, '|'); - if (!strcasecmp("rssclient", ChrPtr(CfgType))) - { - if (Count == NULL) - { - Count = malloc( - sizeof(rss_room_counter)); - Count->count = 0; - } - Count->count ++; - RSSAggr = (rss_aggregator *) malloc( - sizeof(rss_aggregator)); - - memset (RSSAggr, 0, sizeof(rss_aggregator)); - RSSAggr->QRnumber = qrbuf->QRnumber; - RSSAggr->roomlist_parts = 1; - RSSAggr->Url = NewStrBuf(); - - StrBufExtract_NextToken(RSSAggr->Url, - Line, - &lPtr, - '|'); - - pthread_mutex_lock(&RSSQueueMutex); - GetHash(RSSFetchUrls, - SKEY(RSSAggr->Url), - &vptr); - - use_this_RSSAggr = (rss_aggregator *)vptr; - if (use_this_RSSAggr != NULL) - { - long *QRnumber; - StrBufAppendBufPlain( - use_this_RSSAggr->rooms, - qrbuf->QRname, - -1, 0); - if (use_this_RSSAggr->roomlist_parts==1) - { - use_this_RSSAggr->OtherQRnumbers - = NewHash(1, lFlathash); - } - QRnumber = (long*)malloc(sizeof(long)); - *QRnumber = qrbuf->QRnumber; - Put(use_this_RSSAggr->OtherQRnumbers, - LKEY(qrbuf->QRnumber), - QRnumber, - NULL); - use_this_RSSAggr->roomlist_parts++; - - pthread_mutex_unlock(&RSSQueueMutex); - - FreeStrBuf(&RSSAggr->Url); - free(RSSAggr); - RSSAggr = NULL; - continue; - } - pthread_mutex_unlock(&RSSQueueMutex); - - RSSAggr->ItemType = RSS_UNSET; - - RSSAggr->rooms = NewStrBufPlain( - qrbuf->QRname, -1); - - pthread_mutex_lock(&RSSQueueMutex); - - Put(RSSFetchUrls, - SKEY(RSSAggr->Url), - RSSAggr, - DeleteRssCfg); - - pthread_mutex_unlock(&RSSQueueMutex); - } + RSSCfg = RSSCfg->next; + continue; } - } - if (Count != NULL) - { - Count->QRnumber = qrbuf->QRnumber; + pthread_mutex_unlock(&RSSQueueMutex); + + RSSAggr = (rss_aggregator *) malloc( + sizeof(rss_aggregator)); + + memset (RSSAggr, 0, sizeof(rss_aggregator)); + RSSAggr->Cfg.QRnumber = qrbuf->QRnumber; + RSSAggr->Cfg.pCfg = RSSCfg; + RSSAggr->roomlist_parts = 1; + RSSAggr->Url = NewStrBufDup(RSSCfg->Url); + + RSSAggr->ItemType = RSS_UNSET; + + RSSAggr->rooms = NewStrBufPlain( + qrbuf->QRname, -1); + pthread_mutex_lock(&RSSQueueMutex); - EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n", - qrbuf->QRnumber, qrbuf->QRname); - Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL); + + Put(RSSFetchUrls, + SKEY(RSSAggr->Url), + RSSAggr, + DeleteRssCfg); + pthread_mutex_unlock(&RSSQueueMutex); + RSSCfg = RSSCfg->next; } - FreeStrBuf(&CfgData); - FreeStrBuf(&CfgType); - FreeStrBuf(&Line); } /* @@ -555,21 +894,29 @@ void rssclient_scan(void) { } become_session(&rss_CC); - EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n"); - CtdlForEachRoom(rssclient_scan_room, NULL); + EVRSSQM_syslog(LOG_DEBUG, "rssclient started"); + CtdlForEachNetCfgRoom(rssclient_scan_room, NULL, rssclient); - pthread_mutex_lock(&RSSQueueMutex); - - it = GetNewHashPos(RSSFetchUrls, 0); - while (!server_shutting_down && - GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && - (vrptr != NULL)) { - rptr = (rss_aggregator *)vrptr; - if (!rss_do_fetching(rptr)) - UnlinkRSSAggregator(rptr); + if (GetCount(RSSFetchUrls) > 0) + { + pthread_mutex_lock(&RSSQueueMutex); + EVRSSQ_syslog(LOG_DEBUG, + "rssclient starting %d Clients", + GetCount(RSSFetchUrls)); + + it = GetNewHashPos(RSSFetchUrls, 0); + while (!server_shutting_down && + GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) && + (vrptr != NULL)) { + rptr = (rss_aggregator *)vrptr; + if (!rss_do_fetching(rptr)) + UnlinkRSSAggregator(rptr); + } + DeleteHashPos(&it); + pthread_mutex_unlock(&RSSQueueMutex); } - DeleteHashPos(&it); - pthread_mutex_unlock(&RSSQueueMutex); + else + EVRSSQM_syslog(LOG_DEBUG, "Nothing to do."); EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n"); return; @@ -582,23 +929,109 @@ void rss_cleanup(void) DeleteHash(&RSSQueueRooms); } -void LogDebugEnableRSSClient(void) +void LogDebugEnableRSSClient(const int n) +{ + RSSClientDebugEnabled = n; +} + + +typedef struct __RSSVetoInfo { + StrBuf *ErrMsg; + time_t Now; + int Veto; +}RSSVetoInfo; + +void rssclient_veto_scan_room(struct ctdlroom *qrbuf, void *data, OneRoomNetCfg *OneRNCFG) +{ + RSSVetoInfo *Info = (RSSVetoInfo *) data; + const RSSCfgLine *RSSCfg = (RSSCfgLine *)OneRNCFG->NetConfigs[rssclient]; + + while (RSSCfg != NULL) + { + if ((RSSCfg->last_known_good != 0) && + (RSSCfg->last_known_good + USETABLE_ANTIEXPIRE < Info->Now)) + { + StrBufAppendPrintf(Info->ErrMsg, + "RSS feed not seen for a %d days:: <", + (Info->Now - RSSCfg->last_known_good) / (24 * 60 * 60)); + + StrBufAppendBuf(Info->ErrMsg, RSSCfg->Url, 0); + StrBufAppendBufPlain(Info->ErrMsg, HKEY(">\n"), 0); + } + RSSCfg = RSSCfg->next; + } +} + +int RSSCheckUsetableVeto(StrBuf *ErrMsg) +{ + RSSVetoInfo Info; + + Info.ErrMsg = ErrMsg; + Info.Now = time (NULL); + Info.Veto = 0; + + CtdlForEachNetCfgRoom(rssclient_veto_scan_room, &Info, rssclient); + + return Info.Veto;; +} + + + + +void ParseRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *Line, const char *LinePos, OneRoomNetCfg *OneRNCFG) { - RSSClientDebugEnabled = 1; + RSSCfgLine *RSSCfg; + + RSSCfg = (RSSCfgLine *) malloc (sizeof(RSSCfgLine)); + RSSCfg->Url = NewStrBufPlain (NULL, StrLength (Line)); + + + StrBufExtract_NextToken(RSSCfg->Url, Line, &LinePos, '|'); + RSSCfg->last_known_good = StrBufExtractNext_long(Line, &LinePos, '|'); + + + RSSCfg->next = (RSSCfgLine *)OneRNCFG->NetConfigs[ThisOne->C]; + OneRNCFG->NetConfigs[ThisOne->C] = (RoomNetCfgLine*) RSSCfg; } +void SerializeRSSClientCfgLine(const CfgLineType *ThisOne, StrBuf *OutputBuffer, OneRoomNetCfg *RNCfg, RoomNetCfgLine *data) +{ + RSSCfgLine *RSSCfg = (RSSCfgLine*) data; + + StrBufAppendBufPlain(OutputBuffer, CKEY(ThisOne->Str), 0); + StrBufAppendBufPlain(OutputBuffer, HKEY("|"), 0); + StrBufAppendBufPlain(OutputBuffer, SKEY(RSSCfg->Url), 0); + StrBufAppendPrintf(OutputBuffer, "|%ld\n", RSSCfg->last_known_good); +} + +void DeleteRSSClientCfgLine(const CfgLineType *ThisOne, RoomNetCfgLine **data) +{ + RSSCfgLine *RSSCfg = (RSSCfgLine*) *data; + + FreeStrBuf(&RSSCfg->Url); + free(*data); + *data = NULL; +} + + CTDL_MODULE_INIT(rssclient) { - if (threading) + if (!threading) { - CtdlFillSystemContext(&rss_CC, "rssclient"); + CtdlRegisterTDAPVetoHook (RSSCheckUsetableVeto, CDB_USETABLE, 0); + + CtdlREGISTERRoomCfgType(rssclient, ParseRSSClientCfgLine, 0, 1, SerializeRSSClientCfgLine, DeleteRSSClientCfgLine); pthread_mutex_init(&RSSQueueMutex, NULL); RSSQueueRooms = NewHash(1, lFlathash); RSSFetchUrls = NewHash(1, NULL); syslog(LOG_INFO, "%s\n", curl_version()); - CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER); + CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300); CtdlRegisterEVCleanupHook(rss_cleanup); - CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient); + CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled); + } + else + { + CtdlFillSystemContext(&rss_CC, "rssclient"); } return "rssclient"; }